Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/CD_staging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ jobs:
CLOUD_SQL_DATABASE: "${{ vars.CLOUD_SQL_DATABASE }}"
CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}"
CLOUD_SQL_PASSWORD: "${{ secrets.CLOUD_SQL_PASSWORD }}"
GCS_SERVICE_ACCOUNT_KEY: "${{ secrets.GCS_SERVICE_ACCOUNT_KEY }}"
GCS_BUCKET_NAME: "${{ vars.GCS_BUCKET_NAME }}"
run: |
uv run python -m transfers.backfill.staging

Expand Down
91 changes: 66 additions & 25 deletions transfers/backfill/ngwmn_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import Any, Optional

import pandas as pd
from sqlalchemy import insert, text
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

from db import (
Expand Down Expand Up @@ -49,24 +49,26 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
return df, df

def _transfer_hook(self, session: Session) -> None:
rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")]

for i in range(0, len(rows), self.batch_size):
chunk = rows[i : i + self.batch_size]
logger.info(
f"Upserting batch {i}-{i+len(chunk)-1} ({len(chunk)} rows) into {self.model.__tablename__}"
rows = self._dedupe_rows(
[self._row_dict(row) for row in self.cleaned_df.to_dict("records")]
)
stmt = (
insert(self.model)
.values(chunk)
.on_conflict_do_update(
index_elements=self._conflict_columns(),
set_=self._upsert_set_clause(),

for i in range(0, len(rows), self.batch_size):
chunk = rows[i : i + self.batch_size]
logger.info(
f"Upserting batch {i}-{i+len(chunk)-1} ({len(chunk)} rows) into {self.model.__tablename__}"
)
)
session.execute(stmt)
session.commit()
session.expunge_all()
stmt = (
insert(self.model)
.values(chunk)
.on_conflict_do_update(
index_elements=self._conflict_columns(),
set_=self._upsert_set_clause(),
)
)
session.execute(stmt)
session.commit()
session.expunge_all()

def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]:
    """
    Convert one source record into the dict of column values to upsert.

    The base class has no mapping of its own; every concrete backfill
    must override this hook.

    Raises:
        NotImplementedError: Always, when called on the base class.
    """
    message = "_row_dict must be implemented in subclasses"
    raise NotImplementedError(message)
Expand All @@ -78,26 +80,63 @@ def _val(row: dict[str, Any], key: str) -> Optional[Any]:
return None
return v

@staticmethod
def _float_or_none(v: Any) -> Optional[float]:
    """
    Coerce a raw source value to ``float``, or ``None`` when it is not numeric.

    Args:
        v: Raw cell value (number, string, missing-value marker, ...).

    Returns:
        * ``None`` for ``None`` and for numeric NaN.
        * ``float(v)`` for any real number. This includes NumPy scalars such
          as ``np.int64`` and ``np.float32``, which do not subclass the
          builtin ``int``/``float`` and were previously turned into ``None``.
        * For strings, the first decimal or scientific-notation literal found
          in the text (``"12.5 ft"`` -> ``12.5``), or ``None`` if there is none.
        * ``None`` for every other type.
    """
    # Function-scope imports, matching the block's existing local-import style.
    import numbers
    import re

    if v is None:
        return None

    # numbers.Real covers builtin int/float/bool and the NumPy integer and
    # floating scalar types (NumPy registers them with the numeric ABCs).
    if isinstance(v, numbers.Real):
        # pd.isna is only evaluated on scalars here, so it always yields a
        # plain bool instead of an array with an ambiguous truth value.
        return None if pd.isna(v) else float(v)

    if isinstance(v, str):
        match = re.search(r"[-+]?\d*\.?\d+(?:[eE][-+]?\d+)?", v)
        if match is None:
            return None
        try:
            return float(match.group(0))
        except ValueError:
            return None

    # pd.NA, pd.NaT, Decimal, containers, etc. are not treated as numeric.
    return None

def _conflict_columns(self) -> list[str]:
    """
    Name the columns that identify a row for conflict detection.

    The result is used as ``index_elements`` of the upsert statement and as
    the de-duplication key for incoming rows. Concrete backfills must
    override this hook.

    Raises:
        NotImplementedError: Always, when called on the base class.
    """
    message = "_conflict_columns must be implemented"
    raise NotImplementedError(message)

def _upsert_set_clause(self) -> dict[str, Any]:
    """
    Build the ``set_`` mapping applied when an upserted row already exists.

    Concrete backfills must override this hook.

    Raises:
        NotImplementedError: Always, when called on the base class.
    """
    message = "_upsert_set_clause must be implemented"
    raise NotImplementedError(message)

def _dedupe_rows(self, rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Collapse rows that share the same conflict-column values.

    When several rows carry an identical conflict key, only the last one is
    kept (it takes the position of the first occurrence). Rows whose key
    has a ``None`` component are never merged; they are returned unchanged
    after all de-duplicated rows.

    Args:
        rows: Row dicts about to be upserted.

    Returns:
        De-duplicated rows followed by the rows with an incomplete key.
    """
    conflict_cols = self._conflict_columns()
    latest_by_key: dict[tuple, dict[str, Any]] = {}
    incomplete: list[dict[str, Any]] = []

    for record in rows:
        identity = tuple(record.get(col) for col in conflict_cols)
        if all(part is not None for part in identity):
            # Overwriting keeps the newest row for this key.
            latest_by_key[identity] = record
            continue
        # Incomplete key: cannot be matched against anything, keep as-is.
        incomplete.append(record)

    return [*latest_by_key.values(), *incomplete]


class NGWMNWellConstructionBackfill(_BaseNGWMNBackfill):
source_table = "view_NGWMN_WellConstruction"
model = ViewNGWMNWellConstruction

def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]:
val = self._val
f = self._float_or_none
return {
"PointID": val(row, "PointID"),
"CasingTop": val(row, "CasingTop"),
"CasingBottom": val(row, "CasingBottom"),
"CasingTop": f(val(row, "CasingTop")),
"CasingBottom": f(val(row, "CasingBottom")),
"CasingDepthUnits": val(row, "CasingDepthUnits"),
"ScreenTop": val(row, "ScreenTop"),
"ScreenBottom": val(row, "ScreenBottom"),
"ScreenTop": f(val(row, "ScreenTop")),
"ScreenBottom": f(val(row, "ScreenBottom")),
"ScreenBottomUnit": val(row, "ScreenBottomUnit"),
"ScreenDescription": val(row, "ScreenDescription"),
"CasingDescription": val(row, "CasingDescription"),
Expand Down Expand Up @@ -125,16 +164,17 @@ class NGWMNWaterLevelsBackfill(_BaseNGWMNBackfill):

def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]:
val = self._val
f = self._float_or_none
dm = val(row, "DateMeasured")
if hasattr(dm, "date"):
dm = dm.date()
return {
"PointID": val(row, "PointID"),
"DateMeasured": dm,
"DepthToWaterBGS": val(row, "DepthToWaterBGS"),
"DepthToWaterBGS": f(val(row, "DepthToWaterBGS")),
"WLUnits": val(row, "WLUnits"),
"MeasurementMethod": val(row, "MeasurementMethod"),
"WLAccuracy": val(row, "WLAccuracy"),
"WLAccuracy": f(val(row, "WLAccuracy")),
"PublicRelease": val(row, "PublicRelease"),
}

Expand All @@ -158,15 +198,16 @@ class NGWMNLithologyBackfill(_BaseNGWMNBackfill):

def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]:
val = self._val
f = self._float_or_none
return {
"OBJECTID": val(row, "OBJECTID"),
"PointID": val(row, "PointID"),
"Lithology": val(row, "Lithology"),
"TERM": val(row, "TERM"),
"StratSource": val(row, "StratSource"),
"StratTop": val(row, "StratTop"),
"StratTop": f(val(row, "StratTop")),
"StratTopUnit": val(row, "StratTopUnit"),
"StratBottom": val(row, "StratBottom"),
"StratBottom": f(val(row, "StratBottom")),
"StratBottomUnit": val(row, "StratBottomUnit"),
}

Expand Down
6 changes: 3 additions & 3 deletions transfers/backfill/staging.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ def run(batch_size: int = 1000) -> None:
Execute all backfill steps in a deterministic order.
"""
steps = (
("WaterLevelsContinuous_Pressure_Daily", run_pressure_daily),
("NGWMN views", lambda: run_ngwmn_views(batch_size=batch_size)),
# ("WaterLevelsContinuous_Pressure_Daily", run_pressure_daily),
("NGWMN views", run_ngwmn_views),
Comment on lines 42 to +44

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Restore pressure-daily backfill in staging

The staging orchestrator no longer runs the WaterLevelsContinuous_Pressure_Daily backfill because that step is commented out in transfers/backfill/staging.py:42-44. services/ngwmn_helper.py:45-49 still queries NMAWaterLevelsContinuous_Pressure_Daily for water-level responses, so any staging deployment after this change won’t refresh that table and those API responses can become stale or empty unless another job populates it. This regression manifests whenever the staging pipeline is the only backfill path for that table.

Useful? React with 👍 / 👎.

)

for name, fn in steps:
logger.info(f"Starting backfill: {name}")
fn(batch_size=batch_size)
fn(batch_size)
logger.info(f"Completed backfill: {name}")


Expand Down
22 changes: 20 additions & 2 deletions transfers/backfill/waterlevelscontinuous_pressure_daily.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import Any, Optional

import pandas as pd
from sqlalchemy import insert, text
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

from db import NMAWaterLevelsContinuousPressureDaily
Expand Down Expand Up @@ -53,7 +53,10 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
return input_df, input_df

def _transfer_hook(self, session: Session) -> None:
rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")]
rows = self._dedupe_rows(
[self._row_dict(row) for row in self.cleaned_df.to_dict("records")],
key="GlobalID",
)

insert_stmt = insert(NMAWaterLevelsContinuousPressureDaily)
excluded = insert_stmt.excluded
Expand Down Expand Up @@ -119,6 +122,21 @@ def val(key: str) -> Optional[Any]:
"CONDDL (mS/cm)": val("CONDDL (mS/cm)"),
}

def _dedupe_rows(
self, rows: list[dict[str, Any]], key: str
) -> list[dict[str, Any]]:
"""
Deduplicate rows within a batch by the given key to avoid ON CONFLICT loops.
Later rows win.
"""
deduped = {}
for row in rows:
gid = row.get(key)
if gid is None:
continue
Comment on lines +133 to +136

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid silently dropping rows missing GlobalID

The new dedupe logic skips any row where GlobalID is None (transfers/backfill/waterlevelscontinuous_pressure_daily.py:133-136). If the source CSV contains any missing/invalid GlobalID values, those records are now silently discarded instead of triggering an insert failure or other visible error, which can mask data quality issues and lead to incomplete backfills. This only appears when the input has blank GlobalID fields, but it changes behavior from failing loudly to losing data quietly.

Useful? React with 👍 / 👎.

deduped[gid] = row
return list(deduped.values())


def run(batch_size: int = 1000) -> None:
"""Entrypoint to execute the backfill."""
Expand Down
Loading