From ed7a74ab48ab0d32f4fc8eb024282746d49f82aa Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 20 Sep 2024 13:18:31 -0600 Subject: [PATCH 01/55] Added note to README.md for new sources to make tests --- backend/connectors/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/connectors/README.md b/backend/connectors/README.md index 01384e9..15a6467 100644 --- a/backend/connectors/README.md +++ b/backend/connectors/README.md @@ -8,6 +8,8 @@ The following are necessary for adding a source: - the `use_source_` flag needs to be added to the `analyte_sources` method in the `Config` class in **/backend/config.py** if analytes are available for that source - the `use_source_` flag needs to be added to the `water_level_sources` method in the `Config` class in **/backend/config.py** if water levels are available for that source +**IMPORTANT: add tests for the source** + For the sake of discription, the example source is called Faux. # /backend/connectors/faux From 2d8530f79b4099e36510b4b7778fc4a40416674a Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 20 Sep 2024 16:28:54 -0600 Subject: [PATCH 02/55] Convert MultiPolygon to Polygon for ST connector This update was made because the FROST server no longer accepts MultiPolygons as valid WKT inputs. This change will allow the ST connector to convert MultiPolygons to Polygons before sending them to the FROST server. 
However, if the MultiPolygon is not continuous, then the MultiPolygon won't be converted to a Polygon --- backend/connectors/st_connector.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/connectors/st_connector.py b/backend/connectors/st_connector.py index 19d127c..de16f3d 100644 --- a/backend/connectors/st_connector.py +++ b/backend/connectors/st_connector.py @@ -16,6 +16,7 @@ from datetime import datetime import frost_sta_client as fsc +from shapely import MultiPolygon, Polygon, unary_union from backend.source import ( BaseSiteSource, @@ -99,6 +100,8 @@ def get_records(self, *args, **kw): poly = config.bounding_wkt(as_wkt=False) # if poly is a MULTIPOLYGON convert to POLYGON + if type(poly) == MultiPolygon: + poly = unary_union(poly) fs.append(f"st_within(location, geography'{poly}')") fi = make_dt_filter( From eb9f92311e439a2bddda177aa02a71424c82cdbd Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 23 Sep 2024 16:15:25 -0600 Subject: [PATCH 03/55] Convert MultiPolygon to Polygon for ST2 --- backend/connectors/st_connector.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/backend/connectors/st_connector.py b/backend/connectors/st_connector.py index de16f3d..df22504 100644 --- a/backend/connectors/st_connector.py +++ b/backend/connectors/st_connector.py @@ -18,6 +18,7 @@ import frost_sta_client as fsc from shapely import MultiPolygon, Polygon, unary_union +from backend.bounding_polygons import get_state_polygon from backend.source import ( BaseSiteSource, BaseWaterLevelSource, @@ -101,7 +102,15 @@ def get_records(self, *args, **kw): poly = config.bounding_wkt(as_wkt=False) # if poly is a MULTIPOLYGON convert to POLYGON if type(poly) == MultiPolygon: - poly = unary_union(poly) + if len(poly.geoms) == 1: + poly = unary_union(poly) + else: + # HUC4 1508 has 2 polygons, one of them is outside of NM + state_boundary = get_state_polygon("NM") + for geom in poly: + if state_boundary.contains(geom): + poly = geom + break 
fs.append(f"st_within(location, geography'{poly}')") fi = make_dt_filter( From ec7a288fb6aec30bedca552d0f3f08bd5f34b035 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 24 Sep 2024 11:27:01 -0600 Subject: [PATCH 04/55] Added comment about contains in the transformer --- backend/transformer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/transformer.py b/backend/transformer.py index 5fc01f4..c13f4ed 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -333,6 +333,7 @@ def do_transform( if not record: return + # ensure that a site or summary record is contained within the boundaing polygon if 'longitude' in record and 'latitude' in record: if not self.contained(record["longitude"], record["latitude"]): return From 5d7f451c8eea9059233984bafaa6a99739e232c9 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 24 Sep 2024 12:38:21 -0600 Subject: [PATCH 05/55] Only change poly from MultiPolygon to Polygon by checking type Sometimes a WKT string is passed, so calling poly.geoms will cause an error. 
To fix this, a MultiPolygon is converted to a Polygon only if type(poly) == MultiPolygon, i.e. poly is a shapely MultiPolygon object rather than a WKT string --- backend/connectors/st_connector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/connectors/st_connector.py b/backend/connectors/st_connector.py index 8aa9ab0..92cac48 100644 --- a/backend/connectors/st_connector.py +++ b/backend/connectors/st_connector.py @@ -101,9 +101,9 @@ def get_records(self, *args, **kw): poly = config.bounding_wkt(as_wkt=False) # if poly is a MULTIPOLYGON convert to POLYGON - if poly.geom_type == "MultiPolygon": + if type(poly) == MultiPolygon: if len(poly.geoms) == 1: - poly = poly.geoms[0] + poly = unary_union(poly) else: # HUC4 1508 has 2 polygons, one of them is outside of NM state_boundary = get_state_polygon("NM") From fb602c916367cf6bdca8667f7869a54cbfcf39ad Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 24 Sep 2024 14:34:34 -0600 Subject: [PATCH 06/55] Use site.id instead of site.name in reporting to be consistent site.name was printing the site name, but the id is used everywhere else. 
Now, when a site has no clean records or parent records, the id instead of the name is printed --- backend/source.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/source.py b/backend/source.py index 9d120d2..781da3d 100644 --- a/backend/source.py +++ b/backend/source.py @@ -653,13 +653,13 @@ def read( for site in parent_record: site_records = self._extract_parent_records(all_analyte_records, site) if not site_records: - self.warn(f"{site.name}: No parent records found") + self.warn(f"{site.id}: No parent records found") continue # get cleaned records if _clean_records is defined by the source cleaned = self._clean_records(site_records) if not cleaned: - self.warn(f"{site.name} No clean records found") + self.warn(f"{site.id} No clean records found") continue items = self._extract_parameter_results(cleaned) @@ -671,7 +671,7 @@ def read( if items is not None: n = len(items) - self.log(f"{site.name}: Retrieved {self.name}: {n}") + self.log(f"{site.id}: Retrieved {self.name}: {n}") # create the summaries if use_summarize is True, otherwise returned the cleaned and sorted records if use_summarize: From f9b771b1e8dd6d559a9df8d4375678608b70550c Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 24 Sep 2024 14:55:15 -0600 Subject: [PATCH 07/55] Start enumeration at 1 to get the correct number of sites for limits If the enumeration starts at 0 then limit + 1 sites will be collected --- backend/unifier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/unifier.py b/backend/unifier.py index c9a0531..12fa328 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -131,7 +131,7 @@ def _site_wrapper(site_source, parameter_source, persister, config): print(f"No sites found for {site_source}") return - for i, sites in enumerate(site_source.chunks(sites)): + for i, sites in enumerate(site_source.chunks(sites), 1): if site_limit and i > site_limit: break From 3383a2249a0e8aac836a0127878d16b3dcc35f9c Mon Sep 17 
00:00:00 2001 From: Jacob Brown Date: Tue, 24 Sep 2024 15:59:03 -0600 Subject: [PATCH 08/55] Only count sites toward site_limit if they have associated records Before this update all sites were counted toward the site limit, even if they had no associated records. This change updates the site limit to only count sites that have associated records. --- backend/unifier.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/backend/unifier.py b/backend/unifier.py index 12fa328..2e85f96 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -131,8 +131,9 @@ def _site_wrapper(site_source, parameter_source, persister, config): print(f"No sites found for {site_source}") return - for i, sites in enumerate(site_source.chunks(sites), 1): - if site_limit and i > site_limit: + sites_with_records_count = 0 + for sites in site_source.chunks(sites): + if site_limit and sites_with_records_count == site_limit: break if use_summarize: @@ -141,7 +142,10 @@ def _site_wrapper(site_source, parameter_source, persister, config): persister.records.extend(summary_records) else: results = parameter_source.read(sites, use_summarize) - if results is None: + # no records are returned if there is no site record for parameter + # or if the record isn't clean (doesn't have the correct fields) + # don't count these sites to apply to site_limit + if results is None or len(results) == 0: continue if config.output_single_timeseries: @@ -156,6 +160,7 @@ def _site_wrapper(site_source, parameter_source, persister, config): else: persister.timeseries.append((site, records)) persister.sites.append(site) + sites_with_records_count += 1 except BaseException: import traceback From 4ffa71e86106a82c43e1c85926fa40d75beb6782 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 24 Sep 2024 16:54:51 -0600 Subject: [PATCH 09/55] Pass waterlevels tests The tests were failing because the combined file would be created in some situations but not others. 
Because of this, the combined_flag needed to be set for each source appropriately --- tests/test_unifier.py | 50 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 41 insertions(+), 9 deletions(-) diff --git a/tests/test_unifier.py b/tests/test_unifier.py index a2cd59a..89761e6 100644 --- a/tests/test_unifier.py +++ b/tests/test_unifier.py @@ -120,6 +120,7 @@ def _test_waterlevels_timeseries( d = _setup_waterlevels(tmp_path, cfg, source) combined = d / "output.combined.csv" timeseries = d / "output_timeseries" + print(combined_flag) print("combined", combined.is_file(), combined_flag) assert combined.is_file() == combined_flag @@ -129,13 +130,18 @@ def _test_waterlevels_timeseries( return combined, timeseries -def _test_waterelevels_timeseries_date_range(tmp_path, cfg, source): - combined, timeseries = _test_waterlevels_timeseries( +def _test_waterelevels_timeseries_date_range( tmp_path, cfg, source, timeseries_flag=True, - combined_flag=False, + combined_flag=False): + combined, timeseries = _test_waterlevels_timeseries( + tmp_path, + cfg, + source, + timeseries_flag=timeseries_flag, + combined_flag=combined_flag, ) for p in timeseries.iterdir(): @@ -351,11 +357,13 @@ def test_unify_waterlevels_ose_roswell_summary(tmp_path, waterlevel_summary_cfg) # Waterlevel timeseries tests ========================================================================================= def test_unify_waterlevels_nwis_timeseries(tmp_path, waterlevel_timeseries_cfg): + # there are one or more locations within the bounding box that have only + # one record, so there is a combined file _test_waterlevels_timeseries( tmp_path, waterlevel_timeseries_cfg, "nwis", - combined_flag=False, + combined_flag=True, timeseries_flag=True, ) @@ -365,11 +373,13 @@ def test_unify_waterlevels_amp_timeseries(tmp_path, waterlevel_timeseries_cfg): def test_unify_waterlevels_pvacd_timeseries(tmp_path, waterlevel_timeseries_cfg): + # all locations within the bounding box have more than one record + # 
so there is no combined file _test_waterlevels_timeseries( tmp_path, waterlevel_timeseries_cfg, "pvacd", - combined_flag=False, + combined_flag=False, timeseries_flag=True, ) @@ -377,6 +387,8 @@ def test_unify_waterlevels_pvacd_timeseries(tmp_path, waterlevel_timeseries_cfg) def test_unify_waterlevels_isc_seven_rivers_timeseries( tmp_path, waterlevel_timeseries_cfg ): + # all locations within the bounding box have more than one record + # so there is no combined file _test_waterlevels_timeseries( tmp_path, waterlevel_timeseries_cfg, @@ -399,23 +411,43 @@ def test_waterlevels_nwis_summary_date_range(tmp_path, waterlevel_summary_cfg): # Waterlevel timeseries date range ==================================================================================== -def test_waterlevels_nwis_timeseries_date_range(tmp_path, waterlevel_timeseries_cfg): +def test_waterlevels_nwis_timeseries_date_range( + tmp_path, + waterlevel_timeseries_cfg): + # there are one or more locations within the bounding box and date range + # that have only one record, so there is a combined file _test_waterelevels_timeseries_date_range( - tmp_path, waterlevel_timeseries_cfg, "nwis" + tmp_path, + waterlevel_timeseries_cfg, + "nwis", + timeseries_flag=True, + combined_flag=True ) def test_waterlevels_isc_seven_rivers_timeseries_date_range( tmp_path, waterlevel_timeseries_cfg ): + # all locations within the bounding box and date rangehave more than one + # record so there is no combined file _test_waterelevels_timeseries_date_range( - tmp_path, waterlevel_timeseries_cfg, "iscsevenrivers" + tmp_path, + waterlevel_timeseries_cfg, + "iscsevenrivers", + timeseries_flag=True, + combined_flag=False ) def test_waterlevels_pvacd_timeseries_date_range(tmp_path, waterlevel_timeseries_cfg): + # all locations within the bounding box and date rangehave more than one + # record so there is no combined file _test_waterelevels_timeseries_date_range( - tmp_path, waterlevel_timeseries_cfg, "pvacd" + tmp_path, + 
waterlevel_timeseries_cfg, + "pvacd", + timeseries_flag=True, + combined_flag=False ) From 202b99a3918f5bfb73a5597efd9c1b9de84a2ae5 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Wed, 25 Sep 2024 10:24:45 -0600 Subject: [PATCH 10/55] Docstring for BaseTransformer._transform --- backend/transformer.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/backend/transformer.py b/backend/transformer.py index c13f4ed..5c86acc 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -495,6 +495,11 @@ def _transform(self, *args, **kw) -> dict: site_record: dict The site record associated with the parameter record + + Returns + -------- + dict + The record with the standard fields added and populated """ raise NotImplementedError( f"{self.__class__.__name__} must implement _transform" From 7efceae1c97f44317aa2855d9293e7ccc79e3620 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Wed, 25 Sep 2024 11:05:46 -0600 Subject: [PATCH 11/55] Changed config for BoR tests as it is in Otero County --- tests/test_unifier.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_unifier.py b/tests/test_unifier.py index 89761e6..38e9100 100644 --- a/tests/test_unifier.py +++ b/tests/test_unifier.py @@ -461,6 +461,9 @@ def test_unify_analytes_amp_summary(tmp_path, analyte_summary_cfg): def test_unify_analytes_bor_summary(tmp_path, analyte_summary_cfg): + # BOR locations are found within Otero County + analyte_summary_cfg.county = "otero" + analyte_summary_cfg.bbox = None _test_analytes_summary(tmp_path, analyte_summary_cfg, "bor") From 20473bdc800c5c38c2603486459943c35fc4e560 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Wed, 25 Sep 2024 11:15:10 -0600 Subject: [PATCH 12/55] Added log and warn to transformer to communicate information to user This allows any transformer class to communicate to users in the same way that source classes do. This is useful for debugging and for communicating crucial information to the user. 
--- backend/transformer.py | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/backend/transformer.py b/backend/transformer.py index 5c86acc..e6f4d67 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== +import click import pprint from datetime import datetime @@ -336,8 +337,9 @@ def do_transform( # ensure that a site or summary record is contained within the boundaing polygon if 'longitude' in record and 'latitude' in record: if not self.contained(record["longitude"], record["latitude"]): + self.warn(f"Skipping site {record['id']}. It is not within the defined geographic bounds") return - + self._post_transform(record, *args, **kw) # standardize datetime @@ -436,6 +438,39 @@ def contained( return poly.contains(pt) return True + + def warn(self, msg): + """ + Prints warning messages to the console in red + + Parameters + ---------- + msg : str + the message to print + + Returns + ------- + None + """ + self.log(msg, fg="red") + + def log(self, msg, fg="yellow"): + """ + Prints the message to the console in yellow + + Parameters + ---------- + msg : str + the message to print + + fg : str + the color of the message, defaults to yellow + + Returns + ------- + None + """ + click.secho(f"{self.__class__.__name__:25s} -- {msg}", fg=fg) # ========================================================================== # Methods That Need to be Implemented For Each SiteTransformer From afda0b464c3300583ac05280befccd979072b2f7 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Wed, 25 Sep 2024 11:16:12 -0600 Subject: [PATCH 13/55] Updated docstring and type hints for BaseSiteSource._transform_sites This update indicates that a list of SiteRecords is returned, rather than just a list (what was previously documented) --- 
backend/source.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/source.py b/backend/source.py index 781da3d..e84d513 100644 --- a/backend/source.py +++ b/backend/source.py @@ -478,7 +478,7 @@ def read(self, *args, **kw) -> List[SiteRecord]: else: self.warn("No site records returned") - def _transform_sites(self, records: list) -> list: + def _transform_sites(self, records: list) -> List[SiteRecord]: """ Transforms site records into the standardized format. @@ -489,8 +489,8 @@ def _transform_sites(self, records: list) -> list: Returns ------- - list - a list of transformed site records + list[SiteRecord] + a list of transformed site records as SiteRecords """ transformed_records = [] for record in records: From 2d02e4a10dbd11c5b3efad6166a11713ed336ada Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Wed, 25 Sep 2024 14:59:21 -0600 Subject: [PATCH 14/55] Renamed parent_record to site_record for clarity All parent_records are site records. To clarify this, parent_record has been renamed to site_record. 
--- backend/connectors/README.md | 4 +- backend/connectors/bor/source.py | 8 ++-- backend/connectors/ckan/source.py | 8 ++-- backend/connectors/ckan/transformer.py | 16 ++++---- backend/connectors/isc_seven_rivers/source.py | 8 ++-- .../isc_seven_rivers/transformer.py | 12 +++--- backend/connectors/nmbgmr/source.py | 18 ++++----- backend/connectors/nmbgmr/transformer.py | 22 +++++----- backend/connectors/st2/source.py | 6 +-- backend/connectors/st2/transformer.py | 14 +++---- backend/connectors/usgs/source.py | 8 ++-- backend/connectors/usgs/transformer.py | 20 +++++----- backend/connectors/wqp/source.py | 8 ++-- backend/source.py | 40 +++++++++---------- 14 files changed, 96 insertions(+), 96 deletions(-) diff --git a/backend/connectors/README.md b/backend/connectors/README.md index 15a6467..4281414 100644 --- a/backend/connectors/README.md +++ b/backend/connectors/README.md @@ -32,7 +32,7 @@ The following methods need to be defined for Faux. See `BaseSiteSource` for doc The following methods need to be defined for Faux. See `BaseAnalyteSource` for doc strings for each of the methods: - `get_records` -- `_extract_parent_records` +- `_extract_site_records` - `_extract_parameter_units` - `_extract_most_recent` - `_extract_parameter_result` @@ -47,7 +47,7 @@ The following method is optional: The following methods need to be defined for Faux. 
See `BaseWaterLevelSource` for doc strings for each of the methods: - `get_records` -- `_extract_parent_records` +- `_extract_site_records` - `_extract_parameter_units` - `_extract_most_recent` - `_extract_parameter_result` diff --git a/backend/connectors/bor/source.py b/backend/connectors/bor/source.py index 2bfb772..bc5b108 100644 --- a/backend/connectors/bor/source.py +++ b/backend/connectors/bor/source.py @@ -87,9 +87,9 @@ def _extract_most_recent(self, rs): "units": record["attributes"]["resultAttributes"]["units"], } - def _extract_parent_records(self, records, parent_record): + def _extract_site_records(self, records, site_record): return [ - ri for ri in records if ri["attributes"]["locationId"] == parent_record.id + ri for ri in records if ri["attributes"]["locationId"] == site_record.id ] def _reorder_catalog_items(self, items): @@ -98,11 +98,11 @@ def _reorder_catalog_items(self, items): items = items[self._catalog_item_idx :] + items[: self._catalog_item_idx] return items - def get_records(self, parent_record): + def get_records(self, site_record): code = get_analyte_search_param(self.config.analyte, BOR_ANALYTE_MAPPING) for i, item in enumerate( - self._reorder_catalog_items(parent_record.catalogItems) + self._reorder_catalog_items(site_record.catalogItems) ): data = self._execute_json_request(f'https://data.usbr.gov{item["id"]}') diff --git a/backend/connectors/ckan/source.py b/backend/connectors/ckan/source.py index e70475a..0de25b3 100644 --- a/backend/connectors/ckan/source.py +++ b/backend/connectors/ckan/source.py @@ -112,12 +112,12 @@ def _parse_response(self, resp): class OSERoswellWaterLevelSource(OSERoswellSource, BaseWaterLevelSource): transformer_klass = OSERoswellWaterLevelTransformer - def get_records(self, parent_record): - return self._parse_response(parent_record, self.get_response()) + def get_records(self, site_record): + return self._parse_response(site_record, self.get_response()) - def _parse_response(self, parent_record, resp): + 
def _parse_response(self, site_record, resp): records = resp.json()["result"]["records"] - return [record for record in records if record["Site_ID"] == parent_record.id] + return [record for record in records if record["Site_ID"] == site_record.id] def _extract_parameter_results(self, records): return [float(r["DTWGS"]) for r in records] diff --git a/backend/connectors/ckan/transformer.py b/backend/connectors/ckan/transformer.py index 111171a..e56d609 100644 --- a/backend/connectors/ckan/transformer.py +++ b/backend/connectors/ckan/transformer.py @@ -48,17 +48,17 @@ def _transform(self, record): class OSERoswellWaterLevelTransformer(WaterLevelTransformer): source_tag = "CKAN/OSERoswell" - # def _transform_hook(self, record, config, parent_record): + # def _transform_hook(self, record, config, site_record): # rec = { - # "id": parent_record.id, + # "id": site_record.id, # "source": "CKAN/OSERoswell", - # "location": parent_record.name, - # "usgs_site_id": parent_record.id, - # "latitude": parent_record.latitude, - # "longitude": parent_record.longitude, - # "elevation": parent_record.elevation, + # "location": site_record.name, + # "usgs_site_id": site_record.id, + # "latitude": site_record.latitude, + # "longitude": site_record.longitude, + # "elevation": site_record.elevation, # "elevation_units": "ft", - # "well_depth": parent_record.well_depth, + # "well_depth": site_record.well_depth, # "well_depth_units": "ft", # } # if config.output_summary_waterlevel_stats: diff --git a/backend/connectors/isc_seven_rivers/source.py b/backend/connectors/isc_seven_rivers/source.py index fe32656..c338877 100644 --- a/backend/connectors/isc_seven_rivers/source.py +++ b/backend/connectors/isc_seven_rivers/source.py @@ -109,12 +109,12 @@ def _extract_parameter_results(self, records): def _extract_parameter_units(self, records): return [r["units"] for r in records] - def get_records(self, parent_record): + def get_records(self, site_record): config = self.config analyte_id = 
self._get_analyte_id(config.analyte) if analyte_id: params = { - "monitoringPointId": parent_record.id, + "monitoringPointId": site_record.id, "analyteId": analyte_id, "start": 0, "end": config.now_ms(days=1), @@ -146,9 +146,9 @@ def get_datetime(record): class ISCSevenRiversWaterLevelSource(BaseWaterLevelSource): transformer_klass = ISCSevenRiversWaterLevelTransformer - def get_records(self, parent_record): + def get_records(self, site_record): params = { - "id": parent_record.id, + "id": site_record.id, "start": 0, "end": self.config.now_ms(days=1), } diff --git a/backend/connectors/isc_seven_rivers/transformer.py b/backend/connectors/isc_seven_rivers/transformer.py index e68aba3..dddbe9d 100644 --- a/backend/connectors/isc_seven_rivers/transformer.py +++ b/backend/connectors/isc_seven_rivers/transformer.py @@ -53,14 +53,14 @@ class ISCSevenRiversAnalyteTransformer(AnalyteTransformer): class ISCSevenRiversWaterLevelTransformer(WaterLevelTransformer): source_tag = "ISCSevenRivers" - # def _transform_hook(self, record, config, parent_record): + # def _transform_hook(self, record, config, site_record): # rec = { # "source": "ISCSevenRivers", - # "id": parent_record.id, - # "location": parent_record.name, - # "latitude": parent_record.latitude, - # "longitude": parent_record.longitude, - # "elevation": parent_record.elevation, + # "id": site_record.id, + # "location": site_record.name, + # "latitude": site_record.latitude, + # "longitude": site_record.longitude, + # "elevation": site_record.elevation, # "elevation_units": "ft", # } # if config.output_summary_waterlevel_stats: diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index 08734f9..caa8e32 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -91,12 +91,12 @@ def get_records(self): class NMBGMRAnalyteSource(BaseAnalyteSource): transformer_klass = NMBGMRAnalyteTransformer - def get_records(self, parent_record): + def get_records(self, 
site_record): analyte = get_analyte_search_param(self.config.analyte, NMBGMR_ANALYTE_MAPPING) records = self._execute_json_request( _make_url("waterchemistry"), params={ - "pointid": ",".join(make_site_list(parent_record)), + "pointid": ",".join(make_site_list(site_record)), "analyte": analyte, }, tag="", @@ -107,8 +107,8 @@ def get_records(self, parent_record): return records_sorted_by_pointid - def _extract_parent_records(self, records, parent_record): - return records.get(parent_record.id, []) + def _extract_site_records(self, records, site_record): + return records.get(site_record.id, []) def _extract_parameter_units(self, records): return [r["Units"] for r in records] @@ -156,15 +156,15 @@ def _extract_most_recent(self, records): def _extract_parameter_results(self, records): return [r["DepthToWaterBGS"] for r in records] - def _extract_parent_records(self, records, parent_record): - return [ri for ri in records if ri["Well"]["PointID"] == parent_record.id] + def _extract_site_records(self, records, site_record): + return [ri for ri in records if ri["Well"]["PointID"] == site_record.id] - def get_records(self, parent_record): + def get_records(self, site_record): # if self.config.latest_water_level_only: - # params = {"pointids": parent_record.id} + # params = {"pointids": site_record.id} # url = _make_url("waterlevels/latest") # else: - params = {"pointid": ",".join(make_site_list(parent_record))} + params = {"pointid": ",".join(make_site_list(site_record))} # just use manual waterlevels temporarily url = _make_url("waterlevels/manual") diff --git a/backend/connectors/nmbgmr/transformer.py b/backend/connectors/nmbgmr/transformer.py index 693b728..b84f6b2 100644 --- a/backend/connectors/nmbgmr/transformer.py +++ b/backend/connectors/nmbgmr/transformer.py @@ -51,19 +51,19 @@ class NMBGMRAnalyteTransformer(AnalyteTransformer): class NMBGMRWaterLevelTransformer(WaterLevelTransformer): source_tag = "NMBGMR" - # def _transform_hook(self, record, config, 
parent_record): + # def _transform_hook(self, record, config, site_record): # rec = { # "source": "NMBGMR", - # "id": parent_record.id, - # "location": parent_record.name, - # "usgs_site_id": parent_record.usgs_site_id, - # "alternate_site_id": parent_record.alternate_site_id, - # "latitude": parent_record.latitude, - # "longitude": parent_record.longitude, - # "well_depth": parent_record.well_depth, - # "well_depth_units": parent_record.well_depth_units, - # "elevation": parent_record.elevation, - # "elevation_units": parent_record.elevation_units, + # "id": site_record.id, + # "location": site_record.name, + # "usgs_site_id": site_record.usgs_site_id, + # "alternate_site_id": site_record.alternate_site_id, + # "latitude": site_record.latitude, + # "longitude": site_record.longitude, + # "well_depth": site_record.well_depth, + # "well_depth_units": site_record.well_depth_units, + # "elevation": site_record.elevation, + # "elevation_units": site_record.elevation_units, # } # # if config.output_summary_waterlevel_stats: diff --git a/backend/connectors/st2/source.py b/backend/connectors/st2/source.py index f600b8f..511977d 100644 --- a/backend/connectors/st2/source.py +++ b/backend/connectors/st2/source.py @@ -88,12 +88,12 @@ def _extract_parameter_record(self, record): def _extract_parameter_results(self, records): return [r["observation"].result for r in records] - def get_records(self, parent_record, *args, **kw): + def get_records(self, site_record, *args, **kw): service = self.get_service() config = self.config records = [] - for t in self._get_things(service, parent_record): + for t in self._get_things(service, site_record): if t.name == "Water Well": for di in t.datastreams: @@ -112,7 +112,7 @@ def get_records(self, parent_record, *args, **kw): records.append( { "thing": t, - "location": parent_record, + "location": site_record, "datastream": di, "observation": obs, } diff --git a/backend/connectors/st2/transformer.py b/backend/connectors/st2/transformer.py 
index 352e44b..e41a170 100644 --- a/backend/connectors/st2/transformer.py +++ b/backend/connectors/st2/transformer.py @@ -58,15 +58,15 @@ class EBIDSiteTransformer(STSiteTransformer): # class ST2WaterLevelTransformer(WaterLevelTransformer): # source_tag = "ST2" -# def _transform_hook(self, record, config, parent_record, *args, **kw): +# def _transform_hook(self, record, config, site_record, *args, **kw): # rec = { # "source": self.source_id, -# "id": parent_record.id, -# "location": parent_record.name, -# "latitude": parent_record.latitude, -# "longitude": parent_record.longitude, -# "surface_elevation_ft": parent_record.elevation, -# "well_depth_ft_below_ground_surface": parent_record.well_depth, +# "id": site_record.id, +# "location": site_record.name, +# "latitude": site_record.latitude, +# "longitude": site_record.longitude, +# "surface_elevation_ft": site_record.elevation, +# "well_depth_ft_below_ground_surface": site_record.well_depth, # } # # if config.output_summary_waterlevel_stats: diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 0e1a14d..58abb7f 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -127,13 +127,13 @@ def get_records(self): class NWISWaterLevelSource(BaseWaterLevelSource): transformer_klass = NWISWaterLevelTransformer - def get_records(self, parent_record): + def get_records(self, site_record): params = { "format": "json", "siteType": "GW", "siteStatus": "all", "parameterCd": "72019", - "sites": ",".join(make_site_list(parent_record)), + "sites": ",".join(make_site_list(site_record)), } config = self.config @@ -155,8 +155,8 @@ def get_records(self, parent_record): self.log(f"Retrieved {len(records)} records") return records - def _extract_parent_records(self, records, parent_record): - return [ri for ri in records if ri["site_code"] == parent_record.id] + def _extract_site_records(self, records, site_record): + return [ri for ri in records if ri["site_code"] == 
site_record.id] def _clean_records(self, records): return [r for r in records if r["value"] is not None and r["value"].strip()] diff --git a/backend/connectors/usgs/transformer.py b/backend/connectors/usgs/transformer.py index 40d0acc..1f61cf5 100644 --- a/backend/connectors/usgs/transformer.py +++ b/backend/connectors/usgs/transformer.py @@ -52,18 +52,18 @@ def _transform(self, record): class NWISWaterLevelTransformer(WaterLevelTransformer): source_tag = "USGS-NWIS" - # def _transform_hook(self, record, config, parent_record): + # def _transform_hook(self, record, config, site_record): # rec = { # "source": "USGS-NWIS", - # "id": parent_record.id, - # "location": parent_record.name, - # "usgs_site_id": parent_record.id, - # "latitude": parent_record.latitude, - # "longitude": parent_record.longitude, - # "elevation": parent_record.elevation, - # "elevation_units": parent_record.elevation_units, - # "well_depth": parent_record.well_depth, - # "well_depth_units": parent_record.well_depth_units, + # "id": site_record.id, + # "location": site_record.name, + # "usgs_site_id": site_record.id, + # "latitude": site_record.latitude, + # "longitude": site_record.longitude, + # "elevation": site_record.elevation, + # "elevation_units": site_record.elevation_units, + # "well_depth": site_record.well_depth, + # "well_depth_units": site_record.well_depth_units, # # "date": record["datetime"], # # "value": record["lev_va"], # # "units": "ft", diff --git a/backend/connectors/wqp/source.py b/backend/connectors/wqp/source.py index 3213fc2..402f4c6 100644 --- a/backend/connectors/wqp/source.py +++ b/backend/connectors/wqp/source.py @@ -101,11 +101,11 @@ def _extract_parameter_record(self, record): record[DT_MEASURED] = record["ActivityStartDate"] return record - def _extract_parent_records(self, records, parent_record): + def _extract_site_records(self, records, site_record): return [ ri for ri in records - if ri["MonitoringLocationIdentifier"] == parent_record.id + if 
ri["MonitoringLocationIdentifier"] == site_record.id ] def _extract_parameter_results(self, records): @@ -125,8 +125,8 @@ def _extract_most_recent(self, records): "units": ri["ResultMeasure/MeasureUnitCode"], } - def get_records(self, parent_record): - sites = make_site_list(parent_record) + def get_records(self, site_record): + sites = make_site_list(site_record) params = { "siteid": sites, diff --git a/backend/source.py b/backend/source.py index e84d513..aed2108 100644 --- a/backend/source.py +++ b/backend/source.py @@ -326,7 +326,7 @@ def get_records(self, *args, **kw) -> dict: Parameters ---------- If parameter records: - parent_record : dict + site_record : dict the site record for the location whose parameter records are to be retrieved If site records: @@ -564,7 +564,7 @@ class BaseParameterSource(BaseSource): Returns a dictionary of parameter records where the keys are the site ids and the values are a list of the parameter records - _extract_parent_records + _extract_site_records Returns all records for a single site as a list of records _extract_most_recent @@ -603,7 +603,7 @@ class BaseParameterSource(BaseSource): # ========================================================================== def read( - self, parent_record: BaseSiteSource, use_summarize: bool + self, site_record: SiteRecord, use_summarize: bool ) -> List[ AnalyteRecord | AnalyteSummaryRecord @@ -621,7 +621,7 @@ def read( Parameters ---------- - parent_record : BaseSiteSource + site_record : SiteRecord the site record(s) for the location whose parameter records are to be retrieved use_summarize : bool @@ -632,28 +632,28 @@ def read( list[AnalyteRecord | AnalyteSummaryRecord | WaterLevelRecord | WaterLevelSummaryRecord] a list of transformed parameter records """ - if isinstance(parent_record, list): + if isinstance(site_record, list): self.log( - f"Gathering {self.name} summary for multiple records. {len(parent_record)}" + f"Gathering {self.name} summary for multiple records. 
{len(site_record)}" ) else: self.log( - f"{parent_record.id} ({parent_record.id}): Gathering {self.name} summary" + f"{site_record.id} ({site_record.id}): Gathering {self.name} summary" ) - all_analyte_records = self.get_records(parent_record) + all_analyte_records = self.get_records(site_record) if all_analyte_records: - if not isinstance(parent_record, list): - parent_record = [parent_record] + if not isinstance(site_record, list): + site_record = [site_record] # return values ret = [] # iterate over each site record and extract the parameter records for each site - for site in parent_record: - site_records = self._extract_parent_records(all_analyte_records, site) + for site in site_record: + site_records = self._extract_site_records(all_analyte_records, site) if not site_records: - self.warn(f"{site.id}: No parent records found") + self.warn(f"{site.id}: No site records found") continue # get cleaned records if _clean_records is defined by the source @@ -704,10 +704,10 @@ def read( return ret else: - if isinstance(parent_record, list): - names = [str(r.id) for r in parent_record] + if isinstance(site_record, list): + names = [str(r.id) for r in site_record] else: - names = [str(parent_record.id)] + names = [str(site_record.id)] name = ",".join(names) self.warn(f"{name}: No records found") @@ -759,7 +759,7 @@ def _get_output_units(self) -> str: # Methods That Need to be Implemented For Each Source # ========================================================================== - def _extract_parent_records(self, records: dict, parent_record: dict) -> list: + def _extract_site_records(self, records: dict, site_record: dict) -> list: """ Returns all records for a single site as a list of records (which are dictionaries). 
@@ -768,7 +768,7 @@ def _extract_parent_records(self, records: dict, parent_record: dict) -> list: records : dict a dictionary of lists, where the keys are site ids and the values are parameter records - parent_record : dict + site_record : dict the site record for the location whose parameter records are to be retrieved Returns @@ -776,11 +776,11 @@ def _extract_parent_records(self, records: dict, parent_record: dict) -> list: list a list of records for the site """ - if parent_record.chunk_size == 1: + if site_record.chunk_size == 1: return records raise NotImplementedError( - f"{self.__class__.__name__} Must implement _extract_parent_records" + f"{self.__class__.__name__} Must implement _extract_site_records" ) def _clean_records(self, records: list) -> list: From b05098e5a2fd02b4d7486e2e6e432534168f6927 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 1 Nov 2024 09:46:39 -0600 Subject: [PATCH 15/55] Disallow None values - including them breaks statistics/summaries --- backend/source.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/source.py b/backend/source.py index aed2108..42f6699 100644 --- a/backend/source.py +++ b/backend/source.py @@ -665,11 +665,13 @@ def read( items = self._extract_parameter_results(cleaned) units = self._extract_parameter_units(cleaned) items = [ - convert_units(float(result), unit, self._get_output_units()) + convert_units(float(result), unit, self._get_output_units()) for result, unit in zip(items, units) + if result is not None ] - if items is not None: + # if items is None or empty, no records were found or all results were None + if items is not None and len(items) > 0: n = len(items) self.log(f"{site.id}: Retrieved {self.name}: {n}") From d9c1a13597c08e9c9ac5b9eb117d5df9ce4790ed Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 1 Nov 2024 09:48:53 -0600 Subject: [PATCH 16/55] Revert to old NMBGMR API until /v1 is live --- backend/connectors/nmbgmr/source.py | 25 +++++++++++++++++++++++-- 
1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index caa8e32..bd5c620 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -50,11 +50,13 @@ def _make_url(endpoint): + # if os.getenv("DEBUG") == "1": + # return f"http://localhost:8000/latest/{endpoint}" + # return f"https://waterdata.nmt.edu/latest/{endpoint}" if os.getenv("DEBUG") == "1": return f"http://localhost:8000/{endpoint}" return f"https://waterdata.nmt.edu/{endpoint}" - class NMBGMRSiteSource(BaseSiteSource): transformer_klass = NMBGMRSiteTransformer chunk_size = 100 @@ -68,6 +70,7 @@ def health(self): def get_records(self): config = self.config + #params = {"site_type": "Groundwater other than spring (well)"} params = {} if config.has_bounds(): params["wkt"] = config.bounding_wkt() @@ -83,9 +86,27 @@ def get_records(self): params["parameter"] = "Manual groundwater levels" # tags="features" because the response object is a GeoJSON - return self._execute_json_request( + sites = self._execute_json_request( _make_url("locations"), params, tag="features", timeout=30 ) + return sites + + # loop through the responses and add well information for each location + # this may be slow because of the number of sites that need to be queried + # but it is necessary to get the well information. 
With further + # development, this could be faster if one can batch the requests + # to /wells + # for site in sites: + # well_info = self._execute_json_request( + # _make_url("/wells"), + # params={"pointid": site["properties"]["point_id"]}, + # tag="", + # ) + # site["properties"]["formation"] = well_info["formation"] + # site["properties"]["well_depth"] = well_info["well_depth_ftbgs"] + # site["properties"]["well_depth_units"] = "ft" + + class NMBGMRAnalyteSource(BaseAnalyteSource): From 7aa2d80099fe770072c79c8f83da0ef97be16bcc Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 1 Nov 2024 10:58:58 -0600 Subject: [PATCH 17/55] Ignore output files --- .gitignore | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.gitignore b/.gitignore index f6855af..4cbf08b 100644 --- a/.gitignore +++ b/.gitignore @@ -169,3 +169,10 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. 
#.idea/ + +# outputs +output_timeseries +output.combined.csv +output.csv +output.sites.csv +output.timeseries.csv \ No newline at end of file From b6d01b469a48572a6ccf1fe2355724a692ea4667 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 1 Nov 2024 11:04:03 -0600 Subject: [PATCH 18/55] Enable user to specify single timeseries option for a single file --- frontend/cli.py | 48 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/frontend/cli.py b/frontend/cli.py index 1dab0f9..7bd09f1 100644 --- a/frontend/cli.py +++ b/frontend/cli.py @@ -133,6 +133,22 @@ def cli(): ), ] +TIMESERIES_OPTIONS = [ + click.option( + "--separate_timeseries", + is_flag=True, + default=False, + show_default=True, + help="Output separate timeseries files for every site", + ), + click.option( + "--single_timeseries", + is_flag=True, + default=False, + show_default=True, + help="Output single timeseries file, which includes all sites", + ), +] def add_options(options): def _add_options(func): @@ -155,19 +171,14 @@ def wells(bbox, county): @cli.command() -@click.option( - "--timeseries", - is_flag=True, - default=False, - show_default=True, - help="Include timeseries data", -) +@add_options(TIMESERIES_OPTIONS) @add_options(DT_OPTIONS) @add_options(SPATIAL_OPTIONS) @add_options(SOURCE_OPTIONS) @add_options(DEBUG_OPTIONS) def waterlevels( - timeseries, + separate_timeseries, + single_timeseries, start_date, end_date, bbox, @@ -184,8 +195,13 @@ def waterlevels( site_limit, dry, ): + if separate_timeseries or single_timeseries: + timeseries = True + else: + timeseries = False config = setup_config("waterlevels", timeseries, bbox, county, site_limit, dry) + config.output_single_timeseries = single_timeseries config.use_source_nmbgmr = no_amp config.use_source_nwis = no_nwis config.use_source_pvacd = no_pvacd @@ -210,20 +226,15 @@ def waterlevels( @cli.command() @click.argument("analyte", type=click.Choice(ANALYTE_CHOICES)) 
-@click.option( - "--timeseries", - is_flag=True, - default=False, - show_default=True, - help="Include timeseries data", -) +@add_options(TIMESERIES_OPTIONS) @add_options(DT_OPTIONS) @add_options(SPATIAL_OPTIONS) @add_options(SOURCE_OPTIONS) @add_options(DEBUG_OPTIONS) def analytes( analyte, - timeseries, + separate_timeseries, + single_timeseries, start_date, end_date, bbox, @@ -240,11 +251,16 @@ def analytes( site_limit, dry, ): + if separate_timeseries or single_timeseries: + timeseries = True + else: + timeseries = False config = setup_config( f"analytes ({analyte})", timeseries, bbox, county, site_limit, dry ) config.analyte = analyte + config.output_single_timeseries = single_timeseries config.use_source_nmbgmr = no_amp config.use_source_nwis = no_nwis config.use_source_pvacd = no_pvacd From 1807cd62cf5cb7d169d61e9bb08c6c4024b901be Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 1 Nov 2024 15:17:34 -0600 Subject: [PATCH 19/55] Add source, id, and parameter unit to timeseries files This information is evident when the files are named for the location, but needs to be included in the single timeseries file. For ease of development, and for being explicit, this information is now contained in all timeseries files. --- backend/record.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/backend/record.py b/backend/record.py index fc531b8..6bb69e9 100644 --- a/backend/record.py +++ b/backend/record.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# =============================================================================== -from backend.constants import DTW, PARAMETER, PARAMETER_VALUE, FEET +from backend.constants import DTW, PARAMETER, PARAMETER_VALUE, PARAMETER_UNITS, FEET class BaseRecord: @@ -82,8 +82,8 @@ class WaterLevelRecord(BaseRecord): class AnalyteRecord(BaseRecord): keys: tuple = ( - # "source", - # "id", + "source", + "id", # "location", # "latitude", # "longitude", @@ -91,6 +91,7 @@ class AnalyteRecord(BaseRecord): # "well_depth_ft_below_ground_surface", PARAMETER, PARAMETER_VALUE, + PARAMETER_UNITS, "date_measured", "time_measured", ) From 084462b7d2cb4c366bde97037f87b82eecab7281 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 1 Nov 2024 15:19:20 -0600 Subject: [PATCH 20/55] Only add headers for first line of timeseries file Some locations may have more than one record, making dump_single_timeseries write the headers to multiple rows. This change ensures that headers are only written to the first row. --- backend/persister.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/backend/persister.py b/backend/persister.py index b1a8a6b..9f53a95 100644 --- a/backend/persister.py +++ b/backend/persister.py @@ -139,11 +139,12 @@ def write_memory(path, func, records): def dump_single_timeseries(writer, timeseries): + headers_have_not_been_written = True for i, (site, records) in enumerate(timeseries): - for j, record in enumerate(records): - if i == 0: + if i == 0 and headers_have_not_been_written: writer.writerow(record.keys) + headers_have_not_been_written = False writer.writerow(record.to_row()) From 4d101dfdfd18ebcdb47d26df1021da1a8218f109 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 1 Nov 2024 15:20:18 -0600 Subject: [PATCH 21/55] Skip check() and discover() until implemented These methods are not yet implemented, so warnings/messages are printed every time. These have been commented out for now, but should be implemented in the future. 
--- backend/unifier.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/backend/unifier.py b/backend/unifier.py index 2e85f96..811175a 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -112,13 +112,13 @@ def _perister_factory(config): def _site_wrapper(site_source, parameter_source, persister, config): try: + # TODO: fully develop checks/discoveries below + # if not site_source.check(): + # print(f"Skipping {site_source}. check failed") - if site_source.check(): - print(f"Skipping {site_source}. check failed") - - schemas = site_source.discover() - if not schemas: - print(f"No schemas found for {site_source}") + # schemas = site_source.discover() + # if not schemas: + # print(f"No schemas found for {site_source}") # in the future make discover required # return From 8a8aa10de6d5ed6eb44afbd7212b487c8dce933b Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Fri, 1 Nov 2024 15:21:01 -0600 Subject: [PATCH 22/55] Enable user to choose separate timeseries files, or single timeseries file This update allows the user to specify if they want separate timeseries files for each timeseries, or a single timeseries file for all timeseries. This update also includes documentation in the README --- README.md | 8 ++++++++ frontend/cli.py | 20 ++++++++++---------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 89f8d4c..e63c62d 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,14 @@ pip install nmuwd ``` ## Usage + +### Timeseries & Summary +The flag `--separate_timeseries_files` exports timeseries for every location in their own file in the directory output_series (e.g. AB-0002.csv, AB-0003.csv). Locations with only one observation are gathered and exported to the file `output.combined.csv. + +The flag `--single_timeseries_file` exports all timeseries for all locations in one file titled output.timeseries.csv. 
It also exports a file titled output.sites.csv that contains site information, such as latitude, longitude, and elevation. + +If neither of the above flags are specified, a summary table called output.csv is exported. + ### Water Levels Get water levels for a county. Return a summary csv diff --git a/frontend/cli.py b/frontend/cli.py index 7bd09f1..7e7efff 100644 --- a/frontend/cli.py +++ b/frontend/cli.py @@ -135,14 +135,14 @@ def cli(): TIMESERIES_OPTIONS = [ click.option( - "--separate_timeseries", + "--separate_timeseries_files", is_flag=True, default=False, show_default=True, help="Output separate timeseries files for every site", ), click.option( - "--single_timeseries", + "--single_timeseries_file", is_flag=True, default=False, show_default=True, @@ -177,8 +177,8 @@ def wells(bbox, county): @add_options(SOURCE_OPTIONS) @add_options(DEBUG_OPTIONS) def waterlevels( - separate_timeseries, - single_timeseries, + separate_timeseries_files, + single_timeseries_file, start_date, end_date, bbox, @@ -195,13 +195,13 @@ def waterlevels( site_limit, dry, ): - if separate_timeseries or single_timeseries: + if separate_timeseries_files or single_timeseries_file: timeseries = True else: timeseries = False config = setup_config("waterlevels", timeseries, bbox, county, site_limit, dry) - config.output_single_timeseries = single_timeseries + config.output_single_timeseries = single_timeseries_file config.use_source_nmbgmr = no_amp config.use_source_nwis = no_nwis config.use_source_pvacd = no_pvacd @@ -233,8 +233,8 @@ def waterlevels( @add_options(DEBUG_OPTIONS) def analytes( analyte, - separate_timeseries, - single_timeseries, + separate_timeseries_files, + single_timeseries_file, start_date, end_date, bbox, @@ -251,7 +251,7 @@ def analytes( site_limit, dry, ): - if separate_timeseries or single_timeseries: + if separate_timeseries_files or single_timeseries_file: timeseries = True else: timeseries = False @@ -260,7 +260,7 @@ def analytes( ) config.analyte = analyte - 
config.output_single_timeseries = single_timeseries + config.output_single_timeseries = single_timeseries_file config.use_source_nmbgmr = no_amp config.use_source_nwis = no_nwis config.use_source_pvacd = no_pvacd From 9f4dbb5498a95cca65a824ff58134db8458c9edf Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 4 Nov 2024 10:16:29 -0700 Subject: [PATCH 23/55] Enable parameter record id to be same as site id | mg/L CaCO3 unit conversion --- backend/transformer.py | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/backend/transformer.py b/backend/transformer.py index e6f4d67..3d1fa35 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -28,6 +28,7 @@ TONS_PER_ACRE_FOOT, MICROGRAMS_PER_LITER, DT_MEASURED, + PARAMETER_UNITS ) from backend.geo_utils import datum_transform from backend.record import ( @@ -128,6 +129,7 @@ def convert_units( - ppm to mg/L - ton/ac-ft to mg/L - ug/L to mg/L + - mg/L CaCO3 to mg/L length: - ft to m @@ -158,6 +160,10 @@ def convert_units( ppm = PARTS_PER_MILLION.lower() tpaf = TONS_PER_ACRE_FOOT.lower() + # edge case for BOR for Bicarbonate + if input_units == "mg/l caco3": + input_units = mgl + if input_units == output_units: return input_value @@ -403,6 +409,12 @@ def do_transform( record.update(well_depth=well_depth) record.update(well_depth_units=well_depth_unit) + # update the units to the output unit for analyte records + # this is done after converting the units to the output unit for the analyte records + elif isinstance(record, (AnalyteRecord)): + #print(self.config.analyte_output_units) + record.update(parameter_units=self.config.analyte_output_units) + return record def contained( @@ -578,10 +590,7 @@ def _transform(self, record, site_record): f"{self.__class__.__name__} source_tag is not set" ) - rec = { - "source": self.source_tag, - "id": site_record.id, - } + rec = {} if self.config.output_summary: self._transform_most_recents(record) @@ -603,6 +612,16 @@ def 
_transform(self, record, site_record): } ) rec.update(record) + + """ + Some analyte records, like BOR, have a field called "id" that is the record's ID. + To allow for the record's "id" to be the site's "id", the record's "id" needs to be updated at the end. + """ + source_id = { + "source": self.source_tag, + "id": site_record.id, + } + rec.update(source_id) return rec def _transform_most_recents(self, record): From cf09bee6f8a9c5684e706086cbcdeba3a99b1d7a Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 4 Nov 2024 16:08:36 -0700 Subject: [PATCH 24/55] Persist logs & warnings | Log site indices for user to track | Skip sites with unknown datums --- .gitignore | 4 +++- backend/config.py | 3 +++ backend/geo_utils.py | 1 + backend/persister.py | 20 ++++++++++++++++++++ backend/source.py | 30 +++++++++++++++++++++--------- backend/transformer.py | 8 +++++++- backend/unifier.py | 32 +++++++++++++++++++++++++++++--- 7 files changed, 84 insertions(+), 14 deletions(-) diff --git a/.gitignore b/.gitignore index 4cbf08b..5c03936 100644 --- a/.gitignore +++ b/.gitignore @@ -175,4 +175,6 @@ output_timeseries output.combined.csv output.csv output.sites.csv -output.timeseries.csv \ No newline at end of file +output.timeseries.csv +output.logs.txt +output.warnings.txt \ No newline at end of file diff --git a/backend/config.py b/backend/config.py index 68515f7..b49b9f3 100644 --- a/backend/config.py +++ b/backend/config.py @@ -135,6 +135,9 @@ class Config(object): use_csv: bool = True use_geojson: bool = False + logs: list = [] + warnings: list = [] + def __init__(self, model=None, payload=None): self.bbox = {} if model: diff --git a/backend/geo_utils.py b/backend/geo_utils.py index cdd284f..16c5a94 100644 --- a/backend/geo_utils.py +++ b/backend/geo_utils.py @@ -18,6 +18,7 @@ PROJECTIONS = {} TRANSFORMS = {} +ALLOWED_DATUMS = ["NAD27", "NAD83", "WGS84"] def datum_transform(x, y, in_datum, out_datum): """ diff --git a/backend/persister.py b/backend/persister.py index 
9f53a95..bfc7e70 100644 --- a/backend/persister.py +++ b/backend/persister.py @@ -44,6 +44,8 @@ def __init__(self): self.combined = [] self.timeseries = [] self.sites = [] + self.logs = [] + self.warnings = [] # self.keys = record_klass.keys def load(self, records: list): @@ -98,6 +100,24 @@ def dump_sites(self, path: str): else: self.log("no sites to dump", fg="red") + def dump_logs(self, path: str): + if self.logs: + self.log(f"dumping logs to {os.path.abspath(path)}") + with open(path, "w") as f: + for l in self.logs: + f.write(f"{l}\n") + else: + self.log("no logs to dump") + + def dump_warnings(self, path: str): + if self.warnings: + self.log(f"dumping warnings to {os.path.abspath(path)}") + with open(path, "w") as f: + for w in self.warnings: + f.write(f"{w}\n") + else: + self.log("no warnings to dump") + def save(self, path: str): if self.records: path = self.add_extension(path) diff --git a/backend/source.py b/backend/source.py index 42f6699..f1bade2 100644 --- a/backend/source.py +++ b/backend/source.py @@ -208,7 +208,8 @@ def warn(self, msg): ------- None """ - self.log(msg, fg="red") + s = self.log(msg, fg="red") + self.config.warnings.append(s) def log(self, msg, fg="yellow"): """ @@ -226,7 +227,10 @@ def log(self, msg, fg="yellow"): ------- None """ - click.secho(f"{self.__class__.__name__:25s} -- {msg}", fg=fg) + s = f"{self.__class__.__name__:25s} -- {msg}" + click.secho(s, fg=fg) + self.config.logs.append(s) + return s def _execute_text_request(self, url: str, params=None, **kw) -> str: """ @@ -603,7 +607,7 @@ class BaseParameterSource(BaseSource): # ========================================================================== def read( - self, site_record: SiteRecord, use_summarize: bool + self, site_record: SiteRecord, use_summarize: bool, start_ind: int, end_ind: int ) -> List[ AnalyteRecord | AnalyteSummaryRecord @@ -634,7 +638,7 @@ def read( """ if isinstance(site_record, list): self.log( - f"Gathering {self.name} summary for multiple records. 
{len(site_record)}" + f"Gathering {self.name} summary for {len(site_record)} sites. {start_ind}-{end_ind}" ) else: self.log( @@ -661,19 +665,27 @@ def read( if not cleaned: self.warn(f"{site.id} No clean records found") continue + + # doesn't need to be returned, but can be used to debug/for development + skipped_items = [] - items = self._extract_parameter_results(cleaned) + results = self._extract_parameter_results(cleaned) units = self._extract_parameter_units(cleaned) + items = [ - convert_units(float(result), unit, self._get_output_units()) - for result, unit in zip(items, units) - if result is not None + convert_units(float(result), unit, self._get_output_units()) if (type(result) in [int, float]) + else result if (not use_summarize) + else skipped_items.append((result)) + for result, unit in zip(results, units) ] + if len(skipped_items) > 0: + self.warn(f"Skipped results because of formatting: {skipped_items}") + # if items is None or empty, no records were found or all results were None if items is not None and len(items) > 0: n = len(items) - self.log(f"{site.id}: Retrieved {self.name}: {n}") + # self.log(f"{site.id}: Retrieved {self.name}: {n}") # create the summaries if use_summarize is True, otherwise returned the cleaned and sorted records if use_summarize: diff --git a/backend/transformer.py b/backend/transformer.py index 3d1fa35..cdb9782 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -30,7 +30,7 @@ DT_MEASURED, PARAMETER_UNITS ) -from backend.geo_utils import datum_transform +from backend.geo_utils import datum_transform, ALLOWED_DATUMS from backend.record import ( WaterLevelSummaryRecord, WaterLevelRecord, @@ -375,6 +375,10 @@ def do_transform( x = float(record.longitude) input_horizontal_datum = record.horizontal_datum + if input_horizontal_datum not in ALLOWED_DATUMS: + self.warn(f"Skipping site {record.id}. 
Datum {input_horizontal_datum} cannot be processed") + return None + output_elevation_units = "" well_depth_units = "" output_horizontal_datum = "WGS84" @@ -465,6 +469,7 @@ def warn(self, msg): None """ self.log(msg, fg="red") + self.config.warnings.append(msg) def log(self, msg, fg="yellow"): """ @@ -483,6 +488,7 @@ def log(self, msg, fg="yellow"): None """ click.secho(f"{self.__class__.__name__:25s} -- {msg}", fg=fg) + self.config.logs.append(f"{self.__class__.__name__:25s} -- {msg}") # ========================================================================== # Methods That Need to be Implemented For Each SiteTransformer diff --git a/backend/unifier.py b/backend/unifier.py index 811175a..e3764e9 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -128,20 +128,33 @@ def _site_wrapper(site_source, parameter_source, persister, config): sites = site_source.read() if not sites: - print(f"No sites found for {site_source}") + print(f"No sites found for {site_source.tag}") + config.logs.append(f"No sites found for {site_source.tag}") + config.warnings.append(f"No sites found for {site_source.tag}") return sites_with_records_count = 0 + start_ind = 1 + end_ind = 0 + first_flag = True for sites in site_source.chunks(sites): if site_limit and sites_with_records_count == site_limit: break + if type(sites) == list: + if first_flag: + end_ind += len(sites) + first_flag = False + else: + start_ind = end_ind + 1 + end_ind += len(sites) + if use_summarize: - summary_records = parameter_source.read(sites, use_summarize) + summary_records = parameter_source.read(sites, use_summarize, start_ind, end_ind) if summary_records: persister.records.extend(summary_records) else: - results = parameter_source.read(sites, use_summarize) + results = parameter_source.read(sites, use_summarize, start_ind, end_ind) # no records are returned if there is no site record for parameter # or if the record isn't clean (doesn't have the correct fields) # don't count these sites to apply to 
site_limit @@ -161,6 +174,7 @@ def _site_wrapper(site_source, parameter_source, persister, config): persister.timeseries.append((site, records)) persister.sites.append(site) sites_with_records_count += 1 + except BaseException: import traceback @@ -168,6 +182,12 @@ def _site_wrapper(site_source, parameter_source, persister, config): exc = traceback.format_exc() print(exc) print(f"Failed to unify {site_source}") + + config.logs.append(exc) + config.logs.append(f"Failed to unify {site_source}") + + config.warnings.append(exc) + config.warnings.append(f"Failed to unify {site_source}") def _unify_parameter( @@ -186,6 +206,12 @@ def _unify_parameter( else: persister.dump_combined(f"{config.output_path}.combined") persister.dump_timeseries(f"{config.output_path}_timeseries") + + + persister.logs.extend(config.logs) + persister.warnings.extend(config.warnings) + persister.dump_logs(f"{config.output_path}.logs.txt") + persister.dump_warnings(f"{config.output_path}.warnings.txt") persister.finalize(config.output_name) From 5f6cc3bfdf011341731ded5fee6d91695489babf Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 4 Nov 2024 17:03:12 -0700 Subject: [PATCH 25/55] Clean ST2 waterlevel records - only return if not None --- backend/connectors/st2/source.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/backend/connectors/st2/source.py b/backend/connectors/st2/source.py index 511977d..159c0eb 100644 --- a/backend/connectors/st2/source.py +++ b/backend/connectors/st2/source.py @@ -87,6 +87,10 @@ def _extract_parameter_record(self, record): def _extract_parameter_results(self, records): return [r["observation"].result for r in records] + + def _clean_records(self, records: list) -> list: + rs = [r for r in records if r["observation"].result is not None] + return rs def get_records(self, site_record, *args, **kw): service = self.get_service() From 02d045ff6d94b955370188ad44bd3410b9a433f8 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 4 Nov 2024 17:43:55 -0700 
Subject: [PATCH 26/55] Keep non-numeric results if not summarize, keep for timeseries data Non-numeric results cannot be compared or summed for the purpose of summarization. They are kept for timeseries data --- backend/source.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/backend/source.py b/backend/source.py index f1bade2..f22fb60 100644 --- a/backend/source.py +++ b/backend/source.py @@ -672,12 +672,21 @@ def read( results = self._extract_parameter_results(cleaned) units = self._extract_parameter_units(cleaned) - items = [ - convert_units(float(result), unit, self._get_output_units()) if (type(result) in [int, float]) - else result if (not use_summarize) - else skipped_items.append((result)) - for result, unit in zip(results, units) - ] + items = [] + + # skip non-numeric results for summarization as they can't be compared or summed + # pass and try/except blocks cannot be used in list comprehension, so it needs to be done in a for loop even though it's slightly slower + for r, u in zip(results, units): + if use_summarize: + try: + items.append(convert_units(float(r), u, self._get_output_units())) + except TypeError: + skipped_items.append((r, u)) + else: + try: + items.append(convert_units(float(r), u, self._get_output_units())) + except TypeError: + items.append(r) if len(skipped_items) > 0: self.warn(f"Skipped results because of formatting: {skipped_items}") @@ -705,7 +714,10 @@ def read( rec, site, ) - ret.append(transformed_record) + if transformed_record is None: + continue + else: + ret.append(transformed_record) else: cleaned_sorted = [ self.transformer.do_transform( @@ -715,7 +727,6 @@ def read( ] cleaned_sorted = sorted(cleaned_sorted, key=self._sort_func) ret.append((site, cleaned_sorted)) - return ret else: if isinstance(site_record, list): From 8e7ab1c2d935757348f29e7c44572d23c8ea2fa1 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 4 Nov 2024 17:45:00 -0700 Subject: [PATCH 27/55] Add 
horizontal_datum to summary records --- backend/transformer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/transformer.py b/backend/transformer.py index cdb9782..bbc95de 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -609,6 +609,7 @@ def _transform(self, record, site_record): "alternate_site_id": site_record.alternate_site_id, "latitude": site_record.latitude, "longitude": site_record.longitude, + "horizontal_datum": site_record.horizontal_datum, "elevation": site_record.elevation, "elevation_units": site_record.elevation_units, "well_depth": site_record.well_depth, From f422d78bd9d81bc0e7fa915744ca4a9fcb7a5fb5 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 4 Nov 2024 17:45:27 -0700 Subject: [PATCH 28/55] Add horizontal_datum to summary records --- backend/record.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/record.py b/backend/record.py index 6bb69e9..487d644 100644 --- a/backend/record.py +++ b/backend/record.py @@ -108,6 +108,7 @@ class SummaryRecord(BaseRecord): "alternate_site_id", "latitude", "longitude", + "horizontal_datum", "elevation", "elevation_units", "well_depth", From bb5e4ee622bde62a34fa1a9089ffc4f6fa3a84bf Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 4 Nov 2024 17:46:57 -0700 Subject: [PATCH 29/55] Log if no sites are found --- backend/unifier.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/backend/unifier.py b/backend/unifier.py index e3764e9..0e882af 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -127,10 +127,13 @@ def _site_wrapper(site_source, parameter_source, persister, config): site_limit = config.site_limit sites = site_source.read() + sites = [s for s in sites if s is not None] + if not sites: - print(f"No sites found for {site_source.tag}") - config.logs.append(f"No sites found for {site_source.tag}") - config.warnings.append(f"No sites found for {site_source.tag}") + not_sites_msg = f"No sites found for {site_source.tag}" + 
print(not_sites_msg) + config.logs.append(not_sites_msg) + config.warnings.append(not_sites_msg) return sites_with_records_count = 0 From 2d84c612b2bd58d20edd5bb85b81e7added9d2a9 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 5 Nov 2024 08:17:57 -0700 Subject: [PATCH 30/55] Catch value errors when converting units --- backend/source.py | 4 ++++ backend/unifier.py | 1 - 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/backend/source.py b/backend/source.py index f22fb60..f8b009c 100644 --- a/backend/source.py +++ b/backend/source.py @@ -682,11 +682,15 @@ def read( items.append(convert_units(float(r), u, self._get_output_units())) except TypeError: skipped_items.append((r, u)) + except ValueError: + skipped_items.append((r, u)) else: try: items.append(convert_units(float(r), u, self._get_output_units())) except TypeError: items.append(r) + except ValueError: + items.append(r) if len(skipped_items) > 0: self.warn(f"Skipped results because of formatting: {skipped_items}") diff --git a/backend/unifier.py b/backend/unifier.py index 0e882af..d08db78 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -127,7 +127,6 @@ def _site_wrapper(site_source, parameter_source, persister, config): site_limit = config.site_limit sites = site_source.read() - sites = [s for s in sites if s is not None] if not sites: not_sites_msg = f"No sites found for {site_source.tag}" From 5c093ff19c89d50fb9c96be484e8620aa34889bf Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 5 Nov 2024 08:58:09 -0700 Subject: [PATCH 31/55] Log/warn failed to convert units message --- backend/source.py | 12 ++++++++++-- backend/transformer.py | 22 ++++++++++++++-------- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/backend/source.py b/backend/source.py index f8b009c..16549bb 100644 --- a/backend/source.py +++ b/backend/source.py @@ -679,14 +679,22 @@ def read( for r, u in zip(results, units): if use_summarize: try: - items.append(convert_units(float(r), u, 
self._get_output_units())) + converted_result, warning_msg = convert_units(float(r), u, self._get_output_units()) + items.append(converted_result) + if warning_msg != "": + msg = f"{warning_msg} for {site.id}" + self.warn(msg) except TypeError: skipped_items.append((r, u)) except ValueError: skipped_items.append((r, u)) else: try: - items.append(convert_units(float(r), u, self._get_output_units())) + converted_result, warning_msg = convert_units(float(r), u, self._get_output_units()) + items.append(converted_result) + if warning_msg != "": + msg = f"{warning_msg} for {site.id}" + self.warn(msg) except TypeError: items.append(r) except ValueError: diff --git a/backend/transformer.py b/backend/transformer.py index bbc95de..2bab59d 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -151,6 +151,9 @@ def convert_units( float The converted value """ + warning = "" + conversion_factor = None + input_value = float(input_value) input_units = input_units.lower() output_units = output_units.lower() @@ -165,10 +168,10 @@ def convert_units( input_units = mgl if input_units == output_units: - return input_value + conversion_factor = 1 if input_units == tpaf and output_units == mgl: - return input_value * 735.47 + conversion_factor = 735.47 if ( input_units == mgl @@ -176,10 +179,10 @@ def convert_units( or input_units == ppm and output_units == mgl ): - return input_value * 1.0 + conversion_factor = 1.0 if input_units == ugl and output_units == mgl: - return input_value * 0.001 + conversion_factor = 0.001 ft = FEET.lower() m = METERS.lower() @@ -190,12 +193,15 @@ def convert_units( input_units = m if input_units == ft and output_units == m: - return input_value * 0.3048 + conversion_factor = 0.3048 if input_units == m and output_units == ft: - return input_value * 3.28084 + conversion_factor = 3.28084 - print(f"Failed to convert {input_value} {input_units} to {output_units}") - return input_value + if conversion_factor: + return input_value * conversion_factor, 
warning + else: + warning = f"Failed to convert {input_value} {input_units} to {output_units}" + return input_value, warning def standardize_datetime(dt): From 25cf6faf940693ba21a12875e8aa5ce1de67f2dc Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 5 Nov 2024 12:17:32 -0700 Subject: [PATCH 32/55] Restrict WQP data to Water samples only --- backend/connectors/wqp/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/connectors/wqp/source.py b/backend/connectors/wqp/source.py index 402f4c6..bab9910 100644 --- a/backend/connectors/wqp/source.py +++ b/backend/connectors/wqp/source.py @@ -74,7 +74,7 @@ def health(self): def get_records(self): config = self.config - params = {"mimeType": "tsv", "siteType": "Well"} + params = {"mimeType": "tsv", "siteType": "Well", "sampleMedia": "Water"} if config.has_bounds(): params["bBox"] = ",".join([str(b) for b in config.bbox_bounding_points()]) From d65765cc06e08db1c013700844920fcb3aecc3e6 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 5 Nov 2024 12:19:01 -0700 Subject: [PATCH 33/55] Restrict WQP sites to New Mexico only --- backend/connectors/wqp/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/connectors/wqp/source.py b/backend/connectors/wqp/source.py index bab9910..ee0590f 100644 --- a/backend/connectors/wqp/source.py +++ b/backend/connectors/wqp/source.py @@ -74,7 +74,7 @@ def health(self): def get_records(self): config = self.config - params = {"mimeType": "tsv", "siteType": "Well", "sampleMedia": "Water"} + params = {"mimeType": "tsv", "siteType": "Well", "sampleMedia": "Water", "statecode": "US%3A35"} if config.has_bounds(): params["bBox"] = ",".join([str(b) for b in config.bbox_bounding_points()]) From c277aabe1c861380b3919ebb92f4bcc5951dc1f2 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 5 Nov 2024 12:24:08 -0700 Subject: [PATCH 34/55] Fixed statecode typo --- backend/connectors/wqp/source.py | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/backend/connectors/wqp/source.py b/backend/connectors/wqp/source.py index ee0590f..fc343dc 100644 --- a/backend/connectors/wqp/source.py +++ b/backend/connectors/wqp/source.py @@ -74,7 +74,7 @@ def health(self): def get_records(self): config = self.config - params = {"mimeType": "tsv", "siteType": "Well", "sampleMedia": "Water", "statecode": "US%3A35"} + params = {"mimeType": "tsv", "siteType": "Well", "sampleMedia": "Water", "statecode": "US:35"} if config.has_bounds(): params["bBox"] = ",".join([str(b) for b in config.bbox_bounding_points()]) From dedc22466056c68541e7dcdeef07062d46cde3c3 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 5 Nov 2024 12:24:39 -0700 Subject: [PATCH 35/55] catch CaCO3 units edgecase for WQP data Some units are reported as mg/L CaCO3** --- backend/transformer.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/backend/transformer.py b/backend/transformer.py index 2bab59d..596ff08 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -163,9 +163,10 @@ def convert_units( ppm = PARTS_PER_MILLION.lower() tpaf = TONS_PER_ACRE_FOOT.lower() - # edge case for BOR for Bicarbonate - if input_units == "mg/l caco3": - input_units = mgl + # edge cases for Bicarbonate + # BOR, WQP + if input_units in ["mg/l caco3", "mg/l caco3**"] and output_units == mgl: + conversion_factor = 1 if input_units == output_units: conversion_factor = 1 From 16c64735d0f8a066a49712ae669f7b214313c1dc Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 5 Nov 2024 12:25:24 -0700 Subject: [PATCH 36/55] Provide site id for skipped converted unit items --- backend/source.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/backend/source.py b/backend/source.py index 16549bb..cdfffca 100644 --- a/backend/source.py +++ b/backend/source.py @@ -680,21 +680,25 @@ def read( if use_summarize: try: converted_result, warning_msg = convert_units(float(r), u, self._get_output_units()) - 
items.append(converted_result) - if warning_msg != "": + if warning_msg == "": + items.append(converted_result) + else: msg = f"{warning_msg} for {site.id}" self.warn(msg) + skipped_items.append((site.id, r, u)) except TypeError: - skipped_items.append((r, u)) + skipped_items.append((site.id, r, u)) except ValueError: - skipped_items.append((r, u)) + skipped_items.append((site.id, r, u)) else: try: converted_result, warning_msg = convert_units(float(r), u, self._get_output_units()) - items.append(converted_result) - if warning_msg != "": + if warning_msg == "": + items.append(converted_result) + else: msg = f"{warning_msg} for {site.id}" self.warn(msg) + skipped_items.append((site.id, r, u)) except TypeError: items.append(r) except ValueError: From 05f16cb68ae6cf2c1caa78d5e24dd187bbaa84f8 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 5 Nov 2024 13:51:44 -0700 Subject: [PATCH 37/55] Fixed error where records were added to time series that shouldve been omitted This is because the unit conversion was being done on items, which are used for summaries, whereas the conversions needed to be done on the records themselves --- backend/source.py | 77 ++++++++++++++++++------------------------ backend/transformer.py | 25 ++++++++++++-- 2 files changed, 56 insertions(+), 46 deletions(-) diff --git a/backend/source.py b/backend/source.py index cdfffca..afc3635 100644 --- a/backend/source.py +++ b/backend/source.py @@ -665,23 +665,21 @@ def read( if not cleaned: self.warn(f"{site.id} No clean records found") continue - - # doesn't need to be returned, but can be used to debug/for development - skipped_items = [] - results = self._extract_parameter_results(cleaned) - units = self._extract_parameter_units(cleaned) + if use_summarize: - items = [] + # doesn't need to be returned, but can be used to debug/for development + kept_items = [] + skipped_items = [] - # skip non-numeric results for summarization as they can't be compared or summed - # pass and try/except blocks 
cannot be used in list comprehension, so it needs to be done in a for loop even though it's slightly slower - for r, u in zip(results, units): - if use_summarize: + results = self._extract_parameter_results(cleaned) + units = self._extract_parameter_units(cleaned) + + for r, u in zip(results, units): try: converted_result, warning_msg = convert_units(float(r), u, self._get_output_units()) if warning_msg == "": - items.append(converted_result) + kept_items.append(converted_result) else: msg = f"{warning_msg} for {site.id}" self.warn(msg) @@ -690,38 +688,23 @@ def read( skipped_items.append((site.id, r, u)) except ValueError: skipped_items.append((site.id, r, u)) - else: - try: - converted_result, warning_msg = convert_units(float(r), u, self._get_output_units()) - if warning_msg == "": - items.append(converted_result) - else: - msg = f"{warning_msg} for {site.id}" - self.warn(msg) - skipped_items.append((site.id, r, u)) - except TypeError: - items.append(r) - except ValueError: - items.append(r) - if len(skipped_items) > 0: - self.warn(f"Skipped results because of formatting: {skipped_items}") + if len(skipped_items) > 0: + self.warn(f"Skipped results because of formatting: {skipped_items}") - # if items is None or empty, no records were found or all results were None - if items is not None and len(items) > 0: - n = len(items) - # self.log(f"{site.id}: Retrieved {self.name}: {n}") + # if items is None or empty, no records were found or all results were None + if kept_items is not None and len(kept_items): + n = len(kept_items) + # self.log(f"{site.id}: Retrieved {self.name}: {n}") - # create the summaries if use_summarize is True, otherwise returned the cleaned and sorted records - if use_summarize: most_recent_result = self._extract_most_recent(cleaned) if not most_recent_result: continue rec = { "nrecords": n, - "min": min(items), - "max": max(items), - "mean": sum(items) / n, + "min": min(kept_items), + "max": max(kept_items), + "mean": sum(kept_items) / n, 
"most_recent_datetime": most_recent_result["datetime"], "most_recent_value": most_recent_result["value"], "most_recent_units": most_recent_result["units"], @@ -734,15 +717,21 @@ def read( continue else: ret.append(transformed_record) - else: - cleaned_sorted = [ - self.transformer.do_transform( - self._extract_parameter(record), site - ) - for record in cleaned - ] - cleaned_sorted = sorted(cleaned_sorted, key=self._sort_func) - ret.append((site, cleaned_sorted)) + else: + cleaned_sorted = [ + self.transformer.do_transform( + self._extract_parameter(record), site + ) + for record in cleaned + if self.transformer.do_transform( + self._extract_parameter(record), site + ) is not None + ] + if len(cleaned_sorted) == 0: + self.warn(f"{site.id}: No clean records found") + continue + cleaned_sorted = sorted(cleaned_sorted, key=self._sort_func) + ret.append((site, cleaned_sorted)) return ret else: if isinstance(site_record, list): diff --git a/backend/transformer.py b/backend/transformer.py index 596ff08..423b1a3 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -422,9 +422,30 @@ def do_transform( # update the units to the output unit for analyte records # this is done after converting the units to the output unit for the analyte records + # convert the parameter value to the output unit specified in the config elif isinstance(record, (AnalyteRecord)): - #print(self.config.analyte_output_units) - record.update(parameter_units=self.config.analyte_output_units) + r = record.parameter_value + u = record.parameter_units + warning_msg = "" + try: + converted_result, warning_msg = convert_units(float(r), u, self.config.analyte_output_units) + if warning_msg != "": + msg = f"{warning_msg} for {record.id}" + self.warn(msg) + except TypeError: + msg = f"Keeping {r} for {record.id} on {record.date_measured} for time series data" + self.warn(msg) + converted_result = r + except ValueError: + msg = f"Keeping {r} for {record.id} on {record.date_measured} for time 
series data" + self.warn(msg) + converted_result = r + + if warning_msg == "": + record.update(parameter_value=converted_result) + record.update(parameter_units=self.config.analyte_output_units) + else: + record = None return record From 7c4ce17a3a2881f75e1215cca2bfea397778afe2 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 5 Nov 2024 14:33:26 -0700 Subject: [PATCH 38/55] Write configuration to output.logs.txt --- backend/config.py | 40 +++++++++++++++++++++++++++------------- backend/unifier.py | 4 ++-- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/backend/config.py b/backend/config.py index b49b9f3..1a505b5 100644 --- a/backend/config.py +++ b/backend/config.py @@ -316,21 +316,33 @@ def now_ms(self, days=0): # return current time in milliseconds return int((datetime.now() - td).timestamp() * 1000) - def report(self): + def report(self, log_report: bool = False): def _report_attributes(title, attrs): - click.secho( - f"---- {title} --------------------------------------------------", - fg="yellow", - ) + s = f"---- {title} --------------------------------------------------" + click.secho(s, fg="yellow") + if log_report: + self.logs.append(s) + + for k in attrs: v = getattr(self, k) - click.secho(f"{k}: {v}", fg="yellow") + s = f"{k}: {v}" + click.secho(s, fg="yellow") + + if log_report: + self.logs.append(s) + click.secho("", fg="yellow") - click.secho( - "---- Begin configuration -------------------------------------\n", - fg="yellow", - ) + if log_report: + self.logs.append(s) + + s = "---- Begin configuration -------------------------------------\n" + click.secho(s, fg="yellow") + + if log_report: + self.logs.append(s) + sources = [f"use_source_{s}" for s in SOURCE_KEYS] attrs = [ "start_date", @@ -359,9 +371,11 @@ def _report_attributes(title, attrs): ), ) - click.secho( - "---- End configuration -------------------------------------", fg="yellow" - ) + s = "---- End configuration -------------------------------------\n" + 
click.secho(s, fg="yellow") + + if log_report: + self.logs.append(s) def validate(self): if not self._validate_bbox(): diff --git a/backend/unifier.py b/backend/unifier.py index d08db78..74f16fc 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -52,7 +52,7 @@ def unify_sites(config): def unify_analytes(config): print("Unifying analytes") - config.report() + config.report(log_report=True) config.validate() if not config.dry: @@ -64,7 +64,7 @@ def unify_analytes(config): def unify_waterlevels(config): print("Unifying waterlevels") - config.report() + config.report(log_report=True) config.validate() if not config.dry: From 8d9036c3620f3b6ecb2427dd01b48e382082c7ec Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 5 Nov 2024 14:49:04 -0700 Subject: [PATCH 39/55] Added missing string from config logs for export --- backend/config.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/backend/config.py b/backend/config.py index 1a505b5..92543f5 100644 --- a/backend/config.py +++ b/backend/config.py @@ -332,7 +332,8 @@ def _report_attributes(title, attrs): if log_report: self.logs.append(s) - click.secho("", fg="yellow") + s = "" + click.secho(s, fg="yellow") if log_report: self.logs.append(s) From 777cfd05f93b79e40b5e3d92626e477eb1ad6c39 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 5 Nov 2024 14:57:43 -0700 Subject: [PATCH 40/55] Format/CICD for push to GitHub for branch dev/jab --- .github/workflows/cicd.yml | 2 +- .github/workflows/format_code.yml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 7668de0..5ea32ac 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -5,7 +5,7 @@ name: CI/CD on: push: - branches: [ "main", "feature/jir"] + branches: [ "main", "feature/jir", "dev/jab"] pull_request: branches: [ "main"] diff --git a/.github/workflows/format_code.yml b/.github/workflows/format_code.yml index bae40ea..7b1c1b4 100644 
--- a/.github/workflows/format_code.yml +++ b/.github/workflows/format_code.yml @@ -1,9 +1,9 @@ name: Format code on: pull_request: - branches: [feature/jir, ] + branches: [feature/jir] push: - branches: [feature/jir,] + branches: [feature/jir, dev/jab] jobs: format: runs-on: ubuntu-latest From 4cd71540ebca519f109567661c98402033aed153 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Tue, 5 Nov 2024 21:59:11 +0000 Subject: [PATCH 41/55] Formatting changes --- backend/config.py | 5 ++--- backend/connectors/bor/source.py | 4 +--- backend/connectors/nmbgmr/source.py | 11 +++++------ backend/connectors/st2/source.py | 2 +- backend/connectors/wqp/source.py | 11 +++++++---- backend/geo_utils.py | 1 + backend/source.py | 11 ++++++++--- backend/transformer.py | 22 ++++++++++++++-------- backend/unifier.py | 16 +++++++++------- frontend/cli.py | 1 + tests/test_unifier.py | 25 ++++++++++--------------- 11 files changed, 59 insertions(+), 50 deletions(-) diff --git a/backend/config.py b/backend/config.py index 92543f5..559f6ed 100644 --- a/backend/config.py +++ b/backend/config.py @@ -323,14 +323,13 @@ def _report_attributes(title, attrs): if log_report: self.logs.append(s) - for k in attrs: v = getattr(self, k) s = f"{k}: {v}" click.secho(s, fg="yellow") if log_report: - self.logs.append(s) + self.logs.append(s) s = "" click.secho(s, fg="yellow") @@ -340,7 +339,7 @@ def _report_attributes(title, attrs): s = "---- Begin configuration -------------------------------------\n" click.secho(s, fg="yellow") - + if log_report: self.logs.append(s) diff --git a/backend/connectors/bor/source.py b/backend/connectors/bor/source.py index bc5b108..93f8f23 100644 --- a/backend/connectors/bor/source.py +++ b/backend/connectors/bor/source.py @@ -101,9 +101,7 @@ def _reorder_catalog_items(self, items): def get_records(self, site_record): code = get_analyte_search_param(self.config.analyte, BOR_ANALYTE_MAPPING) - for i, item in enumerate( - self._reorder_catalog_items(site_record.catalogItems) 
- ): + for i, item in enumerate(self._reorder_catalog_items(site_record.catalogItems)): data = self._execute_json_request(f'https://data.usbr.gov{item["id"]}') if not data: diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index bd5c620..63ceef1 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -57,6 +57,7 @@ def _make_url(endpoint): return f"http://localhost:8000/{endpoint}" return f"https://waterdata.nmt.edu/{endpoint}" + class NMBGMRSiteSource(BaseSiteSource): transformer_klass = NMBGMRSiteTransformer chunk_size = 100 @@ -70,7 +71,7 @@ def health(self): def get_records(self): config = self.config - #params = {"site_type": "Groundwater other than spring (well)"} + # params = {"site_type": "Groundwater other than spring (well)"} params = {} if config.has_bounds(): params["wkt"] = config.bounding_wkt() @@ -90,11 +91,11 @@ def get_records(self): _make_url("locations"), params, tag="features", timeout=30 ) return sites - + # loop through the responses and add well information for each location # this may be slow because of the number of sites that need to be queried - # but it is necessary to get the well information. With further - # development, this could be faster if one can batch the requests + # but it is necessary to get the well information. 
With further + # development, this could be faster if one can batch the requests # to /wells # for site in sites: # well_info = self._execute_json_request( @@ -105,8 +106,6 @@ def get_records(self): # site["properties"]["formation"] = well_info["formation"] # site["properties"]["well_depth"] = well_info["well_depth_ftbgs"] # site["properties"]["well_depth_units"] = "ft" - - class NMBGMRAnalyteSource(BaseAnalyteSource): diff --git a/backend/connectors/st2/source.py b/backend/connectors/st2/source.py index 159c0eb..df46261 100644 --- a/backend/connectors/st2/source.py +++ b/backend/connectors/st2/source.py @@ -87,7 +87,7 @@ def _extract_parameter_record(self, record): def _extract_parameter_results(self, records): return [r["observation"].result for r in records] - + def _clean_records(self, records: list) -> list: rs = [r for r in records if r["observation"].result is not None] return rs diff --git a/backend/connectors/wqp/source.py b/backend/connectors/wqp/source.py index fc343dc..b734908 100644 --- a/backend/connectors/wqp/source.py +++ b/backend/connectors/wqp/source.py @@ -74,7 +74,12 @@ def health(self): def get_records(self): config = self.config - params = {"mimeType": "tsv", "siteType": "Well", "sampleMedia": "Water", "statecode": "US:35"} + params = { + "mimeType": "tsv", + "siteType": "Well", + "sampleMedia": "Water", + "statecode": "US:35", + } if config.has_bounds(): params["bBox"] = ",".join([str(b) for b in config.bbox_bounding_points()]) @@ -103,9 +108,7 @@ def _extract_parameter_record(self, record): def _extract_site_records(self, records, site_record): return [ - ri - for ri in records - if ri["MonitoringLocationIdentifier"] == site_record.id + ri for ri in records if ri["MonitoringLocationIdentifier"] == site_record.id ] def _extract_parameter_results(self, records): diff --git a/backend/geo_utils.py b/backend/geo_utils.py index 16c5a94..930f76d 100644 --- a/backend/geo_utils.py +++ b/backend/geo_utils.py @@ -20,6 +20,7 @@ ALLOWED_DATUMS = 
["NAD27", "NAD83", "WGS84"] + def datum_transform(x, y, in_datum, out_datum): """ Transform x, y to a different datum diff --git a/backend/source.py b/backend/source.py index afc3635..e66f476 100644 --- a/backend/source.py +++ b/backend/source.py @@ -677,7 +677,9 @@ def read( for r, u in zip(results, units): try: - converted_result, warning_msg = convert_units(float(r), u, self._get_output_units()) + converted_result, warning_msg = convert_units( + float(r), u, self._get_output_units() + ) if warning_msg == "": kept_items.append(converted_result) else: @@ -690,7 +692,9 @@ def read( skipped_items.append((site.id, r, u)) if len(skipped_items) > 0: - self.warn(f"Skipped results because of formatting: {skipped_items}") + self.warn( + f"Skipped results because of formatting: {skipped_items}" + ) # if items is None or empty, no records were found or all results were None if kept_items is not None and len(kept_items): @@ -725,7 +729,8 @@ def read( for record in cleaned if self.transformer.do_transform( self._extract_parameter(record), site - ) is not None + ) + is not None ] if len(cleaned_sorted) == 0: self.warn(f"{site.id}: No clean records found") diff --git a/backend/transformer.py b/backend/transformer.py index 423b1a3..c7d2444 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -28,7 +28,7 @@ TONS_PER_ACRE_FOOT, MICROGRAMS_PER_LITER, DT_MEASURED, - PARAMETER_UNITS + PARAMETER_UNITS, ) from backend.geo_utils import datum_transform, ALLOWED_DATUMS from backend.record import ( @@ -348,11 +348,13 @@ def do_transform( return # ensure that a site or summary record is contained within the boundaing polygon - if 'longitude' in record and 'latitude' in record: + if "longitude" in record and "latitude" in record: if not self.contained(record["longitude"], record["latitude"]): - self.warn(f"Skipping site {record['id']}. It is not within the defined geographic bounds") + self.warn( + f"Skipping site {record['id']}. 
It is not within the defined geographic bounds" + ) return - + self._post_transform(record, *args, **kw) # standardize datetime @@ -383,7 +385,9 @@ def do_transform( input_horizontal_datum = record.horizontal_datum if input_horizontal_datum not in ALLOWED_DATUMS: - self.warn(f"Skipping site {record.id}. Datum {input_horizontal_datum} cannot be processed") + self.warn( + f"Skipping site {record.id}. Datum {input_horizontal_datum} cannot be processed" + ) return None output_elevation_units = "" @@ -428,7 +432,9 @@ def do_transform( u = record.parameter_units warning_msg = "" try: - converted_result, warning_msg = convert_units(float(r), u, self.config.analyte_output_units) + converted_result, warning_msg = convert_units( + float(r), u, self.config.analyte_output_units + ) if warning_msg != "": msg = f"{warning_msg} for {record.id}" self.warn(msg) @@ -440,7 +446,7 @@ def do_transform( msg = f"Keeping {r} for {record.id} on {record.date_measured} for time series data" self.warn(msg) converted_result = r - + if warning_msg == "": record.update(parameter_value=converted_result) record.update(parameter_units=self.config.analyte_output_units) @@ -482,7 +488,7 @@ def contained( return poly.contains(pt) return True - + def warn(self, msg): """ Prints warning messages to the console in red diff --git a/backend/unifier.py b/backend/unifier.py index 74f16fc..7e2795d 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -120,8 +120,8 @@ def _site_wrapper(site_source, parameter_source, persister, config): # if not schemas: # print(f"No schemas found for {site_source}") - # in the future make discover required - # return + # in the future make discover required + # return use_summarize = config.output_summary site_limit = config.site_limit @@ -152,11 +152,15 @@ def _site_wrapper(site_source, parameter_source, persister, config): end_ind += len(sites) if use_summarize: - summary_records = parameter_source.read(sites, use_summarize, start_ind, end_ind) + summary_records = 
parameter_source.read( + sites, use_summarize, start_ind, end_ind + ) if summary_records: persister.records.extend(summary_records) else: - results = parameter_source.read(sites, use_summarize, start_ind, end_ind) + results = parameter_source.read( + sites, use_summarize, start_ind, end_ind + ) # no records are returned if there is no site record for parameter # or if the record isn't clean (doesn't have the correct fields) # don't count these sites to apply to site_limit @@ -176,7 +180,6 @@ def _site_wrapper(site_source, parameter_source, persister, config): persister.timeseries.append((site, records)) persister.sites.append(site) sites_with_records_count += 1 - except BaseException: import traceback @@ -184,7 +187,7 @@ def _site_wrapper(site_source, parameter_source, persister, config): exc = traceback.format_exc() print(exc) print(f"Failed to unify {site_source}") - + config.logs.append(exc) config.logs.append(f"Failed to unify {site_source}") @@ -208,7 +211,6 @@ def _unify_parameter( else: persister.dump_combined(f"{config.output_path}.combined") persister.dump_timeseries(f"{config.output_path}_timeseries") - persister.logs.extend(config.logs) persister.warnings.extend(config.warnings) diff --git a/frontend/cli.py b/frontend/cli.py index 7e7efff..12a3cb2 100644 --- a/frontend/cli.py +++ b/frontend/cli.py @@ -150,6 +150,7 @@ def cli(): ), ] + def add_options(options): def _add_options(func): for option in reversed(options): diff --git a/tests/test_unifier.py b/tests/test_unifier.py index 38e9100..3947ef6 100644 --- a/tests/test_unifier.py +++ b/tests/test_unifier.py @@ -131,11 +131,8 @@ def _test_waterlevels_timeseries( def _test_waterelevels_timeseries_date_range( - tmp_path, - cfg, - source, - timeseries_flag=True, - combined_flag=False): + tmp_path, cfg, source, timeseries_flag=True, combined_flag=False +): combined, timeseries = _test_waterlevels_timeseries( tmp_path, cfg, @@ -363,7 +360,7 @@ def test_unify_waterlevels_nwis_timeseries(tmp_path, 
waterlevel_timeseries_cfg): tmp_path, waterlevel_timeseries_cfg, "nwis", - combined_flag=True, + combined_flag=True, timeseries_flag=True, ) @@ -379,7 +376,7 @@ def test_unify_waterlevels_pvacd_timeseries(tmp_path, waterlevel_timeseries_cfg) tmp_path, waterlevel_timeseries_cfg, "pvacd", - combined_flag=False, + combined_flag=False, timeseries_flag=True, ) @@ -411,9 +408,7 @@ def test_waterlevels_nwis_summary_date_range(tmp_path, waterlevel_summary_cfg): # Waterlevel timeseries date range ==================================================================================== -def test_waterlevels_nwis_timeseries_date_range( - tmp_path, - waterlevel_timeseries_cfg): +def test_waterlevels_nwis_timeseries_date_range(tmp_path, waterlevel_timeseries_cfg): # there are one or more locations within the bounding box and date range # that have only one record, so there is a combined file _test_waterelevels_timeseries_date_range( @@ -421,33 +416,33 @@ def test_waterlevels_nwis_timeseries_date_range( waterlevel_timeseries_cfg, "nwis", timeseries_flag=True, - combined_flag=True + combined_flag=True, ) def test_waterlevels_isc_seven_rivers_timeseries_date_range( tmp_path, waterlevel_timeseries_cfg ): - # all locations within the bounding box and date rangehave more than one + # all locations within the bounding box and date rangehave more than one # record so there is no combined file _test_waterelevels_timeseries_date_range( tmp_path, waterlevel_timeseries_cfg, "iscsevenrivers", timeseries_flag=True, - combined_flag=False + combined_flag=False, ) def test_waterlevels_pvacd_timeseries_date_range(tmp_path, waterlevel_timeseries_cfg): - # all locations within the bounding box and date rangehave more than one + # all locations within the bounding box and date rangehave more than one # record so there is no combined file _test_waterelevels_timeseries_date_range( tmp_path, waterlevel_timeseries_cfg, "pvacd", timeseries_flag=True, - combined_flag=False + combined_flag=False, ) From 
b7d4a0c2666a51ce52d969231ad6984b093054fc Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 5 Nov 2024 16:27:57 -0700 Subject: [PATCH 42/55] Removed redundant print message --- backend/unifier.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/backend/unifier.py b/backend/unifier.py index 74f16fc..d8d00e6 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -129,10 +129,6 @@ def _site_wrapper(site_source, parameter_source, persister, config): sites = site_source.read() if not sites: - not_sites_msg = f"No sites found for {site_source.tag}" - print(not_sites_msg) - config.logs.append(not_sites_msg) - config.warnings.append(not_sites_msg) return sites_with_records_count = 0 From bbc06bf8309ce8874c7f1fab188e0d3e7f42ed64 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 5 Nov 2024 17:28:54 -0700 Subject: [PATCH 43/55] Clarify warning that no records were found for a site --- backend/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/source.py b/backend/source.py index afc3635..6c2f2bb 100644 --- a/backend/source.py +++ b/backend/source.py @@ -657,7 +657,7 @@ def read( for site in site_record: site_records = self._extract_site_records(all_analyte_records, site) if not site_records: - self.warn(f"{site.id}: No site records found") + self.warn(f"{site.id}: No records found") continue # get cleaned records if _clean_records is defined by the source From b0b9e204936366bc0690382ee75da3837e5ec09b Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 5 Nov 2024 17:29:36 -0700 Subject: [PATCH 44/55] Add Silica analyte --- backend/constants.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/constants.py b/backend/constants.py index 0ccd1ec..2064a87 100644 --- a/backend/constants.py +++ b/backend/constants.py @@ -24,6 +24,7 @@ MAGNESIUM = "Magnesium" NITRATE = "Nitrate" POTASSIUM = "Potassium" +SILICA = "Silica" SODIUM = "Sodium" SULFATE = "Sulfate" URANIUM = "Uranium" @@ -60,6 +61,7 @@ MAGNESIUM, NITRATE, POTASSIUM, 
+ SILICA, SODIUM, SULFATE, TDS, From 01990ff00fe3e859c05bec1cf23c4a284d0ca961 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Tue, 5 Nov 2024 17:29:59 -0700 Subject: [PATCH 45/55] Silica mapping for all but dwb DWB is still in progress. This commit is to push to GitHub to preserve the work done so far. --- backend/connectors/mappings.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/backend/connectors/mappings.py b/backend/connectors/mappings.py index 5231476..38fc1ce 100644 --- a/backend/connectors/mappings.py +++ b/backend/connectors/mappings.py @@ -22,6 +22,7 @@ ARSENIC, NITRATE, CALCIUM, + SILICA, SODIUM, POTASSIUM, MAGNESIUM, @@ -41,6 +42,7 @@ MAGNESIUM: None, NITRATE: 35, POTASSIUM: None, + SILICA: None, SODIUM: None, SULFATE: 41, TDS: 90, @@ -95,6 +97,7 @@ MAGNESIUM: "Magnesium", NITRATE: "Nitrate", POTASSIUM: "Potassium", + SILICA: "SiO2", SODIUM: "Sodium", SULFATE: "Sulfate", TDS: "TDS calc", @@ -137,6 +140,7 @@ MAGNESIUM: "Magnesium", NITRATE: "Nitrate (as N)", POTASSIUM: "Potassium", + SILICA: "Silica", SODIUM: "Sodium", SULFATE: "Sulfate", TDS: "Total Dissolved Solids", @@ -155,6 +159,7 @@ MAGNESIUM: ["Magnesium"], NITRATE: ["Nitrate", "Nitrate-N", "Nitrate as N"], POTASSIUM: ["Potassium"], + SILICA: ["Silica"], SODIUM: ["Sodium"], SULFATE: [ "Sulfate", @@ -232,6 +237,7 @@ MAGNESIUM: "Mg", NITRATE: "NO3", POTASSIUM: "K", + SILICA: "SiO2", SODIUM: "Na", SULFATE: "SO4", TDS: "TDS", From a250b91685f045bdd29731e07bdf618decb5ad7f Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Thu, 7 Nov 2024 11:43:36 -0700 Subject: [PATCH 46/55] Conversion factor=1 for CaCO3 if unit is mg/L as CaCO3, else warn user --- backend/source.py | 2 +- backend/transformer.py | 21 ++++++++++++++++----- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/backend/source.py b/backend/source.py index e78a3a6..9a60ded 100644 --- a/backend/source.py +++ b/backend/source.py @@ -678,7 +678,7 @@ def read( for r, u in zip(results, units): try: converted_result, 
warning_msg = convert_units( - float(r), u, self._get_output_units() + float(r), u, self._get_output_units(), self.config.analyte ) if warning_msg == "": kept_items.append(converted_result) diff --git a/backend/transformer.py b/backend/transformer.py index c7d2444..7fd5de1 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -119,7 +119,7 @@ def transform_length_units( def convert_units( - input_value: int | float | str, input_units: str, output_units: str + input_value: int | float | str, input_units: str, output_units: str, analyte: str ) -> float: """ Converts the following units for any parameter value: @@ -146,6 +146,9 @@ def convert_units( output_units: str The output unit of the value + analyte: str + The analyte to convert + Returns -------- float @@ -165,8 +168,16 @@ def convert_units( # edge cases for Bicarbonate # BOR, WQP - if input_units in ["mg/l caco3", "mg/l caco3**"] and output_units == mgl: + if input_units in ["mg/l caco3", "mg/l caco3**"] and output_units == mgl and analyte == "Bicarbonate": + """ + mg/L as CaCO3 = mg/L * equivalent mass of CaCO3/equivalent mass substance + + So, when the substance is CaCO3, the ratio is 1 + """ conversion_factor = 1 + elif input_units in ["mg/l caco3", "mg/l caco3**"] and output_units == mgl and analyte != "Bicarbonate": + # this will catch if the input units are mg/l caco3 and the analyte is not bicarbonate so that the developer can perform the appropriate calculations for the conversion factor + conversion_factor = None if input_units == output_units: conversion_factor = 1 @@ -201,7 +212,7 @@ def convert_units( if conversion_factor: return input_value * conversion_factor, warning else: - warning = f"Failed to convert {input_value} {input_units} to {output_units}" + warning = f"Failed to convert {input_value} {input_units} to {output_units} for {analyte}" return input_value, warning @@ -433,7 +444,7 @@ def do_transform( warning_msg = "" try: converted_result, warning_msg = convert_units( - 
float(r), u, self.config.analyte_output_units + float(r), u, self.config.analyte_output_units, self.config.analyte ) if warning_msg != "": msg = f"{warning_msg} for {record.id}" @@ -672,7 +683,7 @@ def _transform_most_recents(self, record): record["most_recent_time"] = tt p, u = self._get_parameter() record["most_recent_value"] = convert_units( - record["most_recent_value"], record["most_recent_units"], u + record["most_recent_value"], record["most_recent_units"], u, self.config.analyte ) record["most_recent_units"] = u From 7933b35e587ca854a08ca2509bf833d83add75e2 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Thu, 7 Nov 2024 11:44:17 -0700 Subject: [PATCH 47/55] DWB analyte mappings --- backend/connectors/mappings.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/backend/connectors/mappings.py b/backend/connectors/mappings.py index 38fc1ce..168f665 100644 --- a/backend/connectors/mappings.py +++ b/backend/connectors/mappings.py @@ -32,23 +32,24 @@ ) # DWB =============================================================================== +# the mapping below is the corresponding "@iot.id" for the ObservedProperties DWB_ANALYTE_MAPPING: dict = { ARSENIC: 3, - BICARBONATE: None, - CALCIUM: None, - CARBONATE: None, + BICARBONATE: 22, # BICARBONATE AS HCO3 + CALCIUM: 11, + CARBONATE: None, CHLORIDE: 15, FLUORIDE: 19, - MAGNESIUM: None, + MAGNESIUM: 23, NITRATE: 35, - POTASSIUM: None, - SILICA: None, - SODIUM: None, + POTASSIUM: 33, + SILICA: 37, + SODIUM: 38, SULFATE: 41, TDS: 90, # "Uranium-238": 386, URANIUM: 385, # "Combined Uranium" - PH: None, + PH: 81, } # ISC Seven Rivers =============================================================================== """ @@ -176,8 +177,8 @@ """ Temp DO -ALK HCO3 -ALK CO3 +ALK HCO3 <-- this is a measure of alkalinity, and not necessarily the same as Bicarbonate +ALK CO3 <-- this is a measure of alkalinity, and not necessarily the same as Carbonate ALK OH ALK P ALK @@ -229,9 +230,9 @@ 
""" BOR_ANALYTE_MAPPING: dict = { ARSENIC: "As", - BICARBONATE: "ALK HCO3", + BICARBONATE: None, CALCIUM: "Ca", - CARBONATE: "ALK CO3", + CARBONATE: None, CHLORIDE: "Cl", FLUORIDE: "F", MAGNESIUM: "Mg", From 7015139968e3c51bf914108f8a0e3ccc2a3879ab Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Thu, 7 Nov 2024 19:10:45 +0000 Subject: [PATCH 48/55] Formatting changes --- backend/connectors/mappings.py | 4 ++-- backend/source.py | 5 ++++- backend/transformer.py | 19 +++++++++++++++---- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/backend/connectors/mappings.py b/backend/connectors/mappings.py index 168f665..2fe4267 100644 --- a/backend/connectors/mappings.py +++ b/backend/connectors/mappings.py @@ -35,9 +35,9 @@ # the mapping below is the corresponding "@iot.id" for the ObservedProperties DWB_ANALYTE_MAPPING: dict = { ARSENIC: 3, - BICARBONATE: 22, # BICARBONATE AS HCO3 + BICARBONATE: 22, # BICARBONATE AS HCO3 CALCIUM: 11, - CARBONATE: None, + CARBONATE: None, CHLORIDE: 15, FLUORIDE: 19, MAGNESIUM: 23, diff --git a/backend/source.py b/backend/source.py index 9a60ded..ffeead9 100644 --- a/backend/source.py +++ b/backend/source.py @@ -678,7 +678,10 @@ def read( for r, u in zip(results, units): try: converted_result, warning_msg = convert_units( - float(r), u, self._get_output_units(), self.config.analyte + float(r), + u, + self._get_output_units(), + self.config.analyte, ) if warning_msg == "": kept_items.append(converted_result) diff --git a/backend/transformer.py b/backend/transformer.py index 7fd5de1..61ccf54 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -168,14 +168,22 @@ def convert_units( # edge cases for Bicarbonate # BOR, WQP - if input_units in ["mg/l caco3", "mg/l caco3**"] and output_units == mgl and analyte == "Bicarbonate": + if ( + input_units in ["mg/l caco3", "mg/l caco3**"] + and output_units == mgl + and analyte == "Bicarbonate" + ): """ mg/L as CaCO3 = mg/L * equivalent mass of CaCO3/equivalent mass 
substance - + So, when the substance is CaCO3, the ratio is 1 """ conversion_factor = 1 - elif input_units in ["mg/l caco3", "mg/l caco3**"] and output_units == mgl and analyte != "Bicarbonate": + elif ( + input_units in ["mg/l caco3", "mg/l caco3**"] + and output_units == mgl + and analyte != "Bicarbonate" + ): # this will catch if the input units are mg/l caco3 and the analyte is not bicarbonate so that the developer can perform the appropriate calculations for the conversion factor conversion_factor = None @@ -683,7 +691,10 @@ def _transform_most_recents(self, record): record["most_recent_time"] = tt p, u = self._get_parameter() record["most_recent_value"] = convert_units( - record["most_recent_value"], record["most_recent_units"], u, self.config.analyte + record["most_recent_value"], + record["most_recent_units"], + u, + self.config.analyte, ) record["most_recent_units"] = u From 3c00090ead3fdf45ee4135502fae23e0bf54da7d Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 11 Nov 2024 08:56:40 -0800 Subject: [PATCH 49/55] Update conversion factor for HCO3- when unit are mg/L as CaCO3 --- backend/transformer.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/backend/transformer.py b/backend/transformer.py index 61ccf54..06b60e6 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -174,11 +174,9 @@ def convert_units( and analyte == "Bicarbonate" ): """ - mg/L as CaCO3 = mg/L * equivalent mass of CaCO3/equivalent mass substance - - So, when the substance is CaCO3, the ratio is 1 + https://aqua-chem.com/water-chemistry-caco3-equivalents/ """ - conversion_factor = 1 + conversion_factor = 1.22 elif ( input_units in ["mg/l caco3", "mg/l caco3**"] and output_units == mgl From 8e3cfd8446f8fe5ea1ea0d210fa1ce2ac79c7bbb Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 11 Nov 2024 12:03:17 -0800 Subject: [PATCH 50/55] Clarified docstrings | Simplified gathering log --- backend/source.py | 10 +++++----- 1 file changed, 5 
insertions(+), 5 deletions(-) diff --git a/backend/source.py b/backend/source.py index ffeead9..4278f20 100644 --- a/backend/source.py +++ b/backend/source.py @@ -642,7 +642,7 @@ def read( ) else: self.log( - f"{site_record.id} ({site_record.id}): Gathering {self.name} summary" + f"{site_record.id}: Gathering {self.name} summary" ) all_analyte_records = self.get_records(site_record) @@ -660,7 +660,7 @@ def read( self.warn(f"{site.id}: No records found") continue - # get cleaned records if _clean_records is defined by the source + # get cleaned records if _clean_records is defined by the source. This usually removes Nones/Null cleaned = self._clean_records(site_records) if not cleaned: self.warn(f"{site.id} No clean records found") @@ -877,7 +877,7 @@ def _extract_parameter_units(self, records: list) -> list: def _extract_parameter_record(self, record: dict) -> dict: """ - Returns a parameter record with standardized fields added. + Returns a parameter record with standardized fields added. This is only used for time series, not summary outputs For an analyte, the fields are - backend.constants.PARAMETER @@ -905,7 +905,7 @@ def _extract_parameter_record(self, record: dict) -> dict: def _extract_parameter_results(self, records: list) -> list: """ - Returns the parameter results as a list from the records, in the same order as the records themselves + Returns the parameter results as a list from the records, in the same order as the records themselves. This is only used for summary outputs, not time series Parameters ---------- @@ -923,7 +923,7 @@ def _extract_parameter_results(self, records: list) -> list: def _extract_parameter(self, record: dict) -> dict: """ - Extracts a parameter record from a list of records + Extracts a parameter record from a list of records. 
This is only used for time series, not summary outputs Parameters ---------- From 4fe499c4d9c344ef75905f0c964ad789c9280c22 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 11 Nov 2024 15:33:06 -0800 Subject: [PATCH 51/55] Make log message more informative --- backend/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/source.py b/backend/source.py index 4278f20..490c968 100644 --- a/backend/source.py +++ b/backend/source.py @@ -642,7 +642,7 @@ def read( ) else: self.log( - f"{site_record.id}: Gathering {self.name} summary" + f"{site_record.id}: Gathering {self.name} data" ) all_analyte_records = self.get_records(site_record) From fbe2595a23d4a44e9e3c1e11fb041d4859583943 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 11 Nov 2024 15:33:34 -0800 Subject: [PATCH 52/55] Update NMBGMR URLs for new API --- backend/connectors/nmbgmr/source.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index 63ceef1..48305e7 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -49,13 +49,10 @@ ) -def _make_url(endpoint): - # if os.getenv("DEBUG") == "1": - # return f"http://localhost:8000/latest/{endpoint}" - # return f"https://waterdata.nmt.edu/latest/{endpoint}" +def _make_url(endpoint): if os.getenv("DEBUG") == "1": - return f"http://localhost:8000/{endpoint}" - return f"https://waterdata.nmt.edu/{endpoint}" + return f"http://localhost:8000/latest/{endpoint}" + return f"https://waterdata.nmt.edu/latest/{endpoint}" class NMBGMRSiteSource(BaseSiteSource): @@ -71,13 +68,12 @@ def health(self): def get_records(self): config = self.config - # params = {"site_type": "Groundwater other than spring (well)"} - params = {} + params = {"site_type": "Groundwater other than spring (well)", "expand": False} if config.has_bounds(): params["wkt"] = config.bounding_wkt() if config.site_limit: - params["limit"] 
= config.site_limit + params["limit"] = config.site_limit if config.analyte: params["parameter"] = get_analyte_search_param( From 41e7dc030a11557026f98e34b84dd8e897102b5b Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Mon, 11 Nov 2024 15:33:58 -0800 Subject: [PATCH 53/55] Handle DWB non-detects --- backend/connectors/nmenv/source.py | 31 ++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/backend/connectors/nmenv/source.py b/backend/connectors/nmenv/source.py index da6f684..d4ae0de 100644 --- a/backend/connectors/nmenv/source.py +++ b/backend/connectors/nmenv/source.py @@ -21,7 +21,7 @@ ) from backend.connectors.st_connector import STSiteSource, STAnalyteSource from backend.constants import PARAMETER, PARAMETER_UNITS, DT_MEASURED, PARAMETER_VALUE -from backend.source import get_analyte_search_param +from backend.source import get_analyte_search_param, get_most_recent URL = "https://nmenv.newmexicowaterdata.org/FROST-Server/v1.1/" @@ -64,8 +64,16 @@ class DWBAnalyteSource(STAnalyteSource): url = URL transformer_klass = DWBAnalyteTransformer - def _parse_result(self, result): - return float(result.split(" ")[0]) + def _parse_result(self, result, result_dt=None, result_id=None, result_location=None): + if "< mrl" in result.lower(): + if self.config.output_summary: + self.warn(f"Non-detect found: {result} for {result_location} on {result_dt} (observation {result_id}). 
Setting to 0 for summary.") + return 0.0 + else: + # return the results for timeseries, regardless of format (None/Null/non-detect) + return result + else: + return float(result.split(" ")[0]) def get_records(self, site, *args, **kw): service = self.get_service() @@ -92,16 +100,31 @@ def get_records(self, site, *args, **kw): return rs def _extract_parameter_record(self, record): + # this is only used for time series record[PARAMETER_VALUE] = self._parse_result(record["observation"].result) record[PARAMETER_UNITS] = record["datastream"].unit_of_measurement.symbol record[DT_MEASURED] = record["observation"].phenomenon_time return record def _extract_parameter_results(self, records): - return [self._parse_result(r["observation"].result) for r in records] + # this is only used in summary output + return [self._parse_result(r["observation"].result, r["observation"].phenomenon_time, r["observation"].id, r["location"].id) for r in records] def _extract_parameter_units(self, records): + # this is only used in summary output return [r["datastream"].unit_of_measurement.symbol for r in records] + def _extract_most_recent(self, records): + # this is only used in summary output + record = get_most_recent( + records, tag=lambda x: x["observation"].phenomenon_time + ) + + return { + "value": self._parse_result(record["observation"].result, record["observation"].phenomenon_time, record["observation"].id, record["location"].id), + "datetime": record["observation"].phenomenon_time, + "units": record["datastream"].unit_of_measurement.symbol, + } + # ============= EOF ============================================= From 082e58301cb7e110562ec9ba5db698aed08702f2 Mon Sep 17 00:00:00 2001 From: jacob-a-brown Date: Mon, 11 Nov 2024 23:34:38 +0000 Subject: [PATCH 54/55] Formatting changes --- backend/connectors/nmbgmr/source.py | 4 ++-- backend/connectors/nmenv/source.py | 25 +++++++++++++++++++++---- backend/source.py | 4 +--- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git 
a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index 48305e7..3f2463e 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -49,7 +49,7 @@ ) -def _make_url(endpoint): +def _make_url(endpoint): if os.getenv("DEBUG") == "1": return f"http://localhost:8000/latest/{endpoint}" return f"https://waterdata.nmt.edu/latest/{endpoint}" @@ -73,7 +73,7 @@ def get_records(self): params["wkt"] = config.bounding_wkt() if config.site_limit: - params["limit"] = config.site_limit + params["limit"] = config.site_limit if config.analyte: params["parameter"] = get_analyte_search_param( diff --git a/backend/connectors/nmenv/source.py b/backend/connectors/nmenv/source.py index d4ae0de..37e4b7a 100644 --- a/backend/connectors/nmenv/source.py +++ b/backend/connectors/nmenv/source.py @@ -64,10 +64,14 @@ class DWBAnalyteSource(STAnalyteSource): url = URL transformer_klass = DWBAnalyteTransformer - def _parse_result(self, result, result_dt=None, result_id=None, result_location=None): + def _parse_result( + self, result, result_dt=None, result_id=None, result_location=None + ): if "< mrl" in result.lower(): if self.config.output_summary: - self.warn(f"Non-detect found: {result} for {result_location} on {result_dt} (observation {result_id}). Setting to 0 for summary.") + self.warn( + f"Non-detect found: {result} for {result_location} on {result_dt} (observation {result_id}). Setting to 0 for summary." 
+ ) return 0.0 else: # return the results for timeseries, regardless of format (None/Null/non-detect) @@ -108,7 +112,15 @@ def _extract_parameter_record(self, record): def _extract_parameter_results(self, records): # this is only used in summary output - return [self._parse_result(r["observation"].result, r["observation"].phenomenon_time, r["observation"].id, r["location"].id) for r in records] + return [ + self._parse_result( + r["observation"].result, + r["observation"].phenomenon_time, + r["observation"].id, + r["location"].id, + ) + for r in records + ] def _extract_parameter_units(self, records): # this is only used in summary output @@ -121,7 +133,12 @@ def _extract_most_recent(self, records): ) return { - "value": self._parse_result(record["observation"].result, record["observation"].phenomenon_time, record["observation"].id, record["location"].id), + "value": self._parse_result( + record["observation"].result, + record["observation"].phenomenon_time, + record["observation"].id, + record["location"].id, + ), "datetime": record["observation"].phenomenon_time, "units": record["datastream"].unit_of_measurement.symbol, } diff --git a/backend/source.py b/backend/source.py index 490c968..0d56ef9 100644 --- a/backend/source.py +++ b/backend/source.py @@ -641,9 +641,7 @@ def read( f"Gathering {self.name} summary for {len(site_record)} sites. 
{start_ind}-{end_ind}" ) else: - self.log( - f"{site_record.id}: Gathering {self.name} data" - ) + self.log(f"{site_record.id}: Gathering {self.name} data") all_analyte_records = self.get_records(site_record) if all_analyte_records: From 0dc1bde5dc847270996d0ba27d8f5c6349b19941 Mon Sep 17 00:00:00 2001 From: Jacob Brown Date: Thu, 14 Nov 2024 09:17:33 -0800 Subject: [PATCH 55/55] Added silica to list of available analytes --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index e63c62d..34923d9 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,7 @@ Available analytes: - Nitrate - pH - Potassium +- Silica - Sodium - Sulfate - TDS