Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 72 additions & 36 deletions backend/persisters/ogc_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
# Human-readable description of the trend method, embedded in the product so
# consumers know how the classification was derived.
TREND_METHOD_DESCRIPTION = (
"Depth-to-water trend per well. Monotonic trend is tested with the "
"non-parametric Mann-Kendall test (pymannkendall.original_test, alpha=0.05) "
"on depth-to-water-below-ground-surface (feet) ordered by observation time. "
"The rate is the Theil-Sen slope of depth-to-water vs time, in ft/year. A "
"well is classified only when it has at least 10 measurements, or at least 4 "
"measurements spanning at least 2 years; otherwise 'not enough data'. When "
"Depth-to-water trend per well. Observations are first downsampled to one "
"point per calendar day, keeping the daily minimum depth-to-water (the "
"shallowest reading). Monotonic trend is then tested with the non-parametric "
"Mann-Kendall test (pymannkendall.original_test, alpha=0.05) on the daily "
"depth-to-water-below-ground-surface (feet) series ordered by time. The rate "
"is the Theil-Sen slope of depth-to-water vs time, in ft/year. A well is "
"classified only when it has at least 10 daily points, or at least 4 daily "
"points spanning at least 2 years; otherwise 'not enough data'. When "
"classified: a statistically significant increasing trend is 'increasing' "
"(water level getting DEEPER, i.e. a declining water table), a significant "
"decreasing trend is 'decreasing' (water level getting SHALLOWER), and no "
Expand Down Expand Up @@ -111,6 +113,45 @@ def _parse_epoch_seconds(date, time) -> Optional[float]:
return None


def _daily_min_series(obs_list: list) -> tuple[int, list]:
"""Reduce a well's observations to one point per calendar day, keeping the
minimum depth-to-water for each day (the shallowest reading), keyed at the
day's UTC midnight epoch.

*obs_list* is a list of observation payload dicts (parameter_value,
date_measured, time_measured). Operating on dicts avoids rebuilding
ParameterRecord objects for what can be millions of observations.

Downsampling bounds the O(n^2) Mann-Kendall cost for high-frequency wells
(e.g. continuous loggers) and removes within-day sampling noise. Returns
(raw_observation_count, [(day_epoch_seconds, min_value), ...] sorted by day).
"""
raw_count = 0
daily: dict = {} # date -> (day_epoch, min_value)
for obs in obs_list:
value = obs.get("parameter_value")
epoch = _parse_epoch_seconds(
obs.get("date_measured"), obs.get("time_measured")
)
if value is None or epoch is None:
continue
try:
v = float(value)
except (TypeError, ValueError):
continue
raw_count += 1
day = datetime.fromtimestamp(epoch, tz=timezone.utc).date()
day_epoch = datetime(
day.year, day.month, day.day, tzinfo=timezone.utc
).timestamp()
existing = daily.get(day)
if existing is None or v < existing[1]:
daily[day] = (day_epoch, v)

pairs = sorted(daily.values(), key=lambda p: p[0])
return raw_count, pairs


def _qualifies_for_trend(record_count, span_years) -> bool:
return record_count >= _TREND_MIN_RECORDS or (
record_count >= _TREND_MIN_RECORDS_WITH_SPAN
Expand Down Expand Up @@ -255,15 +296,19 @@ def dump_waterlevel_trend_collection(
Write an OGC FeatureCollection of per-well depth-to-water trends to *path*.
One Feature per well.

*site_records* and *timeseries_records* are index-aligned: ``site_records[i]``
is the well and ``timeseries_records[i]`` is its list of ParameterRecord
observations (DIE water-level values are already depth-to-water below ground
surface in feet, so no measuring-point adjustment is applied here).
*site_records* and *timeseries_records* are index-aligned **payload dicts**:
``site_records[i]`` is the well's site dict and ``timeseries_records[i]`` is
its list of observation dicts. They are consumed as dicts (not rebuilt into
record objects) to keep memory bounded for statewide, high-frequency data.
DIE water-level values are already depth-to-water below ground surface in
feet, so no measuring-point adjustment is applied here.

Each feature carries: record_count, first/last_observation_datetime,
span_years, slope_ft_per_year (Theil-Sen), trend_category (Mann-Kendall),
mk_p_value, mk_tau, well_depth(+units). The collection carries
``trend_method`` describing the calculation. See TREND_METHOD_DESCRIPTION.
Observations are downsampled to the daily minimum depth-to-water before the
trend test (see _daily_min_series). Each feature carries: record_count
(daily points used), observation_count (raw readings),
first/last_observation_datetime, span_years, slope_ft_per_year (Theil-Sen),
trend_category (Mann-Kendall), mk_p_value, mk_tau, well_depth(+units). The
collection carries ``trend_method``. See TREND_METHOD_DESCRIPTION.

§V: MUST include top-level id, type, numberReturned, timeStamp.
§V: Each Feature MUST have top-level id.
Expand All @@ -272,20 +317,10 @@ def dump_waterlevel_trend_collection(

features = []
for site, obs_list in zip(site_records, timeseries_records):
pairs = []
for obs in obs_list:
value = getattr(obs, "parameter_value", None)
epoch = _parse_epoch_seconds(
getattr(obs, "date_measured", None), getattr(obs, "time_measured", None)
)
if value is None or epoch is None:
continue
try:
pairs.append((epoch, float(value)))
except (TypeError, ValueError):
continue

pairs.sort(key=lambda p: p[0])
# Downsample to one min-depth-to-water point per day before the trend
# test (see _daily_min_series). record_count is the daily-point count
# used for the trend; observation_count is the raw reading count.
observation_count, pairs = _daily_min_series(obs_list)
record_count = len(pairs)
xs = [p[0] for p in pairs]
ys = [p[1] for p in pairs]
Expand All @@ -303,12 +338,13 @@ def dump_waterlevel_trend_collection(
trend_category = "not enough data"

props = {
"source": getattr(site, "source", "") or "",
"id": getattr(site, "id", "") or "",
"name": getattr(site, "name", None),
"well_depth": getattr(site, "well_depth", None),
"well_depth_units": getattr(site, "well_depth_units", None),
"source": site.get("source") or "",
"id": site.get("id") or "",
"name": site.get("name"),
"well_depth": site.get("well_depth"),
"well_depth_units": site.get("well_depth_units"),
"record_count": record_count,
"observation_count": observation_count,
"first_observation_datetime": _iso_utc(xs[0]) if record_count else None,
"last_observation_datetime": _iso_utc(xs[-1]) if record_count else None,
"span_years": round(span_years, 3),
Expand All @@ -324,9 +360,9 @@ def dump_waterlevel_trend_collection(
"type": "Feature",
"id": _feature_id(props["source"], props["id"]),
"geometry": _point_geometry(
getattr(site, "latitude", None),
getattr(site, "longitude", None),
getattr(site, "elevation", None),
site.get("latitude"),
site.get("longitude"),
site.get("elevation"),
),
"properties": props,
})
Expand Down
14 changes: 7 additions & 7 deletions orchestration/assets/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@ def _combine_asset(
records = [SummaryRecord(p) for p in all_records]
dump_summary_collection(str(out), records, meta)
elif output_type == "ogc_waterlevel_trend":
# site_records and the per-site observation lists are index
# aligned (see source asset); compute one trend per well.
site_records = [SiteRecord(p) for p in all_sites]
series = [
[ParameterRecord(o) for o in site_ts] for site_ts in all_timeseries
]
dump_waterlevel_trend_collection(str(out), site_records, series, meta)
# all_sites and all_timeseries are index-aligned payload dicts
# (see source asset). The trend dumper consumes dicts directly —
# no ParameterRecord/SiteRecord rebuild — to keep memory bounded
# for statewide, high-frequency water-level data.
dump_waterlevel_trend_collection(
str(out), all_sites, all_timeseries, meta
)
else:
site_records = [SiteRecord(p) for p in all_sites]
flat = [
Expand Down
26 changes: 23 additions & 3 deletions tests/test_persisters/test_ogc_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,16 +233,17 @@ def test_geometry_and_required_fields(self, tmp_path):
from backend.persisters.ogc_features import dump_waterlevel_trend_collection


# The trend dumper consumes payload dicts directly (no record rebuild).
def _trend_site(source="NMBGMR", rid="W1", well_depth=100.0):
return SiteRecord({
return {
"source": source, "id": rid, "name": f"Well {rid}",
"latitude": 34.0, "longitude": -106.0, "elevation": None,
"well_depth": well_depth, "well_depth_units": "ft",
})
}


def _trend_obs(date, value):
return ParameterRecord({"parameter_value": value, "date_measured": date, "time_measured": None})
return {"parameter_value": value, "date_measured": date, "time_measured": None}


class TestWaterLevelTrendCollection:
Expand Down Expand Up @@ -278,3 +279,22 @@ def test_required_fields_and_geometry(self, tmp_path):
feat = result["features"][0]
assert feat["geometry"]["coordinates"] == [-106.0, 34.0]
assert feat["properties"]["trend_category"] == "not enough data" # single record


class TestWaterLevelTrendDailyMin:
def test_downsamples_to_daily_min(self, tmp_path):
# Two readings on the same day -> keep the min (shallowest) DTW; one
# reading the next day. record_count counts days, observation_count raw.
obs = [
_trend_obs("2020-01-01", 50.0),
_trend_obs("2020-01-01", 48.0), # same day, lower -> kept
_trend_obs("2020-01-02", 52.0),
]
out = tmp_path / "tr.geojson"
result = dump_waterlevel_trend_collection(
str(out), [_trend_site(rid="W1")], [obs], {"id": "nm_waterlevel_trends"}
)
props = result["features"][0]["properties"]
assert props["observation_count"] == 3
assert props["record_count"] == 2 # two distinct days
assert props["first_observation_datetime"].startswith("2020-01-01")
Loading