From 15da9abe650541fd193b0f0e5df2a2630285c1cd Mon Sep 17 00:00:00 2001 From: jakeross Date: Fri, 26 Jun 2026 15:27:37 -0600 Subject: [PATCH 1/2] Downsample waterlevel trends to daily min depth-to-water High-frequency wells (e.g. continuous loggers, tens of thousands of readings) made the per-well Mann-Kendall test (O(n^2)) blow up memory and get the combine child process OOM-killed (ChildProcessCrashException, no Python traceback). Before the trend test, reduce each well's observations to one point per calendar day keeping the daily minimum depth-to-water (the shallowest reading), via _daily_min_series. This bounds the MK cost and removes within-day sampling noise. - record_count is now the daily-point count used for the trend; new observation_count carries the raw reading count. - qualification gate and classification unchanged (applied to daily points); updated TREND_METHOD_DESCRIPTION. - test: daily-min downsampling (same-day readings collapse to the min). 14 persister tests pass. Co-Authored-By: Claude Opus 4.8 --- backend/persisters/ogc_features.py | 78 +++++++++++++++------- tests/test_persisters/test_ogc_features.py | 19 ++++++ 2 files changed, 73 insertions(+), 24 deletions(-) diff --git a/backend/persisters/ogc_features.py b/backend/persisters/ogc_features.py index e6d1cb4..53b4a61 100644 --- a/backend/persisters/ogc_features.py +++ b/backend/persisters/ogc_features.py @@ -25,12 +25,14 @@ # Human-readable description of the trend method, embedded in the product so # consumers know how the classification was derived. TREND_METHOD_DESCRIPTION = ( - "Depth-to-water trend per well. Monotonic trend is tested with the " - "non-parametric Mann-Kendall test (pymannkendall.original_test, alpha=0.05) " - "on depth-to-water-below-ground-surface (feet) ordered by observation time. " - "The rate is the Theil-Sen slope of depth-to-water vs time, in ft/year. A " - "well is classified only when it has at least 10 measurements, or at least 4 " - "measurements spanning at least 2 years; otherwise 'not enough data'. When " + "Depth-to-water trend per well. Observations are first downsampled to one " + "point per calendar day, keeping the daily minimum depth-to-water (the " + "shallowest reading). Monotonic trend is then tested with the non-parametric " + "Mann-Kendall test (pymannkendall.original_test, alpha=0.05) on the daily " + "depth-to-water-below-ground-surface (feet) series ordered by time. The rate " + "is the Theil-Sen slope of depth-to-water vs time, in ft/year. A well is " + "classified only when it has at least 10 daily points, or at least 4 daily " + "points spanning at least 2 years; otherwise 'not enough data'. When " "classified: a statistically significant increasing trend is 'increasing' " "(water level getting DEEPER, i.e. a declining water table), a significant " "decreasing trend is 'decreasing' (water level getting SHALLOWER), and no " @@ -111,6 +113,41 @@ def _parse_epoch_seconds(date, time) -> Optional[float]: return None +def _daily_min_series(obs_list: list) -> tuple[int, list]: + """Reduce a well's observations to one point per calendar day, keeping the + minimum depth-to-water for each day (the shallowest reading), keyed at the + day's UTC midnight epoch. + + Downsampling bounds the O(n^2) Mann-Kendall cost for high-frequency wells + (e.g. continuous loggers) and removes within-day sampling noise. Returns + (raw_observation_count, [(day_epoch_seconds, min_value), ...] sorted by day). + """ + raw_count = 0 + daily: dict = {} # date -> (day_epoch, min_value) + for obs in obs_list: + value = getattr(obs, "parameter_value", None) + epoch = _parse_epoch_seconds( + getattr(obs, "date_measured", None), getattr(obs, "time_measured", None) + ) + if value is None or epoch is None: + continue + try: + v = float(value) + except (TypeError, ValueError): + continue + raw_count += 1 + day = datetime.fromtimestamp(epoch, tz=timezone.utc).date() + day_epoch = datetime( + day.year, day.month, day.day, tzinfo=timezone.utc + ).timestamp() + existing = daily.get(day) + if existing is None or v < existing[1]: + daily[day] = (day_epoch, v) + + pairs = sorted(daily.values(), key=lambda p: p[0]) + return raw_count, pairs + + def _qualifies_for_trend(record_count, span_years) -> bool: return record_count >= _TREND_MIN_RECORDS or ( record_count >= _TREND_MIN_RECORDS_WITH_SPAN @@ -260,10 +297,12 @@ def dump_waterlevel_trend_collection( observations (DIE water-level values are already depth-to-water below ground surface in feet, so no measuring-point adjustment is applied here). - Each feature carries: record_count, first/last_observation_datetime, - span_years, slope_ft_per_year (Theil-Sen), trend_category (Mann-Kendall), - mk_p_value, mk_tau, well_depth(+units). The collection carries - ``trend_method`` describing the calculation. See TREND_METHOD_DESCRIPTION. + Observations are downsampled to the daily minimum depth-to-water before the + trend test (see _daily_min_series). Each feature carries: record_count + (daily points used), observation_count (raw readings), + first/last_observation_datetime, span_years, slope_ft_per_year (Theil-Sen), + trend_category (Mann-Kendall), mk_p_value, mk_tau, well_depth(+units). The + collection carries ``trend_method``. See TREND_METHOD_DESCRIPTION. §V: MUST include top-level id, type, numberReturned, timeStamp. §V: Each Feature MUST have top-level id. @@ -272,20 +311,10 @@ def dump_waterlevel_trend_collection( features = [] for site, obs_list in zip(site_records, timeseries_records): - pairs = [] - for obs in obs_list: - value = getattr(obs, "parameter_value", None) - epoch = _parse_epoch_seconds( - getattr(obs, "date_measured", None), getattr(obs, "time_measured", None) - ) - if value is None or epoch is None: - continue - try: - pairs.append((epoch, float(value))) - except (TypeError, ValueError): - continue - - pairs.sort(key=lambda p: p[0]) + # Downsample to one min-depth-to-water point per day before the trend + # test (see _daily_min_series). record_count is the daily-point count + # used for the trend; observation_count is the raw reading count. + observation_count, pairs = _daily_min_series(obs_list) record_count = len(pairs) xs = [p[0] for p in pairs] ys = [p[1] for p in pairs] @@ -309,6 +338,7 @@ def dump_waterlevel_trend_collection( "well_depth": getattr(site, "well_depth", None), "well_depth_units": getattr(site, "well_depth_units", None), "record_count": record_count, + "observation_count": observation_count, "first_observation_datetime": _iso_utc(xs[0]) if record_count else None, "last_observation_datetime": _iso_utc(xs[-1]) if record_count else None, "span_years": round(span_years, 3), diff --git a/tests/test_persisters/test_ogc_features.py b/tests/test_persisters/test_ogc_features.py index bc405a6..b07f296 100644 --- a/tests/test_persisters/test_ogc_features.py +++ b/tests/test_persisters/test_ogc_features.py @@ -278,3 +278,22 @@ def test_required_fields_and_geometry(self, tmp_path): feat = result["features"][0] assert feat["geometry"]["coordinates"] == [-106.0, 34.0] assert feat["properties"]["trend_category"] == "not enough data" # single record + + +class TestWaterLevelTrendDailyMin: + def test_downsamples_to_daily_min(self, tmp_path): + # Two readings on the same day -> keep the min (shallowest) DTW; one + # reading the next day. record_count counts days, observation_count raw. + obs = [ + _trend_obs("2020-01-01", 50.0), + _trend_obs("2020-01-01", 48.0), # same day, lower -> kept + _trend_obs("2020-01-02", 52.0), + ] + out = tmp_path / "tr.geojson" + result = dump_waterlevel_trend_collection( + str(out), [_trend_site(rid="W1")], [obs], {"id": "nm_waterlevel_trends"} + ) + props = result["features"][0]["properties"] + assert props["observation_count"] == 3 + assert props["record_count"] == 2 # two distinct days + assert props["first_observation_datetime"].startswith("2020-01-01") From efa4d86f3d3cb1c6422bd39da64f1d7f989d235f Mon Sep 17 00:00:00 2001 From: jakeross Date: Fri, 26 Jun 2026 15:30:52 -0600 Subject: [PATCH 2/2] Compute waterlevel trend from payload dicts (no record rebuild) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The trend combine rebuilt every observation into a ParameterRecord (and each site into a SiteRecord) before computing — millions of objects for statewide water-level data, on top of the already-large pickled inputs. Consume the payload dicts directly: dump_waterlevel_trend_collection and _daily_min_series now read dicts (obs.get / site.get) instead of getattr on record objects, and the combine asset passes all_sites/all_timeseries straight through. Cuts peak memory in the step that was OOM-crashing. Other dumpers (summary, major-chemistry, timeseries) still use record objects. Trend test helpers now build dicts. 14 persister tests pass. Co-Authored-By: Claude Opus 4.8 --- backend/persisters/ogc_features.py | 34 +++++++++++++--------- orchestration/assets/products.py | 14 ++++----- tests/test_persisters/test_ogc_features.py | 7 +++-- 3 files changed, 31 insertions(+), 24 deletions(-) diff --git a/backend/persisters/ogc_features.py b/backend/persisters/ogc_features.py index 53b4a61..9787650 100644 --- a/backend/persisters/ogc_features.py +++ b/backend/persisters/ogc_features.py @@ -118,6 +118,10 @@ def _daily_min_series(obs_list: list) -> tuple[int, list]: minimum depth-to-water for each day (the shallowest reading), keyed at the day's UTC midnight epoch. + *obs_list* is a list of observation payload dicts (parameter_value, + date_measured, time_measured). Operating on dicts avoids rebuilding + ParameterRecord objects for what can be millions of observations. + Downsampling bounds the O(n^2) Mann-Kendall cost for high-frequency wells (e.g. continuous loggers) and removes within-day sampling noise. Returns (raw_observation_count, [(day_epoch_seconds, min_value), ...] sorted by day). @@ -125,9 +129,9 @@ def _daily_min_series(obs_list: list) -> tuple[int, list]: raw_count = 0 daily: dict = {} # date -> (day_epoch, min_value) for obs in obs_list: - value = getattr(obs, "parameter_value", None) + value = obs.get("parameter_value") epoch = _parse_epoch_seconds( - getattr(obs, "date_measured", None), getattr(obs, "time_measured", None) + obs.get("date_measured"), obs.get("time_measured") ) if value is None or epoch is None: continue @@ -292,10 +296,12 @@ def dump_waterlevel_trend_collection( Write an OGC FeatureCollection of per-well depth-to-water trends to *path*. One Feature per well. - *site_records* and *timeseries_records* are index-aligned: ``site_records[i]`` - is the well and ``timeseries_records[i]`` is its list of ParameterRecord - observations (DIE water-level values are already depth-to-water below ground - surface in feet, so no measuring-point adjustment is applied here). + *site_records* and *timeseries_records* are index-aligned **payload dicts**: + ``site_records[i]`` is the well's site dict and ``timeseries_records[i]`` is + its list of observation dicts. They are consumed as dicts (not rebuilt into + record objects) to keep memory bounded for statewide, high-frequency data. + DIE water-level values are already depth-to-water below ground surface in + feet, so no measuring-point adjustment is applied here. Observations are downsampled to the daily minimum depth-to-water before the trend test (see _daily_min_series). Each feature carries: record_count @@ -332,11 +338,11 @@ def dump_waterlevel_trend_collection( trend_category = "not enough data" props = { - "source": getattr(site, "source", "") or "", - "id": getattr(site, "id", "") or "", - "name": getattr(site, "name", None), - "well_depth": getattr(site, "well_depth", None), - "well_depth_units": getattr(site, "well_depth_units", None), + "source": site.get("source") or "", + "id": site.get("id") or "", + "name": site.get("name"), + "well_depth": site.get("well_depth"), + "well_depth_units": site.get("well_depth_units"), "record_count": record_count, "observation_count": observation_count, "first_observation_datetime": _iso_utc(xs[0]) if record_count else None, @@ -354,9 +360,9 @@ def dump_waterlevel_trend_collection( "type": "Feature", "id": _feature_id(props["source"], props["id"]), "geometry": _point_geometry( - getattr(site, "latitude", None), - getattr(site, "longitude", None), - getattr(site, "elevation", None), + site.get("latitude"), + site.get("longitude"), + site.get("elevation"), ), "properties": props, }) diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index e951c78..77aa496 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -218,13 +218,13 @@ def _combine_asset( records = [SummaryRecord(p) for p in all_records] dump_summary_collection(str(out), records, meta) elif output_type == "ogc_waterlevel_trend": - # site_records and the per-site observation lists are index - # aligned (see source asset); compute one trend per well. - site_records = [SiteRecord(p) for p in all_sites] - series = [ - [ParameterRecord(o) for o in site_ts] for site_ts in all_timeseries - ] - dump_waterlevel_trend_collection(str(out), site_records, series, meta) + # all_sites and all_timeseries are index-aligned payload dicts + # (see source asset). The trend dumper consumes dicts directly — + # no ParameterRecord/SiteRecord rebuild — to keep memory bounded + # for statewide, high-frequency water-level data. + dump_waterlevel_trend_collection( + str(out), all_sites, all_timeseries, meta + ) else: site_records = [SiteRecord(p) for p in all_sites] flat = [ diff --git a/tests/test_persisters/test_ogc_features.py b/tests/test_persisters/test_ogc_features.py index b07f296..81214bb 100644 --- a/tests/test_persisters/test_ogc_features.py +++ b/tests/test_persisters/test_ogc_features.py @@ -233,16 +233,17 @@ def test_geometry_and_required_fields(self, tmp_path): from backend.persisters.ogc_features import dump_waterlevel_trend_collection +# The trend dumper consumes payload dicts directly (no record rebuild). def _trend_site(source="NMBGMR", rid="W1", well_depth=100.0): - return SiteRecord({ + return { "source": source, "id": rid, "name": f"Well {rid}", "latitude": 34.0, "longitude": -106.0, "elevation": None, "well_depth": well_depth, "well_depth_units": "ft", - }) + } def _trend_obs(date, value): - return ParameterRecord({"parameter_value": value, "date_measured": date, "time_measured": None}) + return {"parameter_value": value, "date_measured": date, "time_measured": None} class TestWaterLevelTrendCollection: