From 814092b2b916efd453e94598107ff1fb01cb343e Mon Sep 17 00:00:00 2001 From: jross Date: Wed, 4 Mar 2026 10:55:33 -0700 Subject: [PATCH 1/9] feat: add normalized chemistry results materialized view and update related configurations --- ...zed_chemistry_results_materialized_view.py | 293 ++++++++++++++++++ cli/cli.py | 1 + core/pygeoapi-config.yml | 23 ++ docker-compose.yml | 38 ++- entrypoint.sh | 11 +- tests/test_cli_commands.py | 9 +- tests/test_ogc.py | 2 + 7 files changed, 362 insertions(+), 15 deletions(-) create mode 100644 alembic/versions/b6f7a8b9c0d1_add_normalized_chemistry_results_materialized_view.py diff --git a/alembic/versions/b6f7a8b9c0d1_add_normalized_chemistry_results_materialized_view.py b/alembic/versions/b6f7a8b9c0d1_add_normalized_chemistry_results_materialized_view.py new file mode 100644 index 000000000..ba10d567b --- /dev/null +++ b/alembic/versions/b6f7a8b9c0d1_add_normalized_chemistry_results_materialized_view.py @@ -0,0 +1,293 @@ +"""add normalized chemistry results materialized view + +Revision ID: b6f7a8b9c0d1 +Revises: l5e6f7a8b9c0 +Create Date: 2026-03-04 14:10:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import inspect, text + +# revision identifiers, used by Alembic. +revision: str = "b6f7a8b9c0d1" +down_revision: Union[str, Sequence[str], None] = "l5e6f7a8b9c0" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +LATEST_LOCATION_CTE = """ +SELECT DISTINCT ON (lta.thing_id) + lta.thing_id, + lta.location_id, + lta.effective_start +FROM location_thing_association AS lta +WHERE lta.effective_end IS NULL +ORDER BY lta.thing_id, lta.effective_start DESC +""".strip() + +# Static analyte columns for major chemistry pivots. +# Includes aliases observed in current DB values (e.g., Ca(total), IONBAL, TAn, TCat, Na+K). +STATIC_ANALYTE_COLUMNS: list[tuple[str, str]] = [ + ("tds", "tds"), + ("calcium", "calcium"), + ("calcium_total", "calcium_total"), + ("magnesium", "magnesium"), + ("magnesium_total", "magnesium_total"), + ("sodium", "sodium"), + ("sodium_total", "sodium_total"), + ("potassium", "potassium"), + ("potassium_total", "potassium_total"), + ("sodium_plus_potassium", "sodium_plus_potassium"), + ("bicarbonate", "bicarbonate"), + ("carbonate", "carbonate"), + ("sulfate", "sulfate"), + ("chloride", "chloride"), + ("ion_balance", "ion_balance"), + ("total_anions", "total_anions"), + ("total_cations", "total_cations"), + ("alkalinity", "alkalinity"), + ("hardness", "hardness"), + ("specific_conductance", "specific_conductance"), + ("ph", "ph"), + ("nitrate", "nitrate"), + ("fluoride", "fluoride"), + ("silica", "silica"), +] + + +def _static_analyte_select_columns() -> str: + return ",\n".join( + [ + ( + " MAX(lr.sample_value) FILTER " + f"(WHERE lr.analyte_key = '{analyte_key}') AS {column_name}" + ) + for analyte_key, column_name in STATIC_ANALYTE_COLUMNS + ] + ) + + +def _static_analyte_unit_columns() -> str: + return ",\n".join( + [ + ( + " MAX(lr.units) FILTER " + f"(WHERE lr.analyte_key = '{analyte_key}') AS {column_name}_units" + ) + for analyte_key, column_name in STATIC_ANALYTE_COLUMNS + ] + ) + + +def _create_normalized_chemistry_results_view() -> str: + static_columns = _static_analyte_select_columns() + static_unit_columns = _static_analyte_unit_columns() + return f""" + CREATE MATERIALIZED VIEW ogc_normalized_chemistry_results AS + WITH latest_location AS ( +{LATEST_LOCATION_CTE} + ), + chemistry_rows AS ( + SELECT + csi.thing_id, + mc.id AS result_id, + COALESCE(mc."AnalysisDate", csi."CollectionDate") AS observation_datetime, + trim(mc."Analyte") AS analyte_name, + trim(mc."Symbol") AS symbol_name, + mc."SampleValue"::double precision AS sample_value, + mc."Units" AS units + FROM "NMA_MajorChemistry" AS mc + JOIN "NMA_Chemistry_SampleInfo" AS csi + ON csi.id = mc.chemistry_sample_info_id + WHERE mc."SampleValue" IS NOT NULL + ), + normalized_rows AS ( + SELECT + cr.thing_id, + cr.result_id, + cr.observation_datetime, + NULLIF( + regexp_replace( + lower(trim(coalesce(cr.analyte_name, ''))), + '[^a-z0-9]+', + '', + 'g' + ), + '' + ) AS analyte_token, + NULLIF( + regexp_replace( + lower(trim(coalesce(cr.symbol_name, ''))), + '[^a-z0-9]+', + '', + 'g' + ), + '' + ) AS symbol_token, + cr.sample_value, + cr.units + FROM chemistry_rows AS cr + ), + mapped_rows AS ( + SELECT + nr.thing_id, + nr.result_id, + nr.observation_datetime, + CASE + WHEN coalesce(nr.symbol_token, '') = 'tds' + OR coalesce(nr.analyte_token, '') IN ('tds', 'totaldissolvedsolids') + THEN 'tds' + + WHEN coalesce(nr.symbol_token, '') = 'ca' + OR coalesce(nr.analyte_token, '') = 'ca' + THEN 'calcium' + WHEN coalesce(nr.analyte_token, '') = 'catotal' + THEN 'calcium_total' + + WHEN coalesce(nr.symbol_token, '') = 'mg' + OR coalesce(nr.analyte_token, '') = 'mg' + THEN 'magnesium' + WHEN coalesce(nr.analyte_token, '') = 'mgtotal' + THEN 'magnesium_total' + + WHEN coalesce(nr.symbol_token, '') = 'na' + OR coalesce(nr.analyte_token, '') = 'na' + THEN 'sodium' + WHEN coalesce(nr.analyte_token, '') = 'natotal' + THEN 'sodium_total' + + WHEN coalesce(nr.symbol_token, '') = 'k' + OR coalesce(nr.analyte_token, '') = 'k' + THEN 'potassium' + WHEN coalesce(nr.analyte_token, '') = 'ktotal' + THEN 'potassium_total' + + WHEN coalesce(nr.analyte_token, '') = 'nak' + THEN 'sodium_plus_potassium' + + WHEN coalesce(nr.symbol_token, '') = 'hco3' + OR coalesce(nr.analyte_token, '') = 'hco3' + THEN 'bicarbonate' + WHEN coalesce(nr.symbol_token, '') = 'co3' + OR coalesce(nr.analyte_token, '') = 'co3' + THEN 'carbonate' + WHEN coalesce(nr.symbol_token, '') = 'so4' + OR coalesce(nr.analyte_token, '') = 'so4' + THEN 'sulfate' + WHEN coalesce(nr.symbol_token, '') = 'cl' + OR coalesce(nr.analyte_token, '') = 'cl' + THEN 'chloride' + + WHEN coalesce(nr.analyte_token, '') = 'ionbal' + THEN 'ion_balance' + WHEN coalesce(nr.analyte_token, '') = 'tan' + THEN 'total_anions' + WHEN coalesce(nr.analyte_token, '') = 'tcat' + THEN 'total_cations' + + WHEN coalesce(nr.analyte_token, '') IN ('alk', 'alkalinity') + THEN 'alkalinity' + WHEN coalesce(nr.analyte_token, '') IN ('hrd', 'hardness') + THEN 'hardness' + WHEN coalesce(nr.analyte_token, '') IN ( + 'condlab', + 'specificconductance', + 'specificconductivity', + 'conductivity' + ) + THEN 'specific_conductance' + WHEN coalesce(nr.symbol_token, '') = 'ph' + OR coalesce(nr.analyte_token, '') IN ('ph', 'phl') + THEN 'ph' + + WHEN coalesce(nr.symbol_token, '') = 'no3' + OR coalesce(nr.analyte_token, '') IN ('no3', 'nitrate') + THEN 'nitrate' + WHEN coalesce(nr.symbol_token, '') = 'f' + OR coalesce(nr.analyte_token, '') IN ('f', 'fluoride') + THEN 'fluoride' + WHEN coalesce(nr.symbol_token, '') = 'sio2' + OR coalesce(nr.analyte_token, '') IN ('sio2', 'silica') + THEN 'silica' + + ELSE NULL + END AS analyte_key, + nr.sample_value, + nr.units + FROM normalized_rows AS nr + ), + latest_results AS ( + SELECT + mr.thing_id, + mr.analyte_key, + mr.sample_value, + mr.units, + mr.observation_datetime, + ROW_NUMBER() OVER ( + PARTITION BY mr.thing_id, mr.analyte_key + ORDER BY mr.observation_datetime DESC NULLS LAST, mr.result_id DESC + ) AS rn + FROM mapped_rows AS mr + WHERE mr.analyte_key IS NOT NULL + ) + SELECT + t.id AS id, + ll.location_id, + t.name, + t.thing_type, + COUNT(*)::integer AS analyte_count, + MAX(lr.observation_datetime::date) AS latest_chemistry_date, +{static_columns}, +{static_unit_columns}, + l.point + FROM latest_results AS lr + JOIN thing AS t ON t.id = lr.thing_id + JOIN latest_location AS ll ON ll.thing_id = t.id + JOIN location AS l ON l.id = ll.location_id + WHERE lr.rn = 1 + GROUP BY t.id, ll.location_id, t.name, t.thing_type, l.point + """ + + +def upgrade() -> None: + bind = op.get_bind() + inspector = inspect(bind) + existing_tables = set(inspector.get_table_names(schema="public")) + required_tables = { + "thing", + "location", + "location_thing_association", + "NMA_Chemistry_SampleInfo", + "NMA_MajorChemistry", + } + + if not required_tables.issubset(existing_tables): + missing = sorted(t for t in required_tables if t not in existing_tables) + raise RuntimeError( + "Cannot create ogc_normalized_chemistry_results. Missing required tables: " + + ", ".join(missing) + ) + + op.execute( + text("DROP MATERIALIZED VIEW IF EXISTS ogc_normalized_chemistry_results") + ) + op.execute(text(_create_normalized_chemistry_results_view())) + op.execute( + text( + "COMMENT ON MATERIALIZED VIEW ogc_normalized_chemistry_results IS " + "'Latest major-chemistry analyte values per location, pivoted into static analyte columns.'" + ) + ) + op.execute( + text( + "CREATE UNIQUE INDEX ux_ogc_normalized_chemistry_results_id " + "ON ogc_normalized_chemistry_results (id)" + ) + ) + + +def downgrade() -> None: + op.execute( + text("DROP MATERIALIZED VIEW IF EXISTS ogc_normalized_chemistry_results") + ) diff --git a/cli/cli.py b/cli/cli.py index 09c201850..da1972c04 100644 --- a/cli/cli.py +++ b/cli/cli.py @@ -55,6 +55,7 @@ class SmokePopulation(str, Enum): "ogc_avg_tds_wells", "ogc_depth_to_water_trend_wells", "ogc_water_well_summary", + "ogc_normalized_chemistry_results", ) diff --git a/core/pygeoapi-config.yml b/core/pygeoapi-config.yml index 1a468b138..80227db03 100644 --- a/core/pygeoapi-config.yml +++ b/core/pygeoapi-config.yml @@ -172,4 +172,27 @@ resources: table: ogc_water_well_summary geom_field: point + normalized_chemistry_results: + type: collection + title: Normalized Chemistry Results + description: Latest major chemistry analyte values per location, represented as static analyte columns. + keywords: [chemistry, analytes, normalized, major-chemistry] + extents: + spatial: + bbox: [-109.05, 31.33, -103.00, 37.00] + crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84 + providers: + - type: feature + name: PostgreSQL + data: + host: {postgres_host} + port: {postgres_port} + dbname: {postgres_db} + user: {postgres_user} + password: {postgres_password_env} + search_path: [public] + id_field: id + table: ogc_normalized_chemistry_results + geom_field: point + {thing_collections_block} diff --git a/docker-compose.yml b/docker-compose.yml index 9eb88baf1..e3a57f964 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ # keep docker-compose.yml in root directory to configure with root .env services: - db: + db_dev: build: context: . dockerfile: ./docker/db/Dockerfile @@ -9,13 +9,32 @@ services: environment: - POSTGRES_USER=${POSTGRES_USER} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} - - POSTGRES_DB=${POSTGRES_DB} + - POSTGRES_DB=ocotilloapi_dev ports: - 5432:5432 volumes: - - postgres_data:/var/lib/postgresql/data + - postgres_data_dev:/var/lib/postgresql/data healthcheck: - test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB}"] + test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ocotilloapi_dev"] + interval: 2s + timeout: 5s + retries: 20 + + db_test: + build: + context: . + dockerfile: ./docker/db/Dockerfile + platform: linux/amd64 + environment: + - POSTGRES_USER=${POSTGRES_USER} + - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} + - POSTGRES_DB=ocotilloapi_test + ports: + - 5433:5432 + volumes: + - postgres_data_test:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ocotilloapi_test"] interval: 2s timeout: 5s retries: 20 @@ -27,20 +46,21 @@ services: environment: - POSTGRES_USER=${POSTGRES_USER} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} - - POSTGRES_DB=${POSTGRES_DB} - - POSTGRES_HOST=db + - POSTGRES_DB=ocotilloapi_dev + - POSTGRES_HOST=db_dev - POSTGRES_PORT=5432 - MODE=${MODE} - AUTHENTIK_DISABLE_AUTHENTICATION=${AUTHENTIK_DISABLE_AUTHENTICATION} ports: - 8000:8000 depends_on: - db: + db_dev: condition: service_healthy # <-- wait for DB to be ready links: - - db + - db_dev volumes: - .:/app volumes: - postgres_data: + postgres_data_dev: + postgres_data_test: diff --git a/entrypoint.sh b/entrypoint.sh index 662487618..91b46aa64 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -1,7 +1,12 @@ #!/bin/sh + +DB_HOST="${POSTGRES_HOST:-db}" +DB_PORT="${POSTGRES_PORT:-5432}" +DB_NAME="${POSTGRES_DB:-postgres}" + # Wait for PostgreSQL to be ready -until PGPASSWORD="$POSTGRES_PASSWORD" pg_isready -h db -p 5432 -U "$POSTGRES_USER"; do - echo "Waiting for postgres..." +until PGPASSWORD="$POSTGRES_PASSWORD" pg_isready -h "$DB_HOST" -p "$DB_PORT" -U "$POSTGRES_USER" -d "$DB_NAME"; do + echo "Waiting for postgres at ${DB_HOST}:${DB_PORT}/${DB_NAME}..." sleep 2 done echo "PostgreSQL is ready!" @@ -9,4 +14,4 @@ echo "PostgreSQL is ready!" echo "Applying migrations..." alembic upgrade head echo "Starting the application..." -uvicorn main:app --host 0.0.0.0 --port 8000 --reload \ No newline at end of file +uvicorn main:app --host 0.0.0.0 --port 8000 --reload diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index 0673d8ba7..738de2914 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -58,9 +58,10 @@ def __exit__(self, exc_type, exc, tb): "REFRESH MATERIALIZED VIEW ogc_avg_tds_wells", "REFRESH MATERIALIZED VIEW ogc_depth_to_water_trend_wells", "REFRESH MATERIALIZED VIEW ogc_water_well_summary", + "REFRESH MATERIALIZED VIEW ogc_normalized_chemistry_results", ] assert commit_called["value"] is True - assert "Refreshed 4 materialized view(s)." in result.output + assert "Refreshed 5 materialized view(s)." in result.output def test_refresh_pygeoapi_materialized_views_custom_and_concurrently(monkeypatch): @@ -335,10 +336,12 @@ def test_water_levels_cli_persists_observations(tmp_path, water_well_thing): """ def _write_csv(path: Path, *, well_name: str, notes: str): - csv_text = textwrap.dedent(f"""\ + csv_text = textwrap.dedent( + f"""\ field_staff,well_name_point_id,field_event_date_time,measurement_date_time,sampler,sample_method,mp_height,level_status,depth_to_water_ft,data_quality,water_level_notes CLI Tester,{well_name},2025-02-15T08:00:00-07:00,2025-02-15T10:30:00-07:00,Groundwater Team,electric tape,1.5,stable,42.5,approved,{notes} - """) + """ + ) path.write_text(csv_text) unique_notes = f"pytest-{uuid.uuid4()}" diff --git a/tests/test_ogc.py b/tests/test_ogc.py index e243c90b2..52b36118b 100644 --- a/tests/test_ogc.py +++ b/tests/test_ogc.py @@ -202,6 +202,7 @@ def test_ogc_collections(): "latest_tds_wells", "depth_to_water_trend_wells", "water_well_summary", + "normalized_chemistry_results", }.issubset(ids) @@ -210,6 +211,7 @@ def test_ogc_new_collection_items_endpoints(): "latest_tds_wells", "depth_to_water_trend_wells", "water_well_summary", + "normalized_chemistry_results", ): response = client.get(f"/ogcapi/collections/{collection_id}/items?limit=10") assert response.status_code == 200 From 884ffc66eb7f96aaa846965cc0a3372c5a16f355 Mon Sep 17 00:00:00 2001 From: jirhiker <2035568+jirhiker@users.noreply.github.com> Date: Wed, 4 Mar 2026 17:56:00 +0000 Subject: [PATCH 2/9] Formatting changes --- tests/test_cli_commands.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index 738de2914..d504fb96a 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -336,12 +336,10 @@ def test_water_levels_cli_persists_observations(tmp_path, water_well_thing): """ def _write_csv(path: Path, *, well_name: str, notes: str): - csv_text = textwrap.dedent( - f"""\ + csv_text = textwrap.dedent(f"""\ field_staff,well_name_point_id,field_event_date_time,measurement_date_time,sampler,sample_method,mp_height,level_status,depth_to_water_ft,data_quality,water_level_notes CLI Tester,{well_name},2025-02-15T08:00:00-07:00,2025-02-15T10:30:00-07:00,Groundwater Team,electric tape,1.5,stable,42.5,approved,{notes} - """ - ) + """) path.write_text(csv_text) unique_notes = f"pytest-{uuid.uuid4()}" From 8e9f4e61bac39f6f265432f56ebec5da4daebca4 Mon Sep 17 00:00:00 2001 From: jross Date: Wed, 4 Mar 2026 13:39:38 -0700 Subject: [PATCH 3/9] feat: update environment configuration for Docker and enhance README with local development setup --- .env.example | 2 +- README.md | 38 ++++++++++++++++++++++++++++++++++++-- docker-compose.yml | 2 ++ entrypoint.sh | 1 + 4 files changed, 40 insertions(+), 3 deletions(-) diff --git a/.env.example b/.env.example index d8a7547d8..08dda83ea 100644 --- a/.env.example +++ b/.env.example @@ -2,7 +2,7 @@ DB_DRIVER=postgres POSTGRES_USER=admin POSTGRES_PASSWORD=password -POSTGRES_DB=ocotillo +POSTGRES_DB=ocotilloapi_dev POSTGRES_HOST=localhost POSTGRES_PORT=5432 diff --git a/README.md b/README.md index 155dc2b94..8e20b6782 100644 --- a/README.md +++ b/README.md @@ -140,6 +140,36 @@ Notes: * Create file gcs_credentials.json in the root directory of the project, and obtain its contents from a teammate. * PostgreSQL uses the default port 5432. +Minimum vars to set in `.env` for local development: +* `POSTGRES_USER` +* `POSTGRES_PASSWORD` +* `POSTGRES_DB` (`ocotilloapi_dev` when using Docker Compose dev) +* `POSTGRES_HOST` (`localhost` for local psql/pytest against mapped Docker port) +* `POSTGRES_PORT` (`5432`) +* `MODE` (`development` recommended locally) +* `SESSION_SECRET_KEY` + +Auth-related vars (required when auth is enabled, optional when `AUTHENTIK_DISABLE_AUTHENTICATION=1`): +* `AUTHENTIK_DISABLE_AUTHENTICATION` +* `AUTHENTIK_URL` +* `AUTHENTIK_CLIENT_ID` +* `AUTHENTIK_AUTHORIZE_URL` +* `AUTHENTIK_TOKEN_URL` + +pygeoapi vars: +* `PYGEOAPI_MOUNT_PATH` (default `/ogcapi`) +* `PYGEOAPI_RUNTIME_DIR` (default `/tmp/pygeoapi`) +* `PYGEOAPI_POSTGRES_HOST` +* `PYGEOAPI_POSTGRES_PORT` +* `PYGEOAPI_POSTGRES_DB` +* `PYGEOAPI_POSTGRES_USER` +* `PYGEOAPI_POSTGRES_PASSWORD` + +Optional telemetry vars: +* `SENTRY_DSN` +* `APITALLY_CLIENT_ID` +* `ENVIRONMENT` + In development set `MODE=development` to allow lexicon enums to be populated. When `MODE=development`, the app attempts to seed the database with 10 example records via `transfers/seed.py`; if a `contact` record already exists, the seed step is skipped. #### 5. Database and server @@ -169,9 +199,13 @@ docker compose up --build Notes: * Requires Docker Desktop. -* Spins up two containers: `db` (PostGIS/PostgreSQL) and `app` (FastAPI API service). +* By default, spins up two containers: `db_dev` (PostGIS/PostgreSQL) and `app` (FastAPI API service). +* `db_test` is opt-in via profile: `docker compose --profile test up`. * `alembic upgrade head` runs on app startup after `docker compose up`. -* The database listens on port `5432` both inside the container and on your host. Ensure `POSTGRES_PORT=5432` in your `.env` to run local commands against the Docker DB (e.g., `uv run pytest`, `uv run python -m transfers.transfer`). +* Compose uses hardcoded DB names: + * dev: `ocotilloapi_dev` + * test: `ocotilloapi_test` +* The database listens on port `5432` both inside the container and on your host. Ensure `POSTGRES_PORT=5432` and `POSTGRES_DB=ocotilloapi_dev` in your `.env` to run local commands against the Docker dev DB (e.g., `uv run pytest`, `uv run python -m transfers.transfer`). #### Staging Data diff --git a/docker-compose.yml b/docker-compose.yml index e3a57f964..61ddc3b75 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,6 +21,8 @@ services: retries: 20 db_test: + profiles: + - test build: context: . dockerfile: ./docker/db/Dockerfile diff --git a/entrypoint.sh b/entrypoint.sh index 91b46aa64..3fd13d48c 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -1,4 +1,5 @@ #!/bin/sh +set -eu DB_HOST="${POSTGRES_HOST:-db}" DB_PORT="${POSTGRES_PORT:-5432}" From ae1ce5ed280733e87b8d3aa87d2c62a8029565c7 Mon Sep 17 00:00:00 2001 From: jross Date: Wed, 4 Mar 2026 13:41:55 -0700 Subject: [PATCH 4/9] feat: update database service configuration in tests to use development setup --- .github/workflows/tests.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 79bfcd7eb..a1ef30109 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -37,8 +37,8 @@ jobs: - name: Start database (PostGIS) run: | - docker compose build db - docker compose up -d db + docker compose build db_dev + docker compose up -d db_dev - name: Wait for database readiness run: | @@ -122,8 +122,8 @@ jobs: - name: Start database (PostGIS) run: | - docker compose build db - docker compose up -d db + docker compose build db_dev + docker compose up -d db_dev - name: Wait for database readiness run: | From 181218ac2f8f2e08c3b9375565cf0a0c7b947d77 Mon Sep 17 00:00:00 2001 From: jross Date: Wed, 4 Mar 2026 14:00:56 -0700 Subject: [PATCH 5/9] feat: add minor chemistry wells materialized view and update related configurations --- ...zed_chemistry_results_materialized_view.py | 1 + ...minor_chemistry_wells_materialized_view.py | 329 ++++++++++++++++++ cli/cli.py | 1 + core/pygeoapi-config.yml | 29 +- tests/test_cli_commands.py | 9 +- tests/test_ogc.py | 2 + 6 files changed, 365 insertions(+), 6 deletions(-) create mode 100644 alembic/versions/c7f8a9b0d1e2_add_minor_chemistry_wells_materialized_view.py diff --git a/alembic/versions/b6f7a8b9c0d1_add_normalized_chemistry_results_materialized_view.py b/alembic/versions/b6f7a8b9c0d1_add_normalized_chemistry_results_materialized_view.py index ba10d567b..b497740a0 100644 --- a/alembic/versions/b6f7a8b9c0d1_add_normalized_chemistry_results_materialized_view.py +++ b/alembic/versions/b6f7a8b9c0d1_add_normalized_chemistry_results_materialized_view.py @@ -246,6 +246,7 @@ def _create_normalized_chemistry_results_view() -> str: JOIN latest_location AS ll ON ll.thing_id = t.id JOIN location AS l ON l.id = ll.location_id WHERE lr.rn = 1 + AND t.thing_type = 'water well' GROUP BY t.id, ll.location_id, t.name, t.thing_type, l.point """ diff --git a/alembic/versions/c7f8a9b0d1e2_add_minor_chemistry_wells_materialized_view.py b/alembic/versions/c7f8a9b0d1e2_add_minor_chemistry_wells_materialized_view.py new file mode 100644 index 000000000..302e88bfd --- /dev/null +++ b/alembic/versions/c7f8a9b0d1e2_add_minor_chemistry_wells_materialized_view.py @@ -0,0 +1,329 @@ +"""add minor chemistry wells materialized view + +Revision ID: c7f8a9b0d1e2 +Revises: b6f7a8b9c0d1 +Create Date: 2026-03-04 16:20:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import inspect, text + +# revision identifiers, used by Alembic. +revision: str = "c7f8a9b0d1e2" +down_revision: Union[str, Sequence[str], None] = "b6f7a8b9c0d1" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +LATEST_LOCATION_CTE = """ +SELECT DISTINCT ON (lta.thing_id) + lta.thing_id, + lta.location_id, + lta.effective_start +FROM location_thing_association AS lta +WHERE lta.effective_end IS NULL +ORDER BY lta.thing_id, lta.effective_start DESC +""".strip() + +STATIC_ANALYTE_COLUMNS: list[tuple[str, str]] = [ + ("h2r", "h2r"), + ("o18r", "o18r"), + ("c13r", "c13r"), + ("c14", "c14"), + ("c14_years", "c14_years"), + ("fluoride", "fluoride"), + ("barium", "barium"), + ("barium_total", "barium_total"), + ("copper", "copper"), + ("copper_total", "copper_total"), + ("zinc", "zinc"), + ("zinc_total", "zinc_total"), + ("molybdenum", "molybdenum"), + ("molybdenum_total", "molybdenum_total"), + ("silica", "silica"), + ("silicon", "silicon"), + ("silicon_total", "silicon_total"), + ("manganese", "manganese"), + ("manganese_total", "manganese_total"), + ("iron", "iron"), + ("iron_total", "iron_total"), + ("strontium", "strontium"), + ("strontium_total", "strontium_total"), + ("chromium", "chromium"), + ("chromium_total", "chromium_total"), + ("boron", "boron"), + ("boron_total", "boron_total"), + ("uranium", "uranium"), + ("uranium_total", "uranium_total"), + ("lithium", "lithium"), + ("lithium_total", "lithium_total"), + ("silver", "silver"), + ("silver_total", "silver_total"), + ("antimony", "antimony"), + ("antimony_total", "antimony_total"), + ("beryllium", "beryllium"), + ("beryllium_total", "beryllium_total"), + ("lead", "lead"), + ("lead_total", "lead_total"), + ("thallium", "thallium"), + ("thallium_total", "thallium_total"), + ("bromide", "bromide"), + ("selenium", "selenium"), + ("selenium_total", "selenium_total"), + ("vanadium", "vanadium"), + ("vanadium_total", "vanadium_total"), + ("aluminum", "aluminum"), + ("aluminum_total", "aluminum_total"), + ("arsenic", "arsenic"), + ("arsenic_total", "arsenic_total"), + ("nickel", "nickel"), + ("nickel_total", "nickel_total"), + ("cadmium", "cadmium"), + ("cadmium_total", "cadmium_total"), + ("cobalt", "cobalt"), + ("cobalt_total", "cobalt_total"), + ("phosphate", "phosphate"), + ("nitrite", "nitrite"), + ("nitrate", "nitrate"), + ("nitrate_as_n", "nitrate_as_n"), + ("thorium", "thorium"), + ("thorium_total", "thorium_total"), + ("tin", "tin"), + ("tin_total", "tin_total"), + ("mercury", "mercury"), + ("mercury_total", "mercury_total"), + ("titanium", "titanium"), + ("titanium_total", "titanium_total"), +] + + +def _static_analyte_value_columns() -> str: + return ",\n".join( + [ + ( + " MAX(lr.sample_value) FILTER " + f"(WHERE lr.analyte_key = '{analyte_key}') AS {column_name}" + ) + for analyte_key, column_name in STATIC_ANALYTE_COLUMNS + ] + ) + + +def _static_analyte_unit_columns() -> str: + return ",\n".join( + [ + ( + " MAX(lr.units) FILTER " + f"(WHERE lr.analyte_key = '{analyte_key}') AS {column_name}_units" + ) + for analyte_key, column_name in STATIC_ANALYTE_COLUMNS + ] + ) + + +def _create_minor_chemistry_wells_view() -> str: + value_columns = _static_analyte_value_columns() + unit_columns = _static_analyte_unit_columns() + + return f""" + CREATE MATERIALIZED VIEW ogc_minor_chemistry_wells AS + WITH latest_location AS ( +{LATEST_LOCATION_CTE} + ), + chemistry_rows AS ( + SELECT + csi.thing_id, + mtc.id AS result_id, + COALESCE(mtc.analysis_date::timestamp, csi."CollectionDate") AS observation_datetime, + trim(mtc.analyte) AS analyte_name, + trim(mtc.symbol) AS symbol_name, + mtc.sample_value::double precision AS sample_value, + mtc.units AS units + FROM "NMA_MinorTraceChemistry" AS mtc + JOIN "NMA_Chemistry_SampleInfo" AS csi + ON csi.id = mtc.chemistry_sample_info_id + JOIN thing AS t ON t.id = csi.thing_id + WHERE + mtc.sample_value IS NOT NULL + AND t.thing_type = 'water well' + ), + normalized_rows AS ( + SELECT + cr.thing_id, + cr.result_id, + cr.observation_datetime, + NULLIF( + regexp_replace( + lower(trim(coalesce(cr.analyte_name, ''))), + '[^a-z0-9]+', + '', + 'g' + ), + '' + ) AS analyte_token, + NULLIF( + regexp_replace( + lower(trim(coalesce(cr.symbol_name, ''))), + '[^a-z0-9]+', + '', + 'g' + ), + '' + ) AS symbol_token, + cr.sample_value, + cr.units + FROM chemistry_rows AS cr + ), + mapped_rows AS ( + SELECT + nr.thing_id, + nr.result_id, + nr.observation_datetime, + CASE + WHEN coalesce(nr.analyte_token, '') = 'h2r' THEN 'h2r' + WHEN coalesce(nr.analyte_token, '') = 'o18r' THEN 'o18r' + WHEN coalesce(nr.analyte_token, '') = 'c13r' THEN 'c13r' + WHEN coalesce(nr.analyte_token, '') = 'c14' THEN 'c14' + WHEN coalesce(nr.analyte_token, '') = 'c14years' THEN 'c14_years' + + WHEN coalesce(nr.analyte_token, '') = 'f' THEN 'fluoride' + WHEN coalesce(nr.analyte_token, '') = 'ba' THEN 'barium' + WHEN coalesce(nr.analyte_token, '') = 'batotal' THEN 'barium_total' + WHEN coalesce(nr.analyte_token, '') = 'cu' THEN 'copper' + WHEN coalesce(nr.analyte_token, '') = 'cutotal' THEN 'copper_total' + WHEN coalesce(nr.analyte_token, '') = 'zn' THEN 'zinc' + WHEN coalesce(nr.analyte_token, '') = 'zntotal' THEN 'zinc_total' + WHEN coalesce(nr.analyte_token, '') = 'mo' THEN 'molybdenum' + WHEN coalesce(nr.analyte_token, '') = 'mototal' THEN 'molybdenum_total' + WHEN coalesce(nr.analyte_token, '') = 'sio2' THEN 'silica' + WHEN coalesce(nr.analyte_token, '') = 'si' THEN 'silicon' + WHEN coalesce(nr.analyte_token, '') = 'sitotal' THEN 'silicon_total' + WHEN coalesce(nr.analyte_token, '') = 'mn' THEN 'manganese' + WHEN coalesce(nr.analyte_token, '') = 'mntotal' THEN 'manganese_total' + WHEN coalesce(nr.analyte_token, '') = 'fe' THEN 'iron' + WHEN coalesce(nr.analyte_token, '') = 'fetotal' THEN 'iron_total' + WHEN coalesce(nr.analyte_token, '') = 'sr' THEN 'strontium' + WHEN coalesce(nr.analyte_token, '') = 'srtotal' THEN 'strontium_total' + WHEN coalesce(nr.analyte_token, '') = 'cr' THEN 'chromium' + WHEN coalesce(nr.analyte_token, '') = 'crtotal' THEN 'chromium_total' + WHEN coalesce(nr.analyte_token, '') = 'b' THEN 'boron' + WHEN coalesce(nr.analyte_token, '') = 'btotal' THEN 'boron_total' + WHEN coalesce(nr.analyte_token, '') = 'u' THEN 'uranium' + WHEN coalesce(nr.analyte_token, '') = 'utotal' THEN 'uranium_total' + WHEN coalesce(nr.analyte_token, '') = 'li' THEN 'lithium' + WHEN coalesce(nr.analyte_token, '') = 'litotal' THEN 'lithium_total' + WHEN coalesce(nr.analyte_token, '') = 'ag' THEN 'silver' + WHEN coalesce(nr.analyte_token, '') = 'agtotal' THEN 'silver_total' + WHEN coalesce(nr.analyte_token, '') = 'sb' THEN 'antimony' + WHEN coalesce(nr.analyte_token, '') = 'sbtotal' THEN 'antimony_total' + WHEN coalesce(nr.analyte_token, '') = 'be' THEN 'beryllium' + WHEN coalesce(nr.analyte_token, '') = 'betotal' THEN 'beryllium_total' + WHEN coalesce(nr.analyte_token, '') = 'pb' THEN 'lead' + WHEN coalesce(nr.analyte_token, '') = 'pbtotal' THEN 'lead_total' + WHEN coalesce(nr.analyte_token, '') = 'tl' THEN 'thallium' + WHEN coalesce(nr.analyte_token, '') = 'tltotal' THEN 'thallium_total' + WHEN coalesce(nr.analyte_token, '') = 'br' THEN 'bromide' + WHEN coalesce(nr.analyte_token, '') = 'se' THEN 'selenium' + WHEN coalesce(nr.analyte_token, '') = 'setotal' THEN 'selenium_total' + WHEN coalesce(nr.analyte_token, '') = 'v' THEN 'vanadium' + WHEN coalesce(nr.analyte_token, '') = 'vtotal' THEN 'vanadium_total' + WHEN coalesce(nr.analyte_token, '') = 'al' THEN 'aluminum' + WHEN coalesce(nr.analyte_token, '') = 'altotal' THEN 'aluminum_total' + WHEN coalesce(nr.analyte_token, '') = 'as' THEN 'arsenic' + WHEN coalesce(nr.analyte_token, '') = 'astotal' THEN 'arsenic_total' + WHEN coalesce(nr.analyte_token, '') = 'ni' THEN 'nickel' + WHEN coalesce(nr.analyte_token, '') = 'nitotal' THEN 'nickel_total' + WHEN coalesce(nr.analyte_token, '') = 'cd' THEN 'cadmium' + WHEN coalesce(nr.analyte_token, '') = 'cdtotal' THEN 'cadmium_total' + WHEN coalesce(nr.analyte_token, '') = 'co' THEN 'cobalt' + WHEN coalesce(nr.analyte_token, '') = 'cototal' THEN 'cobalt_total' + WHEN coalesce(nr.analyte_token, '') = 'po4' THEN 'phosphate' + WHEN coalesce(nr.analyte_token, '') = 'no2' THEN 'nitrite' + WHEN coalesce(nr.analyte_token, '') = 'no3' THEN 'nitrate' + WHEN coalesce(nr.analyte_token, '') = 'no3n' THEN 'nitrate_as_n' + WHEN coalesce(nr.analyte_token, '') = 'th' THEN 'thorium' + WHEN coalesce(nr.analyte_token, '') = 'thtotal' THEN 'thorium_total' + WHEN coalesce(nr.analyte_token, '') = 'sn' THEN 'tin' + WHEN coalesce(nr.analyte_token, '') = 'sntotal' THEN 'tin_total' + WHEN coalesce(nr.analyte_token, '') = 'hg' THEN 'mercury' + WHEN coalesce(nr.analyte_token, '') = 'hgtotal' THEN 'mercury_total' + WHEN coalesce(nr.analyte_token, '') = 'ti' THEN 'titanium' + WHEN coalesce(nr.analyte_token, '') = 'titotal' THEN 'titanium_total' + ELSE NULL + END AS analyte_key, + nr.sample_value, + nr.units + FROM normalized_rows AS nr + ), + latest_results AS ( + SELECT + mr.thing_id, + mr.analyte_key, + mr.sample_value, + mr.units, + mr.observation_datetime, + ROW_NUMBER() OVER ( + PARTITION BY mr.thing_id, mr.analyte_key + ORDER BY mr.observation_datetime DESC NULLS LAST, mr.result_id DESC + ) AS rn + FROM mapped_rows AS mr + WHERE mr.analyte_key IS NOT NULL + ) + SELECT + t.id AS id, + ll.location_id, + t.name, + t.thing_type, + COUNT(*)::integer AS analyte_count, + MAX(lr.observation_datetime::date) AS latest_chemistry_date, +{value_columns}, +{unit_columns}, + l.point + FROM latest_results AS lr + JOIN thing AS t ON t.id = lr.thing_id + JOIN latest_location AS ll ON ll.thing_id = t.id + JOIN location AS l ON l.id = ll.location_id + WHERE lr.rn = 1 + AND t.thing_type = 'water well' + GROUP BY t.id, ll.location_id, t.name, t.thing_type, l.point + """ + + +def upgrade() -> None: + bind = op.get_bind() + inspector = inspect(bind) + existing_tables = set(inspector.get_table_names(schema="public")) + required_tables = { + "thing", + "location", + "location_thing_association", + "NMA_Chemistry_SampleInfo", + "NMA_MinorTraceChemistry", + } + + if not required_tables.issubset(existing_tables): + missing = sorted(t for t in required_tables if t not in existing_tables) + raise RuntimeError( + "Cannot create ogc_minor_chemistry_wells. Missing required tables: " + + ", ".join(missing) + ) + + op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_minor_chemistry_wells")) + op.execute(text(_create_minor_chemistry_wells_view())) + op.execute( + text( + "COMMENT ON MATERIALIZED VIEW ogc_minor_chemistry_wells IS " + "'Latest minor/trace chemistry analyte values for water wells, pivoted into static analyte columns.'" + ) + ) + op.execute( + text( + "CREATE UNIQUE INDEX ux_ogc_minor_chemistry_wells_id " + "ON ogc_minor_chemistry_wells (id)" + ) + ) + + +def downgrade() -> None: + op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_minor_chemistry_wells")) diff --git a/cli/cli.py b/cli/cli.py index da1972c04..808a7f68c 100644 --- a/cli/cli.py +++ b/cli/cli.py @@ -56,6 +56,7 @@ class SmokePopulation(str, Enum): "ogc_depth_to_water_trend_wells", "ogc_water_well_summary", "ogc_normalized_chemistry_results", + "ogc_minor_chemistry_wells", ) diff --git a/core/pygeoapi-config.yml b/core/pygeoapi-config.yml index 80227db03..15cf2a738 100644 --- a/core/pygeoapi-config.yml +++ b/core/pygeoapi-config.yml @@ -174,9 +174,9 @@ resources: normalized_chemistry_results: type: collection - title: Normalized Chemistry Results - description: Latest major chemistry analyte values per location, represented as static analyte columns. - keywords: [chemistry, analytes, normalized, major-chemistry] + title: Major Chemistry (Water Wells) + description: Latest major chemistry analyte values for water wells, represented as static analyte columns. + keywords: [water-wells, chemistry, analytes, major-chemistry] extents: spatial: bbox: [-109.05, 31.33, -103.00, 37.00] @@ -195,4 +195,27 @@ resources: table: ogc_normalized_chemistry_results geom_field: point + minor_chemistry_wells: + type: collection + title: Minor Chemistry (Water Wells) + description: Latest minor/trace chemistry analyte values for water wells, represented as static analyte columns. + keywords: [water-wells, chemistry, analytes, minor-chemistry, trace-chemistry] + extents: + spatial: + bbox: [-109.05, 31.33, -103.00, 37.00] + crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84 + providers: + - type: feature + name: PostgreSQL + data: + host: {postgres_host} + port: {postgres_port} + dbname: {postgres_db} + user: {postgres_user} + password: {postgres_password_env} + search_path: [public] + id_field: id + table: ogc_minor_chemistry_wells + geom_field: point + {thing_collections_block} diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index d504fb96a..7b218351b 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -59,9 +59,10 @@ def __exit__(self, exc_type, exc, tb): "REFRESH MATERIALIZED VIEW ogc_depth_to_water_trend_wells", "REFRESH MATERIALIZED VIEW ogc_water_well_summary", "REFRESH MATERIALIZED VIEW ogc_normalized_chemistry_results", + "REFRESH MATERIALIZED VIEW ogc_minor_chemistry_wells", ] assert commit_called["value"] is True - assert "Refreshed 5 materialized view(s)." in result.output + assert "Refreshed 6 materialized view(s)." in result.output def test_refresh_pygeoapi_materialized_views_custom_and_concurrently(monkeypatch): @@ -336,10 +337,12 @@ def test_water_levels_cli_persists_observations(tmp_path, water_well_thing): """ def _write_csv(path: Path, *, well_name: str, notes: str): - csv_text = textwrap.dedent(f"""\ + csv_text = textwrap.dedent( + f"""\ field_staff,well_name_point_id,field_event_date_time,measurement_date_time,sampler,sample_method,mp_height,level_status,depth_to_water_ft,data_quality,water_level_notes CLI Tester,{well_name},2025-02-15T08:00:00-07:00,2025-02-15T10:30:00-07:00,Groundwater Team,electric tape,1.5,stable,42.5,approved,{notes} - """) + """ + ) path.write_text(csv_text) unique_notes = f"pytest-{uuid.uuid4()}" diff --git a/tests/test_ogc.py b/tests/test_ogc.py index 52b36118b..4793996ac 100644 --- a/tests/test_ogc.py +++ b/tests/test_ogc.py @@ -203,6 +203,7 @@ def test_ogc_collections(): "depth_to_water_trend_wells", "water_well_summary", "normalized_chemistry_results", + "minor_chemistry_wells", }.issubset(ids) @@ -212,6 +213,7 @@ def test_ogc_new_collection_items_endpoints(): "depth_to_water_trend_wells", "water_well_summary", "normalized_chemistry_results", + "minor_chemistry_wells", ): response = client.get(f"/ogcapi/collections/{collection_id}/items?limit=10") assert response.status_code == 200 From 8513ad01739f44d93c266f2a052c74527848b55d Mon Sep 17 00:00:00 2001 From: jirhiker <2035568+jirhiker@users.noreply.github.com> Date: Wed, 4 Mar 2026 21:01:18 +0000 Subject: [PATCH 6/9] Formatting changes --- tests/test_cli_commands.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index 7b218351b..ea68aeece 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -337,12 +337,10 @@ def test_water_levels_cli_persists_observations(tmp_path, water_well_thing): """ def _write_csv(path: Path, *, well_name: str, notes: str): - csv_text = textwrap.dedent( - f"""\ + csv_text = textwrap.dedent(f"""\ field_staff,well_name_point_id,field_event_date_time,measurement_date_time,sampler,sample_method,mp_height,level_status,depth_to_water_ft,data_quality,water_level_notes CLI Tester,{well_name},2025-02-15T08:00:00-07:00,2025-02-15T10:30:00-07:00,Groundwater Team,electric tape,1.5,stable,42.5,approved,{notes} - """ - ) + """) path.write_text(csv_text) unique_notes = f"pytest-{uuid.uuid4()}" From e15d366144f9359b9e450e861edef7420d271337 Mon Sep 17 00:00:00 2001 From: jross Date: Wed, 4 Mar 2026 14:04:09 -0700 Subject: [PATCH 7/9] feat: add test for normalized major chemistry to ensure latest results are used --- tests/test_ogc.py | 71 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/tests/test_ogc.py b/tests/test_ogc.py index 4793996ac..57ffe3ef6 100644 --- a/tests/test_ogc.py +++ b/tests/test_ogc.py @@ -190,6 +190,77 @@ def test_latest_tds_uses_latest_timestamp_within_same_day(water_well_thing): session.commit() +def test_ogc_normalized_major_chemistry_uses_latest_per_analyte(water_well_thing): + with session_ctx() as session: + csi = NMA_Chemistry_SampleInfo( + thing_id=water_well_thing.id, + nma_sample_point_id="MAJOR-NORM-01", + collection_date=datetime(2024, 3, 1, 10, 0, 0), + ) + session.add(csi) + session.flush() + + # Older calcium result + calcium_old = NMA_MajorChemistry( + chemistry_sample_info_id=csi.id, + analyte="Ca", + symbol="", + sample_value=80.0, + units="mg/L", + analysis_date=datetime(2024, 3, 1, 9, 0, 0), + ) + # Newer calcium result that should win for calcium + calcium_units + calcium_new = NMA_MajorChemistry( + chemistry_sample_info_id=csi.id, + analyte="Ca", + symbol="", + sample_value=95.0, + units="mg/L as CaCO3", + analysis_date=datetime(2024, 3, 2, 9, 0, 0), + ) + # Separate analyte with even later date to drive latest_chemistry_date + chloride = NMA_MajorChemistry( + chemistry_sample_info_id=csi.id, + analyte="Cl", + symbol="", + sample_value=40.0, + units="mg/L", + analysis_date=datetime(2024, 3, 3, 8, 0, 0), + ) + + session.add_all([calcium_old, calcium_new, chloride]) + session.commit() + + session.execute( + text("REFRESH MATERIALIZED VIEW ogc_normalized_chemistry_results") + ) + session.commit() + + row = session.execute( + text( + "SELECT calcium, calcium_units, chloride, chloride_units, latest_chemistry_date " + "FROM ogc_normalized_chemistry_results WHERE id = :thing_id" + ), + {"thing_id": water_well_thing.id}, + ).one() + + assert float(row.calcium) == 95.0 + assert row.calcium_units == "mg/L as CaCO3" + assert float(row.chloride) == 40.0 + assert row.chloride_units == "mg/L" + assert row.latest_chemistry_date.isoformat() == "2024-03-03" + + session.delete(chloride) + session.delete(calcium_new) + session.delete(calcium_old) + session.delete(csi) + session.commit() + session.execute( + text("REFRESH MATERIALIZED VIEW ogc_normalized_chemistry_results") + ) + session.commit() + + def test_ogc_collections(): response = client.get("/ogcapi/collections") assert response.status_code == 200 From 7cebfcc9b9d592bf7526e26920b16255adc07dfd Mon Sep 17 00:00:00 2001 From: jross Date: Wed, 4 Mar 2026 14:39:27 -0700 Subject: [PATCH 8/9] feat: rename normalized chemistry results to major chemistry results and update related configurations --- .github/workflows/tests.yml | 8 +- README.md | 8 +- ...zed_chemistry_results_materialized_view.py | 20 +++-- ...minor_chemistry_wells_materialized_view.py | 10 --- cli/cli.py | 2 +- core/pygeoapi-config.yml | 4 +- docker-compose.yml | 31 ++----- docker/db/init/01-create-test-db.sql | 10 +++ tests/__init__.py | 20 ++++- tests/conftest.py | 11 ++- tests/test_cli_commands.py | 8 +- tests/test_ogc.py | 90 ++++++++++++++++--- 12 files changed, 150 insertions(+), 72 deletions(-) create mode 100644 docker/db/init/01-create-test-db.sql diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index a1ef30109..79bfcd7eb 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -37,8 +37,8 @@ jobs: - name: Start database (PostGIS) run: | - docker compose build db_dev - docker compose up -d db_dev + docker compose build db + docker compose up -d db - name: Wait for database readiness run: | @@ -122,8 +122,8 @@ jobs: - name: Start database (PostGIS) run: | - docker compose build db_dev - docker compose up -d db_dev + docker compose build db + docker compose up -d db - name: Wait for database readiness run: | diff --git a/README.md b/README.md index 8e20b6782..7e35d3ec1 100644 --- a/README.md +++ b/README.md @@ -199,12 +199,14 @@ docker compose up --build Notes: * Requires Docker Desktop. -* By default, spins up two containers: `db_dev` (PostGIS/PostgreSQL) and `app` (FastAPI API service). -* `db_test` is opt-in via profile: `docker compose --profile test up`. +* By default, spins up two containers: `db` (PostGIS/PostgreSQL) and `app` (FastAPI API service). +* `db` initializes both application databases in the same Postgres service: + * `ocotilloapi_dev` + * `ocotilloapi_test` * `alembic upgrade head` runs on app startup after `docker compose up`. * Compose uses hardcoded DB names: * dev: `ocotilloapi_dev` - * test: `ocotilloapi_test` + * test: `ocotilloapi_test` (created by init SQL in `docker/db/init/01-create-test-db.sql`) * The database listens on port `5432` both inside the container and on your host. Ensure `POSTGRES_PORT=5432` and `POSTGRES_DB=ocotilloapi_dev` in your `.env` to run local commands against the Docker dev DB (e.g., `uv run pytest`, `uv run python -m transfers.transfer`). #### Staging Data diff --git a/alembic/versions/b6f7a8b9c0d1_add_normalized_chemistry_results_materialized_view.py b/alembic/versions/b6f7a8b9c0d1_add_normalized_chemistry_results_materialized_view.py index b497740a0..a70edaf03 100644 --- a/alembic/versions/b6f7a8b9c0d1_add_normalized_chemistry_results_materialized_view.py +++ b/alembic/versions/b6f7a8b9c0d1_add_normalized_chemistry_results_materialized_view.py @@ -80,11 +80,11 @@ def _static_analyte_unit_columns() -> str: ) -def _create_normalized_chemistry_results_view() -> str: +def _create_major_chemistry_results_view() -> str: static_columns = _static_analyte_select_columns() static_unit_columns = _static_analyte_unit_columns() return f""" - CREATE MATERIALIZED VIEW ogc_normalized_chemistry_results AS + CREATE MATERIALIZED VIEW ogc_major_chemistry_results AS WITH latest_location AS ( {LATEST_LOCATION_CTE} ), @@ -100,7 +100,10 @@ def _create_normalized_chemistry_results_view() -> str: FROM "NMA_MajorChemistry" AS mc JOIN "NMA_Chemistry_SampleInfo" AS csi ON csi.id = mc.chemistry_sample_info_id + JOIN thing AS t + ON t.id = csi.thing_id WHERE mc."SampleValue" IS NOT NULL + AND t.thing_type = 'water well' ), normalized_rows AS ( SELECT @@ -246,7 +249,6 @@ def _create_normalized_chemistry_results_view() -> str: JOIN latest_location AS ll ON ll.thing_id = t.id JOIN location AS l ON l.id = ll.location_id WHERE lr.rn = 1 - AND t.thing_type = 'water well' GROUP BY t.id, ll.location_id, t.name, t.thing_type, l.point """ @@ -266,29 +268,31 @@ def upgrade() -> None: if not required_tables.issubset(existing_tables): missing = sorted(t for t in required_tables if t not in existing_tables) raise RuntimeError( - "Cannot create ogc_normalized_chemistry_results. Missing required tables: " + "Cannot create ogc_major_chemistry_results. Missing required tables: " + ", ".join(missing) ) + op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_major_chemistry_results")) op.execute( text("DROP MATERIALIZED VIEW IF EXISTS ogc_normalized_chemistry_results") ) - op.execute(text(_create_normalized_chemistry_results_view())) + op.execute(text(_create_major_chemistry_results_view())) op.execute( text( - "COMMENT ON MATERIALIZED VIEW ogc_normalized_chemistry_results IS " + "COMMENT ON MATERIALIZED VIEW ogc_major_chemistry_results IS " "'Latest major-chemistry analyte values per location, pivoted into static analyte columns.'" ) ) op.execute( text( - "CREATE UNIQUE INDEX ux_ogc_normalized_chemistry_results_id " - "ON ogc_normalized_chemistry_results (id)" + "CREATE UNIQUE INDEX ux_ogc_major_chemistry_results_id " + "ON ogc_major_chemistry_results (id)" ) ) def downgrade() -> None: + op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_major_chemistry_results")) op.execute( text("DROP MATERIALIZED VIEW IF EXISTS ogc_normalized_chemistry_results") ) diff --git a/alembic/versions/c7f8a9b0d1e2_add_minor_chemistry_wells_materialized_view.py b/alembic/versions/c7f8a9b0d1e2_add_minor_chemistry_wells_materialized_view.py index 302e88bfd..e2e014acc 100644 --- a/alembic/versions/c7f8a9b0d1e2_add_minor_chemistry_wells_materialized_view.py +++ b/alembic/versions/c7f8a9b0d1e2_add_minor_chemistry_wells_materialized_view.py @@ -137,7 +137,6 @@ def _create_minor_chemistry_wells_view() -> str: mtc.id AS result_id, COALESCE(mtc.analysis_date::timestamp, csi."CollectionDate") AS observation_datetime, trim(mtc.analyte) AS analyte_name, - trim(mtc.symbol) AS symbol_name, mtc.sample_value::double precision AS sample_value, mtc.units AS units FROM "NMA_MinorTraceChemistry" AS mtc @@ -162,15 +161,6 @@ def _create_minor_chemistry_wells_view() -> str: ), '' ) AS analyte_token, - NULLIF( - regexp_replace( - lower(trim(coalesce(cr.symbol_name, ''))), - '[^a-z0-9]+', - '', - 'g' - ), - '' - ) AS symbol_token, cr.sample_value, cr.units FROM chemistry_rows AS cr diff --git a/cli/cli.py b/cli/cli.py index 808a7f68c..e9f8dc36c 100644 --- a/cli/cli.py +++ b/cli/cli.py @@ -55,7 +55,7 @@ class SmokePopulation(str, Enum): "ogc_avg_tds_wells", "ogc_depth_to_water_trend_wells", "ogc_water_well_summary", - "ogc_normalized_chemistry_results", + "ogc_major_chemistry_results", "ogc_minor_chemistry_wells", ) diff --git a/core/pygeoapi-config.yml b/core/pygeoapi-config.yml index 15cf2a738..0a205d298 100644 --- a/core/pygeoapi-config.yml +++ b/core/pygeoapi-config.yml @@ -172,7 +172,7 @@ resources: table: ogc_water_well_summary geom_field: point - normalized_chemistry_results: + major_chemistry_results: type: collection title: Major Chemistry (Water Wells) description: Latest major chemistry analyte values for water wells, represented as static analyte columns. @@ -192,7 +192,7 @@ resources: password: {postgres_password_env} search_path: [public] id_field: id - table: ogc_normalized_chemistry_results + table: ogc_major_chemistry_results geom_field: point minor_chemistry_wells: diff --git a/docker-compose.yml b/docker-compose.yml index 61ddc3b75..9a557f827 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ # keep docker-compose.yml in root directory to configure with root .env services: - db_dev: + db: build: context: . dockerfile: ./docker/db/Dockerfile @@ -14,33 +14,13 @@ services: - 5432:5432 volumes: - postgres_data_dev:/var/lib/postgresql/data + - ./docker/db/init:/docker-entrypoint-initdb.d:ro healthcheck: test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ocotilloapi_dev"] interval: 2s timeout: 5s retries: 20 - db_test: - profiles: - - test - build: - context: . - dockerfile: ./docker/db/Dockerfile - platform: linux/amd64 - environment: - - POSTGRES_USER=${POSTGRES_USER} - - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} - - POSTGRES_DB=ocotilloapi_test - ports: - - 5433:5432 - volumes: - - postgres_data_test:/var/lib/postgresql/data - healthcheck: - test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ocotilloapi_test"] - interval: 2s - timeout: 5s - retries: 20 - app: build: context: . @@ -49,20 +29,19 @@ services: - POSTGRES_USER=${POSTGRES_USER} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} - POSTGRES_DB=ocotilloapi_dev - - POSTGRES_HOST=db_dev + - POSTGRES_HOST=db - POSTGRES_PORT=5432 - MODE=${MODE} - AUTHENTIK_DISABLE_AUTHENTICATION=${AUTHENTIK_DISABLE_AUTHENTICATION} ports: - 8000:8000 depends_on: - db_dev: + db: condition: service_healthy # <-- wait for DB to be ready links: - - db_dev + - db volumes: - .:/app volumes: postgres_data_dev: - postgres_data_test: diff --git a/docker/db/init/01-create-test-db.sql b/docker/db/init/01-create-test-db.sql new file mode 100644 index 000000000..53ab9cb5d --- /dev/null +++ b/docker/db/init/01-create-test-db.sql @@ -0,0 +1,10 @@ +-- Initialize test database inside the same Postgres service used for dev. +-- This script runs only when the data directory is first initialized. + +CREATE DATABASE ocotilloapi_test; + +\connect ocotilloapi_dev +CREATE EXTENSION IF NOT EXISTS postgis; + +\connect ocotilloapi_test +CREATE EXTENSION IF NOT EXISTS postgis; diff --git a/tests/__init__.py b/tests/__init__.py index b5cee0114..57fa0c351 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -14,13 +14,29 @@ # limitations under the License. # =============================================================================== import os +import socket from functools import lru_cache from dotenv import load_dotenv # Load .env file BEFORE importing anything else -# Use override=True to override conflicting shell environment variables -load_dotenv(override=True) +# Use override=False so explicit shell environment variables can override .env +load_dotenv(override=False) + + +def _normalize_test_db_host() -> None: + """Fallback docker-compose hostnames to localhost for host-run tests.""" + for env_name in ("POSTGRES_HOST", "PYGEOAPI_POSTGRES_HOST"): + host = (os.environ.get(env_name) or "").strip() + if host != "db": + continue + try: + socket.gethostbyname(host) + except OSError: + os.environ[env_name] = "localhost" + + +_normalize_test_db_host() # for safety don't test on the production database port os.environ["POSTGRES_PORT"] = "5432" diff --git a/tests/conftest.py b/tests/conftest.py index 3847263b6..a5f037b61 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import os +import socket import pytest from alembic import command @@ -16,7 +17,15 @@ def pytest_configure(): - load_dotenv(override=True) + load_dotenv(override=False) + for env_name in ("POSTGRES_HOST", "PYGEOAPI_POSTGRES_HOST"): + host = (os.environ.get(env_name) or "").strip() + if host != "db": + continue + try: + socket.gethostbyname(host) + except OSError: + os.environ[env_name] = "localhost" os.environ.setdefault("POSTGRES_PORT", "54321") # NOTE: This hardcoded secret key is for tests only and must NEVER be used in production. os.environ.setdefault("SESSION_SECRET_KEY", "test-session-secret-key") diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index ea68aeece..6d70f5874 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -58,7 +58,7 @@ def __exit__(self, exc_type, exc, tb): "REFRESH MATERIALIZED VIEW ogc_avg_tds_wells", "REFRESH MATERIALIZED VIEW ogc_depth_to_water_trend_wells", "REFRESH MATERIALIZED VIEW ogc_water_well_summary", - "REFRESH MATERIALIZED VIEW ogc_normalized_chemistry_results", + "REFRESH MATERIALIZED VIEW ogc_major_chemistry_results", "REFRESH MATERIALIZED VIEW ogc_minor_chemistry_wells", ] assert commit_called["value"] is True @@ -337,10 +337,12 @@ def test_water_levels_cli_persists_observations(tmp_path, water_well_thing): """ def _write_csv(path: Path, *, well_name: str, notes: str): - csv_text = textwrap.dedent(f"""\ + csv_text = textwrap.dedent( + f"""\ field_staff,well_name_point_id,field_event_date_time,measurement_date_time,sampler,sample_method,mp_height,level_status,depth_to_water_ft,data_quality,water_level_notes CLI Tester,{well_name},2025-02-15T08:00:00-07:00,2025-02-15T10:30:00-07:00,Groundwater Team,electric tape,1.5,stable,42.5,approved,{notes} - """) + """ + ) path.write_text(csv_text) unique_notes = f"pytest-{uuid.uuid4()}" diff --git a/tests/test_ogc.py b/tests/test_ogc.py index 57ffe3ef6..7912cab91 100644 --- a/tests/test_ogc.py +++ b/tests/test_ogc.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from datetime import datetime +from datetime import date, datetime from importlib.util import find_spec import pytest @@ -27,7 +27,7 @@ viewer_function, amp_viewer_function, ) -from db import NMA_Chemistry_SampleInfo, NMA_MajorChemistry +from db import NMA_Chemistry_SampleInfo, NMA_MajorChemistry, NMA_MinorTraceChemistry from db.engine import session_ctx from main import app from tests import client, override_authentication @@ -190,11 +190,11 @@ def test_latest_tds_uses_latest_timestamp_within_same_day(water_well_thing): session.commit() -def test_ogc_normalized_major_chemistry_uses_latest_per_analyte(water_well_thing): +def test_ogc_major_chemistry_results_uses_latest_per_analyte(water_well_thing): with session_ctx() as session: csi = NMA_Chemistry_SampleInfo( thing_id=water_well_thing.id, - nma_sample_point_id="MAJOR-NORM-01", + nma_sample_point_id="MAJNORM01", collection_date=datetime(2024, 3, 1, 10, 0, 0), ) session.add(csi) @@ -231,15 +231,13 @@ def test_ogc_normalized_major_chemistry_uses_latest_per_analyte(water_well_thing session.add_all([calcium_old, calcium_new, chloride]) session.commit() - session.execute( - text("REFRESH MATERIALIZED VIEW ogc_normalized_chemistry_results") - ) + session.execute(text("REFRESH MATERIALIZED VIEW ogc_major_chemistry_results")) session.commit() row = session.execute( text( "SELECT calcium, calcium_units, chloride, chloride_units, latest_chemistry_date " - "FROM ogc_normalized_chemistry_results WHERE id = :thing_id" + "FROM ogc_major_chemistry_results WHERE id = :thing_id" ), {"thing_id": water_well_thing.id}, ).one() @@ -255,9 +253,77 @@ def test_ogc_normalized_major_chemistry_uses_latest_per_analyte(water_well_thing session.delete(calcium_old) session.delete(csi) session.commit() - session.execute( - text("REFRESH MATERIALIZED VIEW ogc_normalized_chemistry_results") + session.execute(text("REFRESH MATERIALIZED VIEW ogc_major_chemistry_results")) + session.commit() + + +def test_ogc_minor_chemistry_wells_uses_latest_per_analyte(water_well_thing): + with session_ctx() as session: + csi = NMA_Chemistry_SampleInfo( + thing_id=water_well_thing.id, + nma_sample_point_id="MINRNORM1", + collection_date=datetime(2024, 4, 1, 10, 0, 0), + ) + session.add(csi) + session.flush() + + # Older barium result + barium_old = NMA_MinorTraceChemistry( + chemistry_sample_info_id=csi.id, + nma_sample_point_id="MINRNORM1", + analyte="Ba", + symbol="", + sample_value=0.40, + units="mg/L", + analysis_date=date(2024, 4, 1), + ) + # Newer barium result that should win for barium + barium_units + barium_new = NMA_MinorTraceChemistry( + chemistry_sample_info_id=csi.id, + nma_sample_point_id="MINRNORM1", + analyte="Ba", + symbol="", + sample_value=0.55, + units="ug/L", + analysis_date=date(2024, 4, 2), + ) + # Separate analyte with even later date to drive latest_chemistry_date + fluoride = NMA_MinorTraceChemistry( + chemistry_sample_info_id=csi.id, + nma_sample_point_id="MINRNORM1", + analyte="F", + symbol="", + sample_value=1.2, + units="mg/L", + analysis_date=date(2024, 4, 3), ) + + session.add_all([barium_old, barium_new, fluoride]) + session.commit() + + session.execute(text("REFRESH MATERIALIZED VIEW ogc_minor_chemistry_wells")) + session.commit() + + row = session.execute( + text( + "SELECT barium, barium_units, fluoride, fluoride_units, latest_chemistry_date " + "FROM ogc_minor_chemistry_wells WHERE id = :thing_id" + ), + {"thing_id": water_well_thing.id}, + ).one() + + assert float(row.barium) == 0.55 + assert row.barium_units == "ug/L" + assert float(row.fluoride) == 1.2 + assert row.fluoride_units == "mg/L" + assert row.latest_chemistry_date.isoformat() == "2024-04-03" + + session.delete(fluoride) + session.delete(barium_new) + session.delete(barium_old) + session.delete(csi) + session.commit() + session.execute(text("REFRESH MATERIALIZED VIEW ogc_minor_chemistry_wells")) session.commit() @@ -273,7 +339,7 @@ def test_ogc_collections(): "latest_tds_wells", "depth_to_water_trend_wells", "water_well_summary", - "normalized_chemistry_results", + "major_chemistry_results", "minor_chemistry_wells", }.issubset(ids) @@ -283,7 +349,7 @@ def test_ogc_new_collection_items_endpoints(): "latest_tds_wells", "depth_to_water_trend_wells", "water_well_summary", - "normalized_chemistry_results", + "major_chemistry_results", "minor_chemistry_wells", ): response = client.get(f"/ogcapi/collections/{collection_id}/items?limit=10") From f2198a620445c47d2a82e6b636c7e889d2853eaa Mon Sep 17 00:00:00 2001 From: jirhiker <2035568+jirhiker@users.noreply.github.com> Date: Wed, 4 Mar 2026 21:39:56 +0000 Subject: [PATCH 9/9] Formatting changes --- tests/test_cli_commands.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index 6d70f5874..6f17f4101 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -337,12 +337,10 @@ def test_water_levels_cli_persists_observations(tmp_path, water_well_thing): """ def _write_csv(path: Path, *, well_name: str, notes: str): - csv_text = textwrap.dedent( - f"""\ + csv_text = textwrap.dedent(f"""\ field_staff,well_name_point_id,field_event_date_time,measurement_date_time,sampler,sample_method,mp_height,level_status,depth_to_water_ft,data_quality,water_level_notes CLI Tester,{well_name},2025-02-15T08:00:00-07:00,2025-02-15T10:30:00-07:00,Groundwater Team,electric tape,1.5,stable,42.5,approved,{notes} - """ - ) + """) path.write_text(csv_text) unique_notes = f"pytest-{uuid.uuid4()}"