diff --git a/.gitignore b/.gitignore index 1f1637d10..6be8b4d9e 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,7 @@ migrate.sh launcher.sh gcs_credentials.json transfers/data/assets* +transfers/transfer*.log # deployment files app.yaml \ No newline at end of file diff --git a/transfers/asset_transfer.py b/transfers/asset_transfer.py index a9682797a..be0efa323 100644 --- a/transfers/asset_transfer.py +++ b/transfers/asset_transfer.py @@ -32,18 +32,18 @@ get_storage_bucket, get_storage_client, ) -from transfers.util import get_valid_things +from transfers.util import get_valid_things, logger def transfer_assets(session: Session) -> None: client = get_storage_client() bucket = get_storage_bucket(client) - print(f"Using bucket {bucket.name}") + logger.info(f"Using bucket {bucket.name}") for thing in get_valid_things(session): # find images in temp bucket - print(f"Processing PointID: {thing.name}") + logger.info(f"Processing PointID: {thing.name}") blobs = bucket.list_blobs(prefix=f"nma-photos/{thing.name}") # move blobs from temp to assets bucket for srcblob in blobs: @@ -61,7 +61,7 @@ def transfer_assets_testing(session: Session) -> None: thing_id = 151 if check_asset_exists(session, blob_name, thing_id): - print(f"Asset {blob_name} already exists. Skipping.") + logger.warning(f"Asset {blob_name} already exists. Skipping.") continue add_asset(session, uf, p, thing_id, uri, blob_name) diff --git a/transfers/contact_transfer.py b/transfers/contact_transfer.py index 28c025b97..690fa0f28 100644 --- a/transfers/contact_transfer.py +++ b/transfers/contact_transfer.py @@ -15,7 +15,7 @@ # =============================================================================== import numpy as np import pandas as pd -from transfers.util import read_csv, filter_to_valid_point_ids +from transfers.util import read_csv, filter_to_valid_point_ids, logger from db import Thing, Contact, ThingContactAssociation, Email, Phone, Address from schemas.contact import CreateContact @@ -51,7 +51,9 @@ def transfer_contacts(session): for i, row in odf.iterrows(): thing = session.query(Thing).where(Thing.name == row.PointID).first() if thing is None: - print(f"Thing with PointID {row.PointID} not found. Skipping owner.") + logger.warning( + f"Thing with PointID {row.PointID} not found. Skipping owner." + ) continue # TODO: extract role from OwnerComment @@ -143,7 +145,7 @@ def transfer_contacts(session): session.commit() except Exception as e: - print( + logger.warning( f"Skipping first contact for PointID {row.PointID} due to validation error: {e}" ) from pprint import pprint @@ -202,7 +204,7 @@ def transfer_contacts(session): session.add(second_contact) except Exception as e: - print( + logger.warning( f"Skipping second contact for PointID {row.PointID} due to validation error: {e}" ) session.rollback() diff --git a/transfers/group_transfer.py b/transfers/group_transfer.py index d49527e1e..7aa82714b 100644 --- a/transfers/group_transfer.py +++ b/transfers/group_transfer.py @@ -18,7 +18,7 @@ from db import Thing, Group from db.engine import session_ctx -from transfers.util import read_csv +from transfers.util import read_csv, logger def transfer_groups( @@ -40,7 +40,7 @@ def transfer_groups( sql = select(Thing).where(Thing.name.like(f"{prefix}%")) records = session.scalars(sql).all() if records: - print( + logger.info( f"Adding {len(records)} things to group {group.name}, prefix {prefix}" ) group.things = records diff --git a/transfers/link_ids_transfer.py b/transfers/link_ids_transfer.py index 3e8810a86..07ca9d74a 100644 --- a/transfers/link_ids_transfer.py +++ b/transfers/link_ids_transfer.py @@ -19,7 +19,7 @@ from db import Thing, ThingIdLink from transfers.util import ( filter_to_valid_point_ids, - log, + logger, extract_organization, read_csv, ) @@ -34,12 +34,14 @@ def transfer_link_ids_welldata(session): # RULE: exclude rows where both ids are null if pd.isna(row.OSEWellID) and pd.isna(row.OSEWelltagID): - log(row, "Both OSEWellID and OSEWelltagID are null") + logger.warning(f"Both OSEWellID and OSEWelltagID are null for row {i}") continue thing = session.query(Thing).where(Thing.name == row.PointID).first() if thing is None: - log(row, "Thing not found") + logger.warning( + f"Thing not found for row {i} PointID {row.PointID}. Skipping link ids." + ) continue for aid, klass, regex in ( @@ -51,16 +53,20 @@ def transfer_link_ids_welldata(session): ), # TODO: need to figure out regex for this field ): if pd.isna(aid): - log(row, f"{klass} is null") + logger.warning(f"{klass} is null for row {i}") continue # RULE: exclude any id that == 'X', '?' if aid.strip().lower() in ("x", "?", "exempt"): - log(row, f'{klass} is "X", "?", or "exempt", id={aid}') + logger.warning( + f'{klass} is "X", "?", or "exempt", id={aid} for row {i}' + ) continue if regex and not re.match(regex, aid): - log(row, f"{klass} id does not match regex {regex}, id={aid}") + logger.warning( + f"{klass} id does not match regex {regex}, id={aid} for row {i}" + ) continue link_id = ThingIdLink() @@ -87,7 +93,7 @@ def add_link_alternate_site_id(session, row, thing): link_id.alternate_organization = extract_organization(str(row.AlternateSiteID)) - print("adding link id: ", link_id) + logger.info(f"adding link id: {link_id}") session.add(link_id) @@ -103,7 +109,7 @@ def add_link_site_id(session, row, thing): if not re.match(r"^\d{15}$", site_id): # TODO: lets make a sweet function for flagging issues # flag for interrogation - log(row, f"alternate id {site_id} is not a valid USGS site id") + logger.warning(f"alternate id {site_id} is not a valid USGS site id") return link_id.alternate_id = row.SiteID @@ -131,7 +137,7 @@ def add_link_plss(session, row, thing): alternate_id = f"T{township}{township_direction}.R{_range}{range_direction}.S{section}{section_direction}" if not re.match(r"T\d{1,3}.R\d{1,3}.S\d{1,3}", alternate_id): # flag for interrogation - log(row, f"alternate id {alternate_id} is not a valid PLSS id") + logger.warning(f"alternate id {alternate_id} is not a valid PLSS") return link_id.alternate_id = alternate_id link_id.alternate_organization = "PLSS" @@ -149,10 +155,11 @@ def transfer_link_ids(session, site_type="GW"): for i, row in enumerate(ldf.itertuples()): thing = session.query(Thing).where(Thing.name == row.PointID).first() if thing is None: - # TODO: lets make a sweet function for flagging issues - # print(f"Thing with PointID {row.PointID} not foaund. Skipping link id.") + logger.warning( + f"Thing with PointID {row.PointID} not found. Skipping link id." + ) continue - print( + logger.info( f"Processing PointID: {row.PointID}, Thing ID: {thing.id}, a={row.AlternateSiteID}, " f"b={row.AlternateSiteID2}" ) diff --git a/transfers/thing_transfer.py b/transfers/thing_transfer.py index 6c85312be..844ca1eec 100644 --- a/transfers/thing_transfer.py +++ b/transfers/thing_transfer.py @@ -14,13 +14,11 @@ # limitations under the License. # =============================================================================== import time -from pathlib import Path from sqlalchemy.orm import Session -import pandas as pd from db import LocationThingAssociation from services.thing_helper import add_thing -from transfers.util import make_location, read_csv +from transfers.util import make_location, read_csv, logger def transfer_thing(session: Session, site_type: str, make_payload, limit=None) -> None: @@ -32,11 +30,11 @@ def transfer_thing(session: Session, site_type: str, make_payload, limit=None) - start_time = time.time() for i, row in enumerate(ldf.itertuples()): if limit and i >= limit: - print(f"Reached limit of {limit} rows. Stopping migration.") + logger.warning(f"Reached limit of {limit} rows. Stopping migration.") break if i and not i % 100: - print( + logger.info( f"Processing row {i} of {n}. {row.PointID}, avg rows per second: {i / (time.time() - start_time):.2f}" ) session.commit() diff --git a/transfers/transfer.py b/transfers/transfer.py index 1eb55b841..267aafd3f 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -32,6 +32,7 @@ transfer_ephemeral_stream, transfer_met, ) +from transfers.util import logger def erase_and_initalize(session: Session) -> None: @@ -43,6 +44,9 @@ def erase_and_initalize(session: Session) -> None: def main_transfer(): + logger.info("Starting transfer") + logger.info("") + init = True transfer_well_flag = False @@ -60,51 +64,70 @@ def main_transfer(): limit = 100 with session_ctx() as sess: - if init: erase_and_initalize(sess) if init or transfer_well_flag: - print("\n", "*" * 10, "TRANSFERRING WELLS", "*" * 10) + msg = "*" * 10 + "TRANSFERRING WELLS" + "*" * 10 + logger.info(msg) transfer_wells(sess, limit) transfer_wellscreens(sess) + logger.info("") if init or transfer_spring_flag: - print("\n", "*" * 10, "TRANSFERRING SPRINGS", "*" * 10) + msg = "*" * 10 + "TRANSFERRING SPRINGS" + "*" * 10 + logger.info(msg) transfer_springs(sess, limit) + logger.info("") if init or transfer_perennial_stream_flag: - print("\n", "*" * 10, "TRANSFERRING PERENNIAL STREAMS", "*" * 10) + msg = "*" * 10 + "TRANSFERRING PERENNIAL STREAMS" + "*" * 10 + logger.info(msg) transfer_perennial_stream(sess, limit) + logger.info("") if init or transfer_ephemeral_stream_flag: - print("\n", "*" * 10, "TRANSFERRING EPHEMERAL STREAMS", "*" * 10) + msg = "*" * 10 + "TRANSFERRING EPHEMERAL STREAMS" + "*" * 10 + logger.info(msg) transfer_ephemeral_stream(sess, limit) + logger.info("") if init or transfer_met_flag: - print("\n", "*" * 10, "TRANSFERRING METEOROLOGICAL", "*" * 10) + msg = "*" * 10 + "TRANSFERRING METEOROLOGICAL" + "*" * 10 + logger.info(msg) transfer_met(sess, limit) + logger.info("") if init or transfer_contacts_flag: - print("\n", "*" * 10, "TRANSFERRING CONTACTS", "*" * 10) + msg = "*" * 10 + "TRANSFERRING CONTACTS" + "*" * 10 + logger.info(msg) transfer_contacts(sess) + logger.info("") if init or transfer_waterlevels_flag: - print("\n", "*" * 10, "TRANSFERRING WATER LEVELS", "*" * 10) + msg = "*" * 10 + "TRANSFERRING WATER LEVELS" + "*" * 10 + logger.info(msg) transfer_water_levels(sess) + logger.info("") if init or transfer_link_ids_flag: - print("\n", "*" * 10, "TRANSFERRING LINK IDS", "*" * 10) + msg = "*" * 10 + "TRANSFERRING LINK IDS" + "*" * 10 + logger.info(msg) transfer_link_ids(sess) transfer_link_ids_welldata(sess) + logger.info("") if init or transfer_assets_flag: - print("\n", "*" * 10, "TRANSFERRING ASSETS", "*" * 10) + msg = "*" * 10 + "TRANSFERRING ASSETS" + "*" * 10 + logger.info(msg) transfer_assets_testing(sess) + logger.info("") if init or transfer_groups_flag: - print("\n", "*" * 10, "TRANSFERRING GROUPS", "*" * 10) + msg = "*" * 10 + "TRANSFERRING GROUPS" + "*" * 10 + logger.info(msg) transfer_groups(sess) + logger.info("") # if init or cleanup_wells_flag: # cleanup_wells(sess) diff --git a/transfers/util.py b/transfers/util.py index 4b4cec3bb..0c4cc155c 100644 --- a/transfers/util.py +++ b/transfers/util.py @@ -13,9 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== +from datetime import datetime import re from pathlib import Path - +import logging import httpx import pyproj from shapely import Point @@ -26,6 +27,19 @@ from db import Thing, Location +log_filename = f"transfers/transfer_{datetime.now():%Y-%m-%dT%Hh%Mm%Ss}.log" + + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)-8s] %(message)s", + handlers=[ + logging.StreamHandler(), + logging.FileHandler(log_filename, mode="w", encoding="utf-8"), + ], +) +logger = logging.getLogger(__name__) + TRANSFORMERS = {} @@ -83,10 +97,6 @@ def filter_to_valid_point_ids(session: Session, df: pd.DataFrame) -> pd.DataFram return df[df["PointID"].isin(valid_point_ids)] -def log(row, msg): - print(f"{row.PointID} {msg}") - - def convert_to_wgs84_vertical_datum(row, z): if row.VerticalDatum == "NAVD88": z = z + 2.0 # TODO: check this transformation @@ -145,14 +155,14 @@ def get_quad_name_from_point(lon: float, lat: float) -> str: } resp = httpx.get(url, params=params, timeout=15) - print(resp) + logger.info(resp) data = resp.json() if data["features"]: attrs = data["features"][0]["attributes"] return attrs["CELL_NAME"] else: - print("No quad found") + logger.warning(f"No quad name found for POINT ({lon} {lat})") def get_epqs_elevation(lon: float, lat: float) -> float: diff --git a/transfers/waterlevels_transfer.py b/transfers/waterlevels_transfer.py index 6ecefc621..4ee336fc9 100644 --- a/transfers/waterlevels_transfer.py +++ b/transfers/waterlevels_transfer.py @@ -19,7 +19,7 @@ import pandas as pd from db import Thing, Sample, Observation -from transfers.util import filter_to_valid_point_ids, log, read_csv +from transfers.util import filter_to_valid_point_ids, logger, read_csv def transfer_water_levels(session): @@ -31,15 +31,14 @@ def transfer_water_levels(session): for index, group in gwd: for row in group.itertuples(): if pd.isna(row.DepthToWater) or pd.isna(row.DateMeasured): - log(row, f"Skipping row {row.Index} due to missing data.") + logger.warning(f"Skipping row {row.Index} due to missing data.") continue dt = datetime.fromisoformat(row.DateMeasured) thing = session.query(Thing).where(Thing.name == row.PointID).first() if thing is None: - log( - row, - f"Thing with PointID {row.PointID} not found. Skipping water level.", + logger.warning( + f"Thing with PointID {row.PointID} not found. Skipping water level." ) continue diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 13515ea42..bcdb0ce5e 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -31,6 +31,7 @@ get_state_from_point, get_county_from_point, get_quad_name_from_point, + logger, ) ADDED = [] @@ -49,11 +50,11 @@ def transfer_wells(session, limit=None): for i, row in enumerate(wdf.itertuples()): if limit and i >= limit: - print("Reached limit of", limit, "rows. Stopping migration.") + logger.warning("Reached limit of %d rows. Stopping migration.", limit) break if i and not i % 25: - print( + logger.info( f"Processing row {i} of {n}. {row.PointID}, avg rows per second: {i / (time.time() - start_time):.2f}" ) session.commit() @@ -61,7 +62,7 @@ def transfer_wells(session, limit=None): try: location = make_location(row) except Exception as e: - print(f"Error making location for row {i}: {e}") + logger.warning(f"Error making location for row {i}: {e}") break # print(location_row) @@ -116,13 +117,13 @@ def transfer_wellscreens(session, limit=None): for i, row in enumerate(wdf.itertuples()): if limit and i >= limit: - print("Reached limit of", limit, "rows. Stopping migration.") + logger.warning("Reached limit of", limit, "rows. Stopping migration.") break # this is for testing only. not sure in practice we have to commit every 100 rows # should we commit every row? or every 1000? or every 10? if i and not i % 100: - print( + logger.info( f"Processing row {i} of {n}. {row.PointID}, avg rows per second: {i / (time.time() - start_time):.2f}" ) session.commit() @@ -130,7 +131,9 @@ def transfer_wellscreens(session, limit=None): sql = select(Thing).where(Thing.name == row.PointID) thing = session.execute(sql).scalar_one_or_none() if not thing: - print(f"Thing with PointID {row.PointID} not found. Skipping well screen.") + logger.warning( + f"Thing with PointID {row.PointID} not found. Skipping well screen." + ) continue well_screen_data = { @@ -149,7 +152,9 @@ def transfer_wellscreens(session, limit=None): session.add(well_screen) session.commit() except ValidationError as e: - print(f"Validation error for row {i} with PointID {row.PointID}: {e}") + logger.warning( + f"Validation error for row {i} with PointID {row.PointID}: {e}" + ) continue