From aec217b642c1894e5c7eb68700231c6c6cc3f8dd Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 11 Nov 2025 14:42:13 -0700 Subject: [PATCH 01/13] feat: at mp height & description to well transfer --- schemas/thing.py | 7 ++++++- transfers/well_transfer.py | 6 +++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/schemas/thing.py b/schemas/thing.py index b700ac9cd..78162aa20 100644 --- a/schemas/thing.py +++ b/schemas/thing.py @@ -30,9 +30,10 @@ class ValidateWell(BaseModel): well_depth: float | None = None # in feet hole_depth: float | None = None # in feet well_casing_depth: float | None = None # in feet + measuring_point_height: float | None = None # in feet @model_validator(mode="after") - def check_depths(self): + def validate_values(self): if ( self.hole_depth is not None and self.well_depth is not None @@ -99,6 +100,10 @@ class CreateWell(CreateBaseThing, ValidateWell): default=None, gt=0, description="Well casing depth in feet" ) well_casing_materials: list[CasingMaterial] | None = None + measuring_point_height: float = Field( + ge=0, description="Measuring point height in feet" + ) + measuring_point_description: str class CreateSpring(CreateBaseThing): diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 389439292..a1278c9b5 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -198,9 +198,13 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None hole_depth=row.HoleDepth, well_depth=row.WellDepth, well_construction_notes=row.ConstructionNotes, - well_casing_diameter=row.CasingDiameter, + well_casing_diameter=( + row.CasingDiameter * 12 if row.CasingDiameter else None + ), well_casing_depth=row.CasingDepth, release_status="public" if row.PublicRelease else "private", + measuring_point_height=row.MPHeight, + measuring_point_description=row.MeasuringPoint, ) CreateWell.model_validate(data) From 505ae6ea564a366be1e0c44142fa15b9fe17a48c Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 11 Nov 2025 16:21:39 -0700 Subject: [PATCH 02/13] feat: add well status and monitoring status to well transfer --- transfers/well_transfer.py | 57 +++++++++++++++++++++++++++++++++----- 1 file changed, 50 insertions(+), 7 deletions(-) diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index a1278c9b5..a6376d607 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -15,7 +15,7 @@ # =============================================================================== import json import time -from datetime import datetime +from datetime import datetime, UTC import pandas as pd from pandas import isna @@ -33,6 +33,7 @@ Location, WellPurpose, WellCasingMaterial, + StatusHistory, ) from schemas.thing import CreateWell, CreateWellScreen from services.gcs_helper import get_storage_bucket @@ -229,6 +230,10 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None well_data["nma_pk_welldata"] = row.WellID well = Thing(**well_data) session.add(well) + logger.info(f"Created well for {row.PointID}") + + # flush well to access its ID for status_history + session.flush() if well_purposes: for wp in well_purposes: @@ -263,14 +268,52 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None assoc.thing = well session.add(assoc) + """ + Developer's notes + + For all status_history records the start_date will be now since that + isn't recorded in NM_Aquifer + """ + statusable_id = well.id + statusable_type = "Thing" + if row.MonitoringStatus: + if ( + "X" in row.MonitoringStatus + or "I" in row.MonitoringStatus + or "C" in row.MonitoringStatus + ): + status_value = "Not currently monitored" + else: + status_value = "Currently monitored" + + status_history = StatusHistory( + status_type="Monitoring Status", + status_value=status_value, + reason=row.MonitorStatusReason, + start_date=datetime.now(tz=UTC), + statusable_id=statusable_id, + statusable_type=statusable_type, + ) + session.add(status_history) + logger.info( + f" Added monitoring status for well {well.name}: {status_value}" + ) + + if row.Status: + status_value = lexicon_mapper.map_value(f"LU_Status:{row.Status}") + status_history = StatusHistory( + status_type="Well Status", + status_value=status_value, + reason=row.StatusUserNotes, + start_date=datetime.now(tz=UTC), + statusable_id=statusable_id, + statusable_type=statusable_type, + ) + session.add(status_history) + logger.info(f" Added well status for well {well.name}: {status_value}") + session.commit() return input_df, cleaned_df, errors - # try: - # session.commit() - # except Exception as e: - # logger.critical(f"Error committing well {row.PointID}: {e}") - # session.rollback() - # continue def transfer_wellscreens(session, limit=None): From 8107e7c6e4092946cafec2771d0a6d06a2c0f41c Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 11 Nov 2025 16:56:15 -0700 Subject: [PATCH 03/13] feat: validate measuring point height for a well --- schemas/thing.py | 20 ++++++++++++++++++++ tests/__init__.py | 5 +---- tests/test_thing.py | 24 ++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 4 deletions(-) diff --git a/schemas/thing.py b/schemas/thing.py index 78162aa20..2fba0c42f 100644 --- a/schemas/thing.py +++ b/schemas/thing.py @@ -48,6 +48,26 @@ def validate_values(self): raise ValueError( "well casing depth must be less than or equal to hole depth" ) + elif ( + self.measuring_point_height is not None + and self.hole_depth is not None + and self.measuring_point_height >= self.hole_depth + ): + raise ValueError("measuring point height must be less than hole depth") + elif ( + self.measuring_point_height is not None + and self.well_casing_depth is not None + and self.measuring_point_height >= self.well_casing_depth + ): + raise ValueError( + "measuring point height must be less than well casing depth" + ) + elif ( + self.measuring_point_height is not None + and self.well_depth is not None + and self.measuring_point_height >= self.well_depth + ): + raise ValueError("measuring point height must be less than well depth") return self diff --git a/tests/__init__.py b/tests/__init__.py index e5937e75d..a00c9b99e 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -34,7 +34,7 @@ from fastapi_pagination import add_pagination from starlette.middleware.cors import CORSMiddleware -from core.initializers import init_lexicon, init_parameter, register_routes +from core.initializers import register_routes from db import Base, Parameter from db.engine import session_ctx from core.app import app @@ -45,9 +45,6 @@ with session_ctx() as session: erase_and_initalize(session) -init_lexicon() -init_parameter() - register_routes(app) app.add_middleware( CORSMiddleware, diff --git a/tests/test_thing.py b/tests/test_thing.py index 03ab9ac09..84a6829c7 100644 --- a/tests/test_thing.py +++ b/tests/test_thing.py @@ -78,6 +78,30 @@ def test_validate_hole_depth_casing_depth(): ValidateWell(hole_depth=100.0, well_casing_depth=110.0) +def test_validate_mp_height_hole_depth(): + with pytest.raises( + ValueError, + match="measuring point height must be less than hole depth", + ): + ValidateWell(hole_depth=100.0, measuring_point_height=110.0) + + +def test_validate_mp_height_well_depth(): + with pytest.raises( + ValueError, + match="measuring point height must be less than well depth", + ): + ValidateWell(well_depth=100.0, measuring_point_height=105.0) + + +def test_validate_mp_height_well_casing_depth(): + with pytest.raises( + ValueError, + match="measuring point height must be less than well casing depth", + ): + ValidateWell(well_casing_depth=100.0, measuring_point_height=105.0) + + # POST tests =================================================================== From 647dc708ab7ec830786241cd6cfa9b5c71e3b32c Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 11 Nov 2025 17:06:10 -0700 Subject: [PATCH 04/13] refactor: fix erase/rebuild for tests --- tests/__init__.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/tests/__init__.py b/tests/__init__.py index a00c9b99e..ed7fe4ea8 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -29,12 +29,16 @@ # time.tzset() -from transfers.transfer import erase_and_initalize from fastapi.testclient import TestClient from fastapi_pagination import add_pagination from starlette.middleware.cors import CORSMiddleware -from core.initializers import register_routes +from core.initializers import ( + init_lexicon, + init_parameter, + register_routes, + erase_and_rebuild_db, +) from db import Base, Parameter from db.engine import session_ctx from core.app import app @@ -43,7 +47,10 @@ # Base.metadata.drop_all(engine) # Base.metadata.create_all(engine) with session_ctx() as session: - erase_and_initalize(session) + erase_and_rebuild_db(session) + +init_lexicon() +init_parameter() register_routes(app) app.add_middleware( From 0bccd4f15788ec1524531a9256dd39a720a4ee5d Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Wed, 12 Nov 2025 11:31:28 -0700 Subject: [PATCH 05/13] note: add note for AMMP review --- transfers/well_transfer.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index a6376d607..eb3a2d8c6 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -274,6 +274,9 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None For all status_history records the start_date will be now since that isn't recorded in NM_Aquifer """ + # TODO: if row.MonitoringStatus == "Q" is it monitored or not? <-- AMMP review + # TODO: if row.MonitoringStatus == "X" can that change? <-- AMMP review + # TODO: have AMMP review and verify the various MonitoringStatus codes statusable_id = well.id statusable_type = "Thing" if row.MonitoringStatus: From f2f5e27f478d5c0a7330649dddcf48d667698189 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Wed, 12 Nov 2025 12:09:17 -0700 Subject: [PATCH 06/13] refactor: make well validations more readable --- schemas/thing.py | 65 +++++++++++++++++++++++------------------------- 1 file changed, 31 insertions(+), 34 deletions(-) diff --git a/schemas/thing.py b/schemas/thing.py index 2fba0c42f..fe8fe0f2d 100644 --- a/schemas/thing.py +++ b/schemas/thing.py @@ -34,40 +34,37 @@ class ValidateWell(BaseModel): @model_validator(mode="after") def validate_values(self): - if ( - self.hole_depth is not None - and self.well_depth is not None - and self.well_depth > self.hole_depth - ): - raise ValueError("well depth must be less than than or equal to hole depth") - elif ( - self.hole_depth is not None - and self.well_casing_depth is not None - and self.well_casing_depth > self.hole_depth - ): - raise ValueError( - "well casing depth must be less than or equal to hole depth" - ) - elif ( - self.measuring_point_height is not None - and self.hole_depth is not None - and self.measuring_point_height >= self.hole_depth - ): - raise ValueError("measuring point height must be less than hole depth") - elif ( - self.measuring_point_height is not None - and self.well_casing_depth is not None - and self.measuring_point_height >= self.well_casing_depth - ): - raise ValueError( - "measuring point height must be less than well casing depth" - ) - elif ( - self.measuring_point_height is not None - and self.well_depth is not None - and self.measuring_point_height >= self.well_depth - ): - raise ValueError("measuring point height must be less than well depth") + if self.hole_depth is not None: + if self.well_depth is not None and self.well_depth > self.hole_depth: + raise ValueError( + "well depth must be less than than or equal to hole depth" + ) + elif ( + self.well_casing_depth is not None + and self.well_casing_depth > self.hole_depth + ): + raise ValueError( + "well casing depth must be less than or equal to hole depth" + ) + + if self.measuring_point_height is not None: + if ( + self.hole_depth is not None + and self.measuring_point_height >= self.hole_depth + ): + raise ValueError("measuring point height must be less than hole depth") + elif ( + self.well_casing_depth is not None + and self.measuring_point_height >= self.well_casing_depth + ): + raise ValueError( + "measuring point height must be less than well casing depth" + ) + elif ( + self.well_depth is not None + and self.measuring_point_height >= self.well_depth + ): + raise ValueError("measuring point height must be less than well depth") return self From 771dff43e3f864c43ac7c311657c0b022d8dae2a Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Wed, 12 Nov 2025 14:08:08 -0700 Subject: [PATCH 07/13] refactor: update transfer script for monitoring frequency history table --- core/lexicon.json | 1 + transfers/well_transfer.py | 97 +++++++++++++++++++++++++++++++++++--- 2 files changed, 92 insertions(+), 6 deletions(-) diff --git a/core/lexicon.json b/core/lexicon.json index 2ba161456..9bbe89b89 100644 --- a/core/lexicon.json +++ b/core/lexicon.json @@ -671,6 +671,7 @@ {"categories": ["group_type"], "term": "Historical", "definition": "A group of `Things` that share a common historical attribute. E.g., 'Wells drilled before 1950', 'Legacy Wells (Pre-1990)'."}, {"categories": ["monitoring_frequency"], "term": "Monthly", "definition": "Location is monitored on a monthly basis."}, {"categories": ["monitoring_frequency"], "term": "Bimonthly", "definition": "Location is monitored every two months."}, + {"categories": ["monitoring_frequency"], "term": "Bimonthly reported", "definition": "Location is monitored every two months and reported to NMBGMR."}, {"categories": ["monitoring_frequency"], "term": "Quarterly", "definition": "Location is monitored on a quarterly basis."}, {"categories": ["monitoring_frequency"], "term": "Biannual", "definition": "Location is monitored twice a year."}, {"categories": ["monitoring_frequency"], "term": "Annual", "definition": "Location is monitored once a year."}, diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index eb3a2d8c6..935140bdb 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -34,6 +34,7 @@ WellPurpose, WellCasingMaterial, StatusHistory, + MonitoringFrequencyHistory, ) from schemas.thing import CreateWell, CreateWellScreen from services.gcs_helper import get_storage_bucket @@ -277,8 +278,8 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None # TODO: if row.MonitoringStatus == "Q" is it monitored or not? <-- AMMP review # TODO: if row.MonitoringStatus == "X" can that change? <-- AMMP review # TODO: have AMMP review and verify the various MonitoringStatus codes - statusable_id = well.id - statusable_type = "Thing" + target_id = well.id + target_table = "thing" if row.MonitoringStatus: if ( "X" in row.MonitoringStatus @@ -294,14 +295,98 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None status_value=status_value, reason=row.MonitorStatusReason, start_date=datetime.now(tz=UTC), - statusable_id=statusable_id, - statusable_type=statusable_type, + target_id=target_id, + target_table=target_table, ) session.add(status_history) logger.info( f" Added monitoring status for well {well.name}: {status_value}" ) + if "6" in row.MonitoringStatus: + monitoring_frequency_history = MonitoringFrequencyHistory( + thing_id=well.id, + monitoring_frequency="Biannual", + start_date=datetime.now(tz=UTC), + end_date=None, + ) + session.add(monitoring_frequency_history) + logger.info( + f" Adding biannual monitoring frequency for well {well.name}" + ) + + if "A" in row.MonitoringStatus: + monitoring_frequency_history = MonitoringFrequencyHistory( + thing_id=well.id, + monitoring_frequency="Annual", + start_date=datetime.now(tz=UTC), + end_date=None, + ) + session.add(monitoring_frequency_history) + logger.info( + f" Adding annual monitoring frequency for well {well.name}" + ) + + if "B" in row.MonitoringStatus: + monitoring_frequency_history = MonitoringFrequencyHistory( + thing_id=well.id, + monitoring_frequency="Bimonthly", + start_date=datetime.now(tz=UTC), + end_date=None, + ) + session.add(monitoring_frequency_history) + logger.info( + f" Adding annual monitoring frequency for well {well.name}" + ) + + if "L" in row.MonitoringStatus: + monitoring_frequency_history = MonitoringFrequencyHistory( + thing_id=well.id, + monitoring_frequency="Decadal", + start_date=datetime.now(tz=UTC), + end_date=None, + ) + session.add(monitoring_frequency_history) + logger.info( + f" Adding decadal monitoring frequency for well {well.name}" + ) + + if "M" in row.MonitoringStatus: + monitoring_frequency_history = MonitoringFrequencyHistory( + thing_id=well.id, + monitoring_frequency="Monthly", + start_date=datetime.now(tz=UTC), + end_date=None, + ) + session.add(monitoring_frequency_history) + logger.info( + f" Adding monthly monitoring frequency for well {well.name}" + ) + + if "R" in row.MonitoringStatus: + monitoring_frequency_history = MonitoringFrequencyHistory( + thing_id=well.id, + monitoring_frequency="Bimonthly reported", + start_date=datetime.now(tz=UTC), + end_date=None, + ) + session.add(monitoring_frequency_history) + logger.info( + f" Adding bimonthly reported monitoring frequency for well {well.name}" + ) + + if "N" in row.MonitoringStatus: + monitoring_frequency_history = MonitoringFrequencyHistory( + thing_id=well.id, + monitoring_frequency="Biannual", + start_date=datetime.now(tz=UTC), + end_date=None, + ) + session.add(monitoring_frequency_history) + logger.info( + f" Adding biannual monitoring frequency for well {well.name}" + ) + if row.Status: status_value = lexicon_mapper.map_value(f"LU_Status:{row.Status}") status_history = StatusHistory( @@ -309,8 +394,8 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None status_value=status_value, reason=row.StatusUserNotes, start_date=datetime.now(tz=UTC), - statusable_id=statusable_id, - statusable_type=statusable_type, + target_id=target_id, + target_table=target_table, ) session.add(status_history) logger.info(f" Added well status for well {well.name}: {status_value}") From ef1a4c8fd88c9a046f2b853511c6c6a45c09a406 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Wed, 12 Nov 2025 14:26:42 -0700 Subject: [PATCH 08/13] refactor: update for measuring point history table --- schemas/thing.py | 2 +- transfers/well_transfer.py | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/schemas/thing.py b/schemas/thing.py index fd4b59461..b933e842f 100644 --- a/schemas/thing.py +++ b/schemas/thing.py @@ -127,7 +127,7 @@ class CreateWell(CreateBaseThing, ValidateWell): measuring_point_height: float = Field( ge=0, description="Measuring point height in feet" ) - measuring_point_description: str + measuring_point_description: str | None class CreateSpring(CreateBaseThing): diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 935140bdb..11117fd80 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -35,6 +35,7 @@ WellCasingMaterial, StatusHistory, MonitoringFrequencyHistory, + MeasuringPointHistory, ) from schemas.thing import CreateWell, CreateWellScreen from services.gcs_helper import get_storage_bucket @@ -225,6 +226,8 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None "group_id", "well_purposes", "well_casing_materials", + "measuring_point_height", + "measuring_point_description", ] ) well_data["thing_type"] = "water well" @@ -236,6 +239,21 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None # flush well to access its ID for status_history session.flush() + """ + Developer's note + + It's not clear when the measuring point from NM_Aquifer was + determined, so I'm setting start_date to the day of the transfer + """ + measuring_point_history = MeasuringPointHistory( + thing_id=well.id, + measuring_point_height=row.MPHeight, + measuring_point_description=row.MeasuringPoint, + start_date=datetime.now(tz=UTC), + end_date=None, + ) + session.add(measuring_point_history) + if well_purposes: for wp in well_purposes: # TODO: add validation logic here From cabac98e753631538b324d7d5a7f2b290a253202 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Wed, 12 Nov 2025 14:57:40 -0700 Subject: [PATCH 09/13] feat: set group_type based off of wells' monitoring status --- transfers/group_transfer.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/transfers/group_transfer.py b/transfers/group_transfer.py index 8a414d680..296da1f2a 100644 --- a/transfers/group_transfer.py +++ b/transfers/group_transfer.py @@ -20,6 +20,7 @@ from db.engine import session_ctx from transfers.util import read_csv from transfers.logger import logger +from tests import retrieve_latest_polymorphic_table_record def transfer_groups( @@ -44,7 +45,34 @@ def transfer_groups( logger.info( f"Adding {len(records)} things to group {group.name}, prefix {prefix}" ) + group_is_monitoring_plan = False for record in records: + # set the group_type to Monitoring Plan if at least one well is currently monitored + if not group_is_monitoring_plan: + if record.status_history: + monitoring_status = [ + sh + for sh in record.status_history + if sh.status_type == "Monitoring Status" + ] + if monitoring_status: + monitoring_status = ( + retrieve_latest_polymorphic_table_record( + record, + "status_history", + "Monitoring Status", + ) + ) + if ( + monitoring_status.status_value + == "Currently monitored" + ): + group_is_monitoring_plan = True + group.group_type = "Monitoring Plan" + logger.info( + f" Setting group {group.name} type to Monitoring Plan based on thing {record.name}" + ) + gta = GroupThingAssociation(group=group, thing=record) session.add(gta) group.thing_associations.append(gta) From b22619c0fbc13bdc3eb5501202d1bb14e4d2cabb Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Thu, 13 Nov 2025 09:44:46 -0700 Subject: [PATCH 10/13] fix: import retrieve_latest_polymorphic_record from correct place --- transfers/group_transfer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transfers/group_transfer.py b/transfers/group_transfer.py index 296da1f2a..3ece1cc69 100644 --- a/transfers/group_transfer.py +++ b/transfers/group_transfer.py @@ -20,7 +20,7 @@ from db.engine import session_ctx from transfers.util import read_csv from transfers.logger import logger -from tests import retrieve_latest_polymorphic_table_record +from services.util import retrieve_latest_polymorphic_table_record def transfer_groups( From ef016daf8581dfc240a266e8fec3403a31579095 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 17 Nov 2025 18:17:28 -0700 Subject: [PATCH 11/13] WIP: use data provenance table in transfers --- transfers/group_transfer.py | 12 ++- transfers/thing_transfer.py | 17 +++- transfers/util.py | 169 +++++++++++++++++++++++------------- transfers/well_transfer.py | 9 +- 4 files changed, 138 insertions(+), 69 deletions(-) diff --git a/transfers/group_transfer.py b/transfers/group_transfer.py index 3ece1cc69..0bad85cb7 100644 --- a/transfers/group_transfer.py +++ b/transfers/group_transfer.py @@ -20,7 +20,7 @@ from db.engine import session_ctx from transfers.util import read_csv from transfers.logger import logger -from services.util import retrieve_latest_polymorphic_table_record +from services.util import retrieve_latest_polymorphic_history_table_record def transfer_groups( @@ -56,12 +56,10 @@ def transfer_groups( if sh.status_type == "Monitoring Status" ] if monitoring_status: - monitoring_status = ( - retrieve_latest_polymorphic_table_record( - record, - "status_history", - "Monitoring Status", - ) + monitoring_status = retrieve_latest_polymorphic_history_table_record( + record, + "status_history", + "Monitoring Status", ) if ( monitoring_status.status_value diff --git a/transfers/thing_transfer.py b/transfers/thing_transfer.py index 28fd394d4..38f9b4708 100644 --- a/transfers/thing_transfer.py +++ b/transfers/thing_transfer.py @@ -20,7 +20,12 @@ from db import LocationThingAssociation from services.thing_helper import add_thing -from transfers.util import make_location, read_csv, replace_nans +from transfers.util import ( + make_location, + make_location_data_provenance, + read_csv, + replace_nans, +) from transfers.logger import logger @@ -49,7 +54,15 @@ def transfer_thing(session: Session, site_type: str, make_payload, limit=None) - session.commit() try: - location = make_location(row) + location, elevation_method = make_location(row) + session.add(location) + session.flush() + data_provenances = make_location_data_provenance( + row, location, elevation_method + ) + for dp in data_provenances: + session.add(dp) + payload = make_payload(row) thing_type = payload.pop("thing_type") thing = add_thing(session, payload, thing_type=thing_type) diff --git a/transfers/util.py b/transfers/util.py index 8b9524ad5..50f2ccf7b 100644 --- a/transfers/util.py +++ b/transfers/util.py @@ -28,17 +28,11 @@ from sqlalchemy.orm import Session from constants import SRID_WGS84, SRID_UTM_ZONE_13N -from db import Thing, Location +from db import Thing, Location, DataProvenance from services.gcs_helper import get_storage_bucket # from services.lexicon_mapper import lexicon_mapper -from services.util import ( - transform_srid, - get_epqs_elevation_from_point, - # get_state_from_point, - # get_county_from_point, - # get_quad_name_from_point, -) +from services.util import transform_srid, get_epqs_elevation_from_point, convert_ft_to_m from transfers.logger import logger @@ -186,7 +180,10 @@ def chunk_by_size(df, chunk_size): yield df.iloc[i : i + chunk_size] -def make_location(row: pd.Series) -> Location: +def make_location(row: pd.Series) -> tuple: + """ + Returns a tuple of location data and the elevation method + """ point = Point(row.Easting, row.Northing) # Convert the point to a WGS84 coordinate system @@ -194,40 +191,6 @@ def make_location(row: pd.Series) -> Location: point, source_srid=SRID_UTM_ZONE_13N, target_srid=SRID_WGS84 ) - # since this is such a time consuming operation, I do not want to run it during this step - # cleanup_wells was added for this reason - - # state = get_state_from_point(transformed_point.x, transformed_point.y) - # county = get_county_from_point(transformed_point.x, transformed_point.y) - # quad_name = get_quad_name_from_point(transformed_point.x, transformed_point.y) - - z = row.Altitude - if z: - elevation_from_epqs = False - z = z * 0.3048 - else: - elevation_from_epqs = True - logger.info( - f"Location {row.PointID} has no Altitude. Setting from National Map EPQS for " - ) - z = get_epqs_elevation_from_point(transformed_point.x, transformed_point.y) - - if elevation_from_epqs: - elevation_method = "USGS National Elevation Dataset (NED)" - elif pd.isna(row.AltitudeMethod): - elevation_method = None - else: - elevation_method = lexicon_mapper.map_value( - f"LU_AltitudeMethod:{row.AltitudeMethod.strip()}" - ) - - if pd.isna(row.CoordinateMethod): - coordinate_method = None - else: - coordinate_method = lexicon_mapper.map_value( - f"LU_CoordinateMethod:{row.CoordinateMethod}" - ) - """ Developer's notes @@ -255,6 +218,60 @@ def make_location(row: pd.Series) -> Location: if created_at is not None: created_at = convert_mt_to_utc(created_at) + z = row.Altitude + if z: + elevation_from_epqs = False + z = convert_ft_to_m(z) + else: + elevation_from_epqs = True + logger.info( + f"Location {row.PointID} has no Altitude. Setting from National Map EPQS for " + ) + z = get_epqs_elevation_from_point(transformed_point.x, transformed_point.y) + + if elevation_from_epqs: + elevation_method = "USGS National Elevation Dataset (NED)" + elif pd.isna(row.AltitudeMethod): + elevation_method = None + else: + elevation_method = lexicon_mapper.map_value( + f"LU_AltitudeMethod:{row.AltitudeMethod.strip()}" + ) + + location = Location( + nma_pk_location=row.LocationId, + point=transformed_point.wkt, + elevation=z, + release_status="public" if row.PublicRelease else "private", + created_at=created_at, + nma_coordinate_notes=row.CoordinateNotes, + nma_notes_location=row.LocationNotes, + ) + + return location, elevation_method + + +def make_location_data_provenance( + row: pd.Series, location: Location, elevation_method: str | None +) -> list[DataProvenance]: + provenance_records = [] + + if row.AltitudeAccuracy or row.CoordinateAccuracy: + provenance = DataProvenance( + target_id=location.id, + target_table="location", + field_name="elevation", + origin_source=None, + collection_method=elevation_method, + accuracy_value=( + None + if pd.isna(row.AltitudeAccuracy) + else convert_ft_to_m(row.AltitudeAccuracy) + ), + accuracy_unit="m", + ) + provenance_records.append(provenance) + # TODO: AMP feedback is required for transfering coordinate accuracy values # from NM_Aquifer to Ocotillo # if row.CoordinateAccuracy == "U" or pd.isna(row.CoordinateAccuracy): @@ -318,22 +335,56 @@ def make_location(row: pd.Series) -> Location: # minus_latitude = original_latitude - coordinate_accuracy_decimal_deg # minus_point_decimal_deg = Point(minus_longitude, minus_latitude) - location = Location( - nma_pk_location=row.LocationId, - # name=row.PointID, - point=transformed_point.wkt, - elevation=z, - release_status="public" if row.PublicRelease else "private", - elevation_accuracy=row.AltitudeAccuracy, - elevation_method=elevation_method, - created_at=created_at, - # TODO: get AMP feedback on transfering these values. See above note - # coordinate_accuracy=row.CoordinateAccuracy, - coordinate_method=coordinate_method, - nma_coordinate_notes=row.CoordinateNotes, - nma_notes_location=row.LocationNotes, - ) - return location + if row.CoordinateMethod or row.CoordinateAccuracy: + coordinate_method = ( + lexicon_mapper.map_value(f"LU_CoordinateMethod:{row.CoordinateMethod}") + if not pd.isna(row.CoordinateMethod) + else None + ) + + if row.CoordinateAccuracy == "5m": + accuracy_value = 5 + accuracy_unit = "minute" + elif row.CoordinateAccuracy == "1": + accuracy_value = 0.1 + accuracy_unit = "second" + elif row.CoordinateAccuracy == "5": + accuracy_value = 0.5 + accuracy_unit = "second" + elif row.CoordinateAccuracy == "F": + accuracy_value = 5 + accuracy_unit = "second" + elif row.CoordinateAccuracy == "H": + accuracy_value = 0.01 + accuracy_unit = "second" + elif row.CoordinateAccuracy == "M": + accuracy_value = 1 + accuracy_unit = "minute" + elif row.CoordinateAccuracy == "R": + accuracy_value = 3 + accuracy_unit = "second" + elif row.CoordinateAccuracy == "S": + accuracy_value = 1 + accuracy_unit = "second" + elif row.CoordinateAccuracy == "T": + accuracy_value = 10 + accuracy_unit = "second" + else: + accuracy_value = None + accuracy_unit = None + + provenance = DataProvenance( + target_id=location.id, + target_table="location", + field_name="point", + origin_source=None, + collection_method=coordinate_method, + accuracy_value=accuracy_value, + accuracy_unit=accuracy_unit, + ) + provenance_records.append(provenance) + + return provenance_records def timeit_direct(func, *args, **kwargs): diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 11117fd80..caf2b2125 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -46,6 +46,7 @@ ) from transfers.util import ( make_location, + make_location_data_provenance, filter_to_valid_point_ids, read_csv, logger, @@ -173,8 +174,14 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None location = None try: - location = make_location(row) + location, elevation_method = make_location(row) session.add(location) + session.flush() + data_provenances = make_location_data_provenance( + row, location, elevation_method + ) + for dp in data_provenances: + session.add(dp) except Exception as e: if location is not None: session.expunge(location) From cfc4e8fd6667a4b7a2d18565d3af7ac915c85a47 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 18 Nov 2025 10:04:43 -0700 Subject: [PATCH 12/13] fix: convert ngvd29 to navd88 for elevation where applicable --- services/util.py | 20 ++++++++++++++++++++ transfers/util.py | 18 +++++++++--------- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/services/util.py b/services/util.py index 06c29a6ad..77cd5d5cd 100644 --- a/services/util.py +++ b/services/util.py @@ -132,6 +132,26 @@ def get_epqs_elevation_from_point(lon: float, lat: float) -> float | None: return data["value"] +def convert_ngvd29_to_navd88( + elevation_ngvd29: float, longitude: float, latitude: float +) -> float: + url = "https://geodesy.noaa.gov/api/ncat/llh" + params = { + "lat": latitude, + "lon": longitude, + "inDatum": "nad83(2011)", + "outDatum": "nad83(2011)", + "inVertDatum": "ngvd29", + "outVertDatum": "navd88", + "orthoHt": elevation_ngvd29, + } + response = httpx.get(url, params=params) + data = response.json() + + elevation_navd88 = data.get("destOrthoht") + return elevation_navd88 + + def retrieve_latest_polymorphic_history_table_record( target_record: DeclarativeBase, polymorphic_relationship: str, diff --git a/transfers/util.py b/transfers/util.py index 50f2ccf7b..1f400cb8e 100644 --- a/transfers/util.py +++ b/transfers/util.py @@ -32,7 +32,12 @@ from services.gcs_helper import get_storage_bucket # from services.lexicon_mapper import lexicon_mapper -from services.util import transform_srid, get_epqs_elevation_from_point, convert_ft_to_m +from services.util import ( + transform_srid, + get_epqs_elevation_from_point, + convert_ft_to_m, + convert_ngvd29_to_navd88, +) from transfers.logger import logger @@ -147,14 +152,6 @@ def filter_to_valid_point_ids(session: Session, df: pd.DataFrame) -> pd.DataFram return df[df["PointID"].isin(valid_point_ids)] -def convert_to_wgs84_vertical_datum(row, z): - if row.VerticalDatum == "NAVD88": - z = z + 2.0 # TODO: check this transformation - elif row.VerticalDatum == "NGVD29": - z = z + 3.0 # TODO: check this transformation - return z - - def convert_mt_to_utc(dt_record: datetime): t = dt_record.time() if t.hour == 0 and t.minute == 0: @@ -222,6 +219,9 @@ def make_location(row: pd.Series) -> tuple: if z: elevation_from_epqs = False z = convert_ft_to_m(z) + + if row.AltDatum == "NGVD29": + z = convert_ngvd29_to_navd88(z, transformed_point.x, transformed_point.y) else: elevation_from_epqs = True logger.info( From df238fa95c094ba9648d9ef318ecfb141df5634b Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 18 Nov 2025 10:41:25 -0700 Subject: [PATCH 13/13] refactor: address PR comments --- transfers/util.py | 80 +++++++++++++++++----------- transfers/well_transfer.py | 106 ++++++++----------------------------- 2 files changed, 73 insertions(+), 113 deletions(-) diff --git a/transfers/util.py b/transfers/util.py index 1f400cb8e..6d3d6a1cf 100644 --- a/transfers/util.py +++ b/transfers/util.py @@ -41,6 +41,50 @@ from transfers.logger import logger +NMA_COORDINATE_ACCURACY = { + "5m": { + "accuracy_value": 5, + "accuracy_unit": "m", + }, + "1": { + "accuracy_value": 0.1, + "accuracy_unit": "second", + }, + "5": { + "accuracy_value": 0.5, + "accuracy_unit": "second", + }, + "F": { + "accuracy_value": 5, + "accuracy_unit": "second", + }, + "H": { + "accuracy_value": 0.01, + "accuracy_unit": "second", + }, + "M": { + "accuracy_value": 1, + "accuracy_unit": "minute", + }, + "R": { + "accuracy_value": 3, + "accuracy_unit": "second", + }, + "S": { + "accuracy_value": 1, + "accuracy_unit": "second", + }, + "T": { + "accuracy_value": 10, + "accuracy_unit": "second", + }, + None: { + "accuracy_value": None, + "accuracy_unit": None, + }, +} + + def replace_nans(df: pd.DataFrame, default=None) -> pd.DataFrame: df = df.replace(pd.NA, default) return df.replace({np.nan: default}) @@ -342,36 +386,12 @@ def make_location_data_provenance( else None ) - if row.CoordinateAccuracy == "5m": - accuracy_value = 5 - accuracy_unit = "minute" - elif row.CoordinateAccuracy == "1": - accuracy_value = 0.1 - accuracy_unit = "second" - elif row.CoordinateAccuracy == "5": - accuracy_value = 0.5 - accuracy_unit = "second" - elif row.CoordinateAccuracy == "F": - accuracy_value = 5 - accuracy_unit = "second" - elif row.CoordinateAccuracy == "H": - accuracy_value = 0.01 - accuracy_unit = "second" - elif row.CoordinateAccuracy == "M": - accuracy_value = 1 - accuracy_unit = "minute" - elif row.CoordinateAccuracy == "R": - accuracy_value = 3 - accuracy_unit = "second" - elif row.CoordinateAccuracy == "S": - accuracy_value = 1 - accuracy_unit = "second" - elif row.CoordinateAccuracy == "T": - accuracy_value = 10 - accuracy_unit = "second" - else: - accuracy_value = None - accuracy_unit = None + accuracy_value = NMA_COORDINATE_ACCURACY.get(row.CoordinateAccuracy, None).get( + "accuracy_value" + ) + accuracy_unit = NMA_COORDINATE_ACCURACY.get(row.CoordinateAccuracy, None).get( + "accuracy_unit" + ) provenance = DataProvenance( target_id=location.id, diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index caf2b2125..6fb4094fd 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -59,6 +59,16 @@ ADDED = [] +NMA_MONITORING_FREQUENCY = { + "6": "Biannual", + "A": "Annual", + "B": "Bimonthly", + "L": "Decadal", + "M": "Monthly", + "R": "Bimonthly reported", + "N": "Biannual", +} + def _get_first_visit_date(row) -> datetime | None: first_visit_date = None @@ -328,89 +338,19 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None f" Added monitoring status for well {well.name}: {status_value}" ) - if "6" in row.MonitoringStatus: - monitoring_frequency_history = MonitoringFrequencyHistory( - thing_id=well.id, - monitoring_frequency="Biannual", - start_date=datetime.now(tz=UTC), - end_date=None, - ) - session.add(monitoring_frequency_history) - logger.info( - f" Adding biannual monitoring frequency for well {well.name}" - ) - - if "A" in row.MonitoringStatus: - monitoring_frequency_history = MonitoringFrequencyHistory( - thing_id=well.id, - monitoring_frequency="Annual", - start_date=datetime.now(tz=UTC), - end_date=None, - ) - session.add(monitoring_frequency_history) - logger.info( - f" Adding annual monitoring frequency for well {well.name}" - ) - - if "B" in row.MonitoringStatus: - monitoring_frequency_history = MonitoringFrequencyHistory( - thing_id=well.id, - monitoring_frequency="Bimonthly", - start_date=datetime.now(tz=UTC), - end_date=None, - ) - session.add(monitoring_frequency_history) - logger.info( - f" Adding annual monitoring frequency for well {well.name}" - ) - - if "L" in row.MonitoringStatus: - monitoring_frequency_history = MonitoringFrequencyHistory( - thing_id=well.id, - monitoring_frequency="Decadal", - start_date=datetime.now(tz=UTC), - end_date=None, - ) - session.add(monitoring_frequency_history) - logger.info( - f" Adding decadal monitoring frequency for well {well.name}" - ) - - if "M" in row.MonitoringStatus: - monitoring_frequency_history = MonitoringFrequencyHistory( - thing_id=well.id, - monitoring_frequency="Monthly", - start_date=datetime.now(tz=UTC), - end_date=None, - ) - session.add(monitoring_frequency_history) - logger.info( - f" Adding monthly monitoring frequency for well {well.name}" - ) - - if "R" in row.MonitoringStatus: - monitoring_frequency_history = MonitoringFrequencyHistory( - thing_id=well.id, - monitoring_frequency="Bimonthly reported", - start_date=datetime.now(tz=UTC), - end_date=None, - ) - session.add(monitoring_frequency_history) - logger.info( - f" Adding bimonthly reported monitoring frequency for well {well.name}" - ) - - if "N" in row.MonitoringStatus: - monitoring_frequency_history = MonitoringFrequencyHistory( - thing_id=well.id, - monitoring_frequency="Biannual", - start_date=datetime.now(tz=UTC), - end_date=None, - ) - session.add(monitoring_frequency_history) - logger.info( - f" Adding biannual monitoring frequency for well {well.name}" - ) + for code in NMA_MONITORING_FREQUENCY.keys(): + if code in row.MonitoringStatus: + monitoring_frequency = NMA_MONITORING_FREQUENCY[code] + monitoring_frequency_history = MonitoringFrequencyHistory( + thing_id=well.id, + monitoring_frequency=monitoring_frequency, + start_date=datetime.now(tz=UTC), + end_date=None, + ) + session.add(monitoring_frequency_history) + logger.info( + f" Adding '{monitoring_frequency}' monitoring frequency for well {well.name}" + ) if row.Status: status_value = lexicon_mapper.map_value(f"LU_Status:{row.Status}")