diff --git a/.gitignore b/.gitignore index ec05f5cce..c1d8db1ee 100644 --- a/.gitignore +++ b/.gitignore @@ -32,6 +32,7 @@ transfers/data/nma_csv_cache/* tests/features/*.feature transfers/metrics/* transfers/logs/* +run_bdd-local.sh # deployment files diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8ea7e9413..5d74e6a6c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,15 +16,15 @@ repos: '--statistics' ] exclude: ^db/__init__.py$ # all models need to be imported for Alembic, but are not used directly -# - repo: local -# hooks: -# - id: pytest -# name: pytest -# entry: pytest # Or your specific test command, e.g., poetry run pytest -# language: system -# types: [python] # Specify relevant file types for your tests -# pass_filenames: false -# always_run: true + - repo: local + hooks: + - id: pytest + name: pytest + entry: pytest # Or your specific test command, e.g., poetry run pytest + language: system + types: [python] # Specify relevant file types for your tests + pass_filenames: false + always_run: true # - repo: https://github.com/pre-commit/mirrors-mypy # rev: v1.10.0 # Use the latest stable version or pin to your preference diff --git a/core/app.py b/core/app.py index 78b33f887..b0e0184fe 100644 --- a/core/app.py +++ b/core/app.py @@ -17,8 +17,6 @@ from contextlib import asynccontextmanager from typing import AsyncGenerator -from db.engine import session_ctx - from fastapi import FastAPI from fastapi.openapi.docs import ( get_swagger_ui_html, @@ -27,8 +25,6 @@ from fastapi.openapi.utils import get_openapi from .initializers import ( - init_lexicon, - init_parameter, register_routes, erase_and_rebuild_db, ) @@ -41,10 +37,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: Application lifespan event handler to initialize the database and lexicon. """ if settings.get_enum("MODE") == "development": - with session_ctx() as session: - erase_and_rebuild_db(session) - init_lexicon() - init_parameter() + erase_and_rebuild_db() register_routes(app) yield diff --git a/core/initializers.py b/core/initializers.py index a38ee718c..74c811bff 100644 --- a/core/initializers.py +++ b/core/initializers.py @@ -18,7 +18,6 @@ from fastapi_pagination import add_pagination from sqlalchemy import text from sqlalchemy.exc import DatabaseError -from sqlalchemy.orm import Session from db import Base from db.engine import session_ctx @@ -56,13 +55,14 @@ def init_parameter(path: str = None) -> None: session.rollback() -def erase_and_rebuild_db(session: Session): - session.execute(text("DROP SCHEMA public CASCADE")) - session.execute(text("CREATE SCHEMA public")) - session.execute(text("CREATE EXTENSION IF NOT EXISTS postgis")) - session.commit() - Base.metadata.drop_all(session.bind) - Base.metadata.create_all(session.bind) +def erase_and_rebuild_db(): + with session_ctx() as session: + session.execute(text("DROP SCHEMA public CASCADE")) + session.execute(text("CREATE SCHEMA public")) + session.execute(text("CREATE EXTENSION IF NOT EXISTS postgis")) + session.commit() + Base.metadata.drop_all(session.bind) + Base.metadata.create_all(session.bind) init_lexicon() init_parameter() diff --git a/schemas/__init__.py b/schemas/__init__.py index 87f5688c3..cd8e62d62 100644 --- a/schemas/__init__.py +++ b/schemas/__init__.py @@ -13,7 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from datetime import datetime, timezone +from datetime import datetime, timezone, date +from typing import Annotated from pydantic import ( BaseModel, @@ -21,6 +22,7 @@ AwareDatetime, field_validator, ) +from pydantic.functional_validators import AfterValidator from pydantic.json_schema import JsonSchemaValue from pydantic_core import core_schema @@ -51,6 +53,15 @@ class BaseUpdateModel(BaseCreateModel): release_status: ReleaseStatus | None = None +def past_or_today_validator(value: date) -> date: + if value > date.today(): + raise ValueError("Date must be today or in the past.") + return value + + +PastOrTodayDate = Annotated[date, AfterValidator(past_or_today_validator)] + + # Custom type for UTC datetime serialization class UTCAwareDatetime(AwareDatetime): """Custom datetime type that always serializes to UTC with 'Z' suffix.""" diff --git a/schemas/thing.py b/schemas/thing.py index 9a1096e36..cf8c3ef2b 100644 --- a/schemas/thing.py +++ b/schemas/thing.py @@ -15,7 +15,7 @@ # =============================================================================== from typing import List -from pydantic import BaseModel, model_validator, PastDate, Field, field_validator +from pydantic import BaseModel, model_validator, Field, field_validator from core.enums import ( WellPurpose, @@ -25,9 +25,9 @@ Organization, MonitoringFrequency, ) -from schemas import BaseCreateModel, BaseUpdateModel, BaseResponseModel -from schemas.location import LocationGeoJSONResponse +from schemas import BaseCreateModel, BaseUpdateModel, BaseResponseModel, PastOrTodayDate from schemas.group import GroupResponse +from schemas.location import LocationGeoJSONResponse from schemas.notes import NoteResponse, CreateNote @@ -102,7 +102,7 @@ class CreateBaseThing(BaseCreateModel): location_id: int | None group_id: int | None = None # Optional group ID for the thing name: str # Name of the thing - first_visit_date: PastDate | None = None # Date of NMBGMR's first visit + first_visit_date: PastOrTodayDate | None = None # Date of NMBGMR's first visit class CreateWell(CreateBaseThing, ValidateWell): @@ -171,15 +171,15 @@ class ThingIdLinkResponse(BaseResponseModel): class MonitoringFrequencyResponse(BaseModel): monitoring_frequency: MonitoringFrequency - start_date: PastDate - end_date: PastDate | None + start_date: PastOrTodayDate + end_date: PastOrTodayDate | None class BaseThingResponse(BaseResponseModel): name: str thing_type: str current_location: LocationGeoJSONResponse - first_visit_date: PastDate | None + first_visit_date: PastOrTodayDate | None # The new relationship to the polymorphic Notes table notes: List[NoteResponse] = [] @@ -317,7 +317,7 @@ class UpdateThing(BaseUpdateModel): """ name: str | None = None # Optional name for the thing - first_visit_date: PastDate | None = None # Date of NMBGMR's first visit + first_visit_date: PastOrTodayDate | None = None # Date of NMBGMR's first visit class UpdateWell(UpdateThing, ValidateWell): diff --git a/tests/__init__.py b/tests/__init__.py index 02d6e27f1..91ff327db 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -41,11 +41,9 @@ from db.engine import session_ctx from core.app import app -with session_ctx() as session: - erase_and_rebuild_db(session) - - +erase_and_rebuild_db() register_routes(app) + app.add_middleware( CORSMiddleware, allow_origins=["*"], # Allows all origins, adjust as needed for security diff --git a/tests/features/environment.py b/tests/features/environment.py index 217e769d4..9b801e9d7 100644 --- a/tests/features/environment.py +++ b/tests/features/environment.py @@ -357,9 +357,10 @@ def before_all(context): context.objects = {} rebuild = False # rebuild = True + if rebuild: + erase_and_rebuild_db() + with session_ctx() as session: - if rebuild: - erase_and_rebuild_db(session) loc_1 = add_location(context, session) loc_2 = add_location(context, session) diff --git a/transfers/contact_transfer.py b/transfers/contact_transfer.py index 36c7107b7..c9b1c9fb0 100644 --- a/transfers/contact_transfer.py +++ b/transfers/contact_transfer.py @@ -61,7 +61,8 @@ def transfer_contacts(session): with open(co_to_org_mapper_path, "r") as f: co_to_org_mapper = json.load(f) - input_df = read_csv("OwnersData") + source_table = "OwnersData" + input_df = read_csv(source_table) odf = input_df.drop(["OBJECTID", "GlobalID"], axis=1) ldf = read_csv("OwnerLink") ldf = ldf.drop(["OBJECTID", "GlobalID"], axis=1) @@ -75,11 +76,13 @@ def transfer_contacts(session): odf = filter_to_valid_point_ids(session, odf) cleaned_df = odf errors = [] - # for i, row in odf.iterrows(): - for chunk in chunk_by_size(odf, 500): - things = ( - session.query(Thing).filter(Thing.name.in_(chunk.PointID.tolist())).all() - ) + added = [] + odf = odf.sort_values(by=["PointID"]) + + for chunk in chunk_by_size(odf, 100): + pointids = chunk.PointID.tolist() + logger.info(f"Processing chunk {pointids[0]} to {pointids[-1]}") + things = session.query(Thing).filter(Thing.name.in_(pointids)).all() for i, row in chunk.iterrows(): thing = next((thing for thing in things if thing.name == row.PointID), None) logger.info(f"Processing PointID: {i} {row.PointID}") @@ -91,22 +94,26 @@ def transfer_contacts(session): # TODO: use contact_helper.add_contact try: - _add_first_contact(session, row, thing, co_to_org_mapper) - session.commit() - # session.flush() - logger.info(f"added first contact for PointID {row.PointID}") + if _add_first_contact(session, row, thing, co_to_org_mapper, added): + session.commit() + # session.flush() + logger.info(f"added first contact for PointID {row.PointID}") except ValidationError as e: logger.critical( f"Skipping first contact for PointID {row.PointID} due to validation error: {e.errors()}" ) - session.rollback() - errors.append({"pointid": row.PointID, "error": e.errors()}) + # session.rollback() + errors.append( + {"pointid": row.PointID, "error": e, "table": source_table} + ) except Exception as e: logger.critical( f"Skipping first contact for PointID {row.PointID} due to error: {e}" ) session.rollback() - errors.append({"pointid": row.PointID, "error": e}) + errors.append( + {"pointid": row.PointID, "error": e, "table": source_table} + ) try: if ( @@ -119,27 +126,32 @@ def transfer_contacts(session): f"No second contact info for PointID {row.PointID}, skipping." ) continue - _add_second_contact(session, row, thing, co_to_org_mapper) - session.commit() - # session.flush() - logger.info(f"added second contact for PointID {row.PointID}") + if _add_second_contact(session, row, thing, co_to_org_mapper, added): + session.commit() + # session.flush() + logger.info(f"added second contact for PointID {row.PointID}") + except ValidationError as e: logger.critical( f"Skipping second contact for PointID {row.PointID} due to validation error: {e.errors()}" ) - session.rollback() - errors.append({"pointid": row.PointID, "error": e.errors()}) + # session.rollback() + errors.append( + {"pointid": row.PointID, "error": e, "table": source_table} + ) except Exception as e: logger.critical( f"Skipping second contact for PointID {row.PointID} due to error: {e}" ) session.rollback() - errors.append({"pointid": row.PointID, "error": e}) + errors.append( + {"pointid": row.PointID, "error": e, "table": source_table} + ) return input_df, cleaned_df, errors -def _add_first_contact(session, row, thing, co_to_org_mapper): +def _add_first_contact(session, row, thing, co_to_org_mapper, added): # TODO: extract role from OwnerComment # role = extract_owner_role(row.OwnerComment) role = "Owner" @@ -149,6 +161,10 @@ def _add_first_contact(session, row, thing, co_to_org_mapper): organization = co_to_org_mapper.get(row.Company, row.Company) + if (name, organization) in added: + return + added.append((name, organization)) + contact_data = { "thing_id": thing.id, "release_status": release_status, @@ -232,14 +248,18 @@ def _add_first_contact(session, row, thing, co_to_org_mapper): ) if address: contact.addresses.append(address) + return True -def _add_second_contact(session, row, thing, co_to_org_mapper): +def _add_second_contact(session, row, thing, co_to_org_mapper, added): release_status = "private" name = _make_name(row.SecondFirstName, row.SecondLastName) organization = co_to_org_mapper.get(row.Company, row.Company) + if (name, organization) in added: + return + added.append((name, organization)) contact_data = { "thing_id": thing.id, @@ -280,6 +300,7 @@ def _add_second_contact(session, row, thing, co_to_org_mapper): contact.phones.append(phone) else: contact.incomplete_nma_phones.append(phone) + return True # helpers diff --git a/transfers/metrics.py b/transfers/metrics.py index ffbd3da31..25b6b626b 100644 --- a/transfers/metrics.py +++ b/transfers/metrics.py @@ -19,7 +19,9 @@ from pathlib import Path from pandas import DataFrame +from pydantic import ValidationError from sqlalchemy import select, func +from sqlalchemy.exc import ProgrammingError from sqlalchemy.orm import Session from db import ( @@ -32,10 +34,11 @@ Deployment, TransducerObservation, ) +from services.gcs_helper import get_storage_bucket class Metrics: - include_errors = False + include_errors = True def __init__(self): # create a new path for the metrics @@ -48,11 +51,21 @@ def __init__(self): self.path = root / f"metrics_{datetime.now().strftime('%Y-%m-%dT%H_%M_%S')}.csv" delimiter = "|" if self.include_errors else "," - self._writer = csv.writer(self.path.open("a"), delimiter=delimiter) + self._fileobj = self.path.open("w") + self._writer = csv.writer(self._fileobj, delimiter=delimiter) self._writer.writerow( ["model", "input_count", "cleaned_count", "transferred", "issue_percentage"] ) + def save_to_storage_bucket(self): + bucket = get_storage_bucket() + log_filename = self.path.name + blob = bucket.blob(f"transfer_metrics/{log_filename}") + blob.upload_from_string(self.path.read_text()) + + def close(self): + self._fileobj.close() + def well_metrics(self, *args, **kw) -> None: self._handle_metrics( Thing, where=Thing.thing_type == "water well", name="Well", *args, **kw @@ -131,14 +144,37 @@ def _handle_metrics( def _write_errors(self, errors: list) -> None: if self.include_errors: - self._writer.writerow(["PointID", "Error"]) - for e in errors: - error = e["error"] - if not isinstance(error, (list, tuple)): + self._writer.writerow(["PointID", "Table", "Field", "Error"]) + for record in errors: + error = record["error"] + # if not isinstance(error, (list, tuple)): + # error = [error] + if isinstance(error, str): error = [error] + elif isinstance(error, ValidationError): + nes = [] + for e in error.errors(): + try: + nes.append(f"{e['loc'][0]}: {e['msg']}") + except IndexError: + nes.append(e["msg"]) + error = nes + elif isinstance(error, ProgrammingError): + detail = error.orig.args[0].get("D") + error = [detail] + elif isinstance(error, Exception): + error = [str(error)] for ee in error: - self._writer.writerow([e["pointid"], ee]) + self._writer.writerow( + [ + record["pointid"], + record.get("table"), + record.get("field"), + ee, + ] + ) + self._writer.writerow([]) def _write_metrics( diff --git a/transfers/sensor_transfer.py b/transfers/sensor_transfer.py index c46c121f6..f6ff49dcb 100644 --- a/transfers/sensor_transfer.py +++ b/transfers/sensor_transfer.py @@ -28,13 +28,15 @@ def transfer_sensors(session): - input_df = read_csv("Equipment") + source_table = "Equipment" + input_df = read_csv(source_table) input_df.columns = input_df.columns.str.replace(" ", "_") input_df = input_df[input_df.SerialNo.notna()] cleaned_df = filter_to_valid_point_ids(session, input_df) cleaned_df = replace_nans(cleaned_df) errors = [] grouped_equipment = cleaned_df.groupby(["PointID"]) + added = {} for index, group in grouped_equipment: pointid = index[0] thing = session.query(Thing).filter(Thing.name == pointid).first() @@ -53,20 +55,36 @@ def transfer_sensors(session): logger.critical( f"Skipping equipment with type {row.EquipmentType} for point {pointid}" ) - errors.append({"pointid": pointid, "error": e}) + error = ( + f"key error adding sensor_type:{row.EquipmentType} error: {e}" + ) + errors.append( + { + "pointid": pointid, + "error": error, + "table": source_table, + "field": "EquipmentType", + } + ) continue - sensor = ( - session.query(Sensor) - .filter(Sensor.serial_no == row.SerialNo) - .one_or_none() - ) - if sensor: + if row.SerialNo in added: logger.info( - f"Sensor with serial number {row.SerialNo} already exists. Only creating deployment for that record" + f"Sensor with serial number {row.SerialNo} already added in this transfer session. Only creating deployment for that record" ) + sensor = added[row.SerialNo] else: + sensor = ( + session.query(Sensor) + .filter(Sensor.serial_no == row.SerialNo) + .one_or_none() + ) + if sensor: + logger.info( + f"Sensor with serial number {row.SerialNo} already exists. Only creating deployment for that record" + ) + if not sensor: # TODO: Add validation sensor = Sensor( nma_pk_equipment=row.GlobalID, @@ -77,6 +95,7 @@ def transfer_sensors(session): owner_agency="NMBGMR", notes=row.Equipment_Notes, ) + added[row.SerialNo] = sensor session.add(sensor) logger.info( f"Added sensor {sensor.name} with serial number {sensor.serial_no}" @@ -94,8 +113,10 @@ def transfer_sensors(session): errors.append( { "pointid": pointid, - "error": f"{row.ID}, {row.SerialNo}. Installation Date cannot " + "error": f"row.ID={row.ID}, row.SerialNo={row.SerialNo}. Installation Date cannot " f"be None", + "table": source_table, + "field": "DateInstalled", } ) continue @@ -117,8 +138,10 @@ def transfer_sensors(session): errors.append( { "pointid": pointid, - "error": f"{row.ID}, {row.SerialNo}. RecordingInterval is " + "error": f"row.ID={row.ID}, row.SerialNo={row.SerialNo}. RecordingInterval is " f"not an integer", + "table": source_table, + "field": "RecordingInterval", } ) sql = ( @@ -167,7 +190,7 @@ def transfer_sensors(session): session.commit() except Exception as e: logger.critical(f"Could not add sensor and deployment: {e}") - errors.append({"pointid": pointid, "error": e}) + errors.append({"pointid": pointid, "error": e, "table": source_table}) return input_df, cleaned_df, errors diff --git a/transfers/thing_transfer.py b/transfers/thing_transfer.py index 38f9b4708..3469fbc53 100644 --- a/transfers/thing_transfer.py +++ b/transfers/thing_transfer.py @@ -20,13 +20,13 @@ from db import LocationThingAssociation from services.thing_helper import add_thing +from transfers.logger import logger from transfers.util import ( make_location, make_location_data_provenance, read_csv, replace_nans, ) -from transfers.logger import logger def transfer_thing(session: Session, site_type: str, make_payload, limit=None) -> None: @@ -37,6 +37,9 @@ def transfer_thing(session: Session, site_type: str, make_payload, limit=None) - ldf = replace_nans(ldf) n = len(ldf) start_time = time.time() + + cached_elevations = {} + for i, row in enumerate(ldf.itertuples()): pointid = row.PointID if ldf[ldf["PointID"] == pointid].shape[0] > 1: @@ -54,7 +57,7 @@ def transfer_thing(session: Session, site_type: str, make_payload, limit=None) - session.commit() try: - location, elevation_method = make_location(row) + location, elevation_method = make_location(row, cached_elevations) session.add(location) session.flush() data_provenances = make_location_data_provenance( diff --git a/transfers/transfer.py b/transfers/transfer.py index af7a20152..77275ed35 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -24,8 +24,7 @@ transfer_water_levels_pressure, transfer_water_levels_acoustic, ) -from sqlalchemy.orm import Session -from core.initializers import init_lexicon, init_parameter, erase_and_rebuild_db +from core.initializers import erase_and_rebuild_db from db.engine import session_ctx from transfers.group_transfer import transfer_groups @@ -43,33 +42,6 @@ from transfers.logger import logger, save_log_to_bucket -def erase_and_initalize(session: Session) -> None: - logger.info( - "Erasing existing data and initializing lexicon, parameter, and sensors" - ) - erase(session) - lexicon() - parameter() - - -@timeit -def lexicon(): - logger.info("Initializing lexicon") - init_lexicon() - - -@timeit -def parameter(): - logger.info("Initializing parameter") - init_parameter() - - -@timeit -def erase(session: Session): - logger.info("Erase and rebuilding database") - erase_and_rebuild_db(session) - - def message(msg, pad=10, new_line_at_top=True): pad = "*" * pad if new_line_at_top: @@ -80,7 +52,9 @@ def message(msg, pad=10, new_line_at_top=True): @timeit def transfer_all(sess, limit=100): message("STARTING TRANSFER", new_line_at_top=False) - erase_and_initalize(sess) + + logger.info("Erase and rebuilding database") + erase_and_rebuild_db() metrics = Metrics() message("TRANSFERRING WELLS") @@ -155,7 +129,8 @@ def transfer_debugging(sess, limit=100): message("STARTING TRANSFER DEBUG", new_line_at_top=False) if int(os.environ.get("ERASE_AND_REBUILD", 0)): - erase_and_initalize(sess) + logger.info("Erase and rebuilding database") + erase_and_rebuild_db() metrics = Metrics() message("TRANSFERRING WELLS") @@ -165,9 +140,9 @@ def transfer_debugging(sess, limit=100): results = timeit_direct(transfer_wells, sess, flags=flags, limit=limit) metrics.well_metrics(sess, *results) - # message("TRANSFERRING WELL SCREENS") - # results = timeit_direct(transfer_wellscreens, sess) - # metrics.well_screen_metrics(sess, *results) + message("TRANSFERRING WELL SCREENS") + results = timeit_direct(transfer_wellscreens, sess) + metrics.well_screen_metrics(sess, *results) message("TRANSFERRING SENSORS") results = timeit_direct(transfer_sensors, sess) @@ -186,13 +161,13 @@ def transfer_debugging(sess, limit=100): # message("TRANSFERRING METEOROLOGICAL") # timeit_direct(transfer_met, sess, limit) - # message("TRANSFERRING CONTACTS") - # results = timeit_direct(transfer_contacts, sess) - # metrics.contact_metrics(sess, *results) + message("TRANSFERRING CONTACTS") + results = timeit_direct(transfer_contacts, sess) + metrics.contact_metrics(sess, *results) # - # message("TRANSFERRING WATER LEVELS") - # results = timeit_direct(transfer_water_levels, sess) - # metrics.water_level_metrics(sess, *results) + message("TRANSFERRING WATER LEVELS") + results = timeit_direct(transfer_water_levels, sess) + metrics.water_level_metrics(sess, *results) # message("TRANSFERRING WATER LEVELS PRESSURE") # results = timeit_direct(transfer_water_levels_pressure, sess) @@ -223,6 +198,8 @@ def transfer_debugging(sess, limit=100): # timeit_direct(transfer_water_levels_acoustic, sess) # message("TRANSFERRING ASSETS") # timeit_direct(transfer_assets, sess) + metrics.close() + metrics.save_to_storage_bucket() def main(): diff --git a/transfers/util.py b/transfers/util.py index 6d3d6a1cf..cbf0f2b17 100644 --- a/transfers/util.py +++ b/transfers/util.py @@ -40,48 +40,16 @@ ) from transfers.logger import logger - NMA_COORDINATE_ACCURACY = { - "5m": { - "accuracy_value": 5, - "accuracy_unit": "m", - }, - "1": { - "accuracy_value": 0.1, - "accuracy_unit": "second", - }, - "5": { - "accuracy_value": 0.5, - "accuracy_unit": "second", - }, - "F": { - "accuracy_value": 5, - "accuracy_unit": "second", - }, - "H": { - "accuracy_value": 0.01, - "accuracy_unit": "second", - }, - "M": { - "accuracy_value": 1, - "accuracy_unit": "minute", - }, - "R": { - "accuracy_value": 3, - "accuracy_unit": "second", - }, - "S": { - "accuracy_value": 1, - "accuracy_unit": "second", - }, - "T": { - "accuracy_value": 10, - "accuracy_unit": "second", - }, - None: { - "accuracy_value": None, - "accuracy_unit": None, - }, + "5m": (5, "m"), + "1": (0.1, "second"), + "5": (0.5, "second"), + "F": (5, "second"), + "H": (0.01, "second"), + "M": (1, "minute"), + "R": (3, "second"), + "S": (1, "second"), + "T": (10, "second"), } @@ -221,7 +189,7 @@ def chunk_by_size(df, chunk_size): yield df.iloc[i : i + chunk_size] -def make_location(row: pd.Series) -> tuple: +def make_location(row: pd.Series, elevations: dict) -> tuple: """ Returns a tuple of location data and the elevation method """ @@ -265,7 +233,14 @@ def make_location(row: pd.Series) -> tuple: z = convert_ft_to_m(z) if row.AltDatum == "NGVD29": - z = convert_ngvd29_to_navd88(z, transformed_point.x, transformed_point.y) + key = f"{row.PointID}, {transformed_point.x, transformed_point.y}" + if key in elevations: + z = elevations[key] + else: + z = convert_ngvd29_to_navd88( + z, transformed_point.x, transformed_point.y + ) + elevations[key] = z else: elevation_from_epqs = True logger.info( @@ -386,11 +361,8 @@ def make_location_data_provenance( else None ) - accuracy_value = NMA_COORDINATE_ACCURACY.get(row.CoordinateAccuracy, None).get( - "accuracy_value" - ) - accuracy_unit = NMA_COORDINATE_ACCURACY.get(row.CoordinateAccuracy, None).get( - "accuracy_unit" + accuracy_value, accuracy_unit = NMA_COORDINATE_ACCURACY.get( + row.CoordinateAccuracy, (None, None) ) provenance = DataProvenance( diff --git a/transfers/waterlevels_transfer.py b/transfers/waterlevels_transfer.py index 14ced3cc0..a1bb32717 100644 --- a/transfers/waterlevels_transfer.py +++ b/transfers/waterlevels_transfer.py @@ -46,11 +46,19 @@ SPACE_6 = " " * 6 -def get_dt_utc(row): +def get_dt_utc(row, errors): if pd.isna(row.DateMeasured): logger.critical( f"transfer_water_levels. Skipping row PointID={row.PointID}, objectid={row.OBJECTID} because there is no DateMeasured" ) + errors.append( + { + "pointid": row.PointID, + "error": "no DateMeasured", + "table": "WaterLevels", + "field": "DateMeasured", + } + ) return if pd.isna(row.TimeMeasured): @@ -69,6 +77,14 @@ def get_dt_utc(row): dt = datetime.strptime(dt_measured, fmt) return convert_mt_to_utc(dt) except ValueError as e: + errors.append( + { + "pointid": row.PointID, + "error": str(e), + "table": "WaterLevels", + "field": "DateMeasured", + } + ) logger.critical( f"transfer_water_levels. Skipping row PointID={row.PointID}, objectid={row.OBJECTID} due to " f"invalid date/time: {e}" @@ -120,8 +136,8 @@ def transfer_water_levels(session): with open(path, "r") as f: measured_by_mapper = json.load(f) - - input_df = read_csv("WaterLevels") + source_table = "WaterLevels" + input_df = read_csv(source_table) cleaned_df = filter_to_valid_point_ids(session, input_df) cleaned_df = filter_by_valid_measuring_agency(cleaned_df) @@ -141,6 +157,14 @@ def transfer_water_levels(session): logger.critical( f"Thing with PointID={pointid} not found. Skipping water levels" ) + errors.append( + { + "pointid": pointid, + "error": "Thing with PointID not found", + "table": source_table, + "field": "PointID", + } + ) continue n = len(group) @@ -151,7 +175,7 @@ def transfer_water_levels(session): ) session.commit() - dt_utc = get_dt_utc(row) + dt_utc = get_dt_utc(row, errors) if dt_utc is None: continue diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 6fb4094fd..ee54d0216 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -148,22 +148,49 @@ def get_wells_to_transfer( return input_df, cleaned_df +def get_cached_elevations() -> dict: + bucket = get_storage_bucket() + log_filename = "transfer_data/cached_elevations.json" + blob = bucket.blob(log_filename) + if blob.exists(): + lut = json.loads(blob.download_as_string()) + return lut + else: + return {} + + +def dump_cached_elevations(lut: dict): + bucket = get_storage_bucket() + log_filename = "transfer_data/cached_elevations.json" + blob = bucket.blob(log_filename) + blob.upload_from_string(json.dumps(lut)) + + def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None: input_df, cleaned_df = get_wells_to_transfer(session, flags) - + source_table = "WellData" wdf = cleaned_df n = len(wdf) step = 25 start_time = time.time() errors = [] + added_locations = {} + cached_elevations = get_cached_elevations() for i, row in enumerate(wdf.itertuples()): pointid = row.PointID if wdf[wdf["PointID"] == pointid].shape[0] > 1: logger.critical( f"transfer_wells. PointID {pointid} has duplicate records. Skipping." ) - errors.append({"pointid": pointid, "error": "duplicate records"}) + errors.append( + { + "pointid": pointid, + "error": "duplicate records", + "table": source_table, + "field": "PointID", + } + ) continue if limit and i >= limit: @@ -184,20 +211,22 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None location = None try: - location, elevation_method = make_location(row) + location, elevation_method = make_location(row, cached_elevations) session.add(location) - session.flush() - data_provenances = make_location_data_provenance( - row, location, elevation_method - ) - for dp in data_provenances: - session.add(dp) + added_locations[row.PointID] = elevation_method except Exception as e: if location is not None: session.expunge(location) # these rollbacks are cause an issue because they are discarding good data # session.rollback() - errors.append({"pointid": row.PointID, "error": str(e)}) + errors.append( + { + "pointid": row.PointID, + "error": e, + "table": "Location", + "field": str(e), + } + ) logger.critical(f"Error making location for {row.PointID}: {e}") continue @@ -225,11 +254,14 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None release_status="public" if row.PublicRelease else "private", measuring_point_height=row.MPHeight, measuring_point_description=row.MeasuringPoint, + notes=( + [{"content": row.Notes, "note_type": "Other"}] if row.Notes else [] + ), ) CreateWell.model_validate(data) except ValidationError as e: - errors.append({"pointid": row.PointID, "error": e.errors()}) + errors.append({"pointid": row.PointID, "error": e, "table": "WellData"}) logger.critical( f"Validation error for row {i} with PointID {row.PointID}: {e.errors()}" ) @@ -249,27 +281,21 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None ) well_data["thing_type"] = "water well" well_data["nma_pk_welldata"] = row.WellID + + well_data.pop("notes") well = Thing(**well_data) session.add(well) - logger.info(f"Created well for {row.PointID}") + # logger.info(f"Created well for {row.PointID}") # flush well to access its ID for status_history - session.flush() + # session.flush() - """ - Developer's note - - It's not clear when the measuring point from NM_Aquifer was - determined, so I'm setting start_date to the day of the transfer - """ - measuring_point_history = MeasuringPointHistory( - thing_id=well.id, - measuring_point_height=row.MPHeight, - measuring_point_description=row.MeasuringPoint, - start_date=datetime.now(tz=UTC), - end_date=None, - ) - session.add(measuring_point_history) + # session.commit() + # session.refresh(well) + # if notes: + # for ni in notes: + # nn = well.add_note(ni['content'], ni['note_type']) + # session.add(nn) if well_purposes: for wp in well_purposes: @@ -294,7 +320,7 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None if well is not None: session.expunge(well) - errors.append({"pointid": row.PointID, "error": str(e)}) + errors.append({"pointid": row.PointID, "error": e, "table": "WellData"}) logger.critical(f"Error creating well for {row.PointID}: {e}") continue @@ -304,6 +330,38 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None assoc.thing = well session.add(assoc) + session.commit() + + # add things thate need well id + for well in session.query(Thing).filter(Thing.thing_type == "water well").all(): + row = wdf[wdf["PointID"] == well.name].iloc[0] + if not isna(row.Notes): + note = well.add_note(row.Notes, "Other") + session.add(note) + + location = well.current_location + elevation_method = added_locations[row.PointID] + data_provenances = make_location_data_provenance( + row, location, elevation_method + ) + for dp in data_provenances: + session.add(dp) + + """ + Developer's note + + It's not clear when the measuring point from NM_Aquifer was + determined, so I'm setting start_date to the day of the transfer + """ + measuring_point_history = MeasuringPointHistory( + thing_id=well.id, + measuring_point_height=row.MPHeight, + measuring_point_description=row.MeasuringPoint, + start_date=datetime.now(tz=UTC), + end_date=None, + ) + session.add(measuring_point_history) + """ Developer's notes @@ -313,9 +371,10 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None # TODO: if row.MonitoringStatus == "Q" is it monitored or not? <-- AMMP review # TODO: if row.MonitoringStatus == "X" can that change? <-- AMMP review # TODO: have AMMP review and verify the various MonitoringStatus codes + target_id = well.id target_table = "thing" - if row.MonitoringStatus: + if not isna(row.MonitoringStatus): if ( "X" in row.MonitoringStatus or "I" in row.MonitoringStatus @@ -352,7 +411,7 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None f" Adding '{monitoring_frequency}' monitoring frequency for well {well.name}" ) - if row.Status: + if not isna(row.Status): status_value = lexicon_mapper.map_value(f"LU_Status:{row.Status}") status_history = StatusHistory( status_type="Well Status", @@ -366,6 +425,8 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None logger.info(f" Added well status for well {well.name}: {status_value}") session.commit() + + dump_cached_elevations(cached_elevations) return input_df, cleaned_df, errors @@ -407,7 +468,9 @@ def transfer_wellscreens(session, limit=None): logger.critical( f"Validation error for row {i} with PointID {row.PointID}: {e.errors()}" ) - errors.append({"pointid": row.PointID, "error": e.errors()}) + errors.append( + {"pointid": row.PointID, "error": e, "table": "WellScreens"} + ) continue well_screen = WellScreen(**well_screen_data)