From 08b990d8999a389a79cf7dbb143457ec60635bff Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Tue, 6 Jan 2026 21:06:37 -0700 Subject: [PATCH] feat: add unique constraints for NMA_WaterLevelsContinuous_Pressure_Daily and NGWMN tables --- ...ad_add_unique_constraint_pressure_daily.py | 34 ++++++ ...b0a6b8b_ensure_ngwmn_unique_constraints.py | 113 ++++++++++++++++++ transfers/backfill/staging.py | 2 +- 3 files changed, 148 insertions(+), 1 deletion(-) create mode 100644 alembic/versions/4b7aa74b15ad_add_unique_constraint_pressure_daily.py create mode 100644 alembic/versions/5f4e2b0a6b8b_ensure_ngwmn_unique_constraints.py diff --git a/alembic/versions/4b7aa74b15ad_add_unique_constraint_pressure_daily.py b/alembic/versions/4b7aa74b15ad_add_unique_constraint_pressure_daily.py new file mode 100644 index 000000000..1b0058343 --- /dev/null +++ b/alembic/versions/4b7aa74b15ad_add_unique_constraint_pressure_daily.py @@ -0,0 +1,34 @@ +"""Add unique constraint for NMA_WaterLevelsContinuous_Pressure_Daily + +Revision ID: 4b7aa74b15ad +Revises: 8a1de3e3f0b3 +Create Date: 2026-02-10 01:00:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "4b7aa74b15ad" +down_revision: Union[str, Sequence[str], None] = "8a1de3e3f0b3" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Ensure unique constraint on GlobalID for upserts.""" + op.create_unique_constraint( + "uq_nma_pressure_daily_globalid", + "NMA_WaterLevelsContinuous_Pressure_Daily", + ["GlobalID"], + ) + + +def downgrade() -> None: + """Drop the unique constraint.""" + op.drop_constraint( + "uq_nma_pressure_daily_globalid", + "NMA_WaterLevelsContinuous_Pressure_Daily", + type_="unique", + ) diff --git a/alembic/versions/5f4e2b0a6b8b_ensure_ngwmn_unique_constraints.py b/alembic/versions/5f4e2b0a6b8b_ensure_ngwmn_unique_constraints.py new file mode 100644 index 000000000..a647bca28 --- /dev/null +++ b/alembic/versions/5f4e2b0a6b8b_ensure_ngwmn_unique_constraints.py @@ -0,0 +1,113 @@ +"""Ensure NGWMN unique constraints for upserts + +Revision ID: 5f4e2b0a6b8b +Revises: 4b7aa74b15ad +Create Date: 2026-02-10 01:20:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "5f4e2b0a6b8b" +down_revision: Union[str, Sequence[str], None] = "4b7aa74b15ad" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Add unique constraints needed for ON CONFLICT upserts (idempotent).""" + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'uq_nma_view_ngwmn_waterlevels_point_date' + ) THEN + ALTER TABLE "NMA_view_NGWMN_WaterLevels" + ADD CONSTRAINT uq_nma_view_ngwmn_waterlevels_point_date UNIQUE ("PointID", "DateMeasured"); + END IF; + END; + $$; + """ + ) + + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'uq_nma_view_ngwmn_wellconstruction_point_casing_screen' + ) THEN + ALTER TABLE "NMA_view_NGWMN_WellConstruction" + ADD CONSTRAINT uq_nma_view_ngwmn_wellconstruction_point_casing_screen + UNIQUE ("PointID", "CasingTop", "ScreenTop"); + END IF; + END; + $$; + """ + ) + + op.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'uq_nma_view_ngwmn_lithology_objectid' + ) THEN + ALTER TABLE "NMA_view_NGWMN_Lithology" + ADD CONSTRAINT uq_nma_view_ngwmn_lithology_objectid UNIQUE ("OBJECTID"); + END IF; + END; + $$; + """ + ) + + +def downgrade() -> None: + """Drop the NGWMN unique constraints if they exist.""" + op.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'uq_nma_view_ngwmn_waterlevels_point_date' + ) THEN + ALTER TABLE "NMA_view_NGWMN_WaterLevels" + DROP CONSTRAINT uq_nma_view_ngwmn_waterlevels_point_date; + END IF; + END; + $$; + """ + ) + + op.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'uq_nma_view_ngwmn_wellconstruction_point_casing_screen' + ) THEN + ALTER TABLE "NMA_view_NGWMN_WellConstruction" + DROP CONSTRAINT uq_nma_view_ngwmn_wellconstruction_point_casing_screen; + END IF; + END; + $$; + """ + ) + + op.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'uq_nma_view_ngwmn_lithology_objectid' + ) THEN + ALTER TABLE "NMA_view_NGWMN_Lithology" + DROP CONSTRAINT uq_nma_view_ngwmn_lithology_objectid; + END IF; + END; + $$; + """ + ) diff --git a/transfers/backfill/staging.py b/transfers/backfill/staging.py index db94ffaa5..172b67371 100644 --- a/transfers/backfill/staging.py +++ b/transfers/backfill/staging.py @@ -40,7 +40,7 @@ def run(batch_size: int = 1000) -> None: Execute all backfill steps in a deterministic order. """ steps = ( - # ("WaterLevelsContinuous_Pressure_Daily", run_pressure_daily), + ("WaterLevelsContinuous_Pressure_Daily", run_pressure_daily), ("NGWMN views", run_ngwmn_views), )