From 9a7a77bd84e7a13209c67434f7bca6a069c92a00 Mon Sep 17 00:00:00 2001 From: jakeross Date: Tue, 4 Nov 2025 18:22:02 -0700 Subject: [PATCH 01/11] fix: enhance error logging in sensor_transfer.py for better debugging --- transfers/sensor_transfer.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/transfers/sensor_transfer.py b/transfers/sensor_transfer.py index c46c121f6..bb9915439 100644 --- a/transfers/sensor_transfer.py +++ b/transfers/sensor_transfer.py @@ -53,7 +53,13 @@ def transfer_sensors(session): logger.critical( f"Skipping equipment with type {row.EquipmentType} for point {pointid}" ) - errors.append({"pointid": pointid, "error": e}) + errors.append( + { + "pointid": pointid, + "error": f"key error adding sensor_type:{row.EquipmentType} " + f"error: {e}", + } + ) continue sensor = ( @@ -167,7 +173,7 @@ def transfer_sensors(session): session.commit() except Exception as e: logger.critical(f"Could not add sensor and deployment: {e}") - errors.append({"pointid": pointid, "error": e}) + errors.append({"pointid": pointid, "error": f"row={row}. error={e}"}) return input_df, cleaned_df, errors From bff2be301dd47615bd294c041b717f0943c21c55 Mon Sep 17 00:00:00 2001 From: jakeross Date: Thu, 13 Nov 2025 09:35:07 -0700 Subject: [PATCH 02/11] feat: enhance contact and sensor transfer processes with improved error handling and logging --- transfers/contact_transfer.py | 63 +++++++++++++++++++++++------------ transfers/metrics.py | 36 ++++++++++++++++---- transfers/sensor_transfer.py | 21 ++++++++---- transfers/transfer.py | 12 +++---- transfers/well_transfer.py | 28 ++++++++++++---- 5 files changed, 114 insertions(+), 46 deletions(-) diff --git a/transfers/contact_transfer.py b/transfers/contact_transfer.py index 36c7107b7..680ec1ce6 100644 --- a/transfers/contact_transfer.py +++ b/transfers/contact_transfer.py @@ -61,7 +61,8 @@ def transfer_contacts(session): with open(co_to_org_mapper_path, "r") as f: co_to_org_mapper = json.load(f) - input_df = read_csv("OwnersData") + source_table = "OwnersData" + input_df = read_csv(source_table) odf = input_df.drop(["OBJECTID", "GlobalID"], axis=1) ldf = read_csv("OwnerLink") ldf = ldf.drop(["OBJECTID", "GlobalID"], axis=1) @@ -75,11 +76,13 @@ def transfer_contacts(session): odf = filter_to_valid_point_ids(session, odf) cleaned_df = odf errors = [] - # for i, row in odf.iterrows(): - for chunk in chunk_by_size(odf, 500): - things = ( - session.query(Thing).filter(Thing.name.in_(chunk.PointID.tolist())).all() - ) + added = [] + odf = odf.sort_values(by=["PointID"]) + + for chunk in chunk_by_size(odf, 100): + pointids = chunk.PointID.tolist() + logger.info(f"Processing chunk {pointids[0]} to {pointids[-1]}") + things = session.query(Thing).filter(Thing.name.in_(pointids)).all() for i, row in chunk.iterrows(): thing = next((thing for thing in things if thing.name == row.PointID), None) logger.info(f"Processing PointID: {i} {row.PointID}") @@ -91,22 +94,24 @@ def transfer_contacts(session): # TODO: use contact_helper.add_contact try: - _add_first_contact(session, row, thing, co_to_org_mapper) - session.commit() - # session.flush() - logger.info(f"added first contact for PointID {row.PointID}") + if _add_first_contact(session, row, thing, co_to_org_mapper, added): + session.commit() + # session.flush() + logger.info(f"added first contact for PointID {row.PointID}") except ValidationError as e: logger.critical( f"Skipping first contact for PointID {row.PointID} due to validation error: {e.errors()}" ) - session.rollback() - errors.append({"pointid": row.PointID, "error": e.errors()}) + # session.rollback() + errors.append({"pointid": row.PointID, "error": e}) except Exception as e: logger.critical( f"Skipping first contact for PointID {row.PointID} due to error: {e}" ) session.rollback() - errors.append({"pointid": row.PointID, "error": e}) + errors.append( + {"pointid": row.PointID, "error": e, "table": source_table} + ) try: if ( @@ -119,27 +124,32 @@ def transfer_contacts(session): f"No second contact info for PointID {row.PointID}, skipping." ) continue - _add_second_contact(session, row, thing, co_to_org_mapper) - session.commit() - # session.flush() - logger.info(f"added second contact for PointID {row.PointID}") + if _add_second_contact(session, row, thing, co_to_org_mapper, added): + session.commit() + # session.flush() + logger.info(f"added second contact for PointID {row.PointID}") + except ValidationError as e: logger.critical( f"Skipping second contact for PointID {row.PointID} due to validation error: {e.errors()}" ) - session.rollback() - errors.append({"pointid": row.PointID, "error": e.errors()}) + # session.rollback() + errors.append( + {"pointid": row.PointID, "error": e, "table": source_table} + ) except Exception as e: logger.critical( f"Skipping second contact for PointID {row.PointID} due to error: {e}" ) session.rollback() - errors.append({"pointid": row.PointID, "error": e}) + errors.append( + {"pointid": row.PointID, "error": e, "table": source_table} + ) return input_df, cleaned_df, errors -def _add_first_contact(session, row, thing, co_to_org_mapper): +def _add_first_contact(session, row, thing, co_to_org_mapper, added): # TODO: extract role from OwnerComment # role = extract_owner_role(row.OwnerComment) role = "Owner" @@ -149,6 +159,10 @@ def _add_first_contact(session, row, thing, co_to_org_mapper): organization = co_to_org_mapper.get(row.Company, row.Company) + if (name, organization) in added: + return + added.append((name, organization)) + contact_data = { "thing_id": thing.id, "release_status": release_status, @@ -232,14 +246,18 @@ def _add_first_contact(session, row, thing, co_to_org_mapper): ) if address: contact.addresses.append(address) + return True -def _add_second_contact(session, row, thing, co_to_org_mapper): +def _add_second_contact(session, row, thing, co_to_org_mapper, added): release_status = "private" name = _make_name(row.SecondFirstName, row.SecondLastName) organization = co_to_org_mapper.get(row.Company, row.Company) + if (name, organization) in added: + return + added.append((name, organization)) contact_data = { "thing_id": thing.id, @@ -280,6 +298,7 @@ def _add_second_contact(session, row, thing, co_to_org_mapper): contact.phones.append(phone) else: contact.incomplete_nma_phones.append(phone) + return True # helpers diff --git a/transfers/metrics.py b/transfers/metrics.py index ffbd3da31..93d93d000 100644 --- a/transfers/metrics.py +++ b/transfers/metrics.py @@ -1,4 +1,4 @@ -# =============================================================================== +1 # =============================================================================== # Copyright 2025 ross # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -19,7 +19,9 @@ from pathlib import Path from pandas import DataFrame +from pydantic import ValidationError from sqlalchemy import select, func +from sqlalchemy.exc import ProgrammingError from sqlalchemy.orm import Session from db import ( @@ -35,7 +37,7 @@ class Metrics: - include_errors = False + include_errors = True def __init__(self): # create a new path for the metrics @@ -132,13 +134,35 @@ def _handle_metrics( def _write_errors(self, errors: list) -> None: if self.include_errors: self._writer.writerow(["PointID", "Error"]) - for e in errors: - error = e["error"] - if not isinstance(error, (list, tuple)): + for record in errors: + error = record["error"] + # if not isinstance(error, (list, tuple)): + # error = [error] + if isinstance(error, str): error = [error] + elif isinstance(error, ValidationError): + nes = [] + for e in error.errors(): + try: + nes.append(f"{e['loc'][0]}: {e['msg']}") + except IndexError: + nes.append(e["msg"]) + error = nes + elif isinstance(error, ProgrammingError): + detail = error.orig.args[0].get("D") + # first = error.args[0] + # detail = first.get("D") if isinstance(first, dict) else first + # print('eee', error) + # print('vvve',type(error.args), error.args) + # error=[error] + # error = [error.args[0].get("D")] + error = [detail] + elif isinstance(error, Exception): + error = [str(error)] for ee in error: - self._writer.writerow([e["pointid"], ee]) + self._writer.writerow([record["pointid"], ee]) + self._writer.writerow([]) def _write_metrics( diff --git a/transfers/sensor_transfer.py b/transfers/sensor_transfer.py index bb9915439..f4974259b 100644 --- a/transfers/sensor_transfer.py +++ b/transfers/sensor_transfer.py @@ -28,7 +28,8 @@ def transfer_sensors(session): - input_df = read_csv("Equipment") + source_table = "Equipment" + input_df = read_csv(source_table) input_df.columns = input_df.columns.str.replace(" ", "_") input_df = input_df[input_df.SerialNo.notna()] cleaned_df = filter_to_valid_point_ids(session, input_df) @@ -53,11 +54,15 @@ def transfer_sensors(session): logger.critical( f"Skipping equipment with type {row.EquipmentType} for point {pointid}" ) + error = ( + f"key error adding sensor_type:{row.EquipmentType} error: {e}" + ) errors.append( { "pointid": pointid, - "error": f"key error adding sensor_type:{row.EquipmentType} " - f"error: {e}", + "error": error, + "table": source_table, + "field": "EquipmentType", } ) continue @@ -100,8 +105,10 @@ def transfer_sensors(session): errors.append( { "pointid": pointid, - "error": f"{row.ID}, {row.SerialNo}. Installation Date cannot " + "error": f"row.ID={row.ID}, row.SerialNo={row.SerialNo}. Installation Date cannot " f"be None", + "table": source_table, + "field": "DateInstalled", } ) continue @@ -123,8 +130,10 @@ def transfer_sensors(session): errors.append( { "pointid": pointid, - "error": f"{row.ID}, {row.SerialNo}. RecordingInterval is " + "error": f"row.ID={row.ID}, row.SerialNo={row.SerialNo}. RecordingInterval is " f"not an integer", + "table": source_table, + "field": "RecordingInterval", } ) sql = ( @@ -173,7 +182,7 @@ def transfer_sensors(session): session.commit() except Exception as e: logger.critical(f"Could not add sensor and deployment: {e}") - errors.append({"pointid": pointid, "error": f"row={row}. error={e}"}) + errors.append({"pointid": pointid, "error": e, "table": source_table}) return input_df, cleaned_df, errors diff --git a/transfers/transfer.py b/transfers/transfer.py index af7a20152..b456055e0 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -165,9 +165,9 @@ def transfer_debugging(sess, limit=100): results = timeit_direct(transfer_wells, sess, flags=flags, limit=limit) metrics.well_metrics(sess, *results) - # message("TRANSFERRING WELL SCREENS") - # results = timeit_direct(transfer_wellscreens, sess) - # metrics.well_screen_metrics(sess, *results) + message("TRANSFERRING WELL SCREENS") + results = timeit_direct(transfer_wellscreens, sess) + metrics.well_screen_metrics(sess, *results) message("TRANSFERRING SENSORS") results = timeit_direct(transfer_sensors, sess) @@ -186,9 +186,9 @@ def transfer_debugging(sess, limit=100): # message("TRANSFERRING METEOROLOGICAL") # timeit_direct(transfer_met, sess, limit) - # message("TRANSFERRING CONTACTS") - # results = timeit_direct(transfer_contacts, sess) - # metrics.contact_metrics(sess, *results) + message("TRANSFERRING CONTACTS") + results = timeit_direct(transfer_contacts, sess) + metrics.contact_metrics(sess, *results) # # message("TRANSFERRING WATER LEVELS") # results = timeit_direct(transfer_water_levels, sess) diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 389439292..c1f0731ea 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -136,7 +136,7 @@ def get_wells_to_transfer( def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None: input_df, cleaned_df = get_wells_to_transfer(session, flags) - + source_table = "WellData" wdf = cleaned_df n = len(wdf) @@ -149,7 +149,14 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None logger.critical( f"transfer_wells. PointID {pointid} has duplicate records. Skipping." ) - errors.append({"pointid": pointid, "error": "duplicate records"}) + errors.append( + { + "pointid": pointid, + "error": "duplicate records", + "table": source_table, + "field": "PointID", + } + ) continue if limit and i >= limit: @@ -177,7 +184,14 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None session.expunge(location) # these rollbacks are cause an issue because they are discarding good data # session.rollback() - errors.append({"pointid": row.PointID, "error": str(e)}) + errors.append( + { + "pointid": row.PointID, + "error": e, + "table": "Location", + "field": str(e), + } + ) logger.critical(f"Error making location for {row.PointID}: {e}") continue @@ -205,7 +219,7 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None CreateWell.model_validate(data) except ValidationError as e: - errors.append({"pointid": row.PointID, "error": e.errors()}) + errors.append({"pointid": row.PointID, "error": e, "table": "WellData"}) logger.critical( f"Validation error for row {i} with PointID {row.PointID}: {e.errors()}" ) @@ -249,7 +263,7 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None if well is not None: session.expunge(well) - errors.append({"pointid": row.PointID, "error": str(e)}) + errors.append({"pointid": row.PointID, "error": e, "table": "WellData"}) logger.critical(f"Error creating well for {row.PointID}: {e}") continue @@ -307,7 +321,9 @@ def transfer_wellscreens(session, limit=None): logger.critical( f"Validation error for row {i} with PointID {row.PointID}: {e.errors()}" ) - errors.append({"pointid": row.PointID, "error": e.errors()}) + errors.append( + {"pointid": row.PointID, "error": e, "table": "WellScreens"} + ) continue well_screen = WellScreen(**well_screen_data) From ebc5451f8771cd700e53f3f34766292898187257 Mon Sep 17 00:00:00 2001 From: jakeross Date: Thu, 13 Nov 2025 17:25:26 -0700 Subject: [PATCH 03/11] fix: enhance error logging in transfer scripts to include table and field information --- transfers/contact_transfer.py | 4 +++- transfers/metrics.py | 23 +++++++++++++--------- transfers/sensor_transfer.py | 22 ++++++++++++++------- transfers/transfer.py | 7 ++++--- transfers/waterlevels_transfer.py | 32 +++++++++++++++++++++++++++---- 5 files changed, 64 insertions(+), 24 deletions(-) diff --git a/transfers/contact_transfer.py b/transfers/contact_transfer.py index 680ec1ce6..c9b1c9fb0 100644 --- a/transfers/contact_transfer.py +++ b/transfers/contact_transfer.py @@ -103,7 +103,9 @@ def transfer_contacts(session): f"Skipping first contact for PointID {row.PointID} due to validation error: {e.errors()}" ) # session.rollback() - errors.append({"pointid": row.PointID, "error": e}) + errors.append( + {"pointid": row.PointID, "error": e, "table": source_table} + ) except Exception as e: logger.critical( f"Skipping first contact for PointID {row.PointID} due to error: {e}" diff --git a/transfers/metrics.py b/transfers/metrics.py index 93d93d000..c6c2c7586 100644 --- a/transfers/metrics.py +++ b/transfers/metrics.py @@ -50,11 +50,15 @@ def __init__(self): self.path = root / f"metrics_{datetime.now().strftime('%Y-%m-%dT%H_%M_%S')}.csv" delimiter = "|" if self.include_errors else "," - self._writer = csv.writer(self.path.open("a"), delimiter=delimiter) + self._fileobj = self.path.open("w") + self._writer = csv.writer(self._fileobj, delimiter=delimiter) self._writer.writerow( ["model", "input_count", "cleaned_count", "transferred", "issue_percentage"] ) + def close(self): + self._fileobj.close() + def well_metrics(self, *args, **kw) -> None: self._handle_metrics( Thing, where=Thing.thing_type == "water well", name="Well", *args, **kw @@ -133,7 +137,7 @@ def _handle_metrics( def _write_errors(self, errors: list) -> None: if self.include_errors: - self._writer.writerow(["PointID", "Error"]) + self._writer.writerow(["PointID", "Table", "Field", "Error"]) for record in errors: error = record["error"] # if not isinstance(error, (list, tuple)): @@ -150,18 +154,19 @@ def _write_errors(self, errors: list) -> None: error = nes elif isinstance(error, ProgrammingError): detail = error.orig.args[0].get("D") - # first = error.args[0] - # detail = first.get("D") if isinstance(first, dict) else first - # print('eee', error) - # print('vvve',type(error.args), error.args) - # error=[error] - # error = [error.args[0].get("D")] error = [detail] elif isinstance(error, Exception): error = [str(error)] for ee in error: - self._writer.writerow([record["pointid"], ee]) + self._writer.writerow( + [ + record["pointid"], + record.get("table"), + record.get("field"), + ee, + ] + ) self._writer.writerow([]) diff --git a/transfers/sensor_transfer.py b/transfers/sensor_transfer.py index f4974259b..f6ff49dcb 100644 --- a/transfers/sensor_transfer.py +++ b/transfers/sensor_transfer.py @@ -36,6 +36,7 @@ def transfer_sensors(session): cleaned_df = replace_nans(cleaned_df) errors = [] grouped_equipment = cleaned_df.groupby(["PointID"]) + added = {} for index, group in grouped_equipment: pointid = index[0] thing = session.query(Thing).filter(Thing.name == pointid).first() @@ -67,17 +68,23 @@ def transfer_sensors(session): ) continue - sensor = ( - session.query(Sensor) - .filter(Sensor.serial_no == row.SerialNo) - .one_or_none() - ) - if sensor: + if row.SerialNo in added: logger.info( - f"Sensor with serial number {row.SerialNo} already exists. Only creating deployment for that record" + f"Sensor with serial number {row.SerialNo} already added in this transfer session. Only creating deployment for that record" ) + sensor = added[row.SerialNo] else: + sensor = ( + session.query(Sensor) + .filter(Sensor.serial_no == row.SerialNo) + .one_or_none() + ) + if sensor: + logger.info( + f"Sensor with serial number {row.SerialNo} already exists. Only creating deployment for that record" + ) + if not sensor: # TODO: Add validation sensor = Sensor( nma_pk_equipment=row.GlobalID, @@ -88,6 +95,7 @@ def transfer_sensors(session): owner_agency="NMBGMR", notes=row.Equipment_Notes, ) + added[row.SerialNo] = sensor session.add(sensor) logger.info( f"Added sensor {sensor.name} with serial number {sensor.serial_no}" diff --git a/transfers/transfer.py b/transfers/transfer.py index b456055e0..117a334b3 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -190,9 +190,9 @@ def transfer_debugging(sess, limit=100): results = timeit_direct(transfer_contacts, sess) metrics.contact_metrics(sess, *results) # - # message("TRANSFERRING WATER LEVELS") - # results = timeit_direct(transfer_water_levels, sess) - # metrics.water_level_metrics(sess, *results) + message("TRANSFERRING WATER LEVELS") + results = timeit_direct(transfer_water_levels, sess) + metrics.water_level_metrics(sess, *results) # message("TRANSFERRING WATER LEVELS PRESSURE") # results = timeit_direct(transfer_water_levels_pressure, sess) @@ -223,6 +223,7 @@ def transfer_debugging(sess, limit=100): # timeit_direct(transfer_water_levels_acoustic, sess) # message("TRANSFERRING ASSETS") # timeit_direct(transfer_assets, sess) + metrics.close() def main(): diff --git a/transfers/waterlevels_transfer.py b/transfers/waterlevels_transfer.py index 14ced3cc0..a1bb32717 100644 --- a/transfers/waterlevels_transfer.py +++ b/transfers/waterlevels_transfer.py @@ -46,11 +46,19 @@ SPACE_6 = " " * 6 -def get_dt_utc(row): +def get_dt_utc(row, errors): if pd.isna(row.DateMeasured): logger.critical( f"transfer_water_levels. Skipping row PointID={row.PointID}, objectid={row.OBJECTID} because there is no DateMeasured" ) + errors.append( + { + "pointid": row.PointID, + "error": "no DateMeasured", + "table": "WaterLevels", + "field": "DateMeasured", + } + ) return if pd.isna(row.TimeMeasured): @@ -69,6 +77,14 @@ def get_dt_utc(row): dt = datetime.strptime(dt_measured, fmt) return convert_mt_to_utc(dt) except ValueError as e: + errors.append( + { + "pointid": row.PointID, + "error": str(e), + "table": "WaterLevels", + "field": "DateMeasured", + } + ) logger.critical( f"transfer_water_levels. Skipping row PointID={row.PointID}, objectid={row.OBJECTID} due to " f"invalid date/time: {e}" @@ -120,8 +136,8 @@ def transfer_water_levels(session): with open(path, "r") as f: measured_by_mapper = json.load(f) - - input_df = read_csv("WaterLevels") + source_table = "WaterLevels" + input_df = read_csv(source_table) cleaned_df = filter_to_valid_point_ids(session, input_df) cleaned_df = filter_by_valid_measuring_agency(cleaned_df) @@ -141,6 +157,14 @@ def transfer_water_levels(session): logger.critical( f"Thing with PointID={pointid} not found. Skipping water levels" ) + errors.append( + { + "pointid": pointid, + "error": "Thing with PointID not found", + "table": source_table, + "field": "PointID", + } + ) continue n = len(group) @@ -151,7 +175,7 @@ def transfer_water_levels(session): ) session.commit() - dt_utc = get_dt_utc(row) + dt_utc = get_dt_utc(row, errors) if dt_utc is None: continue From 23ee27ae727fb2b7048ea526de16de386d5a44b6 Mon Sep 17 00:00:00 2001 From: jakeross Date: Tue, 18 Nov 2025 14:11:42 -0700 Subject: [PATCH 04/11] refactor: simplify database initialization by removing session context from erase_and_rebuild_db --- core/app.py | 9 +-------- core/initializers.py | 22 ++++++++++----------- tests/__init__.py | 8 ++------ tests/features/environment.py | 5 +++-- transfers/transfer.py | 37 ++++++----------------------------- transfers/well_transfer.py | 22 +++++++++++++++++++++ 6 files changed, 45 insertions(+), 58 deletions(-) diff --git a/core/app.py b/core/app.py index 78b33f887..b0e0184fe 100644 --- a/core/app.py +++ b/core/app.py @@ -17,8 +17,6 @@ from contextlib import asynccontextmanager from typing import AsyncGenerator -from db.engine import session_ctx - from fastapi import FastAPI from fastapi.openapi.docs import ( get_swagger_ui_html, @@ -27,8 +25,6 @@ from fastapi.openapi.utils import get_openapi from .initializers import ( - init_lexicon, - init_parameter, register_routes, erase_and_rebuild_db, ) @@ -41,10 +37,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: Application lifespan event handler to initialize the database and lexicon. """ if settings.get_enum("MODE") == "development": - with session_ctx() as session: - erase_and_rebuild_db(session) - init_lexicon() - init_parameter() + erase_and_rebuild_db() register_routes(app) yield diff --git a/core/initializers.py b/core/initializers.py index a38ee718c..fdfd4bfc6 100644 --- a/core/initializers.py +++ b/core/initializers.py @@ -18,7 +18,6 @@ from fastapi_pagination import add_pagination from sqlalchemy import text from sqlalchemy.exc import DatabaseError -from sqlalchemy.orm import Session from db import Base from db.engine import session_ctx @@ -56,16 +55,17 @@ def init_parameter(path: str = None) -> None: session.rollback() -def erase_and_rebuild_db(session: Session): - session.execute(text("DROP SCHEMA public CASCADE")) - session.execute(text("CREATE SCHEMA public")) - session.execute(text("CREATE EXTENSION IF NOT EXISTS postgis")) - session.commit() - Base.metadata.drop_all(session.bind) - Base.metadata.create_all(session.bind) - - init_lexicon() - init_parameter() +def erase_and_rebuild_db(): + with session_ctx() as session: + session.execute(text("DROP SCHEMA public CASCADE")) + session.execute(text("CREATE SCHEMA public")) + session.execute(text("CREATE EXTENSION IF NOT EXISTS postgis")) + session.commit() + Base.metadata.drop_all(session.bind) + Base.metadata.create_all(session.bind) + + init_lexicon() + init_parameter() def init_lexicon(path: str = None) -> None: diff --git a/tests/__init__.py b/tests/__init__.py index 678c60440..092707335 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -34,8 +34,6 @@ from starlette.middleware.cors import CORSMiddleware from core.initializers import ( - init_lexicon, - init_parameter, register_routes, erase_and_rebuild_db, ) @@ -43,11 +41,9 @@ from db.engine import session_ctx from core.app import app -with session_ctx() as session: - erase_and_rebuild_db(session) - - +erase_and_rebuild_db() register_routes(app) + app.add_middleware( CORSMiddleware, allow_origins=["*"], # Allows all origins, adjust as needed for security diff --git a/tests/features/environment.py b/tests/features/environment.py index 04850e916..2640a25b4 100644 --- a/tests/features/environment.py +++ b/tests/features/environment.py @@ -221,9 +221,10 @@ def before_all(context): context.objects = {} rebuild = False # rebuild = True + if rebuild: + erase_and_rebuild_db() + with session_ctx() as session: - if rebuild: - erase_and_rebuild_db(session) loc_1 = add_location(context, session) loc_2 = add_location(context, session) diff --git a/transfers/transfer.py b/transfers/transfer.py index 117a334b3..7d84b347f 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -24,8 +24,7 @@ transfer_water_levels_pressure, transfer_water_levels_acoustic, ) -from sqlalchemy.orm import Session -from core.initializers import init_lexicon, init_parameter, erase_and_rebuild_db +from core.initializers import erase_and_rebuild_db from db.engine import session_ctx from transfers.group_transfer import transfer_groups @@ -43,33 +42,6 @@ from transfers.logger import logger, save_log_to_bucket -def erase_and_initalize(session: Session) -> None: - logger.info( - "Erasing existing data and initializing lexicon, parameter, and sensors" - ) - erase(session) - lexicon() - parameter() - - -@timeit -def lexicon(): - logger.info("Initializing lexicon") - init_lexicon() - - -@timeit -def parameter(): - logger.info("Initializing parameter") - init_parameter() - - -@timeit -def erase(session: Session): - logger.info("Erase and rebuilding database") - erase_and_rebuild_db(session) - - def message(msg, pad=10, new_line_at_top=True): pad = "*" * pad if new_line_at_top: @@ -80,7 +52,9 @@ def message(msg, pad=10, new_line_at_top=True): @timeit def transfer_all(sess, limit=100): message("STARTING TRANSFER", new_line_at_top=False) - erase_and_initalize(sess) + + logger.info("Erase and rebuilding database") + erase_and_rebuild_db() metrics = Metrics() message("TRANSFERRING WELLS") @@ -155,7 +129,8 @@ def transfer_debugging(sess, limit=100): message("STARTING TRANSFER DEBUG", new_line_at_top=False) if int(os.environ.get("ERASE_AND_REBUILD", 0)): - erase_and_initalize(sess) + logger.info("Erase and rebuilding database") + erase_and_rebuild_db() metrics = Metrics() message("TRANSFERRING WELLS") diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index c1f0731ea..4f7e2d5bd 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -215,6 +215,9 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None well_casing_diameter=row.CasingDiameter, well_casing_depth=row.CasingDepth, release_status="public" if row.PublicRelease else "private", + notes=( + [{"content": row.Notes, "note_type": "Other"}] if row.Notes else [] + ), ) CreateWell.model_validate(data) @@ -237,9 +240,18 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None ) well_data["thing_type"] = "water well" well_data["nma_pk_welldata"] = row.WellID + + notes = well_data.pop("notes") well = Thing(**well_data) session.add(well) + # session.commit() + # session.refresh(well) + # if notes: + # for ni in notes: + # nn = well.add_note(ni['content'], ni['note_type']) + # session.add(nn) + if well_purposes: for wp in well_purposes: # TODO: add validation logic here @@ -274,6 +286,16 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None session.add(assoc) session.commit() + + # add notes + for well in session.query(Thing).filter(Thing.thing_type == "water well").all(): + row = wdf[wdf["PointID"] == well.name].iloc[0] + if not isna(row.Notes): + note = well.add_note(row.Notes, "Other") + session.add(note) + + session.commit() + return input_df, cleaned_df, errors # try: # session.commit() From f5a71a5d467b8ed2829375341609de73588b8b87 Mon Sep 17 00:00:00 2001 From: jakeross Date: Tue, 18 Nov 2025 14:47:44 -0700 Subject: [PATCH 05/11] refactor: move lexicon and parameter initialization outside of database setup --- core/initializers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/initializers.py b/core/initializers.py index fdfd4bfc6..74c811bff 100644 --- a/core/initializers.py +++ b/core/initializers.py @@ -64,8 +64,8 @@ def erase_and_rebuild_db(): Base.metadata.drop_all(session.bind) Base.metadata.create_all(session.bind) - init_lexicon() - init_parameter() + init_lexicon() + init_parameter() def init_lexicon(path: str = None) -> None: From 681c788658a0238b7fabdbc44facb3da03eaa004 Mon Sep 17 00:00:00 2001 From: jakeross Date: Tue, 18 Nov 2025 15:30:34 -0700 Subject: [PATCH 06/11] refactor: streamline accuracy handling and improve logging in well transfer process --- .gitignore | 1 + transfers/util.py | 65 ++++++++------------------------- transfers/well_transfer.py | 75 ++++++++++++++++++++------------------ 3 files changed, 57 insertions(+), 84 deletions(-) diff --git a/.gitignore b/.gitignore index ec05f5cce..c1d8db1ee 100644 --- a/.gitignore +++ b/.gitignore @@ -32,6 +32,7 @@ transfers/data/nma_csv_cache/* tests/features/*.feature transfers/metrics/* transfers/logs/* +run_bdd-local.sh # deployment files diff --git a/transfers/util.py b/transfers/util.py index 6d3d6a1cf..4c45085e0 100644 --- a/transfers/util.py +++ b/transfers/util.py @@ -36,52 +36,19 @@ transform_srid, get_epqs_elevation_from_point, convert_ft_to_m, - convert_ngvd29_to_navd88, ) from transfers.logger import logger - NMA_COORDINATE_ACCURACY = { - "5m": { - "accuracy_value": 5, - "accuracy_unit": "m", - }, - "1": { - "accuracy_value": 0.1, - "accuracy_unit": "second", - }, - "5": { - "accuracy_value": 0.5, - "accuracy_unit": "second", - }, - "F": { - "accuracy_value": 5, - "accuracy_unit": "second", - }, - "H": { - "accuracy_value": 0.01, - "accuracy_unit": "second", - }, - "M": { - "accuracy_value": 1, - "accuracy_unit": "minute", - }, - "R": { - "accuracy_value": 3, - "accuracy_unit": "second", - }, - "S": { - "accuracy_value": 1, - "accuracy_unit": "second", - }, - "T": { - "accuracy_value": 10, - "accuracy_unit": "second", - }, - None: { - "accuracy_value": None, - "accuracy_unit": None, - }, + "5m": (5, "m"), + "1": (0.1, "second"), + "5": (0.5, "second"), + "F": (5, "second"), + "H": (0.01, "second"), + "M": (1, "minute"), + "R": (3, "second"), + "S": (1, "second"), + "T": (10, "second"), } @@ -264,8 +231,11 @@ def make_location(row: pd.Series) -> tuple: elevation_from_epqs = False z = convert_ft_to_m(z) - if row.AltDatum == "NGVD29": - z = convert_ngvd29_to_navd88(z, transformed_point.x, transformed_point.y) + # This is slowing things down significantly + # this information should be cached in a json file and stored in storage bucket + # I am disabling this for now, until this can be sped up + # if row.AltDatum == "NGVD29": + # z = convert_ngvd29_to_navd88(z, transformed_point.x, transformed_point.y) else: elevation_from_epqs = True logger.info( @@ -386,11 +356,8 @@ def make_location_data_provenance( else None ) - accuracy_value = NMA_COORDINATE_ACCURACY.get(row.CoordinateAccuracy, None).get( - "accuracy_value" - ) - accuracy_unit = NMA_COORDINATE_ACCURACY.get(row.CoordinateAccuracy, None).get( - "accuracy_unit" + accuracy_value, accuracy_unit = NMA_COORDINATE_ACCURACY.get( + row.CoordinateAccuracy, (None, None) ) provenance = DataProvenance( diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 6de212d2f..ff7956d0c 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -157,6 +157,7 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None step = 25 start_time = time.time() errors = [] + added_locations = {} for i, row in enumerate(wdf.itertuples()): pointid = row.PointID if wdf[wdf["PointID"] == pointid].shape[0] > 1: @@ -193,12 +194,7 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None try: location, elevation_method = make_location(row) session.add(location) - session.flush() - data_provenances = make_location_data_provenance( - row, location, elevation_method - ) - for dp in data_provenances: - session.add(dp) + added_locations[row.PointID] = elevation_method except Exception as e: if location is not None: session.expunge(location) @@ -267,28 +263,13 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None well_data["thing_type"] = "water well" well_data["nma_pk_welldata"] = row.WellID - notes = well_data.pop("notes") + well_data.pop("notes") well = Thing(**well_data) session.add(well) - logger.info(f"Created well for {row.PointID}") + # logger.info(f"Created well for {row.PointID}") # flush well to access its ID for status_history - session.flush() - - """ - Developer's note - - It's not clear when the measuring point from NM_Aquifer was - determined, so I'm setting start_date to the day of the transfer - """ - measuring_point_history = MeasuringPointHistory( - thing_id=well.id, - measuring_point_height=row.MPHeight, - measuring_point_description=row.MeasuringPoint, - start_date=datetime.now(tz=UTC), - end_date=None, - ) - session.add(measuring_point_history) + # session.flush() # session.commit() # session.refresh(well) @@ -330,6 +311,38 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None assoc.thing = well session.add(assoc) + session.commit() + + # add things thate need well id + for well in session.query(Thing).filter(Thing.thing_type == "water well").all(): + row = wdf[wdf["PointID"] == well.name].iloc[0] + if not isna(row.Notes): + note = well.add_note(row.Notes, "Other") + session.add(note) + + location = well.current_location + elevation_method = added_locations[row.PointID] + data_provenances = make_location_data_provenance( + row, location, elevation_method + ) + for dp in data_provenances: + session.add(dp) + + """ + Developer's note + + It's not clear when the measuring point from NM_Aquifer was + determined, so I'm setting start_date to the day of the transfer + """ + measuring_point_history = MeasuringPointHistory( + thing_id=well.id, + measuring_point_height=row.MPHeight, + measuring_point_description=row.MeasuringPoint, + start_date=datetime.now(tz=UTC), + end_date=None, + ) + session.add(measuring_point_history) + """ Developer's notes @@ -339,9 +352,10 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None # TODO: if row.MonitoringStatus == "Q" is it monitored or not? <-- AMMP review # TODO: if row.MonitoringStatus == "X" can that change? <-- AMMP review # TODO: have AMMP review and verify the various MonitoringStatus codes + target_id = well.id target_table = "thing" - if row.MonitoringStatus: + if not isna(row.MonitoringStatus): if ( "X" in row.MonitoringStatus or "I" in row.MonitoringStatus @@ -378,7 +392,7 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None f" Adding '{monitoring_frequency}' monitoring frequency for well {well.name}" ) - if row.Status: + if not isna(row.Status): status_value = lexicon_mapper.map_value(f"LU_Status:{row.Status}") status_history = StatusHistory( status_type="Well Status", @@ -393,15 +407,6 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None session.commit() - # add notes - for well in session.query(Thing).filter(Thing.thing_type == "water well").all(): - row = wdf[wdf["PointID"] == well.name].iloc[0] - if not isna(row.Notes): - note = well.add_note(row.Notes, "Other") - session.add(note) - - session.commit() - return input_df, cleaned_df, errors From 5748f14c471bb8152cb863597cb73091edd1da19 Mon Sep 17 00:00:00 2001 From: jakeross Date: Tue, 18 Nov 2025 15:31:23 -0700 Subject: [PATCH 07/11] refactor: enable pytest hook in pre-commit configuration --- .pre-commit-config.yaml | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8ea7e9413..5d74e6a6c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,15 +16,15 @@ repos: '--statistics' ] exclude: ^db/__init__.py$ # all models need to be imported for Alembic, but are not used directly -# - repo: local -# hooks: -# - id: pytest -# name: pytest -# entry: pytest # Or your specific test command, e.g., poetry run pytest -# language: system -# types: [python] # Specify relevant file types for your tests -# pass_filenames: false -# always_run: true + - repo: local + hooks: + - id: pytest + name: pytest + entry: pytest # Or your specific test command, e.g., poetry run pytest + language: system + types: [python] # Specify relevant file types for your tests + pass_filenames: false + always_run: true # - repo: https://github.com/pre-commit/mirrors-mypy # rev: v1.10.0 # Use the latest stable version or pin to your preference From 0eedbb12d24366be298642c08d19c146c81bbc6c Mon Sep 17 00:00:00 2001 From: jakeross Date: Tue, 18 Nov 2025 16:29:52 -0700 Subject: [PATCH 08/11] refactor: add PastOrTodayDate type for date validation in schemas --- schemas/__init__.py | 13 ++++++++++++- schemas/thing.py | 16 ++++++++-------- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/schemas/__init__.py b/schemas/__init__.py index 87f5688c3..cd8e62d62 100644 --- a/schemas/__init__.py +++ b/schemas/__init__.py @@ -13,7 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from datetime import datetime, timezone +from datetime import datetime, timezone, date +from typing import Annotated from pydantic import ( BaseModel, @@ -21,6 +22,7 @@ AwareDatetime, field_validator, ) +from pydantic.functional_validators import AfterValidator from pydantic.json_schema import JsonSchemaValue from pydantic_core import core_schema @@ -51,6 +53,15 @@ class BaseUpdateModel(BaseCreateModel): release_status: ReleaseStatus | None = None +def past_or_today_validator(value: date) -> date: + if value > date.today(): + raise ValueError("Date must be today or in the past.") + return value + + +PastOrTodayDate = Annotated[date, AfterValidator(past_or_today_validator)] + + # Custom type for UTC datetime serialization class UTCAwareDatetime(AwareDatetime): """Custom datetime type that always serializes to UTC with 'Z' suffix.""" diff --git a/schemas/thing.py b/schemas/thing.py index 9a1096e36..cf8c3ef2b 100644 --- a/schemas/thing.py +++ b/schemas/thing.py @@ -15,7 +15,7 @@ # =============================================================================== from typing import List -from pydantic import BaseModel, model_validator, PastDate, Field, field_validator +from pydantic import BaseModel, model_validator, Field, field_validator from core.enums import ( WellPurpose, @@ -25,9 +25,9 @@ Organization, MonitoringFrequency, ) -from schemas import BaseCreateModel, BaseUpdateModel, BaseResponseModel -from schemas.location import LocationGeoJSONResponse +from schemas import BaseCreateModel, BaseUpdateModel, BaseResponseModel, PastOrTodayDate from schemas.group import GroupResponse +from schemas.location import LocationGeoJSONResponse from schemas.notes import NoteResponse, CreateNote @@ -102,7 +102,7 @@ class CreateBaseThing(BaseCreateModel): location_id: int | None group_id: int | None = None # Optional group ID for the thing name: str # Name of the thing - first_visit_date: PastDate | None = None # Date of NMBGMR's first visit + first_visit_date: PastOrTodayDate | None = None # Date of NMBGMR's first visit class CreateWell(CreateBaseThing, ValidateWell): @@ -171,15 +171,15 @@ class ThingIdLinkResponse(BaseResponseModel): class MonitoringFrequencyResponse(BaseModel): monitoring_frequency: MonitoringFrequency - start_date: PastDate - end_date: PastDate | None + start_date: PastOrTodayDate + end_date: PastOrTodayDate | None class BaseThingResponse(BaseResponseModel): name: str thing_type: str current_location: LocationGeoJSONResponse - first_visit_date: PastDate | None + first_visit_date: PastOrTodayDate | None # The new relationship to the polymorphic Notes table notes: List[NoteResponse] = [] @@ -317,7 +317,7 @@ class UpdateThing(BaseUpdateModel): """ name: str | None = None # Optional name for the thing - first_visit_date: PastDate | None = None # Date of NMBGMR's first visit + first_visit_date: PastOrTodayDate | None = None # Date of NMBGMR's first visit class UpdateWell(UpdateThing, ValidateWell): From 8195837062f2f82f07e832f6c6b71a38967d1103 Mon Sep 17 00:00:00 2001 From: jakeross Date: Tue, 18 Nov 2025 16:48:58 -0700 Subject: [PATCH 09/11] refactor: implement elevation caching to optimize location processing --- transfers/thing_transfer.py | 7 +++++-- transfers/util.py | 17 +++++++++++------ transfers/well_transfer.py | 22 +++++++++++++++++++++- 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/transfers/thing_transfer.py b/transfers/thing_transfer.py index 38f9b4708..3469fbc53 100644 --- a/transfers/thing_transfer.py +++ b/transfers/thing_transfer.py @@ -20,13 +20,13 @@ from db import LocationThingAssociation from services.thing_helper import add_thing +from transfers.logger import logger from transfers.util import ( make_location, make_location_data_provenance, read_csv, replace_nans, ) -from transfers.logger import logger def transfer_thing(session: Session, site_type: str, make_payload, limit=None) -> None: @@ -37,6 +37,9 @@ def transfer_thing(session: Session, site_type: str, make_payload, limit=None) - ldf = replace_nans(ldf) n = len(ldf) start_time = time.time() + + cached_elevations = {} + for i, row in enumerate(ldf.itertuples()): pointid = row.PointID if ldf[ldf["PointID"] == pointid].shape[0] > 1: @@ -54,7 +57,7 @@ def transfer_thing(session: Session, site_type: str, make_payload, limit=None) - session.commit() try: - location, elevation_method = make_location(row) + location, elevation_method = make_location(row, cached_elevations) session.add(location) session.flush() data_provenances = make_location_data_provenance( diff --git a/transfers/util.py b/transfers/util.py index 4c45085e0..cbf0f2b17 100644 --- a/transfers/util.py +++ b/transfers/util.py @@ -36,6 +36,7 @@ transform_srid, get_epqs_elevation_from_point, convert_ft_to_m, + convert_ngvd29_to_navd88, ) from transfers.logger import logger @@ -188,7 +189,7 @@ def chunk_by_size(df, chunk_size): yield df.iloc[i : i + chunk_size] -def make_location(row: pd.Series) -> tuple: +def make_location(row: pd.Series, elevations: dict) -> tuple: """ Returns a tuple of location data and the elevation method """ @@ -231,11 +232,15 @@ def make_location(row: pd.Series) -> tuple: elevation_from_epqs = False z = convert_ft_to_m(z) - # This is slowing things down significantly - # this information should be cached in a json file and stored in storage bucket - # I am disabling this for now, until this can be sped up - # if row.AltDatum == "NGVD29": - # z = convert_ngvd29_to_navd88(z, transformed_point.x, transformed_point.y) + if row.AltDatum == "NGVD29": + key = f"{row.PointID}, {transformed_point.x, transformed_point.y}" + if key in elevations: + z = elevations[key] + else: + z = convert_ngvd29_to_navd88( + z, transformed_point.x, transformed_point.y + ) + elevations[key] = z else: elevation_from_epqs = True logger.info( diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index ff7956d0c..ee54d0216 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -148,6 +148,24 @@ def get_wells_to_transfer( return input_df, cleaned_df +def get_cached_elevations() -> dict: + bucket = get_storage_bucket() + log_filename = "transfer_data/cached_elevations.json" + blob = bucket.blob(log_filename) + if blob.exists(): + lut = json.loads(blob.download_as_string()) + return lut + else: + return {} + + +def dump_cached_elevations(lut: dict): + bucket = get_storage_bucket() + log_filename = "transfer_data/cached_elevations.json" + blob = bucket.blob(log_filename) + blob.upload_from_string(json.dumps(lut)) + + def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None: input_df, cleaned_df = get_wells_to_transfer(session, flags) source_table = "WellData" @@ -158,6 +176,7 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None start_time = time.time() errors = [] added_locations = {} + cached_elevations = get_cached_elevations() for i, row in enumerate(wdf.itertuples()): pointid = row.PointID if wdf[wdf["PointID"] == pointid].shape[0] > 1: @@ -192,7 +211,7 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None location = None try: - location, elevation_method = make_location(row) + location, elevation_method = make_location(row, cached_elevations) session.add(location) added_locations[row.PointID] = elevation_method except Exception as e: @@ -407,6 +426,7 @@ def transfer_wells(session: Session, flags: dict = None, limit: int = 0) -> None session.commit() + dump_cached_elevations(cached_elevations) return input_df, cleaned_df, errors From 09b831ee58b5a7a42567f6b5cbcb887cb73cd47e Mon Sep 17 00:00:00 2001 From: jakeross Date: Tue, 18 Nov 2025 17:30:42 -0700 Subject: [PATCH 10/11] refactor: implement elevation caching to optimize location processing --- transfers/metrics.py | 8 ++++++++ transfers/transfer.py | 1 + 2 files changed, 9 insertions(+) diff --git a/transfers/metrics.py b/transfers/metrics.py index c6c2c7586..68d3d307e 100644 --- a/transfers/metrics.py +++ b/transfers/metrics.py @@ -1,3 +1,5 @@ +from services.gcs_helper import get_storage_bucket + 1 # =============================================================================== # Copyright 2025 ross # @@ -56,6 +58,12 @@ def __init__(self): ["model", "input_count", "cleaned_count", "transferred", "issue_percentage"] ) + def save_to_storage_bucket(self): + bucket = get_storage_bucket() + log_filename = self.path.name + blob = bucket.blob(f"transfer_metrics/{log_filename}") + blob.upload_from_string(self.path.read_text()) + def close(self): self._fileobj.close() diff --git a/transfers/transfer.py b/transfers/transfer.py index 7d84b347f..77275ed35 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -199,6 +199,7 @@ def transfer_debugging(sess, limit=100): # message("TRANSFERRING ASSETS") # timeit_direct(transfer_assets, sess) metrics.close() + metrics.save_to_storage_bucket() def main(): From b250e2f595012f50fe2ec104dc7e86afed86b9ba Mon Sep 17 00:00:00 2001 From: jakeross Date: Tue, 18 Nov 2025 17:32:40 -0700 Subject: [PATCH 11/11] refactor: import get_storage_bucket in metrics.py for improved functionality --- transfers/metrics.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/transfers/metrics.py b/transfers/metrics.py index 68d3d307e..25b6b626b 100644 --- a/transfers/metrics.py +++ b/transfers/metrics.py @@ -1,6 +1,4 @@ -from services.gcs_helper import get_storage_bucket - -1 # =============================================================================== +# =============================================================================== # Copyright 2025 ross # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -36,6 +34,7 @@ Deployment, TransducerObservation, ) +from services.gcs_helper import get_storage_bucket class Metrics: