Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/dev_deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ jobs:
echo "runtime: python313" >> app.yaml
echo "entrypoint: gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app" >> app.yaml
echo "instance_class: F4" >> app.yaml
echo "inbound_services:" >> app.yaml
echo " - warmup" >> app.yaml
echo "automatic_scaling:" >> app.yaml
echo " min_instances: 0" >> app.yaml
echo " max_instances: 10" >> app.yaml
echo "" >> app.yaml
echo "env_variables:" >> app.yaml
echo " MODE: \"production\"" >> app.yaml
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/staging_deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ jobs:
echo "runtime: python313" >> app.yaml
echo "entrypoint: gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app" >> app.yaml
echo "instance_class: F4" >> app.yaml
echo "inbound_services:" >> app.yaml
echo " - warmup" >> app.yaml
echo "automatic_scaling:" >> app.yaml
echo " min_instances: 0" >> app.yaml
echo " max_instances: 10" >> app.yaml
echo "" >> app.yaml
echo "env_variables:" >> app.yaml
echo " MODE: \"production\"" >> app.yaml
Expand Down
9 changes: 9 additions & 0 deletions schedule
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@


this is used to add a schedule.
This schedule is used to keeping the api runingl

gcloud scheduler jobs create http keep-alive-job \
--schedule="*/10 8-18 * * 1-5" \
--uri="https://ocotillo-api-dot-waterdatainitiative-271000.appspot.com/_ah/warmup" \
--http-method=GET
20 changes: 18 additions & 2 deletions transfers/owner_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,37 @@
from db import Thing, Contact, ThingContactAssociation, Email, Phone, Address


def extract_owner_role(comment):
# if comment is None:
# return "Owner"
# if "Owner" in comment:
# return "Owner"
# if "Manager" in comment:
# return "Manager"
# if "Director" in comment:
# return "Director"

return "Primary"


def transfer_owners(session):

odf = read_csv("ownersdata.csv")
odf = odf.replace(pd.NA, None)
odf = odf.replace({np.nan: None})

odf = filter_to_valid_point_ids(session, odf)
for i, row in odf.iterrows():
thing = session.query(Thing).where(Thing.name == row.PointID).first()
if thing is None:
print(f"Thing with PointID {row.PointID} not foaund. Skipping owner.")
continue

# TODO: extract role from OwnerComment
# role = extract_owner_role(row.OwnerComment)
role = "Primary"

# TODO: put in guards for null values
contact1 = Contact(name=f"{row.FirstName} {row.LastName}", role="Primary")
contact1 = Contact(name=f"{row.FirstName} {row.LastName}", role=role)
assoc = ThingContactAssociation()
assoc.thing = thing
assoc.contact = contact1
Expand Down
8 changes: 3 additions & 5 deletions transfers/thing_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,9 @@ def transfer_thing(session: Session, site_type: str, make_payload, limit=None) -

location = make_location(row)
session.add(location)

spring = add_thing(
session,
make_payload(row),
)
payload = make_payload(row)
thing_type = payload.pop("thing_type")
spring = add_thing(session, payload, thing_type=thing_type)
assoc = LocationThingAssociation()

assoc.location = location
Expand Down
281 changes: 82 additions & 199 deletions transfers/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,211 +13,94 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================
import time
from sqlalchemy.orm import Session

import numpy as np
import pandas as pd
import pyproj
from shapely import Point
from shapely.ops import transform

from db import *
from db.location import Location
from core.initializers import init_lexicon
from db import Base
from db.engine import session_ctx
from db.thing.well import WellThing
from services.lexicon import add_lexicon_term
from transfers.asset_transfer import transfer_assets_testing
from transfers.group_transfer import transfer_groups
from transfers.link_ids_transfer import transfer_link_ids, transfer_link_ids_welldata
from transfers.owner_transfer import transfer_owners
from transfers.sensor_transfer import init_sensor
from transfers.waterlevels_transfer import transfer_water_levels

from transfers.well_transfer import transfer_wells, transfer_wellscreens, cleanup_wells
from transfers.thing_transfer import (
transfer_springs,
transfer_perennial_stream,
transfer_ephemeral_stream,
transfer_met,
)


def erase_and_initalize(session: Session) -> None:
Base.metadata.drop_all(session.bind)
Base.metadata.create_all(session.bind)

init_lexicon()
init_sensor(session)


def main_transfer():
init = True

transfer_well_flag = False
transfer_spring_flag = False
transfer_perennial_stream_flag = False
transfer_ephemeral_stream_flag = False
transfer_met_flag = False
transfer_owners_flag = False
transfer_waterlevels_flag = False
transfer_link_ids_flag = False
transfer_assets_flag = False
transfer_groups_flag = False

cleanup_wells_flag = False

limit = 1000
with session_ctx() as sess:

TRANSFORMERS = {}
if init:
erase_and_initalize(sess)

if init or transfer_well_flag:
transfer_wells(sess, limit)
transfer_wellscreens(sess)

