Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
)
from .connectors.usgs.source import NWISSiteSource, NWISWaterLevelSource
from .connectors.wqp.source import WQPSiteSource, WQPAnalyteSource, WQPWaterLevelSource

from .logger import Loggable

SOURCE_DICT = {
"bernco": BernCoSiteSource,
Expand Down
31 changes: 30 additions & 1 deletion backend/persisters/geoserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from itertools import groupby

import psycopg2
from shapely.geometry.multipoint import MultiPoint
from shapely.geometry.point import Point
from sqlalchemy.dialects.postgresql import JSONB, insert
from sqlalchemy.orm import declarative_base, sessionmaker, relationship

Expand Down Expand Up @@ -91,6 +93,7 @@ class Sources(Base):
__tablename__ = "tbl_sources"
id = Column(Integer)
name = Column(String, primary_key=True, index=True)
convex_hull = Column(Geometry(geometry_type="POLYGON", srid=4326))


class GeoServerPersister(BasePersister):
Expand Down Expand Up @@ -132,6 +135,32 @@ def _write_sources(self, records: list):
conn.execute(sql)
conn.commit()

def _write_sources_with_convex_hull(self, records: list):
# sources = {r.source for r in records}
with self._connection as conn:
def key(r):
return str(r.source)

records = sorted(records, key=key)
for source_name, group in groupby(records, key=key):
group = list(group)
# calculate convex hull for the source from the records

# Create a MultiPoint object
points = MultiPoint([Point(record.longitude, record.latitude) for record in group])

# Calculate the convex hull
sinsert = insert(Sources)
print("Writing source", source_name, points.convex_hull)
sql = sinsert.values([{"name": source_name,
"convex_hull": points.convex_hull.wkt}]).on_conflict_do_update(
index_elements=[Sources.name],
set_={"convex_hull": sinsert.excluded.convex_hull})
# sql = insert(Sources).values([{"name": source,} for source in sources]).on_conflict_do_nothing(
# index_elements=[Sources.name],)
conn.execute(sql)
conn.commit()

def _write_parameters(self):
with self._connection as conn:
sql = insert(Parameters).values([{"name": self.config.parameter,
Expand Down Expand Up @@ -196,7 +225,7 @@ def _write_to_sites(self, records: list):
Write records to a PostgreSQL database in optimized chunks.
"""

self._write_sources(records)
self._write_sources_with_convex_hull(records)

keys = ["usgs_site_id", "alternate_site_id", "formation", "aquifer", "well_depth"]
chunk_size = 1000 # Larger chunk size for fewer commits
Expand Down
11 changes: 7 additions & 4 deletions frontend/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,15 @@ def weave(
end_date,
bbox,
county,
wkt,
no_bernco,
no_bor,
no_cabq,
no_ebid,
no_nmbgmr_amp,
no_nmed_dwb,
no_nmose_isc_seven_rivers,
no_nmose_pod,
no_nmose_roswell,
no_nwis,
no_pvacd,
Expand All @@ -261,7 +263,7 @@ def weave(
Get parameter timeseries or summary data
"""
# instantiate config and set up parameter
config = setup_config(parameter, config_path, bbox, county, site_limit, dry)
config = setup_config(parameter, config_path, bbox, county, wkt, site_limit, dry)

config.parameter = parameter

Expand Down Expand Up @@ -325,7 +327,7 @@ def weave(
@add_options(ALL_SOURCE_OPTIONS)
@add_options(DEBUG_OPTIONS)
def sites(config_path,
bbox, county,
bbox, county, wkt,
output_dir,
no_bernco,
no_bor,
Expand All @@ -334,6 +336,7 @@ def sites(config_path,
no_nmbgmr_amp,
no_nmed_dwb,
no_nmose_isc_seven_rivers,
no_nmose_pod,
no_nmose_roswell,
no_nwis,
no_pvacd,
Expand All @@ -345,7 +348,7 @@ def sites(config_path,
Get sites
"""

config = setup_config("sites", config_path, bbox, county, site_limit, dry)
config = setup_config("sites", config_path, bbox, county, wkt, site_limit, dry)
config_agencies = ["bernco", "bor", "cabq", "ebid", "nmbgmr_amp", "nmed_dwb",
"nmose_isc_seven_rivers", "nmose_roswell", "nwis", "pvacd",
"wqp", "nmose_pod"]
Expand Down Expand Up @@ -403,7 +406,7 @@ def sources(sources, bbox, wkt, county):
click.echo(s)


def setup_config(tag, config_path, bbox, county, site_limit, dry):
def setup_config(tag, config_path, bbox, county, wkt, site_limit, dry):
config = Config(path=config_path)

if county:
Expand Down