diff --git a/core/enums.py b/core/enums.py index eb5e399c7..91b206cab 100644 --- a/core/enums.py +++ b/core/enums.py @@ -79,4 +79,5 @@ AquiferType: type[Enum] = build_enum_from_lexicon_category("aquifer_type") GeographicScale: type[Enum] = build_enum_from_lexicon_category("geographic_scale") Lithology: type[Enum] = build_enum_from_lexicon_category("lithology") +FormationCode: type[Enum] = build_enum_from_lexicon_category("formation_code") # ============= EOF ============================================= diff --git a/db/thing.py b/db/thing.py index cae9363e0..87c706a35 100644 --- a/db/thing.py +++ b/db/thing.py @@ -133,6 +133,12 @@ class Thing( info={"unit": "feet below ground surface"}, comment="Depth of the well pump from ground surface to the pump intake (in feet).", ) + formation_completion_code: Mapped[str] = lexicon_term( + nullable=True, + comment="The geologic formation in which the well was completed (from WellData.FormationZone). " + "This indicates the target formation for the well, not the full stratigraphic column. " + "For detailed depth-interval stratigraphy, see formation_associations.", + ) # TODO: should this be required for every well in the database? AMMP review is_suitable_for_datalogger: Mapped[bool] = mapped_column( nullable=True, diff --git a/schemas/geologic_formation.py b/schemas/geologic_formation.py index d42e48389..62e24ee8e 100644 --- a/schemas/geologic_formation.py +++ b/schemas/geologic_formation.py @@ -3,6 +3,20 @@ from pydantic import BaseModel from schemas import BaseResponseModel +from core.enums import FormationCode, Lithology + + +# ------ CREATE ---------- +class CreateGeologicFormation(BaseModel): + """ + Schema for creating a geologic formation. + Used during data transfer and API creation. 
+ """ + + formation_code: FormationCode | None = None + description: str | None = None + lithology: Lithology | None = None + boundary: str | None = None # ------ RESPONSE ---------- diff --git a/schemas/thing.py b/schemas/thing.py index 28b056c82..2ee35ad6c 100644 --- a/schemas/thing.py +++ b/schemas/thing.py @@ -28,6 +28,7 @@ MonitoringFrequency, WellConstructionMethod, WellPumpType, + FormationCode, ) from schemas import BaseCreateModel, BaseUpdateModel, BaseResponseModel, PastOrTodayDate from schemas.group import GroupResponse @@ -141,6 +142,7 @@ class CreateWell(CreateBaseThing, ValidateWell): well_construction_method_source: str | None = None well_pump_type: WellPumpType | None = None is_suitable_for_datalogger: bool | None + formation_completion_code: FormationCode | None = None class CreateSpring(CreateBaseThing): diff --git a/transfers/aquifer_system_transfer.py b/transfers/aquifer_system_transfer.py new file mode 100644 index 000000000..137b3b00f --- /dev/null +++ b/transfers/aquifer_system_transfer.py @@ -0,0 +1,139 @@ +import time +from sqlalchemy.orm import Session +from pydantic import ValidationError + +from db import AquiferSystem +from schemas.aquifer_system import CreateAquiferSystem +from transfers.util import read_csv, replace_nans, logger + + +def transfer_aquifer_systems(session: Session, limit: int = None) -> tuple: + """ + Transfer aquifer system data from LU_AquiferClass CSV to the database. + + This creates the master list of named aquifer systems (e.g., Ogallala Aquifer). the primary_type field is set + to "Unknown" as a placeholder and will be updated during well transfer when we know what type each well encounters. + + This should be run BEFORE well_transfer.py so that aquifer records exist for wells to reference. + + Args: + session (Session): SQLAlchemy database session + limit (int, optional): Limit the number of records to transfer (for testing). + + Returns: + tuple: (input_df, cleaned_df, errors) + """ + # 1. 
Read the CSV file + input_df = read_csv("LU_AquiferClass") + + # 2. Replace NaNs with None + cleaned_df = replace_nans(input_df) + + # 3. Initialize tracking variables for logging + n = len(input_df) + step = 25 + start_time = time.time() + errors = [] + created_count = 0 + skipped_count = 0 + + logger.info(f"Starting transfer of {n} aquifer systems from LU_AquiferClass.") + + # 4. Process each row + for i, row in enumerate(cleaned_df.itertuples()): + # check if limit is reached + if limit and i >= limit: + logger.info(f"Reached limit of {limit} rows. Stopping migration.") + break + + # Log progress every 'step' rows + if i and not i % step: + logger.info( + f"Processing row {i} of {n}. Avg rows per second: {step / (time.time() - start_time):.2f}" + ) + start_time = time.time() + + # Commit progress periodically + try: + session.commit() + except Exception as e: + logger.critical(f"Error committing aquifer system {i}: {e}") + session.rollback() + continue + + # 5. Extract aquifer code and name + aquifer_code = row.CODE + aquifer_name = row.MEANING + + if not aquifer_name: + error_msg = f"Row {i} (code: {aquifer_code}) has no aquifer name (MEANING)." + logger.critical(error_msg) + errors.append({"row": i, "code": aquifer_code, "error": error_msg}) + skipped_count += 1 + continue + + # 6. Check if aquifer system already exists + existing = ( + session.query(AquiferSystem) + .filter(AquiferSystem.name == aquifer_name) + .first() + ) + + if existing: + logger.info( + f"Aquifer '{aquifer_name}' (code: {aquifer_code}) already exists. Skipping." + ) + skipped_count += 1 + continue + + # 7. 
Prepare data dictionary + try: + data = CreateAquiferSystem( + name=aquifer_name, + description=None, # can be updated later + primary_aquifer_type="Unknown", # placeholder - will be updated during well transfer + ) + + # Validate data using Pydantic schema + CreateAquiferSystem.model_validate(data) + + except ValidationError as e: + errors.append({"code": aquifer_code, "name": aquifer_name, "error": str(e)}) + logger.critical( + f"Error creating aquifer system '{aquifer_name}' (code: {aquifer_code}) (row {i}): {e}" + ) + continue + + # 8. Create database record + aquifer_system = None + try: + aquifer_data = data.model_dump() + aquifer_system = AquiferSystem(**aquifer_data) + session.add(aquifer_system) + created_count += 1 + + logger.info( + f"Created aquifer system: {aquifer_system.name} (code: {aquifer_code})" + ) + + except Exception as e: + if aquifer_system is not None: + session.expunge(aquifer_system) + errors.append({"code": aquifer_code, "name": aquifer_name, "error": str(e)}) + logger.critical( + f"Error creating aquifer system record '{aquifer_name}': {e}" + ) + continue + + # 9. Final commit + try: + session.commit() + logger.info( + f"Successfully transferred {created_count} aquifer systems, skipped {skipped_count}. " + f"Note: primary_type set to 'Unknown' and will be updated during well transfer." 
+ ) + except Exception as e: + logger.critical(f"Error in final commit: {e}") + session.rollback() + + return input_df, cleaned_df, errors diff --git a/transfers/geologic_formation_transfer.py b/transfers/geologic_formation_transfer.py new file mode 100644 index 000000000..724eb79fe --- /dev/null +++ b/transfers/geologic_formation_transfer.py @@ -0,0 +1,141 @@ +import time +from sqlalchemy.orm import Session +from pydantic import ValidationError + +from db import GeologicFormation +from schemas.geologic_formation import CreateGeologicFormation +from transfers.util import read_csv, replace_nans, logger + + +def transfer_geologic_formations(session: Session, limit: int = None) -> tuple: + """ + Transfer geologic formation data from LU_Formation CSV to the database. + + This should be run BEFORE well_transfer.py so that geologic formation records exist for wells to reference. + + Args: + session (Session): SQLAlchemy database session + limit (int, optional): Optional limit on number of records to transfer (for testing). + + Returns: + tuple: (input_df, cleaned_df, errors) + """ + # 1. Read the CSV file + input_df = read_csv("LU_Formation") + + # 2. Replace NaNs with None + cleaned_df = replace_nans(input_df) + + # 3. Initialize tracking variables for logging + n = len(cleaned_df) + step = 25 + start_time = time.time() + errors = [] + created_count = 0 + skipped_count = 0 + + logger.info(f"Starting transfer of {n} geologic formations") + + # 4. Process each row + for i, row in enumerate(cleaned_df.itertuples()): + # check if limit is reached + if limit and i >= limit: + logger.info(f"Reached limit of {limit} rows. Stopping migration.") + break + + # Log progress every 'step' rows + if i and not i % step: + logger.info( + f"Processing row {i} of {n}. 
Avg rows per second: {step / (time.time() - start_time):.2f}" + ) + start_time = time.time() + + # Commit progress periodically + try: + session.commit() + except Exception as e: + logger.critical(f"Error committing geologic formations: {e}") + session.rollback() + continue + + # 5. Extract formation code and description + formation_code = row.Code + + if not formation_code: + logger.warning(f"Skipping row {i}: Missing formation code") + skipped_count += 1 + continue + + # Check if this formation already exists + existing = ( + session.query(GeologicFormation) + .filter(GeologicFormation.formation_code == formation_code) + .first() + ) + + if existing: + logger.info( + f"Skipping row {i}: Formation code {formation_code} already exists" + ) + skipped_count += 1 + continue + + # 6. Prepare data for creation + # Note: We only store the formation_code. Formation names will be mapped by the API using a + # formations.json file from authoritative sources (e.g., USGS). + # The description field is left as None and can be populated later if needed. + # Note: lithology is set to None here and will be updated during stratigraphy transfer + try: + data = CreateGeologicFormation( + formation_code=formation_code, + description=None, # Not storing from legacy data + lithology=None, # Will be populated from Stratigraphy.csv + ) + + # Validate the data using Pydantic schema + CreateGeologicFormation.model_validate(data) + + except ValidationError as e: + errors.append({"code": formation_code, "errors": e.errors()}) + logger.critical( + f"Validation error for row {i} with Code {formation_code}: {e.errors()}" + ) + continue + except Exception as e: + errors.append({"code": formation_code, "errors": str(e)}) + logger.critical(f"Error preparing data for {formation_code}: {e}") + continue + + # 7. 
Create database object + geologic_formation = None + try: + formation_data = data.model_dump() + geologic_formation = GeologicFormation(**formation_data) + session.add(geologic_formation) + created_count += 1 + + logger.info( + f"Created geologic formation: {geologic_formation.formation_code}" + ) + + except Exception as e: + if geologic_formation is not None: + session.expunge(geologic_formation) + errors.append({"code": formation_code, "error": str(e)}) + logger.critical( + f"Error creating geologic formation for {formation_code}: {e}" + ) + continue + + # 8. Final commit + try: + session.commit() + logger.info( + f"Successfully transferred {created_count} geologic formations, skipped {skipped_count}. " + f"Note: lithology is None and will be updated during stratigraphy transfer." + ) + except Exception as e: + logger.critical(f"Error during final commit of geologic formations: {e}") + session.rollback() + + return input_df, cleaned_df, errors diff --git a/transfers/stratigraphy_transfer.py b/transfers/stratigraphy_transfer.py new file mode 100644 index 000000000..dbbe3d0bb --- /dev/null +++ b/transfers/stratigraphy_transfer.py @@ -0,0 +1,264 @@ +""" +Transfer script for stratigraphy (lithology log) data. + +This creates ThingGeologicFormationAssociation records from the Stratigraphy CSV, which contains depth-specific +formation information for wells. It also updates the GeologicFormation.lithology field based on the +Stratigraphy.Lithology data. +""" + +import time +from sqlalchemy.orm import Session + +from db import Thing, GeologicFormation, ThingGeologicFormationAssociation +from transfers.util import ( + read_csv, + replace_nans, + filter_to_valid_point_ids, + lexicon_mapper, + logger, +) + + +def transfer_stratigraphy(session: Session, limit: int = None) -> tuple: + """ + Transfer detailed stratigraphy (lithology log) data from Stratigraphy CSV. 
+ + The Stratigraphy CSV contains multiple rows per well, each representing a + depth interval, the formation encountered, and its lithology. + + Fields used: + - PointID: Links to the well + - UnitIdentifier: Formation code (maps to LU_Formation) + - StratTop: Top depth of the layer (feet below ground surface) + - StratBottom: Bottom depth of the layer (feet below ground surface) + - Lithology: Lithology code (maps to LU_Lithology via ABBREVIATION field) + + This should be run AFTER: + 1. geologic_formation_transfer.py (so formations exist) + 2. well_transfer.py (so wells exist) + + Args: + session: Database session + limit: Optional limit on number of WELLS to process (for testing) + + Returns: + tuple: (input_df, cleaned_df, errors) + """ + # 1. Read and clean data + input_df = read_csv("Stratigraphy") + cleaned_df = replace_nans(input_df) + + # 2. Filter to only wells that exist in database + cleaned_df = filter_to_valid_point_ids(session, cleaned_df) + + n_records = len(cleaned_df) + n_wells = len(cleaned_df["PointID"].unique()) + + logger.info( + f"Starting transfer of {n_records} stratigraphy records for {n_wells} wells" + ) + + # 3. Initialize tracking variables for logging + step = 25 + start_time = time.time() + errors = [] + created_count = 0 + skipped_count = 0 + lithology_updates = 0 + + # 4. Group by well for efficient processing + well_groups = cleaned_df.groupby("PointID") + + for well_index, (pointid, strat_group) in enumerate(well_groups): + # Check limit (on number of wells, not records) + if limit and well_index >= limit: + logger.info(f"Reached limit of {limit} wells. 
Stopping.") + break + + # Progress logging every 25 wells + if well_index and not well_index % step: + logger.info( + f"Processing well {well_index} of {n_wells}, " + f"avg wells per second: {step / (time.time() - start_time):.2f}" + ) + start_time = time.time() + + # Periodic commit + try: + session.commit() + except Exception as e: + logger.critical(f"Error committing stratigraphy records: {e}") + session.rollback() + continue + + # 5. Get the well from database + thing = session.query(Thing).filter(Thing.name == pointid).first() + if not thing: + logger.warning( + f"Well {pointid} not found in database, skipping stratigraphy" + ) + skipped_count += len(strat_group) + continue + + logger.info( + f"Processing {len(strat_group)} stratigraphy layers for well {pointid}" + ) + + # 6. Process each stratigraphy record for this well + for layer_index, row in enumerate(strat_group.itertuples()): + # Validate required fields + # UnitIdentifier + if not hasattr(row, "UnitIdentifier") or not row.UnitIdentifier: + logger.warning( + f"Stratigraphy record {layer_index} for {pointid} has no UnitIdentifier, skipping" + ) + skipped_count += 1 + continue + # StratTop + if not hasattr(row, "StratTop") or row.StratTop is None: + logger.warning( + f"Stratigraphy record {layer_index} for {pointid} has no StratTop, skipping" + ) + skipped_count += 1 + continue + # StratBottom + if not hasattr(row, "StratBottom") or row.StratBottom is None: + logger.warning( + f"Stratigraphy record {layer_index} for {pointid} has no StratBottom, skipping" + ) + skipped_count += 1 + continue + + # Extract formation code + formation_code = row.UnitIdentifier.strip() + + # Validate depth values + try: + top_depth = float(row.StratTop) + bottom_depth = float(row.StratBottom) + except (ValueError, TypeError) as e: + error_msg = f"Invalid depth values: StratTop={row.StratTop}, StratBottom={row.StratBottom}" + logger.critical( + f"{pointid} layer {layer_index}: {error_msg}, error: {e}" + ) + errors.append( + { 
+ "pointid": pointid, + "layer": layer_index, + "error": error_msg, + "details": str(e), # for conversion errors + } + ) + skipped_count += 1 + continue + + # Validate depth logic + if top_depth >= bottom_depth: + error_msg = ( + f"Invalid depth logic: top={top_depth} >= bottom={bottom_depth}" + ) + logger.critical(f"{pointid} layer {layer_index}: {error_msg}") + errors.append( + {"pointid": pointid, "layer": layer_index, "error": error_msg} + ) + skipped_count += 1 + continue + + if top_depth < 0: + error_msg = f"Negative top depth: {top_depth}" + logger.critical(f"{pointid} layer {layer_index}: {error_msg}") + errors.append( + {"pointid": pointid, "layer": layer_index, "error": error_msg} + ) + skipped_count += 1 + continue + + # 7. Get or create the formation + formation = ( + session.query(GeologicFormation) + .filter(GeologicFormation.formation_code == formation_code) + .first() + ) + + if not formation: + # Create new formation if it doesn't exist + logger.info(f"Creating new geologic formation: {formation_code}") + formation = GeologicFormation( + formation_code=formation_code, + description=None, + lithology=None, # Will be set below + ) + session.add(formation) + session.flush() + + # 8. Update formation lithology if available and not already set + if hasattr(row, "Lithology") and row.Lithology: + try: + # Map lithology code to geologic_formation.lithology using ABBREVIATION field + lithology = lexicon_mapper.map_value( + f"LU_Lithology:{row.Lithology}" + ) + + # Update if formation does not have lithology yet + if not formation.lithology: + formation.lithology = lithology + lithology_updates += 1 + logger.info(f"Set lithology for {formation_code}: {lithology}") + elif formation.lithology != lithology: + # Log if there's a mismatch (different lithology for same formation) + logger.warning( + f"Formation {formation_code} has conflicting lithology: " + f"existing='{formation.lithology}', new='{lithology}'." 
+ ) + except KeyError: + logger.warning( + f"Unknown lithology code '{row.Lithology}' for {pointid}, skipping lithology update" + ) + except Exception as e: + logger.warning(f"Error mapping lithology '{row.Lithology}': {e}") + + # 9. Create ThingGeologicFormationAssociation record + try: + formation_assoc = ThingGeologicFormationAssociation( + thing=thing, + geologic_formation=formation, + top_depth=top_depth, + bottom_depth=bottom_depth, + ) + session.add(formation_assoc) + created_count += 1 + + logger.info( + f" Layer {layer_index + 1}: {formation.formation_code} " + f"from {top_depth:.1f} to {bottom_depth:.1f} ft" + ) + + except Exception as e: + logger.critical( + f"Error creating stratigraphy association for {pointid}, " + f"formation {formation_code}: {e}" + ) + errors.append( + { + "pointid": pointid, + "formation": formation_code, + "layer": layer_index, + "error": str(e), + } + ) + skipped_count += 1 + continue + + # 10. Final commit + try: + session.commit() + logger.info( + f"Successfully transferred stratigraphy: " + f"{created_count} associations created, {skipped_count} skipped, " + f"{lithology_updates} lithology fields updated, {len(errors)} errors" + ) + except Exception as e: + logger.critical(f"Error in final commit: {e}") + session.rollback() + + return input_df, cleaned_df, errors diff --git a/transfers/transfer.py b/transfers/transfer.py index 2b576a4b2..29d76cec7 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -31,11 +31,14 @@ from transfers.link_ids_transfer import transfer_link_ids, transfer_link_ids_welldata from transfers.contact_transfer import transfer_contacts from transfers.sensor_transfer import transfer_sensors +from transfers.aquifer_system_transfer import transfer_aquifer_systems +from transfers.geologic_formation_transfer import transfer_geologic_formations from transfers.waterlevels_transfer import transfer_water_levels from transfers.well_transfer import ( transfer_wells, transfer_wellscreens, ) +from 
transfers.stratigraphy_transfer import transfer_stratigraphy from transfers.permissions_transfer import transfer_permissions from transfers.asset_transfer import transfer_assets @@ -58,6 +61,14 @@ def transfer_all(sess, limit=100): erase_and_rebuild_db() metrics = Metrics() + + # transfer aquifer systems and geologic formations first as well_transfer depends on them + message("TRANSFERRING AQUIFER SYSTEMS") + timeit_direct(transfer_aquifer_systems, sess) + + message("TRANSFERRING GEOLOGIC FORMATIONS") + timeit_direct(transfer_geologic_formations, sess) + message("TRANSFERRING WELLS") flags = { @@ -72,6 +83,9 @@ def transfer_all(sess, limit=100): results = timeit_direct(transfer_wellscreens, sess) metrics.well_screen_metrics(sess, *results) + message("TRANSFERRING STRATIGRAPHY") + timeit_direct(transfer_stratigraphy, sess) + message("TRANSFERRING SENSORS") results = timeit_direct(transfer_sensors, sess) metrics.sensor_metrics(sess, *results) diff --git a/transfers/util.py b/transfers/util.py index 9b2afc84c..2217df5c1 100644 --- a/transfers/util.py +++ b/transfers/util.py @@ -408,6 +408,8 @@ def _make_lu_to_lexicon_mapper(self): # Lookup tables where CODE maps to MEANING lu_tables = [ "LU_AltitudeMethod", + "LU_AquiferClass", + "LU_AquiferType", "LU_CollectionMethod", "LU_ConstructionMethod", "LU_CoordinateAccuracy", @@ -418,6 +420,7 @@ def _make_lu_to_lexicon_mapper(self): "LU_Depth_CompletionSource", "LU_Discharge_ChemistrySource", "LU_LevelStatus", + "LU_Lithology", "LU_MajorAnalyte", "LU_MeasurementMethod", "LU_MinorTraceAnalyte", diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 9e684cc8d..bb3cb7419 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -37,6 +37,10 @@ MonitoringFrequencyHistory, MeasuringPointHistory, DataProvenance, + AquiferSystem, + AquiferType, + GeologicFormation, + ThingAquiferAssociation, ) from schemas.thing import CreateWell, CreateWellScreen from services.gcs_helper import 
get_storage_bucket @@ -133,6 +137,107 @@ def _extract_well_pump_type(row) -> str | None: return None +# Parse aquifer codes +def _extract_aquifer_type_codes(aquifer_code: str) -> list[str]: + """ + Parse aquifer type codes that may contain multiple values. + + Args: + aquifer_code: Raw code from AquiferType field + + Returns: + List of individual codes + """ + if not aquifer_code: + return [] + # clean the code + code = aquifer_code.strip().upper() + # split into individual characters. This handles cases like "FC" -> ["F", "C"] + individual_codes = list(code) + return individual_codes + + +# Get or create aquifer system +def get_or_create_aquifer_system( + session: Session, aquifer_name: str, primary_type: str +) -> AquiferSystem | None: + """ + Get existing aquifer or create new one if it doesn't exist. + + With the new AquiferType model, we create ONE aquifer record per named + aquifer (e.g., one "Santa Fe Group"), not multiple variants. + + Args: + session: Database session + aquifer_name: Name of the aquifer (from AqClass or type name) + primary_type: Primary aquifer type for the aquifer_type field + """ + # Try to find existing aquifer by name + aquifer = ( + session.query(AquiferSystem).filter(AquiferSystem.name == aquifer_name).first() + ) + + if aquifer: + return aquifer + + # Create new aquifer + try: + logger.info( + f"Creating new aquifer system: {aquifer_name} (primary type: {primary_type})" + ) + + aquifer = AquiferSystem( + name=aquifer_name, + aquifer_type=primary_type, # Primary type + geographic_scale=None, # Default + ) + session.add(aquifer) + session.flush() # Get the ID + return aquifer + except Exception as e: + logger.critical(f"Error creating aquifer {aquifer_name}: {e}") + return None + + +def get_or_create_geologic_formation( + session: Session, formation_code: str +) -> GeologicFormation | None: + """ + Get existing geologic formation or create new one if it doesn't exist. 
+ + Args: + session: Database session + formation_code: The formation code from FormationZone field + + Returns: + GeologicFormation object or None if creation fails + """ + # Try to find existing formation + formation = ( + session.query(GeologicFormation) + .filter(GeologicFormation.formation_code == formation_code) + .first() + ) + + if formation: + return formation + + # If not found, create new formation + try: + logger.info(f"Creating new geologic formation: {formation_code}") + formation = GeologicFormation( + formation_code=formation_code, + description=None, + lithology=None, + ) + session.add(formation) + session.flush() + return formation + except Exception as e: + logger.critical(f"Error creating formation {formation_code}: {e}") + return None + + def get_wells_to_transfer( sess: Session, flags: dict = None ) -> tuple[pd.DataFrame, pd.DataFrame]: @@ -355,6 +460,140 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None assoc.thing = well session.add(assoc) + # --- Create Aquifer Association with AquiferType records --- + if hasattr(row, "AquiferType") and not isna(row.AquiferType): + try: + # Parse codes (handles multi-character codes like "FC") + aquifer_codes = _extract_aquifer_type_codes(row.AquiferType) + + if not aquifer_codes: + logger.warning( + f"Well {row.PointID}: Empty aquifer codes after parsing '{row.AquiferType}'" + ) + else: + # Map AqClass code to aquifer name using lexicon mapper + if hasattr(row, "AqClass") and not isna(row.AqClass): + try: + aquifer_name = lexicon_mapper.map_value( + f"LU_AquiferClass:{row.AqClass}" + ) + except KeyError: + logger.warning( + f"Unknown AqClass code '{row.AqClass}' for well {row.PointID}, using first type as name" + ) + aquifer_name = lexicon_mapper.map_value( + f"LU_AquiferType:{aquifer_codes[0]}" + ) + else: + # No AqClass - use first code's mapped name as aquifer name + aquifer_name = lexicon_mapper.map_value( + f"LU_AquiferType:{aquifer_codes[0]}" + ) + + # Determine 
primary type + # This assumes the first recorded type of a compound type is the primary type of the aquifer. + # TODO: verify with AMMP + try: + primary_type = lexicon_mapper.map_value( + f"LU_AquiferType:{aquifer_codes[0]}" + ) + except KeyError: + logger.warning( + f"Unknown aquifer type code '{aquifer_codes[0]}' for well {row.PointID}." + f"Setting primary_type to 'Unknown'" + ) + primary_type = "Unknown" # Creates aquifer with placeholder + + # Get or create the aquifer + aquifer = get_or_create_aquifer_system( + session, aquifer_name, primary_type + ) + + if aquifer: + # Check if association already exists + existing_assoc = ( + session.query(ThingAquiferAssociation) + .filter( + ThingAquiferAssociation.thing_id == well.id, + ThingAquiferAssociation.aquifer_system_id == aquifer.id, + ) + .first() + ) + + if not existing_assoc: + # Create the association + aquifer_assoc = ThingAquiferAssociation( + thing=well, aquifer_system=aquifer + ) + session.add(aquifer_assoc) + session.flush() + + # Create AquiferType records for EACH characteristic + aquifer_type_names = [] + for aquifer_code in aquifer_codes: + try: + type_name = lexicon_mapper.map_value( + f"LU_AquiferType:{aquifer_code}" + ) + aquifer_type = AquiferType( + thing_aquifer_association=aquifer_assoc, + aquifer_type=type_name, + ) + session.add(aquifer_type) + aquifer_type_names.append(type_name) + except KeyError: + logger.warning( + f"Unknown aquifer code '{aquifer_code}' from AquiferType='{row.AquiferType}' " + f"for well {well.name}. Skipping this code." + ) + + logger.info( + f"Associated well {well.name} with aquifer {aquifer.name} " + f"(types: {', '.join(aquifer_type_names)})" + ) + + except Exception as e: + logger.critical( + f"Error creating aquifer associations for {well.name}: {e}" + ) + + # --- Set Formation Completion (NOT depth-based stratigraphy) --- + # This simply records which formation the well was completed in. 
+ # For detailed depth-interval stratigraphy, see stratigraphy_transfer.py + if hasattr(row, "FormationZone") and not isna(row.FormationZone): + try: + formation_code = row.FormationZone + + # Validate formation exists + formation = ( + session.query(GeologicFormation) + .filter(GeologicFormation.formation_code == formation_code) + .first() + ) + + if formation: + # Formation exists: Set association + well.formation_completion_code = formation_code + logger.info( + f"Set completion formation for {well.name}: {formation_code}" + ) + else: + # Formation does NOT exist: Do not create new formation. Flag and log for review + logger.warning( + f"MISSING FORMATION: Formation '{formation_code}' not found for well {well.name}. Flagged for review." + ) + errors.append( + { + "well": well.name, + "error": f"Unknown formation: {formation_code}", + } + ) + + except Exception as e: + logger.critical( + f"Error setting completion formation for {well.name}: {e}" + ) + session.commit() # add things thate need well id