DIE is a data integration engine that unifies NM water data from 12 heterogeneous sources.
Currently: pip-installable CLI (die weave), partial Dagster code on jir-dagster (unmerged).
Goals for this branch:
- Migrate package management to uv + pyproject.toml
- Add Dagster orchestration layer (internal, not shipped in pip package)
- Deploy orchestration as GCP Cloud Run
- Produce OGC Feature Collections as canonical data products
- Generate time-series data products per well / parameter
- Products defined in a configurable YAML manifest
- Serve data products via pygeoapi (OGC API - Features standard)
- No changes to public CLI (die weave, die sites, die sources)
- No changes to existing backend/integration logic (sources, transformers, unifier)
- No changes to existing frontend/cli.py
- Orchestration code NOT shipped to PyPI
- No database required for orchestration pipeline (GCS is the store)
- GeoServer/PostGIS persister remains available as optional CLI output — not used here
DataIntegrationEngine/
├── pyproject.toml # uv project (replaces setup.py + requirements.txt)
├── uv.lock # pinned lockfile
├── backend/ # unchanged — core integration
├── frontend/ # unchanged — CLI + legacy API
├── orchestration/ # NEW — not in pip package
│ ├── pyproject.toml # orchestration-specific deps
│ ├── Dockerfile # Cloud Run Job image (Dagster)
│ ├── cloudbuild.yaml # Cloud Build CI/CD
│ ├── assets/
│ │ ├── __init__.py
│ │ ├── wells.py # well site assets
│ │ ├── waterlevels.py # water level timeseries assets
│ │ └── analytes.py # analyte assets
│ ├── resources/
│ │ ├── __init__.py
│ │ ├── die_config.py # DIE Config Dagster resource
│ │ └── gcs.py # GCS upload/download resource
│ ├── config/
│ │ └── products.yaml # configurable product manifest
│ ├── definitions.py # Dagster Definitions (entry point)
│ └── pygeoapi/ # pygeoapi API server
│ ├── config.yml.j2 # Jinja2 template for pygeoapi config
│ ├── generate_config.py # renders config.yml from products.yaml
│ ├── Dockerfile # extends geopython/pygeoapi
│ └── cloudbuild.yaml # Cloud Build for pygeoapi image
├── tests/ # unchanged
└── SPEC.md
pyproject.toml (public CLI)
[project.dependencies] ← lean: click, httpx, geopandas, pyyaml, pandas, etc.
[project.optional-dependencies]
dev = [pytest, mypy, flake8]
geoserver = [psycopg2-binary, GeoAlchemy2, SQLAlchemy] # optional, existing feature
orchestration/pyproject.toml (internal, never published)
[project.dependencies] ← dagster, dagster-gcp, google-cloud-storage,
google-cloud-secret-manager, Jinja2
No database deps in orchestration core. GCS is the sole store.
Cloud Scheduler (cron)
→ Cloud Run Job ← Dagster orchestration (stateless)
└── DIE unifier (existing backend)
├── fetch from 12 sources
├── transform → OGC FC GeoJSON files
└── upload → gs://die-products/{product_id}/
│
GCS bucket
(public read)
│
GDAL /vsigs/ virtual FS
│
Cloud Run Service ← pygeoapi (always-on)
(OGR provider reads GeoJSON from GCS)
│
HTTP clients
(OGC API - Features)
Two Cloud Run deployments:
- Cloud Run Job (Dagster, stateless): triggered by Cloud Scheduler, runs pipeline per product
- Cloud Run Service (pygeoapi, always-on): serves OGC API - Features via GDAL OGR + GCS
No PostgreSQL required. GCS is authoritative store. pygeoapi reads GeoJSON directly from
GCS via GDAL's /vsigs/ virtual filesystem — no proxy, no DB, no sync step.
GeoServer/PostGIS persister in backend/persisters/geoserver.py is unchanged and still usable via CLI --output-format geoserver. Not part of this pipeline.
OGC API - Features compliant GeoJSON written by OGCFeaturesPersister.
Collection envelope:
{
"type": "FeatureCollection",
"id": "nm_waterlevels_summary",
"title": "NM Unified Water Levels Summary",
"description": "...",
"timeStamp": "2026-06-22T06:00:00Z",
"numberMatched": 1234,
"numberReturned": 1234,
"links": [
{"href": "gs://die-products/nm_waterlevels_summary/latest.geojson",
"rel": "self", "type": "application/geo+json"}
],
"features": [...]
}
Each Feature has a top-level id (OGC requirement):
{
"type": "Feature",
"id": "nmbgmr_amp:RA-1234",
"geometry": {"type": "Point", "coordinates": [-106.5, 35.2, 1650.0]},
"properties": { ... }
}
Summary collections (§4.2): one feature per well site. Properties = existing SummaryRecord fields
(nrecords, min, max, mean, earliest_date, latest_date, latest_value, etc.).

Time-series collections (§4.3): one feature per observation, not per well. This lets pygeoapi's
time_field handle temporal filtering natively, without custom code.
{
"type": "Feature",
"id": "nmbgmr_amp:RA-1234:2024-04-20",
"geometry": {"type": "Point", "coordinates": [-106.5, 35.2, 1650.0]},
"properties": {
"site_id": "RA-1234",
"site_name": "Roswell Basin Well",
"source": "nmbgmr_amp",
"parameter": "waterlevels",
"value": 218.1,
"units": "ft",
"datetime": "2024-04-20T00:00:00Z"
}
}
datetime is an ISO 8601 timestamp; pygeoapi maps it to time_field for
?datetime= query parameter support.
backend/persisters/ogc_features.py → OGCFeaturesPersister
- dump_summary_collection(path, records, meta): §4.2 format
- dump_timeseries_collection(path, site_records, timeseries_records, meta): §4.3 format
- Writes local .geojson file; Dagster GCS resource handles upload
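A minimal sketch of how one flat §4.3 feature could be assembled, assuming the site and observation arrive as plain dicts. The helper name observation_to_feature and the input keys (lon, lat, elevation, and so on) are hypothetical; this is not the OGCFeaturesPersister API.

```python
from datetime import datetime, timezone


def observation_to_feature(site: dict, obs: dict) -> dict:
    """Build one flat GeoJSON Feature for a single observation (hypothetical helper)."""
    # Normalise to UTC and format as ISO 8601 with a trailing "Z".
    stamp = obs["datetime"].astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "type": "Feature",
        # Top-level id: source, site id, and observation date, as in the example above.
        "id": f"{site['source']}:{site['site_id']}:{stamp[:10]}",
        "geometry": {
            "type": "Point",
            "coordinates": [site["lon"], site["lat"], site["elevation"]],
        },
        "properties": {
            "site_id": site["site_id"],
            "site_name": site["site_name"],
            "source": site["source"],
            "parameter": obs["parameter"],
            "value": obs["value"],
            "units": obs["units"],
            "datetime": stamp,
        },
    }
```

Note that a date-only id suffix collides when a well has several observations on the same day; if that can happen, the full timestamp would be a safer suffix.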
products_config ← loads products.yaml at startup
│
▼
[per product, per schedule]
source_data ← unify_waterlevels / unify_analytes (existing)
│
▼
ogc_collection ← OGCFeaturesPersister → tmp .geojson
│
▼
gcs_upload ← gs://die-products/{product_id}/{YYYY-MM-DD}.geojson
gs://die-products/{product_id}/latest.geojson (overwrite)
gcs_bucket: die-products
products:
  - id: nm_waterlevels_summary
    parameter: waterlevels
    output_type: ogc_summary
    title: "NM Unified Water Levels Summary"
    description: "Summary stats for water levels, all NM sources"
    schedule: "0 6 * * *"  # UTC cron
    spatial_filter:
      state: NM
    sources:
      exclude: []
  - id: nm_waterlevels_timeseries
    parameter: waterlevels
    output_type: ogc_timeseries
    title: "NM Water Levels Time Series"
    description: "Per-observation water level measurements, all NM sources"
    schedule: "0 7 * * *"
    spatial_filter:
      state: NM
    sources:
      exclude: []
  - id: bernco_waterlevels_timeseries
    parameter: waterlevels
    output_type: ogc_timeseries
    title: "Bernalillo County Water Level Time Series"
    description: "Bernalillo County water level timeseries per well"
    schedule: "0 8 * * *"
    spatial_filter:
      county: Bernalillo
    sources:
      include: [bernco]
  - id: nm_arsenic_summary
    parameter: arsenic
    output_type: ogc_summary
    title: "NM Arsenic Summary"
    description: "Arsenic concentration summary stats, all NM sources"
    schedule: "0 9 * * *"
    spatial_filter:
      state: NM
    sources:
      exclude: []

Assets are dynamically generated from products.yaml at Dagster definition time.
One Cloud Run Job per schedule group. Cloud Scheduler triggers with PRODUCT_ID env var.
Single Dagster definitions.py handles all products; job selects by product id.
Structure supports later migration to persistent Dagster daemon (change Cloud Run Job → Service).
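The per-run product selection could look roughly like the sketch below. It assumes the manifest has already been parsed (for example with yaml.safe_load) into a dict shaped like products.yaml; select_product and object_keys are hypothetical helper names, and the object keys follow the bucket layout shown in the pipeline diagram above.

```python
import os
from datetime import date


def select_product(manifest: dict, product_id: str) -> dict:
    """Return the manifest entry whose id matches product_id."""
    by_id = {product["id"]: product for product in manifest["products"]}
    if product_id not in by_id:
        raise ValueError(f"Unknown PRODUCT_ID {product_id!r}; known ids: {sorted(by_id)}")
    return by_id[product_id]


def object_keys(product_id: str, run_date: date) -> tuple:
    """Return (dated snapshot key, latest key) inside the bucket."""
    return (
        f"{product_id}/{run_date:%Y-%m-%d}.geojson",
        f"{product_id}/latest.geojson",
    )


if __name__ == "__main__":
    # Tiny inline manifest so the sketch runs without PyYAML or a file on disk.
    manifest = {
        "gcs_bucket": "die-products",
        "products": [{"id": "nm_waterlevels_summary", "output_type": "ogc_summary"}],
    }
    product = select_product(manifest, os.environ.get("PRODUCT_ID", "nm_waterlevels_summary"))
    print(product["output_type"], object_keys(product["id"], date.today()))
```

Failing fast on an unknown PRODUCT_ID keeps a misconfigured Cloud Scheduler trigger from silently producing nothing.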
pygeoapi serves the GCS-stored GeoJSON files as OGC API - Features collections.
No DB. pygeoapi uses the OGR provider backed by GDAL's /vsigs/ virtual filesystem,
which reads GeoJSON directly from GCS using Application Default Credentials on Cloud Run.
GET /collections
GET /collections/{id}/items
GET /collections/{id}/items/{feature_id}
GET /collections/{id}/items?bbox=-107,32,-103,37
GET /collections/{id}/items?datetime=2020-01-01/2024-12-31 ← timeseries only
server:
  bind:
    host: 0.0.0.0
    port: 80
  url: ${PYGEOAPI_SERVER_URL}
  mimetype: application/json
  encoding: utf-8
  language: en-US
  cors: true
  pretty_print: false
  limit: 500
logging:
  level: ERROR
metadata:
  identification:
    title: NM Unified Water Data
    description: OGC API - Features for New Mexico water data
    keywords: [water, groundwater, "New Mexico", NMBGMR]
    keywords_type: theme
    terms_of_service: https://creativecommons.org/licenses/by/4.0/
    url: https://waterdata.nmt.edu
  license:
    name: CC-BY 4.0
    url: https://creativecommons.org/licenses/by/4.0/
  provider:
    name: NM Bureau of Geology & Mineral Resources
    url: https://geoinfo.nmt.edu
resources:
{% for product in products %}
  {{ product.id }}:
    type: collection
    title: {{ product.title }}
    description: {{ product.description }}
    keywords: [water, groundwater, "New Mexico"]
    extent:
      spatial:
        bbox: [-109.05, 31.33, -103.00, 37.00]
        crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84
{% if product.output_type == 'ogc_timeseries' %}
      temporal:
        interval: [["1900-01-01T00:00:00Z", null]]
{% endif %}
    providers:
      - type: feature
        name: OGR
        data:
          source_type: GeoJSON
          source: /vsigs/{{ gcs_bucket }}/{{ product.id }}/latest.geojson
          source_options:
            GDAL_HTTP_UNSAFESSL: NO
          gdal_ogr_options:
            EMPTY_AS_NULL: NO
            GDAL_CACHEMAX: 64
        id_field: id
        layer: OGRGeoJSON
{% if product.output_type == 'ogc_timeseries' %}
        time_field: datetime
{% endif %}
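{% endfor %}

import argparse
from pathlib import Path

import yaml
from jinja2 import Environment, FileSystemLoader


def generate(products_path: Path, template_path: Path, output_path: Path) -> None:
    # Load the product manifest and render the pygeoapi config from it.
    manifest = yaml.safe_load(products_path.read_text())
    env = Environment(loader=FileSystemLoader(str(template_path.parent)))
    tmpl = env.get_template(template_path.name)
    output_path.write_text(tmpl.render(
        products=manifest["products"],
        gcs_bucket=manifest["gcs_bucket"],
    ))


if __name__ == "__main__":
    # CLI flags match the invocation in the pygeoapi Dockerfile.
    parser = argparse.ArgumentParser()
    parser.add_argument("--products", type=Path, required=True)
    parser.add_argument("--template", type=Path, required=True)
    parser.add_argument("--output", type=Path, required=True)
    args = parser.parse_args()
    generate(args.products, args.template, args.output)

Run at Docker build time in cloudbuild.yaml: the config is baked into the image, not rendered at runtime.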
On Cloud Run, GDAL /vsigs/ uses the service account's ADC automatically.
No credentials file is needed. The pygeoapi Cloud Run Service account must have
roles/storage.objectViewer on the die-products bucket.
For local dev:
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/sa-key.json

FROM geopython/pygeoapi:latest
# Generate config from products.yaml at build time.
# Build context must be orchestration/: COPY cannot reach outside the context via ../
COPY config/products.yaml /tmp/products.yaml
COPY pygeoapi/config.yml.j2 /tmp/config.yml.j2
COPY pygeoapi/generate_config.py /tmp/generate_config.py
RUN python /tmp/generate_config.py \
    --products /tmp/products.yaml \
    --template /tmp/config.yml.j2 \
    --output /pygeoapi/local.config.yml
EXPOSE 80

Cloud Run Service env vars:
- PYGEOAPI_SERVER_URL: public Cloud Run URL
- Port: 80
Replaces setup.py, requirements.txt, pytest.ini, mypy.ini:
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "nmuwd"
version = "0.10.3"
requires-python = ">=3.10"
dependencies = [
"click>=8.2.1",
"python-dotenv",
"frost_sta_client",
"geopandas",
"httpx",
"pandas",
"pyyaml",
"types-pyyaml",
"urllib3>=2.2.0,<3.0.0",
]
[project.optional-dependencies]
dev = ["pytest", "mypy", "flake8"]
geoserver = ["psycopg2-binary", "GeoAlchemy2", "SQLAlchemy"]
gcs = ["google-cloud-storage"]
[project.scripts]
die = "frontend.cli:cli"
[tool.hatch.build.targets.wheel]
packages = ["frontend", "backend"]
[tool.pytest.ini_options]
testpaths = ["tests"]
norecursedirs = ["tests/archived"]
[tool.mypy]
ignore_missing_imports = true

flask, gunicorn removed from core; they belong in the deployment layer.
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "die-orchestration"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = [
"nmuwd",  # core package; [tool.uv.sources] below only applies to declared dependencies
"dagster>=1.8",
"dagster-gcp>=0.24",
"dagster-webserver>=1.8",
"google-cloud-storage",
"google-cloud-secret-manager",
"Jinja2",
]
[tool.uv.sources]
nmuwd = { path = "..", editable = true }

No DB deps. pygeoapi runs in its own image, so it is not a Python dependency here.
FROM python:3.12-slim
WORKDIR /app
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv
COPY orchestration/pyproject.toml ./orchestration/
COPY pyproject.toml ./
RUN uv sync --frozen --project orchestration
COPY backend/ ./backend/
COPY frontend/ ./frontend/
COPY orchestration/ ./orchestration/
ENV DAGSTER_HOME=/app/.dagster
ENV PYTHONPATH=/app
# Run through sh -c so $PRODUCT_ID is expanded at runtime; an exec-form CMD performs no variable substitution.
CMD ["sh", "-c", "uv run --project orchestration dagster job execute -f orchestration/definitions.py -j \"$PRODUCT_ID\""]

Cloud Run Job env vars (from Secret Manager):
PRODUCT_ID, GCS_BUCKET, USGS_API_KEY
- Delete setup.py, requirements.txt, pytest.ini, mypy.ini
- Write root pyproject.toml (§7.1)
- Run uv lock
- Update .github/workflows/cicd.yml: uv run pytest, uv run mypy, uv run flake8
- Verify: uv pip install -e ".[dev]" + all existing tests pass

- Add backend/persisters/ogc_features.py → OGCFeaturesPersister
  - dump_summary_collection(path, records, meta): §4.2
  - dump_timeseries_collection(path, site_records, timeseries_records, meta): §4.3 flat format
- Add ogc_summary, ogc_timeseries to OutputFormat enum in backend/__init__.py
- Tests: tests/test_persisters/test_ogc_features.py

- Create orchestration/ directory (§3.1)
- Write orchestration/pyproject.toml (§7.2)
- Write orchestration/config/products.yaml (§5.2)
- Write orchestration/resources/die_config.py: Dagster resource wrapping Config
- Write orchestration/resources/gcs.py: upload/overwrite GCS objects

- Port jir-dagster branch assets into orchestration/assets/
- Rewrite to: load products.yaml → dynamically define one asset per product
- Each asset: build Config from product spec → call unifier → OGCFeaturesPersister → GCS upload
- Write orchestration/definitions.py
- Local test: uv run dagster asset materialize -f orchestration/definitions.py --select nm_waterlevels_summary

- orchestration/assets/waterlevels.py: ogc_timeseries product type
- Flat observation-per-feature output (§4.3) with datetime field
- Reuses existing unify_waterlevels; no backend changes

- orchestration/resources/gcs.py
- Upload: gs://{bucket}/{product_id}/{YYYY-MM-DD}.geojson
- Overwrite: gs://{bucket}/{product_id}/latest.geojson (same layout the pygeoapi /vsigs/ source path reads)
- Emit Dagster AssetMaterialization metadata: feature count, bbox, file size, timestamp

- Write orchestration/Dockerfile (§8)
- Write orchestration/cloudbuild.yaml
- Write orchestration/cloudrun.yaml: Cloud Run Job definition
- Write orchestration/README.md: env vars, Secret Manager bindings, deploy commands

- Update cicd.yml: use uv run for all checks
- Add orchestration-ci.yml: lint + import-check for orchestration code
- Orchestration CI never triggers PyPI publish

- Write orchestration/pygeoapi/config.yml.j2 (§6.2)
- Write orchestration/pygeoapi/generate_config.py (§6.3)
- Write orchestration/pygeoapi/Dockerfile (§6.5)
- Write orchestration/pygeoapi/cloudbuild.yaml
- Verify GDAL /vsigs/ reads from GCS with ADC in local Docker test
- Smoke test: GET /collections returns one entry per product in products.yaml
Retry backoff (_execute_text_request, _execute_json_request in source.py): linear time.sleep(tries) → exponential backoff capped at 60s.
Polygon re-parse per record (BaseTransformer.contained(), transformer.py): _cached_polygon is set at instance level but config.bounding_wkt() is called on every record. Cache shapely object permanently at first call.
Redundant list extraction in BaseParameterSource.read() (source.py): _extract_parameter_dates(), _extract_source_parameter_results(), _extract_source_parameter_units(), _extract_source_parameter_names() called independently per site, each iterating the same records list. Batch extract once before loop.
Bare except Exception (_execute_text_request line ~241, _site_wrapper in unifier.py): swallows every error, including programming bugs unrelated to I/O; the except BaseException in _site_wrapper (see §T.20) also traps KeyboardInterrupt and SystemExit. Catch httpx.HTTPError, httpx.TimeoutException, json.JSONDecodeError specifically. Log full traceback.
No coordinate range validation (do_transform() in transformer.py): checks x == 0 or y == 0 but not whether lng/lat are in valid ranges (−180..180, −90..90). Silent pass-through of bogus coords.
Unchecked unit conversion (convert_units() transformer.py): returns None if die_parameter_name is unrecognized, propagates silently into record payload.
with statement missing on file open (Config._load_from_yaml() config.py): unclosed handle on read failure.
Manual slice rollback (_site_wrapper() unifier.py lines ~183–202): slices persister.records/timeseries/sites back to pre-chunk length on error. Fragile — an atomic checkpoint abstraction is safer.
print() instead of logger (multiple): generate_bounding_polygon() in source.py, lines ~52/63/75 in unifier.py, line ~29 in persister.py. None go through self.log().
No request timing (_execute_text_request/json_request): no record of latency, retry count, or which URL failed. Add structured log entry: source, url, status_code, attempt, elapsed_ms on every attempt.
Low-information warnings: "Failed to retrieve records after multiple attempts" doesn't include URL, params, or last exception.
No transform failure metrics (do_transform() transformer.py): returns None silently. Caller doesn't know how many records were dropped and why.
No chunk progress (_site_wrapper() unifier.py): no log of chunk index, site count per chunk, or timing.
BaseParameterSource god class (source.py, ~476 lines): handles extraction, validation, unit conversion, and summarization in one class + one 167-line read() method with 5 levels of nesting. Split into: RecordExtractor, RecordValidator, RecordSummarizer.
do_transform() god method (transformer.py, ~191 lines): 6 sequential transform steps in one method body. Extract each into _apply_datum_transform(), _apply_elevation_transform(), _apply_well_depth_transform(), _apply_unit_conversion().
Config.get_config_and_false_agencies() (config.py, ~107 lines): repetitive if/elif per parameter. Replace with a dict mapping parameter → (agency_defaults, source_classes).
start_ind / end_ind in BaseParameterSource.read(): only used for logging but add confusion. Rename or remove if unused.
bookend naming (_extract_terminal_record()): unclear. Rename to position or use Literal["earliest", "latest"].
HTTP client injection (BaseSource): uses httpx.get() directly. Inject httpx.Client (or a protocol) so retry policy is testable and swappable.
Config post-construction injection (set_config() on both BaseSource and BaseTransformer): config is required to function. Move to __init__ param with Optional type; keep set_config() only as override for unifier's late binding.
RecordExtractor protocol (BaseParameterSource): the 8 abstract _extract_* methods form an implicit interface. Define an explicit ParameterExtractor Protocol; BaseParameterSource accepts one in __init__. Enables injecting fake extractors in tests.
UnitConverter strategy (convert_units() in transformer.py): 120+ line monolithic function. Extract to UnitConverter class; inject into BaseTransformer. Enables per-source custom conversions.
Persister factory in unifier.py: _unify_parameter() contains if/else to pick persister class. Extract to PersisterFactory(config) -> BasePersister; inject factory into Unifier.__init__.
Replace inheritance-for-code-reuse with injected dependencies. Targets:
- Loggable base class, used only to get self.log() → inject logger
- STSource mixin via multiple inheritance → STClient composed into sources
- ST2 class explosion (5 near-identical subclasses) → instances with config
- CloudStoragePersister overrides _dump_* to redirect output → Strategy pattern
- Transformer coupled by transformer_klass class attribute → inject transformer
- Empty record subclasses (WaterLevelRecord, AnalyteRecord, etc.) → type field
feature/composition-refactor ← branch off main after §T.9 merged
Goal: Remove Loggable from the inheritance chain of all classes.
Changes:
- backend/logger.py: add make_logger(name: str) -> Logger factory function
- BaseSource, BasePersister, BaseTransformer, Config: remove (Loggable) base; call make_logger(self.__class__.__name__) in __init__
- All self.log() / self.warn() / self.debug() calls keep working: provide the same helper wrappers as module-level or instance-assigned callables rather than inherited methods
Verification: uv run pytest tests/test_cli/ tests/test_persisters/ -q
Goal: Kill multiple inheritance in all ST source classes.
Changes:
- backend/connectors/st_connector.py: extract STSource methods into STClient class with __init__(self, url: str)
  - get_service(), get_things(), _extract_terminal_record(), _parse_result() → methods on STClient
- STSiteSource(BaseSiteSource, STSource) → STSiteSource(BaseSiteSource) with self.client = STClient(self.url)
- STWaterLevelSource(STSource, BaseWaterLevelSource) → STWaterLevelSource(BaseWaterLevelSource) with self.client = STClient(self.url)
- STAnalyteSource(STSource, BaseAnalyteSource) → STAnalyteSource(BaseAnalyteSource) with self.client = STClient(self.url)
- All self.get_service() / self._get_things() call sites → self.client.get_service() / self.client.get_things()
Verification: uv run pytest tests/test_sources/ -k "st or bernco or cabq or ebid or pvacd or roswell" -q
Goal: Delete 5 nearly-identical site source classes; replace with factory.
Affected classes (delete):
BernCoSiteSource, CABQSiteSource, EBIDSiteSource, PVACDSiteSource, NMOSERoswellSiteSource
Changes:
- ST2SiteSource: accept agency: str, bounding_wkt: str | None, transformer_klass in __init__; move per-subclass logic (bounding polygon, filter) into constructor
- backend/connectors/st2/source.py (or equivalent): replace class definitions with module-level instances:
    BernCoSiteSource = ST2SiteSource(agency="BernCo", bounding_wkt=BERNCO_WKT, transformer_klass=BernCoSiteTransformer)
- Config.water_level_sources() / Config.analyte_sources(): update to use instances
Verification: uv run pytest tests/test_sources/ -k "bernco or cabq or ebid or pvacd" -q
Goal: BasePersister accepts an output strategy; CloudStoragePersister subclass deleted.
Changes:
- Add backend/persisters/strategies.py:
    class OutputStrategy(Protocol):
        def write(self, name: str, content: bytes) -> None: ...
        def make_directory(self, path: str) -> None: ...
    class LocalFileStrategy:
        def write(self, name, content): Path(name).write_bytes(content)
        def make_directory(self, path): Path(path).mkdir(parents=True, exist_ok=True)
    class GCSStrategy:
        def __init__(self, bucket_name: str, prefix: str): ...
        def write(self, name, content): ...  # uploads to GCS
        def make_directory(self, path): pass  # no-op
- BasePersister.__init__: accept strategy: OutputStrategy = LocalFileStrategy()
- All _dump_* methods: call self.strategy.write(...) instead of Path.write_*
- Delete CloudStoragePersister class
- Update backend/unifier.py: create GCSStrategy instead of CloudStoragePersister when config.use_cloud_storage
Verification: uv run pytest tests/ -q --ignore=tests/test_sources
Goal: Remove transformer_klass class attribute pattern; pass transformer as dependency.
Changes:
- BaseSource.__init__: accept transformer: BaseTransformer parameter; remove self.transformer = self.transformer_klass()
- All concrete source classes: remove transformer_klass class attribute; pass transformer in super().__init__(transformer=XTransformer())
- set_config(config): still propagates to both source + transformer
- Tests that construct sources directly: update constructors
Verification: uv run pytest tests/test_cli/ tests/test_persisters/ -q
Goal: WaterLevelRecord, AnalyteRecord, WaterLevelSummaryRecord, AnalyteSummaryRecord add zero behavior — remove them.
Changes:
- backend/record.py: delete WaterLevelRecord, AnalyteRecord, WaterLevelSummaryRecord, AnalyteSummaryRecord
- Add record_type: str field to ParameterRecord and SummaryRecord keys
- WaterLevelTransformer._get_record_klass() → returns ParameterRecord or SummaryRecord; sets record_type="waterlevels" in transform
- AnalyteTransformer._get_record_klass() → same pattern with record_type="analytes"
- Grep for isinstance(r, WaterLevelRecord) etc.; update to r.record_type == "waterlevels"
Verification: uv run pytest tests/test_cli/ tests/test_persisters/ -q
Goal: Fix linear retry backoff; add per-request structured log entries.
Changes:
- backend/source.py _execute_text_request() + _execute_json_request():
  - Replace time.sleep(tries) with time.sleep(min(2 ** tries, 60))
  - After each attempt log: source, url, status_code, attempt, elapsed_ms
  - Catch httpx.HTTPStatusError, httpx.TimeoutException, httpx.RequestError specifically; no bare except Exception
  - Include last exception message in "Failed after N attempts" warning
Verification: uv run pytest tests/test_cli/ -q
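The capped exponential schedule can be sketched as below. This is an illustration only: backoff_delay and retry are hypothetical names, OSError stands in for the httpx exception types listed above so the block runs with the standard library alone, and the sleep function is injectable so tests do not actually wait.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, cap: float = 60.0) -> float:
    """Delay in seconds before the next try: min(2 ** attempt, cap)."""
    return min(2 ** attempt, cap)


def retry(
    call: Callable[[], T],
    max_attempts: int = 5,
    retry_on: Tuple[Type[BaseException], ...] = (OSError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `call` until it succeeds or max_attempts is exhausted."""
    last_exc = None
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except retry_on as exc:
            last_exc = exc
            # Only sleep when another attempt will follow.
            if attempt < max_attempts:
                sleep(backoff_delay(attempt))
    # Surface the last exception message, as the warning requirement asks.
    raise RuntimeError(f"Failed after {max_attempts} attempts: {last_exc}") from last_exc
```

With the default cap, attempts 1 through 5 wait 2, 4, 8, 16, and 32 seconds, and every later attempt waits 60.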
Goal: Prevent re-parsing WKT shapely object on every record.
Changes:
- backend/transformer.py BaseTransformer.contained():
  - Move _cached_polygon from instance variable to class-level cache keyed on WKT string (e.g. _polygon_cache: dict[str, Polygon] = {})
  - First call for a given WKT parses and caches; subsequent calls return cached object
Verification: uv run pytest tests/test_cli/ tests/test_persisters/ -q + manual timing on 1000-record transform
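A sketch of the WKT-keyed cache, under the assumption that parsing is the expensive step. GeometryCache is a hypothetical name, and the parser is passed in so the block runs without shapely; in DIE the parser would presumably be shapely.wkt.loads.

```python
from typing import Callable, Dict, Generic, TypeVar

G = TypeVar("G")


class GeometryCache(Generic[G]):
    """Parse each distinct WKT string once and reuse the parsed object."""

    def __init__(self, parse: Callable[[str], G]) -> None:
        self._parse = parse
        self._cache: Dict[str, G] = {}

    def get(self, wkt: str) -> G:
        try:
            return self._cache[wkt]
        except KeyError:
            # First request for this WKT: parse and remember the result.
            geometry = self._cache[wkt] = self._parse(wkt)
            return geometry
```

Keying on the WKT string rather than on the instance means a changed bounding polygon gets its own entry instead of returning a stale object.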
Goal: Extract dates/results/units/names once before the per-site loop, not once per site.
Changes:
- backend/source.py BaseParameterSource.read():
  - Call _extract_parameter_dates(), _extract_source_parameter_results(), _extract_source_parameter_units(), _extract_source_parameter_names() once on the full cleaned records before the site loop
  - Pass extracted lists into inner loop rather than re-extracting per site
  - Extract 167-line read() body into _summarize_records() and _build_timeseries_records() helpers (≤50 lines each)
Verification: uv run pytest tests/test_cli/ -q
Goal: All console output goes through the logger; no raw print() in backend.
Changes:
- backend/source.py generate_bounding_polygon() lines ~450–452: print() → self.log()
- backend/unifier.py lines ~52/63/75: print() → config.log()
- backend/persister.py line ~29: print("google cloud storage not available") → logging.warning()
- Grep print( across backend/ and replace every hit
- Add elapsed_ms to transform failure log in do_transform() when returning None
- Log chunk index + site count per chunk in _site_wrapper()
Verification: grep -r "print(" backend/ | wc -l → 0
Goal: No bare except Exception; all swallowed errors surface detail.
Changes:
- backend/source.py:
  - _execute_text_request / _execute_json_request: replace bare except → specific httpx exceptions (see §T.16)
  - _extract_site_records(): guard against None/empty records before returning
  - read() inner ValueError/TypeError catches: log full traceback.format_exc(), not just message
- backend/transformer.py:
  - convert_units(): if die_parameter_name is unrecognized → raise ValueError(f"Unknown parameter: {die_parameter_name}") instead of returning None
  - do_transform(): add lat/lng range check (-180 <= lng <= 180 and -90 <= lat <= 90); raise ValueError rather than assert, since assertions are stripped under python -O
- backend/unifier.py _site_wrapper():
  - Replace except BaseException → except Exception; log traceback.format_exc() via config.warn()
- backend/config.py _load_from_yaml():
  - Wrap file open in with statement
Verification: uv run pytest tests/test_cli/ tests/test_persisters/ -q
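The coordinate range check might be sketched as follows. validate_lng_lat is a hypothetical helper name; raising ValueError (instead of asserting) keeps the check active under python -O and lets the caller log and count the dropped record.

```python
def validate_lng_lat(lng: float, lat: float) -> None:
    """Raise ValueError unless lng/lat fall inside valid geographic ranges."""
    # Comparisons with NaN are always False, so NaN coordinates are rejected too.
    if not (-180.0 <= lng <= 180.0):
        raise ValueError(f"Longitude out of range [-180, 180]: {lng!r}")
    if not (-90.0 <= lat <= 90.0):
        raise ValueError(f"Latitude out of range [-90, 90]: {lat!r}")
```

A common failure this catches is swapped axes: a New Mexico point passed as (35.2, -106.5) fails the latitude check.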
Goal: 476-line class → focused classes ≤150 lines each.
Changes:
- Extract RecordValidator class with validate(record) -> bool; holds current _validate_record() logic
- Extract RecordSummarizer class with summarize(records, site_record) -> SummaryRecord; holds summary path of read()
- BaseParameterSource.__init__ accepts validator: RecordValidator (default = existing subclass method shim during migration)
- Split read() into read_summary() + read_timeseries(), ≤50 lines each
- Rename bookend parameter → position: Literal["earliest", "latest"]
Verification: uv run pytest tests/ -q --ignore=tests/test_sources
Goal: 191-line method → orchestrator + focused helpers ≤30 lines each.
Changes:
- backend/transformer.py BaseTransformer.do_transform():
  - Extract _apply_geographic_filter(record) -> bool
  - Extract _apply_datum_transform(record) -> record
  - Extract _apply_elevation_transform(record) -> record
  - Extract _apply_well_depth_transform(record) -> record
  - Extract _apply_unit_conversion(record) -> record
  - do_transform() becomes an orchestrator calling each in sequence, ≤40 lines
Verification: uv run pytest tests/test_cli/ tests/test_persisters/ -q
Goal: Replace ~107-line if/elif per parameter in get_config_and_false_agencies() with a mapping.
Changes:
- backend/config.py:
  - Add PARAMETER_SOURCE_MAP: dict[str, dict] mapping each parameter name → {site_source_klass, parameter_source_klass, agencies}
  - get_config_and_false_agencies() looks up parameter in map; raises ValueError for unknown parameter
  - Extract duplicate set_config() calls in analyte_sources() / water_level_sources() / all_site_sources() into _build_source_pair(site_klass, param_klass) -> tuple
Verification: uv run pytest tests/test_cli/ -q
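The mapping-driven lookup could take roughly this shape. The stub classes, the agency tuples, and the lookup_sources helper are placeholders for illustration; the real map would reference the existing source classes.

```python
class SiteSourceStub: ...          # placeholder for a real site source class
class WaterLevelSourceStub: ...    # placeholder for a real water level source class
class AnalyteSourceStub: ...       # placeholder for a real analyte source class


PARAMETER_SOURCE_MAP = {
    "waterlevels": {
        "site_source_klass": SiteSourceStub,
        "parameter_source_klass": WaterLevelSourceStub,
        "agencies": ("nmbgmr_amp", "bernco"),
    },
    "arsenic": {
        "site_source_klass": SiteSourceStub,
        "parameter_source_klass": AnalyteSourceStub,
        "agencies": ("nmbgmr_amp",),
    },
}


def lookup_sources(parameter: str) -> dict:
    """Return the source spec for a parameter, or raise ValueError if unknown."""
    try:
        return PARAMETER_SOURCE_MAP[parameter]
    except KeyError:
        known = ", ".join(sorted(PARAMETER_SOURCE_MAP))
        raise ValueError(f"Unknown parameter {parameter!r}; expected one of: {known}") from None
```

Adding a parameter then means adding one dict entry rather than another elif branch.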
Goal: httpx.get() hardcoded → injected client; enables testability without live network.
Changes:
- backend/source.py BaseSource.__init__: accept http_client: httpx.Client | None = None; default creates httpx.Client(timeout=900)
- _execute_text_request() / _execute_json_request(): use self._http_client.get(...) instead of httpx.get(...)
- Tests in tests/test_cli/ or new tests/test_sources_unit/: pass mock client returning fixture responses; no live HTTP
Verification: uv run pytest tests/test_cli/ tests/test_persisters/ -q
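A sketch of the injection pattern with a fake client for tests. SourceSketch, FakeClient, and FakeResponse are hypothetical stand-ins (not BaseSource itself); the only httpx calls assumed are httpx.Client(timeout=...) and Client.get(url, params=...).

```python
from typing import Optional, Protocol


class HttpResponse(Protocol):
    status_code: int
    text: str


class HttpClient(Protocol):
    def get(self, url: str, params: Optional[dict] = None) -> HttpResponse: ...


class SourceSketch:
    """Stand-in for BaseSource; only the injection pattern is shown."""

    def __init__(self, http_client: Optional[HttpClient] = None) -> None:
        if http_client is None:
            import httpx  # imported lazily so tests with a fake need no httpx install
            http_client = httpx.Client(timeout=900)
        self._http_client = http_client

    def fetch_text(self, url: str, params: Optional[dict] = None) -> str:
        response = self._http_client.get(url, params=params)
        return response.text


class FakeResponse:
    def __init__(self, status_code: int, text: str) -> None:
        self.status_code = status_code
        self.text = text


class FakeClient:
    """Returns canned responses and records every call for assertions."""

    def __init__(self, responses: dict) -> None:
        self._responses = responses
        self.calls = []

    def get(self, url, params=None):
        self.calls.append((url, params))
        return self._responses[url]
```

In a unit test, constructing SourceSketch(http_client=FakeClient({...})) exercises the request path with no network access.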
Goal: Replace 120+ line convert_units() monolith with pluggable converter.
Changes:
- backend/converter.py (new file):
    class UnitConverter(Protocol):
        def convert(self, value: float, from_units: str, to_units: str, parameter: str) -> float: ...
    class StandardUnitConverter:
        def convert(self, value, from_units, to_units, parameter): ...  # current convert_units() logic moved here
- backend/transformer.py BaseTransformer.__init__: accept converter: UnitConverter = StandardUnitConverter()
- Remove convert_units() module-level function; call self.converter.convert(...) in _apply_unit_conversion()
- ST/DWB sources needing custom conversion: pass custom UnitConverter subclass
Verification: uv run pytest tests/test_cli/ tests/test_persisters/ -q
Goal: Remove persister selection if/else from _unify_parameter().
Changes:
- backend/persisters/factory.py (new file):
    def make_persister(config: Config) -> BasePersister:
        if config.output_format == OutputFormat.GEOSERVER: ...
        elif config.use_cloud_storage: ...
        else: return BasePersister(config)
- backend/unifier.py _unify_parameter(): call make_persister(config) instead of inline if/else
- Unifier.__init__: optionally accept persister_factory: Callable[[Config], BasePersister] for testing
Verification: uv run pytest tests/test_cli/ -q
- HTTP retry backoff MUST be exponential with cap: min(2**n, 60) seconds (§T.16)
- HTTP request attempts MUST log source, url, status_code, attempt, elapsed_ms (§T.16)
- No bare except Exception in backend/; catch specific exception types (§T.20)
- convert_units() MUST raise ValueError on unknown parameter, never return None silently (§T.20)
- print() MUST NOT appear in backend/; all output goes through the logger (§T.19)
- Methods in backend/ MUST NOT exceed 50 lines (excluding __init__) (§T.21 §T.22)
- Config source setup MUST be driven by PARAMETER_SOURCE_MAP, not if/elif chains (§T.23)
- BaseSource MUST accept injected http_client; no direct httpx.get() calls (§T.24)
- UnitConverter MUST be injectable into BaseTransformer (§T.25)
- Persister selection logic MUST live in make_persister(), not in Unifier (§T.26)
- Classes MUST NOT inherit Loggable; use make_logger() factory (§T.10)
- ST source classes MUST NOT use multiple inheritance; STClient injected as self.client (§T.11)
- ST2 per-agency behavior MUST be expressed as constructor args, not subclasses (§T.12)
- BasePersister MUST NOT contain GCS-specific logic; output target injected via strategy (§T.13)
- Source classes MUST NOT declare transformer_klass; transformer passed to __init__ (§T.14)
- WaterLevelRecord, AnalyteRecord, WaterLevelSummaryRecord, AnalyteSummaryRecord MUST NOT exist (§T.15)
- Orchestration code MUST NOT appear in [tool.hatch.build.targets.wheel].packages
- OGC FC output MUST include top-level id, type, numberReturned, timeStamp
- Each Feature MUST have top-level id (not only in properties)
- ogc_timeseries features MUST be flat (one per observation) with ISO 8601 datetime property
- die CLI behavior unchanged after uv migration
- All existing tests pass under uv run pytest
- pygeoapi config MUST be generated from products.yaml; never hand-edited
- pygeoapi OGR provider MUST use /vsigs/ path (GCS), never local filesystem path
- No database introduced in orchestration pipeline; GCS is sole store
- latest.geojson MUST be overwritten atomically (upload to tmp key, then copy/rename)
Cause: ALL_SOURCE_OPTIONS used is_flag=True, default=True — Click flag presence also sets True, so both states gave True. Assignment use_source_X = no_X further confused the polarity.
Fix: default=False on all --no-* options + not lcs.get(f"no_{agency}", False) in weave and sites commands.
Invariant added: --no-* flags MUST have default=False; assignment MUST negate the flag value.
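The polarity rule can be illustrated without Click. The sketch below assumes the parsed options arrive as a dict keyed no_<agency>, mirroring the lcs.get(...) expression in the fix; enabled_sources is a hypothetical helper name.

```python
from typing import Dict, Iterable, Mapping


def enabled_sources(agencies: Iterable[str], cli_options: Mapping[str, bool]) -> Dict[str, bool]:
    """Map each agency to True unless its --no-<agency> flag was passed."""
    # A missing key means the flag was not given, so it defaults to False
    # and the source stays enabled; the flag value is negated exactly once.
    return {agency: not cli_options.get(f"no_{agency}", False) for agency in agencies}
```

For example, passing only --no-bernco yields {"bernco": False, "cabq": True} for the agencies bernco and cabq.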