diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 7668de0..5ea32ac 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -5,7 +5,7 @@ name: CI/CD on: push: - branches: [ "main", "feature/jir"] + branches: [ "main", "feature/jir", "dev/jab"] pull_request: branches: [ "main"] diff --git a/.github/workflows/format_code.yml b/.github/workflows/format_code.yml index bae40ea..7b1c1b4 100644 --- a/.github/workflows/format_code.yml +++ b/.github/workflows/format_code.yml @@ -1,9 +1,9 @@ name: Format code on: pull_request: - branches: [feature/jir, ] + branches: [feature/jir] push: - branches: [feature/jir,] + branches: [feature/jir, dev/jab] jobs: format: runs-on: ubuntu-latest diff --git a/.gitignore b/.gitignore index f6855af..5c03936 100644 --- a/.gitignore +++ b/.gitignore @@ -169,3 +169,12 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + +# outputs +output_timeseries +output.combined.csv +output.csv +output.sites.csv +output.timeseries.csv +output.logs.txt +output.warnings.txt \ No newline at end of file diff --git a/README.md b/README.md index 89f8d4c..34923d9 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,14 @@ pip install nmuwd ``` ## Usage + +### Timeseries & Summary +The flag `--separate_timeseries_files` exports timeseries for every location in their own file in the directory output_series (e.g. AB-0002.csv, AB-0003.csv). Locations with only one observation are gathered and exported to the file `output.combined.csv. + +The flag `--single_timeseries_file` exports all timeseries for all locations in one file titled output.timeseries.csv. It also exports a file titled output.sites.csv that contains site information, such as latitude, longitude, and elevation. + +If neither of the above flags are specified, a summary table called output.csv is exported. + ### Water Levels Get water levels for a county. Return a summary csv @@ -92,6 +100,7 @@ Available analytes: - Nitrate - pH - Potassium +- Silica - Sodium - Sulfate - TDS diff --git a/backend/config.py b/backend/config.py index 68515f7..559f6ed 100644 --- a/backend/config.py +++ b/backend/config.py @@ -135,6 +135,9 @@ class Config(object): use_csv: bool = True use_geojson: bool = False + logs: list = [] + warnings: list = [] + def __init__(self, model=None, payload=None): self.bbox = {} if model: @@ -313,21 +316,33 @@ def now_ms(self, days=0): # return current time in milliseconds return int((datetime.now() - td).timestamp() * 1000) - def report(self): + def report(self, log_report: bool = False): def _report_attributes(title, attrs): - click.secho( - f"---- {title} --------------------------------------------------", - fg="yellow", - ) + s = f"---- {title} --------------------------------------------------" + click.secho(s, fg="yellow") + if log_report: + self.logs.append(s) + for k in attrs: v = getattr(self, k) - click.secho(f"{k}: {v}", fg="yellow") - click.secho("", fg="yellow") + s = f"{k}: {v}" + click.secho(s, fg="yellow") + + if log_report: + self.logs.append(s) + + s = "" + click.secho(s, fg="yellow") + + if log_report: + self.logs.append(s) + + s = "---- Begin configuration -------------------------------------\n" + click.secho(s, fg="yellow") + + if log_report: + self.logs.append(s) - click.secho( - "---- Begin configuration -------------------------------------\n", - fg="yellow", - ) sources = [f"use_source_{s}" for s in SOURCE_KEYS] attrs = [ "start_date", @@ -356,9 +371,11 @@ def _report_attributes(title, attrs): ), ) - click.secho( - "---- End configuration -------------------------------------", fg="yellow" - ) + s = "---- End configuration -------------------------------------\n" + click.secho(s, fg="yellow") + + if log_report: + self.logs.append(s) def validate(self): if not self._validate_bbox(): diff --git a/backend/connectors/README.md b/backend/connectors/README.md index 01384e9..4281414 100644 --- a/backend/connectors/README.md +++ b/backend/connectors/README.md @@ -8,6 +8,8 @@ The following are necessary for adding a source: - the `use_source_` flag needs to be added to the `analyte_sources` method in the `Config` class in **/backend/config.py** if analytes are available for that source - the `use_source_` flag needs to be added to the `water_level_sources` method in the `Config` class in **/backend/config.py** if water levels are available for that source +**IMPORTANT: add tests for the source** + For the sake of discription, the example source is called Faux. # /backend/connectors/faux @@ -30,7 +32,7 @@ The following methods need to be defined for Faux. See `BaseSiteSource` for doc The following methods need to be defined for Faux. See `BaseAnalyteSource` for doc strings for each of the methods: - `get_records` -- `_extract_parent_records` +- `_extract_site_records` - `_extract_parameter_units` - `_extract_most_recent` - `_extract_parameter_result` @@ -45,7 +47,7 @@ The following method is optional: The following methods need to be defined for Faux. See `BaseWaterLevelSource` for doc strings for each of the methods: - `get_records` -- `_extract_parent_records` +- `_extract_site_records` - `_extract_parameter_units` - `_extract_most_recent` - `_extract_parameter_result` diff --git a/backend/connectors/bor/source.py b/backend/connectors/bor/source.py index 2bfb772..93f8f23 100644 --- a/backend/connectors/bor/source.py +++ b/backend/connectors/bor/source.py @@ -87,9 +87,9 @@ def _extract_most_recent(self, rs): "units": record["attributes"]["resultAttributes"]["units"], } - def _extract_parent_records(self, records, parent_record): + def _extract_site_records(self, records, site_record): return [ - ri for ri in records if ri["attributes"]["locationId"] == parent_record.id + ri for ri in records if ri["attributes"]["locationId"] == site_record.id ] def _reorder_catalog_items(self, items): @@ -98,12 +98,10 @@ def _reorder_catalog_items(self, items): items = items[self._catalog_item_idx :] + items[: self._catalog_item_idx] return items - def get_records(self, parent_record): + def get_records(self, site_record): code = get_analyte_search_param(self.config.analyte, BOR_ANALYTE_MAPPING) - for i, item in enumerate( - self._reorder_catalog_items(parent_record.catalogItems) - ): + for i, item in enumerate(self._reorder_catalog_items(site_record.catalogItems)): data = self._execute_json_request(f'https://data.usbr.gov{item["id"]}') if not data: diff --git a/backend/connectors/ckan/source.py b/backend/connectors/ckan/source.py index e70475a..0de25b3 100644 --- a/backend/connectors/ckan/source.py +++ b/backend/connectors/ckan/source.py @@ -112,12 +112,12 @@ def _parse_response(self, resp): class OSERoswellWaterLevelSource(OSERoswellSource, BaseWaterLevelSource): transformer_klass = OSERoswellWaterLevelTransformer - def get_records(self, parent_record): - return self._parse_response(parent_record, self.get_response()) + def get_records(self, site_record): + return self._parse_response(site_record, self.get_response()) - def _parse_response(self, parent_record, resp): + def _parse_response(self, site_record, resp): records = resp.json()["result"]["records"] - return [record for record in records if record["Site_ID"] == parent_record.id] + return [record for record in records if record["Site_ID"] == site_record.id] def _extract_parameter_results(self, records): return [float(r["DTWGS"]) for r in records] diff --git a/backend/connectors/ckan/transformer.py b/backend/connectors/ckan/transformer.py index 111171a..e56d609 100644 --- a/backend/connectors/ckan/transformer.py +++ b/backend/connectors/ckan/transformer.py @@ -48,17 +48,17 @@ def _transform(self, record): class OSERoswellWaterLevelTransformer(WaterLevelTransformer): source_tag = "CKAN/OSERoswell" - # def _transform_hook(self, record, config, parent_record): + # def _transform_hook(self, record, config, site_record): # rec = { - # "id": parent_record.id, + # "id": site_record.id, # "source": "CKAN/OSERoswell", - # "location": parent_record.name, - # "usgs_site_id": parent_record.id, - # "latitude": parent_record.latitude, - # "longitude": parent_record.longitude, - # "elevation": parent_record.elevation, + # "location": site_record.name, + # "usgs_site_id": site_record.id, + # "latitude": site_record.latitude, + # "longitude": site_record.longitude, + # "elevation": site_record.elevation, # "elevation_units": "ft", - # "well_depth": parent_record.well_depth, + # "well_depth": site_record.well_depth, # "well_depth_units": "ft", # } # if config.output_summary_waterlevel_stats: diff --git a/backend/connectors/isc_seven_rivers/source.py b/backend/connectors/isc_seven_rivers/source.py index fe32656..c338877 100644 --- a/backend/connectors/isc_seven_rivers/source.py +++ b/backend/connectors/isc_seven_rivers/source.py @@ -109,12 +109,12 @@ def _extract_parameter_results(self, records): def _extract_parameter_units(self, records): return [r["units"] for r in records] - def get_records(self, parent_record): + def get_records(self, site_record): config = self.config analyte_id = self._get_analyte_id(config.analyte) if analyte_id: params = { - "monitoringPointId": parent_record.id, + "monitoringPointId": site_record.id, "analyteId": analyte_id, "start": 0, "end": config.now_ms(days=1), @@ -146,9 +146,9 @@ def get_datetime(record): class ISCSevenRiversWaterLevelSource(BaseWaterLevelSource): transformer_klass = ISCSevenRiversWaterLevelTransformer - def get_records(self, parent_record): + def get_records(self, site_record): params = { - "id": parent_record.id, + "id": site_record.id, "start": 0, "end": self.config.now_ms(days=1), } diff --git a/backend/connectors/isc_seven_rivers/transformer.py b/backend/connectors/isc_seven_rivers/transformer.py index e68aba3..dddbe9d 100644 --- a/backend/connectors/isc_seven_rivers/transformer.py +++ b/backend/connectors/isc_seven_rivers/transformer.py @@ -53,14 +53,14 @@ class ISCSevenRiversAnalyteTransformer(AnalyteTransformer): class ISCSevenRiversWaterLevelTransformer(WaterLevelTransformer): source_tag = "ISCSevenRivers" - # def _transform_hook(self, record, config, parent_record): + # def _transform_hook(self, record, config, site_record): # rec = { # "source": "ISCSevenRivers", - # "id": parent_record.id, - # "location": parent_record.name, - # "latitude": parent_record.latitude, - # "longitude": parent_record.longitude, - # "elevation": parent_record.elevation, + # "id": site_record.id, + # "location": site_record.name, + # "latitude": site_record.latitude, + # "longitude": site_record.longitude, + # "elevation": site_record.elevation, # "elevation_units": "ft", # } # if config.output_summary_waterlevel_stats: diff --git a/backend/connectors/mappings.py b/backend/connectors/mappings.py index 5231476..2fe4267 100644 --- a/backend/connectors/mappings.py +++ b/backend/connectors/mappings.py @@ -22,6 +22,7 @@ ARSENIC, NITRATE, CALCIUM, + SILICA, SODIUM, POTASSIUM, MAGNESIUM, @@ -31,22 +32,24 @@ ) # DWB =============================================================================== +# the mapping below is the corresponding "@iot.id" for the ObservedProperties DWB_ANALYTE_MAPPING: dict = { ARSENIC: 3, - BICARBONATE: None, - CALCIUM: None, + BICARBONATE: 22, # BICARBONATE AS HCO3 + CALCIUM: 11, CARBONATE: None, CHLORIDE: 15, FLUORIDE: 19, - MAGNESIUM: None, + MAGNESIUM: 23, NITRATE: 35, - POTASSIUM: None, - SODIUM: None, + POTASSIUM: 33, + SILICA: 37, + SODIUM: 38, SULFATE: 41, TDS: 90, # "Uranium-238": 386, URANIUM: 385, # "Combined Uranium" - PH: None, + PH: 81, } # ISC Seven Rivers =============================================================================== """ @@ -95,6 +98,7 @@ MAGNESIUM: "Magnesium", NITRATE: "Nitrate", POTASSIUM: "Potassium", + SILICA: "SiO2", SODIUM: "Sodium", SULFATE: "Sulfate", TDS: "TDS calc", @@ -137,6 +141,7 @@ MAGNESIUM: "Magnesium", NITRATE: "Nitrate (as N)", POTASSIUM: "Potassium", + SILICA: "Silica", SODIUM: "Sodium", SULFATE: "Sulfate", TDS: "Total Dissolved Solids", @@ -155,6 +160,7 @@ MAGNESIUM: ["Magnesium"], NITRATE: ["Nitrate", "Nitrate-N", "Nitrate as N"], POTASSIUM: ["Potassium"], + SILICA: ["Silica"], SODIUM: ["Sodium"], SULFATE: [ "Sulfate", @@ -171,8 +177,8 @@ """ Temp DO -ALK HCO3 -ALK CO3 +ALK HCO3 <-- this is a measure of alkalinity, and not necessarily the same as Bicarbonate +ALK CO3 <-- this is a measure of alkalinity, and not necessarily the same as Carbonate ALK OH ALK P ALK @@ -224,14 +230,15 @@ """ BOR_ANALYTE_MAPPING: dict = { ARSENIC: "As", - BICARBONATE: "ALK HCO3", + BICARBONATE: None, CALCIUM: "Ca", - CARBONATE: "ALK CO3", + CARBONATE: None, CHLORIDE: "Cl", FLUORIDE: "F", MAGNESIUM: "Mg", NITRATE: "NO3", POTASSIUM: "K", + SILICA: "SiO2", SODIUM: "Na", SULFATE: "SO4", TDS: "TDS", diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index 08734f9..3f2463e 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -51,8 +51,8 @@ def _make_url(endpoint): if os.getenv("DEBUG") == "1": - return f"http://localhost:8000/{endpoint}" - return f"https://waterdata.nmt.edu/{endpoint}" + return f"http://localhost:8000/latest/{endpoint}" + return f"https://waterdata.nmt.edu/latest/{endpoint}" class NMBGMRSiteSource(BaseSiteSource): @@ -68,7 +68,7 @@ def health(self): def get_records(self): config = self.config - params = {} + params = {"site_type": "Groundwater other than spring (well)", "expand": False} if config.has_bounds(): params["wkt"] = config.bounding_wkt() @@ -83,20 +83,36 @@ def get_records(self): params["parameter"] = "Manual groundwater levels" # tags="features" because the response object is a GeoJSON - return self._execute_json_request( + sites = self._execute_json_request( _make_url("locations"), params, tag="features", timeout=30 ) + return sites + + # loop through the responses and add well information for each location + # this may be slow because of the number of sites that need to be queried + # but it is necessary to get the well information. With further + # development, this could be faster if one can batch the requests + # to /wells + # for site in sites: + # well_info = self._execute_json_request( + # _make_url("/wells"), + # params={"pointid": site["properties"]["point_id"]}, + # tag="", + # ) + # site["properties"]["formation"] = well_info["formation"] + # site["properties"]["well_depth"] = well_info["well_depth_ftbgs"] + # site["properties"]["well_depth_units"] = "ft" class NMBGMRAnalyteSource(BaseAnalyteSource): transformer_klass = NMBGMRAnalyteTransformer - def get_records(self, parent_record): + def get_records(self, site_record): analyte = get_analyte_search_param(self.config.analyte, NMBGMR_ANALYTE_MAPPING) records = self._execute_json_request( _make_url("waterchemistry"), params={ - "pointid": ",".join(make_site_list(parent_record)), + "pointid": ",".join(make_site_list(site_record)), "analyte": analyte, }, tag="", @@ -107,8 +123,8 @@ def get_records(self, parent_record): return records_sorted_by_pointid - def _extract_parent_records(self, records, parent_record): - return records.get(parent_record.id, []) + def _extract_site_records(self, records, site_record): + return records.get(site_record.id, []) def _extract_parameter_units(self, records): return [r["Units"] for r in records] @@ -156,15 +172,15 @@ def _extract_most_recent(self, records): def _extract_parameter_results(self, records): return [r["DepthToWaterBGS"] for r in records] - def _extract_parent_records(self, records, parent_record): - return [ri for ri in records if ri["Well"]["PointID"] == parent_record.id] + def _extract_site_records(self, records, site_record): + return [ri for ri in records if ri["Well"]["PointID"] == site_record.id] - def get_records(self, parent_record): + def get_records(self, site_record): # if self.config.latest_water_level_only: - # params = {"pointids": parent_record.id} + # params = {"pointids": site_record.id} # url = _make_url("waterlevels/latest") # else: - params = {"pointid": ",".join(make_site_list(parent_record))} + params = {"pointid": ",".join(make_site_list(site_record))} # just use manual waterlevels temporarily url = _make_url("waterlevels/manual") diff --git a/backend/connectors/nmbgmr/transformer.py b/backend/connectors/nmbgmr/transformer.py index 693b728..b84f6b2 100644 --- a/backend/connectors/nmbgmr/transformer.py +++ b/backend/connectors/nmbgmr/transformer.py @@ -51,19 +51,19 @@ class NMBGMRAnalyteTransformer(AnalyteTransformer): class NMBGMRWaterLevelTransformer(WaterLevelTransformer): source_tag = "NMBGMR" - # def _transform_hook(self, record, config, parent_record): + # def _transform_hook(self, record, config, site_record): # rec = { # "source": "NMBGMR", - # "id": parent_record.id, - # "location": parent_record.name, - # "usgs_site_id": parent_record.usgs_site_id, - # "alternate_site_id": parent_record.alternate_site_id, - # "latitude": parent_record.latitude, - # "longitude": parent_record.longitude, - # "well_depth": parent_record.well_depth, - # "well_depth_units": parent_record.well_depth_units, - # "elevation": parent_record.elevation, - # "elevation_units": parent_record.elevation_units, + # "id": site_record.id, + # "location": site_record.name, + # "usgs_site_id": site_record.usgs_site_id, + # "alternate_site_id": site_record.alternate_site_id, + # "latitude": site_record.latitude, + # "longitude": site_record.longitude, + # "well_depth": site_record.well_depth, + # "well_depth_units": site_record.well_depth_units, + # "elevation": site_record.elevation, + # "elevation_units": site_record.elevation_units, # } # # if config.output_summary_waterlevel_stats: diff --git a/backend/connectors/nmenv/source.py b/backend/connectors/nmenv/source.py index da6f684..37e4b7a 100644 --- a/backend/connectors/nmenv/source.py +++ b/backend/connectors/nmenv/source.py @@ -21,7 +21,7 @@ ) from backend.connectors.st_connector import STSiteSource, STAnalyteSource from backend.constants import PARAMETER, PARAMETER_UNITS, DT_MEASURED, PARAMETER_VALUE -from backend.source import get_analyte_search_param +from backend.source import get_analyte_search_param, get_most_recent URL = "https://nmenv.newmexicowaterdata.org/FROST-Server/v1.1/" @@ -64,8 +64,20 @@ class DWBAnalyteSource(STAnalyteSource): url = URL transformer_klass = DWBAnalyteTransformer - def _parse_result(self, result): - return float(result.split(" ")[0]) + def _parse_result( + self, result, result_dt=None, result_id=None, result_location=None + ): + if "< mrl" in result.lower(): + if self.config.output_summary: + self.warn( + f"Non-detect found: {result} for {result_location} on {result_dt} (observation {result_id}). Setting to 0 for summary." + ) + return 0.0 + else: + # return the results for timeseries, regardless of format (None/Null/non-detect) + return result + else: + return float(result.split(" ")[0]) def get_records(self, site, *args, **kw): service = self.get_service() @@ -92,16 +104,44 @@ def get_records(self, site, *args, **kw): return rs def _extract_parameter_record(self, record): + # this is only used for time series record[PARAMETER_VALUE] = self._parse_result(record["observation"].result) record[PARAMETER_UNITS] = record["datastream"].unit_of_measurement.symbol record[DT_MEASURED] = record["observation"].phenomenon_time return record def _extract_parameter_results(self, records): - return [self._parse_result(r["observation"].result) for r in records] + # this is only used in summary output + return [ + self._parse_result( + r["observation"].result, + r["observation"].phenomenon_time, + r["observation"].id, + r["location"].id, + ) + for r in records + ] def _extract_parameter_units(self, records): + # this is only used in summary output return [r["datastream"].unit_of_measurement.symbol for r in records] + def _extract_most_recent(self, records): + # this is only used in summary output + record = get_most_recent( + records, tag=lambda x: x["observation"].phenomenon_time + ) + + return { + "value": self._parse_result( + record["observation"].result, + record["observation"].phenomenon_time, + record["observation"].id, + record["location"].id, + ), + "datetime": record["observation"].phenomenon_time, + "units": record["datastream"].unit_of_measurement.symbol, + } + # ============= EOF ============================================= diff --git a/backend/connectors/st2/source.py b/backend/connectors/st2/source.py index f600b8f..df46261 100644 --- a/backend/connectors/st2/source.py +++ b/backend/connectors/st2/source.py @@ -88,12 +88,16 @@ def _extract_parameter_record(self, record): def _extract_parameter_results(self, records): return [r["observation"].result for r in records] - def get_records(self, parent_record, *args, **kw): + def _clean_records(self, records: list) -> list: + rs = [r for r in records if r["observation"].result is not None] + return rs + + def get_records(self, site_record, *args, **kw): service = self.get_service() config = self.config records = [] - for t in self._get_things(service, parent_record): + for t in self._get_things(service, site_record): if t.name == "Water Well": for di in t.datastreams: @@ -112,7 +116,7 @@ def get_records(self, parent_record, *args, **kw): records.append( { "thing": t, - "location": parent_record, + "location": site_record, "datastream": di, "observation": obs, } diff --git a/backend/connectors/st2/transformer.py b/backend/connectors/st2/transformer.py index 352e44b..e41a170 100644 --- a/backend/connectors/st2/transformer.py +++ b/backend/connectors/st2/transformer.py @@ -58,15 +58,15 @@ class EBIDSiteTransformer(STSiteTransformer): # class ST2WaterLevelTransformer(WaterLevelTransformer): # source_tag = "ST2" -# def _transform_hook(self, record, config, parent_record, *args, **kw): +# def _transform_hook(self, record, config, site_record, *args, **kw): # rec = { # "source": self.source_id, -# "id": parent_record.id, -# "location": parent_record.name, -# "latitude": parent_record.latitude, -# "longitude": parent_record.longitude, -# "surface_elevation_ft": parent_record.elevation, -# "well_depth_ft_below_ground_surface": parent_record.well_depth, +# "id": site_record.id, +# "location": site_record.name, +# "latitude": site_record.latitude, +# "longitude": site_record.longitude, +# "surface_elevation_ft": site_record.elevation, +# "well_depth_ft_below_ground_surface": site_record.well_depth, # } # # if config.output_summary_waterlevel_stats: diff --git a/backend/connectors/st_connector.py b/backend/connectors/st_connector.py index 3b8cded..92cac48 100644 --- a/backend/connectors/st_connector.py +++ b/backend/connectors/st_connector.py @@ -16,6 +16,7 @@ from datetime import datetime import frost_sta_client as fsc +from shapely import MultiPolygon, Polygon, unary_union from backend.bounding_polygons import get_state_polygon from backend.source import ( @@ -100,9 +101,9 @@ def get_records(self, *args, **kw): poly = config.bounding_wkt(as_wkt=False) # if poly is a MULTIPOLYGON convert to POLYGON - if poly.geom_type == "MultiPolygon": + if type(poly) == MultiPolygon: if len(poly.geoms) == 1: - poly = poly.geoms[0] + poly = unary_union(poly) else: # HUC4 1508 has 2 polygons, one of them is outside of NM state_boundary = get_state_polygon("NM") diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 0e1a14d..58abb7f 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -127,13 +127,13 @@ def get_records(self): class NWISWaterLevelSource(BaseWaterLevelSource): transformer_klass = NWISWaterLevelTransformer - def get_records(self, parent_record): + def get_records(self, site_record): params = { "format": "json", "siteType": "GW", "siteStatus": "all", "parameterCd": "72019", - "sites": ",".join(make_site_list(parent_record)), + "sites": ",".join(make_site_list(site_record)), } config = self.config @@ -155,8 +155,8 @@ def get_records(self, parent_record): self.log(f"Retrieved {len(records)} records") return records - def _extract_parent_records(self, records, parent_record): - return [ri for ri in records if ri["site_code"] == parent_record.id] + def _extract_site_records(self, records, site_record): + return [ri for ri in records if ri["site_code"] == site_record.id] def _clean_records(self, records): return [r for r in records if r["value"] is not None and r["value"].strip()] diff --git a/backend/connectors/usgs/transformer.py b/backend/connectors/usgs/transformer.py index 40d0acc..1f61cf5 100644 --- a/backend/connectors/usgs/transformer.py +++ b/backend/connectors/usgs/transformer.py @@ -52,18 +52,18 @@ def _transform(self, record): class NWISWaterLevelTransformer(WaterLevelTransformer): source_tag = "USGS-NWIS" - # def _transform_hook(self, record, config, parent_record): + # def _transform_hook(self, record, config, site_record): # rec = { # "source": "USGS-NWIS", - # "id": parent_record.id, - # "location": parent_record.name, - # "usgs_site_id": parent_record.id, - # "latitude": parent_record.latitude, - # "longitude": parent_record.longitude, - # "elevation": parent_record.elevation, - # "elevation_units": parent_record.elevation_units, - # "well_depth": parent_record.well_depth, - # "well_depth_units": parent_record.well_depth_units, + # "id": site_record.id, + # "location": site_record.name, + # "usgs_site_id": site_record.id, + # "latitude": site_record.latitude, + # "longitude": site_record.longitude, + # "elevation": site_record.elevation, + # "elevation_units": site_record.elevation_units, + # "well_depth": site_record.well_depth, + # "well_depth_units": site_record.well_depth_units, # # "date": record["datetime"], # # "value": record["lev_va"], # # "units": "ft", diff --git a/backend/connectors/wqp/source.py b/backend/connectors/wqp/source.py index 3213fc2..b734908 100644 --- a/backend/connectors/wqp/source.py +++ b/backend/connectors/wqp/source.py @@ -74,7 +74,12 @@ def health(self): def get_records(self): config = self.config - params = {"mimeType": "tsv", "siteType": "Well"} + params = { + "mimeType": "tsv", + "siteType": "Well", + "sampleMedia": "Water", + "statecode": "US:35", + } if config.has_bounds(): params["bBox"] = ",".join([str(b) for b in config.bbox_bounding_points()]) @@ -101,11 +106,9 @@ def _extract_parameter_record(self, record): record[DT_MEASURED] = record["ActivityStartDate"] return record - def _extract_parent_records(self, records, parent_record): + def _extract_site_records(self, records, site_record): return [ - ri - for ri in records - if ri["MonitoringLocationIdentifier"] == parent_record.id + ri for ri in records if ri["MonitoringLocationIdentifier"] == site_record.id ] def _extract_parameter_results(self, records): @@ -125,8 +128,8 @@ def _extract_most_recent(self, records): "units": ri["ResultMeasure/MeasureUnitCode"], } - def get_records(self, parent_record): - sites = make_site_list(parent_record) + def get_records(self, site_record): + sites = make_site_list(site_record) params = { "siteid": sites, diff --git a/backend/constants.py b/backend/constants.py index 0ccd1ec..2064a87 100644 --- a/backend/constants.py +++ b/backend/constants.py @@ -24,6 +24,7 @@ MAGNESIUM = "Magnesium" NITRATE = "Nitrate" POTASSIUM = "Potassium" +SILICA = "Silica" SODIUM = "Sodium" SULFATE = "Sulfate" URANIUM = "Uranium" @@ -60,6 +61,7 @@ MAGNESIUM, NITRATE, POTASSIUM, + SILICA, SODIUM, SULFATE, TDS, diff --git a/backend/geo_utils.py b/backend/geo_utils.py index cdd284f..930f76d 100644 --- a/backend/geo_utils.py +++ b/backend/geo_utils.py @@ -18,6 +18,8 @@ PROJECTIONS = {} TRANSFORMS = {} +ALLOWED_DATUMS = ["NAD27", "NAD83", "WGS84"] + def datum_transform(x, y, in_datum, out_datum): """ diff --git a/backend/persister.py b/backend/persister.py index b1a8a6b..bfc7e70 100644 --- a/backend/persister.py +++ b/backend/persister.py @@ -44,6 +44,8 @@ def __init__(self): self.combined = [] self.timeseries = [] self.sites = [] + self.logs = [] + self.warnings = [] # self.keys = record_klass.keys def load(self, records: list): @@ -98,6 +100,24 @@ def dump_sites(self, path: str): else: self.log("no sites to dump", fg="red") + def dump_logs(self, path: str): + if self.logs: + self.log(f"dumping logs to {os.path.abspath(path)}") + with open(path, "w") as f: + for l in self.logs: + f.write(f"{l}\n") + else: + self.log("no logs to dump") + + def dump_warnings(self, path: str): + if self.warnings: + self.log(f"dumping warnings to {os.path.abspath(path)}") + with open(path, "w") as f: + for w in self.warnings: + f.write(f"{w}\n") + else: + self.log("no warnings to dump") + def save(self, path: str): if self.records: path = self.add_extension(path) @@ -139,11 +159,12 @@ def write_memory(path, func, records): def dump_single_timeseries(writer, timeseries): + headers_have_not_been_written = True for i, (site, records) in enumerate(timeseries): - for j, record in enumerate(records): - if i == 0: + if i == 0 and headers_have_not_been_written: writer.writerow(record.keys) + headers_have_not_been_written = False writer.writerow(record.to_row()) diff --git a/backend/record.py b/backend/record.py index fc531b8..487d644 100644 --- a/backend/record.py +++ b/backend/record.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from backend.constants import DTW, PARAMETER, PARAMETER_VALUE, FEET +from backend.constants import DTW, PARAMETER, PARAMETER_VALUE, PARAMETER_UNITS, FEET class BaseRecord: @@ -82,8 +82,8 @@ class WaterLevelRecord(BaseRecord): class AnalyteRecord(BaseRecord): keys: tuple = ( - # "source", - # "id", + "source", + "id", # "location", # "latitude", # "longitude", @@ -91,6 +91,7 @@ class AnalyteRecord(BaseRecord): # "well_depth_ft_below_ground_surface", PARAMETER, PARAMETER_VALUE, + PARAMETER_UNITS, "date_measured", "time_measured", ) @@ -107,6 +108,7 @@ class SummaryRecord(BaseRecord): "alternate_site_id", "latitude", "longitude", + "horizontal_datum", "elevation", "elevation_units", "well_depth", diff --git a/backend/source.py b/backend/source.py index 9d120d2..0d56ef9 100644 --- a/backend/source.py +++ b/backend/source.py @@ -208,7 +208,8 @@ def warn(self, msg): ------- None """ - self.log(msg, fg="red") + s = self.log(msg, fg="red") + self.config.warnings.append(s) def log(self, msg, fg="yellow"): """ @@ -226,7 +227,10 @@ def log(self, msg, fg="yellow"): ------- None """ - click.secho(f"{self.__class__.__name__:25s} -- {msg}", fg=fg) + s = f"{self.__class__.__name__:25s} -- {msg}" + click.secho(s, fg=fg) + self.config.logs.append(s) + return s def _execute_text_request(self, url: str, params=None, **kw) -> str: """ @@ -326,7 +330,7 @@ def get_records(self, *args, **kw) -> dict: Parameters ---------- If parameter records: - parent_record : dict + site_record : dict the site record for the location whose parameter records are to be retrieved If site records: @@ -478,7 +482,7 @@ def read(self, *args, **kw) -> List[SiteRecord]: else: self.warn("No site records returned") - def _transform_sites(self, records: list) -> list: + def _transform_sites(self, records: list) -> List[SiteRecord]: """ Transforms site records into the standardized format. @@ -489,8 +493,8 @@ def _transform_sites(self, records: list) -> list: Returns ------- - list - a list of transformed site records + list[SiteRecord] + a list of transformed site records as SiteRecords """ transformed_records = [] for record in records: @@ -564,7 +568,7 @@ class BaseParameterSource(BaseSource): Returns a dictionary of parameter records where the keys are the site ids and the values are a list of the parameter records - _extract_parent_records + _extract_site_records Returns all records for a single site as a list of records _extract_most_recent @@ -603,7 +607,7 @@ class BaseParameterSource(BaseSource): # ========================================================================== def read( - self, parent_record: BaseSiteSource, use_summarize: bool + self, site_record: SiteRecord, use_summarize: bool, start_ind: int, end_ind: int ) -> List[ AnalyteRecord | AnalyteSummaryRecord @@ -621,7 +625,7 @@ def read( Parameters ---------- - parent_record : BaseSiteSource + site_record : SiteRecord the site record(s) for the location whose parameter records are to be retrieved use_summarize : bool @@ -632,57 +636,80 @@ def read( list[AnalyteRecord | AnalyteSummaryRecord | WaterLevelRecord | WaterLevelSummaryRecord] a list of transformed parameter records """ - if isinstance(parent_record, list): + if isinstance(site_record, list): self.log( - f"Gathering {self.name} summary for multiple records. {len(parent_record)}" + f"Gathering {self.name} summary for {len(site_record)} sites. {start_ind}-{end_ind}" ) else: - self.log( - f"{parent_record.id} ({parent_record.id}): Gathering {self.name} summary" - ) + self.log(f"{site_record.id}: Gathering {self.name} data") - all_analyte_records = self.get_records(parent_record) + all_analyte_records = self.get_records(site_record) if all_analyte_records: - if not isinstance(parent_record, list): - parent_record = [parent_record] + if not isinstance(site_record, list): + site_record = [site_record] # return values ret = [] # iterate over each site record and extract the parameter records for each site - for site in parent_record: - site_records = self._extract_parent_records(all_analyte_records, site) + for site in site_record: + site_records = self._extract_site_records(all_analyte_records, site) if not site_records: - self.warn(f"{site.name}: No parent records found") + self.warn(f"{site.id}: No records found") continue - # get cleaned records if _clean_records is defined by the source + # get cleaned records if _clean_records is defined by the source. This usually removes Nones/Null cleaned = self._clean_records(site_records) if not cleaned: - self.warn(f"{site.name} No clean records found") + self.warn(f"{site.id} No clean records found") continue - items = self._extract_parameter_results(cleaned) - units = self._extract_parameter_units(cleaned) - items = [ - convert_units(float(result), unit, self._get_output_units()) - for result, unit in zip(items, units) - ] + if use_summarize: - if items is not None: - n = len(items) - self.log(f"{site.name}: Retrieved {self.name}: {n}") + # doesn't need to be returned, but can be used to debug/for development + kept_items = [] + skipped_items = [] + + results = self._extract_parameter_results(cleaned) + units = self._extract_parameter_units(cleaned) + + for r, u in zip(results, units): + try: + converted_result, warning_msg = convert_units( + float(r), + u, + self._get_output_units(), + self.config.analyte, + ) + if warning_msg == "": + kept_items.append(converted_result) + else: + msg = f"{warning_msg} for {site.id}" + self.warn(msg) + skipped_items.append((site.id, r, u)) + except TypeError: + skipped_items.append((site.id, r, u)) + except ValueError: + skipped_items.append((site.id, r, u)) + + if len(skipped_items) > 0: + self.warn( + f"Skipped results because of formatting: {skipped_items}" + ) + + # if items is None or empty, no records were found or all results were None + if kept_items is not None and len(kept_items): + n = len(kept_items) + # self.log(f"{site.id}: Retrieved {self.name}: {n}") - # create the summaries if use_summarize is True, otherwise returned the cleaned and sorted records - if use_summarize: most_recent_result = self._extract_most_recent(cleaned) if not most_recent_result: continue rec = { "nrecords": n, - "min": min(items), - "max": max(items), - "mean": sum(items) / n, + "min": min(kept_items), + "max": max(kept_items), + "mean": sum(kept_items) / n, "most_recent_datetime": most_recent_result["datetime"], "most_recent_value": most_recent_result["value"], "most_recent_units": most_recent_result["units"], @@ -691,23 +718,32 @@ def read( rec, site, ) - ret.append(transformed_record) - else: - cleaned_sorted = [ - self.transformer.do_transform( - self._extract_parameter(record), site - ) - for record in cleaned - ] - cleaned_sorted = sorted(cleaned_sorted, key=self._sort_func) - ret.append((site, cleaned_sorted)) - + if transformed_record is None: + continue + else: + ret.append(transformed_record) + else: + cleaned_sorted = [ + self.transformer.do_transform( + self._extract_parameter(record), site + ) + for record in cleaned + if self.transformer.do_transform( + self._extract_parameter(record), site + ) + is not None + ] + if len(cleaned_sorted) == 0: + self.warn(f"{site.id}: No clean records found") + continue + cleaned_sorted = sorted(cleaned_sorted, key=self._sort_func) + ret.append((site, cleaned_sorted)) return ret else: - if isinstance(parent_record, list): - names = [str(r.id) for r in parent_record] + if isinstance(site_record, list): + names = [str(r.id) for r in site_record] else: - names = [str(parent_record.id)] + names = [str(site_record.id)] name = ",".join(names) self.warn(f"{name}: No records found") @@ -759,7 +795,7 @@ def _get_output_units(self) -> str: # Methods That Need to be Implemented For Each Source # ========================================================================== - def _extract_parent_records(self, records: dict, parent_record: dict) -> list: + def _extract_site_records(self, records: dict, site_record: dict) -> list: """ Returns all records for a single site as a list of records (which are dictionaries). @@ -768,7 +804,7 @@ def _extract_parent_records(self, records: dict, parent_record: dict) -> list: records : dict a dictionary of lists, where the keys are site ids and the values are parameter records - parent_record : dict + site_record : dict the site record for the location whose parameter records are to be retrieved Returns @@ -776,11 +812,11 @@ def _extract_parent_records(self, records: dict, parent_record: dict) -> list: list a list of records for the site """ - if parent_record.chunk_size == 1: + if site_record.chunk_size == 1: return records raise NotImplementedError( - f"{self.__class__.__name__} Must implement _extract_parent_records" + f"{self.__class__.__name__} Must implement _extract_site_records" ) def _clean_records(self, records: list) -> list: @@ -839,7 +875,7 @@ def _extract_parameter_units(self, records: list) -> list: def _extract_parameter_record(self, record: dict) -> dict: """ - Returns a parameter record with standardized fields added. + Returns a parameter record with standardized fields added. This is only used for time series, not summary outputs For an analyte, the fields are - backend.constants.PARAMETER @@ -867,7 +903,7 @@ def _extract_parameter_record(self, record: dict) -> dict: def _extract_parameter_results(self, records: list) -> list: """ - Returns the parameter results as a list from the records, in the same order as the records themselves + Returns the parameter results as a list from the records, in the same order as the records themselves. This is only used for summary outputs, not time serie Parameters ---------- @@ -885,7 +921,7 @@ def _extract_parameter_results(self, records: list) -> list: def _extract_parameter(self, record: dict) -> dict: """ - Extracts a parameter record from a list of records + Extracts a parameter record from a list of records. This is only used for time series, not summary outputs Parameters ---------- diff --git a/backend/transformer.py b/backend/transformer.py index 5fc01f4..06b60e6 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== +import click import pprint from datetime import datetime @@ -27,8 +28,9 @@ TONS_PER_ACRE_FOOT, MICROGRAMS_PER_LITER, DT_MEASURED, + PARAMETER_UNITS, ) -from backend.geo_utils import datum_transform +from backend.geo_utils import datum_transform, ALLOWED_DATUMS from backend.record import ( WaterLevelSummaryRecord, WaterLevelRecord, @@ -117,7 +119,7 @@ def transform_length_units( def convert_units( - input_value: int | float | str, input_units: str, output_units: str + input_value: int | float | str, input_units: str, output_units: str, analyte: str ) -> float: """ Converts the following units for any parameter value: @@ -127,6 +129,7 @@ def convert_units( - ppm to mg/L - ton/ac-ft to mg/L - ug/L to mg/L + - mg/L CaCO3 to mg/L length: - ft to m @@ -143,11 +146,17 @@ def convert_units( output_units: str The output unit of the value + analyte: str + The analyte to convert + Returns -------- float The converted value """ + warning = "" + conversion_factor = None + input_value = float(input_value) input_units = input_units.lower() output_units = output_units.lower() @@ -157,11 +166,30 @@ def convert_units( ppm = PARTS_PER_MILLION.lower() tpaf = TONS_PER_ACRE_FOOT.lower() + # edge cases for Bicarbonate + # BOR, WQP + if ( + input_units in ["mg/l caco3", "mg/l caco3**"] + and output_units == mgl + and analyte == "Bicarbonate" + ): + """ + https://aqua-chem.com/water-chemistry-caco3-equivalents/ + """ + conversion_factor = 1.22 + elif ( + input_units in ["mg/l caco3", "mg/l caco3**"] + and output_units == mgl + and analyte != "Bicarbonate" + ): + # this will catch if the input units are mg/l caco3 and the analyte is not bicarbonate so that the developer can perform the appropriate calculations for the conversion factor + conversion_factor = None + if input_units == output_units: - return input_value + conversion_factor = 1 if input_units == tpaf and output_units == mgl: - return input_value * 735.47 + conversion_factor = 735.47 if ( input_units == mgl @@ -169,10 +197,10 @@ def convert_units( or input_units == ppm and output_units == mgl ): - return input_value * 1.0 + conversion_factor = 1.0 if input_units == ugl and output_units == mgl: - return input_value * 0.001 + conversion_factor = 0.001 ft = FEET.lower() m = METERS.lower() @@ -183,12 +211,15 @@ def convert_units( input_units = m if input_units == ft and output_units == m: - return input_value * 0.3048 + conversion_factor = 0.3048 if input_units == m and output_units == ft: - return input_value * 3.28084 + conversion_factor = 3.28084 - print(f"Failed to convert {input_value} {input_units} to {output_units}") - return input_value + if conversion_factor: + return input_value * conversion_factor, warning + else: + warning = f"Failed to convert {input_value} {input_units} to {output_units} for {analyte}" + return input_value, warning def standardize_datetime(dt): @@ -333,8 +364,12 @@ def do_transform( if not record: return - if 'longitude' in record and 'latitude' in record: + # ensure that a site or summary record is contained within the boundaing polygon + if "longitude" in record and "latitude" in record: if not self.contained(record["longitude"], record["latitude"]): + self.warn( + f"Skipping site {record['id']}. It is not within the defined geographic bounds" + ) return self._post_transform(record, *args, **kw) @@ -366,6 +401,12 @@ def do_transform( x = float(record.longitude) input_horizontal_datum = record.horizontal_datum + if input_horizontal_datum not in ALLOWED_DATUMS: + self.warn( + f"Skipping site {record.id}. Datum {input_horizontal_datum} cannot be processed" + ) + return None + output_elevation_units = "" well_depth_units = "" output_horizontal_datum = "WGS84" @@ -400,6 +441,35 @@ def do_transform( record.update(well_depth=well_depth) record.update(well_depth_units=well_depth_unit) + # update the units to the output unit for analyte records + # this is done after converting the units to the output unit for the analyte records + # convert the parameter value to the output unit specified in the config + elif isinstance(record, (AnalyteRecord)): + r = record.parameter_value + u = record.parameter_units + warning_msg = "" + try: + converted_result, warning_msg = convert_units( + float(r), u, self.config.analyte_output_units, self.config.analyte + ) + if warning_msg != "": + msg = f"{warning_msg} for {record.id}" + self.warn(msg) + except TypeError: + msg = f"Keeping {r} for {record.id} on {record.date_measured} for time series data" + self.warn(msg) + converted_result = r + except ValueError: + msg = f"Keeping {r} for {record.id} on {record.date_measured} for time series data" + self.warn(msg) + converted_result = r + + if warning_msg == "": + record.update(parameter_value=converted_result) + record.update(parameter_units=self.config.analyte_output_units) + else: + record = None + return record def contained( @@ -436,6 +506,41 @@ def contained( return True + def warn(self, msg): + """ + Prints warning messages to the console in red + + Parameters + ---------- + msg : str + the message to print + + Returns + ------- + None + """ + self.log(msg, fg="red") + self.config.warnings.append(msg) + + def log(self, msg, fg="yellow"): + """ + Prints the message to the console in yellow + + Parameters + ---------- + msg : str + the message to print + + fg : str + the color of the message, defaults to yellow + + Returns + ------- + None + """ + click.secho(f"{self.__class__.__name__:25s} -- {msg}", fg=fg) + self.config.logs.append(f"{self.__class__.__name__:25s} -- {msg}") + # ========================================================================== # Methods That Need to be Implemented For Each SiteTransformer # ========================================================================== @@ -494,6 +599,11 @@ def _transform(self, *args, **kw) -> dict: site_record: dict The site record associated with the parameter record + + Returns + -------- + dict + The record with the standard fields added and populated """ raise NotImplementedError( f"{self.__class__.__name__} must implement _transform" @@ -537,10 +647,7 @@ def _transform(self, record, site_record): f"{self.__class__.__name__} source_tag is not set" ) - rec = { - "source": self.source_tag, - "id": site_record.id, - } + rec = {} if self.config.output_summary: self._transform_most_recents(record) @@ -553,6 +660,7 @@ def _transform(self, record, site_record): "alternate_site_id": site_record.alternate_site_id, "latitude": site_record.latitude, "longitude": site_record.longitude, + "horizontal_datum": site_record.horizontal_datum, "elevation": site_record.elevation, "elevation_units": site_record.elevation_units, "well_depth": site_record.well_depth, @@ -562,6 +670,16 @@ def _transform(self, record, site_record): } ) rec.update(record) + + """ + Some analyte records, like BOR, have a field called "id" that is the record's ID. + To allow for the record's "id" to be the site's "id", the record's "id" needs to be updated at the end. + """ + source_id = { + "source": self.source_tag, + "id": site_record.id, + } + rec.update(source_id) return rec def _transform_most_recents(self, record): @@ -571,7 +689,10 @@ def _transform_most_recents(self, record): record["most_recent_time"] = tt p, u = self._get_parameter() record["most_recent_value"] = convert_units( - record["most_recent_value"], record["most_recent_units"], u + record["most_recent_value"], + record["most_recent_units"], + u, + self.config.analyte, ) record["most_recent_units"] = u diff --git a/backend/unifier.py b/backend/unifier.py index c9a0531..94171e6 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -52,7 +52,7 @@ def unify_sites(config): def unify_analytes(config): print("Unifying analytes") - config.report() + config.report(log_report=True) config.validate() if not config.dry: @@ -64,7 +64,7 @@ def unify_analytes(config): def unify_waterlevels(config): print("Unifying waterlevels") - config.report() + config.report(log_report=True) config.validate() if not config.dry: @@ -112,36 +112,55 @@ def _perister_factory(config): def _site_wrapper(site_source, parameter_source, persister, config): try: + # TODO: fully develop checks/discoveries below + # if not site_source.check(): + # print(f"Skipping {site_source}. check failed") - if site_source.check(): - print(f"Skipping {site_source}. check failed") + # schemas = site_source.discover() + # if not schemas: + # print(f"No schemas found for {site_source}") - schemas = site_source.discover() - if not schemas: - print(f"No schemas found for {site_source}") - - # in the future make discover required - # return + # in the future make discover required + # return use_summarize = config.output_summary site_limit = config.site_limit sites = site_source.read() + if not sites: - print(f"No sites found for {site_source}") return - for i, sites in enumerate(site_source.chunks(sites)): - if site_limit and i > site_limit: + sites_with_records_count = 0 + start_ind = 1 + end_ind = 0 + first_flag = True + for sites in site_source.chunks(sites): + if site_limit and sites_with_records_count == site_limit: break + if type(sites) == list: + if first_flag: + end_ind += len(sites) + first_flag = False + else: + start_ind = end_ind + 1 + end_ind += len(sites) + if use_summarize: - summary_records = parameter_source.read(sites, use_summarize) + summary_records = parameter_source.read( + sites, use_summarize, start_ind, end_ind + ) if summary_records: persister.records.extend(summary_records) else: - results = parameter_source.read(sites, use_summarize) - if results is None: + results = parameter_source.read( + sites, use_summarize, start_ind, end_ind + ) + # no records are returned if there is no site record for parameter + # or if the record isn't clean (doesn't have the correct fields) + # don't count these sites to apply to site_limit + if results is None or len(results) == 0: continue if config.output_single_timeseries: @@ -156,6 +175,7 @@ def _site_wrapper(site_source, parameter_source, persister, config): else: persister.timeseries.append((site, records)) persister.sites.append(site) + sites_with_records_count += 1 except BaseException: import traceback @@ -164,6 +184,12 @@ def _site_wrapper(site_source, parameter_source, persister, config): print(exc) print(f"Failed to unify {site_source}") + config.logs.append(exc) + config.logs.append(f"Failed to unify {site_source}") + + config.warnings.append(exc) + config.warnings.append(f"Failed to unify {site_source}") + def _unify_parameter( config, @@ -181,6 +207,11 @@ def _unify_parameter( else: persister.dump_combined(f"{config.output_path}.combined") persister.dump_timeseries(f"{config.output_path}_timeseries") + + persister.logs.extend(config.logs) + persister.warnings.extend(config.warnings) + persister.dump_logs(f"{config.output_path}.logs.txt") + persister.dump_warnings(f"{config.output_path}.warnings.txt") persister.finalize(config.output_name) diff --git a/frontend/cli.py b/frontend/cli.py index 1dab0f9..12a3cb2 100644 --- a/frontend/cli.py +++ b/frontend/cli.py @@ -133,6 +133,23 @@ def cli(): ), ] +TIMESERIES_OPTIONS = [ + click.option( + "--separate_timeseries_files", + is_flag=True, + default=False, + show_default=True, + help="Output separate timeseries files for every site", + ), + click.option( + "--single_timeseries_file", + is_flag=True, + default=False, + show_default=True, + help="Output single timeseries file, which includes all sites", + ), +] + def add_options(options): def _add_options(func): @@ -155,19 +172,14 @@ def wells(bbox, county): @cli.command() -@click.option( - "--timeseries", - is_flag=True, - default=False, - show_default=True, - help="Include timeseries data", -) +@add_options(TIMESERIES_OPTIONS) @add_options(DT_OPTIONS) @add_options(SPATIAL_OPTIONS) @add_options(SOURCE_OPTIONS) @add_options(DEBUG_OPTIONS) def waterlevels( - timeseries, + separate_timeseries_files, + single_timeseries_file, start_date, end_date, bbox, @@ -184,8 +196,13 @@ def waterlevels( site_limit, dry, ): + if separate_timeseries_files or single_timeseries_file: + timeseries = True + else: + timeseries = False config = setup_config("waterlevels", timeseries, bbox, county, site_limit, dry) + config.output_single_timeseries = single_timeseries_file config.use_source_nmbgmr = no_amp config.use_source_nwis = no_nwis config.use_source_pvacd = no_pvacd @@ -210,20 +227,15 @@ def waterlevels( @cli.command() @click.argument("analyte", type=click.Choice(ANALYTE_CHOICES)) -@click.option( - "--timeseries", - is_flag=True, - default=False, - show_default=True, - help="Include timeseries data", -) +@add_options(TIMESERIES_OPTIONS) @add_options(DT_OPTIONS) @add_options(SPATIAL_OPTIONS) @add_options(SOURCE_OPTIONS) @add_options(DEBUG_OPTIONS) def analytes( analyte, - timeseries, + separate_timeseries_files, + single_timeseries_file, start_date, end_date, bbox, @@ -240,11 +252,16 @@ def analytes( site_limit, dry, ): + if separate_timeseries_files or single_timeseries_file: + timeseries = True + else: + timeseries = False config = setup_config( f"analytes ({analyte})", timeseries, bbox, county, site_limit, dry ) config.analyte = analyte + config.output_single_timeseries = single_timeseries_file config.use_source_nmbgmr = no_amp config.use_source_nwis = no_nwis config.use_source_pvacd = no_pvacd diff --git a/tests/test_unifier.py b/tests/test_unifier.py index a2cd59a..3947ef6 100644 --- a/tests/test_unifier.py +++ b/tests/test_unifier.py @@ -120,6 +120,7 @@ def _test_waterlevels_timeseries( d = _setup_waterlevels(tmp_path, cfg, source) combined = d / "output.combined.csv" timeseries = d / "output_timeseries" + print(combined_flag) print("combined", combined.is_file(), combined_flag) assert combined.is_file() == combined_flag @@ -129,13 +130,15 @@ def _test_waterlevels_timeseries( return combined, timeseries -def _test_waterelevels_timeseries_date_range(tmp_path, cfg, source): +def _test_waterelevels_timeseries_date_range( + tmp_path, cfg, source, timeseries_flag=True, combined_flag=False +): combined, timeseries = _test_waterlevels_timeseries( tmp_path, cfg, source, - timeseries_flag=True, - combined_flag=False, + timeseries_flag=timeseries_flag, + combined_flag=combined_flag, ) for p in timeseries.iterdir(): @@ -351,11 +354,13 @@ def test_unify_waterlevels_ose_roswell_summary(tmp_path, waterlevel_summary_cfg) # Waterlevel timeseries tests ========================================================================================= def test_unify_waterlevels_nwis_timeseries(tmp_path, waterlevel_timeseries_cfg): + # there are one or more locations within the bounding box that have only + # one record, so there is a combined file _test_waterlevels_timeseries( tmp_path, waterlevel_timeseries_cfg, "nwis", - combined_flag=False, + combined_flag=True, timeseries_flag=True, ) @@ -365,6 +370,8 @@ def test_unify_waterlevels_amp_timeseries(tmp_path, waterlevel_timeseries_cfg): def test_unify_waterlevels_pvacd_timeseries(tmp_path, waterlevel_timeseries_cfg): + # all locations within the bounding box have more than one record + # so there is no combined file _test_waterlevels_timeseries( tmp_path, waterlevel_timeseries_cfg, @@ -377,6 +384,8 @@ def test_unify_waterlevels_pvacd_timeseries(tmp_path, waterlevel_timeseries_cfg) def test_unify_waterlevels_isc_seven_rivers_timeseries( tmp_path, waterlevel_timeseries_cfg ): + # all locations within the bounding box have more than one record + # so there is no combined file _test_waterlevels_timeseries( tmp_path, waterlevel_timeseries_cfg, @@ -400,22 +409,40 @@ def test_waterlevels_nwis_summary_date_range(tmp_path, waterlevel_summary_cfg): # Waterlevel timeseries date range ==================================================================================== def test_waterlevels_nwis_timeseries_date_range(tmp_path, waterlevel_timeseries_cfg): + # there are one or more locations within the bounding box and date range + # that have only one record, so there is a combined file _test_waterelevels_timeseries_date_range( - tmp_path, waterlevel_timeseries_cfg, "nwis" + tmp_path, + waterlevel_timeseries_cfg, + "nwis", + timeseries_flag=True, + combined_flag=True, ) def test_waterlevels_isc_seven_rivers_timeseries_date_range( tmp_path, waterlevel_timeseries_cfg ): + # all locations within the bounding box and date rangehave more than one + # record so there is no combined file _test_waterelevels_timeseries_date_range( - tmp_path, waterlevel_timeseries_cfg, "iscsevenrivers" + tmp_path, + waterlevel_timeseries_cfg, + "iscsevenrivers", + timeseries_flag=True, + combined_flag=False, ) def test_waterlevels_pvacd_timeseries_date_range(tmp_path, waterlevel_timeseries_cfg): + # all locations within the bounding box and date rangehave more than one + # record so there is no combined file _test_waterelevels_timeseries_date_range( - tmp_path, waterlevel_timeseries_cfg, "pvacd" + tmp_path, + waterlevel_timeseries_cfg, + "pvacd", + timeseries_flag=True, + combined_flag=False, ) @@ -429,6 +456,9 @@ def test_unify_analytes_amp_summary(tmp_path, analyte_summary_cfg): def test_unify_analytes_bor_summary(tmp_path, analyte_summary_cfg): + # BOR locations are found within Otero County + analyte_summary_cfg.county = "otero" + analyte_summary_cfg.bbox = None _test_analytes_summary(tmp_path, analyte_summary_cfg, "bor")