Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ migrate.sh
launcher.sh
gcs_credentials.json
transfers/data/assets*
transfers/transfer*.log

# deployment files
app.yaml
8 changes: 4 additions & 4 deletions transfers/asset_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,18 @@
get_storage_bucket,
get_storage_client,
)
from transfers.util import get_valid_things
from transfers.util import get_valid_things, logger


def transfer_assets(session: Session) -> None:
client = get_storage_client()

bucket = get_storage_bucket(client)
print(f"Using bucket {bucket.name}")
logger.info(f"Using bucket {bucket.name}")

for thing in get_valid_things(session):
# find images in temp bucket
print(f"Processing PointID: {thing.name}")
logger.info(f"Processing PointID: {thing.name}")
blobs = bucket.list_blobs(prefix=f"nma-photos/{thing.name}")
# move blobs from temp to assets bucket
for srcblob in blobs:
Expand All @@ -61,7 +61,7 @@ def transfer_assets_testing(session: Session) -> None:
thing_id = 151

if check_asset_exists(session, blob_name, thing_id):
print(f"Asset {blob_name} already exists. Skipping.")
logger.warning(f"Asset {blob_name} already exists. Skipping.")
continue
add_asset(session, uf, p, thing_id, uri, blob_name)

Expand Down
10 changes: 6 additions & 4 deletions transfers/contact_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# ===============================================================================
import numpy as np
import pandas as pd
from transfers.util import read_csv, filter_to_valid_point_ids
from transfers.util import read_csv, filter_to_valid_point_ids, logger
from db import Thing, Contact, ThingContactAssociation, Email, Phone, Address

from schemas.contact import CreateContact
Expand Down Expand Up @@ -51,7 +51,9 @@ def transfer_contacts(session):
for i, row in odf.iterrows():
thing = session.query(Thing).where(Thing.name == row.PointID).first()
if thing is None:
print(f"Thing with PointID {row.PointID} not found. Skipping owner.")
logger.warning(
f"Thing with PointID {row.PointID} not found. Skipping owner."
)
continue

# TODO: extract role from OwnerComment
Expand Down Expand Up @@ -143,7 +145,7 @@ def transfer_contacts(session):
session.commit()

