From 2991591d6539901946e18c9e4aad1f3dd84b1e07 Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Thu, 20 Nov 2025 15:30:58 -0700 Subject: [PATCH 01/17] feat(transfers): add `LU_AquiferClass` and `LU_AquiferType` lookup tables to `util.py`. Maps the `LU_AquiferClass` and `LU_AquiferType` lookup tables to the lexicon. --- transfers/util.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/transfers/util.py b/transfers/util.py index 8b9524ad5..a276708db 100644 --- a/transfers/util.py +++ b/transfers/util.py @@ -366,6 +366,8 @@ def _make_lu_to_lexicon_mapper(self): # Lookup tables where CODE maps to MEANING lu_tables = [ "LU_AltitudeMethod", + "LU_AquiferClass", + "LU_AquiferType", "LU_CollectionMethod", "LU_ConstructionMethod", "LU_CoordinateAccuracy", From 00e7225f0086d19f7e15c25df7292b9c97e4448c Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Thu, 20 Nov 2025 19:05:27 -0700 Subject: [PATCH 02/17] feat(schemas): implement Create schemas for `aquifer_system` and `geologic_formation`. --- schemas/aquifer_system.py | 14 ++++++++++++++ schemas/geologic_formation.py | 13 +++++++++++++ 2 files changed, 27 insertions(+) diff --git a/schemas/aquifer_system.py b/schemas/aquifer_system.py index 5f5b3ed4d..e7c8b9bd0 100644 --- a/schemas/aquifer_system.py +++ b/schemas/aquifer_system.py @@ -5,6 +5,20 @@ from schemas import BaseResponseModel +# ------ CREATE ---------- +class CreateAquiferSystem(BaseModel): + """ + Schema for creating an aquifer system. + Used during data transfer and API creation. 
+ """ + + name: str + description: str | None = None + aquifer_type: str + geographic_scale: str + boundary: str | None = None + + # ------ RESPONSE ---------- class GeoJSONGeometry(BaseModel): """ diff --git a/schemas/geologic_formation.py b/schemas/geologic_formation.py index f6b3083d3..339188b5b 100644 --- a/schemas/geologic_formation.py +++ b/schemas/geologic_formation.py @@ -5,6 +5,19 @@ from schemas import BaseResponseModel +# ------ CREATE ---------- +class CreateGeologicFormation(BaseModel): + """ + Schema for creating a geologic formation. + Used during data transfer and API creation. + """ + + formation_code: str | None = None + description: str | None = None + lithology: str | None = None + boundary: str | None = None + + # ------ RESPONSE ---------- class GeoJSONGeometry(BaseModel): """ From af07c3e20ddaf2c9cafbc2f2c52cc4c4c646fda1 Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Mon, 24 Nov 2025 09:36:39 -0700 Subject: [PATCH 03/17] feat(transfer): WIP aquifer and geology transfers --- transfers/aquifer_system_transfer.py | 139 +++++++++++++++++++++++ transfers/geologic_formation_transfer.py | 85 ++++++++++++++ 2 files changed, 224 insertions(+) create mode 100644 transfers/aquifer_system_transfer.py create mode 100644 transfers/geologic_formation_transfer.py diff --git a/transfers/aquifer_system_transfer.py b/transfers/aquifer_system_transfer.py new file mode 100644 index 000000000..0de7cc7ae --- /dev/null +++ b/transfers/aquifer_system_transfer.py @@ -0,0 +1,139 @@ +import time +from sqlalchemy.orm import Session +from pydantic import ValidationError + +from db import AquiferSystem +from schemas.aquifer_system import CreateAquiferSystem +from transfers.util import read_csv, replace_nans, logger + + +def transfer_aquifer_systems(session: Session, limit: int = None) -> tuple: + """ + Transfer aquifer system data from LU_AquiferClass CSV to the database. + + This creates the master list of named aquifer systems (e.g., Ogallala Aquifer). 
the primary_type field is set + to "Unknown" as a placeholder and will be updated during well transfer when we know what type each well encounters. + + This should be run BEFORE well_transfer.py so that aquifer records exist for wells to reference. + + Args: + session (Session): SQLAlchemy database session + limit (int, optional): Limit the number of records to transfer (for testing). + + Returns: + tuple: (input_df, cleaned_df, errors) + """ + # 1. Read the CSV file + input_df = read_csv("LU_AquiferClass") + + # 2. Replace NaNs with NOne + cleaned_df = replace_nans(input_df) + + # 3. Initialize tracking variables for logging + n = len(input_df) + step = 25 + start_time = time.time() + errors = [] + created_count = 0 + skipped_count = 0 + + logger.info(f"Starting transfer of {n} aquifer systems from LU_AquiferClass.") + + # 4. Process each row + for i, row in enumerate(cleaned_df.itertuples()): + # check if limit is reached + if limit and i >= limit: + logger.info(f"Reached limit of {limit} rows. Stopping migration.") + break + + # Log progress every 'step' rows + if i and not i % 25: + logger.info( + f"Processing row {i} of {n}. Avg rows per second: {step / (time.time() - start_time):.2f}" + ) + start_time = time.time() + + # Commit progress periodically + try: + session.commit() + except Exception as e: + logger.critical(f"Error committing aquifer system {i}: {e}") + session.rollback() + continue + + # 5. Extract aquifer code and name + aquifer_code = row.CODE + aquifer_name = row.MEANING + + if not aquifer_name: + logger.warning( + f"Row {i} (code: {aquifer_code}) has no aquifer name (MEANING). Skipping." + ) + skipped_count += 1 + continue + + # 6. Check if aquifer system already exists + existing = ( + session.query(AquiferSystem) + .filter(AquiferSystem.name == aquifer_name) + .first() + ) + + if existing: + logger.info( + f"Aquifer '{aquifer_name}' (code: {aquifer_code}) already exists. Skipping." + ) + skipped_count += 1 + continue + + # 7. 
Prepare data dictionary + try: + data = CreateAquiferSystem( + name=aquifer_name, + description=None, # can be updated later + primary_aquifer_type="Unknown", # placeholder - will be updated during well transfer + ) + + # Validate data using Pydantic schema + CreateAquiferSystem.model_validate(data) + + except ValidationError as e: + errors.append({"code": aquifer_code, "name": aquifer_name, "error": str(e)}) + logger.critical( + f"Error creating aquifer system '{aquifer_name}' (code: {aquifer_code}) (row {i}): {e}" + ) + continue + + # 8. Create database record + aquifer_system = None + try: + aquifer_data = data.model_dump() + aquifer_system = AquiferSystem(**aquifer_data) + session.add(aquifer_system) + created_count += 1 + + logger.info( + f"Created aquifer system: {aquifer_system.name} (code: {aquifer_code})" + ) + + except Exception as e: + if aquifer_system is not None: + session.expunge(aquifer_system) + errors.append({"code": aquifer_code, "name": aquifer_name, "error": str(e)}) + logger.critical( + f"Error creating aquifer system record '{aquifer_name}': {e}" + ) + continue + + # 9. Final commit + try: + session.commit() + logger.info( + f"Successfully transferred {created_count} aquifer systems, skipped {skipped_count}. " + f"Note: primary_type set to 'Unknown' and will be updated during well transfer." 
+ ) + except Exception as e: + logger.critical(f"Error in final commit: {e}") + session.rollback() + + return input_df, cleaned_df, errors diff --git a/transfers/geologic_formation_transfer.py b/transfers/geologic_formation_transfer.py new file mode 100644 index 000000000..202c9d431 --- /dev/null +++ b/transfers/geologic_formation_transfer.py @@ -0,0 +1,85 @@ +import time +from sqlalchemy.orm import Session +from pydantic import ValidationError + +from db import GeologicFormation +from schemas.geologic_formation import CreateGeologicFormation +from transfers.util import read_csv, replace_nans, lexicon_mapper, logger + + +def transfer_geologic_formations(session: Session, limit: int = None) -> tuple: + """ + Transfer geologic formation data from LU_GeologicFormation CSV to the database. + + This should be run BEFORE well_transfer.py so that geologic formation records exist for wells to reference. + + Args: + session (Session): SQLAlchemy database session + limit (int, optional): Optional limit on number of records to transfer (for testing). + + Returns: + tuple: (input_df, cleaned_df, errors) + """ + # 1. Read the CSV file + input_df = read_csv("LU_Formation") + + # 2. Replace NaNs with None + cleaned_df = replace_nans(input_df) + + # 3. Initialize tracking variables for logging + n = len(cleaned_df) + step = 25 + start_time = time.time() + errors = [] + created_count = 0 + skipped_count = 0 + + logger.info(f"Starting transfer of {n} geologic formations") + + # 4. Process each row + for i, row in enumerate(cleaned_df.itertuples()): + # check if limit is reached + if limit and i >= limit: + logger.info(f"Reached limit of {limit} rows. Stopping migration.") + break + + # Log progress every 'step' rows + if i and not i % step: + logger.info( + f"Processing row {i} of {n}. 
Avg rows per second: {step / (time.time() - start_time):.2f}" + ) + start_time = time.time() + + # Commit progress periodically + try: + session.commit() + except Exception as e: + logger.critical(f"Error committing geologic formation {i}: {e}") + session.rollback() + continue + + try: + payload = CreateGeologicFormation( + name=row.GeologicFormationName, + description=row.Description, + lithology=lexicon_mapper("Lithology", row.Lithology), + age=lexicon_mapper("GeologicAge", row.Age), + ) + formation = GeologicFormation(**payload.dict()) + session.add(formation) + created_count += 1 + except ValidationError as e: + error_msg = f"Validation error for row {i} with GeologicFormationName {row.GeologicFormationName}: {e.errors()}" + logger.critical(error_msg) + errors.append(error_msg) + except Exception as e: + error_msg = f"Error creating geologic formation for {row.GeologicFormationName}: {e}" + logger.critical(error_msg) + errors.append(error_msg) + continue + + # Final commit after all rows are processed + try: + session.commit() + except Exception as e: + logger.critical(f"Error during final commit of geologic formations: {e}") From 748a57b9baa542c84f6fce8b6160d520300d0558 Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Mon, 24 Nov 2025 16:21:48 -0700 Subject: [PATCH 04/17] feat(transfer): add stratigraphy transfer script and update geologic formation transfers. Add stratigraphy_transfer.py to handle detailed lithology log import, create well-formation associations, and update formation lithology fields from stratigraphy data. This script is essential for linking wells to geologic formations with depth intervals. 
--- transfers/geologic_formation_transfer.py | 90 ++++++-- transfers/stratigraphy_transfer.py | 251 +++++++++++++++++++++++ transfers/util.py | 1 + 3 files changed, 325 insertions(+), 17 deletions(-) create mode 100644 transfers/stratigraphy_transfer.py diff --git a/transfers/geologic_formation_transfer.py b/transfers/geologic_formation_transfer.py index 202c9d431..724eb79fe 100644 --- a/transfers/geologic_formation_transfer.py +++ b/transfers/geologic_formation_transfer.py @@ -4,7 +4,7 @@ from db import GeologicFormation from schemas.geologic_formation import CreateGeologicFormation -from transfers.util import read_csv, replace_nans, lexicon_mapper, logger +from transfers.util import read_csv, replace_nans, logger def transfer_geologic_formations(session: Session, limit: int = None) -> tuple: @@ -54,32 +54,88 @@ def transfer_geologic_formations(session: Session, limit: int = None) -> tuple: try: session.commit() except Exception as e: - logger.critical(f"Error committing geologic formation {i}: {e}") + logger.critical(f"Error committing geologic formations: {e}") session.rollback() continue + # 5. Extract formation code and description + formation_code = row.Code + + if not formation_code: + logger.warning(f"Skipping row {i}: Missing formation code") + skipped_count += 1 + continue + + # Check if this formation already exists + existing = ( + session.query(GeologicFormation) + .filter(GeologicFormation.formation_code == formation_code) + .first() + ) + + if existing: + logger.info( + f"Skipping row {i}: Formation code {formation_code} already exists" + ) + skipped_count += 1 + continue + + # 6. Prepare data for creation + # Note: We only store the formation_code. Formation names will be mapped by the API using a + # formations.json file from authoritative sources (e.g., USGS). + # The description field is left as None and can be populated later if needed. 
+ # Note: lithology is set to None here and will be updated during stratigraphy transfer try: - payload = CreateGeologicFormation( - name=row.GeologicFormationName, - description=row.Description, - lithology=lexicon_mapper("Lithology", row.Lithology), - age=lexicon_mapper("GeologicAge", row.Age), + data = CreateGeologicFormation( + formation_code=formation_code, + description=None, # Not storing from legacy data + lithology=None, # Will be populated from Stratigraphy.csv ) - formation = GeologicFormation(**payload.dict()) - session.add(formation) - created_count += 1 + + # Validate the data using Pydantic schema + CreateGeologicFormation.model_validate(data) + except ValidationError as e: - error_msg = f"Validation error for row {i} with GeologicFormationName {row.GeologicFormationName}: {e.errors()}" - logger.critical(error_msg) - errors.append(error_msg) + errors.append({"code": formation_code, "errors": e.errors()}) + logger.critical( + f"Validation error for row {i} with Code {formation_code}: {e.errors()}" + ) + continue + except Exception as e: + errors.append({"code": formation_code, "errors": str(e)}) + logger.critical(f"Error preparing data for {formation_code}: {e}") + continue + + # 7. Create database object + geologic_formation = None + try: + formation_data = data.model_dump() + geologic_formation = GeologicFormation(**formation_data) + session.add(geologic_formation) + created_count += 1 + + logger.info( + f"Created geologic formation: {geologic_formation.formation_code}" + ) + except Exception as e: - error_msg = f"Error creating geologic formation for {row.GeologicFormationName}: {e}" - logger.critical(error_msg) - errors.append(error_msg) + if geologic_formation is not None: + session.expunge(geologic_formation) + errors.append({"code": formation_code, "error": str(e)}) + logger.critical( + f"Error creating geologic formation for {formation_code}: {e}" + ) continue - # Final commit after all rows are processed + # 8. 
Final commit try: session.commit() + logger.info( + f"Successfully transferred {created_count} geologic formations, skipped {skipped_count}. " + f"Note: lithology is None and will be updated during stratigraphy transfer." + ) except Exception as e: logger.critical(f"Error during final commit of geologic formations: {e}") + session.rollback() + + return input_df, cleaned_df, errors diff --git a/transfers/stratigraphy_transfer.py b/transfers/stratigraphy_transfer.py new file mode 100644 index 000000000..ac74a2d9c --- /dev/null +++ b/transfers/stratigraphy_transfer.py @@ -0,0 +1,251 @@ +""" +Transfer script for stratigraphy (lithology log) data. + +This creates ThingGeologicFormationAssociation records from the Stratigraphy CSV, which contains depth-specific +formation information for wells. It also updates the GeologicFormation.lithology field based on the +Stratigraphy.Lithology data. +""" + +import time +from sqlalchemy.orm import Session + +from db import Thing, GeologicFormation, ThingGeologicFormationAssociation +from transfers.util import ( + read_csv, + replace_nans, + filter_to_valid_point_ids, + lexicon_mapper, + logger, +) + + +def transfer_stratigraphy(session: Session, limit: int = None) -> tuple: + """ + Transfer detailed stratigraphy (lithology log) data from Stratigraphy CSV. + + The Stratigraphy CSV contains multiple rows per well, each representing a + depth interval, the formation encountered, and its lithology. + + Fields used: + - PointID: Links to the well + - UnitIdentifier: Formation code (maps to LU_Formations) + - StratTop: Top depth of the layer (feet below ground surface) + - StratBottom: Bottom depth of the layer (feet below ground surface) + - Lithology: Lithology code (maps to LU_Lithology via ABBREVIATION field) + + This should be run AFTER: + 1. transfer_geologic_formations.py (so formations exist) + 2. 
transfer_wells.py (so wells exist) + + Args: + session: Database session + limit: Optional limit on number of WELLS to process (for testing) + + Returns: + tuple: (input_df, cleaned_df, errors) + """ + # 1. Read and clean data + input_df = read_csv("Stratigraphy") + cleaned_df = replace_nans(input_df) + + # Step 2: Filter to only wells that exist in database + cleaned_df = filter_to_valid_point_ids(session, cleaned_df) + + n_records = len(cleaned_df) + n_wells = len(cleaned_df["PointID"].unique()) + + logger.info( + f"Starting transfer of {n_records} stratigraphy records for {n_wells} wells" + ) + + # 3. Initialize tracking variables for logging + step = 25 + start_time = time.time() + errors = [] + created_count = 0 + skipped_count = 0 + lithology_updates = 0 + + # Step 4: Group by well for efficient processing + well_groups = cleaned_df.groupby("PointID") + + for well_index, (pointid, strat_group) in enumerate(well_groups): + # Check limit (on number of wells, not records) + if limit and well_index >= limit: + logger.info(f"Reached limit of {limit} wells. Stopping.") + break + + # Progress logging every 25 wells + if well_index and not well_index % step: + logger.info( + f"Processing well {well_index} of {n_wells}, " + f"avg wells per second: {step / (time.time() - start_time):.2f}" + ) + start_time = time.time() + + # Periodic commit + try: + session.commit() + except Exception as e: + logger.critical(f"Error committing stratigraphy records: {e}") + session.rollback() + continue + + # 5. Get the well from database + thing = session.query(Thing).filter(Thing.name == pointid).first() + if not thing: + logger.warning( + f"Well {pointid} not found in database, skipping stratigraphy" + ) + skipped_count += len(strat_group) + continue + + logger.info( + f"Processing {len(strat_group)} stratigraphy layers for well {pointid}" + ) + + # 6. 
Process each stratigraphy record for this well + for layer_index, row in enumerate(strat_group.itertuples()): + # Validate required fields + # UnitIdentifier + if not hasattr(row, "UnitIdentifier") or not row.UnitIdentifier: + logger.warning( + f"Stratigraphy record {layer_index} for {pointid} has no UnitIdentifier, skipping" + ) + skipped_count += 1 + continue + # StratTop + if not hasattr(row, "StratTop") or row.StratTop is None: + logger.warning( + f"Stratigraphy record {layer_index} for {pointid} has no StratTop, skipping" + ) + skipped_count += 1 + continue + # StratBottom + if not hasattr(row, "StratBottom") or row.StratBottom is None: + logger.warning( + f"Stratigraphy record {layer_index} for {pointid} has no StratBottom, skipping" + ) + skipped_count += 1 + continue + + # Extract formation code + formation_code = row.UnitIdentifier.strip() + + # Validate depth values + try: + top_depth = float(row.StratTop) + bottom_depth = float(row.StratBottom) + except (ValueError, TypeError) as e: + logger.warning( + f"Invalid depth values for {pointid}: StratTop={row.StratTop}, " + f"StratBottom={row.StratBottom}, error: {e}" + ) + skipped_count += 1 + continue + + # Validate depth logic + if top_depth >= bottom_depth: + logger.warning( + f"Invalid depths for {pointid} layer {layer_index}: " + f"top={top_depth} >= bottom={bottom_depth}, skipping" + ) + skipped_count += 1 + continue + + if top_depth < 0: + logger.warning( + f"Negative top depth for {pointid} layer {layer_index}: {top_depth}, skipping" + ) + skipped_count += 1 + continue + + # 7. 
Get or create the formation + formation = ( + session.query(GeologicFormation) + .filter(GeologicFormation.formation_code == formation_code) + .first() + ) + + if not formation: + # Create new formation if it doesn't exist + logger.info(f"Creating new geologic formation: {formation_code}") + formation = GeologicFormation( + formation_code=formation_code, + description=None, + lithology=None, # Will be set below + ) + session.add(formation) + session.flush() + + # 8. Update formation lithology if available and not already set + if hasattr(row, "Lithology") and row.Lithology: + try: + # Map lithology code to geologic_formation.lithology using ABBREVIATION field + lithology = lexicon_mapper.map_value( + f"LU_Lithology:{row.Lithology}" + ) + + # Update if formation does not have lithology yet + if not formation.lithology: + formation.lithology = lithology + lithology_updates += 1 + logger.info(f"Set lithology for {formation_code}: {lithology}") + elif formation.lithology != lithology: + # Log if there's a mismatch (different lithology for same formation) + logger.warning( + f"Formation {formation_code} has conflicting lithology: " + f"existing='{formation.lithology}', new='{lithology}'." + ) + except KeyError: + logger.warning( + f"Unknown lithology code '{row.Lithology}' for {pointid}, skipping lithology update" + ) + except Exception as e: + logger.warning(f"Error mapping lithology '{row.Lithology}': {e}") + + # 9. 
Create ThingGeologicFormationAssociation record + try: + formation_assoc = ThingGeologicFormationAssociation( + thing=thing, + geologic_formation=formation, + top_depth=top_depth, + bottom_depth=bottom_depth, + ) + session.add(formation_assoc) + created_count += 1 + + logger.info( + f" Layer {layer_index + 1}: {formation.formation_code} " + f"from {top_depth:.1f} to {bottom_depth:.1f} ft" + ) + + except Exception as e: + logger.critical( + f"Error creating stratigraphy association for {pointid}, " + f"formation {formation_code}: {e}" + ) + errors.append( + { + "pointid": pointid, + "formation": formation_code, + "layer": layer_index, + "error": str(e), + } + ) + skipped_count += 1 + continue + + # 10. Final commit + try: + session.commit() + logger.info( + f"Successfully transferred stratigraphy: " + f"{created_count} associations created, {skipped_count} skipped, " + f"{lithology_updates} lithology fields updated, {len(errors)} errors" + ) + except Exception as e: + logger.critical(f"Error in final commit: {e}") + session.rollback() + + return input_df, cleaned_df, errors diff --git a/transfers/util.py b/transfers/util.py index 8dde3cd4d..4a753e719 100644 --- a/transfers/util.py +++ b/transfers/util.py @@ -421,6 +421,7 @@ def _make_lu_to_lexicon_mapper(self): "LU_Depth_CompletionSource", "LU_Discharge_ChemistrySource", "LU_LevelStatus", + "LU_Lithology", "LU_MajorAnalyte", "LU_MeasurementMethod", "LU_MinorTraceAnalyte", From b14e6d05a7f166a6c57bd1745cb4dbe136c2ebda Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Tue, 25 Nov 2025 10:41:54 -0700 Subject: [PATCH 05/17] refactor(transfer): update log levels and error tracking in stratigraphy transfer - Changed log level from 'warning' to 'critical' in the depth validation section - Added error tracking and clearer error messages. 
--- transfers/aquifer_system_transfer.py | 2 +- transfers/stratigraphy_transfer.py | 29 ++++++++++++++++++++-------- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/transfers/aquifer_system_transfer.py b/transfers/aquifer_system_transfer.py index 0de7cc7ae..7c1a42b79 100644 --- a/transfers/aquifer_system_transfer.py +++ b/transfers/aquifer_system_transfer.py @@ -47,7 +47,7 @@ def transfer_aquifer_systems(session: Session, limit: int = None) -> tuple: break # Log progress every 'step' rows - if i and not i % 25: + if i and not i % step: logger.info( f"Processing row {i} of {n}. Avg rows per second: {step / (time.time() - start_time):.2f}" ) diff --git a/transfers/stratigraphy_transfer.py b/transfers/stratigraphy_transfer.py index ac74a2d9c..dbbe3d0bb 100644 --- a/transfers/stratigraphy_transfer.py +++ b/transfers/stratigraphy_transfer.py @@ -137,25 +137,38 @@ def transfer_stratigraphy(session: Session, limit: int = None) -> tuple: top_depth = float(row.StratTop) bottom_depth = float(row.StratBottom) except (ValueError, TypeError) as e: - logger.warning( - f"Invalid depth values for {pointid}: StratTop={row.StratTop}, " - f"StratBottom={row.StratBottom}, error: {e}" + error_msg = f"Invalid depth values: StratTop={row.StratTop}, StratBottom={row.StratBottom}" + logger.critical( + f"{pointid} layer {layer_index}: {error_msg}, error: {e}" + ) + errors.append( + { + "pointid": pointid, + "layer": layer_index, + "error": error_msg, + "details": str(e), # for conversion errors + } ) skipped_count += 1 continue # Validate depth logic if top_depth >= bottom_depth: - logger.warning( - f"Invalid depths for {pointid} layer {layer_index}: " - f"top={top_depth} >= bottom={bottom_depth}, skipping" + error_msg = ( + f"Invalid depth logic: top={top_depth} >= bottom={bottom_depth}" + ) + logger.critical(f"{pointid} layer {layer_index}: {error_msg}") + errors.append( + {"pointid": pointid, "layer": layer_index, "error": error_msg} ) skipped_count += 1 continue if 
top_depth < 0: - logger.warning( - f"Negative top depth for {pointid} layer {layer_index}: {top_depth}, skipping" + error_msg = f"Negative top depth: {top_depth}" + logger.critical(f"{pointid} layer {layer_index}: {error_msg}") + errors.append( + {"pointid": pointid, "layer": layer_index, "error": error_msg} ) skipped_count += 1 continue From 48830b92ecc743febb188cecc4fd8b4dd1f1c510 Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Tue, 25 Nov 2025 11:08:02 -0700 Subject: [PATCH 06/17] refactor(schemas): use enums in `geologic_formation` create schema. --- core/enums.py | 1 + schemas/geologic_formation.py | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/enums.py b/core/enums.py index 11fc2708e..a5c8be1de 100644 --- a/core/enums.py +++ b/core/enums.py @@ -78,4 +78,5 @@ AquiferType: type[Enum] = build_enum_from_lexicon_category("aquifer_type") GeographicScale: type[Enum] = build_enum_from_lexicon_category("geographic_scale") Lithology: type[Enum] = build_enum_from_lexicon_category("lithology") +FormationCode: type[Enum] = build_enum_from_lexicon_category("formation_code") # ============= EOF ============================================= diff --git a/schemas/geologic_formation.py b/schemas/geologic_formation.py index 6cbc7357b..62e24ee8e 100644 --- a/schemas/geologic_formation.py +++ b/schemas/geologic_formation.py @@ -3,6 +3,7 @@ from pydantic import BaseModel from schemas import BaseResponseModel +from core.enums import FormationCode, Lithology # ------ CREATE ---------- @@ -12,9 +13,9 @@ class CreateGeologicFormation(BaseModel): Used during data transfer and API creation. 
""" - formation_code: str | None = None + formation_code: FormationCode | None = None description: str | None = None - lithology: str | None = None + lithology: Lithology | None = None boundary: str | None = None From dc9b1ef4d0acbf9750ab5aa0e4477a9463aee2ad Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Tue, 25 Nov 2025 12:16:43 -0700 Subject: [PATCH 07/17] refactor(transfer): update log levels and error tracking in aquifer_system transfer - Changed log level from 'warning' to 'critical' for rows with a missing aquifer name (MEANING) - Added error tracking --- transfers/aquifer_system_transfer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/transfers/aquifer_system_transfer.py b/transfers/aquifer_system_transfer.py index 7c1a42b79..137b3b00f 100644 --- a/transfers/aquifer_system_transfer.py +++ b/transfers/aquifer_system_transfer.py @@ -66,9 +66,9 @@ def transfer_aquifer_systems(session: Session, limit: int = None) -> tuple: aquifer_name = row.MEANING if not aquifer_name: - logger.warning( - f"Row {i} (code: {aquifer_code}) has no aquifer name (MEANING). Skipping." - ) + error_msg = f"Row {i} (code: {aquifer_code}) has no aquifer name (MEANING)." 
+ logger.critical(error_msg) + errors.append({"row": i, "code": aquifer_code, "error": error_msg}) skipped_count += 1 continue From ee787d15288b60dff3d2a0cd2269032f15e20748 Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Tue, 25 Nov 2025 14:39:50 -0700 Subject: [PATCH 08/17] feat(transfer): Add aquifer system and geologic formation associations to well transfer - Implement _extract_aquifer_type_codes() to parse compound codes (e.g., "FC" -> Fractured + Confined) - Add get_or_create_aquifer_system() helper to manage unique aquifer system records - Add get_or_create_geologic_formation() helper to manage geologic formation records - Integrate aquifer association logic in `transfer_wells()` to create ThingAquiferAssociation and AquiferType records - Integrate formation association logic to create ThingGeologicFormationAssociation records with depth data - Support lexicon mapping for both AqClass (aquifer name) and AquiferType (characteristics) fields - Add comprehensive error handling and logging for aquifer/formation associations This enables proper tracking of wells' aquifer systems with multiple type characteristics and their associated geologic formations, preserving all source data from NM_Aquifer. 
--- transfers/well_transfer.py | 231 +++++++++++++++++++++++++++++++++++++ 1 file changed, 231 insertions(+) diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index ee54d0216..e713ef3e5 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -36,6 +36,11 @@ StatusHistory, MonitoringFrequencyHistory, MeasuringPointHistory, + AquiferSystem, + AquiferType, + GeologicFormation, + ThingAquiferAssociation, + ThingGeologicFormationAssociation, ) from schemas.thing import CreateWell, CreateWellScreen from services.gcs_helper import get_storage_bucket @@ -117,6 +122,107 @@ def _extract_casing_materials(row) -> list[str]: return materials +# Parse aquifer codes +def _extract_aquifer_type_codes(aquifer_code: str) -> list[str]: + """ + Parse aquifer type codes that may contain multiple values. + + Args: + aquifer_code: Raw code from AquiferType field + + Returns: + List of individual codes + """ + if not aquifer_code: + return [] + # clean the code + code = aquifer_code.strip().upper() + # split into individual characters. This handles cases like "FC" -> ["F", "C"] + individual_codes = list(code) + return individual_codes + + +# Get or create aquifer system +def get_or_create_aquifer_system( + session: Session, aquifer_name: str, primary_type: str +) -> AquiferSystem | None: + """ + Get existing aquifer or create new one if it doesn't exist. + + With the new AquiferType model, we create ONE aquifer record per named + aquifer (e.g., one "Santa Fe Group"), not multiple variants. 
+ + Args: + session: Database session + aquifer_name: Name of the aquifer (from AqClass or type name) + primary_type: Primary aquifer type for the aquifer_type field + """ + # Try to find existing aquifer by name + aquifer = ( + session.query(AquiferSystem).filter(AquiferSystem.name == aquifer_name).first() + ) + + if aquifer: + return aquifer + + # Create new aquifer + try: + logger.info( + f"Creating new aquifer system: {aquifer_name} (primary type: {primary_type})" + ) + + aquifer = AquiferSystem( + name=aquifer_name, + aquifer_type=primary_type, # Primary type + geographic_scale=None, # Default + ) + session.add(aquifer) + session.flush() # Get the ID + return aquifer + except Exception as e: + logger.critical(f"Error creating aquifer {aquifer_name}: {e}") + return None + + +def get_or_create_geologic_formation( + session: Session, formation_code: str +) -> GeologicFormation | None: + """ + Get existing geologic formation or create new one if it doesn't exist. + + Args: + session: Database session + formation_code: The formation code from FormationZone field + + Returns: + GeologicFormation object or None if creation fails + """ + # Try to find existing formation + formation = ( + session.query(GeologicFormation) + .filter(GeologicFormation.formation_code == formation_code) + .first() + ) + + if formation: + return formation + + # If not found, create new formation + try: + logger.info(f"Creating new geologic formation: {formation_code}") + formation = GeologicFormation( + formation_code=formation_code, + description=None, + lithology=None, + ) + session.add(formation) + session.flush() + return formation + except Exception as e: + logger.critical(f"Error creating formation {formation_code}: {e}") + return None + + def get_wells_to_transfer( sess: Session, flags: dict = None ) -> tuple[pd.DataFrame, pd.DataFrame]: @@ -330,6 +436,131 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None assoc.thing = well session.add(assoc) + # --- 
Create Aquifer Association with AquiferType records --- + if hasattr(row, "AquiferType") and not isna(row.AquiferType): + try: + # Parse codes (handles multi-character codes like "FC") + aquifer_codes = _extract_aquifer_type_codes(row.AquiferType) + + if not aquifer_codes: + logger.warning( + f"Well {row.PointID}: Empty aquifer codes after parsing '{row.AquiferType}'" + ) + else: + # Map AqClass code to aquifer name using lexicon mapper + if hasattr(row, "AqClass") and not isna(row.AqClass): + try: + aquifer_name = lexicon_mapper.map_value( + f"LU_AquiferClass:{row.AqClass}" + ) + except KeyError: + logger.warning( + f"Unknown AqClass code '{row.AqClass}' for well {row.PointID}, using first type as name" + ) + aquifer_name = lexicon_mapper.map_value( + f"LU_AquiferType:{aquifer_codes[0]}" + ) + else: + # No AqClass - use first code's mapped name as aquifer name + aquifer_name = lexicon_mapper.map_value( + f"LU_AquiferType:{aquifer_codes[0]}" + ) + + # Determine primary type + try: + primary_type = lexicon_mapper.map_value( + f"LU_AquiferType:{aquifer_codes[0]}" + ) + except KeyError: + logger.warning( + f"Unknown aquifer type code '{aquifer_codes[0]}' for well {row.PointID}" + ) + primary_type = None + + if primary_type: + # Get or create the aquifer + aquifer = get_or_create_aquifer_system( + session, aquifer_name, primary_type + ) + + if aquifer: + # Check if association already exists + existing_assoc = ( + session.query(ThingAquiferAssociation) + .filter( + ThingAquiferAssociation.thing_id == well.id, + ThingAquiferAssociation.aquifer_system_id + == aquifer.id, + ) + .first() + ) + + if not existing_assoc: + # Create the association + aquifer_assoc = ThingAquiferAssociation( + thing=well, aquifer_system=aquifer + ) + session.add(aquifer_assoc) + session.flush() + + # Create AquiferType records for EACH characteristic + aquifer_type_names = [] + for aquifer_code in aquifer_codes: + try: + type_name = lexicon_mapper.map_value( + f"LU_AquiferType:{aquifer_code}" + 
) + aquifer_type = AquiferType( + thing_aquifer_association=aquifer_assoc, + aquifer_type=type_name, + ) + session.add(aquifer_type) + aquifer_type_names.append(type_name) + except KeyError: + logger.warning( + f"Unknown aquifer code '{aquifer_code}' from AquiferType='{row.AquiferType}' " + f"for well {well.name}. Skipping this code." + ) + + logger.info( + f"Associated well {well.name} with aquifer {aquifer.name} " + f"(types: {', '.join(aquifer_type_names)})" + ) + + except Exception as e: + logger.critical( + f"Error creating aquifer associations for {well.name}: {e}" + ) + + # --- Create Formation Association (if FormationZone exists) --- + if hasattr(row, "FormationZone") and not isna(row.FormationZone): + try: + formation_code = row.FormationZone + formation = get_or_create_geologic_formation(session, formation_code) + + if formation: + top_depth = 0.0 + bottom_depth = ( + row.WellDepth + if row.WellDepth and not isna(row.WellDepth) + else 100.0 + ) + + formation_assoc = ThingGeologicFormationAssociation( + thing=well, + geologic_formation=formation, + top_depth=top_depth, + bottom_depth=bottom_depth, + ) + session.add(formation_assoc) + logger.info( + f"Associated well {well.name} with formation {formation.formation_code} (0-{bottom_depth} ft)" + ) + except Exception as e: + logger.critical( + f"Error creating formation association for {well.name}: {e}" + ) + session.commit() # add things thate need well id From 81fed8a0329bd29758046a506dcabd8786f46e28 Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Tue, 25 Nov 2025 20:42:17 -0700 Subject: [PATCH 09/17] refactor(transfer): Update logic related to creating an aquifer_system - Set primary_type placeholder to "Unknown" instead of None when creating an aquifer_system in `well_transfer.py` --- transfers/well_transfer.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index e713ef3e5..6c7d4f6e2 100644 --- a/transfers/well_transfer.py 
+++ b/transfers/well_transfer.py @@ -473,9 +473,10 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None ) except KeyError: logger.warning( - f"Unknown aquifer type code '{aquifer_codes[0]}' for well {row.PointID}" + f"Unknown aquifer type code '{aquifer_codes[0]}' for well {row.PointID}." + f"Setting primary_type to 'Unknown'" ) - primary_type = None + primary_type = "Unknown" # Creates aquifer with placeholder if primary_type: # Get or create the aquifer From cd1d170fcd4a0440f931b5f7b572f9c466f363ea Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Tue, 25 Nov 2025 20:56:27 -0700 Subject: [PATCH 10/17] refactor(transfer): Update logic related to creating formation associations - Updated the logic so that a formation association is created only if valid well depth data exist. --- transfers/well_transfer.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 6c7d4f6e2..4f54b64dc 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -534,18 +534,28 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None ) # --- Create Formation Association (if FormationZone exists) --- + # Note: This creates a single formation association from WellData. + # For detailed stratigraphy with depth intervals, see transfer_stratigraphy() if hasattr(row, "FormationZone") and not isna(row.FormationZone): try: formation_code = row.FormationZone formation = get_or_create_geologic_formation(session, formation_code) if formation: - top_depth = 0.0 - bottom_depth = ( - row.WellDepth - if row.WellDepth and not isna(row.WellDepth) - else 100.0 - ) + # Onlyl create association if valid well depth data exists + if ( + not hasattr(row, "WellDepth") + or isna(row.WellDepth) + or not row.WellDepth + ): + logger.warning( + f"Well {well.name} has FormationZone but no valid WellDepth. " + f"Skipping formation association. 
Use stratigraphy transfer for detailed depth data." + ) + else: + # Create association using actual well depth + top_depth = 0.0 + bottom_depth = float(row.WellDepth) formation_assoc = ThingGeologicFormationAssociation( thing=well, From e7f0aaa0e3411a64af74b2b2c741e68239385271 Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Tue, 25 Nov 2025 21:06:17 -0700 Subject: [PATCH 11/17] refactor(transfer): add note to verify compound aquifer type codes with AMMP It is assumed that the first recorded type of a compound type is the primary type of the aquifer. --- transfers/well_transfer.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 4f54b64dc..b4b206073 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -467,6 +467,8 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None ) # Determine primary type + # This assumes the first recorded type of a compound type is the primary type of the aquifer. 
+ # TODO: verify with AMMP try: primary_type = lexicon_mapper.map_value( f"LU_AquiferType:{aquifer_codes[0]}" From 595d1d949a0e7a7db16eef5c79d7991043f2e13b Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Wed, 26 Nov 2025 11:54:53 -0700 Subject: [PATCH 12/17] refactor(transfer): removed unnecessary if statement re: creating aquifers --- transfers/well_transfer.py | 90 +++++++++++++++++++------------------- 1 file changed, 44 insertions(+), 46 deletions(-) diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index b4b206073..036f84b80 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -480,55 +480,53 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None ) primary_type = "Unknown" # Creates aquifer with placeholder - if primary_type: - # Get or create the aquifer - aquifer = get_or_create_aquifer_system( - session, aquifer_name, primary_type - ) + # Get or create the aquifer + aquifer = get_or_create_aquifer_system( + session, aquifer_name, primary_type + ) - if aquifer: - # Check if association already exists - existing_assoc = ( - session.query(ThingAquiferAssociation) - .filter( - ThingAquiferAssociation.thing_id == well.id, - ThingAquiferAssociation.aquifer_system_id - == aquifer.id, - ) - .first() + if aquifer: + # Check if association already exists + existing_assoc = ( + session.query(ThingAquiferAssociation) + .filter( + ThingAquiferAssociation.thing_id == well.id, + ThingAquiferAssociation.aquifer_system_id == aquifer.id, ) + .first() + ) - if not existing_assoc: - # Create the association - aquifer_assoc = ThingAquiferAssociation( - thing=well, aquifer_system=aquifer - ) - session.add(aquifer_assoc) - session.flush() - - # Create AquiferType records for EACH characteristic - aquifer_type_names = [] - for aquifer_code in aquifer_codes: - try: - type_name = lexicon_mapper.map_value( - f"LU_AquiferType:{aquifer_code}" - ) - aquifer_type = AquiferType( - 
thing_aquifer_association=aquifer_assoc, - aquifer_type=type_name, - ) - session.add(aquifer_type) - aquifer_type_names.append(type_name) - except KeyError: - logger.warning( - f"Unknown aquifer code '{aquifer_code}' from AquiferType='{row.AquiferType}' " - f"for well {well.name}. Skipping this code." - ) - - logger.info( - f"Associated well {well.name} with aquifer {aquifer.name} " - f"(types: {', '.join(aquifer_type_names)})" - ) + if not existing_assoc: + # Create the association + aquifer_assoc = ThingAquiferAssociation( + thing=well, aquifer_system=aquifer + ) + session.add(aquifer_assoc) + session.flush() + + # Create AquiferType records for EACH characteristic + aquifer_type_names = [] + for aquifer_code in aquifer_codes: + try: + type_name = lexicon_mapper.map_value( + f"LU_AquiferType:{aquifer_code}" + ) + aquifer_type = AquiferType( + thing_aquifer_association=aquifer_assoc, + aquifer_type=type_name, + ) + session.add(aquifer_type) + aquifer_type_names.append(type_name) + except KeyError: + logger.warning( + f"Unknown aquifer code '{aquifer_code}' from AquiferType='{row.AquiferType}' " + f"for well {well.name}. Skipping this code." + ) + + logger.info( + f"Associated well {well.name} with aquifer {aquifer.name} " + f"(types: {', '.join(aquifer_type_names)})" + ) except Exception as e: logger.critical( From 9a2cdbc1d9c02f74495ffa43d50440d691248e95 Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Mon, 1 Dec 2025 09:38:02 -0700 Subject: [PATCH 13/17] refactor(transfer): remove section that creates Formation Associations in `well_transfer.py` The current implementation for creating a Formation Association was problematic for the following reasons: 1. The FormationZone field from the WellData csv indicates completion zone, not the entire well stratigraphy 2. Setting top_depth = 0.0 incorrectly implies: * The formation starts at ground surface * The well only penetrates one formation * The entire well depth is within that single formation 3. 
ThingGeologicFormationAssociation currently implies a full stratigraphic column with depth intervals 4. Forcing FormationZone into a depth-based association creates misleading data. Implementation - Added `formation_completion_code` Field to Thing Model - This provides a clear separation between `formation_completion_code` = "What formation is the well completed in?" and `formation_associations` = "What formations does the borehole pass through?" - Updated well_transfer.py so that `ThingGeologicFormationAssociation` records are now created only from Stratigraphy.csv (previously they were also created from WellData). --- db/thing.py | 6 ++++++ transfers/well_transfer.py | 44 +++++++++----------------------------- 2 files changed, 16 insertions(+), 34 deletions(-) diff --git a/db/thing.py b/db/thing.py index cae9363e0..87c706a35 100644 --- a/db/thing.py +++ b/db/thing.py @@ -133,6 +133,12 @@ class Thing( info={"unit": "feet below ground surface"}, comment="Depth of the well pump from ground surface to the pump intake (in feet).", ) + formation_completion_code: Mapped[str] = lexicon_term( + nullable=True, + comment="The geologic formation in which the well was completed (from WellData.FormationZone). " + "This indicates the target formation for the well, not the full stratigraphic column. " + "For detailed depth-interval stratigraphy, see formation_associations.", + ) # TODO: should this be required for every well in the database?
AMMP review is_suitable_for_datalogger: Mapped[bool] = mapped_column( nullable=True, diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 80e6a3ca6..5ccc3dde6 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -41,7 +41,6 @@ AquiferType, GeologicFormation, ThingAquiferAssociation, - ThingGeologicFormationAssociation, ) from schemas.thing import CreateWell, CreateWellScreen from services.gcs_helper import get_storage_bucket @@ -558,43 +557,20 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None f"Error creating aquifer associations for {well.name}: {e}" ) - # --- Create Formation Association (if FormationZone exists) --- - # Note: This creates a single formation association from WellData. - # For detailed stratigraphy with depth intervals, see transfer_stratigraphy() + # --- Set Formation Completion (NOT depth-based stratigraphy) --- + # This simply records which formation the well was completed in. + # For detailed depth-interval stratigraphy, see stratigraphy_transfer.py if hasattr(row, "FormationZone") and not isna(row.FormationZone): try: - formation_code = row.FormationZone - formation = get_or_create_geologic_formation(session, formation_code) - - if formation: - # Onlyl create association if valid well depth data exists - if ( - not hasattr(row, "WellDepth") - or isna(row.WellDepth) - or not row.WellDepth - ): - logger.warning( - f"Well {well.name} has FormationZone but no valid WellDepth. " - f"Skipping formation association. Use stratigraphy transfer for detailed depth data." 
- ) - else: - # Create association using actual well depth - top_depth = 0.0 - bottom_depth = float(row.WellDepth) - - formation_assoc = ThingGeologicFormationAssociation( - thing=well, - geologic_formation=formation, - top_depth=top_depth, - bottom_depth=bottom_depth, - ) - session.add(formation_assoc) - logger.info( - f"Associated well {well.name} with formation {formation.formation_code} (0-{bottom_depth} ft)" - ) + formation_code = row.FormationZone.strip() + # Set the formation_completion_code field directly on the well + well.formation_completion_code = formation_code + logger.info( + f"Set formation_completion_code for {well.name}: {formation_code}" + ) except Exception as e: logger.critical( - f"Error creating formation association for {well.name}: {e}" + f"Error setting formation completion for {well.name}: {e}" ) session.commit() From f6dec153fb6ef88bcc90a98231c5b1c991690520 Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Mon, 1 Dec 2025 10:38:51 -0700 Subject: [PATCH 14/17] refactor(transfer): add validation when assigning `formation_completion_code` to a well. 
--- transfers/well_transfer.py | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 5ccc3dde6..bb3cb7419 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -562,15 +562,36 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None # For detailed depth-interval stratigraphy, see stratigraphy_transfer.py if hasattr(row, "FormationZone") and not isna(row.FormationZone): try: - formation_code = row.FormationZone.strip() - # Set the formation_completion_code field directly on the well - well.formation_completion_code = formation_code - logger.info( - f"Set formation_completion_code for {well.name}: {formation_code}" + formation_code = row.FormationZone + + # Validate formation exists + formation = ( + session.query(GeologicFormation) + .filter(GeologicFormation.formation_code == formation_code) + .first() ) + + if formation: + # Formation exists: Set association + well.formation_completion_code = formation_code + logger.info( + f"Set completion formation for {well.name}: {formation_code}" + ) + else: + # Formation does NOT exist: Do not create new formation. Flag and log for review + logger.warning( + f"MISSING FORMATION: Formation '{formation_code}' not found for well {well.name}. Flagged for review." + ) + errors.append( + { + "well": well.name, + "error": f"Unknown formation: {formation_code}", + } + ) + except Exception as e: logger.critical( - f"Error setting formation completion for {well.name}: {e}" + f"Error setting completion formation for {well.name}: {e}" ) session.commit() From 64ecb1831feda41dea78865a0fa48366b3449a2a Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Mon, 1 Dec 2025 10:46:18 -0700 Subject: [PATCH 15/17] refactor(schema): add `formation_completion_code` field to CreateWell schema. 
--- schemas/thing.py | 1 + 1 file changed, 1 insertion(+) diff --git a/schemas/thing.py b/schemas/thing.py index 28b056c82..e7af7995e 100644 --- a/schemas/thing.py +++ b/schemas/thing.py @@ -141,6 +141,7 @@ class CreateWell(CreateBaseThing, ValidateWell): well_construction_method_source: str | None = None well_pump_type: WellPumpType | None = None is_suitable_for_datalogger: bool | None + formation_completion_code: str | None = None class CreateSpring(CreateBaseThing): From 9a87a69d52a296018bb515cbbe3b99d305be69e2 Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Mon, 1 Dec 2025 11:22:35 -0700 Subject: [PATCH 16/17] refactor(schema): use `FormationCode` enum for `formation_completion_code` --- schemas/thing.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/schemas/thing.py b/schemas/thing.py index e7af7995e..2ee35ad6c 100644 --- a/schemas/thing.py +++ b/schemas/thing.py @@ -28,6 +28,7 @@ MonitoringFrequency, WellConstructionMethod, WellPumpType, + FormationCode, ) from schemas import BaseCreateModel, BaseUpdateModel, BaseResponseModel, PastOrTodayDate from schemas.group import GroupResponse @@ -141,7 +142,7 @@ class CreateWell(CreateBaseThing, ValidateWell): well_construction_method_source: str | None = None well_pump_type: WellPumpType | None = None is_suitable_for_datalogger: bool | None - formation_completion_code: str | None = None + formation_completion_code: FormationCode | None = None class CreateSpring(CreateBaseThing): From 9588627ed96f0132d51897d536777823043bef16 Mon Sep 17 00:00:00 2001 From: ksmuczynski Date: Mon, 1 Dec 2025 11:34:31 -0700 Subject: [PATCH 17/17] refactor(transfer): add missing transfers to `transfer.py` Added aquifer systems, geologic formations, and stratigraphy transfers. 
--- transfers/transfer.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/transfers/transfer.py b/transfers/transfer.py index 2b576a4b2..29d76cec7 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -31,11 +31,14 @@ from transfers.link_ids_transfer import transfer_link_ids, transfer_link_ids_welldata from transfers.contact_transfer import transfer_contacts from transfers.sensor_transfer import transfer_sensors +from transfers.aquifer_system_transfer import transfer_aquifer_systems +from transfers.geologic_formation_transfer import transfer_geologic_formations from transfers.waterlevels_transfer import transfer_water_levels from transfers.well_transfer import ( transfer_wells, transfer_wellscreens, ) +from transfers.stratigraphy_transfer import transfer_stratigraphy from transfers.permissions_transfer import transfer_permissions from transfers.asset_transfer import transfer_assets @@ -58,6 +61,14 @@ def transfer_all(sess, limit=100): erase_and_rebuild_db() metrics = Metrics() + + # transfer aquifer systems and geologic formations first because well_transfer depends on them + message("TRANSFERRING AQUIFER SYSTEMS") + timeit_direct(transfer_aquifer_systems, sess) + + message("TRANSFERRING GEOLOGIC FORMATIONS") + timeit_direct(transfer_geologic_formations, sess) + message("TRANSFERRING WELLS") flags = { @@ -72,6 +83,9 @@ def transfer_all(sess, limit=100): results = timeit_direct(transfer_wellscreens, sess) metrics.well_screen_metrics(sess, *results) + message("TRANSFERRING STRATIGRAPHY") + timeit_direct(transfer_stratigraphy, sess) + message("TRANSFERRING SENSORS") results = timeit_direct(transfer_sensors, sess) metrics.sensor_metrics(sess, *results)