Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
2991591
feat(transfers): add `LU_AquiferClass` and `LU_AquiferType` lookup ta…
ksmuczynski Nov 20, 2025
00e7225
feat(schemas): implement Create schemas for `aquifer_system` and `geo…
ksmuczynski Nov 21, 2025
9c924ae
Merge branch 'kas-227-231-additional-well-info-models' into kas-227-2…
ksmuczynski Nov 23, 2025
af07c3e
feat(transfer): WIP aquifer and geology transfers
ksmuczynski Nov 24, 2025
b1933e1
Merge branch 'bdms-227' into kas-227-231-additional-well-info-transfe…
ksmuczynski Nov 24, 2025
748a57b
feat(transfer): add stratigraphy transfer script and update geologic …
ksmuczynski Nov 24, 2025
b14e6d0
refactor(transfer): update log levels and error tracking in stratigra…
ksmuczynski Nov 25, 2025
48830b9
refactor(schemas): use enums in `geologic_formation` create schema.
ksmuczynski Nov 25, 2025
dc9b1ef
refactor(transfer): update log levels and error tracking in aquifer_s…
ksmuczynski Nov 25, 2025
ee787d1
feat(transfer): Add aquifer system and geologic formation association…
ksmuczynski Nov 25, 2025
81fed8a
refactor(transfer): Update logic related to creating an aquifer_system
ksmuczynski Nov 26, 2025
cd1d170
refactor(transfer): Update logic related to creating formation associ…
ksmuczynski Nov 26, 2025
e7f0aaa
refactor(transfer): add note to verify compound aquifer type codes wi…
ksmuczynski Nov 26, 2025
595d1d9
refactor(transfer): removed unnecessary if statement re: creating aqu…
ksmuczynski Nov 26, 2025
091a77e
Merge branch 'bdms-227' into kas-227-231-additional-well-info-transfe…
ksmuczynski Nov 26, 2025
9a2cdbc
refactor(transfer): remove section that creates Formation Association…
ksmuczynski Dec 1, 2025
f6dec15
refactor(transfer): add validation when assigning `formation_completi…
ksmuczynski Dec 1, 2025
64ecb18
refactor(schema): add `formation_completion_code` field to CreateWell…
ksmuczynski Dec 1, 2025
9a87a69
refactor(schema): use `FormationCode` enum for `formation_completion_…
ksmuczynski Dec 1, 2025
9588627
refactor(transfer): add missing transfers to `transfer.py`
ksmuczynski Dec 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,5 @@
AquiferType: type[Enum] = build_enum_from_lexicon_category("aquifer_type")
GeographicScale: type[Enum] = build_enum_from_lexicon_category("geographic_scale")
Lithology: type[Enum] = build_enum_from_lexicon_category("lithology")
FormationCode: type[Enum] = build_enum_from_lexicon_category("formation_code")
# ============= EOF =============================================
6 changes: 6 additions & 0 deletions db/thing.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ class Thing(
info={"unit": "feet below ground surface"},
comment="Depth of the well pump from ground surface to the pump intake (in feet).",
)
formation_completion_code: Mapped[str] = lexicon_term(
nullable=True,
comment="The geologic formation in which the well was completed (from WellData.FormationZone). "
"This indicates the target formation for the well, not the full stratigraphic column. "
"For detailed depth-interval stratigraphy, see formation_associations.",
)
# TODO: should this be required for every well in the database? AMMP review
is_suitable_for_datalogger: Mapped[bool] = mapped_column(
nullable=True,
Expand Down
14 changes: 14 additions & 0 deletions schemas/geologic_formation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,20 @@
from pydantic import BaseModel

from schemas import BaseResponseModel
from core.enums import FormationCode, Lithology


# ------ CREATE ----------
class CreateGeologicFormation(BaseModel):
"""
Schema for creating a geologic formation.
Used during data transfer and API creation.
"""

formation_code: FormationCode | None = None
description: str | None = None
lithology: Lithology | None = None
boundary: str | None = None


# ------ RESPONSE ----------
Expand Down
2 changes: 2 additions & 0 deletions schemas/thing.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
MonitoringFrequency,
WellConstructionMethod,
WellPumpType,
FormationCode,
)
from schemas import BaseCreateModel, BaseUpdateModel, BaseResponseModel, PastOrTodayDate
from schemas.group import GroupResponse
Expand Down Expand Up @@ -141,6 +142,7 @@ class CreateWell(CreateBaseThing, ValidateWell):
well_construction_method_source: str | None = None
well_pump_type: WellPumpType | None = None
is_suitable_for_datalogger: bool | None
formation_completion_code: FormationCode | None = None


class CreateSpring(CreateBaseThing):
Expand Down
139 changes: 139 additions & 0 deletions transfers/aquifer_system_transfer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import time
from sqlalchemy.orm import Session
from pydantic import ValidationError

from db import AquiferSystem
from schemas.aquifer_system import CreateAquiferSystem
from transfers.util import read_csv, replace_nans, logger


def transfer_aquifer_systems(session: Session, limit: int = None) -> tuple:
"""
Transfer aquifer system data from LU_AquiferClass CSV to the database.

This creates the master list of named aquifer systems (e.g., Ogallala Aquifer). the primary_type field is set
to "Unknown" as a placeholder and will be updated during well transfer when we know what type each well encounters.

This should be run BEFORE well_transfer.py so that aquifer records exist for wells to reference.

Args:
session (Session): SQLAlchemy database session
limit (int, optional): Limit the number of records to transfer (for testing).

Returns:
tuple: (input_df, cleaned_df, errors)
"""
# 1. Read the CSV file
input_df = read_csv("LU_AquiferClass")

# 2. Replace NaNs with NOne
cleaned_df = replace_nans(input_df)

# 3. Initialize tracking variables for logging
n = len(input_df)
step = 25
start_time = time.time()
errors = []
created_count = 0
skipped_count = 0

logger.info(f"Starting transfer of {n} aquifer systems from LU_AquiferClass.")

# 4. Process each row
for i, row in enumerate(cleaned_df.itertuples()):
# check if limit is reached
if limit and i >= limit:
logger.info(f"Reached limit of {limit} rows. Stopping migration.")
break

# Log progress every 'step' rows
if i and not i % step:
logger.info(
f"Processing row {i} of {n}. Avg rows per second: {step / (time.time() - start_time):.2f}"
)
start_time = time.time()

# Commit progress periodically
try:
session.commit()
except Exception as e:
logger.critical(f"Error committing aquifer system {i}: {e}")
session.rollback()
continue

# 5. Extract aquifer code and name
aquifer_code = row.CODE
aquifer_name = row.MEANING

if not aquifer_name:
error_msg = f"Row {i} (code: {aquifer_code}) has no aquifer name (MEANING)."
logger.critical(error_msg)
errors.append({"row": i, "code": aquifer_code, "error": error_msg})
skipped_count += 1
continue

# 6. Check if aquifer system already exists
existing = (
session.query(AquiferSystem)
.filter(AquiferSystem.name == aquifer_name)
.first()
)

if existing:
logger.info(
f"Aquifer '{aquifer_name}' (code: {aquifer_code}) already exists. Skipping."
)
skipped_count += 1
continue

# 7. Prepare data dictionary
try:
data = CreateAquiferSystem(
name=aquifer_name,
description=None, # can be updated later
primary_aquifer_type="Unknown", # placeholder - will be updated during well transfer
)

# Validate data using Pydantic schema
CreateAquiferSystem.model_validate(data)

except ValidationError as e:
errors.append({"code": aquifer_code, "name": aquifer_name, "error": str(e)})
logger.critical(
f"Error creating aquifer system '{aquifer_name}' (code: {aquifer_code}) (row {i}): {e}"
)
continue

# 8. Create database record
aquifer_system = None
try:
aquifer_data = data.model_dump()
aquifer_system = AquiferSystem(**aquifer_data)
session.add(aquifer_system)
created_count += 1

logger.info(
f"Created aquifer system: {aquifer_system.name} (code: {aquifer_code})"
)

except Exception as e:
if aquifer_system is not None:
session.expunge(aquifer_system)
errors.append({"code": aquifer_code, "name": aquifer_name, "error": str(e)})
logger.critical(
f"Error creating aquifer system record '{aquifer_name}': {e}"
)
continue

# 9. Final commit
try:
session.commit()
logger.info(
f"Successfully transferred {created_count} aquifer systems, skipped {skipped_count}. "
f"Note: primary_type set to 'Unknown' and will be updated during well transfer."
)
except Exception as e:
logger.critical(f"Error in final commit: {e}")
session.rollback()

return input_df, cleaned_df, errors
141 changes: 141 additions & 0 deletions transfers/geologic_formation_transfer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import time
from sqlalchemy.orm import Session
from pydantic import ValidationError

from db import GeologicFormation
from schemas.geologic_formation import CreateGeologicFormation
from transfers.util import read_csv, replace_nans, logger


def transfer_geologic_formations(session: Session, limit: int = None) -> tuple:
"""
Transfer geologic formation data from LU_GeologicFormation CSV to the database.

This should be run BEFORE well_transfer.py so that geologic formation records exist for wells to reference.

Args:
session (Session): SQLAlchemy database session
limit (int, optional): Optional limit on number of records to transfer (for testing).

Returns:
tuple: (input_df, cleaned_df, errors)
"""
# 1. Read the CSV file
input_df = read_csv("LU_Formation")

# 2. Replace NaNs with None
cleaned_df = replace_nans(input_df)

# 3. Initialize tracking variables for logging
n = len(cleaned_df)
step = 25
start_time = time.time()
errors = []
created_count = 0
skipped_count = 0

logger.info(f"Starting transfer of {n} geologic formations")

# 4. Process each row
for i, row in enumerate(cleaned_df.itertuples()):
# check if limit is reached
if limit and i >= limit:
logger.info(f"Reached limit of {limit} rows. Stopping migration.")
break

# Log progress every 'step' rows
if i and not i % step:
logger.info(
f"Processing row {i} of {n}. Avg rows per second: {step / (time.time() - start_time):.2f}"
)
start_time = time.time()

# Commit progress periodically
try:
session.commit()
except Exception as e:
logger.critical(f"Error committing geologic formations: {e}")
session.rollback()
continue

# 5. Extract formation code and description
formation_code = row.Code

if not formation_code:
logger.warning(f"Skipping row {i}: Missing formation code")
skipped_count += 1
continue

# Check if this formation already exists
existing = (
session.query(GeologicFormation)
.filter(GeologicFormation.formation_code == formation_code)
.first()
)

if existing:
logger.info(
f"Skipping row {i}: Formation code {formation_code} already exists"
)
skipped_count += 1
continue

# 6. Prepare data for creation
# Note: We only store the formation_code. Formation names will be mapped by the API using a
# formations.json file from authoritative sources (e.g., USGS).
# The description field is left as None and can be populated later if needed.
# Note: lithology is set to None here and will be updated during stratigraphy transfer
try:
data = CreateGeologicFormation(
formation_code=formation_code,
description=None, # Not storing from legacy data
lithology=None, # Will be populated from Stratigraphy.csv
)

# Validate the data using Pydantic schema
CreateGeologicFormation.model_validate(data)

except ValidationError as e:
errors.append({"code": formation_code, "errors": e.errors()})
logger.critical(
f"Validation error for row {i} with Code {formation_code}: {e.errors()}"
)
continue
except Exception as e:
errors.append({"code": formation_code, "errors": str(e)})
logger.critical(f"Error preparing data for {formation_code}: {e}")
continue

# 7. Create database object
geologic_formation = None
try:
formation_data = data.model_dump()
geologic_formation = GeologicFormation(**formation_data)
session.add(geologic_formation)
created_count += 1

logger.info(
f"Created geologic formation: {geologic_formation.formation_code}"
)

except Exception as e:
if geologic_formation is not None:
session.expunge(geologic_formation)
errors.append({"code": formation_code, "error": str(e)})
logger.critical(
f"Error creating geologic formation for {formation_code}: {e}"
)
continue

# 8. Final commit
try:
session.commit()
logger.info(
f"Successfully transferred {created_count} geologic formations, skipped {skipped_count}. "
f"Note: lithology is None and will be updated during stratigraphy transfer."
)
except Exception as e:
logger.critical(f"Error during final commit of geologic formations: {e}")
session.rollback()

return input_df, cleaned_df, errors
Loading
Loading