diff --git a/.env.example b/.env.example index 27f624d4c..1645fa31c 100644 --- a/.env.example +++ b/.env.example @@ -48,6 +48,13 @@ GOOGLE_APPLICATION_CREDENTIALS=/path/to/gcs_credentials.json # set to development for lexicon and parameter to be populated and enable the enums to work MODE=development +# pg_cron nightly materialized-view refresh (PRODUCTION ONLY). +# Leave unset/0 in development, test, and CI: the dev Postgres image does not +# load pg_cron, and alembic migration x2y3z4a5b6c7 is a no-op when this is off. +# Set to 1 in production (DB server has shared_preload_libraries=pg_cron) to +# register the nightly refresh job. See docs/pg_cron-nightly-refresh.md. +# ENABLE_PG_CRON=0 + # disable authentication (for development only) AUTHENTIK_DISABLE_AUTHENTICATION=1 diff --git a/.github/workflows/CD_production.yml b/.github/workflows/CD_production.yml index 155bd1db1..1ade7f251 100644 --- a/.github/workflows/CD_production.yml +++ b/.github/workflows/CD_production.yml @@ -90,6 +90,11 @@ jobs: CLOUD_SQL_DATABASE: "${{ vars.CLOUD_SQL_DATABASE }}" CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}" CLOUD_SQL_IAM_AUTH: true + # Register the nightly pg_cron materialized-view refresh job. + # Requires the Cloud SQL instance flag cloudsql.enable_pg_cron=on and + # cron.database_name set to CLOUD_SQL_DATABASE. See + # docs/pg_cron-nightly-refresh.md. + ENABLE_PG_CRON: "1" run: | uv run --no-dev alembic upgrade head @@ -101,7 +106,7 @@ jobs: CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}" CLOUD_SQL_IAM_AUTH: true run: | - uv run --no-dev python -m cli.cli refresh-pygeoapi-materialized-views + uv run --no-dev python -m cli.cli refresh-materialized-views - name: Ensure envsubst is available run: | diff --git a/.github/workflows/CD_staging.yml b/.github/workflows/CD_staging.yml index 047237d9d..e55c6f2a4 100644 --- a/.github/workflows/CD_staging.yml +++ b/.github/workflows/CD_staging.yml @@ -66,7 +66,7 @@ jobs: CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}" CLOUD_SQL_IAM_AUTH: true run: | - uv run --no-dev python -m cli.cli refresh-pygeoapi-materialized-views + uv run --no-dev python -m cli.cli refresh-materialized-views - name: Ensure envsubst is available run: | diff --git a/.github/workflows/CD_testing.yml b/.github/workflows/CD_testing.yml index 66c96a2ce..64e15443e 100644 --- a/.github/workflows/CD_testing.yml +++ b/.github/workflows/CD_testing.yml @@ -66,7 +66,7 @@ jobs: CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}" CLOUD_SQL_IAM_AUTH: true run: | - uv run --no-dev python -m cli.cli refresh-pygeoapi-materialized-views + uv run --no-dev python -m cli.cli refresh-materialized-views - name: Ensure envsubst is available run: | diff --git a/alembic/versions/d5e6f7a8b9c0_create_pygeoapi_supporting_views.py b/alembic/versions/d5e6f7a8b9c0_create_pygeoapi_supporting_views.py index 60d03fc04..d8e12b2bc 100644 --- a/alembic/versions/d5e6f7a8b9c0_create_pygeoapi_supporting_views.py +++ b/alembic/versions/d5e6f7a8b9c0_create_pygeoapi_supporting_views.py @@ -16,7 +16,7 @@ down_revision: Union[str, Sequence[str], None] = "c4d5e6f7a8b9" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None -REFRESH_FUNCTION_NAME = "refresh_pygeoapi_materialized_views" +REFRESH_FUNCTION_NAME = "refresh_materialized_views" THING_COLLECTIONS = [ ("water_wells", "water well"), diff --git a/alembic/versions/x2y3z4a5b6c7_schedule_nightly_matview_refresh_pg_cron.py b/alembic/versions/x2y3z4a5b6c7_schedule_nightly_matview_refresh_pg_cron.py new file mode 100644 index 000000000..0e50fdef9 --- /dev/null +++ b/alembic/versions/x2y3z4a5b6c7_schedule_nightly_matview_refresh_pg_cron.py @@ -0,0 +1,132 @@ +"""schedule nightly materialized-view refresh via pg_cron + +Registers a pg_cron job that refreshes the materialized views once a +night. The job calls a SQL helper function, +``public.refresh_materialized_views()``, which discovers every +materialized view in the public schema from the catalog at run time -- so +this migration stays immutable and self-contained, and views added by +later migrations are refreshed without any rescheduling. + +pg_cron is a *production-only* dependency. It requires the extension to be +loaded via ``shared_preload_libraries`` on the database server, which the +development docker-compose Postgres image does not do. To avoid breaking +``alembic upgrade head`` in development (and in test/CI), this migration is a +no-op unless ``ENABLE_PG_CRON`` is truthy in the environment. Production sets +``ENABLE_PG_CRON=1``; everywhere else the migration records itself as applied +without touching pg_cron. See ``docs/pg_cron-nightly-refresh.md``. + +Revision ID: x2y3z4a5b6c7 +Revises: w1x2y3z4a5b6 +Create Date: 2026-06-17 00:00:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import text + +from services.env import get_bool_env + +# revision identifiers, used by Alembic. +revision: str = "x2y3z4a5b6c7" +down_revision: Union[str, Sequence[str], None] = "w1x2y3z4a5b6" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +# Name of the pg_cron job. Used to (re)register and to unschedule. +CRON_JOB_NAME = "refresh-materialized-views" + +# Nightly schedule in standard cron syntax. pg_cron interprets this in the +# database server's timezone (UTC on Cloud SQL), so 09:00 UTC is roughly +# 02:00-03:00 in US Mountain time -- comfortably off-peak. +CRON_SCHEDULE = "0 9 * * *" + + +# Helper function the cron job calls. It discovers every materialized view in +# the public schema from the catalog at run time rather than from a baked-in +# list. This keeps the migration immutable and self-contained -- it does not +# depend on mutable application code, and views added by later migrations are +# picked up automatically without rescheduling. +# +# Plain (non-concurrent) REFRESH is used deliberately: REFRESH ... CONCURRENTLY +# cannot run inside the implicit transaction of a PL/pgSQL function, and the +# nightly window tolerates the brief exclusive lock. +_REFRESH_FUNCTION_SQL = r""" +CREATE OR REPLACE FUNCTION public.refresh_materialized_views() +RETURNS void +LANGUAGE plpgsql +AS $func$ +DECLARE + r record; +BEGIN + FOR r IN + SELECT matviewname + FROM pg_matviews + WHERE schemaname = 'public' + ORDER BY matviewname + LOOP + EXECUTE format('REFRESH MATERIALIZED VIEW %I', r.matviewname); + END LOOP; +END; +$func$; +""" + + +def _pg_cron_enabled() -> bool: + """pg_cron is only wired up where the server explicitly enables it.""" + return get_bool_env("ENABLE_PG_CRON", False) is True + + +def upgrade() -> None: + if not _pg_cron_enabled(): + print( + "ENABLE_PG_CRON is not set; skipping pg_cron job registration " + "(expected in development, test, and CI)." + ) + return + + bind = op.get_bind() + + # Requires shared_preload_libraries to include 'pg_cron' and the extension + # to be creatable in this database (cron.database_name = this DB). See docs. + op.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron")) + + # (Re)create the refresh helper. + op.execute(text(_REFRESH_FUNCTION_SQL)) + + # Drop any previously registered job with the same name so re-running this + # migration (or a re-deploy) does not accumulate duplicate schedules. + op.execute( + text( + "SELECT cron.unschedule(jobid) FROM cron.job " "WHERE jobname = :name" + ).bindparams(name=CRON_JOB_NAME) + ) + + bind.execute( + text("SELECT cron.schedule(:name, :sched, :cmd)").bindparams( + name=CRON_JOB_NAME, + sched=CRON_SCHEDULE, + cmd="SELECT public.refresh_materialized_views();", + ) + ) + + print( + f"Registered pg_cron job '{CRON_JOB_NAME}' " + f"(schedule '{CRON_SCHEDULE}', server timezone)." + ) + + +def downgrade() -> None: + if not _pg_cron_enabled(): + print("ENABLE_PG_CRON is not set; nothing to unschedule.") + return + + op.execute( + text( + "SELECT cron.unschedule(jobid) FROM cron.job " "WHERE jobname = :name" + ).bindparams(name=CRON_JOB_NAME) + ) + op.execute(text("DROP FUNCTION IF EXISTS public.refresh_materialized_views()")) + # The pg_cron extension itself is left installed: it is a server-level + # capability that other jobs may depend on, and dropping it is not the + # inverse of "schedule a job". diff --git a/cli/cli.py b/cli/cli.py index 30c9742f6..14c8f9470 100644 --- a/cli/cli.py +++ b/cli/cli.py @@ -24,6 +24,8 @@ import typer from dotenv import load_dotenv +from services.materialized_views import MATERIALIZED_VIEWS + # CLI should load `.env` defaults without clobbering an explicitly prepared environment. load_dotenv(override=False) os.environ.setdefault("OCO_LOG_CONTEXT", "cli") @@ -50,17 +52,6 @@ class SmokePopulation(str, Enum): agreed = "agreed" -PYGEOAPI_MATERIALIZED_VIEWS = ( - "ogc_latest_depth_to_water_wells", - "ogc_water_elevation_wells", - "ogc_avg_tds_wells", - "ogc_depth_to_water_trend_wells", - "ogc_water_well_summary", - "ogc_major_chemistry_results", - "ogc_minor_chemistry_wells", -) - - def _resolve_theme(theme: ThemeMode) -> ThemeMode: if theme != ThemeMode.auto: return theme @@ -1095,14 +1086,14 @@ def alembic_upgrade_and_data( typer.echo(f"applied {len(ran)} migration(s)") -@cli.command("refresh-pygeoapi-materialized-views") -def refresh_pygeoapi_materialized_views( +@cli.command("refresh-materialized-views") +def refresh_materialized_views( view: list[str] = typer.Option( None, "--view", help=( "Materialized view name(s) to refresh. Repeat --view for multiple. " - "Defaults to all pygeoapi materialized views." + "Defaults to all materialized views." ), ), concurrently: bool = typer.Option( @@ -1115,7 +1106,7 @@ def refresh_pygeoapi_materialized_views( from db.engine import engine, session_ctx - target_views = tuple(view) if view else PYGEOAPI_MATERIALIZED_VIEWS + target_views = tuple(view) if view else MATERIALIZED_VIEWS # Validate all view names before opening any DB connections or sessions. safe_views = tuple(_validate_sql_identifier(v) for v in target_views) diff --git a/docs/pg_cron-nightly-refresh.md b/docs/pg_cron-nightly-refresh.md new file mode 100644 index 000000000..8c82a9edd --- /dev/null +++ b/docs/pg_cron-nightly-refresh.md @@ -0,0 +1,92 @@ +# Nightly materialized-view refresh with pg_cron + +Every materialized view in the database (the `ogc_*` views and +`transducer_daily_data`) is refreshed once a night in production by a +[pg_cron](https://github.com/citusdata/pg_cron) job. + +## What is registered, and where + +Alembic migration +[`x2y3z4a5b6c7_schedule_nightly_matview_refresh_pg_cron.py`](../alembic/versions/x2y3z4a5b6c7_schedule_nightly_matview_refresh_pg_cron.py) +registers everything, so the schedule is traceable in version control: + +- A SQL helper, `public.refresh_materialized_views()`, that discovers + every materialized view in the public schema from the catalog at run time and + runs `REFRESH MATERIALIZED VIEW` for each (plain, non-concurrent — see note). +- A pg_cron job named `refresh-materialized-views` that runs + `SELECT public.refresh_materialized_views();` on the schedule + `0 9 * * *` (09:00 in the **server timezone**, UTC on Cloud SQL — + roughly 02:00–03:00 US Mountain). + +The helper refreshes whatever materialized views exist, so a view added by a +later migration is picked up automatically — there is nothing to keep in sync +and no need to reschedule. (The `oco refresh-materialized-views` CLI +command, used for manual/on-deploy refreshes, keeps an explicit list in +[`services/materialized_views.py`](../services/materialized_views.py).) +To change the schedule, edit the migration (or add a new one). Do not edit the +job in the database by hand, or it will drift from the repo. + +## Why it is gated by `ENABLE_PG_CRON` + +pg_cron is a **production-only** dependency. It must be loaded through the +server's `shared_preload_libraries`, which the development docker-compose +Postgres image (`postgis/postgis:17-3.5`) does not do. Running +`CREATE EXTENSION pg_cron` without that preload fails. + +So the migration is a **no-op unless `ENABLE_PG_CRON` is truthy**: + +- Development, test, CI, **and staging**: `ENABLE_PG_CRON` unset → migration + prints a skip message and records itself as applied. `alembic upgrade head` + works on the stock dev image with nothing extra installed. Staging refreshes + the views on each deploy instead (the "Refresh materialized views" CD step), + so it does not need the nightly job. +- Production: `ENABLE_PG_CRON=1` → migration creates the extension, the helper + function, and the cron job. Only `CD_production.yml` sets this. + +## Production setup (Google Cloud SQL) + +Production runs on Cloud SQL, where pg_cron is enabled with an instance flag +(the `docker/db/Dockerfile` image is development-only and does not load pg_cron): + +1. Set the flag `cloudsql.enable_pg_cron=on` and + `cron.database_name=`, then restart the instance. +2. Deploy with `ENABLE_PG_CRON=1` (already set on the migration step in + `CD_production.yml`) so the migration registers the job. + +`cron.database_name` must match the application database so the alembic +migration (which connects to that database) can `CREATE EXTENSION pg_cron` and +`cron.schedule(...)` locally. + +## Verifying + +```sql +-- the registered job +SELECT jobid, jobname, schedule, command, active FROM cron.job + WHERE jobname = 'refresh-materialized-views'; + +-- recent run history +SELECT status, start_time, end_time, return_message + FROM cron.job_run_details + WHERE jobid = (SELECT jobid FROM cron.job + WHERE jobname = 'refresh-materialized-views') + ORDER BY start_time DESC LIMIT 5; +``` + +## Manual / ad-hoc refresh + +Independent of the cron job, the views can be refreshed on demand with the CLI +(also useful in development, where the cron job does not exist): + +```bash +oco refresh-materialized-views # all views, plain +oco refresh-materialized-views --concurrently # no read lock +``` + +### Note on non-concurrent REFRESH + +The cron helper uses plain `REFRESH MATERIALIZED VIEW`, not `CONCURRENTLY`, +because `REFRESH ... CONCURRENTLY` cannot run inside the implicit transaction of +a PL/pgSQL function. Plain refresh takes a brief exclusive lock on each view, +which is acceptable in the off-peak nightly window. The CLI still offers +`--concurrently` for daytime manual refreshes (every view has the required +unique index). diff --git a/services/materialized_views.py b/services/materialized_views.py new file mode 100644 index 000000000..ec1ae7103 --- /dev/null +++ b/services/materialized_views.py @@ -0,0 +1,19 @@ +"""Curated materialized-view list for the CLI refresh command. + +``oco refresh-materialized-views`` refreshes these views (in order) +by default. The nightly pg_cron job does NOT use this list -- its SQL helper +discovers every materialized view from the catalog at run time (see alembic +migration ``x2y3z4a5b6c7``) to stay immutable and self-contained. +""" + +# Order is the order views are refreshed in. +MATERIALIZED_VIEWS: tuple[str, ...] = ( + "ogc_latest_depth_to_water_wells", + "ogc_water_elevation_wells", + "ogc_avg_tds_wells", + "ogc_depth_to_water_trend_wells", + "ogc_water_well_summary", + "ogc_major_chemistry_results", + "ogc_minor_chemistry_wells", + "transducer_daily_data", +) diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index 5953c0f2e..f64a81306 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -38,7 +38,7 @@ from db.engine import session_ctx -def test_refresh_pygeoapi_materialized_views_defaults(monkeypatch): +def test_refresh_materialized_views_defaults(monkeypatch): executed_sql: list[str] = [] commit_called = {"value": False} @@ -59,7 +59,7 @@ def __exit__(self, exc_type, exc, tb): monkeypatch.setattr("db.engine.session_ctx", lambda: _FakeCtx()) runner = CliRunner() - result = runner.invoke(cli, ["refresh-pygeoapi-materialized-views"]) + result = runner.invoke(cli, ["refresh-materialized-views"]) assert result.exit_code == 0, result.output assert executed_sql == [ @@ -70,12 +70,13 @@ def __exit__(self, exc_type, exc, tb): "REFRESH MATERIALIZED VIEW ogc_water_well_summary", "REFRESH MATERIALIZED VIEW ogc_major_chemistry_results", "REFRESH MATERIALIZED VIEW ogc_minor_chemistry_wells", + "REFRESH MATERIALIZED VIEW transducer_daily_data", ] assert commit_called["value"] is True - assert "Refreshed 7 materialized view(s)." in result.output + assert "Refreshed 8 materialized view(s)." in result.output -def test_refresh_pygeoapi_materialized_views_custom_and_concurrently( +def test_refresh_materialized_views_custom_and_concurrently( monkeypatch, ): executed_sql: list[str] = [] @@ -105,7 +106,7 @@ def connect(self): result = runner.invoke( cli, [ - "refresh-pygeoapi-materialized-views", + "refresh-materialized-views", "--view", "ogc_avg_tds_wells", "--concurrently", @@ -119,12 +120,12 @@ def connect(self): ] -def test_refresh_pygeoapi_materialized_views_rejects_invalid_identifier(): +def test_refresh_materialized_views_rejects_invalid_identifier(): runner = CliRunner() result = runner.invoke( cli, [ - "refresh-pygeoapi-materialized-views", + "refresh-materialized-views", "--view", "ogc_avg_tds_wells;drop table thing", ],