Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ GOOGLE_APPLICATION_CREDENTIALS=/path/to/gcs_credentials.json
# set to development for lexicon and parameter to be populated and enable the enums to work
MODE=development

# pg_cron nightly materialized-view refresh (PRODUCTION ONLY).
# Leave unset/0 in development, test, and CI: the dev Postgres image does not
# load pg_cron, and alembic migration x2y3z4a5b6c7 is a no-op when this is off.
# Set to 1 in production (DB server has shared_preload_libraries=pg_cron) to
# register the nightly refresh job. See docs/pg_cron-nightly-refresh.md.
# ENABLE_PG_CRON=0

# disable authentication (for development only)
AUTHENTIK_DISABLE_AUTHENTICATION=1

Expand Down
7 changes: 6 additions & 1 deletion .github/workflows/CD_production.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ jobs:
CLOUD_SQL_DATABASE: "${{ vars.CLOUD_SQL_DATABASE }}"
CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}"
CLOUD_SQL_IAM_AUTH: true
# Register the nightly pg_cron materialized-view refresh job.
# Requires the Cloud SQL instance flag cloudsql.enable_pg_cron=on and
# cron.database_name set to CLOUD_SQL_DATABASE. See
# docs/pg_cron-nightly-refresh.md.
ENABLE_PG_CRON: "1"
run: |
uv run --no-dev alembic upgrade head

Expand All @@ -101,7 +106,7 @@ jobs:
CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}"
CLOUD_SQL_IAM_AUTH: true
run: |
uv run --no-dev python -m cli.cli refresh-pygeoapi-materialized-views
uv run --no-dev python -m cli.cli refresh-materialized-views

- name: Ensure envsubst is available
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/CD_staging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ jobs:
CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}"
CLOUD_SQL_IAM_AUTH: true
run: |
uv run --no-dev python -m cli.cli refresh-pygeoapi-materialized-views
uv run --no-dev python -m cli.cli refresh-materialized-views

- name: Ensure envsubst is available
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/CD_testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ jobs:
CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}"
CLOUD_SQL_IAM_AUTH: true
run: |
uv run --no-dev python -m cli.cli refresh-pygeoapi-materialized-views
uv run --no-dev python -m cli.cli refresh-materialized-views

- name: Ensure envsubst is available
run: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
down_revision: Union[str, Sequence[str], None] = "c4d5e6f7a8b9"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
REFRESH_FUNCTION_NAME = "refresh_pygeoapi_materialized_views"
REFRESH_FUNCTION_NAME = "refresh_materialized_views"

