diff --git a/backend/persisters/ogc_features.py b/backend/persisters/ogc_features.py index e6d1cb4..9787650 100644 --- a/backend/persisters/ogc_features.py +++ b/backend/persisters/ogc_features.py @@ -25,12 +25,14 @@ # Human-readable description of the trend method, embedded in the product so # consumers know how the classification was derived. TREND_METHOD_DESCRIPTION = ( - "Depth-to-water trend per well. Monotonic trend is tested with the " - "non-parametric Mann-Kendall test (pymannkendall.original_test, alpha=0.05) " - "on depth-to-water-below-ground-surface (feet) ordered by observation time. " - "The rate is the Theil-Sen slope of depth-to-water vs time, in ft/year. A " - "well is classified only when it has at least 10 measurements, or at least 4 " - "measurements spanning at least 2 years; otherwise 'not enough data'. When " + "Depth-to-water trend per well. Observations are first downsampled to one " + "point per calendar day, keeping the daily minimum depth-to-water (the " + "shallowest reading). Monotonic trend is then tested with the non-parametric " + "Mann-Kendall test (pymannkendall.original_test, alpha=0.05) on the daily " + "depth-to-water-below-ground-surface (feet) series ordered by time. The rate " + "is the Theil-Sen slope of depth-to-water vs time, in ft/year. A well is " + "classified only when it has at least 10 daily points, or at least 4 daily " + "points spanning at least 2 years; otherwise 'not enough data'. When " "classified: a statistically significant increasing trend is 'increasing' " "(water level getting DEEPER, i.e. a declining water table), a significant " "decreasing trend is 'decreasing' (water level getting SHALLOWER), and no " @@ -111,6 +113,45 @@ def _parse_epoch_seconds(date, time) -> Optional[float]: return None +def _daily_min_series(obs_list: list) -> tuple[int, list]: + """Reduce a well's observations to one point per calendar day, keeping the + minimum depth-to-water for each day (the shallowest reading), keyed at the + day's UTC midnight epoch. + + *obs_list* is a list of observation payload dicts (parameter_value, + date_measured, time_measured). Operating on dicts avoids rebuilding + ParameterRecord objects for what can be millions of observations. + + Downsampling bounds the O(n^2) Mann-Kendall cost for high-frequency wells + (e.g. continuous loggers) and removes within-day sampling noise. Returns + (raw_observation_count, [(day_epoch_seconds, min_value), ...] sorted by day). + """ + raw_count = 0 + daily: dict = {} # date -> (day_epoch, min_value) + for obs in obs_list: + value = obs.get("parameter_value") + epoch = _parse_epoch_seconds( + obs.get("date_measured"), obs.get("time_measured") + ) + if value is None or epoch is None: + continue + try: + v = float(value) + except (TypeError, ValueError): + continue + raw_count += 1 + day = datetime.fromtimestamp(epoch, tz=timezone.utc).date() + day_epoch = datetime( + day.year, day.month, day.day, tzinfo=timezone.utc + ).timestamp() + existing = daily.get(day) + if existing is None or v < existing[1]: + daily[day] = (day_epoch, v) + + pairs = sorted(daily.values(), key=lambda p: p[0]) + return raw_count, pairs + + def _qualifies_for_trend(record_count, span_years) -> bool: return record_count >= _TREND_MIN_RECORDS or ( record_count >= _TREND_MIN_RECORDS_WITH_SPAN @@ -255,15 +296,19 @@ def dump_waterlevel_trend_collection( Write an OGC FeatureCollection of per-well depth-to-water trends to *path*. One Feature per well. - *site_records* and *timeseries_records* are index-aligned: ``site_records[i]`` - is the well and ``timeseries_records[i]`` is its list of ParameterRecord - observations (DIE water-level values are already depth-to-water below ground - surface in feet, so no measuring-point adjustment is applied here). + *site_records* and *timeseries_records* are index-aligned **payload dicts**: + ``site_records[i]`` is the well's site dict and ``timeseries_records[i]`` is + its list of observation dicts. They are consumed as dicts (not rebuilt into + record objects) to keep memory bounded for statewide, high-frequency data. + DIE water-level values are already depth-to-water below ground surface in + feet, so no measuring-point adjustment is applied here. - Each feature carries: record_count, first/last_observation_datetime, - span_years, slope_ft_per_year (Theil-Sen), trend_category (Mann-Kendall), - mk_p_value, mk_tau, well_depth(+units). The collection carries - ``trend_method`` describing the calculation. See TREND_METHOD_DESCRIPTION. + Observations are downsampled to the daily minimum depth-to-water before the + trend test (see _daily_min_series). Each feature carries: record_count + (daily points used), observation_count (raw readings), + first/last_observation_datetime, span_years, slope_ft_per_year (Theil-Sen), + trend_category (Mann-Kendall), mk_p_value, mk_tau, well_depth(+units). The + collection carries ``trend_method``. See TREND_METHOD_DESCRIPTION. §V: MUST include top-level id, type, numberReturned, timeStamp. §V: Each Feature MUST have top-level id. @@ -272,20 +317,10 @@ def dump_waterlevel_trend_collection( features = [] for site, obs_list in zip(site_records, timeseries_records): - pairs = [] - for obs in obs_list: - value = getattr(obs, "parameter_value", None) - epoch = _parse_epoch_seconds( - getattr(obs, "date_measured", None), getattr(obs, "time_measured", None) - ) - if value is None or epoch is None: - continue - try: - pairs.append((epoch, float(value))) - except (TypeError, ValueError): - continue - - pairs.sort(key=lambda p: p[0]) + # Downsample to one min-depth-to-water point per day before the trend + # test (see _daily_min_series). record_count is the daily-point count + # used for the trend; observation_count is the raw reading count. + observation_count, pairs = _daily_min_series(obs_list) record_count = len(pairs) xs = [p[0] for p in pairs] ys = [p[1] for p in pairs] @@ -303,12 +338,13 @@ def dump_waterlevel_trend_collection( trend_category = "not enough data" props = { - "source": getattr(site, "source", "") or "", - "id": getattr(site, "id", "") or "", - "name": getattr(site, "name", None), - "well_depth": getattr(site, "well_depth", None), - "well_depth_units": getattr(site, "well_depth_units", None), + "source": site.get("source") or "", + "id": site.get("id") or "", + "name": site.get("name"), + "well_depth": site.get("well_depth"), + "well_depth_units": site.get("well_depth_units"), "record_count": record_count, + "observation_count": observation_count, "first_observation_datetime": _iso_utc(xs[0]) if record_count else None, "last_observation_datetime": _iso_utc(xs[-1]) if record_count else None, "span_years": round(span_years, 3), @@ -324,9 +360,9 @@ def dump_waterlevel_trend_collection( "type": "Feature", "id": _feature_id(props["source"], props["id"]), "geometry": _point_geometry( - getattr(site, "latitude", None), - getattr(site, "longitude", None), - getattr(site, "elevation", None), + site.get("latitude"), + site.get("longitude"), + site.get("elevation"), ), "properties": props, }) diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index e951c78..77aa496 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -218,13 +218,13 @@ def _combine_asset( records = [SummaryRecord(p) for p in all_records] dump_summary_collection(str(out), records, meta) elif output_type == "ogc_waterlevel_trend": - # site_records and the per-site observation lists are index - # aligned (see source asset); compute one trend per well. - site_records = [SiteRecord(p) for p in all_sites] - series = [ - [ParameterRecord(o) for o in site_ts] for site_ts in all_timeseries - ] - dump_waterlevel_trend_collection(str(out), site_records, series, meta) + # all_sites and all_timeseries are index-aligned payload dicts + # (see source asset). The trend dumper consumes dicts directly — + # no ParameterRecord/SiteRecord rebuild — to keep memory bounded + # for statewide, high-frequency water-level data. + dump_waterlevel_trend_collection( + str(out), all_sites, all_timeseries, meta + ) else: site_records = [SiteRecord(p) for p in all_sites] flat = [ diff --git a/tests/test_persisters/test_ogc_features.py b/tests/test_persisters/test_ogc_features.py index bc405a6..81214bb 100644 --- a/tests/test_persisters/test_ogc_features.py +++ b/tests/test_persisters/test_ogc_features.py @@ -233,16 +233,17 @@ def test_geometry_and_required_fields(self, tmp_path): from backend.persisters.ogc_features import dump_waterlevel_trend_collection +# The trend dumper consumes payload dicts directly (no record rebuild). def _trend_site(source="NMBGMR", rid="W1", well_depth=100.0): - return SiteRecord({ + return { "source": source, "id": rid, "name": f"Well {rid}", "latitude": 34.0, "longitude": -106.0, "elevation": None, "well_depth": well_depth, "well_depth_units": "ft", - }) + } def _trend_obs(date, value): - return ParameterRecord({"parameter_value": value, "date_measured": date, "time_measured": None}) + return {"parameter_value": value, "date_measured": date, "time_measured": None} class TestWaterLevelTrendCollection: @@ -278,3 +279,22 @@ def test_required_fields_and_geometry(self, tmp_path): feat = result["features"][0] assert feat["geometry"]["coordinates"] == [-106.0, 34.0] assert feat["properties"]["trend_category"] == "not enough data" # single record + + +class TestWaterLevelTrendDailyMin: + def test_downsamples_to_daily_min(self, tmp_path): + # Two readings on the same day -> keep the min (shallowest) DTW; one + # reading the next day. record_count counts days, observation_count raw. + obs = [ + _trend_obs("2020-01-01", 50.0), + _trend_obs("2020-01-01", 48.0), # same day, lower -> kept + _trend_obs("2020-01-02", 52.0), + ] + out = tmp_path / "tr.geojson" + result = dump_waterlevel_trend_collection( + str(out), [_trend_site(rid="W1")], [obs], {"id": "nm_waterlevel_trends"} + ) + props = result["features"][0]["properties"] + assert props["observation_count"] == 3 + assert props["record_count"] == 2 # two distinct days + assert props["first_observation_datetime"].startswith("2020-01-01")