diff --git a/.env.example b/.env.example index d8a7547d..08dda83e 100644 --- a/.env.example +++ b/.env.example @@ -2,7 +2,7 @@ DB_DRIVER=postgres POSTGRES_USER=admin POSTGRES_PASSWORD=password -POSTGRES_DB=ocotillo +POSTGRES_DB=ocotilloapi_dev POSTGRES_HOST=localhost POSTGRES_PORT=5432 diff --git a/README.md b/README.md index 155dc2b9..7e35d3ec 100644 --- a/README.md +++ b/README.md @@ -140,6 +140,36 @@ Notes: * Create file gcs_credentials.json in the root directory of the project, and obtain its contents from a teammate. * PostgreSQL uses the default port 5432. +Minimum vars to set in `.env` for local development: +* `POSTGRES_USER` +* `POSTGRES_PASSWORD` +* `POSTGRES_DB` (`ocotilloapi_dev` when using Docker Compose dev) +* `POSTGRES_HOST` (`localhost` for local psql/pytest against mapped Docker port) +* `POSTGRES_PORT` (`5432`) +* `MODE` (`development` recommended locally) +* `SESSION_SECRET_KEY` + +Auth-related vars (required when auth is enabled, optional when `AUTHENTIK_DISABLE_AUTHENTICATION=1`): +* `AUTHENTIK_DISABLE_AUTHENTICATION` +* `AUTHENTIK_URL` +* `AUTHENTIK_CLIENT_ID` +* `AUTHENTIK_AUTHORIZE_URL` +* `AUTHENTIK_TOKEN_URL` + +pygeoapi vars: +* `PYGEOAPI_MOUNT_PATH` (default `/ogcapi`) +* `PYGEOAPI_RUNTIME_DIR` (default `/tmp/pygeoapi`) +* `PYGEOAPI_POSTGRES_HOST` +* `PYGEOAPI_POSTGRES_PORT` +* `PYGEOAPI_POSTGRES_DB` +* `PYGEOAPI_POSTGRES_USER` +* `PYGEOAPI_POSTGRES_PASSWORD` + +Optional telemetry vars: +* `SENTRY_DSN` +* `APITALLY_CLIENT_ID` +* `ENVIRONMENT` + In development set `MODE=development` to allow lexicon enums to be populated. When `MODE=development`, the app attempts to seed the database with 10 example records via `transfers/seed.py`; if a `contact` record already exists, the seed step is skipped. #### 5. Database and server @@ -169,9 +199,15 @@ docker compose up --build Notes: * Requires Docker Desktop. -* Spins up two containers: `db` (PostGIS/PostgreSQL) and `app` (FastAPI API service). +* By default, spins up two containers: `db` (PostGIS/PostgreSQL) and `app` (FastAPI API service). +* `db` initializes both application databases in the same Postgres service: + * `ocotilloapi_dev` + * `ocotilloapi_test` * `alembic upgrade head` runs on app startup after `docker compose up`. -* The database listens on port `5432` both inside the container and on your host. Ensure `POSTGRES_PORT=5432` in your `.env` to run local commands against the Docker DB (e.g., `uv run pytest`, `uv run python -m transfers.transfer`). +* Compose uses hardcoded DB names: + * dev: `ocotilloapi_dev` + * test: `ocotilloapi_test` (created by init SQL in `docker/db/init/01-create-test-db.sql`) +* The database listens on port `5432` both inside the container and on your host. Ensure `POSTGRES_PORT=5432` and `POSTGRES_DB=ocotilloapi_dev` in your `.env` to run local commands against the Docker dev DB (e.g., `uv run pytest`, `uv run python -m transfers.transfer`). #### Staging Data diff --git a/alembic/versions/b6f7a8b9c0d1_add_normalized_chemistry_results_materialized_view.py b/alembic/versions/b6f7a8b9c0d1_add_normalized_chemistry_results_materialized_view.py new file mode 100644 index 00000000..a70edaf0 --- /dev/null +++ b/alembic/versions/b6f7a8b9c0d1_add_normalized_chemistry_results_materialized_view.py @@ -0,0 +1,298 @@ +"""add normalized chemistry results materialized view + +Revision ID: b6f7a8b9c0d1 +Revises: l5e6f7a8b9c0 +Create Date: 2026-03-04 14:10:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import inspect, text + +# revision identifiers, used by Alembic. +revision: str = "b6f7a8b9c0d1" +down_revision: Union[str, Sequence[str], None] = "l5e6f7a8b9c0" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +LATEST_LOCATION_CTE = """ +SELECT DISTINCT ON (lta.thing_id) + lta.thing_id, + lta.location_id, + lta.effective_start +FROM location_thing_association AS lta +WHERE lta.effective_end IS NULL +ORDER BY lta.thing_id, lta.effective_start DESC +""".strip() + +# Static analyte columns for major chemistry pivots. +# Includes aliases observed in current DB values (e.g., Ca(total), IONBAL, TAn, TCat, Na+K). +STATIC_ANALYTE_COLUMNS: list[tuple[str, str]] = [ + ("tds", "tds"), + ("calcium", "calcium"), + ("calcium_total", "calcium_total"), + ("magnesium", "magnesium"), + ("magnesium_total", "magnesium_total"), + ("sodium", "sodium"), + ("sodium_total", "sodium_total"), + ("potassium", "potassium"), + ("potassium_total", "potassium_total"), + ("sodium_plus_potassium", "sodium_plus_potassium"), + ("bicarbonate", "bicarbonate"), + ("carbonate", "carbonate"), + ("sulfate", "sulfate"), + ("chloride", "chloride"), + ("ion_balance", "ion_balance"), + ("total_anions", "total_anions"), + ("total_cations", "total_cations"), + ("alkalinity", "alkalinity"), + ("hardness", "hardness"), + ("specific_conductance", "specific_conductance"), + ("ph", "ph"), + ("nitrate", "nitrate"), + ("fluoride", "fluoride"), + ("silica", "silica"), +] + + +def _static_analyte_select_columns() -> str: + return ",\n".join( + [ + ( + " MAX(lr.sample_value) FILTER " + f"(WHERE lr.analyte_key = '{analyte_key}') AS {column_name}" + ) + for analyte_key, column_name in STATIC_ANALYTE_COLUMNS + ] + ) + + +def _static_analyte_unit_columns() -> str: + return ",\n".join( + [ + ( + " MAX(lr.units) FILTER " + f"(WHERE lr.analyte_key = '{analyte_key}') AS {column_name}_units" + ) + for analyte_key, column_name in STATIC_ANALYTE_COLUMNS + ] + ) + + +def _create_major_chemistry_results_view() -> str: + static_columns = _static_analyte_select_columns() + static_unit_columns = _static_analyte_unit_columns() + return f""" + CREATE MATERIALIZED VIEW ogc_major_chemistry_results AS + WITH latest_location AS ( +{LATEST_LOCATION_CTE} + ), + chemistry_rows AS ( + SELECT + csi.thing_id, + mc.id AS result_id, + COALESCE(mc."AnalysisDate", csi."CollectionDate") AS observation_datetime, + trim(mc."Analyte") AS analyte_name, + trim(mc."Symbol") AS symbol_name, + mc."SampleValue"::double precision AS sample_value, + mc."Units" AS units + FROM "NMA_MajorChemistry" AS mc + JOIN "NMA_Chemistry_SampleInfo" AS csi + ON csi.id = mc.chemistry_sample_info_id + JOIN thing AS t + ON t.id = csi.thing_id + WHERE mc."SampleValue" IS NOT NULL + AND t.thing_type = 'water well' + ), + normalized_rows AS ( + SELECT + cr.thing_id, + cr.result_id, + cr.observation_datetime, + NULLIF( + regexp_replace( + lower(trim(coalesce(cr.analyte_name, ''))), + '[^a-z0-9]+', + '', + 'g' + ), + '' + ) AS analyte_token, + NULLIF( + regexp_replace( + lower(trim(coalesce(cr.symbol_name, ''))), + '[^a-z0-9]+', + '', + 'g' + ), + '' + ) AS symbol_token, + cr.sample_value, + cr.units + FROM chemistry_rows AS cr + ), + mapped_rows AS ( + SELECT + nr.thing_id, + nr.result_id, + nr.observation_datetime, + CASE + WHEN coalesce(nr.symbol_token, '') = 'tds' + OR coalesce(nr.analyte_token, '') IN ('tds', 'totaldissolvedsolids') + THEN 'tds' + + WHEN coalesce(nr.symbol_token, '') = 'ca' + OR coalesce(nr.analyte_token, '') = 'ca' + THEN 'calcium' + WHEN coalesce(nr.analyte_token, '') = 'catotal' + THEN 'calcium_total' + + WHEN coalesce(nr.symbol_token, '') = 'mg' + OR coalesce(nr.analyte_token, '') = 'mg' + THEN 'magnesium' + WHEN coalesce(nr.analyte_token, '') = 'mgtotal' + THEN 'magnesium_total' + + WHEN coalesce(nr.symbol_token, '') = 'na' + OR coalesce(nr.analyte_token, '') = 'na' + THEN 'sodium' + WHEN coalesce(nr.analyte_token, '') = 'natotal' + THEN 'sodium_total' + + WHEN coalesce(nr.symbol_token, '') = 'k' + OR coalesce(nr.analyte_token, '') = 'k' + THEN 'potassium' + WHEN coalesce(nr.analyte_token, '') = 'ktotal' + THEN 'potassium_total' + + WHEN coalesce(nr.analyte_token, '') = 'nak' + THEN 'sodium_plus_potassium' + + WHEN coalesce(nr.symbol_token, '') = 'hco3' + OR coalesce(nr.analyte_token, '') = 'hco3' + THEN 'bicarbonate' + WHEN coalesce(nr.symbol_token, '') = 'co3' + OR coalesce(nr.analyte_token, '') = 'co3' + THEN 'carbonate' + WHEN coalesce(nr.symbol_token, '') = 'so4' + OR coalesce(nr.analyte_token, '') = 'so4' + THEN 'sulfate' + WHEN coalesce(nr.symbol_token, '') = 'cl' + OR coalesce(nr.analyte_token, '') = 'cl' + THEN 'chloride' + + WHEN coalesce(nr.analyte_token, '') = 'ionbal' + THEN 'ion_balance' + WHEN coalesce(nr.analyte_token, '') = 'tan' + THEN 'total_anions' + WHEN coalesce(nr.analyte_token, '') = 'tcat' + THEN 'total_cations' + + WHEN coalesce(nr.analyte_token, '') IN ('alk', 'alkalinity') + THEN 'alkalinity' + WHEN coalesce(nr.analyte_token, '') IN ('hrd', 'hardness') + THEN 'hardness' + WHEN coalesce(nr.analyte_token, '') IN ( + 'condlab', + 'specificconductance', + 'specificconductivity', + 'conductivity' + ) + THEN 'specific_conductance' + WHEN coalesce(nr.symbol_token, '') = 'ph' + OR coalesce(nr.analyte_token, '') IN ('ph', 'phl') + THEN 'ph' + + WHEN coalesce(nr.symbol_token, '') = 'no3' + OR coalesce(nr.analyte_token, '') IN ('no3', 'nitrate') + THEN 'nitrate' + WHEN coalesce(nr.symbol_token, '') = 'f' + OR coalesce(nr.analyte_token, '') IN ('f', 'fluoride') + THEN 'fluoride' + WHEN coalesce(nr.symbol_token, '') = 'sio2' + OR coalesce(nr.analyte_token, '') IN ('sio2', 'silica') + THEN 'silica' + + ELSE NULL + END AS analyte_key, + nr.sample_value, + nr.units + FROM normalized_rows AS nr + ), + latest_results AS ( + SELECT + mr.thing_id, + mr.analyte_key, + mr.sample_value, + mr.units, + mr.observation_datetime, + ROW_NUMBER() OVER ( + PARTITION BY mr.thing_id, mr.analyte_key + ORDER BY mr.observation_datetime DESC NULLS LAST, mr.result_id DESC + ) AS rn + FROM mapped_rows AS mr + WHERE mr.analyte_key IS NOT NULL + ) + SELECT + t.id AS id, + ll.location_id, + t.name, + t.thing_type, + COUNT(*)::integer AS analyte_count, + MAX(lr.observation_datetime::date) AS latest_chemistry_date, +{static_columns}, +{static_unit_columns}, + l.point + FROM latest_results AS lr + JOIN thing AS t ON t.id = lr.thing_id + JOIN latest_location AS ll ON ll.thing_id = t.id + JOIN location AS l ON l.id = ll.location_id + WHERE lr.rn = 1 + GROUP BY t.id, ll.location_id, t.name, t.thing_type, l.point + """ + + +def upgrade() -> None: + bind = op.get_bind() + inspector = inspect(bind) + existing_tables = set(inspector.get_table_names(schema="public")) + required_tables = { + "thing", + "location", + "location_thing_association", + "NMA_Chemistry_SampleInfo", + "NMA_MajorChemistry", + } + + if not required_tables.issubset(existing_tables): + missing = sorted(t for t in required_tables if t not in existing_tables) + raise RuntimeError( + "Cannot create ogc_major_chemistry_results. Missing required tables: " + + ", ".join(missing) + ) + + op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_major_chemistry_results")) + op.execute( + text("DROP MATERIALIZED VIEW IF EXISTS ogc_normalized_chemistry_results") + ) + op.execute(text(_create_major_chemistry_results_view())) + op.execute( + text( + "COMMENT ON MATERIALIZED VIEW ogc_major_chemistry_results IS " + "'Latest major-chemistry analyte values per location, pivoted into static analyte columns.'" + ) + ) + op.execute( + text( + "CREATE UNIQUE INDEX ux_ogc_major_chemistry_results_id " + "ON ogc_major_chemistry_results (id)" + ) + ) + + +def downgrade() -> None: + op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_major_chemistry_results")) + op.execute( + text("DROP MATERIALIZED VIEW IF EXISTS ogc_normalized_chemistry_results") + ) diff --git a/alembic/versions/c7f8a9b0d1e2_add_minor_chemistry_wells_materialized_view.py b/alembic/versions/c7f8a9b0d1e2_add_minor_chemistry_wells_materialized_view.py new file mode 100644 index 00000000..e2e014ac --- /dev/null +++ b/alembic/versions/c7f8a9b0d1e2_add_minor_chemistry_wells_materialized_view.py @@ -0,0 +1,319 @@ +"""add minor chemistry wells materialized view + +Revision ID: c7f8a9b0d1e2 +Revises: b6f7a8b9c0d1 +Create Date: 2026-03-04 16:20:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import inspect, text + +# revision identifiers, used by Alembic. +revision: str = "c7f8a9b0d1e2" +down_revision: Union[str, Sequence[str], None] = "b6f7a8b9c0d1" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +LATEST_LOCATION_CTE = """ +SELECT DISTINCT ON (lta.thing_id) + lta.thing_id, + lta.location_id, + lta.effective_start +FROM location_thing_association AS lta +WHERE lta.effective_end IS NULL +ORDER BY lta.thing_id, lta.effective_start DESC +""".strip() + +STATIC_ANALYTE_COLUMNS: list[tuple[str, str]] = [ + ("h2r", "h2r"), + ("o18r", "o18r"), + ("c13r", "c13r"), + ("c14", "c14"), + ("c14_years", "c14_years"), + ("fluoride", "fluoride"), + ("barium", "barium"), + ("barium_total", "barium_total"), + ("copper", "copper"), + ("copper_total", "copper_total"), + ("zinc", "zinc"), + ("zinc_total", "zinc_total"), + ("molybdenum", "molybdenum"), + ("molybdenum_total", "molybdenum_total"), + ("silica", "silica"), + ("silicon", "silicon"), + ("silicon_total", "silicon_total"), + ("manganese", "manganese"), + ("manganese_total", "manganese_total"), + ("iron", "iron"), + ("iron_total", "iron_total"), + ("strontium", "strontium"), + ("strontium_total", "strontium_total"), + ("chromium", "chromium"), + ("chromium_total", "chromium_total"), + ("boron", "boron"), + ("boron_total", "boron_total"), + ("uranium", "uranium"), + ("uranium_total", "uranium_total"), + ("lithium", "lithium"), + ("lithium_total", "lithium_total"), + ("silver", "silver"), + ("silver_total", "silver_total"), + ("antimony", "antimony"), + ("antimony_total", "antimony_total"), + ("beryllium", "beryllium"), + ("beryllium_total", "beryllium_total"), + ("lead", "lead"), + ("lead_total", "lead_total"), + ("thallium", "thallium"), + ("thallium_total", "thallium_total"), + ("bromide", "bromide"), + ("selenium", "selenium"), + ("selenium_total", "selenium_total"), + ("vanadium", "vanadium"), + ("vanadium_total", "vanadium_total"), + ("aluminum", "aluminum"), + ("aluminum_total", "aluminum_total"), + ("arsenic", "arsenic"), + ("arsenic_total", "arsenic_total"), + ("nickel", "nickel"), + ("nickel_total", "nickel_total"), + ("cadmium", "cadmium"), + ("cadmium_total", "cadmium_total"), + ("cobalt", "cobalt"), + ("cobalt_total", "cobalt_total"), + ("phosphate", "phosphate"), + ("nitrite", "nitrite"), + ("nitrate", "nitrate"), + ("nitrate_as_n", "nitrate_as_n"), + ("thorium", "thorium"), + ("thorium_total", "thorium_total"), + ("tin", "tin"), + ("tin_total", "tin_total"), + ("mercury", "mercury"), + ("mercury_total", "mercury_total"), + ("titanium", "titanium"), + ("titanium_total", "titanium_total"), +] + + +def _static_analyte_value_columns() -> str: + return ",\n".join( + [ + ( + " MAX(lr.sample_value) FILTER " + f"(WHERE lr.analyte_key = '{analyte_key}') AS {column_name}" + ) + for analyte_key, column_name in STATIC_ANALYTE_COLUMNS + ] + ) + + +def _static_analyte_unit_columns() -> str: + return ",\n".join( + [ + ( + " MAX(lr.units) FILTER " + f"(WHERE lr.analyte_key = '{analyte_key}') AS {column_name}_units" + ) + for analyte_key, column_name in STATIC_ANALYTE_COLUMNS + ] + ) + + +def _create_minor_chemistry_wells_view() -> str: + value_columns = _static_analyte_value_columns() + unit_columns = _static_analyte_unit_columns() + + return f""" + CREATE MATERIALIZED VIEW ogc_minor_chemistry_wells AS + WITH latest_location AS ( +{LATEST_LOCATION_CTE} + ), + chemistry_rows AS ( + SELECT + csi.thing_id, + mtc.id AS result_id, + COALESCE(mtc.analysis_date::timestamp, csi."CollectionDate") AS observation_datetime, + trim(mtc.analyte) AS analyte_name, + mtc.sample_value::double precision AS sample_value, + mtc.units AS units + FROM "NMA_MinorTraceChemistry" AS mtc + JOIN "NMA_Chemistry_SampleInfo" AS csi + ON csi.id = mtc.chemistry_sample_info_id + JOIN thing AS t ON t.id = csi.thing_id + WHERE + mtc.sample_value IS NOT NULL + AND t.thing_type = 'water well' + ), + normalized_rows AS ( + SELECT + cr.thing_id, + cr.result_id, + cr.observation_datetime, + NULLIF( + regexp_replace( + lower(trim(coalesce(cr.analyte_name, ''))), + '[^a-z0-9]+', + '', + 'g' + ), + '' + ) AS analyte_token, + cr.sample_value, + cr.units + FROM chemistry_rows AS cr + ), + mapped_rows AS ( + SELECT + nr.thing_id, + nr.result_id, + nr.observation_datetime, + CASE + WHEN coalesce(nr.analyte_token, '') = 'h2r' THEN 'h2r' + WHEN coalesce(nr.analyte_token, '') = 'o18r' THEN 'o18r' + WHEN coalesce(nr.analyte_token, '') = 'c13r' THEN 'c13r' + WHEN coalesce(nr.analyte_token, '') = 'c14' THEN 'c14' + WHEN coalesce(nr.analyte_token, '') = 'c14years' THEN 'c14_years' + + WHEN coalesce(nr.analyte_token, '') = 'f' THEN 'fluoride' + WHEN coalesce(nr.analyte_token, '') = 'ba' THEN 'barium' + WHEN coalesce(nr.analyte_token, '') = 'batotal' THEN 'barium_total' + WHEN coalesce(nr.analyte_token, '') = 'cu' THEN 'copper' + WHEN coalesce(nr.analyte_token, '') = 'cutotal' THEN 'copper_total' + WHEN coalesce(nr.analyte_token, '') = 'zn' THEN 'zinc' + WHEN coalesce(nr.analyte_token, '') = 'zntotal' THEN 'zinc_total' + WHEN coalesce(nr.analyte_token, '') = 'mo' THEN 'molybdenum' + WHEN coalesce(nr.analyte_token, '') = 'mototal' THEN 'molybdenum_total' + WHEN coalesce(nr.analyte_token, '') = 'sio2' THEN 'silica' + WHEN coalesce(nr.analyte_token, '') = 'si' THEN 'silicon' + WHEN coalesce(nr.analyte_token, '') = 'sitotal' THEN 'silicon_total' + WHEN coalesce(nr.analyte_token, '') = 'mn' THEN 'manganese' + WHEN coalesce(nr.analyte_token, '') = 'mntotal' THEN 'manganese_total' + WHEN coalesce(nr.analyte_token, '') = 'fe' THEN 'iron' + WHEN coalesce(nr.analyte_token, '') = 'fetotal' THEN 'iron_total' + WHEN coalesce(nr.analyte_token, '') = 'sr' THEN 'strontium' + WHEN coalesce(nr.analyte_token, '') = 'srtotal' THEN 'strontium_total' + WHEN coalesce(nr.analyte_token, '') = 'cr' THEN 'chromium' + WHEN coalesce(nr.analyte_token, '') = 'crtotal' THEN 'chromium_total' + WHEN coalesce(nr.analyte_token, '') = 'b' THEN 'boron' + WHEN coalesce(nr.analyte_token, '') = 'btotal' THEN 'boron_total' + WHEN coalesce(nr.analyte_token, '') = 'u' THEN 'uranium' + WHEN coalesce(nr.analyte_token, '') = 'utotal' THEN 'uranium_total' + WHEN coalesce(nr.analyte_token, '') = 'li' THEN 'lithium' + WHEN coalesce(nr.analyte_token, '') = 'litotal' THEN 'lithium_total' + WHEN coalesce(nr.analyte_token, '') = 'ag' THEN 'silver' + WHEN coalesce(nr.analyte_token, '') = 'agtotal' THEN 'silver_total' + WHEN coalesce(nr.analyte_token, '') = 'sb' THEN 'antimony' + WHEN coalesce(nr.analyte_token, '') = 'sbtotal' THEN 'antimony_total' + WHEN coalesce(nr.analyte_token, '') = 'be' THEN 'beryllium' + WHEN coalesce(nr.analyte_token, '') = 'betotal' THEN 'beryllium_total' + WHEN coalesce(nr.analyte_token, '') = 'pb' THEN 'lead' + WHEN coalesce(nr.analyte_token, '') = 'pbtotal' THEN 'lead_total' + WHEN coalesce(nr.analyte_token, '') = 'tl' THEN 'thallium' + WHEN coalesce(nr.analyte_token, '') = 'tltotal' THEN 'thallium_total' + WHEN coalesce(nr.analyte_token, '') = 'br' THEN 'bromide' + WHEN coalesce(nr.analyte_token, '') = 'se' THEN 'selenium' + WHEN coalesce(nr.analyte_token, '') = 'setotal' THEN 'selenium_total' + WHEN coalesce(nr.analyte_token, '') = 'v' THEN 'vanadium' + WHEN coalesce(nr.analyte_token, '') = 'vtotal' THEN 'vanadium_total' + WHEN coalesce(nr.analyte_token, '') = 'al' THEN 'aluminum' + WHEN coalesce(nr.analyte_token, '') = 'altotal' THEN 'aluminum_total' + WHEN coalesce(nr.analyte_token, '') = 'as' THEN 'arsenic' + WHEN coalesce(nr.analyte_token, '') = 'astotal' THEN 'arsenic_total' + WHEN coalesce(nr.analyte_token, '') = 'ni' THEN 'nickel' + WHEN coalesce(nr.analyte_token, '') = 'nitotal' THEN 'nickel_total' + WHEN coalesce(nr.analyte_token, '') = 'cd' THEN 'cadmium' + WHEN coalesce(nr.analyte_token, '') = 'cdtotal' THEN 'cadmium_total' + WHEN coalesce(nr.analyte_token, '') = 'co' THEN 'cobalt' + WHEN coalesce(nr.analyte_token, '') = 'cototal' THEN 'cobalt_total' + WHEN coalesce(nr.analyte_token, '') = 'po4' THEN 'phosphate' + WHEN coalesce(nr.analyte_token, '') = 'no2' THEN 'nitrite' + WHEN coalesce(nr.analyte_token, '') = 'no3' THEN 'nitrate' + WHEN coalesce(nr.analyte_token, '') = 'no3n' THEN 'nitrate_as_n' + WHEN coalesce(nr.analyte_token, '') = 'th' THEN 'thorium' + WHEN coalesce(nr.analyte_token, '') = 'thtotal' THEN 'thorium_total' + WHEN coalesce(nr.analyte_token, '') = 'sn' THEN 'tin' + WHEN coalesce(nr.analyte_token, '') = 'sntotal' THEN 'tin_total' + WHEN coalesce(nr.analyte_token, '') = 'hg' THEN 'mercury' + WHEN coalesce(nr.analyte_token, '') = 'hgtotal' THEN 'mercury_total' + WHEN coalesce(nr.analyte_token, '') = 'ti' THEN 'titanium' + WHEN coalesce(nr.analyte_token, '') = 'titotal' THEN 'titanium_total' + ELSE NULL + END AS analyte_key, + nr.sample_value, + nr.units + FROM normalized_rows AS nr + ), + latest_results AS ( + SELECT + mr.thing_id, + mr.analyte_key, + mr.sample_value, + mr.units, + mr.observation_datetime, + ROW_NUMBER() OVER ( + PARTITION BY mr.thing_id, mr.analyte_key + ORDER BY mr.observation_datetime DESC NULLS LAST, mr.result_id DESC + ) AS rn + FROM mapped_rows AS mr + WHERE mr.analyte_key IS NOT NULL + ) + SELECT + t.id AS id, + ll.location_id, + t.name, + t.thing_type, + COUNT(*)::integer AS analyte_count, + MAX(lr.observation_datetime::date) AS latest_chemistry_date, +{value_columns}, +{unit_columns}, + l.point + FROM latest_results AS lr + JOIN thing AS t ON t.id = lr.thing_id + JOIN latest_location AS ll ON ll.thing_id = t.id + JOIN location AS l ON l.id = ll.location_id + WHERE lr.rn = 1 + AND t.thing_type = 'water well' + GROUP BY t.id, ll.location_id, t.name, t.thing_type, l.point + """ + + +def upgrade() -> None: + bind = op.get_bind() + inspector = inspect(bind) + existing_tables = set(inspector.get_table_names(schema="public")) + required_tables = { + "thing", + "location", + "location_thing_association", + "NMA_Chemistry_SampleInfo", + "NMA_MinorTraceChemistry", + } + + if not required_tables.issubset(existing_tables): + missing = sorted(t for t in required_tables if t not in existing_tables) + raise RuntimeError( + "Cannot create ogc_minor_chemistry_wells. Missing required tables: " + + ", ".join(missing) + ) + + op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_minor_chemistry_wells")) + op.execute(text(_create_minor_chemistry_wells_view())) + op.execute( + text( + "COMMENT ON MATERIALIZED VIEW ogc_minor_chemistry_wells IS " + "'Latest minor/trace chemistry analyte values for water wells, pivoted into static analyte columns.'" + ) + ) + op.execute( + text( + "CREATE UNIQUE INDEX ux_ogc_minor_chemistry_wells_id " + "ON ogc_minor_chemistry_wells (id)" + ) + ) + + +def downgrade() -> None: + op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_minor_chemistry_wells")) diff --git a/cli/cli.py b/cli/cli.py index 09c20185..e9f8dc36 100644 --- a/cli/cli.py +++ b/cli/cli.py @@ -55,6 +55,8 @@ class SmokePopulation(str, Enum): "ogc_avg_tds_wells", "ogc_depth_to_water_trend_wells", "ogc_water_well_summary", + "ogc_major_chemistry_results", + "ogc_minor_chemistry_wells", ) diff --git a/core/pygeoapi-config.yml b/core/pygeoapi-config.yml index 1a468b13..0a205d29 100644 --- a/core/pygeoapi-config.yml +++ b/core/pygeoapi-config.yml @@ -172,4 +172,50 @@ resources: table: ogc_water_well_summary geom_field: point + major_chemistry_results: + type: collection + title: Major Chemistry (Water Wells) + description: Latest major chemistry analyte values for water wells, represented as static analyte columns. + keywords: [water-wells, chemistry, analytes, major-chemistry] + extents: + spatial: + bbox: [-109.05, 31.33, -103.00, 37.00] + crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84 + providers: + - type: feature + name: PostgreSQL + data: + host: {postgres_host} + port: {postgres_port} + dbname: {postgres_db} + user: {postgres_user} + password: {postgres_password_env} + search_path: [public] + id_field: id + table: ogc_major_chemistry_results + geom_field: point + + minor_chemistry_wells: + type: collection + title: Minor Chemistry (Water Wells) + description: Latest minor/trace chemistry analyte values for water wells, represented as static analyte columns. + keywords: [water-wells, chemistry, analytes, minor-chemistry, trace-chemistry] + extents: + spatial: + bbox: [-109.05, 31.33, -103.00, 37.00] + crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84 + providers: + - type: feature + name: PostgreSQL + data: + host: {postgres_host} + port: {postgres_port} + dbname: {postgres_db} + user: {postgres_user} + password: {postgres_password_env} + search_path: [public] + id_field: id + table: ogc_minor_chemistry_wells + geom_field: point + {thing_collections_block} diff --git a/docker-compose.yml b/docker-compose.yml index 9eb88baf..9a557f82 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,13 +9,14 @@ services: environment: - POSTGRES_USER=${POSTGRES_USER} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} - - POSTGRES_DB=${POSTGRES_DB} + - POSTGRES_DB=ocotilloapi_dev ports: - 5432:5432 volumes: - - postgres_data:/var/lib/postgresql/data + - postgres_data_dev:/var/lib/postgresql/data + - ./docker/db/init:/docker-entrypoint-initdb.d:ro healthcheck: - test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB}"] + test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ocotilloapi_dev"] interval: 2s timeout: 5s retries: 20 @@ -27,7 +28,7 @@ services: environment: - POSTGRES_USER=${POSTGRES_USER} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} - - POSTGRES_DB=${POSTGRES_DB} + - POSTGRES_DB=ocotilloapi_dev - POSTGRES_HOST=db - POSTGRES_PORT=5432 - MODE=${MODE} @@ -43,4 +44,4 @@ services: - .:/app volumes: - postgres_data: + postgres_data_dev: diff --git a/docker/db/init/01-create-test-db.sql b/docker/db/init/01-create-test-db.sql new file mode 100644 index 00000000..53ab9cb5 --- /dev/null +++ b/docker/db/init/01-create-test-db.sql @@ -0,0 +1,10 @@ +-- Initialize test database inside the same Postgres service used for dev. +-- This script runs only when the data directory is first initialized. + +CREATE DATABASE ocotilloapi_test; + +\connect ocotilloapi_dev +CREATE EXTENSION IF NOT EXISTS postgis; + +\connect ocotilloapi_test +CREATE EXTENSION IF NOT EXISTS postgis; diff --git a/entrypoint.sh b/entrypoint.sh index 66248761..3fd13d48 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -1,7 +1,13 @@ #!/bin/sh +set -eu + +DB_HOST="${POSTGRES_HOST:-db}" +DB_PORT="${POSTGRES_PORT:-5432}" +DB_NAME="${POSTGRES_DB:-postgres}" + # Wait for PostgreSQL to be ready -until PGPASSWORD="$POSTGRES_PASSWORD" pg_isready -h db -p 5432 -U "$POSTGRES_USER"; do - echo "Waiting for postgres..." +until PGPASSWORD="$POSTGRES_PASSWORD" pg_isready -h "$DB_HOST" -p "$DB_PORT" -U "$POSTGRES_USER" -d "$DB_NAME"; do + echo "Waiting for postgres at ${DB_HOST}:${DB_PORT}/${DB_NAME}..." sleep 2 done echo "PostgreSQL is ready!" @@ -9,4 +15,4 @@ echo "PostgreSQL is ready!" echo "Applying migrations..." alembic upgrade head echo "Starting the application..." -uvicorn main:app --host 0.0.0.0 --port 8000 --reload \ No newline at end of file +uvicorn main:app --host 0.0.0.0 --port 8000 --reload diff --git a/tests/__init__.py b/tests/__init__.py index b5cee011..57fa0c35 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -14,13 +14,29 @@ # limitations under the License. # =============================================================================== import os +import socket from functools import lru_cache from dotenv import load_dotenv # Load .env file BEFORE importing anything else -# Use override=True to override conflicting shell environment variables -load_dotenv(override=True) +# Use override=False so explicit shell environment variables can override .env +load_dotenv(override=False) + + +def _normalize_test_db_host() -> None: + """Fallback docker-compose hostnames to localhost for host-run tests.""" + for env_name in ("POSTGRES_HOST", "PYGEOAPI_POSTGRES_HOST"): + host = (os.environ.get(env_name) or "").strip() + if host != "db": + continue + try: + socket.gethostbyname(host) + except OSError: + os.environ[env_name] = "localhost" + + +_normalize_test_db_host() # for safety don't test on the production database port os.environ["POSTGRES_PORT"] = "5432" diff --git a/tests/conftest.py b/tests/conftest.py index 3847263b..a5f037b6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import os +import socket import pytest from alembic import command @@ -16,7 +17,15 @@ def pytest_configure(): - load_dotenv(override=True) + load_dotenv(override=False) + for env_name in ("POSTGRES_HOST", "PYGEOAPI_POSTGRES_HOST"): + host = (os.environ.get(env_name) or "").strip() + if host != "db": + continue + try: + socket.gethostbyname(host) + except OSError: + os.environ[env_name] = "localhost" os.environ.setdefault("POSTGRES_PORT", "54321") # NOTE: This hardcoded secret key is for tests only and must NEVER be used in production. os.environ.setdefault("SESSION_SECRET_KEY", "test-session-secret-key") diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index 0673d8ba..6f17f410 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -58,9 +58,11 @@ def __exit__(self, exc_type, exc, tb): "REFRESH MATERIALIZED VIEW ogc_avg_tds_wells", "REFRESH MATERIALIZED VIEW ogc_depth_to_water_trend_wells", "REFRESH MATERIALIZED VIEW ogc_water_well_summary", + "REFRESH MATERIALIZED VIEW ogc_major_chemistry_results", + "REFRESH MATERIALIZED VIEW ogc_minor_chemistry_wells", ] assert commit_called["value"] is True - assert "Refreshed 4 materialized view(s)." in result.output + assert "Refreshed 6 materialized view(s)." in result.output def test_refresh_pygeoapi_materialized_views_custom_and_concurrently(monkeypatch): diff --git a/tests/test_ogc.py b/tests/test_ogc.py index e243c90b..7912cab9 100644 --- a/tests/test_ogc.py +++ b/tests/test_ogc.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from datetime import datetime +from datetime import date, datetime from importlib.util import find_spec import pytest @@ -27,7 +27,7 @@ viewer_function, amp_viewer_function, ) -from db import NMA_Chemistry_SampleInfo, NMA_MajorChemistry +from db import NMA_Chemistry_SampleInfo, NMA_MajorChemistry, NMA_MinorTraceChemistry from db.engine import session_ctx from main import app from tests import client, override_authentication @@ -190,6 +190,143 @@ def test_latest_tds_uses_latest_timestamp_within_same_day(water_well_thing): session.commit() +def test_ogc_major_chemistry_results_uses_latest_per_analyte(water_well_thing): + with session_ctx() as session: + csi = NMA_Chemistry_SampleInfo( + thing_id=water_well_thing.id, + nma_sample_point_id="MAJNORM01", + collection_date=datetime(2024, 3, 1, 10, 0, 0), + ) + session.add(csi) + session.flush() + + # Older calcium result + calcium_old = NMA_MajorChemistry( + chemistry_sample_info_id=csi.id, + analyte="Ca", + symbol="", + sample_value=80.0, + units="mg/L", + analysis_date=datetime(2024, 3, 1, 9, 0, 0), + ) + # Newer calcium result that should win for calcium + calcium_units + calcium_new = NMA_MajorChemistry( + chemistry_sample_info_id=csi.id, + analyte="Ca", + symbol="", + sample_value=95.0, + units="mg/L as CaCO3", + analysis_date=datetime(2024, 3, 2, 9, 0, 0), + ) + # Separate analyte with even later date to drive latest_chemistry_date + chloride = NMA_MajorChemistry( + chemistry_sample_info_id=csi.id, + analyte="Cl", + symbol="", + sample_value=40.0, + units="mg/L", + analysis_date=datetime(2024, 3, 3, 8, 0, 0), + ) + + session.add_all([calcium_old, calcium_new, chloride]) + session.commit() + + session.execute(text("REFRESH MATERIALIZED VIEW ogc_major_chemistry_results")) + session.commit() + + row = session.execute( + text( + "SELECT calcium, calcium_units, chloride, chloride_units, latest_chemistry_date " + "FROM ogc_major_chemistry_results WHERE id = :thing_id" + ), + {"thing_id": water_well_thing.id}, + ).one() + + assert float(row.calcium) == 95.0 + assert row.calcium_units == "mg/L as CaCO3" + assert float(row.chloride) == 40.0 + assert row.chloride_units == "mg/L" + assert row.latest_chemistry_date.isoformat() == "2024-03-03" + + session.delete(chloride) + session.delete(calcium_new) + session.delete(calcium_old) + session.delete(csi) + session.commit() + session.execute(text("REFRESH MATERIALIZED VIEW ogc_major_chemistry_results")) + session.commit() + + +def test_ogc_minor_chemistry_wells_uses_latest_per_analyte(water_well_thing): + with session_ctx() as session: + csi = NMA_Chemistry_SampleInfo( + thing_id=water_well_thing.id, + nma_sample_point_id="MINRNORM1", + collection_date=datetime(2024, 4, 1, 10, 0, 0), + ) + session.add(csi) + session.flush() + + # Older barium result + barium_old = NMA_MinorTraceChemistry( + chemistry_sample_info_id=csi.id, + nma_sample_point_id="MINRNORM1", + analyte="Ba", + symbol="", + sample_value=0.40, + units="mg/L", + analysis_date=date(2024, 4, 1), + ) + # Newer barium result that should win for barium + barium_units + barium_new = NMA_MinorTraceChemistry( + chemistry_sample_info_id=csi.id, + nma_sample_point_id="MINRNORM1", + analyte="Ba", + symbol="", + sample_value=0.55, + units="ug/L", + analysis_date=date(2024, 4, 2), + ) + # Separate analyte with even later date to drive latest_chemistry_date + fluoride = NMA_MinorTraceChemistry( + chemistry_sample_info_id=csi.id, + nma_sample_point_id="MINRNORM1", + analyte="F", + symbol="", + sample_value=1.2, + units="mg/L", + analysis_date=date(2024, 4, 3), + ) + + session.add_all([barium_old, barium_new, fluoride]) + session.commit() + + session.execute(text("REFRESH MATERIALIZED VIEW ogc_minor_chemistry_wells")) + session.commit() + + row = session.execute( + text( + "SELECT barium, barium_units, fluoride, fluoride_units, latest_chemistry_date " + "FROM ogc_minor_chemistry_wells WHERE id = :thing_id" + ), + {"thing_id": water_well_thing.id}, + ).one() + + assert float(row.barium) == 0.55 + assert row.barium_units == "ug/L" + assert float(row.fluoride) == 1.2 + assert row.fluoride_units == "mg/L" + assert row.latest_chemistry_date.isoformat() == "2024-04-03" + + session.delete(fluoride) + session.delete(barium_new) + session.delete(barium_old) + session.delete(csi) + session.commit() + session.execute(text("REFRESH MATERIALIZED VIEW ogc_minor_chemistry_wells")) + session.commit() + + def test_ogc_collections(): response = client.get("/ogcapi/collections") assert response.status_code == 200 @@ -202,6 +339,8 @@ def test_ogc_collections(): "latest_tds_wells", "depth_to_water_trend_wells", "water_well_summary", + "major_chemistry_results", + "minor_chemistry_wells", }.issubset(ids) @@ -210,6 +349,8 @@ def test_ogc_new_collection_items_endpoints(): "latest_tds_wells", "depth_to_water_trend_wells", "water_well_summary", + "major_chemistry_results", + "minor_chemistry_wells", ): response = client.get(f"/ogcapi/collections/{collection_id}/items?limit=10") assert response.status_code == 200