THING_COLLECTIONS = [
("water_wells", "water well"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""schedule nightly materialized-view refresh via pg_cron

Registers a pg_cron job that refreshes the materialized views once a
night. The job calls a SQL helper function,
``public.refresh_materialized_views()``, which discovers every
materialized view in the public schema from the catalog at run time -- so
this migration stays immutable and self-contained, and views added by
later migrations are refreshed without any rescheduling.

pg_cron is a *production-only* dependency. It requires the extension to be
loaded via ``shared_preload_libraries`` on the database server, which the
development docker-compose Postgres image does not do. To avoid breaking
``alembic upgrade head`` in development (and in test/CI), this migration is a
no-op unless ``ENABLE_PG_CRON`` is truthy in the environment. Production sets
``ENABLE_PG_CRON=1``; everywhere else the migration records itself as applied
without touching pg_cron. See ``docs/pg_cron-nightly-refresh.md``.

Revision ID: x2y3z4a5b6c7
Revises: w1x2y3z4a5b6
Create Date: 2026-06-17 00:00:00.000000
"""

from typing import Sequence, Union

from alembic import op
from sqlalchemy import text

from services.env import get_bool_env

# revision identifiers, used by Alembic.
revision: str = "x2y3z4a5b6c7"
down_revision: Union[str, Sequence[str], None] = "w1x2y3z4a5b6"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

# Name of the pg_cron job. Used to (re)register and to unschedule.
CRON_JOB_NAME = "refresh-materialized-views"

# Nightly schedule in standard cron syntax. pg_cron interprets this in the
# database server's timezone (UTC on Cloud SQL), so 09:00 UTC is roughly
# 02:00-03:00 in US Mountain time -- comfortably off-peak.
CRON_SCHEDULE = "0 9 * * *"


# Helper function the cron job calls. It discovers every materialized view in
# the public schema from the catalog at run time rather than from a baked-in
# list. This keeps the migration immutable and self-contained -- it does not
# depend on mutable application code, and views added by later migrations are
# picked up automatically without rescheduling.
#
# Plain (non-concurrent) REFRESH is used deliberately: REFRESH ... CONCURRENTLY
# cannot run inside the implicit transaction of a PL/pgSQL function, and the
# nightly window tolerates the brief exclusive lock.
_REFRESH_FUNCTION_SQL = r"""
CREATE OR REPLACE FUNCTION public.refresh_materialized_views()
RETURNS void
LANGUAGE plpgsql
AS $func$
DECLARE
r record;
BEGIN
FOR r IN
SELECT matviewname
FROM pg_matviews
WHERE schemaname = 'public'
ORDER BY matviewname
LOOP
EXECUTE format('REFRESH MATERIALIZED VIEW %I', r.matviewname);
END LOOP;
END;
$func$;
"""


def _pg_cron_enabled() -> bool:
"""pg_cron is only wired up where the server explicitly enables it."""
return get_bool_env("ENABLE_PG_CRON", False) is True


def upgrade() -> None:
if not _pg_cron_enabled():
print(
"ENABLE_PG_CRON is not set; skipping pg_cron job registration "
"(expected in development, test, and CI)."
)
return

bind = op.get_bind()

# Requires shared_preload_libraries to include 'pg_cron' and the extension
# to be creatable in this database (cron.database_name = this DB). See docs.
op.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron"))

# (Re)create the refresh helper.
op.execute(text(_REFRESH_FUNCTION_SQL))

# Drop any previously registered job with the same name so re-running this
# migration (or a re-deploy) does not accumulate duplicate schedules.
op.execute(
text(
"SELECT cron.unschedule(jobid) FROM cron.job " "WHERE jobname = :name"
).bindparams(name=CRON_JOB_NAME)
)

bind.execute(
text("SELECT cron.schedule(:name, :sched, :cmd)").bindparams(
name=CRON_JOB_NAME,
sched=CRON_SCHEDULE,
cmd="SELECT public.refresh_materialized_views();",
)
)

print(
f"Registered pg_cron job '{CRON_JOB_NAME}' "
f"(schedule '{CRON_SCHEDULE}', server timezone)."
)


def downgrade() -> None:
if not _pg_cron_enabled():
print("ENABLE_PG_CRON is not set; nothing to unschedule.")
return

op.execute(
text(
"SELECT cron.unschedule(jobid) FROM cron.job " "WHERE jobname = :name"
).bindparams(name=CRON_JOB_NAME)
)
op.execute(text("DROP FUNCTION IF EXISTS public.refresh_materialized_views()"))
# The pg_cron extension itself is left installed: it is a server-level
# capability that other jobs may depend on, and dropping it is not the
# inverse of "schedule a job".
21 changes: 6 additions & 15 deletions cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import typer
from dotenv import load_dotenv

from services.materialized_views import MATERIALIZED_VIEWS

# CLI should load `.env` defaults without clobbering an explicitly prepared environment.
load_dotenv(override=False)
os.environ.setdefault("OCO_LOG_CONTEXT", "cli")
Expand All @@ -50,17 +52,6 @@ class SmokePopulation(str, Enum):
agreed = "agreed"


PYGEOAPI_MATERIALIZED_VIEWS = (
"ogc_latest_depth_to_water_wells",
"ogc_water_elevation_wells",
"ogc_avg_tds_wells",
"ogc_depth_to_water_trend_wells",
"ogc_water_well_summary",
"ogc_major_chemistry_results",
"ogc_minor_chemistry_wells",
)


def _resolve_theme(theme: ThemeMode) -> ThemeMode:
if theme != ThemeMode.auto:
return theme
Expand Down Expand Up @@ -1095,14 +1086,14 @@ def alembic_upgrade_and_data(
typer.echo(f"applied {len(ran)} migration(s)")


@cli.command("refresh-pygeoapi-materialized-views")
def refresh_pygeoapi_materialized_views(
@cli.command("refresh-materialized-views")
def refresh_materialized_views(
view: list[str] = typer.Option(
None,
"--view",
help=(
"Materialized view name(s) to refresh. Repeat --view for multiple. "
"Defaults to all pygeoapi materialized views."
"Defaults to all materialized views."
),
),
concurrently: bool = typer.Option(
Expand All @@ -1115,7 +1106,7 @@ def refresh_pygeoapi_materialized_views(

from db.engine import engine, session_ctx

target_views = tuple(view) if view else PYGEOAPI_MATERIALIZED_VIEWS
target_views = tuple(view) if view else MATERIALIZED_VIEWS
# Validate all view names before opening any DB connections or sessions.
safe_views = tuple(_validate_sql_identifier(v) for v in target_views)

Expand Down
92 changes: 92 additions & 0 deletions docs/pg_cron-nightly-refresh.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Nightly materialized-view refresh with pg_cron

Every materialized view in the database (the `ogc_*` views and
`transducer_daily_data`) is refreshed once a night in production by a
[pg_cron](https://github.com/citusdata/pg_cron) job.

## What is registered, and where

Alembic migration
[`x2y3z4a5b6c7_schedule_nightly_matview_refresh_pg_cron.py`](../alembic/versions/x2y3z4a5b6c7_schedule_nightly_matview_refresh_pg_cron.py)
registers everything, so the schedule is traceable in version control:

- A SQL helper, `public.refresh_materialized_views()`, that discovers
every materialized view in the public schema from the catalog at run time and
runs `REFRESH MATERIALIZED VIEW` for each (plain, non-concurrent — see note).
- A pg_cron job named `refresh-materialized-views` that runs
`SELECT public.refresh_materialized_views();` on the schedule
`0 9 * * *` (09:00 in the **server timezone**, UTC on Cloud SQL —
roughly 02:00–03:00 US Mountain).

The helper refreshes whatever materialized views exist, so a view added by a
later migration is picked up automatically — there is nothing to keep in sync
and no need to reschedule. (The `oco refresh-materialized-views` CLI
command, used for manual/on-deploy refreshes, keeps an explicit list in
[`services/materialized_views.py`](../services/materialized_views.py).)
To change the schedule, edit the migration (or add a new one). Do not edit the
job in the database by hand, or it will drift from the repo.

## Why it is gated by `ENABLE_PG_CRON`

pg_cron is a **production-only** dependency. It must be loaded through the
server's `shared_preload_libraries`, which the development docker-compose
Postgres image (`postgis/postgis:17-3.5`) does not do. Running
`CREATE EXTENSION pg_cron` without that preload fails.

So the migration is a **no-op unless `ENABLE_PG_CRON` is truthy**:

- Development, test, CI, **and staging**: `ENABLE_PG_CRON` unset → migration
prints a skip message and records itself as applied. `alembic upgrade head`
works on the stock dev image with nothing extra installed. Staging refreshes
the views on each deploy instead (the "Refresh materialized views" CD step),
so it does not need the nightly job.
- Production: `ENABLE_PG_CRON=1` → migration creates the extension, the helper
function, and the cron job. Only `CD_production.yml` sets this.

## Production setup (Google Cloud SQL)

Production runs on Cloud SQL, where pg_cron is enabled with an instance flag
(the `docker/db/Dockerfile` image is development-only and does not load pg_cron):

1. Set the flag `cloudsql.enable_pg_cron=on` and
`cron.database_name=<application database>`, then restart the instance.
2. Deploy with `ENABLE_PG_CRON=1` (already set on the migration step in
`CD_production.yml`) so the migration registers the job.

`cron.database_name` must match the application database so the alembic
migration (which connects to that database) can `CREATE EXTENSION pg_cron` and
`cron.schedule(...)` locally.

## Verifying

```sql
-- the registered job
SELECT jobid, jobname, schedule, command, active FROM cron.job
WHERE jobname = 'refresh-materialized-views';

-- recent run history
SELECT status, start_time, end_time, return_message
FROM cron.job_run_details
WHERE jobid = (SELECT jobid FROM cron.job
WHERE jobname = 'refresh-materialized-views')
ORDER BY start_time DESC LIMIT 5;
```

## Manual / ad-hoc refresh

Independent of the cron job, the views can be refreshed on demand with the CLI
(also useful in development, where the cron job does not exist):

```bash
oco refresh-materialized-views # all views, plain
oco refresh-materialized-views --concurrently # no read lock
```

### Note on non-concurrent REFRESH

The cron helper uses plain `REFRESH MATERIALIZED VIEW`, not `CONCURRENTLY`,
because `REFRESH ... CONCURRENTLY` cannot run inside the implicit transaction of
a PL/pgSQL function. Plain refresh takes a brief exclusive lock on each view,
which is acceptable in the off-peak nightly window. The CLI still offers
`--concurrently` for daytime manual refreshes (every view has the required
unique index).
19 changes: 19 additions & 0 deletions services/materialized_views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Curated materialized-view list for the CLI refresh command.

``oco refresh-materialized-views`` refreshes these views (in order)
by default. The nightly pg_cron job does NOT use this list -- its SQL helper
discovers every materialized view from the catalog at run time (see alembic
migration ``x2y3z4a5b6c7``) to stay immutable and self-contained.
"""

# Order is the order views are refreshed in.
MATERIALIZED_VIEWS: tuple[str, ...] = (
"ogc_latest_depth_to_water_wells",
"ogc_water_elevation_wells",
"ogc_avg_tds_wells",
"ogc_depth_to_water_trend_wells",
"ogc_water_well_summary",
"ogc_major_chemistry_results",
"ogc_minor_chemistry_wells",
"transducer_daily_data",
)
Loading
Loading