except Exception as e:
print(
logger.warning(
f"Skipping first contact for PointID {row.PointID} due to validation error: {e}"
)
from pprint import pprint
Expand Down Expand Up @@ -202,7 +204,7 @@ def transfer_contacts(session):
session.add(second_contact)

except Exception as e:
print(
logger.warning(
f"Skipping second contact for PointID {row.PointID} due to validation error: {e}"
)
session.rollback()
Expand Down
4 changes: 2 additions & 2 deletions transfers/group_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from db import Thing, Group
from db.engine import session_ctx
from transfers.util import read_csv
from transfers.util import read_csv, logger


def transfer_groups(
Expand All @@ -40,7 +40,7 @@ def transfer_groups(
sql = select(Thing).where(Thing.name.like(f"{prefix}%"))
records = session.scalars(sql).all()
if records:
print(
logger.info(
f"Adding {len(records)} things to group {group.name}, prefix {prefix}"
)
group.things = records
Expand Down
31 changes: 19 additions & 12 deletions transfers/link_ids_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from db import Thing, ThingIdLink
from transfers.util import (
filter_to_valid_point_ids,
log,
logger,
extract_organization,
read_csv,
)
Expand All @@ -34,12 +34,14 @@ def transfer_link_ids_welldata(session):

# RULE: exclude rows where both ids are null
if pd.isna(row.OSEWellID) and pd.isna(row.OSEWelltagID):
log(row, "Both OSEWellID and OSEWelltagID are null")
logger.warning(f"Both OSEWellID and OSEWelltagID are null for row {i}")
continue

thing = session.query(Thing).where(Thing.name == row.PointID).first()
if thing is None:
log(row, "Thing not found")
logger.warning(
f"Thing not found for row {i} PointID {row.PointID}. Skipping link ids."
)
continue

for aid, klass, regex in (
Expand All @@ -51,16 +53,20 @@ def transfer_link_ids_welldata(session):
), # TODO: need to figure out regex for this field
):
if pd.isna(aid):
log(row, f"{klass} is null")
logger.warning(f"{klass} is null for row {i}")
continue

# RULE: exclude any id that == 'X', '?'
if aid.strip().lower() in ("x", "?", "exempt"):
log(row, f'{klass} is "X", "?", or "exempt", id={aid}')
logger.warning(
f'{klass} is "X", "?", or "exempt", id={aid} for row {i}'
)
continue

if regex and not re.match(regex, aid):
log(row, f"{klass} id does not match regex {regex}, id={aid}")
logger.warning(
f"{klass} id does not match regex {regex}, id={aid} for row {i}"
)
continue

link_id = ThingIdLink()
Expand All @@ -87,7 +93,7 @@ def add_link_alternate_site_id(session, row, thing):

link_id.alternate_organization = extract_organization(str(row.AlternateSiteID))

print("adding link id: ", link_id)
logger.info(f"adding link id: {link_id}")
session.add(link_id)


Expand All @@ -103,7 +109,7 @@ def add_link_site_id(session, row, thing):
if not re.match(r"^\d{15}$", site_id):
# TODO: lets make a sweet function for flagging issues
# flag for interrogation
log(row, f"alternate id {site_id} is not a valid USGS site id")
logger.warning(f"alternate id {site_id} is not a valid USGS site id")
return

link_id.alternate_id = row.SiteID
Expand Down Expand Up @@ -131,7 +137,7 @@ def add_link_plss(session, row, thing):
alternate_id = f"T{township}{township_direction}.R{_range}{range_direction}.S{section}{section_direction}"
if not re.match(r"T\d{1,3}.R\d{1,3}.S\d{1,3}", alternate_id):
# flag for interrogation
log(row, f"alternate id {alternate_id} is not a valid PLSS id")
logger.warning(f"alternate id {alternate_id} is not a valid PLSS")
return
link_id.alternate_id = alternate_id
link_id.alternate_organization = "PLSS"
Expand All @@ -149,10 +155,11 @@ def transfer_link_ids(session, site_type="GW"):
for i, row in enumerate(ldf.itertuples()):
thing = session.query(Thing).where(Thing.name == row.PointID).first()
if thing is None:
# TODO: lets make a sweet function for flagging issues
# print(f"Thing with PointID {row.PointID} not foaund. Skipping link id.")
logger.warning(
f"Thing with PointID {row.PointID} not found. Skipping link id."
)
continue
print(
logger.info(
f"Processing PointID: {row.PointID}, Thing ID: {thing.id}, a={row.AlternateSiteID}, "
f"b={row.AlternateSiteID2}"
)
Expand Down
8 changes: 3 additions & 5 deletions transfers/thing_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@
# limitations under the License.
# ===============================================================================
import time
from pathlib import Path
from sqlalchemy.orm import Session
import pandas as pd

from db import LocationThingAssociation
from services.thing_helper import add_thing
from transfers.util import make_location, read_csv
from transfers.util import make_location, read_csv, logger


def transfer_thing(session: Session, site_type: str, make_payload, limit=None) -> None:
Expand All @@ -32,11 +30,11 @@ def transfer_thing(session: Session, site_type: str, make_payload, limit=None) -
start_time = time.time()
for i, row in enumerate(ldf.itertuples()):
if limit and i >= limit:
print(f"Reached limit of {limit} rows. Stopping migration.")
logger.warning(f"Reached limit of {limit} rows. Stopping migration.")
break

if i and not i % 100:
print(
logger.info(
f"Processing row {i} of {n}. {row.PointID}, avg rows per second: {i / (time.time() - start_time):.2f}"
)
session.commit()
Expand Down
45 changes: 34 additions & 11 deletions transfers/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
transfer_ephemeral_stream,
transfer_met,
)
from transfers.util import logger


def erase_and_initalize(session: Session) -> None:
Expand All @@ -43,6 +44,9 @@ def erase_and_initalize(session: Session) -> None:


def main_transfer():
logger.info("Starting transfer")
logger.info("")

init = True

transfer_well_flag = False
Expand All @@ -60,51 +64,70 @@ def main_transfer():

limit = 100
with session_ctx() as sess:

if init:
erase_and_initalize(sess)

if init or transfer_well_flag:
print("\n", "*" * 10, "TRANSFERRING WELLS", "*" * 10)
msg = "*" * 10 + "TRANSFERRING WELLS" + "*" * 10

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

message("TRANSFERRING WELLS")

def message(msg, pad='*', pad_len=10):
     pad *= pad_len
     logger.info(f"{pad} {msg} {pad}")

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know of the transfer branch... I've just been working off of pre-production assuming that was the base for all of the project. Is that message function the only update? If so, I can also merge transfer into my branch, make updates/resolve conflicts, and push back to this PR

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the other updates are to allow transfers to run via Cloud Run

logger.info(msg)
transfer_wells(sess, limit)
transfer_wellscreens(sess)
logger.info("")

if init or transfer_spring_flag:
print("\n", "*" * 10, "TRANSFERRING SPRINGS", "*" * 10)
msg = "*" * 10 + "TRANSFERRING SPRINGS" + "*" * 10
logger.info(msg)
transfer_springs(sess, limit)
logger.info("")

if init or transfer_perennial_stream_flag:
print("\n", "*" * 10, "TRANSFERRING PERENNIAL STREAMS", "*" * 10)
msg = "*" * 10 + "TRANSFERRING PERENNIAL STREAMS" + "*" * 10
logger.info(msg)
transfer_perennial_stream(sess, limit)
logger.info("")

if init or transfer_ephemeral_stream_flag:
print("\n", "*" * 10, "TRANSFERRING EPHEMERAL STREAMS", "*" * 10)
msg = "*" * 10 + "TRANSFERRING EPHEMERAL STREAMS" + "*" * 10
logger.info(msg)
transfer_ephemeral_stream(sess, limit)
logger.info("")

if init or transfer_met_flag:
print("\n", "*" * 10, "TRANSFERRING METEOROLOGICAL", "*" * 10)
msg = "*" * 10 + "TRANSFERRING METEOROLOGICAL" + "*" * 10
logger.info(msg)
transfer_met(sess, limit)
logger.info("")

if init or transfer_contacts_flag:
print("\n", "*" * 10, "TRANSFERRING CONTACTS", "*" * 10)
msg = "*" * 10 + "TRANSFERRING CONTACTS" + "*" * 10
logger.info(msg)
transfer_contacts(sess)
logger.info("")

if init or transfer_waterlevels_flag:
print("\n", "*" * 10, "TRANSFERRING WATER LEVELS", "*" * 10)
msg = "*" * 10 + "TRANSFERRING WATER LEVELS" + "*" * 10
logger.info(msg)
transfer_water_levels(sess)
logger.info("")

if init or transfer_link_ids_flag:
print("\n", "*" * 10, "TRANSFERRING LINK IDS", "*" * 10)
msg = "*" * 10 + "TRANSFERRING LINK IDS" + "*" * 10
logger.info(msg)
transfer_link_ids(sess)
transfer_link_ids_welldata(sess)
logger.info("")

if init or transfer_assets_flag:
print("\n", "*" * 10, "TRANSFERRING ASSETS", "*" * 10)
msg = "*" * 10 + "TRANSFERRING ASSETS" + "*" * 10
logger.info(msg)
transfer_assets_testing(sess)
logger.info("")

if init or transfer_groups_flag:
print("\n", "*" * 10, "TRANSFERRING GROUPS", "*" * 10)
msg = "*" * 10 + "TRANSFERRING GROUPS" + "*" * 10
logger.info(msg)
transfer_groups(sess)
logger.info("")

# if init or cleanup_wells_flag:
# cleanup_wells(sess)
Expand Down
24 changes: 17 additions & 7 deletions transfers/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
from datetime import datetime
import re
from pathlib import Path

import logging
import httpx
import pyproj
from shapely import Point
Expand All @@ -26,6 +27,19 @@

from db import Thing, Location

Comment thread
jacob-a-brown marked this conversation as resolved.
log_filename = f"transfers/transfer_{datetime.now():%Y-%m-%dT%Hh%Mm%Ss}.log"


logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)-8s] %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler(log_filename, mode="w", encoding="utf-8"),
],
)
logger = logging.getLogger(__name__)

TRANSFORMERS = {}


Expand Down Expand Up @@ -83,10 +97,6 @@ def filter_to_valid_point_ids(session: Session, df: pd.DataFrame) -> pd.DataFram
return df[df["PointID"].isin(valid_point_ids)]


def log(row, msg):
print(f"{row.PointID} {msg}")


def convert_to_wgs84_vertical_datum(row, z):
if row.VerticalDatum == "NAVD88":
z = z + 2.0 # TODO: check this transformation
Expand Down Expand Up @@ -145,14 +155,14 @@ def get_quad_name_from_point(lon: float, lat: float) -> str:
}

resp = httpx.get(url, params=params, timeout=15)
print(resp)
logger.info(resp)
data = resp.json()

if data["features"]:
attrs = data["features"][0]["attributes"]
return attrs["CELL_NAME"]
else:
print("No quad found")
logger.warning(f"No quad name found for POINT ({lon} {lat})")


def get_epqs_elevation(lon: float, lat: float) -> float:
Expand Down
Loading
Loading