diff --git a/.github/workflows/dev_deploy.yml b/.github/workflows/dev_deploy.yml index fb71d3464..da47376f1 100644 --- a/.github/workflows/dev_deploy.yml +++ b/.github/workflows/dev_deploy.yml @@ -40,6 +40,11 @@ jobs: echo "runtime: python313" >> app.yaml echo "entrypoint: gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app" >> app.yaml echo "instance_class: F4" >> app.yaml + echo "inbound_services:" >> app.yaml + echo " - warmup" >> app.yaml + echo "automatic_scaling:" >> app.yaml + echo " min_instances: 0" >> app.yaml + echo " max_instances: 10" >> app.yaml echo "" >> app.yaml echo "env_variables:" >> app.yaml echo " MODE: \"production\"" >> app.yaml diff --git a/.github/workflows/staging_deploy.yml b/.github/workflows/staging_deploy.yml index 67853446c..f58a0160c 100644 --- a/.github/workflows/staging_deploy.yml +++ b/.github/workflows/staging_deploy.yml @@ -40,6 +40,11 @@ jobs: echo "runtime: python313" >> app.yaml echo "entrypoint: gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app" >> app.yaml echo "instance_class: F4" >> app.yaml + echo "inbound_services:" >> app.yaml + echo " - warmup" >> app.yaml + echo "automatic_scaling:" >> app.yaml + echo " min_instances: 0" >> app.yaml + echo " max_instances: 10" >> app.yaml echo "" >> app.yaml echo "env_variables:" >> app.yaml echo " MODE: \"production\"" >> app.yaml diff --git a/schedule b/schedule new file mode 100644 index 000000000..cadb867ef --- /dev/null +++ b/schedule @@ -0,0 +1,9 @@ + + +this is used to add a schedule. 
+This schedule is used to keep the API running + +gcloud scheduler jobs create http keep-alive-job \ + --schedule="*/10 8-18 * * 1-5" \ + --uri="https://ocotillo-api-dot-waterdatainitiative-271000.appspot.com/_ah/warmup" \ + --http-method=GET \ No newline at end of file diff --git a/transfers/owner_transfer.py b/transfers/owner_transfer.py index 75e2d32ea..e3f00964b 100644 --- a/transfers/owner_transfer.py +++ b/transfers/owner_transfer.py @@ -19,12 +19,24 @@ from db import Thing, Contact, ThingContactAssociation, Email, Phone, Address +def extract_owner_role(comment): + # if comment is None: + # return "Owner" + # if "Owner" in comment: + # return "Owner" + # if "Manager" in comment: + # return "Manager" + # if "Director" in comment: + # return "Director" + + return "Primary" + + def transfer_owners(session): odf = read_csv("ownersdata.csv") odf = odf.replace(pd.NA, None) odf = odf.replace({np.nan: None}) - odf = filter_to_valid_point_ids(session, odf) for i, row in odf.iterrows(): thing = session.query(Thing).where(Thing.name == row.PointID).first() @@ -32,8 +44,12 @@ def transfer_owners(session): print(f"Thing with PointID {row.PointID} not foaund. 
Skipping owner.") continue + # TODO: extract role from OwnerComment + # role = extract_owner_role(row.OwnerComment) + role = "Primary" + # TODO: put in guards for null values - contact1 = Contact(name=f"{row.FirstName} {row.LastName}", role="Primary") + contact1 = Contact(name=f"{row.FirstName} {row.LastName}", role=role) assoc = ThingContactAssociation() assoc.thing = thing assoc.contact = contact1 diff --git a/transfers/thing_transfer.py b/transfers/thing_transfer.py index 914299e74..6c85312be 100644 --- a/transfers/thing_transfer.py +++ b/transfers/thing_transfer.py @@ -43,11 +43,9 @@ def transfer_thing(session: Session, site_type: str, make_payload, limit=None) - location = make_location(row) session.add(location) - - spring = add_thing( - session, - make_payload(row), - ) + payload = make_payload(row) + thing_type = payload.pop("thing_type") + spring = add_thing(session, payload, thing_type=thing_type) assoc = LocationThingAssociation() assoc.location = location diff --git a/transfers/transfer.py b/transfers/transfer.py index 9c0741f41..2087d62d7 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -13,211 +13,94 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# =============================================================================== -import time +from sqlalchemy.orm import Session -import numpy as np -import pandas as pd -import pyproj -from shapely import Point -from shapely.ops import transform - -from db import * -from db.location import Location +from core.initializers import init_lexicon +from db import Base from db.engine import session_ctx -from db.thing.well import WellThing -from services.lexicon import add_lexicon_term +from transfers.asset_transfer import transfer_assets_testing +from transfers.group_transfer import transfer_groups +from transfers.link_ids_transfer import transfer_link_ids, transfer_link_ids_welldata +from transfers.owner_transfer import transfer_owners +from transfers.sensor_transfer import init_sensor +from transfers.waterlevels_transfer import transfer_water_levels + +from transfers.well_transfer import transfer_wells, transfer_wellscreens, cleanup_wells +from transfers.thing_transfer import ( + transfer_springs, + transfer_perennial_stream, + transfer_ephemeral_stream, + transfer_met, +) + + +def erase_and_initalize(session: Session) -> None: + Base.metadata.drop_all(session.bind) + Base.metadata.create_all(session.bind) + + init_lexicon() + init_sensor(session) + + +def main_transfer(): + init = True + + transfer_well_flag = False + transfer_spring_flag = False + transfer_perennial_stream_flag = False + transfer_ephemeral_stream_flag = False + transfer_met_flag = False + transfer_owners_flag = False + transfer_waterlevels_flag = False + transfer_link_ids_flag = False + transfer_assets_flag = False + transfer_groups_flag = False + + cleanup_wells_flag = False + + limit = 1000 + with session_ctx() as sess: -TRANSFORMERS = {} + if init: + erase_and_initalize(sess) + if init or transfer_well_flag: + transfer_wells(sess, limit) + transfer_wellscreens(sess) -def transform_srid(geometry, source_srid, target_srid): - """ - geometry must be a shapely geometry object, like Point, Polygon, 
or MultiPolygon - """ - transformer_key = (source_srid, target_srid) - if transformer_key not in TRANSFORMERS: - source_crs = pyproj.CRS(f"EPSG:{source_srid}") - target_crs = pyproj.CRS(f"EPSG:{target_srid}") - transformer = pyproj.Transformer.from_crs( - source_crs, target_crs, always_xy=True - ) - TRANSFORMERS[transformer_key] = transformer - else: - transformer = TRANSFORMERS[transformer_key] - return transform(transformer.transform, geometry) - - -def extract_locations(): - """ - Extracts location data from the database. - This function should connect to the database and retrieve location data. - """ - df = pd.read_csv("data/location.csv") - df = df[df["SiteType"] == "GW"] - df = df[df["Easting"].notna() & df["Northing"].notna()] - return df - - -def extract_wells(): - """ - Extracts well data from the database. - This function should connect to the database and retrieve well data. - """ - df = pd.read_csv("data/welldata.csv") - return df - - -def transform_locations(df): - return df - - -def transform_wells(df): - # cover nans to nulls - df = df.replace(pd.NA, None) - df = df.replace({np.nan: None}) - - return df - - -def load_locations(sess, df): - def f(row): - # Convert the row to a dictionary - row_dict = row._asdict() - - e, n = row_dict["Easting"], row_dict["Northing"] - - point = Point(e, n) - transformed_point = transform_srid( - point, source_srid=26913, target_srid=4326 # WGS84 SRID - ) - - sl = Location( - # name=row_dict["PointID"], - point=transformed_point.wkt, - # visible=row_dict["PublicRelease"], - ) - - sess.add(sl) - # try: - # sess.commit() # Commit the changes to the database - # except ProgrammingError: - # print(f"skipping row due to ProgrammingError. 
{row_dict['PointID']}") - # sess.rollback() - # Remove the index from the dictionary - - loader(df, sess, f) - - -def loader(df, sess, function): - n = len(df) - st = time.time() - prev = st - g = 175 - for i, row in enumerate(df.itertuples()): - if not i % g: - print( - f"Processing row {i} of {n}, {g/(time.time()-prev)} rate: {i / (time.time() - st):.2f} rows/sec" - ) - prev = time.time() - function(row) - - if not i % g: - sess.commit() - - -ADDED = [] - - -def load_wells(sess, df): - def f(row): - row_dict = row._asdict() - - # location = ( - # sess.query(Location).filter_by(name=row_dict["PointID"]).one_or_none() - # ) - - # location = sess.query(Location).filter_by(point=row_dict["PointID"]).one_or_none() - - if location: - well = WellThing() - # well.location = location - well.well_depth = row_dict["WellDepth"] - well.hole_depth = row_dict["HoleDepth"] - well.ose_pod_id = row_dict["OSEWellID"] - well.casing_depth = row_dict["CasingDepth"] - well.casing_diameter = row_dict["CasingDiameter"] - well.casing_description = row_dict["CasingDescription"] - - wt = row_dict["Meaning"] - if wt not in ADDED: - add_lexicon_term( - sess, wt, "Current use of the well, aka well type", "current_use" - ) - ADDED.append(wt) - - well.well_type = wt - - sess.add(well) - - loader(df, sess, f) - # print(df.head()) - # n = len(df) - # - # for i, row in enumerate(df.itertuples()): - # if not i % 100: - # print(f"Processing row {i} of {n}") - # - # row_dict = row._asdict() - # - # location = ( - # sess.query(Location).filter_by(name=row_dict["PointID"]).one_or_none() - # ) - # - # if location: - # well = WellThing() - # well.location = location - # well.well_depth = row_dict["WellDepth"] - # well.hole_depth = row_dict["HoleDepth"] - # well.ose_pod_id = row_dict["OSEWellID"] - # well.casing_depth = row_dict["CasingDepth"] - # well.casing_diameter = row_dict["CasingDiameter"] - # well.casing_description = row_dict["CasingDescription"] - # - # wt = row_dict["Meaning"] - # - # 
add_lexicon_term( - # sess, wt, "Current use of the well, aka well type", "current_use" - # ) - # - # well.well_type = wt - # - # # print(row_dict) - # sess.add(well) - # sess.commit() - # # break - - -def location_etl(sess): - """ - Extract, Transform, Load (ETL) process for location data. - """ - df = extract_locations() - df = transform_locations(df) - load_locations(sess, df) - - -def well_etl(sess): - """ - Extract, Transform, Load (ETL) process for well data. - """ - df = extract_wells() - df = transform_wells(df) - load_wells(sess, df) + if init or transfer_spring_flag: + transfer_springs(sess, limit) + + if init or transfer_perennial_stream_flag: + transfer_perennial_stream(sess, limit) + + if init or transfer_ephemeral_stream_flag: + transfer_ephemeral_stream(sess, limit) + + if init or transfer_met_flag: + transfer_met(sess, limit) + + if init or transfer_owners_flag: + transfer_owners(sess) + + if init or transfer_waterlevels_flag: + transfer_water_levels(sess) + + if init or transfer_link_ids_flag: + transfer_link_ids(sess) + transfer_link_ids_welldata(sess) + + if init or transfer_assets_flag: + transfer_assets_testing(sess) + + if init or transfer_groups_flag: + transfer_groups(sess) + + # if init or cleanup_wells_flag: + # cleanup_wells(sess) if __name__ == "__main__": - with session_ctx() as session: - location_etl(session) - # well_etl(session) - session.close() + main_transfer() + # ============= EOF ============================================= diff --git a/transfers/transfer2.py b/transfers/transfer2.py deleted file mode 100644 index 002bc65be..000000000 --- a/transfers/transfer2.py +++ /dev/null @@ -1,106 +0,0 @@ -# =============================================================================== -# Copyright 2025 ross -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# =============================================================================== -from sqlalchemy.orm import Session - -from core.initializers import init_lexicon -from db import Base -from db.engine import session_ctx -from transfers.asset_transfer import transfer_assets_testing -from transfers.group_transfer import transfer_groups -from transfers.link_ids_transfer import transfer_link_ids, transfer_link_ids_welldata -from transfers.owner_transfer import transfer_owners -from transfers.sensor_transfer import init_sensor -from transfers.waterlevels_transfer import transfer_water_levels - -from transfers.well_transfer import transfer_wells, transfer_wellscreens, cleanup_wells -from transfers.thing_transfer import ( - transfer_springs, - transfer_perennial_stream, - transfer_ephemeral_stream, - transfer_met, -) - - -def erase_and_initalize(session: Session) -> None: - Base.metadata.drop_all(session.bind) - Base.metadata.create_all(session.bind) - - init_lexicon() - init_sensor(session) - - -def main_transfer(): - init = True - - transfer_well_flag = False - transfer_spring_flag = False - transfer_perennial_stream_flag = False - transfer_ephemeral_stream_flag = False - transfer_met_flag = False - transfer_owners_flag = False - transfer_waterlevels_flag = False - transfer_link_ids_flag = False - transfer_assets_flag = False - transfer_groups_flag = False - - cleanup_wells_flag = True - - limit = 100 - with session_ctx() as sess: - - if init: - erase_and_initalize(sess) - - if init or transfer_well_flag: - transfer_wells(sess, limit) - 
transfer_wellscreens(sess) - - if init or transfer_spring_flag: - transfer_springs(sess, limit) - - if init or transfer_perennial_stream_flag: - transfer_perennial_stream(sess, limit) - - if init or transfer_ephemeral_stream_flag: - transfer_ephemeral_stream(sess, limit) - - if init or transfer_met_flag: - transfer_met(sess, limit) - - if init or transfer_owners_flag: - transfer_owners(sess) - - if init or transfer_waterlevels_flag: - transfer_water_levels(sess) - - if init or transfer_link_ids_flag: - transfer_link_ids(sess) - transfer_link_ids_welldata(sess) - - if init or transfer_assets_flag: - transfer_assets_testing(sess) - - if init or transfer_groups_flag: - transfer_groups(sess) - - if init or cleanup_wells_flag: - cleanup_wells(sess) - - -if __name__ == "__main__": - main_transfer() - -# ============= EOF ============================================= diff --git a/transfers/waterlevels_transfer.py b/transfers/waterlevels_transfer.py index 005fa259b..6c3c54db6 100644 --- a/transfers/waterlevels_transfer.py +++ b/transfers/waterlevels_transfer.py @@ -59,7 +59,8 @@ def transfer_water_levels(session): obs.sample = sample obs.observation_datetime = dt - obs.depth_to_water = row.DepthToWater + obs.value = row.DepthToWater + obs.measuring_point_height = row.MPHeight obs.observed_property = "groundwater level:groundwater level" obs.unit = "ft" diff --git a/transfers/well_transfer.py b/transfers/well_transfer.py index 98aa5f8db..209b39732 100644 --- a/transfers/well_transfer.py +++ b/transfers/well_transfer.py @@ -80,10 +80,10 @@ def transfer_wells(session, limit=None): # "casing_diameter": row.CasingDiameter, # "casing_depth": row.CasingDepth, # "casing_description": row.CasingDescription, - "thing_type": "water well", "release_status": "public" if row.PublicRelease else "private", # "data_reliability": row.DataReliability, }, + thing_type="water well", ) wt = row.Meaning if wt not in ADDED: