diff --git a/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py b/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py new file mode 100644 index 000000000..97770d567 --- /dev/null +++ b/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py @@ -0,0 +1,70 @@ +"""Create legacy NMA_Stratigraphy table. + +Revision ID: 1d2c3b4a5e67 +Revises: a7b8c9d0e1f2 +Create Date: 2026-01-15 00:00:00.000000 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import inspect +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "1d2c3b4a5e67" +down_revision: Union[str, Sequence[str], None] = "f5a6b7c8d9e0" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create the legacy stratigraphy table.""" + bind = op.get_bind() + inspector = inspect(bind) + if inspector.has_table("NMA_Stratigraphy"): + return + + op.create_table( + "NMA_Stratigraphy", + sa.Column( + "GlobalID", + postgresql.UUID(as_uuid=True), + primary_key=True, + nullable=False, + ), + sa.Column("WellID", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("PointID", sa.String(length=10), nullable=False), + sa.Column( + "thing_id", + sa.Integer(), + sa.ForeignKey("thing.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("StratTop", sa.Float(), nullable=True), + sa.Column("StratBottom", sa.Float(), nullable=True), + sa.Column("UnitIdentifier", sa.String(length=50), nullable=True), + sa.Column("Lithology", sa.String(length=100), nullable=True), + sa.Column("LithologicModifier", sa.String(length=100), nullable=True), + sa.Column("ContributingUnit", sa.String(length=10), nullable=True), + sa.Column("StratSource", sa.Text(), nullable=True), + sa.Column("StratNotes", sa.Text(), nullable=True), + sa.Column("OBJECTID", sa.Integer(), nullable=True, unique=True), + ) + op.create_index( + "ix_nma_stratigraphy_point_id", + "NMA_Stratigraphy", + ["PointID"], + ) + op.create_index( + "ix_nma_stratigraphy_thing_id", + "NMA_Stratigraphy", + ["thing_id"], + ) + + +def downgrade() -> None: + op.drop_index("ix_nma_stratigraphy_thing_id", table_name="NMA_Stratigraphy") + op.drop_index("ix_nma_stratigraphy_point_id", table_name="NMA_Stratigraphy") + op.drop_table("NMA_Stratigraphy") diff --git a/alembic/versions/c2f4a9d0b1e2_create_nma_associated_data.py b/alembic/versions/c2f4a9d0b1e2_create_nma_associated_data.py new file mode 100644 index 000000000..61d791db5 --- /dev/null +++ b/alembic/versions/c2f4a9d0b1e2_create_nma_associated_data.py @@ -0,0 +1,56 @@ +"""Create legacy NMA_AssociatedData table. + +Revision ID: c2f4a9d0b1e2 +Revises: a7b8c9d0e1f2 +Create Date: 2026-03-05 00:00:00.000000 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import inspect +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "c2f4a9d0b1e2" +down_revision: Union[str, Sequence[str], None] = "a7b8c9d0e1f2" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create the legacy associated data table.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_AssociatedData"): + op.create_table( + "NMA_AssociatedData", + sa.Column("LocationId", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("PointID", sa.String(length=10), nullable=True), + sa.Column( + "AssocID", + postgresql.UUID(as_uuid=True), + nullable=False, + primary_key=True, + ), + sa.Column("Notes", sa.String(length=255), nullable=True), + sa.Column("Formation", sa.String(length=15), nullable=True), + sa.Column("OBJECTID", sa.Integer(), nullable=True, unique=True), + sa.Column( + "thing_id", + sa.Integer(), + sa.ForeignKey("thing.id", ondelete="CASCADE"), + nullable=True, + ), + sa.UniqueConstraint("LocationId", name="AssociatedData$LocationId"), + ) + op.create_index("AssociatedData$PointID", "NMA_AssociatedData", ["PointID"]) + + +def downgrade() -> None: + """Drop the legacy associated data table.""" + bind = op.get_bind() + inspector = inspect(bind) + if inspector.has_table("NMA_AssociatedData"): + op.drop_table("NMA_AssociatedData") diff --git a/alembic/versions/d3a4b5c6d7e8_create_nma_surface_water_photos.py b/alembic/versions/d3a4b5c6d7e8_create_nma_surface_water_photos.py new file mode 100644 index 000000000..19f28cdf4 --- /dev/null +++ b/alembic/versions/d3a4b5c6d7e8_create_nma_surface_water_photos.py @@ -0,0 +1,53 @@ +"""Create legacy NMA_SurfaceWaterPhotos table. + +Revision ID: d3a4b5c6d7e8 +Revises: c2f4a9d0b1e2 +Create Date: 2026-03-05 00:00:00.000000 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import inspect +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "d3a4b5c6d7e8" +down_revision: Union[str, Sequence[str], None] = "c2f4a9d0b1e2" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create the legacy surface water photos table.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_SurfaceWaterPhotos"): + op.create_table( + "NMA_SurfaceWaterPhotos", + sa.Column("SurfaceID", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("PointID", sa.String(length=50), nullable=False), + sa.Column("OLEPath", sa.String(length=50), nullable=True), + sa.Column("OBJECTID", sa.Integer(), nullable=True, unique=True), + sa.Column( + "GlobalID", + postgresql.UUID(as_uuid=True), + nullable=False, + primary_key=True, + ), + ) + op.create_index( + "SurfaceWaterPhotos$PointID", "NMA_SurfaceWaterPhotos", ["PointID"] + ) + op.create_index( + "SurfaceWaterPhotos$SurfaceID", "NMA_SurfaceWaterPhotos", ["SurfaceID"] + ) + + +def downgrade() -> None: + """Drop the legacy surface water photos table.""" + bind = op.get_bind() + inspector = inspect(bind) + if inspector.has_table("NMA_SurfaceWaterPhotos"): + op.drop_table("NMA_SurfaceWaterPhotos") diff --git a/alembic/versions/e4b5c6d7e8f9_create_nma_weather_photos.py b/alembic/versions/e4b5c6d7e8f9_create_nma_weather_photos.py new file mode 100644 index 000000000..8914b0f5e --- /dev/null +++ b/alembic/versions/e4b5c6d7e8f9_create_nma_weather_photos.py @@ -0,0 +1,49 @@ +"""Create legacy NMA_WeatherPhotos table. + +Revision ID: e4b5c6d7e8f9 +Revises: d3a4b5c6d7e8 +Create Date: 2026-03-05 00:00:00.000000 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import inspect +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "e4b5c6d7e8f9" +down_revision: Union[str, Sequence[str], None] = "d3a4b5c6d7e8" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create the legacy weather photos table.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_WeatherPhotos"): + op.create_table( + "NMA_WeatherPhotos", + sa.Column("WeatherID", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("PointID", sa.String(length=50), nullable=False), + sa.Column("OLEPath", sa.String(length=50), nullable=True), + sa.Column("OBJECTID", sa.Integer(), nullable=True, unique=True), + sa.Column( + "GlobalID", + postgresql.UUID(as_uuid=True), + nullable=False, + primary_key=True, + ), + ) + op.create_index("WeatherPhotos$PointID", "NMA_WeatherPhotos", ["PointID"]) + op.create_index("WeatherPhotos$WeatherID", "NMA_WeatherPhotos", ["WeatherID"]) + + +def downgrade() -> None: + """Drop the legacy weather photos table.""" + bind = op.get_bind() + inspector = inspect(bind) + if inspector.has_table("NMA_WeatherPhotos"): + op.drop_table("NMA_WeatherPhotos") diff --git a/alembic/versions/f5a6b7c8d9e0_create_nma_soil_rock_results.py b/alembic/versions/f5a6b7c8d9e0_create_nma_soil_rock_results.py new file mode 100644 index 000000000..a9a76751b --- /dev/null +++ b/alembic/versions/f5a6b7c8d9e0_create_nma_soil_rock_results.py @@ -0,0 +1,52 @@ +"""Create legacy NMA_Soil_Rock_Results table. + +Revision ID: f5a6b7c8d9e0 +Revises: e4b5c6d7e8f9 +Create Date: 2026-03-05 00:00:00.000000 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import inspect + +# revision identifiers, used by Alembic. +revision: str = "f5a6b7c8d9e0" +down_revision: Union[str, Sequence[str], None] = "e4b5c6d7e8f9" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create the legacy soil/rock results table.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_Soil_Rock_Results"): + op.create_table( + "NMA_Soil_Rock_Results", + sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True), + sa.Column("Point_ID", sa.String(length=255), nullable=True), + sa.Column("Sample Type", sa.String(length=255), nullable=True), + sa.Column("Date Sampled", sa.String(length=255), nullable=True), + sa.Column("d13C", sa.Float(), nullable=True), + sa.Column("d18O", sa.Float(), nullable=True), + sa.Column("Sampled by", sa.String(length=255), nullable=True), + sa.Column( + "thing_id", + sa.Integer(), + sa.ForeignKey("thing.id", ondelete="CASCADE"), + nullable=True, + ), + ) + op.create_index( + "Soil_Rock_Results$Point_ID", "NMA_Soil_Rock_Results", ["Point_ID"] + ) + + +def downgrade() -> None: + """Drop the legacy soil/rock results table.""" + bind = op.get_bind() + inspector = inspect(bind) + if inspector.has_table("NMA_Soil_Rock_Results"): + op.drop_table("NMA_Soil_Rock_Results") diff --git a/db/nma_legacy.py b/db/nma_legacy.py index d36ee0f28..656e7069a 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -17,7 +17,6 @@ """Legacy NM Aquifer models copied from AMPAPI.""" import uuid - from datetime import date, datetime from typing import TYPE_CHECKING, List, Optional @@ -206,6 +205,37 @@ class NMAHydraulicsData(Base): thing: Mapped["Thing"] = relationship("Thing") +class Stratigraphy(Base): + """Legacy stratigraphy (lithology log) data from AMPAPI.""" + + __tablename__ = "NMA_Stratigraphy" + + global_id: Mapped[uuid.UUID] = mapped_column( + "GlobalID", UUID(as_uuid=True), primary_key=True + ) + well_id: Mapped[Optional[uuid.UUID]] = mapped_column("WellID", UUID(as_uuid=True)) + point_id: Mapped[str] = mapped_column("PointID", String(10), nullable=False) + thing_id: Mapped[int] = mapped_column( + Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=False + ) + + strat_top: Mapped[Optional[float]] = mapped_column("StratTop", Float) + strat_bottom: Mapped[Optional[float]] = mapped_column("StratBottom", Float) + unit_identifier: Mapped[Optional[str]] = mapped_column("UnitIdentifier", String(50)) + lithology: Mapped[Optional[str]] = mapped_column("Lithology", String(100)) + lithologic_modifier: Mapped[Optional[str]] = mapped_column( + "LithologicModifier", String(100) + ) + contributing_unit: Mapped[Optional[str]] = mapped_column( + "ContributingUnit", String(10) + ) + strat_source: Mapped[Optional[str]] = mapped_column("StratSource", Text) + strat_notes: Mapped[Optional[str]] = mapped_column("StratNotes", Text) + object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True) + + thing: Mapped["Thing"] = relationship("Thing", back_populates="stratigraphy_logs") + + class ChemistrySampleInfo(Base): """ Legacy Chemistry SampleInfo table from AMPAPI. @@ -295,6 +325,30 @@ def validate_thing_id(self, key, value): return value +class AssociatedData(Base): + """ + Legacy AssociatedData table from NM_Aquifer. + """ + + __tablename__ = "NMA_AssociatedData" + + location_id: Mapped[Optional[uuid.UUID]] = mapped_column( + "LocationId", UUID(as_uuid=True), unique=True + ) + point_id: Mapped[Optional[str]] = mapped_column("PointID", String(10)) + assoc_id: Mapped[uuid.UUID] = mapped_column( + "AssocID", UUID(as_uuid=True), primary_key=True + ) + notes: Mapped[Optional[str]] = mapped_column("Notes", String(255)) + formation: Mapped[Optional[str]] = mapped_column("Formation", String(15)) + object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True) + thing_id: Mapped[Optional[int]] = mapped_column( + Integer, ForeignKey("thing.id", ondelete="CASCADE") + ) + + thing: Mapped["Thing"] = relationship("Thing") + + class SurfaceWaterData(Base): """ Legacy SurfaceWaterData table from AMPAPI. @@ -328,6 +382,24 @@ class SurfaceWaterData(Base): data_source: Mapped[Optional[str]] = mapped_column("DataSource", String(255)) +class SurfaceWaterPhotos(Base): + """ + Legacy SurfaceWaterPhotos table from NM_Aquifer. + """ + + __tablename__ = "NMA_SurfaceWaterPhotos" + + surface_id: Mapped[Optional[uuid.UUID]] = mapped_column( + "SurfaceID", UUID(as_uuid=True) + ) + point_id: Mapped[str] = mapped_column("PointID", String(50), nullable=False) + ole_path: Mapped[Optional[str]] = mapped_column("OLEPath", String(50)) + object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True) + global_id: Mapped[uuid.UUID] = mapped_column( + "GlobalID", UUID(as_uuid=True), primary_key=True + ) + + class WeatherData(Base): """ Legacy WeatherData table from AMPAPI. @@ -345,6 +417,45 @@ class WeatherData(Base): object_id: Mapped[int] = mapped_column("OBJECTID", Integer, primary_key=True) +class WeatherPhotos(Base): + """ + Legacy WeatherPhotos table from NM_Aquifer. + """ + + __tablename__ = "NMA_WeatherPhotos" + + weather_id: Mapped[Optional[uuid.UUID]] = mapped_column( + "WeatherID", UUID(as_uuid=True) + ) + point_id: Mapped[str] = mapped_column("PointID", String(50), nullable=False) + ole_path: Mapped[Optional[str]] = mapped_column("OLEPath", String(50)) + object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True) + global_id: Mapped[uuid.UUID] = mapped_column( + "GlobalID", UUID(as_uuid=True), primary_key=True + ) + + +class SoilRockResults(Base): + """ + Legacy Soil_Rock_Results table from NM_Aquifer. + """ + + __tablename__ = "NMA_Soil_Rock_Results" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + point_id: Mapped[Optional[str]] = mapped_column("Point_ID", String(255)) + sample_type: Mapped[Optional[str]] = mapped_column("Sample Type", String(255)) + date_sampled: Mapped[Optional[str]] = mapped_column("Date Sampled", String(255)) + d13c: Mapped[Optional[float]] = mapped_column("d13C", Float) + d18o: Mapped[Optional[float]] = mapped_column("d18O", Float) + sampled_by: Mapped[Optional[str]] = mapped_column("Sampled by", String(255)) + thing_id: Mapped[Optional[int]] = mapped_column( + Integer, ForeignKey("thing.id", ondelete="CASCADE") + ) + + thing: Mapped["Thing"] = relationship("Thing") + + class NMAMinorTraceChemistry(Base): """ Legacy MinorandTraceChemistry table from AMPAPI. diff --git a/db/thing.py b/db/thing.py index 8283bdc4f..4365245fa 100644 --- a/db/thing.py +++ b/db/thing.py @@ -47,7 +47,7 @@ from db.thing_geologic_formation_association import ( ThingGeologicFormationAssociation, ) - from db.nma_legacy import ChemistrySampleInfo + from db.nma_legacy import ChemistrySampleInfo, Stratigraphy class Thing( @@ -312,6 +312,13 @@ class Thing( passive_deletes=True, ) + stratigraphy_logs: Mapped[List["Stratigraphy"]] = relationship( + "Stratigraphy", + back_populates="thing", + cascade="all, delete-orphan", + passive_deletes=True, + ) + # --- Association Proxies --- assets: AssociationProxy[list["Asset"]] = association_proxy( "asset_associations", "asset" diff --git a/tests/test_associated_data_legacy.py b/tests/test_associated_data_legacy.py new file mode 100644 index 000000000..a08e95bc0 --- /dev/null +++ b/tests/test_associated_data_legacy.py @@ -0,0 +1,82 @@ +# ============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +""" +Unit tests for AssociatedData legacy model. + +These tests verify the migration of columns from the legacy AssociatedData table. +Migrated columns: +- LocationId -> location_id +- PointID -> point_id +- AssocID -> assoc_id +- Notes -> notes +- Formation -> formation +- OBJECTID -> object_id +""" + +from uuid import uuid4 + +from db.engine import session_ctx +from db.nma_legacy import AssociatedData + + +def test_create_associated_data_all_fields(water_well_thing): + """Test creating an associated data record with all fields.""" + with session_ctx() as session: + record = AssociatedData( + location_id=uuid4(), + point_id="AA-0001", + assoc_id=uuid4(), + notes="Legacy notes", + formation="TEST", + object_id=42, + thing_id=water_well_thing.id, + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.assoc_id is not None + assert record.location_id is not None + assert record.point_id == "AA-0001" + assert record.notes == "Legacy notes" + assert record.formation == "TEST" + assert record.object_id == 42 + assert record.thing_id == water_well_thing.id + + session.delete(record) + session.commit() + + +def test_create_associated_data_minimal(): + """Test creating an associated data record with required fields only.""" + with session_ctx() as session: + record = AssociatedData(assoc_id=uuid4()) + session.add(record) + session.commit() + session.refresh(record) + + assert record.assoc_id is not None + assert record.location_id is None + assert record.point_id is None + assert record.notes is None + assert record.formation is None + assert record.object_id is None + + session.delete(record) + session.commit() + + +# ============= EOF ============================================= diff --git a/tests/test_soil_rock_results_legacy.py b/tests/test_soil_rock_results_legacy.py new file mode 100644 index 000000000..988a64bcb --- /dev/null +++ b/tests/test_soil_rock_results_legacy.py @@ -0,0 +1,81 @@ +# ============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +""" +Unit tests for Soil_Rock_Results legacy model. + +These tests verify the migration of columns from the legacy Soil_Rock_Results table. +Migrated columns: +- Point_ID -> point_id +- Sample Type -> sample_type +- Date Sampled -> date_sampled +- d13C -> d13c +- d18O -> d18o +- Sampled by -> sampled_by +- SSMA_TimeStamp -> ssma_timestamp +""" + +from db.engine import session_ctx +from db.nma_legacy import SoilRockResults + + +def test_create_soil_rock_results_all_fields(water_well_thing): + """Test creating a soil/rock results record with all fields.""" + with session_ctx() as session: + record = SoilRockResults( + point_id="SR-0001", + sample_type="Soil", + date_sampled="2026-01-01", + d13c=-5.5, + d18o=12.3, + sampled_by="Tester", + thing_id=water_well_thing.id, + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.id is not None + assert record.point_id == "SR-0001" + assert record.sample_type == "Soil" + assert record.date_sampled == "2026-01-01" + assert record.d13c == -5.5 + assert record.d18o == 12.3 + assert record.sampled_by == "Tester" + assert record.thing_id == water_well_thing.id + session.delete(record) + session.commit() + + +def test_create_soil_rock_results_minimal(): + """Test creating a soil/rock results record with required fields only.""" + with session_ctx() as session: + record = SoilRockResults() + session.add(record) + session.commit() + session.refresh(record) + + assert record.id is not None + assert record.point_id is None + assert record.sample_type is None + assert record.date_sampled is None + assert record.d13c is None + assert record.d18o is None + assert record.sampled_by is None + session.delete(record) + session.commit() + + +# ============= EOF ============================================= diff --git a/tests/test_surface_water_photos_legacy.py b/tests/test_surface_water_photos_legacy.py new file mode 100644 index 000000000..4660bf84b --- /dev/null +++ b/tests/test_surface_water_photos_legacy.py @@ -0,0 +1,79 @@ +# ============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +""" +Unit tests for SurfaceWaterPhotos legacy model. + +These tests verify the migration of columns from the legacy SurfaceWaterPhotos table. +Migrated columns: +- SurfaceID -> surface_id +- PointID -> point_id +- OLEPath -> ole_path +- OBJECTID -> object_id +- GlobalID -> global_id +""" + +from uuid import uuid4 + +from db.engine import session_ctx +from db.nma_legacy import SurfaceWaterPhotos + + +def test_create_surface_water_photos_all_fields(): + """Test creating a surface water photos record with all fields.""" + with session_ctx() as session: + record = SurfaceWaterPhotos( + surface_id=uuid4(), + point_id="SW-0001", + ole_path="photo.jpg", + object_id=123, + global_id=uuid4(), + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.global_id is not None + assert record.surface_id is not None + assert record.point_id == "SW-0001" + assert record.ole_path == "photo.jpg" + assert record.object_id == 123 + + session.delete(record) + session.commit() + + +def test_create_surface_water_photos_minimal(): + """Test creating a surface water photos record with required fields only.""" + with session_ctx() as session: + record = SurfaceWaterPhotos( + point_id="SW-0002", + global_id=uuid4(), + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.global_id is not None + assert record.point_id == "SW-0002" + assert record.surface_id is None + assert record.ole_path is None + assert record.object_id is None + + session.delete(record) + session.commit() + + +# ============= EOF ============================================= diff --git a/tests/test_weather_photos_legacy.py b/tests/test_weather_photos_legacy.py new file mode 100644 index 000000000..c470aa764 --- /dev/null +++ b/tests/test_weather_photos_legacy.py @@ -0,0 +1,79 @@ +# ============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +""" +Unit tests for WeatherPhotos legacy model. + +These tests verify the migration of columns from the legacy WeatherPhotos table. +Migrated columns: +- WeatherID -> weather_id +- PointID -> point_id +- OLEPath -> ole_path +- OBJECTID -> object_id +- GlobalID -> global_id +""" + +from uuid import uuid4 + +from db.engine import session_ctx +from db.nma_legacy import WeatherPhotos + + +def test_create_weather_photos_all_fields(): + """Test creating a weather photos record with all fields.""" + with session_ctx() as session: + record = WeatherPhotos( + weather_id=uuid4(), + point_id="WP-0001", + ole_path="weather.jpg", + object_id=321, + global_id=uuid4(), + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.global_id is not None + assert record.weather_id is not None + assert record.point_id == "WP-0001" + assert record.ole_path == "weather.jpg" + assert record.object_id == 321 + + session.delete(record) + session.commit() + + +def test_create_weather_photos_minimal(): + """Test creating a weather photos record with required fields only.""" + with session_ctx() as session: + record = WeatherPhotos( + point_id="WP-0002", + global_id=uuid4(), + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.global_id is not None + assert record.point_id == "WP-0002" + assert record.weather_id is None + assert record.ole_path is None + assert record.object_id is None + + session.delete(record) + session.commit() + + +# ============= EOF ============================================= diff --git a/transfers/associated_data.py b/transfers/associated_data.py new file mode 100644 index 000000000..56d6d8363 --- /dev/null +++ b/transfers/associated_data.py @@ -0,0 +1,132 @@ +# ============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +from __future__ import annotations + +from typing import Any, Optional +from uuid import UUID + +import pandas as pd +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.orm import Session + +from db import AssociatedData, Thing +from db.engine import session_ctx +from transfers.logger import logger +from transfers.transferer import Transferer +from transfers.util import replace_nans + + +class AssociatedDataTransferer(Transferer): + """Transfer legacy AssociatedData rows from NM_Aquifer.""" + + source_table = "AssociatedData" + + def __init__(self, *args, batch_size: int = 1000, **kwargs): + super().__init__(*args, **kwargs) + self.batch_size = batch_size + self._thing_id_cache: dict[str, int] = {} + self._build_thing_id_cache() + + def _build_thing_id_cache(self) -> None: + with session_ctx() as session: + things = session.query(Thing.name, Thing.id).all() + self._thing_id_cache = {name: thing_id for name, thing_id in things} + logger.info(f"Built Thing ID cache with {len(self._thing_id_cache)} entries") + + def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: + df = self._read_csv(self.source_table) + cleaned_df = replace_nans(df) + return df, cleaned_df + + def _transfer_hook(self, session: Session) -> None: + rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] + rows = self._dedupe_rows(rows, key="AssocID") + + if not rows: + logger.info("No AssociatedData rows to transfer") + return + + insert_stmt = insert(AssociatedData) + excluded = insert_stmt.excluded + + for i in range(0, len(rows), self.batch_size): + chunk = rows[i : i + self.batch_size] + logger.info( + "Upserting AssociatedData rows %s-%s (%s rows)", + i, + i + len(chunk) - 1, + len(chunk), + ) + stmt = insert_stmt.values(chunk).on_conflict_do_update( + index_elements=["AssocID"], + set_={ + "LocationId": excluded["LocationId"], + "PointID": excluded["PointID"], + "Notes": excluded["Notes"], + "Formation": excluded["Formation"], + "OBJECTID": excluded["OBJECTID"], + }, + ) + session.execute(stmt) + session.commit() + + def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + return { + "LocationId": self._uuid_val(row.get("LocationId")), + "PointID": row.get("PointID"), + "AssocID": self._uuid_val(row.get("AssocID")), + "Notes": row.get("Notes"), + "Formation": row.get("Formation"), + "OBJECTID": row.get("OBJECTID"), + "thing_id": self._thing_id_cache.get(row.get("PointID")), + } + + def _dedupe_rows( + self, rows: list[dict[str, Any]], key: str + ) -> list[dict[str, Any]]: + """Dedupe rows by unique key to avoid ON CONFLICT loops. Later rows win.""" + deduped = {} + for row in rows: + assoc_id = row.get(key) + if assoc_id is None: + continue + deduped[assoc_id] = row + return list(deduped.values()) + + def _uuid_val(self, value: Any) -> Optional[UUID]: + if value is None or pd.isna(value): + return None + if isinstance(value, UUID): + return value + if isinstance(value, str): + try: + return UUID(value) + except ValueError: + return None + return None + + +def run(batch_size: int = 1000) -> None: + """Entrypoint to execute the transfer.""" + transferer = AssociatedDataTransferer(batch_size=batch_size) + transferer.transfer() + + +if __name__ == "__main__": + run() + +# ============= EOF ============================================= diff --git a/transfers/metrics.py b/transfers/metrics.py index cf50644a1..e2083beb4 100644 --- a/transfers/metrics.py +++ b/transfers/metrics.py @@ -41,12 +41,16 @@ NMARadionuclides, NMAMajorChemistry, SurfaceWaterData, + SurfaceWaterPhotos, NMAWaterLevelsContinuousPressureDaily, + SoilRockResults, ViewNGWMNWellConstruction, ViewNGWMNWaterLevels, ViewNGWMNLithology, WeatherData, + WeatherPhotos, NMAMinorTraceChemistry, + AssociatedData, ) from db.engine import session_ctx from services.gcs_helper import get_storage_bucket @@ -111,6 +115,12 @@ def group_metrics(self, *args, **kw) -> None: def surface_water_data_metrics(self, *args, **kw) -> None: self._handle_metrics(SurfaceWaterData, *args, **kw) + def surface_water_photos_metrics(self, *args, **kw) -> None: + self._handle_metrics(SurfaceWaterPhotos, name="SurfaceWaterPhotos", *args, **kw) + + def soil_rock_results_metrics(self, *args, **kw) -> None: + self._handle_metrics(SoilRockResults, name="Soil_Rock_Results", *args, **kw) + def hydraulics_data_metrics(self, *args, **kw) -> None: self._handle_metrics(NMAHydraulicsData, name="HydraulicsData", *args, **kw) @@ -138,6 +148,9 @@ def ngwmn_water_levels_metrics(self, *args, **kw) -> None: def ngwmn_lithology_metrics(self, *args, **kw) -> None: self._handle_metrics(ViewNGWMNLithology, name="NGWMN Lithology", *args, **kw) + def weather_photos_metrics(self, *args, **kw) -> None: + self._handle_metrics(WeatherPhotos, name="WeatherPhotos", *args, **kw) + def waterlevels_pressure_daily_metrics(self, *args, **kw) -> None: self._handle_metrics( NMAWaterLevelsContinuousPressureDaily, @@ -155,6 +168,9 @@ def permissions_metrics(self, *args, **kw) -> None: def stratigraphy_metrics(self, *args, **kw) -> None: self._handle_metrics(ThingGeologicFormationAssociation, *args, **kw) + def associated_data_metrics(self, *args, **kw) -> None: + self._handle_metrics(AssociatedData, name="AssociatedData", *args, **kw) + def minor_trace_chemistry_metrics(self, *args, **kw) -> None: self._handle_metrics( NMAMinorTraceChemistry, name="MinorTraceChemistry", *args, **kw diff --git a/transfers/soil_rock_results.py b/transfers/soil_rock_results.py new file mode 100644 index 000000000..c2202282a --- /dev/null +++ b/transfers/soil_rock_results.py @@ -0,0 +1,105 @@ +# ============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +from __future__ import annotations + +from typing import Any, Optional + +import pandas as pd +from sqlalchemy.orm import Session + +from db import SoilRockResults, Thing +from db.engine import session_ctx +from transfers.logger import logger +from transfers.transferer import Transferer +from transfers.util import replace_nans + + +class SoilRockResultsTransferer(Transferer): + """Transfer legacy Soil_Rock_Results rows from NM_Aquifer.""" + + source_table = "Soil_Rock_Results" + + def __init__(self, *args, batch_size: int = 1000, **kwargs): + super().__init__(*args, **kwargs) + self.batch_size = batch_size + self._thing_id_cache: dict[str, int] = {} + self._build_thing_id_cache() + + def _build_thing_id_cache(self) -> None: + with session_ctx() as session: + things = session.query(Thing.name, Thing.id).all() + self._thing_id_cache = {name: thing_id for name, thing_id in things} + logger.info(f"Built Thing ID cache with {len(self._thing_id_cache)} entries") + + def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: + df = self._read_csv(self.source_table) + cleaned_df = replace_nans(df) + return df, cleaned_df + + def _transfer_hook(self, session: Session) -> None: + rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] + + if not rows: + logger.info("No Soil_Rock_Results rows to transfer") + return + + for i in range(0, len(rows), self.batch_size): + chunk = rows[i : i + self.batch_size] + logger.info( + "Inserting Soil_Rock_Results rows %s-%s (%s rows)", + i, + i + len(chunk) - 1, + len(chunk), + ) + session.bulk_insert_mappings(SoilRockResults, chunk) + session.commit() + + def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + point_id = row.get("Point_ID") + return { + "point_id": point_id, + "sample_type": row.get("Sample Type"), + "date_sampled": row.get("Date Sampled"), + "d13c": self._float_val(row.get("d13C")), + "d18o": self._float_val(row.get("d18O")), + "sampled_by": row.get("Sampled by"), + "thing_id": self._thing_id_cache.get(point_id), + } + + def _float_val(self, value: Any) -> Optional[float]: + if value is None or pd.isna(value): + return None + if isinstance(value, (int, float)): + return float(value) + if isinstance(value, str) and value.strip(): + try: + return float(value) + except ValueError: + return None + return None + + +def run(batch_size: int = 1000) -> None: + """Entrypoint to execute the transfer.""" + transferer = SoilRockResultsTransferer(batch_size=batch_size) + transferer.transfer() + + +if __name__ == "__main__": + run() + +# ============= EOF ============================================= diff --git a/transfers/stratigraphy_legacy.py b/transfers/stratigraphy_legacy.py new file mode 100644 index 000000000..701c7d6eb --- /dev/null +++ b/transfers/stratigraphy_legacy.py @@ -0,0 +1,164 @@ +"""Transfer Stratigraphy.csv into the NMA_Stratigraphy legacy table.""" + +from __future__ import annotations + +from typing import Any, Dict, List +from uuid import UUID + +import pandas as pd +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.orm import Session + +from db import Stratigraphy, Thing +from transfers.logger import logger +from transfers.transferer import Transferer +from transfers.util import ( + filter_to_valid_point_ids, + read_csv, + replace_nans, +) + + +class StratigraphyLegacyTransferer(Transferer): + """Imports Stratigraphy.csv rows into NMA_Stratigraphy.""" + + source_table = "NMA_Stratigraphy" + + def __init__(self, batch_size: int = 1000, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.batch_size = batch_size + self._thing_id_cache: dict[str, int] = {} + + def _get_dfs(self): # type: ignore[override] + df = read_csv("Stratigraphy") + cleaned = replace_nans(df) + cleaned = filter_to_valid_point_ids(cleaned, self.pointids) + return df, cleaned + + def _transfer_hook(self, session: Session) -> None: # type: ignore[override] + if self.cleaned_df is None or self.cleaned_df.empty: + logger.info("No Stratigraphy rows to import after cleaning") + return + + self._prime_thing_cache(session) + rows: List[Dict[str, Any]] = [] + for row in self.cleaned_df.itertuples(): + record = self._row_dict(row) + if record is None: + continue + rows.append(record) + + if not rows: + logger.warning("All Stratigraphy rows were skipped during processing") + return + + insert_stmt = insert(Stratigraphy) + excluded = insert_stmt.excluded + + for start in range(0, len(rows), self.batch_size): + chunk = rows[start : start + self.batch_size] + logger.info( + "Upserting Stratigraphy batch %s-%s (%s rows)", + start, + start + len(chunk) - 1, + len(chunk), + ) + stmt = insert_stmt.values(chunk).on_conflict_do_update( + index_elements=["GlobalID"], + set_={ + "WellID": excluded.WellID, + "PointID": excluded.PointID, + "thing_id": excluded.thing_id, + "StratTop": excluded.StratTop, + "StratBottom": excluded.StratBottom, + "UnitIdentifier": excluded.UnitIdentifier, + "Lithology": excluded.Lithology, + "LithologicModifier": excluded.LithologicModifier, + "ContributingUnit": excluded.ContributingUnit, + "StratSource": excluded.StratSource, + "StratNotes": excluded.StratNotes, + "OBJECTID": excluded.OBJECTID, + }, + ) + session.execute(stmt) + session.commit() + session.expunge_all() + + def _prime_thing_cache(self, session: Session) -> None: + point_ids = set(self.cleaned_df["PointID"].dropna()) # type: ignore[index] + if not point_ids: + self._thing_id_cache = {} + return + results = ( + session.query(Thing.id, Thing.name).filter(Thing.name.in_(point_ids)).all() + ) + self._thing_id_cache = {name: thing_id for thing_id, name in results} + + def _row_dict(self, row: pd.Series) -> Dict[str, Any] | None: + point_id = getattr(row, "PointID", None) + if not point_id: + self._capture_error("", "Missing PointID", "PointID") + return None + thing_id = self._thing_id_cache.get(point_id) + if not thing_id: + self._capture_error(point_id, "No Thing found for PointID", "thing_id") + return None + + global_id = self._uuid_value(getattr(row, "GlobalID", None)) + if global_id is None: + self._capture_error(point_id, "Invalid GlobalID", "GlobalID") + return None + + return { + "GlobalID": global_id, + "WellID": self._uuid_value(getattr(row, "WellID", None)), + "PointID": point_id, + "thing_id": thing_id, + "StratTop": self._float_value(getattr(row, "StratTop", None)), + "StratBottom": self._float_value(getattr(row, "StratBottom", None)), + "UnitIdentifier": self._string_value(getattr(row, "UnitIdentifier", None)), + "Lithology": self._string_value(getattr(row, "Lithology", None)), + "LithologicModifier": self._string_value( + getattr(row, "LithologicModifier", None) + ), + "ContributingUnit": self._string_value( + getattr(row, "ContributingUnit", None) + ), + "StratSource": self._string_value(getattr(row, "StratSource", None)), + "StratNotes": self._string_value(getattr(row, "StratNotes", None)), + "OBJECTID": self._int_value(getattr(row, "OBJECTID", None)), + } + + def _uuid_value(self, value: Any) -> UUID | None: + if value in (None, ""): + return None + if isinstance(value, UUID): + return value + try: + return UUID(str(value)) + except (ValueError, TypeError): + return None + + def _float_value(self, value: Any) -> float | None: + if value in (None, ""): + return None + try: + return float(value) + except (TypeError, ValueError): + return None + + def _int_value(self, value: Any) -> int | None: + if value in (None, ""): + return None + try: + return int(value) + except (TypeError, ValueError): + return None + + def _string_value(self, value: Any) -> str | None: + if value is None: + return None + if isinstance(value, str): + trimmed = value.strip() + return trimmed or None + return str(value) diff --git a/transfers/surface_water_photos.py b/transfers/surface_water_photos.py new file mode 100644 index 000000000..1aecd0bb9 --- /dev/null +++ b/transfers/surface_water_photos.py @@ -0,0 +1,120 @@ +# ============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +from __future__ import annotations + +from typing import Any, Optional +from uuid import UUID + +import pandas as pd +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.orm import Session + +from db import SurfaceWaterPhotos +from transfers.logger import logger +from transfers.transferer import Transferer +from transfers.util import replace_nans + + +class SurfaceWaterPhotosTransferer(Transferer): + """Transfer legacy SurfaceWaterPhotos rows from NM_Aquifer.""" + + source_table = "SurfaceWaterPhotos" + + def __init__(self, *args, batch_size: int = 1000, **kwargs): + super().__init__(*args, **kwargs) + self.batch_size = batch_size + + def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: + df = self._read_csv(self.source_table) + cleaned_df = replace_nans(df) + return df, cleaned_df + + def _transfer_hook(self, session: Session) -> None: + rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] + rows = self._dedupe_rows(rows, key="GlobalID") + + if not rows: + logger.info("No SurfaceWaterPhotos rows to transfer") + return + + insert_stmt = insert(SurfaceWaterPhotos) + excluded = insert_stmt.excluded + + for i in range(0, len(rows), self.batch_size): + chunk = rows[i : i + self.batch_size] + logger.info( + "Upserting SurfaceWaterPhotos rows %s-%s (%s rows)", + i, + i + len(chunk) - 1, + len(chunk), + ) + stmt = insert_stmt.values(chunk).on_conflict_do_update( + index_elements=["GlobalID"], + set_={ + "SurfaceID": excluded["SurfaceID"], + "PointID": excluded["PointID"], + "OLEPath": excluded["OLEPath"], + "OBJECTID": excluded["OBJECTID"], + }, + ) + session.execute(stmt) + session.commit() + + def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + return { + "SurfaceID": self._uuid_val(row.get("SurfaceID")), + "PointID": row.get("PointID"), + "OLEPath": row.get("OLEPath"), + "OBJECTID": row.get("OBJECTID"), + "GlobalID": self._uuid_val(row.get("GlobalID")), + } + + def _dedupe_rows( + self, rows: list[dict[str, Any]], key: str + ) -> list[dict[str, Any]]: + """Dedupe rows by unique key to avoid ON CONFLICT loops. Later rows win.""" + deduped = {} + for row in rows: + global_id = row.get(key) + if global_id is None: + continue + deduped[global_id] = row + return list(deduped.values()) + + def _uuid_val(self, value: Any) -> Optional[UUID]: + if value is None or pd.isna(value): + return None + if isinstance(value, UUID): + return value + if isinstance(value, str): + try: + return UUID(value) + except ValueError: + return None + return None + + +def run(batch_size: int = 1000) -> None: + """Entrypoint to execute the transfer.""" + transferer = SurfaceWaterPhotosTransferer(batch_size=batch_size) + transferer.transfer() + + +if __name__ == "__main__": + run() + +# ============= EOF ============================================= diff --git a/transfers/transfer.py b/transfers/transfer.py index 226b7fafc..174ee8cff 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -19,9 +19,10 @@ from alembic import command from alembic.config import Config +from dotenv import load_dotenv + from db.engine import session_ctx from db.initialization import recreate_public_schema, sync_search_vector_triggers -from dotenv import load_dotenv from services.util import get_bool_env from transfers.aquifer_system_transfer import transfer_aquifer_systems from transfers.geologic_formation_transfer import transfer_geologic_formations @@ -64,12 +65,16 @@ NGWMNWaterLevelsTransferer, NGWMNWellConstructionTransferer, ) +from transfers.associated_data import AssociatedDataTransferer +from transfers.soil_rock_results import SoilRockResultsTransferer from transfers.surface_water_data import SurfaceWaterDataTransferer +from transfers.surface_water_photos import SurfaceWaterPhotosTransferer from transfers.util import timeit from transfers.waterlevelscontinuous_pressure_daily import ( NMAWaterLevelsContinuousPressureDailyTransferer, ) from transfers.weather_data import WeatherDataTransferer +from transfers.weather_photos import WeatherPhotosTransferer from transfers.logger import logger, save_log_to_bucket @@ -226,6 +231,8 @@ def transfer_all(metrics, limit=100): transfer_link_ids = get_bool_env("TRANSFER_LINK_IDS", True) transfer_groups = get_bool_env("TRANSFER_GROUPS", True) transfer_assets = get_bool_env("TRANSFER_ASSETS", False) + transfer_surface_water_photos = get_bool_env("TRANSFER_SURFACE_WATER_PHOTOS", True) + transfer_soil_rock_results = get_bool_env("TRANSFER_SOIL_ROCK_RESULTS", True) transfer_surface_water_data = get_bool_env("TRANSFER_SURFACE_WATER_DATA", True) transfer_hydraulics_data = get_bool_env("TRANSFER_HYDRAULICS_DATA", True) transfer_chemistry_sampleinfo = get_bool_env("TRANSFER_CHEMISTRY_SAMPLEINFO", True) @@ -234,10 +241,12 @@ def transfer_all(metrics, limit=100): transfer_ngwmn_views = get_bool_env("TRANSFER_NGWMN_VIEWS", True) transfer_pressure_daily = get_bool_env("TRANSFER_WATERLEVELS_PRESSURE_DAILY", True) transfer_weather_data = get_bool_env("TRANSFER_WEATHER_DATA", True) + transfer_weather_photos = get_bool_env("TRANSFER_WEATHER_PHOTOS", True) transfer_minor_trace_chemistry = get_bool_env( "TRANSFER_MINOR_TRACE_CHEMISTRY", True ) transfer_nma_stratigraphy = get_bool_env("TRANSFER_NMA_STRATIGRAPHY", True) + transfer_associated_data = get_bool_env("TRANSFER_ASSOCIATED_DATA", True) use_parallel = get_bool_env("TRANSFER_PARALLEL", True) if use_parallel: @@ -254,6 +263,8 @@ def transfer_all(metrics, limit=100): transfer_link_ids, transfer_groups, transfer_assets, + transfer_surface_water_photos, + transfer_soil_rock_results, transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, @@ -262,8 +273,10 @@ def transfer_all(metrics, limit=100): transfer_ngwmn_views, transfer_pressure_daily, transfer_weather_data, + transfer_weather_photos, transfer_minor_trace_chemistry, transfer_nma_stratigraphy, + transfer_associated_data, ) else: _transfer_sequential( @@ -279,6 +292,8 @@ def transfer_all(metrics, limit=100): transfer_link_ids, transfer_groups, transfer_assets, + transfer_surface_water_photos, + transfer_soil_rock_results, transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, @@ -287,8 +302,10 @@ def transfer_all(metrics, limit=100): transfer_ngwmn_views, transfer_pressure_daily, transfer_weather_data, + transfer_weather_photos, transfer_minor_trace_chemistry, transfer_nma_stratigraphy, + transfer_associated_data, ) @@ -305,6 +322,8 @@ def _transfer_parallel( transfer_link_ids, transfer_groups, transfer_assets, + transfer_surface_water_photos, + transfer_soil_rock_results, transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, @@ -313,8 +332,10 @@ def _transfer_parallel( transfer_ngwmn_views, transfer_pressure_daily, transfer_weather_data, + transfer_weather_photos, transfer_minor_trace_chemistry, transfer_nma_stratigraphy, + transfer_associated_data, ): """Execute transfers in parallel where possible.""" message("PARALLEL TRANSFER GROUP 1") @@ -337,8 +358,18 @@ def _transfer_parallel( ) if transfer_groups: parallel_tasks_1.append(("Groups", ProjectGroupTransferer, flags)) + if transfer_surface_water_photos: + parallel_tasks_1.append( + ("SurfaceWaterPhotos", SurfaceWaterPhotosTransferer, flags) + ) + if transfer_soil_rock_results: + parallel_tasks_1.append(("SoilRockResults", SoilRockResultsTransferer, flags)) + if transfer_weather_photos: + parallel_tasks_1.append(("WeatherPhotos", WeatherPhotosTransferer, flags)) if transfer_assets: parallel_tasks_1.append(("Assets", AssetTransferer, flags)) + if transfer_associated_data: + parallel_tasks_1.append(("AssociatedData", AssociatedDataTransferer, flags)) if transfer_surface_water_data: parallel_tasks_1.append(("SurfaceWaterData", SurfaceWaterDataTransferer, flags)) if transfer_hydraulics_data: @@ -382,11 +413,11 @@ def _transfer_parallel( if transfer_nma_stratigraphy: future = executor.submit( _execute_transfer_with_timing, - "NMAStratigraphy", + "Stratigraphy", StratigraphyLegacyTransferer, flags, ) - futures[future] = "NMAStratigraphy" + futures[future] = "StratigraphyLegacy" future = executor.submit( _execute_session_transfer_with_timing, @@ -416,8 +447,10 @@ def _transfer_parallel( metrics.contact_metrics(*results_map["Contacts"]) if "Stratigraphy" in results_map and results_map["Stratigraphy"]: metrics.stratigraphy_metrics(*results_map["Stratigraphy"]) - if "NMAStratigraphy" in results_map and results_map["NMAStratigraphy"]: - metrics.nma_stratigraphy_metrics(*results_map["NMAStratigraphy"]) + if "StratigraphyLegacy" in results_map and results_map["StratigraphyLegacy"]: + metrics.nma_stratigraphy_metrics(*results_map["StratigraphyLegacy"]) + if "AssociatedData" in results_map and results_map["AssociatedData"]: + metrics.associated_data_metrics(*results_map["AssociatedData"]) if "WaterLevels" in results_map and results_map["WaterLevels"]: metrics.water_level_metrics(*results_map["WaterLevels"]) if "LinkIdsWellData" in results_map and results_map["LinkIdsWellData"]: @@ -426,6 +459,10 @@ def _transfer_parallel( metrics.location_link_ids_metrics(*results_map["LinkIdsLocation"]) if "Groups" in results_map and results_map["Groups"]: metrics.group_metrics(*results_map["Groups"]) + if "SurfaceWaterPhotos" in results_map and results_map["SurfaceWaterPhotos"]: + metrics.surface_water_photos_metrics(*results_map["SurfaceWaterPhotos"]) + if "SoilRockResults" in results_map and results_map["SoilRockResults"]: + metrics.soil_rock_results_metrics(*results_map["SoilRockResults"]) if "Assets" in results_map and results_map["Assets"]: metrics.asset_metrics(*results_map["Assets"]) if "SurfaceWaterData" in results_map and results_map["SurfaceWaterData"]: @@ -449,6 +486,8 @@ def _transfer_parallel( ) if "WeatherData" in results_map and results_map["WeatherData"]: metrics.weather_data_metrics(*results_map["WeatherData"]) + if "WeatherPhotos" in results_map and results_map["WeatherPhotos"]: + metrics.weather_photos_metrics(*results_map["WeatherPhotos"]) if transfer_major_chemistry: message("TRANSFERRING MAJOR CHEMISTRY") results = _execute_transfer(MajorChemistryTransferer, flags=flags) @@ -526,6 +565,8 @@ def _transfer_sequential( transfer_link_ids, transfer_groups, transfer_assets, + transfer_surface_water_photos, + transfer_soil_rock_results, transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, @@ -534,8 +575,10 @@ def _transfer_sequential( transfer_ngwmn_views, transfer_pressure_daily, transfer_weather_data, + transfer_weather_photos, transfer_minor_trace_chemistry, transfer_nma_stratigraphy, + transfer_associated_data, ): """Original sequential transfer logic.""" if transfer_screens: @@ -584,11 +627,31 @@ def _transfer_sequential( results = _execute_transfer(ProjectGroupTransferer, flags=flags) metrics.group_metrics(*results) + if transfer_surface_water_photos: + message("TRANSFERRING SURFACE WATER PHOTOS") + results = _execute_transfer(SurfaceWaterPhotosTransferer, flags=flags) + metrics.surface_water_photos_metrics(*results) + + if transfer_soil_rock_results: + message("TRANSFERRING SOIL ROCK RESULTS") + results = _execute_transfer(SoilRockResultsTransferer, flags=flags) + metrics.soil_rock_results_metrics(*results) + + if transfer_weather_photos: + message("TRANSFERRING WEATHER PHOTOS") + results = _execute_transfer(WeatherPhotosTransferer, flags=flags) + metrics.weather_photos_metrics(*results) + if transfer_assets: message("TRANSFERRING ASSETS") results = _execute_transfer(AssetTransferer, flags=flags) metrics.asset_metrics(*results) + if transfer_associated_data: + message("TRANSFERRING ASSOCIATED DATA") + results = _execute_transfer(AssociatedDataTransferer, flags=flags) + metrics.associated_data_metrics(*results) + if transfer_surface_water_data: message("TRANSFERRING SURFACE WATER DATA") results = _execute_transfer(SurfaceWaterDataTransferer, flags=flags) diff --git a/transfers/transferer.py b/transfers/transferer.py index dcfb39752..47826b0fb 100644 --- a/transfers/transferer.py +++ b/transfers/transferer.py @@ -24,7 +24,7 @@ from db import Thing, Base from db.engine import session_ctx from transfers.logger import logger -from transfers.util import chunk_by_size +from transfers.util import chunk_by_size, read_csv class ManualFixer(object): @@ -132,6 +132,15 @@ def _after_hook(self, session: Session): def _get_dfs(self): raise NotImplementedError("Must implement _get_dfs method") + def _read_csv(self, name: str, dtype: dict | None = None, **kw) -> pd.DataFrame: + if dtype is not None and "dtype" not in kw: + kw["dtype"] = dtype + csv_paths = self.flags.get("CSV_PATHS") or {} + csv_path = csv_paths.get(name) + if csv_path: + return pd.read_csv(csv_path, **kw) + return read_csv(name, dtype=dtype, **kw) + class ChunkTransferer(Transferer): def __init__(self, *args, **kwargs): diff --git a/transfers/weather_photos.py b/transfers/weather_photos.py new file mode 100644 index 000000000..82e5bc254 --- /dev/null +++ b/transfers/weather_photos.py @@ -0,0 +1,120 @@ +# ============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +from __future__ import annotations + +from typing import Any, Optional +from uuid import UUID + +import pandas as pd +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.orm import Session + +from db import WeatherPhotos +from transfers.logger import logger +from transfers.transferer import Transferer +from transfers.util import replace_nans + + +class WeatherPhotosTransferer(Transferer): + """Transfer legacy WeatherPhotos rows from NM_Aquifer.""" + + source_table = "WeatherPhotos" + + def __init__(self, *args, batch_size: int = 1000, **kwargs): + super().__init__(*args, **kwargs) + self.batch_size = batch_size + + def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: + df = self._read_csv(self.source_table) + cleaned_df = replace_nans(df) + return df, cleaned_df + + def _transfer_hook(self, session: Session) -> None: + rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] + rows = self._dedupe_rows(rows, key="GlobalID") + + if not rows: + logger.info("No WeatherPhotos rows to transfer") + return + + insert_stmt = insert(WeatherPhotos) + excluded = insert_stmt.excluded + + for i in range(0, len(rows), self.batch_size): + chunk = rows[i : i + self.batch_size] + logger.info( + "Upserting WeatherPhotos rows %s-%s (%s rows)", + i, + i + len(chunk) - 1, + len(chunk), + ) + stmt = insert_stmt.values(chunk).on_conflict_do_update( + index_elements=["GlobalID"], + set_={ + "WeatherID": excluded["WeatherID"], + "PointID": excluded["PointID"], + "OLEPath": excluded["OLEPath"], + "OBJECTID": excluded["OBJECTID"], + }, + ) + session.execute(stmt) + session.commit() + + def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + return { + "WeatherID": self._uuid_val(row.get("WeatherID")), + "PointID": row.get("PointID"), + "OLEPath": row.get("OLEPath"), + "OBJECTID": row.get("OBJECTID"), + "GlobalID": self._uuid_val(row.get("GlobalID")), + } + + def _dedupe_rows( + self, rows: list[dict[str, Any]], key: str + ) -> list[dict[str, Any]]: + """Dedupe rows by unique key to avoid ON CONFLICT loops. Later rows win.""" + deduped = {} + for row in rows: + global_id = row.get(key) + if global_id is None: + continue + deduped[global_id] = row + return list(deduped.values()) + + def _uuid_val(self, value: Any) -> Optional[UUID]: + if value is None or pd.isna(value): + return None + if isinstance(value, UUID): + return value + if isinstance(value, str): + try: + return UUID(value) + except ValueError: + return None + return None + + +def run(batch_size: int = 1000) -> None: + """Entrypoint to execute the transfer.""" + transferer = WeatherPhotosTransferer(batch_size=batch_size) + transferer.transfer() + + +if __name__ == "__main__": + run() + +# ============= EOF =============================================