diff --git a/.github/workflows/CD_staging.yml b/.github/workflows/CD_staging.yml index 3d6eb72ec..6d1ac96cb 100644 --- a/.github/workflows/CD_staging.yml +++ b/.github/workflows/CD_staging.yml @@ -54,6 +54,8 @@ jobs: CLOUD_SQL_DATABASE: "${{ vars.CLOUD_SQL_DATABASE }}" CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}" CLOUD_SQL_PASSWORD: "${{ secrets.CLOUD_SQL_PASSWORD }}" + GCS_SERVICE_ACCOUNT_KEY: "${{ secrets.GCS_SERVICE_ACCOUNT_KEY }}" + GCS_BUCKET_NAME: "${{ vars.GCS_BUCKET_NAME }}" run: | uv run python -m transfers.backfill.staging diff --git a/transfers/backfill/ngwmn_views.py b/transfers/backfill/ngwmn_views.py index 05f16459e..1d7d6844a 100644 --- a/transfers/backfill/ngwmn_views.py +++ b/transfers/backfill/ngwmn_views.py @@ -19,7 +19,7 @@ from typing import Any, Optional import pandas as pd -from sqlalchemy import insert, text +from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session from db import ( @@ -49,24 +49,26 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: return df, df def _transfer_hook(self, session: Session) -> None: - rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] - - for i in range(0, len(rows), self.batch_size): - chunk = rows[i : i + self.batch_size] - logger.info( - f"Upserting batch {i}-{i+len(chunk)-1} ({len(chunk)} rows) into {self.model.__tablename__}" + rows = self._dedupe_rows( + [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] ) - stmt = ( - insert(self.model) - .values(chunk) - .on_conflict_do_update( - index_elements=self._conflict_columns(), - set_=self._upsert_set_clause(), + + for i in range(0, len(rows), self.batch_size): + chunk = rows[i : i + self.batch_size] + logger.info( + f"Upserting batch {i}-{i+len(chunk)-1} ({len(chunk)} rows) into {self.model.__tablename__}" ) - ) - session.execute(stmt) - session.commit() - session.expunge_all() + stmt = ( + insert(self.model) + .values(chunk) + .on_conflict_do_update( + index_elements=self._conflict_columns(), + set_=self._upsert_set_clause(), + ) + ) + session.execute(stmt) + session.commit() + session.expunge_all() def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: raise NotImplementedError("_row_dict must be implemented in subclasses") @@ -78,12 +80,48 @@ def _val(row: dict[str, Any], key: str) -> Optional[Any]: return None return v + @staticmethod + def _float_or_none(v: Any) -> Optional[float]: + if v is None or pd.isna(v): + return None + if isinstance(v, (int, float)): + return float(v) + if isinstance(v, str): + import re + + match = re.search(r"[-+]?\d*\.?\d+(?:[eE][-+]?\d+)?", v) + if match: + try: + return float(match.group(0)) + except ValueError: + return None + return None + def _conflict_columns(self) -> list[str]: raise NotImplementedError("_conflict_columns must be implemented") def _upsert_set_clause(self) -> dict[str, Any]: raise NotImplementedError("_upsert_set_clause must be implemented") + def _dedupe_rows(self, rows: list[dict[str, Any]]) -> list[dict[str, Any]]: + """ + Deduplicate rows within a batch on conflict columns to avoid ON CONFLICT loops. + Later rows win. + """ + keys = self._conflict_columns() + deduped: dict[tuple, dict[str, Any]] = {} + passthrough: list[dict[str, Any]] = [] + + for row in rows: + key_tuple = tuple(row.get(k) for k in keys) + # If any part of the conflict key is missing, don't dedupe—let it pass through. + if any(k is None for k in key_tuple): + passthrough.append(row) + else: + deduped[key_tuple] = row + + return list(deduped.values()) + passthrough + class NGWMNWellConstructionBackfill(_BaseNGWMNBackfill): source_table = "view_NGWMN_WellConstruction" @@ -91,13 +129,14 @@ class NGWMNWellConstructionBackfill(_BaseNGWMNBackfill): def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: val = self._val + f = self._float_or_none return { "PointID": val(row, "PointID"), - "CasingTop": val(row, "CasingTop"), - "CasingBottom": val(row, "CasingBottom"), + "CasingTop": f(val(row, "CasingTop")), + "CasingBottom": f(val(row, "CasingBottom")), "CasingDepthUnits": val(row, "CasingDepthUnits"), - "ScreenTop": val(row, "ScreenTop"), - "ScreenBottom": val(row, "ScreenBottom"), + "ScreenTop": f(val(row, "ScreenTop")), + "ScreenBottom": f(val(row, "ScreenBottom")), "ScreenBottomUnit": val(row, "ScreenBottomUnit"), "ScreenDescription": val(row, "ScreenDescription"), "CasingDescription": val(row, "CasingDescription"), @@ -125,16 +164,17 @@ class NGWMNWaterLevelsBackfill(_BaseNGWMNBackfill): def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: val = self._val + f = self._float_or_none dm = val(row, "DateMeasured") if hasattr(dm, "date"): dm = dm.date() return { "PointID": val(row, "PointID"), "DateMeasured": dm, - "DepthToWaterBGS": val(row, "DepthToWaterBGS"), + "DepthToWaterBGS": f(val(row, "DepthToWaterBGS")), "WLUnits": val(row, "WLUnits"), "MeasurementMethod": val(row, "MeasurementMethod"), - "WLAccuracy": val(row, "WLAccuracy"), + "WLAccuracy": f(val(row, "WLAccuracy")), "PublicRelease": val(row, "PublicRelease"), } @@ -158,15 +198,16 @@ class NGWMNLithologyBackfill(_BaseNGWMNBackfill): def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: val = self._val + f = self._float_or_none return { "OBJECTID": val(row, "OBJECTID"), "PointID": val(row, "PointID"), "Lithology": val(row, "Lithology"), "TERM": val(row, "TERM"), "StratSource": val(row, "StratSource"), - "StratTop": val(row, "StratTop"), + "StratTop": f(val(row, "StratTop")), "StratTopUnit": val(row, "StratTopUnit"), - "StratBottom": val(row, "StratBottom"), + "StratBottom": f(val(row, "StratBottom")), "StratBottomUnit": val(row, "StratBottomUnit"), } diff --git a/transfers/backfill/staging.py b/transfers/backfill/staging.py index ed511aee0..db94ffaa5 100644 --- a/transfers/backfill/staging.py +++ b/transfers/backfill/staging.py @@ -40,13 +40,13 @@ def run(batch_size: int = 1000) -> None: Execute all backfill steps in a deterministic order. """ steps = ( - ("WaterLevelsContinuous_Pressure_Daily", run_pressure_daily), - ("NGWMN views", lambda: run_ngwmn_views(batch_size=batch_size)), + # ("WaterLevelsContinuous_Pressure_Daily", run_pressure_daily), + ("NGWMN views", run_ngwmn_views), ) for name, fn in steps: logger.info(f"Starting backfill: {name}") - fn(batch_size=batch_size) + fn(batch_size) logger.info(f"Completed backfill: {name}") diff --git a/transfers/backfill/waterlevelscontinuous_pressure_daily.py b/transfers/backfill/waterlevelscontinuous_pressure_daily.py index 886c9a900..89947bc48 100644 --- a/transfers/backfill/waterlevelscontinuous_pressure_daily.py +++ b/transfers/backfill/waterlevelscontinuous_pressure_daily.py @@ -19,7 +19,7 @@ from typing import Any, Optional import pandas as pd -from sqlalchemy import insert, text +from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session from db import NMAWaterLevelsContinuousPressureDaily @@ -53,7 +53,10 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: return input_df, input_df def _transfer_hook(self, session: Session) -> None: - rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] + rows = self._dedupe_rows( + [self._row_dict(row) for row in self.cleaned_df.to_dict("records")], + key="GlobalID", + ) insert_stmt = insert(NMAWaterLevelsContinuousPressureDaily) excluded = insert_stmt.excluded @@ -119,6 +122,21 @@ def val(key: str) -> Optional[Any]: "CONDDL (mS/cm)": val("CONDDL (mS/cm)"), } + def _dedupe_rows( + self, rows: list[dict[str, Any]], key: str + ) -> list[dict[str, Any]]: + """ + Deduplicate rows within a batch by the given key to avoid ON CONFLICT loops. + Later rows win. + """ + deduped = {} + for row in rows: + gid = row.get(key) + if gid is None: + continue + deduped[gid] = row + return list(deduped.values()) + def run(batch_size: int = 1000) -> None: """Entrypoint to execute the backfill."""