migration/waterlevels-continuous-pressure-daily #340
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,7 @@ | |
| from typing import Any, Optional | ||
|
|
||
| import pandas as pd | ||
| from sqlalchemy import insert, text | ||
| from sqlalchemy.dialects.postgresql import insert | ||
| from sqlalchemy.orm import Session | ||
|
|
||
| from db import NMAWaterLevelsContinuousPressureDaily | ||
|
|
@@ -53,7 +53,10 @@ def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: | |
| return input_df, input_df | ||
|
|
||
| def _transfer_hook(self, session: Session) -> None: | ||
| rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] | ||
| rows = self._dedupe_rows( | ||
| [self._row_dict(row) for row in self.cleaned_df.to_dict("records")], | ||
| key="GlobalID", | ||
| ) | ||
|
|
||
| insert_stmt = insert(NMAWaterLevelsContinuousPressureDaily) | ||
| excluded = insert_stmt.excluded | ||
|
|
@@ -119,6 +122,21 @@ def val(key: str) -> Optional[Any]: | |
| "CONDDL (mS/cm)": val("CONDDL (mS/cm)"), | ||
| } | ||
|
|
||
| def _dedupe_rows( | ||
| self, rows: list[dict[str, Any]], key: str | ||
| ) -> list[dict[str, Any]]: | ||
| """ | ||
| Deduplicate rows within a batch by the given key to avoid ON CONFLICT loops. | ||
| Later rows win. | ||
| """ | ||
| deduped = {} | ||
| for row in rows: | ||
| gid = row.get(key) | ||
| if gid is None: | ||
| continue | ||
|
Comment on lines +133 to +136:
The new dedupe logic skips any row where …
||
| deduped[gid] = row | ||
| return list(deduped.values()) | ||
|
|
||
|
|
||
| def run(batch_size: int = 1000) -> None: | ||
| """Entrypoint to execute the backfill.""" | ||
|
|
||
The staging orchestrator no longer runs the `WaterLevelsContinuous_Pressure_Daily` backfill because that step is commented out in `transfers/backfill/staging.py:42-44`. `services/ngwmn_helper.py:45-49` still queries `NMAWaterLevelsContinuous_Pressure_Daily` for water-level responses, so any staging deployment after this change won’t refresh that table, and those API responses can become stale or empty unless another job populates it. This regression manifests whenever the staging pipeline is the only backfill path for that table.