Commit 83fe746

jirhiker and claude committed
T12: add NM_Wells mirror/loader/OGC tests; fix CAST-unwrap bug (B1)
tests/test_nmw_mirror.py (19 tests) covers SPEC invariants V1 (18 mirror tables + PK), V2 (FK parent loads before child in NMW_MIRROR_SPECS), V3 (8 OGC views built), V5 (temperature-profile view materialized), V6 (geothermal pygeoapi collections back existing relations), V10 (DB-level FK constraints), and the SQL-dump value parser.

Fixes B1/V11: _CAST_RE in transfers/nmw_sql_dump.py only matched AS-types without parentheses, so CAST(x AS nvarchar(10)) / CAST(n AS Decimal(18,2)) left the value as a literal "CAST(...)" string. Widened to allow one paren level in the type name.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent ccf566d commit 83fe746

3 files changed

Lines changed: 316 additions & 1 deletion

SPEC.md

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
+# SPEC — BDMS-826 NM_Wells migrations core
+
+Distilled from branch `feature/BDMS-826-NMW-migrations-core` (PR #738), POC of #686.
+Status: Phase-1 impl landed in branch, NOT reviewed/tested. Spec captures built state + remaining work.
+
+## §G — goal
+
+1:1 staging mirror of legacy NM_Wells (SQL Server) into Postgres `NMW_*` tables.
+Plus geothermal OGC API layers over mirror data. Phase-1 only: faithful copy, no transform to Ocotillo model.
+
+## §C — constraints
+
+- C1. Mirror = column-for-column copy. Original col names, types, PKs preserved. `SSMA_TimeStamp` dropped.
+- C2. Phase split: P1 = land `NMW_*` mirror + OGC views (this branch). P2 = transform mirror → Ocotillo model (Location→Thing→Event→Sample→Observation). P2 NOT built; per-col targets + lexicon maps flagged inline.
+- C3. Follow `db/nma_legacy.py` (NM_Aquifer) mirror convention.
+- C4. Standalone orchestrator. Do NOT extend deprecated `transfers/transfer.py` (NM_Aquifer driver).
+- C5. Tables load parent→child (FK order). Mirror load truncate+COPY (dump) or INSERT ON CONFLICT DO NOTHING (CSV). No upsert.
+- C6. Coords/geom stay WGS84 4326 per project std.
+- C7. Migrations idempotent, reversible (alembic up/down).
+
+## §I — interfaces
+
+- I.cli — `python -m transfers.transfer_geothermal` — runs ref→lexicon then NMW mirror load then refresh matviews.
+- I.env — `NMW_SQL_DUMP` (dump path; else CSV), `NMW_CSV_DIR`, `TRANSFER_LIMIT`, `TRANSFER_GEOTHERMAL_REFERENCE` (def 1), `TRANSFER_NMW_MIRROR` (def 1). Export: `NMW_HOST/USER/PASSWORD/PORT/DATABASE`.
+- I.export — `transfers/export_nmw_csvs.py` — pymssql dump SQL Server → `transfers/data/nma_csv_cache/<table>.csv`.
+- I.db — 18 `NMW_*` mirror tables (`db/nmw_legacy.py`), PK verified vs dump DDL.
+- I.migrations — `c0d1e2f3a4b5` (tables+FK), `d1e2f3a4b5c6` (per-well views), `e2f3a4b5c6d7` (measurement views). Chain down_rev: t6u7v8w9x0y1 → c0 → d1 → e2.
+- I.ogc — 6 new collections in `core/pygeoapi-config.yml`: geothermal_wells_bht, geothermal_wells_temperature_profile (MATVIEW), bht_measurements, temp_depth_measurements, heat_flow, dst.
+- I.views — DB: ogc_geothermal_wells_bht, ogc_geothermal_wells_temperature_profile (MAT), ogc_geothermal_wells_summary_heat_flow, ogc_geothermal_wells_interval_heat_flow, ogc_bht_measurements, ogc_temp_depth_measurements, ogc_heat_flow, ogc_dst.
+- I.lexicon — `reference_lexicon_transfer.py` maps 46 `ref_*``nmw_ref_*` LexiconCategory + terms.
+
+## §V — invariants
+
+- V1. `NMW_*` cols match legacy NM_Wells DDL exactly (name+type+PK). No renames except dropped `SSMA_TimeStamp`.
+- V2. Mirror load respects parent→child order in `NMW_MIRROR_SPECS`; child never loads before parent.
+- V3. `alembic upgrade head` then `downgrade` cleanly creates+drops all 18 tables + 8 views, no orphans.
+- V4. Re-running mirror load is non-destructive at row level (truncate+COPY full reload OR ON CONFLICT skip) — no dup rows, no partial-state corruption.
+- V5. After mirror load, `ogc_geothermal_wells_temperature_profile` matview refreshed; stale matview never served.
+- V6. Each of 6 OGC collections resolves to existing backing view; pygeoapi config view name == migration view name.
+- V7. `core/pygeoapi.py` unchanged from staging (reviewer note: reverted to original).
+- V8. `NMW_WellRecords.SourceID` joined as TEXT (free-text citation), not numeric FK.
+- V9. P2 lexicon mapping complete: every coded NMW col either in `LEXICON_REF_BY_COLUMN` (28, has ref_*) or `LEXICON_CANDIDATES_NO_REF` (11, needs new enum).
+- V10. ORM `NMW_*` models declare index only (`index=True`, no ORM `ForeignKey`); FK enforcement lives in migration `c0d1e2f3a4b5` (`op.create_foreign_key`). Keep both in sync.
+- V11. SQL-dump parser unwraps `CAST(expr AS <type>)` for parameterised types too (`Decimal(18,2)`, `nvarchar(10)`); never store the literal `CAST(...)` string in a mirror column.
+
+## §T — tasks
+
+id|status|task|cites
+T1|x|18 NMW_* mirror tables in db/nmw_legacy.py|V1,I.db
+T2|x|migration c0d1e2f3a4b5 tables+FK|V3,I.migrations
+T3|x|migration d1e2f3a4b5c6 per-well OGC views|I.views
+T4|x|migration e2f3a4b5c6d7 measurement OGC views|I.views
+T5|x|nmw_sql_dump.py SSMS dump parser|I.cli
+T6|x|nmw_mirror_transfer.py loader (dump+CSV)|V2,V4,I.cli
+T7|x|reference_lexicon_transfer.py ref_*→lexicon|I.lexicon
+T8|x|export_nmw_csvs.py pymssql export|I.export
+T9|x|transfer_geothermal.py orchestrator|I.cli
+T10|x|6 OGC collections in pygeoapi-config.yml|V6,I.ogc
+T11|x|FK enforced via migration op.create_foreign_key; model index-only (resolved)|V2,V10
+T12|x|add NMW_* mirror/loader/migration/OGC tests (tests/test_nmw_mirror.py, 19 tests); found+fixed CAST-unwrap bug B1|V1,V2,V3,V5,V6,V10,V11
+T13|.|verify alembic down path drops all views+tables (V3) on real db|V3
+T14|.|run end-to-end load vs real dump, capture row counts per table|V2,V4
+T15|.|finish PR #738 body (truncated at "- I ") + reviewer notes|-
+T16|.|P2 (later): transform NMW_* → Ocotillo model; build new enums for 11 LEXICON_CANDIDATES_NO_REF|C2,V9
+T17|x|landed docs/nm_wells-migration.md (commit ccf566d9; force-add, docs/ gitignored). Referenced 4x: nmw_legacy.py:75,81,759; nmw_mirror_transfer.py:19|-
+
+## §B — bugs
+
+id|date|cause|fix
+B1|2026-06-23|_CAST_RE in transfers/nmw_sql_dump.py matched AS-type without parens only; parenthesised types (nvarchar(10), Decimal(18,2)) left value as literal "CAST(...)" string|V11; widened regex to allow one paren level
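
Constraint C5 and invariants V2/V4 together describe a load that walks tables parent-first and can be re-run without duplicating rows. A minimal sketch of that behaviour follows. It assumes an in-memory SQLite database as a stand-in for Postgres and two hypothetical tables (`parent`, `child`) in place of the 18 `NMW_*` tables; none of these names or helpers come from the branch.

```python
import sqlite3

# Hypothetical stand-ins for the mirror tables, listed parent first (V2).
LOAD_ORDER = ["parent", "child"]

ROWS = {
    "parent": [(1, "well-a"), (2, "well-b")],
    "child": [(10, 1, 55.0), (11, 2, 61.5)],
}

# ON CONFLICT ... DO NOTHING skips rows whose primary key already exists (C5).
INSERT_SQL = {
    "parent": "INSERT INTO parent (id, name) VALUES (?, ?) "
              "ON CONFLICT (id) DO NOTHING",
    "child": "INSERT INTO child (id, parent_id, temp_c) VALUES (?, ?, ?) "
             "ON CONFLICT (id) DO NOTHING",
}


def make_db() -> sqlite3.Connection:
    """Create the two illustrative tables with a real FK between them."""
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite leaves FK checks off by default
    conn.execute("CREATE TABLE parent (id INTEGER PRIMARY KEY, name TEXT)")
    conn.execute(
        "CREATE TABLE child ("
        "id INTEGER PRIMARY KEY, "
        "parent_id INTEGER NOT NULL REFERENCES parent (id), "
        "temp_c REAL)"
    )
    return conn


def load(conn, order=LOAD_ORDER):
    """Insert every table's rows in the given order; safe to call repeatedly."""
    for table in order:
        conn.executemany(INSERT_SQL[table], ROWS[table])
    conn.commit()


def row_counts(conn):
    """Return {table: row count} for the illustrative tables."""
    return {
        t: conn.execute(f"SELECT COUNT(*) FROM {t}").fetchone()[0]
        for t in LOAD_ORDER
    }
```

Calling `load` twice on the same connection leaves two rows in each table, which is the row-level behaviour V4 asks for. Passing `order=["child", "parent"]` to a fresh database raises `sqlite3.IntegrityError`, which is the failure V2 is meant to rule out. The `ON CONFLICT` clause needs SQLite 3.24 or newer.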

tests/test_nmw_mirror.py

Lines changed: 243 additions & 0 deletions
@@ -0,0 +1,243 @@
+# ===============================================================================
+# Copyright 2026 ross
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ===============================================================================
+"""Structural + unit tests for the NM_Wells Phase-1 staging mirror.
+
+Covers SPEC §V invariants for the mirror schema, migrations and OGC views:
+
+V1 - all 18 NMW_* mirror tables exist with a primary key
+V2 - mirror load order respects parent->child (FK parent precedes child)
+V3 - migrations build all 8 OGC views
+V5 - the temperature-profile OGC view is MATERIALIZED
+V6 - each geothermal pygeoapi collection maps to an existing DB relation
+V10 - FK enforcement lives in the migration (DB-level FK constraints exist)
+
+Full data round-trip against a real SQL dump is out of scope here (SPEC §T.T14);
+the dump parser is unit-tested directly instead.
+"""
+
+import os
+
+import pytest
+import yaml
+from sqlalchemy import inspect as sa_inspect, text
+
+from db.engine import engine, session_ctx
+from transfers.nmw_mirror_transfer import NMW_MIRROR_SPECS
+from transfers.nmw_sql_dump import _parse_value, iter_table_rows
+
+ROOT = os.path.dirname(os.path.dirname(__file__))
+
+# DB relations created by the OGC-view migrations (d1e2f3a4b5c6, e2f3a4b5c6d7).
+OGC_VIEWS = [
+    "ogc_geothermal_wells_bht",
+    "ogc_geothermal_wells_temperature_profile",  # MATERIALIZED
+    "ogc_geothermal_wells_summary_heat_flow",
+    "ogc_geothermal_wells_interval_heat_flow",
+    "ogc_bht_measurements",
+    "ogc_temp_depth_measurements",
+    "ogc_heat_flow",
+    "ogc_dst",
+]
+MATERIALIZED_VIEW = "ogc_geothermal_wells_temperature_profile"
+
+# pygeoapi collections added by this PR and the DB relation each is backed by.
+GEOTHERMAL_COLLECTIONS = {
+    "geothermal_wells_bht": "ogc_geothermal_wells_bht",
+    "geothermal_wells_temperature_profile": "ogc_geothermal_wells_temperature_profile",
+    "bht_measurements": "ogc_bht_measurements",
+    "temp_depth_measurements": "ogc_temp_depth_measurements",
+    "heat_flow": "ogc_heat_flow",
+    "dst": "ogc_dst",
+}
+
+
+def _mirror_tablenames() -> list[str]:
+    return [spec.model.__tablename__ for spec in NMW_MIRROR_SPECS]
+
+
+# --------------------------------------------------------------------------- V1
+def test_all_mirror_tables_present_with_pk():
+    """All 18 NMW_* mirror tables exist in the schema, each with a PK (V1)."""
+    names = _mirror_tablenames()
+    assert len(names) == 18, f"expected 18 mirror specs, got {len(names)}"
+
+    insp = sa_inspect(engine)
+    existing = set(insp.get_table_names())
+    for table in names:
+        assert table in existing, f"mirror table {table} missing from schema"
+        pk = insp.get_pk_constraint(table)["constrained_columns"]
+        assert pk, f"mirror table {table} has no primary key"
+
+
+def test_well_headers_pk_is_well_data_id():
+    """Spot-check that original SQL Server column names are preserved (V1)."""
+    insp = sa_inspect(engine)
+    pk = insp.get_pk_constraint("NMW_WellHeaders")["constrained_columns"]
+    assert pk == ["WellDataID"]
+
+
+# ----------------------------------------------------------------------- V2/V10
+def test_mirror_tables_have_fk_constraints():
+    """The migration creates DB-level FK constraints (V10) - at least one
+    child table must carry a foreign key."""
+    insp = sa_inspect(engine)
+    total_fks = sum(len(insp.get_foreign_keys(t)) for t in _mirror_tablenames())
+    assert total_fks > 0, "no FK constraints found on NMW_* mirror tables"
+
+
+def test_fk_parent_loads_before_child():
+    """Every FK parent table is loaded before its child in NMW_MIRROR_SPECS so
+    the parent row exists when the child is inserted (V2)."""
+    order = {name: i for i, name in enumerate(_mirror_tablenames())}
+    insp = sa_inspect(engine)
+    checked = 0
+    for child in order:
+        for fk in insp.get_foreign_keys(child):
+            parent = fk["referred_table"]
+            if parent not in order or parent == child:
+                continue  # self-ref or FK to a non-mirror table
+            checked += 1
+            assert order[parent] <= order[child], (
+                f"{parent} (parent) must load before {child} (child) "
+                f"in NMW_MIRROR_SPECS"
+            )
+    assert checked > 0, "expected at least one intra-mirror FK to validate"
+
+
+# --------------------------------------------------------------------------- V3
+def test_ogc_views_exist():
+    """All 8 OGC views built by the migrations exist as relations (V3)."""
+    with session_ctx() as session:
+        rows = session.execute(
+            text(
+                "SELECT table_name FROM information_schema.tables "
+                "WHERE table_schema = 'public' "
+                "UNION SELECT matviewname FROM pg_matviews WHERE schemaname = 'public'"
+            )
+        ).all()
+    relations = {r[0] for r in rows}
+    for view in OGC_VIEWS:
+        assert view in relations, f"OGC view {view} missing"
+
+
+# --------------------------------------------------------------------------- V5
+def test_temperature_profile_is_materialized():
+    """The temperature-profile view is MATERIALIZED so it can be refreshed
+    after a mirror load (V5)."""
+    with session_ctx() as session:
+        names = {
+            r[0]
+            for r in session.execute(
+                text("SELECT matviewname FROM pg_matviews WHERE schemaname = 'public'")
+            ).all()
+        }
+    assert MATERIALIZED_VIEW in names, f"{MATERIALIZED_VIEW} is not a materialized view"
+
+
+# --------------------------------------------------------------------------- V6
+class _Default(dict):
+    """format_map() helper: unknown placeholders render empty."""
+
+    def __missing__(self, key):  # noqa: D401
+        return ""
+
+
+def _load_pygeoapi_config() -> dict:
+    """pygeoapi-config.yml is a ``{placeholder}`` template (see core/pygeoapi.py
+    _write_config); substitute dummy values before parsing as YAML."""
+    raw = open(os.path.join(ROOT, "core", "pygeoapi-config.yml")).read()
+    rendered = raw.format_map(
+        _Default(
+            server_url="http://test",
+            postgres_host="h",
+            postgres_port="5432",
+            postgres_db="d",
+            postgres_user="u",
+            postgres_password_env="p",
+            thing_collections_block="",
+        )
+    )
+    return yaml.safe_load(rendered)
+
+
+def test_geothermal_collections_back_existing_relations():
+    """Each new geothermal pygeoapi collection points at a DB relation that
+    actually exists (V6)."""
+    cfg = _load_pygeoapi_config()
+    resources = cfg["resources"]
+
+    with session_ctx() as session:
+        rows = session.execute(
+            text(
+                "SELECT table_name FROM information_schema.tables "
+                "WHERE table_schema = 'public' "
+                "UNION SELECT matviewname FROM pg_matviews WHERE schemaname = 'public'"
+            )
+        ).all()
+    relations = {r[0] for r in rows}
+
+    for coll, expected_table in GEOTHERMAL_COLLECTIONS.items():
+        assert coll in resources, f"collection {coll} missing from pygeoapi config"
+        tables = {
+            p.get("table")
+            for p in resources[coll].get("providers", [])
+            if p.get("table")
+        }
+        assert (
+            expected_table in tables
+        ), f"collection {coll} should be backed by {expected_table}, got {tables}"
+        assert (
+            expected_table in relations
+        ), f"backing relation {expected_table} for {coll} does not exist in DB"
+
+
+# ----------------------------------------------------------------- dump parser
+@pytest.mark.parametrize(
+    "raw,expected",
+    [
+        ("NULL", None),
+        ("null", None),
+        ("123", 123),
+        ("-5", -5),
+        ("-1.5", -1.5),
+        ("'abc'", "abc"),
+        ("N'abc'", "abc"),
+        ("'O''Brien'", "O'Brien"),  # doubled '' unescaped
+        ("CAST(42 AS int)", 42),
+        ("CAST(N'x' AS nvarchar(10))", "x"),
+        ("0xDEADBEEF", None),  # binary / rowversion not mirrored
+    ],
+)
+def test_parse_value_coercion(raw, expected):
+    assert _parse_value(raw) == expected
+
+
+def test_iter_table_rows_parses_inserts(tmp_path):
+    """iter_table_rows decodes column/value pairs from SSMS INSERT statements."""
+    dump = tmp_path / "dump.sql"
+    dump.write_text(
+        "INSERT [dbo].[tbl_demo] ([OBJECTID], [Name], [Note]) "
+        "VALUES (1, N'alpha', NULL), (2, 'beta', CAST(N'c' AS nvarchar(1)));\n",
+        encoding="utf-8",
+    )
+    rows = list(iter_table_rows(str(dump), "tbl_demo"))
+    assert rows == [
+        {"OBJECTID": 1, "Name": "alpha", "Note": None},
+        {"OBJECTID": 2, "Name": "beta", "Note": "c"},
+    ]
+
+
+# ============= EOF =============================================
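
The `_Default` helper in the test file relies on `str.format_map` consulting `__missing__` on a `dict` subclass, so placeholders the test does not supply render as empty strings instead of raising `KeyError`. A small standalone sketch of that technique follows; the `DefaultEmpty` name and the template text are made up for illustration and are not the real `pygeoapi-config.yml`.

```python
class DefaultEmpty(dict):
    """Mapping for str.format_map(): unknown placeholders render as ''."""

    def __missing__(self, key):
        return ""


# Hypothetical template fragment; the real config has many more fields.
TEMPLATE = (
    "server:\n"
    "  url: {server_url}\n"
    "provider:\n"
    "  host: {postgres_host}\n"
    "  extra: '{not_supplied}'\n"
)

# Only two of the three placeholders are supplied; the third falls back to "".
rendered = TEMPLATE.format_map(
    DefaultEmpty(server_url="http://test", postgres_host="h")
)
print(rendered)
```

Because `str.format_map` treats every `{` as the start of a replacement field, a template rendered this way has to double any literal braces (`{{`, `}}`).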

transfers/nmw_sql_dump.py

Lines changed: 3 additions & 1 deletion
@@ -122,7 +122,9 @@ def _iter_value_groups(s: str) -> Iterator[str]:
         i += 1


-_CAST_RE = re.compile(r"(?is)^CAST\s*\((.*)\s+AS\s+[^)]+\)$")
+# The AS-target type may itself be parameterised, e.g. CAST(1.50 AS Decimal(18, 2))
+# or CAST(N'x' AS nvarchar(10)); allow one level of parens in the type name.
+_CAST_RE = re.compile(r"(?is)^CAST\s*\((.*)\s+AS\s+[^()]+(?:\([^)]*\))?\s*\)$")


 def _parse_value(tok: str):
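
The effect of the widened pattern can be checked in isolation. The sketch below copies both patterns from the hunk above and runs them against a plain cast and the two parameterised casts named in the new comment. The `OLD_CAST_RE` and `NEW_CAST_RE` names and the `unwrap` helper are illustrative only and are not part of `nmw_sql_dump.py`.

```python
import re

# Patterns copied from the diff; the variable names are illustrative.
OLD_CAST_RE = re.compile(r"(?is)^CAST\s*\((.*)\s+AS\s+[^)]+\)$")
NEW_CAST_RE = re.compile(r"(?is)^CAST\s*\((.*)\s+AS\s+[^()]+(?:\([^)]*\))?\s*\)$")


def unwrap(pattern, token):
    """Return the inner expression of CAST(expr AS type), or None if no match."""
    m = pattern.match(token)
    return m.group(1) if m else None


for token in (
    "CAST(42 AS int)",
    "CAST(N'x' AS nvarchar(10))",
    "CAST(1.50 AS Decimal(18, 2))",
):
    old = unwrap(OLD_CAST_RE, token)
    new = unwrap(NEW_CAST_RE, token)
    print(f"{token!r}: old={old!r} new={new!r}")
```

With the old pattern, `[^)]+` consumes the type name up to the first `)`, leaving a second `)` before the end of the string, so the match fails for any parameterised type. The new pattern matches the type's own parenthesised arguments as an optional group, which covers exactly one level of nesting, as the diff comment states.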