def transform_srid(geometry, source_srid, target_srid):
"""
geometry must be a shapely geometry object, like Point, Polygon, or MultiPolygon
"""
transformer_key = (source_srid, target_srid)
if transformer_key not in TRANSFORMERS:
source_crs = pyproj.CRS(f"EPSG:{source_srid}")
target_crs = pyproj.CRS(f"EPSG:{target_srid}")
transformer = pyproj.Transformer.from_crs(
source_crs, target_crs, always_xy=True
)
TRANSFORMERS[transformer_key] = transformer
else:
transformer = TRANSFORMERS[transformer_key]
return transform(transformer.transform, geometry)


def extract_locations():
"""
Extracts location data from the database.
This function should connect to the database and retrieve location data.
"""
df = pd.read_csv("data/location.csv")
df = df[df["SiteType"] == "GW"]
df = df[df["Easting"].notna() & df["Northing"].notna()]
return df


def extract_wells():
"""
Extracts well data from the database.
This function should connect to the database and retrieve well data.
"""
df = pd.read_csv("data/welldata.csv")
return df


def transform_locations(df):
return df


def transform_wells(df):
# cover nans to nulls
df = df.replace(pd.NA, None)
df = df.replace({np.nan: None})

return df


def load_locations(sess, df):
def f(row):
# Convert the row to a dictionary
row_dict = row._asdict()

e, n = row_dict["Easting"], row_dict["Northing"]

point = Point(e, n)
transformed_point = transform_srid(
point, source_srid=26913, target_srid=4326 # WGS84 SRID
)

sl = Location(
# name=row_dict["PointID"],
point=transformed_point.wkt,
# visible=row_dict["PublicRelease"],
)

sess.add(sl)
# try:
# sess.commit() # Commit the changes to the database
# except ProgrammingError:
# print(f"skipping row due to ProgrammingError. {row_dict['PointID']}")
# sess.rollback()
# Remove the index from the dictionary

loader(df, sess, f)


def loader(df, sess, function):
n = len(df)
st = time.time()
prev = st
g = 175
for i, row in enumerate(df.itertuples()):
if not i % g:
print(
f"Processing row {i} of {n}, {g/(time.time()-prev)} rate: {i / (time.time() - st):.2f} rows/sec"
)
prev = time.time()
function(row)

if not i % g:
sess.commit()


ADDED = []


def load_wells(sess, df):
def f(row):
row_dict = row._asdict()

# location = (
# sess.query(Location).filter_by(name=row_dict["PointID"]).one_or_none()
# )

# location = sess.query(Location).filter_by(point=row_dict["PointID"]).one_or_none()

if location:
well = WellThing()
# well.location = location
well.well_depth = row_dict["WellDepth"]
well.hole_depth = row_dict["HoleDepth"]
well.ose_pod_id = row_dict["OSEWellID"]
well.casing_depth = row_dict["CasingDepth"]
well.casing_diameter = row_dict["CasingDiameter"]
well.casing_description = row_dict["CasingDescription"]

wt = row_dict["Meaning"]
if wt not in ADDED:
add_lexicon_term(
sess, wt, "Current use of the well, aka well type", "current_use"
)
ADDED.append(wt)

well.well_type = wt

sess.add(well)

loader(df, sess, f)
# print(df.head())
# n = len(df)
#
# for i, row in enumerate(df.itertuples()):
# if not i % 100:
# print(f"Processing row {i} of {n}")
#
# row_dict = row._asdict()
#
# location = (
# sess.query(Location).filter_by(name=row_dict["PointID"]).one_or_none()
# )
#
# if location:
# well = WellThing()
# well.location = location
# well.well_depth = row_dict["WellDepth"]
# well.hole_depth = row_dict["HoleDepth"]
# well.ose_pod_id = row_dict["OSEWellID"]
# well.casing_depth = row_dict["CasingDepth"]
# well.casing_diameter = row_dict["CasingDiameter"]
# well.casing_description = row_dict["CasingDescription"]
#
# wt = row_dict["Meaning"]
#
# add_lexicon_term(
# sess, wt, "Current use of the well, aka well type", "current_use"
# )
#
# well.well_type = wt
#
# # print(row_dict)
# sess.add(well)
# sess.commit()
# # break


def location_etl(sess):
"""
Extract, Transform, Load (ETL) process for location data.
"""
df = extract_locations()
df = transform_locations(df)
load_locations(sess, df)


def well_etl(sess):
"""
Extract, Transform, Load (ETL) process for well data.
"""
df = extract_wells()
df = transform_wells(df)
load_wells(sess, df)
if init or transfer_spring_flag:
transfer_springs(sess, limit)

if init or transfer_perennial_stream_flag:
transfer_perennial_stream(sess, limit)

if init or transfer_ephemeral_stream_flag:
transfer_ephemeral_stream(sess, limit)

if init or transfer_met_flag:
transfer_met(sess, limit)

if init or transfer_owners_flag:
transfer_owners(sess)

if init or transfer_waterlevels_flag:
transfer_water_levels(sess)

if init or transfer_link_ids_flag:
transfer_link_ids(sess)
transfer_link_ids_welldata(sess)

if init or transfer_assets_flag:
transfer_assets_testing(sess)

if init or transfer_groups_flag:
transfer_groups(sess)

# if init or cleanup_wells_flag:
# cleanup_wells(sess)


if __name__ == "__main__":
with session_ctx() as session:
location_etl(session)
# well_etl(session)
session.close()
main_transfer()

# ============= EOF =============================================
Loading
Loading