From 39c81832cdd7932f4ae347aededeca4eeb5c512b Mon Sep 17 00:00:00 2001 From: jross Date: Fri, 16 Jan 2026 16:54:56 -0700 Subject: [PATCH 1/7] feat: add legacy data transferers and models for AssociatedData, SurfaceWaterPhotos, WeatherPhotos, and SoilRockResults --- ...c2f4a9d0b1e2_create_nma_associated_data.py | 50 +++++++ ...5c6d7e8_create_nma_surface_water_photos.py | 53 ++++++++ .../e4b5c6d7e8f9_create_nma_weather_photos.py | 49 +++++++ ...a6b7c8d9e0_create_nma_soil_rock_results.py | 46 +++++++ db/nma_legacy.py | 71 ++++++++++ tests/test_associated_data_legacy.py | 80 ++++++++++++ tests/test_soil_rock_results_legacy.py | 79 ++++++++++++ tests/test_surface_water_photos_legacy.py | 79 ++++++++++++ tests/test_weather_photos_legacy.py | 79 ++++++++++++ transfers/associated_data.py | 122 ++++++++++++++++++ transfers/metrics.py | 8 ++ transfers/soil_rock_results.py | 94 ++++++++++++++ transfers/surface_water_photos.py | 120 +++++++++++++++++ transfers/transfer.py | 32 +++++ transfers/weather_photos.py | 120 +++++++++++++++++ 15 files changed, 1082 insertions(+) create mode 100644 alembic/versions/c2f4a9d0b1e2_create_nma_associated_data.py create mode 100644 alembic/versions/d3a4b5c6d7e8_create_nma_surface_water_photos.py create mode 100644 alembic/versions/e4b5c6d7e8f9_create_nma_weather_photos.py create mode 100644 alembic/versions/f5a6b7c8d9e0_create_nma_soil_rock_results.py create mode 100644 tests/test_associated_data_legacy.py create mode 100644 tests/test_soil_rock_results_legacy.py create mode 100644 tests/test_surface_water_photos_legacy.py create mode 100644 tests/test_weather_photos_legacy.py create mode 100644 transfers/associated_data.py create mode 100644 transfers/soil_rock_results.py create mode 100644 transfers/surface_water_photos.py create mode 100644 transfers/weather_photos.py diff --git a/alembic/versions/c2f4a9d0b1e2_create_nma_associated_data.py b/alembic/versions/c2f4a9d0b1e2_create_nma_associated_data.py new file mode 100644 index 
000000000..161c36aca --- /dev/null +++ b/alembic/versions/c2f4a9d0b1e2_create_nma_associated_data.py @@ -0,0 +1,50 @@ +"""Create legacy NMA_AssociatedData table. + +Revision ID: c2f4a9d0b1e2 +Revises: a7b8c9d0e1f2 +Create Date: 2026-03-05 00:00:00.000000 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import inspect +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "c2f4a9d0b1e2" +down_revision: Union[str, Sequence[str], None] = "a7b8c9d0e1f2" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create the legacy associated data table.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_AssociatedData"): + op.create_table( + "NMA_AssociatedData", + sa.Column("LocationId", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("PointID", sa.String(length=10), nullable=True), + sa.Column( + "AssocID", + postgresql.UUID(as_uuid=True), + nullable=False, + primary_key=True, + ), + sa.Column("Notes", sa.String(length=255), nullable=True), + sa.Column("Formation", sa.String(length=15), nullable=True), + sa.Column("OBJECTID", sa.Integer(), nullable=True, unique=True), + sa.UniqueConstraint("LocationId", name="AssociatedData$LocationId"), + ) + op.create_index("AssociatedData$PointID", "NMA_AssociatedData", ["PointID"]) + + +def downgrade() -> None: + """Drop the legacy associated data table.""" + bind = op.get_bind() + inspector = inspect(bind) + if inspector.has_table("NMA_AssociatedData"): + op.drop_table("NMA_AssociatedData") diff --git a/alembic/versions/d3a4b5c6d7e8_create_nma_surface_water_photos.py b/alembic/versions/d3a4b5c6d7e8_create_nma_surface_water_photos.py new file mode 100644 index 000000000..19f28cdf4 --- /dev/null +++ b/alembic/versions/d3a4b5c6d7e8_create_nma_surface_water_photos.py @@ -0,0 +1,53 @@ +"""Create 
legacy NMA_SurfaceWaterPhotos table. + +Revision ID: d3a4b5c6d7e8 +Revises: c2f4a9d0b1e2 +Create Date: 2026-03-05 00:00:00.000000 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import inspect +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "d3a4b5c6d7e8" +down_revision: Union[str, Sequence[str], None] = "c2f4a9d0b1e2" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create the legacy surface water photos table.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_SurfaceWaterPhotos"): + op.create_table( + "NMA_SurfaceWaterPhotos", + sa.Column("SurfaceID", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("PointID", sa.String(length=50), nullable=False), + sa.Column("OLEPath", sa.String(length=50), nullable=True), + sa.Column("OBJECTID", sa.Integer(), nullable=True, unique=True), + sa.Column( + "GlobalID", + postgresql.UUID(as_uuid=True), + nullable=False, + primary_key=True, + ), + ) + op.create_index( + "SurfaceWaterPhotos$PointID", "NMA_SurfaceWaterPhotos", ["PointID"] + ) + op.create_index( + "SurfaceWaterPhotos$SurfaceID", "NMA_SurfaceWaterPhotos", ["SurfaceID"] + ) + + +def downgrade() -> None: + """Drop the legacy surface water photos table.""" + bind = op.get_bind() + inspector = inspect(bind) + if inspector.has_table("NMA_SurfaceWaterPhotos"): + op.drop_table("NMA_SurfaceWaterPhotos") diff --git a/alembic/versions/e4b5c6d7e8f9_create_nma_weather_photos.py b/alembic/versions/e4b5c6d7e8f9_create_nma_weather_photos.py new file mode 100644 index 000000000..8914b0f5e --- /dev/null +++ b/alembic/versions/e4b5c6d7e8f9_create_nma_weather_photos.py @@ -0,0 +1,49 @@ +"""Create legacy NMA_WeatherPhotos table. 
+ +Revision ID: e4b5c6d7e8f9 +Revises: d3a4b5c6d7e8 +Create Date: 2026-03-05 00:00:00.000000 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import inspect +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "e4b5c6d7e8f9" +down_revision: Union[str, Sequence[str], None] = "d3a4b5c6d7e8" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create the legacy weather photos table.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_WeatherPhotos"): + op.create_table( + "NMA_WeatherPhotos", + sa.Column("WeatherID", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("PointID", sa.String(length=50), nullable=False), + sa.Column("OLEPath", sa.String(length=50), nullable=True), + sa.Column("OBJECTID", sa.Integer(), nullable=True, unique=True), + sa.Column( + "GlobalID", + postgresql.UUID(as_uuid=True), + nullable=False, + primary_key=True, + ), + ) + op.create_index("WeatherPhotos$PointID", "NMA_WeatherPhotos", ["PointID"]) + op.create_index("WeatherPhotos$WeatherID", "NMA_WeatherPhotos", ["WeatherID"]) + + +def downgrade() -> None: + """Drop the legacy weather photos table.""" + bind = op.get_bind() + inspector = inspect(bind) + if inspector.has_table("NMA_WeatherPhotos"): + op.drop_table("NMA_WeatherPhotos") diff --git a/alembic/versions/f5a6b7c8d9e0_create_nma_soil_rock_results.py b/alembic/versions/f5a6b7c8d9e0_create_nma_soil_rock_results.py new file mode 100644 index 000000000..2877c549a --- /dev/null +++ b/alembic/versions/f5a6b7c8d9e0_create_nma_soil_rock_results.py @@ -0,0 +1,46 @@ +"""Create legacy NMA_Soil_Rock_Results table. 
+ +Revision ID: f5a6b7c8d9e0 +Revises: e4b5c6d7e8f9 +Create Date: 2026-03-05 00:00:00.000000 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import inspect + +# revision identifiers, used by Alembic. +revision: str = "f5a6b7c8d9e0" +down_revision: Union[str, Sequence[str], None] = "e4b5c6d7e8f9" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create the legacy soil/rock results table.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_Soil_Rock_Results"): + op.create_table( + "NMA_Soil_Rock_Results", + sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True), + sa.Column("Point_ID", sa.String(length=255), nullable=True), + sa.Column("Sample Type", sa.String(length=255), nullable=True), + sa.Column("Date Sampled", sa.String(length=255), nullable=True), + sa.Column("d13C", sa.Float(), nullable=True), + sa.Column("d18O", sa.Float(), nullable=True), + sa.Column("Sampled by", sa.String(length=255), nullable=True), + ) + op.create_index( + "Soil_Rock_Results$Point_ID", "NMA_Soil_Rock_Results", ["Point_ID"] + ) + + +def downgrade() -> None: + """Drop the legacy soil/rock results table.""" + bind = op.get_bind() + inspector = inspect(bind) + if inspector.has_table("NMA_Soil_Rock_Results"): + op.drop_table("NMA_Soil_Rock_Results") diff --git a/db/nma_legacy.py b/db/nma_legacy.py index d36ee0f28..951f02fba 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -328,6 +328,43 @@ class SurfaceWaterData(Base): data_source: Mapped[Optional[str]] = mapped_column("DataSource", String(255)) +class AssociatedData(Base): + """ + Legacy AssociatedData table from NM_Aquifer. 
+ """ + + __tablename__ = "NMA_AssociatedData" + + location_id: Mapped[Optional[uuid.UUID]] = mapped_column( + "LocationId", UUID(as_uuid=True), unique=True + ) + point_id: Mapped[Optional[str]] = mapped_column("PointID", String(10)) + assoc_id: Mapped[uuid.UUID] = mapped_column( + "AssocID", UUID(as_uuid=True), primary_key=True + ) + notes: Mapped[Optional[str]] = mapped_column("Notes", String(255)) + formation: Mapped[Optional[str]] = mapped_column("Formation", String(15)) + object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True) + + +class SurfaceWaterPhotos(Base): + """ + Legacy SurfaceWaterPhotos table from NM_Aquifer. + """ + + __tablename__ = "NMA_SurfaceWaterPhotos" + + surface_id: Mapped[Optional[uuid.UUID]] = mapped_column( + "SurfaceID", UUID(as_uuid=True) + ) + point_id: Mapped[str] = mapped_column("PointID", String(50), nullable=False) + ole_path: Mapped[Optional[str]] = mapped_column("OLEPath", String(50)) + object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True) + global_id: Mapped[uuid.UUID] = mapped_column( + "GlobalID", UUID(as_uuid=True), primary_key=True + ) + + class WeatherData(Base): """ Legacy WeatherData table from AMPAPI. @@ -345,6 +382,40 @@ class WeatherData(Base): object_id: Mapped[int] = mapped_column("OBJECTID", Integer, primary_key=True) +class WeatherPhotos(Base): + """ + Legacy WeatherPhotos table from NM_Aquifer. 
+ """ + + __tablename__ = "NMA_WeatherPhotos" + + weather_id: Mapped[Optional[uuid.UUID]] = mapped_column( + "WeatherID", UUID(as_uuid=True) + ) + point_id: Mapped[str] = mapped_column("PointID", String(50), nullable=False) + ole_path: Mapped[Optional[str]] = mapped_column("OLEPath", String(50)) + object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True) + global_id: Mapped[uuid.UUID] = mapped_column( + "GlobalID", UUID(as_uuid=True), primary_key=True + ) + + +class SoilRockResults(Base): + """ + Legacy Soil_Rock_Results table from NM_Aquifer. + """ + + __tablename__ = "NMA_Soil_Rock_Results" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + point_id: Mapped[Optional[str]] = mapped_column("Point_ID", String(255)) + sample_type: Mapped[Optional[str]] = mapped_column("Sample Type", String(255)) + date_sampled: Mapped[Optional[str]] = mapped_column("Date Sampled", String(255)) + d13c: Mapped[Optional[float]] = mapped_column("d13C", Float) + d18o: Mapped[Optional[float]] = mapped_column("d18O", Float) + sampled_by: Mapped[Optional[str]] = mapped_column("Sampled by", String(255)) + + class NMAMinorTraceChemistry(Base): """ Legacy MinorandTraceChemistry table from AMPAPI. diff --git a/tests/test_associated_data_legacy.py b/tests/test_associated_data_legacy.py new file mode 100644 index 000000000..0a92032d2 --- /dev/null +++ b/tests/test_associated_data_legacy.py @@ -0,0 +1,80 @@ +# ============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +""" +Unit tests for AssociatedData legacy model. + +These tests verify the migration of columns from the legacy AssociatedData table. +Migrated columns: +- LocationId -> location_id +- PointID -> point_id +- AssocID -> assoc_id +- Notes -> notes +- Formation -> formation +- OBJECTID -> object_id +""" + +from uuid import uuid4 + +from db.engine import session_ctx +from db.nma_legacy import AssociatedData + + +def test_create_associated_data_all_fields(): + """Test creating an associated data record with all fields.""" + with session_ctx() as session: + record = AssociatedData( + location_id=uuid4(), + point_id="AA-0001", + assoc_id=uuid4(), + notes="Legacy notes", + formation="TEST", + object_id=42, + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.assoc_id is not None + assert record.location_id is not None + assert record.point_id == "AA-0001" + assert record.notes == "Legacy notes" + assert record.formation == "TEST" + assert record.object_id == 42 + + session.delete(record) + session.commit() + + +def test_create_associated_data_minimal(): + """Test creating an associated data record with required fields only.""" + with session_ctx() as session: + record = AssociatedData(assoc_id=uuid4()) + session.add(record) + session.commit() + session.refresh(record) + + assert record.assoc_id is not None + assert record.location_id is None + assert record.point_id is None + assert record.notes is None + assert record.formation is None + assert 
record.object_id is None + + session.delete(record) + session.commit() + + +# ============= EOF ============================================= diff --git a/tests/test_soil_rock_results_legacy.py b/tests/test_soil_rock_results_legacy.py new file mode 100644 index 000000000..e252ab908 --- /dev/null +++ b/tests/test_soil_rock_results_legacy.py @@ -0,0 +1,79 @@ +# ============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +""" +Unit tests for Soil_Rock_Results legacy model. + +These tests verify the migration of columns from the legacy Soil_Rock_Results table. 
+Migrated columns: +- Point_ID -> point_id +- Sample Type -> sample_type +- Date Sampled -> date_sampled +- d13C -> d13c +- d18O -> d18o +- Sampled by -> sampled_by +- SSMA_TimeStamp -> (not migrated; SoilRockResults defines no such column) +""" + +from db.engine import session_ctx +from db.nma_legacy import SoilRockResults + + +def test_create_soil_rock_results_all_fields(): + """Test creating a soil/rock results record with all fields.""" + with session_ctx() as session: + record = SoilRockResults( + point_id="SR-0001", + sample_type="Soil", + date_sampled="2026-01-01", + d13c=-5.5, + d18o=12.3, + sampled_by="Tester", + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.id is not None + assert record.point_id == "SR-0001" + assert record.sample_type == "Soil" + assert record.date_sampled == "2026-01-01" + assert record.d13c == -5.5 + assert record.d18o == 12.3 + assert record.sampled_by == "Tester" + session.delete(record) + session.commit() + + +def test_create_soil_rock_results_minimal(): + """Test creating a soil/rock results record with required fields only.""" + with session_ctx() as session: + record = SoilRockResults() + session.add(record) + session.commit() + session.refresh(record) + + assert record.id is not None + assert record.point_id is None + assert record.sample_type is None + assert record.date_sampled is None + assert record.d13c is None + assert record.d18o is None + assert record.sampled_by is None + session.delete(record) + session.commit() + + +# ============= EOF ============================================= diff --git a/tests/test_surface_water_photos_legacy.py b/tests/test_surface_water_photos_legacy.py new file mode 100644 index 000000000..4660bf84b --- /dev/null +++ b/tests/test_surface_water_photos_legacy.py @@ -0,0 +1,79 @@ +# ============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in 
compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +""" +Unit tests for SurfaceWaterPhotos legacy model. + +These tests verify the migration of columns from the legacy SurfaceWaterPhotos table. +Migrated columns: +- SurfaceID -> surface_id +- PointID -> point_id +- OLEPath -> ole_path +- OBJECTID -> object_id +- GlobalID -> global_id +""" + +from uuid import uuid4 + +from db.engine import session_ctx +from db.nma_legacy import SurfaceWaterPhotos + + +def test_create_surface_water_photos_all_fields(): + """Test creating a surface water photos record with all fields.""" + with session_ctx() as session: + record = SurfaceWaterPhotos( + surface_id=uuid4(), + point_id="SW-0001", + ole_path="photo.jpg", + object_id=123, + global_id=uuid4(), + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.global_id is not None + assert record.surface_id is not None + assert record.point_id == "SW-0001" + assert record.ole_path == "photo.jpg" + assert record.object_id == 123 + + session.delete(record) + session.commit() + + +def test_create_surface_water_photos_minimal(): + """Test creating a surface water photos record with required fields only.""" + with session_ctx() as session: + record = SurfaceWaterPhotos( + point_id="SW-0002", + global_id=uuid4(), + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.global_id is not None + assert record.point_id == "SW-0002" + assert record.surface_id is None + assert record.ole_path is None + assert 
record.object_id is None + + session.delete(record) + session.commit() + + +# ============= EOF ============================================= diff --git a/tests/test_weather_photos_legacy.py b/tests/test_weather_photos_legacy.py new file mode 100644 index 000000000..c470aa764 --- /dev/null +++ b/tests/test_weather_photos_legacy.py @@ -0,0 +1,79 @@ +# ============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +""" +Unit tests for WeatherPhotos legacy model. + +These tests verify the migration of columns from the legacy WeatherPhotos table. 
+Migrated columns: +- WeatherID -> weather_id +- PointID -> point_id +- OLEPath -> ole_path +- OBJECTID -> object_id +- GlobalID -> global_id +""" + +from uuid import uuid4 + +from db.engine import session_ctx +from db.nma_legacy import WeatherPhotos + + +def test_create_weather_photos_all_fields(): + """Test creating a weather photos record with all fields.""" + with session_ctx() as session: + record = WeatherPhotos( + weather_id=uuid4(), + point_id="WP-0001", + ole_path="weather.jpg", + object_id=321, + global_id=uuid4(), + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.global_id is not None + assert record.weather_id is not None + assert record.point_id == "WP-0001" + assert record.ole_path == "weather.jpg" + assert record.object_id == 321 + + session.delete(record) + session.commit() + + +def test_create_weather_photos_minimal(): + """Test creating a weather photos record with required fields only.""" + with session_ctx() as session: + record = WeatherPhotos( + point_id="WP-0002", + global_id=uuid4(), + ) + session.add(record) + session.commit() + session.refresh(record) + + assert record.global_id is not None + assert record.point_id == "WP-0002" + assert record.weather_id is None + assert record.ole_path is None + assert record.object_id is None + + session.delete(record) + session.commit() + + +# ============= EOF ============================================= diff --git a/transfers/associated_data.py b/transfers/associated_data.py new file mode 100644 index 000000000..774653f41 --- /dev/null +++ b/transfers/associated_data.py @@ -0,0 +1,122 @@ +# ============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +from __future__ import annotations + +from typing import Any, Optional +from uuid import UUID + +import pandas as pd +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.orm import Session + +from db import AssociatedData +from transfers.logger import logger +from transfers.transferer import Transferer +from transfers.util import replace_nans + + +class AssociatedDataTransferer(Transferer): + """Transfer legacy AssociatedData rows from NM_Aquifer.""" + + source_table = "AssociatedData" + + def __init__(self, *args, batch_size: int = 1000, **kwargs): + super().__init__(*args, **kwargs) + self.batch_size = batch_size + + def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: + df = self._read_csv(self.source_table) + cleaned_df = replace_nans(df) + return df, cleaned_df + + def _transfer_hook(self, session: Session) -> None: + rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] + rows = self._dedupe_rows(rows, key="AssocID") + + if not rows: + logger.info("No AssociatedData rows to transfer") + return + + insert_stmt = insert(AssociatedData) + excluded = insert_stmt.excluded + + for i in range(0, len(rows), self.batch_size): + chunk = rows[i : i + self.batch_size] + logger.info( + "Upserting AssociatedData rows %s-%s (%s rows)", + i, + i + len(chunk) - 1, + len(chunk), + ) + stmt = insert_stmt.values(chunk).on_conflict_do_update( + index_elements=["AssocID"], + set_={ + "LocationId": excluded["LocationId"], + "PointID": 
excluded["PointID"], + "Notes": excluded["Notes"], + "Formation": excluded["Formation"], + "OBJECTID": excluded["OBJECTID"], + }, + ) + session.execute(stmt) + session.commit() + + def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + return { + "LocationId": self._uuid_val(row.get("LocationId")), + "PointID": row.get("PointID"), + "AssocID": self._uuid_val(row.get("AssocID")), + "Notes": row.get("Notes"), + "Formation": row.get("Formation"), + "OBJECTID": row.get("OBJECTID"), + } + + def _dedupe_rows( + self, rows: list[dict[str, Any]], key: str + ) -> list[dict[str, Any]]: + """Dedupe rows by unique key to avoid ON CONFLICT loops. Later rows win.""" + deduped = {} + for row in rows: + assoc_id = row.get(key) + if assoc_id is None: + continue + deduped[assoc_id] = row + return list(deduped.values()) + + def _uuid_val(self, value: Any) -> Optional[UUID]: + if value is None or pd.isna(value): + return None + if isinstance(value, UUID): + return value + if isinstance(value, str): + try: + return UUID(value) + except ValueError: + return None + return None + + +def run(batch_size: int = 1000) -> None: + """Entrypoint to execute the transfer.""" + transferer = AssociatedDataTransferer(batch_size=batch_size) + transferer.transfer() + + +if __name__ == "__main__": + run() + +# ============= EOF ============================================= diff --git a/transfers/metrics.py b/transfers/metrics.py index cf50644a1..14d0e6b21 100644 --- a/transfers/metrics.py +++ b/transfers/metrics.py @@ -41,11 +41,13 @@ NMARadionuclides, NMAMajorChemistry, SurfaceWaterData, + SurfaceWaterPhotos, NMAWaterLevelsContinuousPressureDaily, ViewNGWMNWellConstruction, ViewNGWMNWaterLevels, ViewNGWMNLithology, WeatherData, + WeatherPhotos, NMAMinorTraceChemistry, ) from db.engine import session_ctx @@ -111,6 +113,9 @@ def group_metrics(self, *args, **kw) -> None: def surface_water_data_metrics(self, *args, **kw) -> None: self._handle_metrics(SurfaceWaterData, *args, **kw) + def 
surface_water_photos_metrics(self, *args, **kw) -> None: + self._handle_metrics(SurfaceWaterPhotos, name="SurfaceWaterPhotos", *args, **kw) + def hydraulics_data_metrics(self, *args, **kw) -> None: self._handle_metrics(NMAHydraulicsData, name="HydraulicsData", *args, **kw) @@ -138,6 +143,9 @@ def ngwmn_water_levels_metrics(self, *args, **kw) -> None: def ngwmn_lithology_metrics(self, *args, **kw) -> None: self._handle_metrics(ViewNGWMNLithology, name="NGWMN Lithology", *args, **kw) + def weather_photos_metrics(self, *args, **kw) -> None: + self._handle_metrics(WeatherPhotos, name="WeatherPhotos", *args, **kw) + def waterlevels_pressure_daily_metrics(self, *args, **kw) -> None: self._handle_metrics( NMAWaterLevelsContinuousPressureDaily, diff --git a/transfers/soil_rock_results.py b/transfers/soil_rock_results.py new file mode 100644 index 000000000..28d44f05b --- /dev/null +++ b/transfers/soil_rock_results.py @@ -0,0 +1,94 @@ +# ============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================== + +from __future__ import annotations + +from typing import Any, Optional + +import pandas as pd +from sqlalchemy.orm import Session + +from db import SoilRockResults +from transfers.logger import logger +from transfers.transferer import Transferer +from transfers.util import replace_nans + + +class SoilRockResultsTransferer(Transferer): + """Transfer legacy Soil_Rock_Results rows from NM_Aquifer.""" + + source_table = "Soil_Rock_Results" + + def __init__(self, *args, batch_size: int = 1000, **kwargs): + super().__init__(*args, **kwargs) + self.batch_size = batch_size + + def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: + df = self._read_csv(self.source_table) + cleaned_df = replace_nans(df) + return df, cleaned_df + + def _transfer_hook(self, session: Session) -> None: + rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] + + if not rows: + logger.info("No Soil_Rock_Results rows to transfer") + return + + for i in range(0, len(rows), self.batch_size): + chunk = rows[i : i + self.batch_size] + logger.info( + "Inserting Soil_Rock_Results rows %s-%s (%s rows)", + i, + i + len(chunk) - 1, + len(chunk), + ) + session.bulk_insert_mappings(SoilRockResults, chunk) + session.commit() + + def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + return { # bulk_insert_mappings matches keys to mapped attribute names, not column names + "point_id": row.get("Point_ID"), + "sample_type": row.get("Sample Type"), + "date_sampled": row.get("Date Sampled"), + "d13c": self._float_val(row.get("d13C")), + "d18o": self._float_val(row.get("d18O")), + "sampled_by": row.get("Sampled by"), + } + + def _float_val(self, value: Any) -> Optional[float]: + if value is None or pd.isna(value): + return None + if isinstance(value, (int, float)): + return float(value) + if isinstance(value, str) and value.strip(): + try: + return float(value) + except ValueError: + return None + return None + + +def run(batch_size: int = 1000) -> None: + """Entrypoint to execute 
the transfer.""" + transferer = SoilRockResultsTransferer(batch_size=batch_size) + transferer.transfer() + + +if __name__ == "__main__": + run() + +# ============= EOF ============================================= diff --git a/transfers/surface_water_photos.py b/transfers/surface_water_photos.py new file mode 100644 index 000000000..1aecd0bb9 --- /dev/null +++ b/transfers/surface_water_photos.py @@ -0,0 +1,120 @@ +# ============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================== + +from __future__ import annotations + +from typing import Any, Optional +from uuid import UUID + +import pandas as pd +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.orm import Session + +from db import SurfaceWaterPhotos +from transfers.logger import logger +from transfers.transferer import Transferer +from transfers.util import replace_nans + + +class SurfaceWaterPhotosTransferer(Transferer): + """Transfer legacy SurfaceWaterPhotos rows from NM_Aquifer.""" + + source_table = "SurfaceWaterPhotos" + + def __init__(self, *args, batch_size: int = 1000, **kwargs): + super().__init__(*args, **kwargs) + self.batch_size = batch_size + + def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: + df = self._read_csv(self.source_table) + cleaned_df = replace_nans(df) + return df, cleaned_df + + def _transfer_hook(self, session: Session) -> None: + rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] + rows = self._dedupe_rows(rows, key="GlobalID") + + if not rows: + logger.info("No SurfaceWaterPhotos rows to transfer") + return + + insert_stmt = insert(SurfaceWaterPhotos) + excluded = insert_stmt.excluded + + for i in range(0, len(rows), self.batch_size): + chunk = rows[i : i + self.batch_size] + logger.info( + "Upserting SurfaceWaterPhotos rows %s-%s (%s rows)", + i, + i + len(chunk) - 1, + len(chunk), + ) + stmt = insert_stmt.values(chunk).on_conflict_do_update( + index_elements=["GlobalID"], + set_={ + "SurfaceID": excluded["SurfaceID"], + "PointID": excluded["PointID"], + "OLEPath": excluded["OLEPath"], + "OBJECTID": excluded["OBJECTID"], + }, + ) + session.execute(stmt) + session.commit() + + def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + return { + "SurfaceID": self._uuid_val(row.get("SurfaceID")), + "PointID": row.get("PointID"), + "OLEPath": row.get("OLEPath"), + "OBJECTID": row.get("OBJECTID"), + "GlobalID": 
self._uuid_val(row.get("GlobalID")), + } + + def _dedupe_rows( + self, rows: list[dict[str, Any]], key: str + ) -> list[dict[str, Any]]: + """Dedupe rows by unique key to avoid ON CONFLICT loops. Later rows win.""" + deduped = {} + for row in rows: + global_id = row.get(key) + if global_id is None: + continue + deduped[global_id] = row + return list(deduped.values()) + + def _uuid_val(self, value: Any) -> Optional[UUID]: + if value is None or pd.isna(value): + return None + if isinstance(value, UUID): + return value + if isinstance(value, str): + try: + return UUID(value) + except ValueError: + return None + return None + + +def run(batch_size: int = 1000) -> None: + """Entrypoint to execute the transfer.""" + transferer = SurfaceWaterPhotosTransferer(batch_size=batch_size) + transferer.transfer() + + +if __name__ == "__main__": + run() + +# ============= EOF ============================================= diff --git a/transfers/transfer.py b/transfers/transfer.py index 226b7fafc..37f0c6be6 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -65,11 +65,13 @@ NGWMNWellConstructionTransferer, ) from transfers.surface_water_data import SurfaceWaterDataTransferer +from transfers.surface_water_photos import SurfaceWaterPhotosTransferer from transfers.util import timeit from transfers.waterlevelscontinuous_pressure_daily import ( NMAWaterLevelsContinuousPressureDailyTransferer, ) from transfers.weather_data import WeatherDataTransferer +from transfers.weather_photos import WeatherPhotosTransferer from transfers.logger import logger, save_log_to_bucket @@ -226,6 +228,7 @@ def transfer_all(metrics, limit=100): transfer_link_ids = get_bool_env("TRANSFER_LINK_IDS", True) transfer_groups = get_bool_env("TRANSFER_GROUPS", True) transfer_assets = get_bool_env("TRANSFER_ASSETS", False) + transfer_surface_water_photos = get_bool_env("TRANSFER_SURFACE_WATER_PHOTOS", True) transfer_surface_water_data = get_bool_env("TRANSFER_SURFACE_WATER_DATA", True) 
transfer_hydraulics_data = get_bool_env("TRANSFER_HYDRAULICS_DATA", True) transfer_chemistry_sampleinfo = get_bool_env("TRANSFER_CHEMISTRY_SAMPLEINFO", True) @@ -234,6 +237,7 @@ def transfer_all(metrics, limit=100): transfer_ngwmn_views = get_bool_env("TRANSFER_NGWMN_VIEWS", True) transfer_pressure_daily = get_bool_env("TRANSFER_WATERLEVELS_PRESSURE_DAILY", True) transfer_weather_data = get_bool_env("TRANSFER_WEATHER_DATA", True) + transfer_weather_photos = get_bool_env("TRANSFER_WEATHER_PHOTOS", True) transfer_minor_trace_chemistry = get_bool_env( "TRANSFER_MINOR_TRACE_CHEMISTRY", True ) @@ -254,6 +258,7 @@ def transfer_all(metrics, limit=100): transfer_link_ids, transfer_groups, transfer_assets, + transfer_surface_water_photos, transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, @@ -262,6 +267,7 @@ def transfer_all(metrics, limit=100): transfer_ngwmn_views, transfer_pressure_daily, transfer_weather_data, + transfer_weather_photos, transfer_minor_trace_chemistry, transfer_nma_stratigraphy, ) @@ -279,6 +285,7 @@ def transfer_all(metrics, limit=100): transfer_link_ids, transfer_groups, transfer_assets, + transfer_surface_water_photos, transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, @@ -287,6 +294,7 @@ def transfer_all(metrics, limit=100): transfer_ngwmn_views, transfer_pressure_daily, transfer_weather_data, + transfer_weather_photos, transfer_minor_trace_chemistry, transfer_nma_stratigraphy, ) @@ -305,6 +313,7 @@ def _transfer_parallel( transfer_link_ids, transfer_groups, transfer_assets, + transfer_surface_water_photos, transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, @@ -313,6 +322,7 @@ def _transfer_parallel( transfer_ngwmn_views, transfer_pressure_daily, transfer_weather_data, + transfer_weather_photos, transfer_minor_trace_chemistry, transfer_nma_stratigraphy, ): @@ -337,6 +347,12 @@ def _transfer_parallel( ) if transfer_groups: 
parallel_tasks_1.append(("Groups", ProjectGroupTransferer, flags)) + if transfer_surface_water_photos: + parallel_tasks_1.append( + ("SurfaceWaterPhotos", SurfaceWaterPhotosTransferer, flags) + ) + if transfer_weather_photos: + parallel_tasks_1.append(("WeatherPhotos", WeatherPhotosTransferer, flags)) if transfer_assets: parallel_tasks_1.append(("Assets", AssetTransferer, flags)) if transfer_surface_water_data: @@ -426,6 +442,8 @@ def _transfer_parallel( metrics.location_link_ids_metrics(*results_map["LinkIdsLocation"]) if "Groups" in results_map and results_map["Groups"]: metrics.group_metrics(*results_map["Groups"]) + if "SurfaceWaterPhotos" in results_map and results_map["SurfaceWaterPhotos"]: + metrics.surface_water_photos_metrics(*results_map["SurfaceWaterPhotos"]) if "Assets" in results_map and results_map["Assets"]: metrics.asset_metrics(*results_map["Assets"]) if "SurfaceWaterData" in results_map and results_map["SurfaceWaterData"]: @@ -449,6 +467,8 @@ def _transfer_parallel( ) if "WeatherData" in results_map and results_map["WeatherData"]: metrics.weather_data_metrics(*results_map["WeatherData"]) + if "WeatherPhotos" in results_map and results_map["WeatherPhotos"]: + metrics.weather_photos_metrics(*results_map["WeatherPhotos"]) if transfer_major_chemistry: message("TRANSFERRING MAJOR CHEMISTRY") results = _execute_transfer(MajorChemistryTransferer, flags=flags) @@ -526,6 +546,7 @@ def _transfer_sequential( transfer_link_ids, transfer_groups, transfer_assets, + transfer_surface_water_photos, transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, @@ -534,6 +555,7 @@ def _transfer_sequential( transfer_ngwmn_views, transfer_pressure_daily, transfer_weather_data, + transfer_weather_photos, transfer_minor_trace_chemistry, transfer_nma_stratigraphy, ): @@ -584,6 +606,16 @@ def _transfer_sequential( results = _execute_transfer(ProjectGroupTransferer, flags=flags) metrics.group_metrics(*results) + if transfer_surface_water_photos: + 
message("TRANSFERRING SURFACE WATER PHOTOS") + results = _execute_transfer(SurfaceWaterPhotosTransferer, flags=flags) + metrics.surface_water_photos_metrics(*results) + + if transfer_weather_photos: + message("TRANSFERRING WEATHER PHOTOS") + results = _execute_transfer(WeatherPhotosTransferer, flags=flags) + metrics.weather_photos_metrics(*results) + if transfer_assets: message("TRANSFERRING ASSETS") results = _execute_transfer(AssetTransferer, flags=flags) diff --git a/transfers/weather_photos.py b/transfers/weather_photos.py new file mode 100644 index 000000000..82e5bc254 --- /dev/null +++ b/transfers/weather_photos.py @@ -0,0 +1,120 @@ +# ============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================== + +from __future__ import annotations + +from typing import Any, Optional +from uuid import UUID + +import pandas as pd +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.orm import Session + +from db import WeatherPhotos +from transfers.logger import logger +from transfers.transferer import Transferer +from transfers.util import replace_nans + + +class WeatherPhotosTransferer(Transferer): + """Transfer legacy WeatherPhotos rows from NM_Aquifer.""" + + source_table = "WeatherPhotos" + + def __init__(self, *args, batch_size: int = 1000, **kwargs): + super().__init__(*args, **kwargs) + self.batch_size = batch_size + + def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: + df = self._read_csv(self.source_table) + cleaned_df = replace_nans(df) + return df, cleaned_df + + def _transfer_hook(self, session: Session) -> None: + rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] + rows = self._dedupe_rows(rows, key="GlobalID") + + if not rows: + logger.info("No WeatherPhotos rows to transfer") + return + + insert_stmt = insert(WeatherPhotos) + excluded = insert_stmt.excluded + + for i in range(0, len(rows), self.batch_size): + chunk = rows[i : i + self.batch_size] + logger.info( + "Upserting WeatherPhotos rows %s-%s (%s rows)", + i, + i + len(chunk) - 1, + len(chunk), + ) + stmt = insert_stmt.values(chunk).on_conflict_do_update( + index_elements=["GlobalID"], + set_={ + "WeatherID": excluded["WeatherID"], + "PointID": excluded["PointID"], + "OLEPath": excluded["OLEPath"], + "OBJECTID": excluded["OBJECTID"], + }, + ) + session.execute(stmt) + session.commit() + + def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + return { + "WeatherID": self._uuid_val(row.get("WeatherID")), + "PointID": row.get("PointID"), + "OLEPath": row.get("OLEPath"), + "OBJECTID": row.get("OBJECTID"), + "GlobalID": self._uuid_val(row.get("GlobalID")), + } + + def 
_dedupe_rows( + self, rows: list[dict[str, Any]], key: str + ) -> list[dict[str, Any]]: + """Dedupe rows by unique key to avoid ON CONFLICT loops. Later rows win.""" + deduped = {} + for row in rows: + global_id = row.get(key) + if global_id is None: + continue + deduped[global_id] = row + return list(deduped.values()) + + def _uuid_val(self, value: Any) -> Optional[UUID]: + if value is None or pd.isna(value): + return None + if isinstance(value, UUID): + return value + if isinstance(value, str): + try: + return UUID(value) + except ValueError: + return None + return None + + +def run(batch_size: int = 1000) -> None: + """Entrypoint to execute the transfer.""" + transferer = WeatherPhotosTransferer(batch_size=batch_size) + transferer.transfer() + + +if __name__ == "__main__": + run() + +# ============= EOF ============================================= From 32d6aba686c0bd25f5762d914dfed7e25c1759af Mon Sep 17 00:00:00 2001 From: jross Date: Fri, 16 Jan 2026 16:59:53 -0700 Subject: [PATCH 2/7] feat: add support for SoilRockResults metrics and transferer --- transfers/metrics.py | 4 ++++ transfers/transfer.py | 15 +++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/transfers/metrics.py b/transfers/metrics.py index 14d0e6b21..bd716c5c1 100644 --- a/transfers/metrics.py +++ b/transfers/metrics.py @@ -43,6 +43,7 @@ SurfaceWaterData, SurfaceWaterPhotos, NMAWaterLevelsContinuousPressureDaily, + SoilRockResults, ViewNGWMNWellConstruction, ViewNGWMNWaterLevels, ViewNGWMNLithology, @@ -116,6 +117,9 @@ def surface_water_data_metrics(self, *args, **kw) -> None: def surface_water_photos_metrics(self, *args, **kw) -> None: self._handle_metrics(SurfaceWaterPhotos, name="SurfaceWaterPhotos", *args, **kw) + def soil_rock_results_metrics(self, *args, **kw) -> None: + self._handle_metrics(SoilRockResults, name="Soil_Rock_Results", *args, **kw) + def hydraulics_data_metrics(self, *args, **kw) -> None: self._handle_metrics(NMAHydraulicsData, name="HydraulicsData", *args, **kw) 
diff --git a/transfers/transfer.py b/transfers/transfer.py index 37f0c6be6..907bde30d 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -64,6 +64,7 @@ NGWMNWaterLevelsTransferer, NGWMNWellConstructionTransferer, ) +from transfers.soil_rock_results import SoilRockResultsTransferer from transfers.surface_water_data import SurfaceWaterDataTransferer from transfers.surface_water_photos import SurfaceWaterPhotosTransferer from transfers.util import timeit @@ -229,6 +230,7 @@ def transfer_all(metrics, limit=100): transfer_groups = get_bool_env("TRANSFER_GROUPS", True) transfer_assets = get_bool_env("TRANSFER_ASSETS", False) transfer_surface_water_photos = get_bool_env("TRANSFER_SURFACE_WATER_PHOTOS", True) + transfer_soil_rock_results = get_bool_env("TRANSFER_SOIL_ROCK_RESULTS", True) transfer_surface_water_data = get_bool_env("TRANSFER_SURFACE_WATER_DATA", True) transfer_hydraulics_data = get_bool_env("TRANSFER_HYDRAULICS_DATA", True) transfer_chemistry_sampleinfo = get_bool_env("TRANSFER_CHEMISTRY_SAMPLEINFO", True) @@ -259,6 +261,7 @@ def transfer_all(metrics, limit=100): transfer_groups, transfer_assets, transfer_surface_water_photos, + transfer_soil_rock_results, transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, @@ -286,6 +289,7 @@ def transfer_all(metrics, limit=100): transfer_groups, transfer_assets, transfer_surface_water_photos, + transfer_soil_rock_results, transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, @@ -314,6 +318,7 @@ def _transfer_parallel( transfer_groups, transfer_assets, transfer_surface_water_photos, + transfer_soil_rock_results, transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, @@ -351,6 +356,8 @@ def _transfer_parallel( parallel_tasks_1.append( ("SurfaceWaterPhotos", SurfaceWaterPhotosTransferer, flags) ) + if transfer_soil_rock_results: + parallel_tasks_1.append(("SoilRockResults", SoilRockResultsTransferer, flags)) if 
transfer_weather_photos: parallel_tasks_1.append(("WeatherPhotos", WeatherPhotosTransferer, flags)) if transfer_assets: @@ -444,6 +451,8 @@ def _transfer_parallel( metrics.group_metrics(*results_map["Groups"]) if "SurfaceWaterPhotos" in results_map and results_map["SurfaceWaterPhotos"]: metrics.surface_water_photos_metrics(*results_map["SurfaceWaterPhotos"]) + if "SoilRockResults" in results_map and results_map["SoilRockResults"]: + metrics.soil_rock_results_metrics(*results_map["SoilRockResults"]) if "Assets" in results_map and results_map["Assets"]: metrics.asset_metrics(*results_map["Assets"]) if "SurfaceWaterData" in results_map and results_map["SurfaceWaterData"]: @@ -547,6 +556,7 @@ def _transfer_sequential( transfer_groups, transfer_assets, transfer_surface_water_photos, + transfer_soil_rock_results, transfer_surface_water_data, transfer_hydraulics_data, transfer_chemistry_sampleinfo, @@ -611,6 +621,11 @@ def _transfer_sequential( results = _execute_transfer(SurfaceWaterPhotosTransferer, flags=flags) metrics.surface_water_photos_metrics(*results) + if transfer_soil_rock_results: + message("TRANSFERRING SOIL ROCK RESULTS") + results = _execute_transfer(SoilRockResultsTransferer, flags=flags) + metrics.soil_rock_results_metrics(*results) + if transfer_weather_photos: message("TRANSFERRING WEATHER PHOTOS") results = _execute_transfer(WeatherPhotosTransferer, flags=flags) From ed33aa2daf5ccc44c8b0c547ebfcbd1248a5adbc Mon Sep 17 00:00:00 2001 From: jross Date: Fri, 16 Jan 2026 17:07:47 -0700 Subject: [PATCH 3/7] feat: add thing_id association to AssociatedData and SoilRockResults models --- ...c2f4a9d0b1e2_create_nma_associated_data.py | 6 +++++ ...a6b7c8d9e0_create_nma_soil_rock_results.py | 6 +++++ db/nma_legacy.py | 22 +++++++------------ tests/test_associated_data_legacy.py | 4 +++- tests/test_soil_rock_results_legacy.py | 4 +++- transfers/associated_data.py | 12 +++++++++- transfers/soil_rock_results.py | 12 +++++++++- 7 files changed, 48 insertions(+), 18 
deletions(-) diff --git a/alembic/versions/c2f4a9d0b1e2_create_nma_associated_data.py b/alembic/versions/c2f4a9d0b1e2_create_nma_associated_data.py index 161c36aca..61d791db5 100644 --- a/alembic/versions/c2f4a9d0b1e2_create_nma_associated_data.py +++ b/alembic/versions/c2f4a9d0b1e2_create_nma_associated_data.py @@ -37,6 +37,12 @@ def upgrade() -> None: sa.Column("Notes", sa.String(length=255), nullable=True), sa.Column("Formation", sa.String(length=15), nullable=True), sa.Column("OBJECTID", sa.Integer(), nullable=True, unique=True), + sa.Column( + "thing_id", + sa.Integer(), + sa.ForeignKey("thing.id", ondelete="CASCADE"), + nullable=True, + ), sa.UniqueConstraint("LocationId", name="AssociatedData$LocationId"), ) op.create_index("AssociatedData$PointID", "NMA_AssociatedData", ["PointID"]) diff --git a/alembic/versions/f5a6b7c8d9e0_create_nma_soil_rock_results.py b/alembic/versions/f5a6b7c8d9e0_create_nma_soil_rock_results.py index 2877c549a..a9a76751b 100644 --- a/alembic/versions/f5a6b7c8d9e0_create_nma_soil_rock_results.py +++ b/alembic/versions/f5a6b7c8d9e0_create_nma_soil_rock_results.py @@ -32,6 +32,12 @@ def upgrade() -> None: sa.Column("d13C", sa.Float(), nullable=True), sa.Column("d18O", sa.Float(), nullable=True), sa.Column("Sampled by", sa.String(length=255), nullable=True), + sa.Column( + "thing_id", + sa.Integer(), + sa.ForeignKey("thing.id", ondelete="CASCADE"), + nullable=True, + ), ) op.create_index( "Soil_Rock_Results$Point_ID", "NMA_Soil_Rock_Results", ["Point_ID"] diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 951f02fba..a69ebce0c 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -296,22 +296,11 @@ class AssociatedData(Base): notes: Mapped[Optional[str]] = mapped_column("Notes", String(255)) formation: Mapped[Optional[str]] = mapped_column("Formation", String(15)) object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True) - - major_chemistries: Mapped[List["NMAMajorChemistry"]] = relationship( - 
"NMAMajorChemistry", - back_populates="chemistry_sample_info", - cascade="all, delete-orphan", - passive_deletes=True, + thing_id: Mapped[Optional[int]] = mapped_column( + Integer, ForeignKey("thing.id", ondelete="CASCADE") ) - @validates("thing_id") - def validate_thing_id(self, key, value): - """Prevent orphan ChemistrySampleInfo - must have a parent Thing.""" - if value is None: - raise ValueError( - "ChemistrySampleInfo requires a parent Thing (thing_id cannot be None)" - ) - return value + thing: Mapped["Thing"] = relationship("Thing") class SurfaceWaterData(Base): @@ -414,6 +403,11 @@ class SoilRockResults(Base): d13c: Mapped[Optional[float]] = mapped_column("d13C", Float) d18o: Mapped[Optional[float]] = mapped_column("d18O", Float) sampled_by: Mapped[Optional[str]] = mapped_column("Sampled by", String(255)) + thing_id: Mapped[Optional[int]] = mapped_column( + Integer, ForeignKey("thing.id", ondelete="CASCADE") + ) + + thing: Mapped["Thing"] = relationship("Thing") class NMAMinorTraceChemistry(Base): diff --git a/tests/test_associated_data_legacy.py b/tests/test_associated_data_legacy.py index 0a92032d2..a08e95bc0 100644 --- a/tests/test_associated_data_legacy.py +++ b/tests/test_associated_data_legacy.py @@ -32,7 +32,7 @@ from db.nma_legacy import AssociatedData -def test_create_associated_data_all_fields(): +def test_create_associated_data_all_fields(water_well_thing): """Test creating an associated data record with all fields.""" with session_ctx() as session: record = AssociatedData( @@ -42,6 +42,7 @@ def test_create_associated_data_all_fields(): notes="Legacy notes", formation="TEST", object_id=42, + thing_id=water_well_thing.id, ) session.add(record) session.commit() @@ -53,6 +54,7 @@ def test_create_associated_data_all_fields(): assert record.notes == "Legacy notes" assert record.formation == "TEST" assert record.object_id == 42 + assert record.thing_id == water_well_thing.id session.delete(record) session.commit() diff --git 
a/tests/test_soil_rock_results_legacy.py b/tests/test_soil_rock_results_legacy.py index e252ab908..988a64bcb 100644 --- a/tests/test_soil_rock_results_legacy.py +++ b/tests/test_soil_rock_results_legacy.py @@ -31,7 +31,7 @@ from db.nma_legacy import SoilRockResults -def test_create_soil_rock_results_all_fields(): +def test_create_soil_rock_results_all_fields(water_well_thing): """Test creating a soil/rock results record with all fields.""" with session_ctx() as session: record = SoilRockResults( @@ -41,6 +41,7 @@ def test_create_soil_rock_results_all_fields(): d13c=-5.5, d18o=12.3, sampled_by="Tester", + thing_id=water_well_thing.id, ) session.add(record) session.commit() @@ -53,6 +54,7 @@ def test_create_soil_rock_results_all_fields(): assert record.d13c == -5.5 assert record.d18o == 12.3 assert record.sampled_by == "Tester" + assert record.thing_id == water_well_thing.id session.delete(record) session.commit() diff --git a/transfers/associated_data.py b/transfers/associated_data.py index 774653f41..56d6d8363 100644 --- a/transfers/associated_data.py +++ b/transfers/associated_data.py @@ -23,7 +23,8 @@ from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session -from db import AssociatedData +from db import AssociatedData, Thing +from db.engine import session_ctx from transfers.logger import logger from transfers.transferer import Transferer from transfers.util import replace_nans @@ -37,6 +38,14 @@ class AssociatedDataTransferer(Transferer): def __init__(self, *args, batch_size: int = 1000, **kwargs): super().__init__(*args, **kwargs) self.batch_size = batch_size + self._thing_id_cache: dict[str, int] = {} + self._build_thing_id_cache() + + def _build_thing_id_cache(self) -> None: + with session_ctx() as session: + things = session.query(Thing.name, Thing.id).all() + self._thing_id_cache = {name: thing_id for name, thing_id in things} + logger.info(f"Built Thing ID cache with {len(self._thing_id_cache)} entries") def _get_dfs(self) -> 
tuple[pd.DataFrame, pd.DataFrame]: df = self._read_csv(self.source_table) @@ -83,6 +92,7 @@ def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: "Notes": row.get("Notes"), "Formation": row.get("Formation"), "OBJECTID": row.get("OBJECTID"), + "thing_id": self._thing_id_cache.get(row.get("PointID")), } def _dedupe_rows( diff --git a/transfers/soil_rock_results.py b/transfers/soil_rock_results.py index 28d44f05b..066d9fd5a 100644 --- a/transfers/soil_rock_results.py +++ b/transfers/soil_rock_results.py @@ -21,7 +21,8 @@ import pandas as pd from sqlalchemy.orm import Session -from db import SoilRockResults +from db import SoilRockResults, Thing +from db.engine import session_ctx from transfers.logger import logger from transfers.transferer import Transferer from transfers.util import replace_nans @@ -35,6 +36,14 @@ class SoilRockResultsTransferer(Transferer): def __init__(self, *args, batch_size: int = 1000, **kwargs): super().__init__(*args, **kwargs) self.batch_size = batch_size + self._thing_id_cache: dict[str, int] = {} + self._build_thing_id_cache() + + def _build_thing_id_cache(self) -> None: + with session_ctx() as session: + things = session.query(Thing.name, Thing.id).all() + self._thing_id_cache = {name: thing_id for name, thing_id in things} + logger.info(f"Built Thing ID cache with {len(self._thing_id_cache)} entries") def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: df = self._read_csv(self.source_table) @@ -67,6 +76,7 @@ def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: "d13C": self._float_val(row.get("d13C")), "d18O": self._float_val(row.get("d18O")), "Sampled by": row.get("Sampled by"), + "thing_id": self._thing_id_cache.get(row.get("Point_ID")), } def _float_val(self, value: Any) -> Optional[float]: From dc6ff685d0b5cd39245e8ac37a9981c2da8a03a9 Mon Sep 17 00:00:00 2001 From: jross Date: Fri, 16 Jan 2026 17:16:35 -0700 Subject: [PATCH 4/7] feat: add major_chemistries relationship and validation for ChemistrySampleInfo --- 
db/nma_legacy.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/db/nma_legacy.py b/db/nma_legacy.py index a69ebce0c..cb8ce17a1 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -278,6 +278,22 @@ class ChemistrySampleInfo(Base): passive_deletes=True, ) + major_chemistries: Mapped[List["NMAMajorChemistry"]] = relationship( + "NMAMajorChemistry", + back_populates="chemistry_sample_info", + cascade="all, delete-orphan", + passive_deletes=True, + ) + + @validates("thing_id") + def validate_thing_id(self, key, value): + """Prevent orphan ChemistrySampleInfo - must have a parent Thing.""" + if value is None: + raise ValueError( + "ChemistrySampleInfo requires a parent Thing (thing_id cannot be None)" + ) + return value + class AssociatedData(Base): """ From ba9eecb8f4b53204dab4895b55e9ffe33a70c481 Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 17 Jan 2026 10:08:29 -0700 Subject: [PATCH 5/7] feat: implement StratigraphyLegacyTransferer for importing Stratigraphy.csv into NMA_Stratigraphy --- transfers/stratigraphy_legacy.py | 164 +++++++++++++++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 transfers/stratigraphy_legacy.py diff --git a/transfers/stratigraphy_legacy.py b/transfers/stratigraphy_legacy.py new file mode 100644 index 000000000..701c7d6eb --- /dev/null +++ b/transfers/stratigraphy_legacy.py @@ -0,0 +1,164 @@ +"""Transfer Stratigraphy.csv into the NMA_Stratigraphy legacy table.""" + +from __future__ import annotations + +from typing import Any, Dict, List +from uuid import UUID + +import pandas as pd +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.orm import Session + +from db import Stratigraphy, Thing +from transfers.logger import logger +from transfers.transferer import Transferer +from transfers.util import ( + filter_to_valid_point_ids, + read_csv, + replace_nans, +) + + +class StratigraphyLegacyTransferer(Transferer): + """Imports Stratigraphy.csv rows into NMA_Stratigraphy.""" + + 
source_table = "NMA_Stratigraphy" + + def __init__(self, batch_size: int = 1000, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.batch_size = batch_size + self._thing_id_cache: dict[str, int] = {} + + def _get_dfs(self): # type: ignore[override] + df = read_csv("Stratigraphy") + cleaned = replace_nans(df) + cleaned = filter_to_valid_point_ids(cleaned, self.pointids) + return df, cleaned + + def _transfer_hook(self, session: Session) -> None: # type: ignore[override] + if self.cleaned_df is None or self.cleaned_df.empty: + logger.info("No Stratigraphy rows to import after cleaning") + return + + self._prime_thing_cache(session) + rows: List[Dict[str, Any]] = [] + for row in self.cleaned_df.itertuples(): + record = self._row_dict(row) + if record is None: + continue + rows.append(record) + + if not rows: + logger.warning("All Stratigraphy rows were skipped during processing") + return + + insert_stmt = insert(Stratigraphy) + excluded = insert_stmt.excluded + + for start in range(0, len(rows), self.batch_size): + chunk = rows[start : start + self.batch_size] + logger.info( + "Upserting Stratigraphy batch %s-%s (%s rows)", + start, + start + len(chunk) - 1, + len(chunk), + ) + stmt = insert_stmt.values(chunk).on_conflict_do_update( + index_elements=["GlobalID"], + set_={ + "WellID": excluded.WellID, + "PointID": excluded.PointID, + "thing_id": excluded.thing_id, + "StratTop": excluded.StratTop, + "StratBottom": excluded.StratBottom, + "UnitIdentifier": excluded.UnitIdentifier, + "Lithology": excluded.Lithology, + "LithologicModifier": excluded.LithologicModifier, + "ContributingUnit": excluded.ContributingUnit, + "StratSource": excluded.StratSource, + "StratNotes": excluded.StratNotes, + "OBJECTID": excluded.OBJECTID, + }, + ) + session.execute(stmt) + session.commit() + session.expunge_all() + + def _prime_thing_cache(self, session: Session) -> None: + point_ids = set(self.cleaned_df["PointID"].dropna()) # type: ignore[index] + if not point_ids: + 
self._thing_id_cache = {} + return + results = ( + session.query(Thing.id, Thing.name).filter(Thing.name.in_(point_ids)).all() + ) + self._thing_id_cache = {name: thing_id for thing_id, name in results} + + def _row_dict(self, row: pd.Series) -> Dict[str, Any] | None: + point_id = getattr(row, "PointID", None) + if not point_id: + self._capture_error("", "Missing PointID", "PointID") + return None + thing_id = self._thing_id_cache.get(point_id) + if not thing_id: + self._capture_error(point_id, "No Thing found for PointID", "thing_id") + return None + + global_id = self._uuid_value(getattr(row, "GlobalID", None)) + if global_id is None: + self._capture_error(point_id, "Invalid GlobalID", "GlobalID") + return None + + return { + "GlobalID": global_id, + "WellID": self._uuid_value(getattr(row, "WellID", None)), + "PointID": point_id, + "thing_id": thing_id, + "StratTop": self._float_value(getattr(row, "StratTop", None)), + "StratBottom": self._float_value(getattr(row, "StratBottom", None)), + "UnitIdentifier": self._string_value(getattr(row, "UnitIdentifier", None)), + "Lithology": self._string_value(getattr(row, "Lithology", None)), + "LithologicModifier": self._string_value( + getattr(row, "LithologicModifier", None) + ), + "ContributingUnit": self._string_value( + getattr(row, "ContributingUnit", None) + ), + "StratSource": self._string_value(getattr(row, "StratSource", None)), + "StratNotes": self._string_value(getattr(row, "StratNotes", None)), + "OBJECTID": self._int_value(getattr(row, "OBJECTID", None)), + } + + def _uuid_value(self, value: Any) -> UUID | None: + if value in (None, ""): + return None + if isinstance(value, UUID): + return value + try: + return UUID(str(value)) + except (ValueError, TypeError): + return None + + def _float_value(self, value: Any) -> float | None: + if value in (None, ""): + return None + try: + return float(value) + except (TypeError, ValueError): + return None + + def _int_value(self, value: Any) -> int | None: + if value in 
(None, ""): + return None + try: + return int(value) + except (TypeError, ValueError): + return None + + def _string_value(self, value: Any) -> str | None: + if value is None: + return None + if isinstance(value, str): + trimmed = value.strip() + return trimmed or None + return str(value) From 02be9dabbf6cb28ccb38c8b2b350c7f75034319f Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 17 Jan 2026 10:41:23 -0700 Subject: [PATCH 6/7] feat: add Stratigraphy model and associated metrics for data transfer --- db/nma_legacy.py | 32 +++++++++++++++++++++++++++++++- db/thing.py | 9 ++++++++- transfers/metrics.py | 4 ++++ transfers/soil_rock_results.py | 15 ++++++++------- transfers/transfer.py | 26 +++++++++++++++++++++----- transfers/transferer.py | 11 ++++++++++- 6 files changed, 82 insertions(+), 15 deletions(-) diff --git a/db/nma_legacy.py b/db/nma_legacy.py index cb8ce17a1..656e7069a 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -17,7 +17,6 @@ """Legacy NM Aquifer models copied from AMPAPI.""" import uuid - from datetime import date, datetime from typing import TYPE_CHECKING, List, Optional @@ -206,6 +205,37 @@ class NMAHydraulicsData(Base): thing: Mapped["Thing"] = relationship("Thing") +class Stratigraphy(Base): + """Legacy stratigraphy (lithology log) data from AMPAPI.""" + + __tablename__ = "NMA_Stratigraphy" + + global_id: Mapped[uuid.UUID] = mapped_column( + "GlobalID", UUID(as_uuid=True), primary_key=True + ) + well_id: Mapped[Optional[uuid.UUID]] = mapped_column("WellID", UUID(as_uuid=True)) + point_id: Mapped[str] = mapped_column("PointID", String(10), nullable=False) + thing_id: Mapped[int] = mapped_column( + Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=False + ) + + strat_top: Mapped[Optional[float]] = mapped_column("StratTop", Float) + strat_bottom: Mapped[Optional[float]] = mapped_column("StratBottom", Float) + unit_identifier: Mapped[Optional[str]] = mapped_column("UnitIdentifier", String(50)) + lithology: Mapped[Optional[str]] 
= mapped_column("Lithology", String(100)) + lithologic_modifier: Mapped[Optional[str]] = mapped_column( + "LithologicModifier", String(100) + ) + contributing_unit: Mapped[Optional[str]] = mapped_column( + "ContributingUnit", String(10) + ) + strat_source: Mapped[Optional[str]] = mapped_column("StratSource", Text) + strat_notes: Mapped[Optional[str]] = mapped_column("StratNotes", Text) + object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer, unique=True) + + thing: Mapped["Thing"] = relationship("Thing", back_populates="stratigraphy_logs") + + class ChemistrySampleInfo(Base): """ Legacy Chemistry SampleInfo table from AMPAPI. diff --git a/db/thing.py b/db/thing.py index 8283bdc4f..4365245fa 100644 --- a/db/thing.py +++ b/db/thing.py @@ -47,7 +47,7 @@ from db.thing_geologic_formation_association import ( ThingGeologicFormationAssociation, ) - from db.nma_legacy import ChemistrySampleInfo + from db.nma_legacy import ChemistrySampleInfo, Stratigraphy class Thing( @@ -312,6 +312,13 @@ class Thing( passive_deletes=True, ) + stratigraphy_logs: Mapped[List["Stratigraphy"]] = relationship( + "Stratigraphy", + back_populates="thing", + cascade="all, delete-orphan", + passive_deletes=True, + ) + # --- Association Proxies --- assets: AssociationProxy[list["Asset"]] = association_proxy( "asset_associations", "asset" diff --git a/transfers/metrics.py b/transfers/metrics.py index bd716c5c1..e2083beb4 100644 --- a/transfers/metrics.py +++ b/transfers/metrics.py @@ -50,6 +50,7 @@ WeatherData, WeatherPhotos, NMAMinorTraceChemistry, + AssociatedData, ) from db.engine import session_ctx from services.gcs_helper import get_storage_bucket @@ -167,6 +168,9 @@ def permissions_metrics(self, *args, **kw) -> None: def stratigraphy_metrics(self, *args, **kw) -> None: self._handle_metrics(ThingGeologicFormationAssociation, *args, **kw) + def associated_data_metrics(self, *args, **kw) -> None: + self._handle_metrics(AssociatedData, name="AssociatedData", *args, **kw) + def 
minor_trace_chemistry_metrics(self, *args, **kw) -> None: self._handle_metrics( NMAMinorTraceChemistry, name="MinorTraceChemistry", *args, **kw diff --git a/transfers/soil_rock_results.py b/transfers/soil_rock_results.py index 066d9fd5a..c2202282a 100644 --- a/transfers/soil_rock_results.py +++ b/transfers/soil_rock_results.py @@ -69,14 +69,15 @@ def _transfer_hook(self, session: Session) -> None: session.commit() def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + point_id = row.get("Point_ID") return { - "Point_ID": row.get("Point_ID"), - "Sample Type": row.get("Sample Type"), - "Date Sampled": row.get("Date Sampled"), - "d13C": self._float_val(row.get("d13C")), - "d18O": self._float_val(row.get("d18O")), - "Sampled by": row.get("Sampled by"), - "thing_id": self._thing_id_cache.get(row.get("Point_ID")), + "point_id": point_id, + "sample_type": row.get("Sample Type"), + "date_sampled": row.get("Date Sampled"), + "d13c": self._float_val(row.get("d13C")), + "d18o": self._float_val(row.get("d18O")), + "sampled_by": row.get("Sampled by"), + "thing_id": self._thing_id_cache.get(point_id), } def _float_val(self, value: Any) -> Optional[float]: diff --git a/transfers/transfer.py b/transfers/transfer.py index 907bde30d..174ee8cff 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -19,9 +19,10 @@ from alembic import command from alembic.config import Config +from dotenv import load_dotenv + from db.engine import session_ctx from db.initialization import recreate_public_schema, sync_search_vector_triggers -from dotenv import load_dotenv from services.util import get_bool_env from transfers.aquifer_system_transfer import transfer_aquifer_systems from transfers.geologic_formation_transfer import transfer_geologic_formations @@ -64,6 +65,7 @@ NGWMNWaterLevelsTransferer, NGWMNWellConstructionTransferer, ) +from transfers.associated_data import AssociatedDataTransferer from transfers.soil_rock_results import SoilRockResultsTransferer from 
transfers.surface_water_data import SurfaceWaterDataTransferer from transfers.surface_water_photos import SurfaceWaterPhotosTransferer @@ -244,6 +246,7 @@ def transfer_all(metrics, limit=100): "TRANSFER_MINOR_TRACE_CHEMISTRY", True ) transfer_nma_stratigraphy = get_bool_env("TRANSFER_NMA_STRATIGRAPHY", True) + transfer_associated_data = get_bool_env("TRANSFER_ASSOCIATED_DATA", True) use_parallel = get_bool_env("TRANSFER_PARALLEL", True) if use_parallel: @@ -273,6 +276,7 @@ def transfer_all(metrics, limit=100): transfer_weather_photos, transfer_minor_trace_chemistry, transfer_nma_stratigraphy, + transfer_associated_data, ) else: _transfer_sequential( @@ -301,6 +305,7 @@ def transfer_all(metrics, limit=100): transfer_weather_photos, transfer_minor_trace_chemistry, transfer_nma_stratigraphy, + transfer_associated_data, ) @@ -330,6 +335,7 @@ def _transfer_parallel( transfer_weather_photos, transfer_minor_trace_chemistry, transfer_nma_stratigraphy, + transfer_associated_data, ): """Execute transfers in parallel where possible.""" message("PARALLEL TRANSFER GROUP 1") @@ -362,6 +368,8 @@ def _transfer_parallel( parallel_tasks_1.append(("WeatherPhotos", WeatherPhotosTransferer, flags)) if transfer_assets: parallel_tasks_1.append(("Assets", AssetTransferer, flags)) + if transfer_associated_data: + parallel_tasks_1.append(("AssociatedData", AssociatedDataTransferer, flags)) if transfer_surface_water_data: parallel_tasks_1.append(("SurfaceWaterData", SurfaceWaterDataTransferer, flags)) if transfer_hydraulics_data: @@ -405,11 +413,11 @@ def _transfer_parallel( if transfer_nma_stratigraphy: future = executor.submit( _execute_transfer_with_timing, - "NMAStratigraphy", + "Stratigraphy", StratigraphyLegacyTransferer, flags, ) - futures[future] = "NMAStratigraphy" + futures[future] = "StratigraphyLegacy" future = executor.submit( _execute_session_transfer_with_timing, @@ -439,8 +447,10 @@ def _transfer_parallel( metrics.contact_metrics(*results_map["Contacts"]) if "Stratigraphy" in 
results_map and results_map["Stratigraphy"]: metrics.stratigraphy_metrics(*results_map["Stratigraphy"]) - if "NMAStratigraphy" in results_map and results_map["NMAStratigraphy"]: - metrics.nma_stratigraphy_metrics(*results_map["NMAStratigraphy"]) + if "StratigraphyLegacy" in results_map and results_map["StratigraphyLegacy"]: + metrics.nma_stratigraphy_metrics(*results_map["StratigraphyLegacy"]) + if "AssociatedData" in results_map and results_map["AssociatedData"]: + metrics.associated_data_metrics(*results_map["AssociatedData"]) if "WaterLevels" in results_map and results_map["WaterLevels"]: metrics.water_level_metrics(*results_map["WaterLevels"]) if "LinkIdsWellData" in results_map and results_map["LinkIdsWellData"]: @@ -568,6 +578,7 @@ def _transfer_sequential( transfer_weather_photos, transfer_minor_trace_chemistry, transfer_nma_stratigraphy, + transfer_associated_data, ): """Original sequential transfer logic.""" if transfer_screens: @@ -636,6 +647,11 @@ def _transfer_sequential( results = _execute_transfer(AssetTransferer, flags=flags) metrics.asset_metrics(*results) + if transfer_associated_data: + message("TRANSFERRING ASSOCIATED DATA") + results = _execute_transfer(AssociatedDataTransferer, flags=flags) + metrics.associated_data_metrics(*results) + if transfer_surface_water_data: message("TRANSFERRING SURFACE WATER DATA") results = _execute_transfer(SurfaceWaterDataTransferer, flags=flags) diff --git a/transfers/transferer.py b/transfers/transferer.py index dcfb39752..47826b0fb 100644 --- a/transfers/transferer.py +++ b/transfers/transferer.py @@ -24,7 +24,7 @@ from db import Thing, Base from db.engine import session_ctx from transfers.logger import logger -from transfers.util import chunk_by_size +from transfers.util import chunk_by_size, read_csv class ManualFixer(object): @@ -132,6 +132,15 @@ def _after_hook(self, session: Session): def _get_dfs(self): raise NotImplementedError("Must implement _get_dfs method") + def _read_csv(self, name: str, dtype: 
dict | None = None, **kw) -> pd.DataFrame: + if dtype is not None and "dtype" not in kw: + kw["dtype"] = dtype + csv_paths = self.flags.get("CSV_PATHS") or {} + csv_path = csv_paths.get(name) + if csv_path: + return pd.read_csv(csv_path, **kw) + return read_csv(name, dtype=dtype, **kw) + class ChunkTransferer(Transferer): def __init__(self, *args, **kwargs): From e6fdb416eb07359c873442debaebff4201e7624f Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 17 Jan 2026 10:52:29 -0700 Subject: [PATCH 7/7] feat: create legacy NMA_Stratigraphy table with associated columns and indexes --- ...c3b4a5e67_create_nma_stratigraphy_table.py | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py diff --git a/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py b/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py new file mode 100644 index 000000000..97770d567 --- /dev/null +++ b/alembic/versions/1d2c3b4a5e67_create_nma_stratigraphy_table.py @@ -0,0 +1,70 @@ +"""Create legacy NMA_Stratigraphy table. + +Revision ID: 1d2c3b4a5e67 +Revises: f5a6b7c8d9e0 +Create Date: 2026-01-15 00:00:00.000000 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import inspect +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. 
+revision: str = "1d2c3b4a5e67" +down_revision: Union[str, Sequence[str], None] = "f5a6b7c8d9e0" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create the legacy stratigraphy table.""" + bind = op.get_bind() + inspector = inspect(bind) + if inspector.has_table("NMA_Stratigraphy"): + return + + op.create_table( + "NMA_Stratigraphy", + sa.Column( + "GlobalID", + postgresql.UUID(as_uuid=True), + primary_key=True, + nullable=False, + ), + sa.Column("WellID", postgresql.UUID(as_uuid=True), nullable=True), + sa.Column("PointID", sa.String(length=10), nullable=False), + sa.Column( + "thing_id", + sa.Integer(), + sa.ForeignKey("thing.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("StratTop", sa.Float(), nullable=True), + sa.Column("StratBottom", sa.Float(), nullable=True), + sa.Column("UnitIdentifier", sa.String(length=50), nullable=True), + sa.Column("Lithology", sa.String(length=100), nullable=True), + sa.Column("LithologicModifier", sa.String(length=100), nullable=True), + sa.Column("ContributingUnit", sa.String(length=10), nullable=True), + sa.Column("StratSource", sa.Text(), nullable=True), + sa.Column("StratNotes", sa.Text(), nullable=True), + sa.Column("OBJECTID", sa.Integer(), nullable=True, unique=True), + ) + op.create_index( + "ix_nma_stratigraphy_point_id", + "NMA_Stratigraphy", + ["PointID"], + ) + op.create_index( + "ix_nma_stratigraphy_thing_id", + "NMA_Stratigraphy", + ["thing_id"], + ) + + +def downgrade() -> None: + op.drop_index("ix_nma_stratigraphy_thing_id", table_name="NMA_Stratigraphy") + op.drop_index("ix_nma_stratigraphy_point_id", table_name="NMA_Stratigraphy") + op.drop_table("NMA_Stratigraphy")