From 8ab4f84c41aeef447cf7bbdf1f3468a4a1c1d78a Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Wed, 7 Jan 2026 21:09:42 -0700 Subject: [PATCH 1/3] feat: add legacy SurfaceWaterData model and admin view with backfill functionality --- admin/config.py | 4 +- admin/views/__init__.py | 2 + admin/views/surface_water.py | 118 ++++++++++++++ ...b77_add_surface_water_data_legacy_model.py | 47 ++++++ db/nma_legacy.py | 36 +++++ transfers/backfill/staging.py | 8 +- transfers/backfill/surface_water_data.py | 150 ++++++++++++++++++ 7 files changed, 361 insertions(+), 4 deletions(-) create mode 100644 admin/views/surface_water.py create mode 100644 alembic/versions/1680a4a7cb77_add_surface_water_data_legacy_model.py create mode 100644 transfers/backfill/surface_water_data.py diff --git a/admin/config.py b/admin/config.py index 2ba35b4f8..859c696d7 100644 --- a/admin/config.py +++ b/admin/config.py @@ -43,6 +43,7 @@ FieldEventAdmin, FieldActivityAdmin, ParameterAdmin, + SurfaceWaterDataAdmin, ) from db.engine import engine @@ -62,7 +63,7 @@ from db.group import Group from db.notes import Notes from db.sample import Sample -from db.nma_legacy import ChemistrySampleInfo +from db.nma_legacy import ChemistrySampleInfo, SurfaceWaterData from db.geologic_formation import GeologicFormation from db.data_provenance import DataProvenance from db.transducer import TransducerObservation @@ -130,6 +131,7 @@ def create_admin(app): # Samples admin.add_view(SampleAdmin(Sample)) admin.add_view(ChemistrySampleInfoAdmin(ChemistrySampleInfo)) + admin.add_view(SurfaceWaterDataAdmin(SurfaceWaterData)) # Field admin.add_view(FieldEventAdmin(FieldEvent)) diff --git a/admin/views/__init__.py b/admin/views/__init__.py index ffb8a25fe..596d3f3d0 100644 --- a/admin/views/__init__.py +++ b/admin/views/__init__.py @@ -41,6 +41,7 @@ FieldEventParticipantAdmin, ) from admin.views.parameter import ParameterAdmin +from admin.views.surface_water import SurfaceWaterDataAdmin __all__ = [ "LocationAdmin", @@ -65,4 
+66,5 @@ "FieldActivityAdmin", "FieldEventParticipantAdmin", "ParameterAdmin", + "SurfaceWaterDataAdmin", ] diff --git a/admin/views/surface_water.py b/admin/views/surface_water.py new file mode 100644 index 000000000..1c09ded42 --- /dev/null +++ b/admin/views/surface_water.py @@ -0,0 +1,118 @@ +# =============================================================================== +# Copyright 2025 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +""" +SurfaceWaterDataAdmin view for NMSampleLocations. +""" +from admin.views.base import OcotilloModelView + + +class SurfaceWaterDataAdmin(OcotilloModelView): + """ + Admin view for SurfaceWaterData legacy model. 
+ """ + + name = "Surface Water" + label = "Surface Water" + icon = "fa fa-water" + enable_publish_actions = False + + column_list = [ + "surface_id", + "point_id", + "date_measured", + "discharge", + "discharge_units", + "discharge_method", + "discharge_source", + "formation_zone", + "aq_class", + "data_source", + ] + + column_sortable_list = [ + "surface_id", + "point_id", + "date_measured", + "discharge", + "discharge_units", + "discharge_method", + "discharge_source", + "formation_zone", + "aq_class", + ] + + column_default_sort = ("date_measured", True) + + search_fields = [ + "point_id", + "discharge", + "formation_zone", + "aq_class", + "data_source", + ] + + column_filters = [ + "discharge_units", + "discharge_method", + "discharge_source", + "formation_zone", + "aq_class", + ] + + can_export = True + export_types = ["csv", "excel"] + + page_size = 50 + page_size_options = [25, 50, 100, 200] + + fields = [ + "surface_id", + "point_id", + "object_id", + "date_measured", + "discharge", + "discharge_rate", + "discharge_units", + "discharge_method", + "discharge_source", + "formation_zone", + "aq_class", + "site_notes", + "field_method_notes", + "source_notes", + "data_source", + ] + + labels = { + "surface_id": "Surface ID", + "point_id": "Point ID", + "object_id": "Object ID", + "date_measured": "Date Measured", + "discharge": "Discharge", + "discharge_rate": "Discharge Rate", + "discharge_units": "Discharge Units", + "discharge_method": "Discharge Method", + "discharge_source": "Discharge Source", + "formation_zone": "Formation Zone", + "aq_class": "Aquifer Class", + "site_notes": "Site Notes", + "field_method_notes": "Field Method Notes", + "source_notes": "Source Notes", + "data_source": "Data Source", + } + + +# ============= EOF ============================================= diff --git a/alembic/versions/1680a4a7cb77_add_surface_water_data_legacy_model.py b/alembic/versions/1680a4a7cb77_add_surface_water_data_legacy_model.py new file mode 100644 index 
000000000..84329a21b --- /dev/null +++ b/alembic/versions/1680a4a7cb77_add_surface_water_data_legacy_model.py @@ -0,0 +1,47 @@ +"""add surface water data legacy model + +Revision ID: 1680a4a7cb77 +Revises: c9f1d2e3a4b5 +Create Date: 2026-01-07 20:46:51.010596 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + + +# revision identifiers, used by Alembic. +revision: str = "1680a4a7cb77" +down_revision: Union[str, Sequence[str], None] = "c9f1d2e3a4b5" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + op.create_table( + "NMA_SurfaceWaterData", + sa.Column("SurfaceID", postgresql.UUID(as_uuid=True), primary_key=True), + sa.Column("PointID", sa.String(length=10), nullable=False), + sa.Column("OBJECTID", sa.Integer(), nullable=True), + sa.Column("Discharge", sa.String(length=50), nullable=True), + sa.Column("DischargeMethod", sa.String(length=50), nullable=True), + sa.Column("DischargeRate", sa.Float(), nullable=True), + sa.Column("DischargeUnits", sa.String(length=3), nullable=True), + sa.Column("DateMeasured", sa.DateTime(), nullable=True), + sa.Column("DischargeSource", sa.String(length=50), nullable=True), + sa.Column("SiteNotes", sa.String(length=200), nullable=True), + sa.Column("FieldMethodNotes", sa.String(length=200), nullable=True), + sa.Column("FormationZone", sa.String(length=15), nullable=True), + sa.Column("AqClass", sa.String(length=50), nullable=True), + sa.Column("SourceNotes", sa.String(length=200), nullable=True), + sa.Column("DataSource", sa.String(length=255), nullable=True), + ) + + +def downgrade() -> None: + """Downgrade schema.""" + op.drop_table("NMA_SurfaceWaterData") diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 9d300d7e5..5c930bb11 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -16,6 +16,8 @@ """Legacy NM Aquifer models copied 
from AMPAPI.""" +import uuid + from datetime import date, datetime from typing import Optional @@ -28,6 +30,7 @@ String, Text, ) +from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.orm import Mapped, mapped_column from db.base import Base @@ -190,4 +193,37 @@ class ChemistrySampleInfo(Base): sample_notes: Mapped[Optional[str]] = mapped_column("SampleNotes", Text) +class SurfaceWaterData(Base): + """ + Legacy SurfaceWaterData table from AMPAPI. + """ + + __tablename__ = "NMA_SurfaceWaterData" + + surface_id: Mapped[uuid.UUID] = mapped_column( + "SurfaceID", UUID(as_uuid=True), primary_key=True + ) + point_id: Mapped[str] = mapped_column("PointID", String(10)) + object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer) + + discharge: Mapped[Optional[str]] = mapped_column("Discharge", String(50)) + discharge_method: Mapped[Optional[str]] = mapped_column( + "DischargeMethod", String(50) + ) + discharge_rate: Mapped[Optional[float]] = mapped_column("DischargeRate", Float) + discharge_units: Mapped[Optional[str]] = mapped_column("DischargeUnits", String(3)) + date_measured: Mapped[Optional[datetime]] = mapped_column("DateMeasured", DateTime) + discharge_source: Mapped[Optional[str]] = mapped_column( + "DischargeSource", String(50) + ) + site_notes: Mapped[Optional[str]] = mapped_column("SiteNotes", String(200)) + field_method_notes: Mapped[Optional[str]] = mapped_column( + "FieldMethodNotes", String(200) + ) + formation_zone: Mapped[Optional[str]] = mapped_column("FormationZone", String(15)) + aq_class: Mapped[Optional[str]] = mapped_column("AqClass", String(50)) + source_notes: Mapped[Optional[str]] = mapped_column("SourceNotes", String(200)) + data_source: Mapped[Optional[str]] = mapped_column("DataSource", String(255)) + + # ============= EOF ============================================= diff --git a/transfers/backfill/staging.py b/transfers/backfill/staging.py index 94f679506..e23684ff6 100644 --- a/transfers/backfill/staging.py +++ 
b/transfers/backfill/staging.py @@ -29,6 +29,7 @@ if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from transfers.backfill.ngwmn_views import run as run_ngwmn_views +from transfers.backfill.surface_water_data import run as run_surface_water_data from transfers.backfill.waterlevelscontinuous_pressure_daily import ( run as run_pressure_daily, ) @@ -41,9 +42,10 @@ def run(batch_size: int = 1000) -> None: Execute all backfill steps in a deterministic order. """ steps = ( - ("WaterLevelsContinuous_Pressure_Daily", run_pressure_daily), - ("Chemistry_SampleInfo", run_chemistry_sampleinfo), - ("NGWMN views", run_ngwmn_views), + ("SurfaceWaterData", run_surface_water_data), + # ("Chemistry_SampleInfo", run_chemistry_sampleinfo), + # ("NGWMN views", run_ngwmn_views), + # ("WaterLevelsContinuous_Pressure_Daily", run_pressure_daily), ) for name, fn in steps: diff --git a/transfers/backfill/surface_water_data.py b/transfers/backfill/surface_water_data.py new file mode 100644 index 000000000..063a298e8 --- /dev/null +++ b/transfers/backfill/surface_water_data.py @@ -0,0 +1,150 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# =============================================================================== + +from __future__ import annotations + +import uuid +from typing import Any, Optional + +import pandas as pd +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.orm import Session + +from db import SurfaceWaterData +from transfers.logger import logger +from transfers.transferer import Transferer +from transfers.util import read_csv + + +class SurfaceWaterDataBackfill(Transferer): + """ + Backfill for the legacy SurfaceWaterData table. + """ + + source_table = "SurfaceWaterData" + + def __init__(self, *args, batch_size: int = 1000, **kwargs): + super().__init__(*args, **kwargs) + self.batch_size = batch_size + + def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: + df = read_csv(self.source_table, parse_dates=["DateMeasured"]) + return df, df + + def _transfer_hook(self, session: Session) -> None: + rows = self._dedupe_rows( + [self._row_dict(row) for row in self.cleaned_df.to_dict("records")], + key="OBJECTID", + ) + + insert_stmt = insert(SurfaceWaterData) + excluded = insert_stmt.excluded + + for i in range(0, len(rows), self.batch_size): + chunk = rows[i : i + self.batch_size] + logger.info( + f"Upserting batch {i}-{i+len(chunk)-1} ({len(chunk)} rows) into SurfaceWaterData" + ) + stmt = insert_stmt.values(chunk).on_conflict_do_update( + index_elements=["OBJECTID"], + set_={ + "SurfaceID": excluded.SurfaceID, + "PointID": excluded.PointID, + "Discharge": excluded.Discharge, + "DischargeMethod": excluded.DischargeMethod, + "DischargeRate": excluded.DischargeRate, + "DischargeUnits": excluded.DischargeUnits, + "DateMeasured": excluded.DateMeasured, + "DischargeSource": excluded.DischargeSource, + "SiteNotes": excluded.SiteNotes, + "FieldMethodNotes": excluded.FieldMethodNotes, + "FormationZone": excluded.FormationZone, + "AqClass": excluded.AqClass, + "SourceNotes": excluded.SourceNotes, + "DataSource": excluded.DataSource, + }, + ) + session.execute(stmt) +
session.commit() + session.expunge_all() + + def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + def val(key: str) -> Optional[Any]: + v = row.get(key) + if pd.isna(v): + return None + return v + + def to_uuid(v: Any) -> Optional[uuid.UUID]: + if v is None or pd.isna(v): + return None + if isinstance(v, uuid.UUID): + return v + if isinstance(v, str) and v.strip(): + return uuid.UUID(v) + return None + + dt = val("DateMeasured") + if hasattr(dt, "to_pydatetime"): + dt = dt.to_pydatetime() + + return { + "SurfaceID": to_uuid(val("SurfaceID")), + "PointID": val("PointID"), + "OBJECTID": val("OBJECTID"), + "Discharge": val("Discharge"), + "DischargeMethod": val("DischargeMethod"), + "DischargeRate": val("DischargeRate"), + "DischargeUnits": val("DischargeUnits"), + "DateMeasured": dt, + "DischargeSource": val("DischargeSource"), + "SiteNotes": val("SiteNotes"), + "FieldMethodNotes": val("FieldMethodNotes"), + "FormationZone": val("FormationZone"), + "AqClass": val("AqClass"), + "SourceNotes": val("SourceNotes"), + "DataSource": val("DataSource"), + } + + def _dedupe_rows( + self, rows: list[dict[str, Any]], key: str + ) -> list[dict[str, Any]]: + """ + Deduplicate rows within a batch by the given key to avoid ON CONFLICT loops. + Later rows win. 
+ """ + deduped: dict[Any, dict[str, Any]] = {} + passthrough: list[dict[str, Any]] = [] + for row in rows: + row_key = row.get(key) + if row_key is None: + passthrough.append(row) + else: + deduped[row_key] = row + return list(deduped.values()) + passthrough + + +def run(batch_size: int = 1000) -> None: + """Entrypoint to execute the backfill.""" + transferer = SurfaceWaterDataBackfill(batch_size=batch_size) + transferer.transfer() + + +if __name__ == "__main__": + # Allow running via `python -m transfers.backfill.surface_water_data` + run() + +# ============= EOF ============================================= From ad640ce0c8fcd73f76e77b4363cb6ac2869d7ea4 Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Wed, 7 Jan 2026 21:18:06 -0700 Subject: [PATCH 2/3] feat: enhance backfill process by adding conditional execution for steps --- transfers/backfill/staging.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/transfers/backfill/staging.py b/transfers/backfill/staging.py index e23684ff6..bea333d2b 100644 --- a/transfers/backfill/staging.py +++ b/transfers/backfill/staging.py @@ -34,6 +34,7 @@ run as run_pressure_daily, ) from transfers.backfill.chemistry_sampleinfo import run as run_chemistry_sampleinfo +from services.util import get_bool_env from transfers.logger import logger @@ -42,13 +43,24 @@ def run(batch_size: int = 1000) -> None: Execute all backfill steps in a deterministic order. 
""" steps = ( - ("SurfaceWaterData", run_surface_water_data), - # ("Chemistry_SampleInfo", run_chemistry_sampleinfo), - # ("NGWMN views", run_ngwmn_views), - # ("WaterLevelsContinuous_Pressure_Daily", run_pressure_daily), + ("SurfaceWaterData", run_surface_water_data, "BACKFILL_SURFACE_WATER_DATA"), + ( + "Chemistry_SampleInfo", + run_chemistry_sampleinfo, + "BACKFILL_CHEMISTRY_SAMPLEINFO", + ), + ("NGWMN views", run_ngwmn_views, "BACKFILL_NGWMN_VIEWS"), + ( + "WaterLevelsContinuous_Pressure_Daily", + run_pressure_daily, + "BACKFILL_WATERLEVELS_PRESSURE_DAILY", + ), ) - for name, fn in steps: + for name, fn, flag in steps: + if not get_bool_env(flag, True): + logger.info(f"Skipping backfill: {name} ({flag}=false)") + continue logger.info(f"Starting backfill: {name}") fn(batch_size) logger.info(f"Completed backfill: {name}") From 6bed7447ca8011306dfbf910a9c3362be3de01d8 Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Wed, 7 Jan 2026 21:24:26 -0700 Subject: [PATCH 3/3] feat: update NMA_SurfaceWaterData model to enforce primary key constraints --- .../1680a4a7cb77_add_surface_water_data_legacy_model.py | 4 ++-- db/nma_legacy.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/alembic/versions/1680a4a7cb77_add_surface_water_data_legacy_model.py b/alembic/versions/1680a4a7cb77_add_surface_water_data_legacy_model.py index 84329a21b..d4460d7c3 100644 --- a/alembic/versions/1680a4a7cb77_add_surface_water_data_legacy_model.py +++ b/alembic/versions/1680a4a7cb77_add_surface_water_data_legacy_model.py @@ -24,9 +24,9 @@ def upgrade() -> None: """Upgrade schema.""" op.create_table( "NMA_SurfaceWaterData", - sa.Column("SurfaceID", postgresql.UUID(as_uuid=True), primary_key=True), + sa.Column("SurfaceID", postgresql.UUID(as_uuid=True), nullable=False), sa.Column("PointID", sa.String(length=10), nullable=False), - sa.Column("OBJECTID", sa.Integer(), nullable=True), + sa.Column("OBJECTID", sa.Integer(), primary_key=True), sa.Column("Discharge", 
sa.String(length=50), nullable=True), sa.Column("DischargeMethod", sa.String(length=50), nullable=True), sa.Column("DischargeRate", sa.Float(), nullable=True), diff --git a/db/nma_legacy.py b/db/nma_legacy.py index 5c930bb11..695d2a976 100644 --- a/db/nma_legacy.py +++ b/db/nma_legacy.py @@ -201,10 +201,10 @@ class SurfaceWaterData(Base): __tablename__ = "NMA_SurfaceWaterData" surface_id: Mapped[uuid.UUID] = mapped_column( - "SurfaceID", UUID(as_uuid=True), primary_key=True + "SurfaceID", UUID(as_uuid=True), nullable=False ) point_id: Mapped[str] = mapped_column("PointID", String(10)) - object_id: Mapped[Optional[int]] = mapped_column("OBJECTID", Integer) + object_id: Mapped[int] = mapped_column("OBJECTID", Integer, primary_key=True) discharge: Mapped[Optional[str]] = mapped_column("Discharge", String(50)) discharge_method: Mapped[Optional[str]] = mapped_column(