From 3bacce1dd7264dddd653c374fbeb8c0d0923f870 Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Thu, 8 Jan 2026 22:15:48 -0700 Subject: [PATCH] feat: add legacy WeatherData model and backfill functionality --- ...e9d3a1c45_add_weather_data_legacy_model.py | 51 +++++ db/nma_legacy.py | 17 ++ tests/test_weather_data_legacy.py | 194 ++++++++++++++++++ transfers/backfill/backfill.py | 2 + transfers/backfill/weather_data.py | 125 +++++++++++ 5 files changed, 389 insertions(+) create mode 100644 alembic/versions/2f6e9d3a1c45_add_weather_data_legacy_model.py create mode 100644 tests/test_weather_data_legacy.py create mode 100644 transfers/backfill/weather_data.py diff --git a/alembic/versions/2f6e9d3a1c45_add_weather_data_legacy_model.py b/alembic/versions/2f6e9d3a1c45_add_weather_data_legacy_model.py new file mode 100644 index 000000000..ec1095ee9 --- /dev/null +++ b/alembic/versions/2f6e9d3a1c45_add_weather_data_legacy_model.py @@ -0,0 +1,51 @@ +"""add weather data legacy model + +Revision ID: 2f6e9d3a1c45 +Revises: 8ed4b9770721 +Create Date: 2026-01-09 09:42:00.000000 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + + +# revision identifiers, used by Alembic. 
+revision: str = "2f6e9d3a1c45"
+down_revision: Union[str, Sequence[str], None] = "8ed4b9770721"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    """Upgrade schema.
+
+    Creates ``NMA_WeatherData`` when it does not exist yet. When the table is
+    already present, makes sure ``OBJECTID`` is its primary key.
+    """
+    bind = op.get_bind()
+    inspector = sa.inspect(bind)
+    table_name = "NMA_WeatherData"
+
+    if not inspector.has_table(table_name):
+        op.create_table(
+            table_name,
+            sa.Column("LocationId", postgresql.UUID(as_uuid=True), nullable=True),
+            sa.Column("PointID", sa.String(length=10), nullable=False),
+            sa.Column("WeatherID", postgresql.UUID(as_uuid=True), nullable=True),
+            sa.Column("OBJECTID", sa.Integer(), primary_key=True),
+        )
+        return
+
+    pk = inspector.get_pk_constraint(table_name)
+    pk_columns = pk.get("constrained_columns") or []
+    if pk_columns == ["OBJECTID"]:
+        # Nothing to repair: the key already has the expected shape.
+        return
+
+    # A table can have only one primary key, so a mismatched existing key has
+    # to be dropped before OBJECTID can take its place.
+    existing_pk_name = pk.get("name")
+    if pk_columns and existing_pk_name:
+        op.drop_constraint(existing_pk_name, table_name, type_="primary")
+    op.create_primary_key(
+        "NMA_WeatherData_pkey",
+        table_name,
+        ["OBJECTID"],
+    )
+
+
+def downgrade() -> None:
+    """Downgrade schema by dropping ``NMA_WeatherData``."""
+    # NOTE(review): this drops the table even when upgrade() found it already
+    # present and only adjusted the primary key -- confirm that losing
+    # pre-existing rows on downgrade is acceptable.
+    op.drop_table("NMA_WeatherData")
diff --git a/db/nma_legacy.py b/db/nma_legacy.py
index 695d2a976..8b0d4981c 100644
--- a/db/nma_legacy.py
+++ b/db/nma_legacy.py
@@ -226,4 +226,28 @@ class SurfaceWaterData(Base):
     data_source: Mapped[Optional[str]] = mapped_column("DataSource", String(255))
 
 
+class WeatherData(Base):
+    """
+    Legacy WeatherData table from AMPAPI.
+
+    Maps the ``NMA_WeatherData`` table. Column names keep their legacy
+    casing; the Python attributes use snake_case.
+    """
+
+    __tablename__ = "NMA_WeatherData"
+
+    # Nullable UUID stored in the "LocationId" column.
+    location_id: Mapped[Optional[uuid.UUID]] = mapped_column(
+        "LocationId", UUID(as_uuid=True)
+    )
+    # Required identifier of at most 10 characters ("PointID" column).
+    point_id: Mapped[str] = mapped_column("PointID", String(10))
+    # Nullable UUID stored in the "WeatherID" column.
+    weather_id: Mapped[Optional[uuid.UUID]] = mapped_column(
+        "WeatherID", UUID(as_uuid=True)
+    )
+    # Primary key ("OBJECTID" column).
+    object_id: Mapped[int] = mapped_column("OBJECTID", Integer, primary_key=True)
+
+
 # ============= EOF =============================================
diff --git a/tests/test_weather_data_legacy.py b/tests/test_weather_data_legacy.py
new file mode 100644
index 000000000..a2f7eb14b
--- /dev/null
+++ b/tests/test_weather_data_legacy.py
@@ -0,0 +1,194 @@
+# ===============================================================================
+# Copyright 2026 ross
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ===============================================================================
+"""
+Unit tests for WeatherData legacy model.
+
+These tests verify the migration of columns from the legacy WeatherData table.
+Migrated columns (excluding SSMA_TimeStamp):
+- LocationId -> location_id
+- PointID -> point_id
+- WeatherID -> weather_id
+- OBJECTID -> object_id
+"""
+
+from uuid import uuid4
+
+from sqlalchemy import func
+
+from db.engine import session_ctx
+from db.nma_legacy import WeatherData
+
+
+def _next_object_id(session) -> int:
+    """Return one more than the largest stored OBJECTID (1 for an empty table).
+
+    Only rows the query can see are counted: records that were constructed but
+    not yet added to the session do not advance the result, so call this once
+    and offset the value when several records are created together.
+    """
+    max_id = session.query(func.max(WeatherData.object_id)).scalar()
+    return (max_id or 0) + 1
+
+
+# ===================== CREATE tests ==========================
+def test_create_weather_data_all_fields():
+    """Test creating a weather data record with all migrated fields."""
+    with session_ctx() as session:
+        record = WeatherData(
+            object_id=_next_object_id(session),
+            location_id=uuid4(),
+            point_id="WX-1001",
+            weather_id=uuid4(),
+        )
+        session.add(record)
+        session.commit()
+        session.refresh(record)
+
+        assert record.object_id is not None
+        assert record.point_id == "WX-1001"
+        assert record.location_id is not None
+        assert record.weather_id is not None
+
+        session.delete(record)
+        session.commit()
+
+
+def test_create_weather_data_minimal():
+    """Test creating a weather data record with minimal fields."""
+    with session_ctx() as session:
+        record = WeatherData(
+            object_id=_next_object_id(session),
+            point_id="WX-1002",
+        )
+        session.add(record)
+        session.commit()
+        session.refresh(record)
+
+        assert record.object_id is not None
+        assert record.point_id == "WX-1002"
+        assert record.location_id is None
+        assert record.weather_id is None
+
+        session.delete(record)
+        session.commit()
+
+
+# ===================== READ tests ==========================
+def test_read_weather_data_by_object_id():
+    """Test reading a specific weather data record by OBJECTID."""
+    with session_ctx() as session:
+        record = WeatherData(
+            object_id=_next_object_id(session),
+            point_id="WX-1003",
+        )
+        session.add(record)
+        session.commit()
+
+        fetched = session.get(WeatherData, record.object_id)
+        assert fetched is not None
+        assert fetched.object_id == record.object_id
+        assert fetched.point_id == "WX-1003"
+
+        session.delete(record)
+        session.commit()
+
+
+def test_query_weather_data_by_point_id():
+    """Test querying weather data by point_id."""
+    with session_ctx() as session:
+        # Allocate the ids from a single lookup: neither record is in the
+        # database yet, so two separate lookups would return the same value
+        # and the commit would hit a primary-key collision.
+        first_id = _next_object_id(session)
+        record1 = WeatherData(
+            object_id=first_id,
+            point_id="WX-1004",
+        )
+        record2 = WeatherData(
+            object_id=first_id + 1,
+            point_id="WX-1005",
+        )
+        session.add_all([record1, record2])
+        session.commit()
+
+        results = (
+            session.query(WeatherData).filter(WeatherData.point_id == "WX-1004").all()
+        )
+        assert len(results) >= 1
+        assert all(r.point_id == "WX-1004" for r in results)
+
+        session.delete(record1)
+        session.delete(record2)
+        session.commit()
+
+
+# ===================== UPDATE tests ==========================
+def test_update_weather_data():
+    """Test updating a weather data record."""
+    with session_ctx() as session:
+        record = WeatherData(
+            object_id=_next_object_id(session),
+            point_id="WX-1006",
+        )
+        session.add(record)
+        session.commit()
+
+        new_location_id = uuid4()
+        new_weather_id = uuid4()
+        record.location_id = new_location_id
+        record.weather_id = new_weather_id
+        session.commit()
+        session.refresh(record)
+
+        assert record.location_id == new_location_id
+        assert record.weather_id == new_weather_id
+
+        session.delete(record)
+        session.commit()
+
+
+# ===================== DELETE tests ==========================
+def test_delete_weather_data():
+    """Test deleting a weather data record."""
+    with session_ctx() as session:
+        record = WeatherData(
+            object_id=_next_object_id(session),
+            point_id="WX-1007",
+        )
+        session.add(record)
+        session.commit()
+
+        # Read the key before the delete so the lookup below does not depend
+        # on attribute access to an already-deleted instance.
+        object_id = record.object_id
+        session.delete(record)
+        session.commit()
+
+        fetched = session.get(WeatherData, object_id)
+        assert fetched is None
+
+
+# ===================== Column existence tests ==========================
+def test_weather_data_has_all_migrated_columns():
+    """
+    Test that the model has all expected columns from WeatherData.
+    """
+    expected_columns = [
+        "location_id",
+        "point_id",
+        "weather_id",
+        "object_id",
+    ]
+
+    for column in expected_columns:
+        assert hasattr(
+            WeatherData, column
+        ), f"Expected column '{column}' not found in WeatherData model"
+
+
+def test_weather_data_table_name():
+    """Test that the table name follows convention."""
+    assert WeatherData.__tablename__ == "NMA_WeatherData"
+
+
+# ============= EOF =============================================
diff --git a/transfers/backfill/backfill.py b/transfers/backfill/backfill.py
index 44cfa8c56..07f6b5e3b 100644
--- a/transfers/backfill/backfill.py
+++ b/transfers/backfill/backfill.py
@@ -30,6 +30,7 @@ sys.path.insert(0, str(ROOT))
 
 from transfers.backfill.ngwmn_views import run as run_ngwmn_views
 from transfers.backfill.surface_water_data import run as run_surface_water_data
+from transfers.backfill.weather_data import run as run_weather_data
 from transfers.backfill.waterlevelscontinuous_pressure_daily import (
     run as run_pressure_daily,
 )
@@ -44,6 +45,7 @@ def run(batch_size: int = 1000) -> None:
     """
     steps = (
         ("SurfaceWaterData", run_surface_water_data, "BACKFILL_SURFACE_WATER_DATA"),
+        ("WeatherData", run_weather_data, "BACKFILL_WEATHER_DATA"),
         (
             "Chemistry_SampleInfo",
             run_chemistry_sampleinfo,
diff --git a/transfers/backfill/weather_data.py b/transfers/backfill/weather_data.py
new file mode 100644
index 000000000..b4ee169fd
--- /dev/null
+++ b/transfers/backfill/weather_data.py
@@ -0,0 +1,125 @@
+# ===============================================================================
+# Copyright 2026 ross
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ===============================================================================
+
+from __future__ import annotations
+
+import uuid
+from typing import Any, Optional
+
+import pandas as pd
+from sqlalchemy.dialects.postgresql import insert
+from sqlalchemy.orm import Session
+
+from db.nma_legacy import WeatherData
+from transfers.logger import logger
+from transfers.transferer import Transferer
+from transfers.util import read_csv
+
+
+class WeatherDataBackfill(Transferer):
+    """
+    Backfill for the legacy WeatherData table.
+
+    Reads the ``WeatherData`` source CSV and upserts its rows into the
+    ``WeatherData`` model in batches, keyed on ``OBJECTID``.
+    """
+
+    source_table = "WeatherData"
+
+    def __init__(self, *args, batch_size: int = 1000, **kwargs):
+        """Store the upsert batch size and forward everything else to the base.
+
+        Raises:
+            ValueError: if ``batch_size`` is smaller than 1.
+        """
+        # Zero would crash in range() during the transfer and a negative value
+        # would silently upsert nothing, so fail early with a clear message.
+        if batch_size < 1:
+            raise ValueError("batch_size must be a positive integer")
+        super().__init__(*args, **kwargs)
+        self.batch_size = batch_size
+
+    def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
+        """Read the source CSV and return the same frame twice.
+
+        NOTE(review): presumably the pair is (input, cleaned) as expected by
+        Transferer -- confirm against the base class.
+        """
+        df = read_csv(self.source_table)
+        return df, df
+
+    def _transfer_hook(self, session: Session) -> None:
+        """Upsert the usable rows of ``self.cleaned_df`` in batches.
+
+        Rows are converted with ``_row_dict``, deduplicated on ``OBJECTID``
+        and written with ``INSERT ... ON CONFLICT ("OBJECTID") DO UPDATE``.
+        Each batch is committed on its own.
+        """
+        candidates = self._dedupe_rows(
+            [self._row_dict(row) for row in self.cleaned_df.to_dict("records")],
+            key="OBJECTID",
+        )
+        # OBJECTID is the primary key and PointID is declared NOT NULL; a
+        # single row missing either one would make its whole batch fail.
+        rows = [
+            row
+            for row in candidates
+            if row["OBJECTID"] is not None and row["PointID"] is not None
+        ]
+        skipped = len(candidates) - len(rows)
+        if skipped:
+            logger.warning(
+                f"Skipping {skipped} WeatherData rows without OBJECTID or PointID"
+            )
+
+        insert_stmt = insert(WeatherData)
+        excluded = insert_stmt.excluded
+
+        for i in range(0, len(rows), self.batch_size):
+            chunk = rows[i : i + self.batch_size]
+            logger.info(
+                f"Upserting batch {i}-{i+len(chunk)-1} ({len(chunk)} rows) into WeatherData"
+            )
+            stmt = insert_stmt.values(chunk).on_conflict_do_update(
+                index_elements=["OBJECTID"],
+                # The conflict key is not reassigned: on a conflict it already
+                # equals the incoming value.
+                set_={
+                    "LocationId": excluded.LocationId,
+                    "PointID": excluded.PointID,
+                    "WeatherID": excluded.WeatherID,
+                },
+            )
+            session.execute(stmt)
+            session.commit()
+            session.expunge_all()
+
+    @staticmethod
+    def _to_uuid(v: Any) -> Optional[uuid.UUID]:
+        """Convert ``v`` to a UUID; missing, blank or non-string values give None.
+
+        Raises:
+            ValueError: if ``v`` is a non-blank string that is not a valid UUID.
+        """
+        if v is None or pd.isna(v):
+            return None
+        if isinstance(v, uuid.UUID):
+            return v
+        if isinstance(v, str) and v.strip():
+            return uuid.UUID(v)
+        return None
+
+    def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]:
+        """Map one source record to a dict keyed by the target column names.
+
+        Missing keys and NaN/NA values become ``None``; the two id columns
+        are converted to ``uuid.UUID``.
+        """
+
+        def val(key: str) -> Optional[Any]:
+            v = row.get(key)
+            if pd.isna(v):
+                return None
+            return v
+
+        return {
+            "LocationId": self._to_uuid(val("LocationId")),
+            "PointID": val("PointID"),
+            "WeatherID": self._to_uuid(val("WeatherID")),
+            "OBJECTID": val("OBJECTID"),
+        }
+
+    def _dedupe_rows(
+        self, rows: list[dict[str, Any]], key: str
+    ) -> list[dict[str, Any]]:
+        """
+        Deduplicate rows within a batch by the given key to avoid ON CONFLICT loops.
+        Later rows win.
+
+        Rows whose key is None are never merged; they are appended unchanged
+        after the deduplicated rows.
+        """
+        deduped: dict[Any, dict[str, Any]] = {}
+        passthrough: list[dict[str, Any]] = []
+        for row in rows:
+            row_key = row.get(key)
+            if row_key is None:
+                passthrough.append(row)
+            else:
+                deduped[row_key] = row
+        return list(deduped.values()) + passthrough
+
+
+def run(batch_size: int = 1000) -> None:
+    """Entrypoint to execute the backfill."""
+    transferer = WeatherDataBackfill(batch_size=batch_size)
+    transferer.transfer()
+
+
+if __name__ == "__main__":
+    # Allow running via `python -m transfers.backfill.weather_data`
+    run()
+
+# ============= EOF =============================================