diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 7668de0..5ea32ac 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -5,7 +5,7 @@ name: CI/CD on: push: - branches: [ "main", "feature/jir"] + branches: [ "main", "feature/jir", "dev/jab"] pull_request: branches: [ "main"] diff --git a/.github/workflows/format_code.yml b/.github/workflows/format_code.yml index bae40ea..7b1c1b4 100644 --- a/.github/workflows/format_code.yml +++ b/.github/workflows/format_code.yml @@ -1,9 +1,9 @@ name: Format code on: pull_request: - branches: [feature/jir, ] + branches: [feature/jir] push: - branches: [feature/jir,] + branches: [feature/jir, dev/jab] jobs: format: runs-on: ubuntu-latest diff --git a/.gitignore b/.gitignore index f6855af..5c03936 100644 --- a/.gitignore +++ b/.gitignore @@ -169,3 +169,12 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + +# outputs +output_timeseries +output.combined.csv +output.csv +output.sites.csv +output.timeseries.csv +output.logs.txt +output.warnings.txt \ No newline at end of file diff --git a/README.md b/README.md index 89f8d4c..ea3a50f 100644 --- a/README.md +++ b/README.md @@ -5,19 +5,25 @@ ![NMWDI](https://newmexicowaterdata.org/wp-content/uploads/2023/11/newmexicowaterdatalogoNov2023.png) -![NMBGMR](https://waterdata.nmt.edu/static/nmbgmr_logo_resized.png) +![NMBGMR](https://waterdata.nmt.edu/latest/static/nmbgmr_logo_resized.png) This package provides a command line interface to New Mexico Water Data Initiaive's Data Integration Engine. This tool is used to integrate the water data from multiple sources. +## Installation +```bash +pip install nmuwd +``` ## Sources +Data comes from the following sources. We are continuously adding new sources as we learn of them and they become available. If you have data that you would like to be part of the Data Integration Engine please get in touch at newmexicowaterdata@nmt.edu. + - [Bureau of Reclamation](https://data.usbr.gov/) - [USGS (NWIS)](https://waterdata.usgs.gov/nwis) - [ST2 (NMWDI)](https://st2.newmexicowaterdata.org/FROST-Server/v1.1/) - Pecos Valley Artesian Conservancy District - - Elephant Butte Irrigation District - Bernalillo County + - New Mexico Environment Department Drinking Water Bureau - [NM Water Data CKAN catalog](https://catalog.newmexicowaterdata.org/) - OSE Roswell District Office - ISC Seven Rivers @@ -27,62 +33,53 @@ This package provides a command line interface to New Mexico Water Data Initiaiv - EPA - and over 400 state, federal, tribal, and local agencies -## Installation -```bash -pip install nmuwd -``` - -## Usage -### Water Levels +### Source Inclusion & Exclusion +The Data Integration Engine enables the user to obtain groundwater level and groundwater quality data from a variety of sources. Data from sources are included in the output unless specifically excluded. The following flags are available to exclude a specific data source: -Get water levels for a county. Return a summary csv -```bash -weave waterlevels --county eddy -``` -Get water levels for a bounding box. Return a summary csv -```bash -weave waterlevels --bbox -106.5 32.5 -106.0 33.0 -``` +- `--no-amp` to exclude New Mexico Bureau of Geology and Mineral Resources Aquifer Mapping Program (AMP) data +- `--no-bor` to exclude Bureaof of Reclamation data +- `--no-nwis` to exclude USGS NWIS data +- `--no-pvacd` to exclude Pecos Valley Artesian Convservancy District (PVACD) data +- `--no-isc-seven-rivers` to exclude Interstate Stream Commission (ISC) Seven Rivers data +- `--no-wqp` to exclude Water Quality Portal (WQP) data +- `--no-ckan` to exclude NM OSE Roswell data that is hosted on CKAN +- `--no-dwb` to exclude New Mexico Environment Department Drinking Water Bureau (DWB) data +- `--no-bernco` to exclude Bernalillo County (BernCo) data +### Water Levels -Get water levels for a county. Return timeseries of water levels for each site -```bash -weave waterlevels --county eddy --timeseries -``` +To obtain groundwater levels, use -Exclude a specific data source -```bash -weave waterlevels --county eddy --no-amp ``` - -Exclude multiple data sources -```bash -weave waterlevels --county eddy --no-amp --no-nwis +weave waterlevels ``` -Available data source flags: - - --no-amp - - --no-bor - - --no-ckan - - --no-dwb - - --no-isc-seven-rivers - - --no-nwis - - --no-pvacd - - --no-wqp - - --no-bernco +followed by the desired output type, source filters, date filters, geographic filters, and excluded data sources. +#### Available Data Sources +The following data sources are available for groundwater levels: +- amp +- bor +- ckan +- dwb +- isc-seven-rivers +- nwis +- pvacd +- bernco ### Water Quality -```bash -weave analytes TDS --county eddy +To obtain groundwater quality, use + ``` -```bash -weave analytes TDS --county eddy --no-bor +weave analytes {analyte} ``` -Available analytes: +where `{analyte}` is the name of the analyte whose data is to be retrieved. + +#### Available Analytes +The following analytes are currently available for retrieval: - Arsenic - Bicarbonate - Calcium @@ -92,7 +89,159 @@ Available analytes: - Nitrate - pH - Potassium +- Silica - Sodium - Sulfate - TDS -- Uranium \ No newline at end of file +- Uranium + +#### Available Data Sources +The follow data sources are available for analytes, though not every source has measurements for every analyte: +- bor +- wqp +- isc-seven-rivers +- amp +- dwb + +### Geographic Filters + +The following flags can be used to geographically filter data: + +``` +-- county {county name} +``` + +``` +-- bbox 'x1 y1, x2 y2' +``` + +### Date Filters + +The following flags can be used to filter by dates: + +``` +--start-date YYYY-MM-DD +``` + +``` +--end-date YYYY-MM-DD +``` + +## Output +The data is saved to the current working directory. A log of the inputs and processes, called `die.log`, is also saved to the current working directory. If a subsquent process is run and the log from the previous process has not been moved or stored elsewhere, the log for the subsequent process will be appended to the existing log. + +### Timeseries Data +The flag `--separated_timeseries` exports timeseries for every location in their own file in the directory output_series (e.g. `AB-0002.csv`, `AB-0003.csv`). Locations with only one observation are gathered and exported to the file `output.combined.csv`. + +The flag `--unified_timeseries` exports all timeseries for all locations in one file titled `output.timeseries.csv`. It also exports a file titled `output.sites.csv` that contains site information, such as latitude, longitude, and elevation. + +#### Table Headers: Unified + +The table headers for unified timeseries data are as follows: + +**output.sites.csv** +- `source`: the organization/source for the site +- `id`: the id of the site. The id is used as the key to join the output.timeseries.csv table +- `name`: the colloquial name for the site if it exists +- `latitude`: latitude in decimal degrees +- `longitude`: the longitude in decimal degrees +- `elevation` ground surface elevation of the site in feet +- `elevation_units`: the units of the ground surface elevation. Defaults to ft +- `horizontal_datum`: horizontal datum of the latitude and longitude. Defaults to WGS84 +- `vertical_datum`: the vertical datum of the elevation +- `usgs_site_id`: USGS site id if it exists +- `alternate_site_id`: alternate site id if it exists +- `formation`: geologic formation in which the well terminates if it exists +- `aquifer`: aquifer from which the well draws water if it exists +- `well_depth`: depth of well if it exists + +**output.timeseries.csv - waterlevels** +- `source`: the organization/sources for the site +- `id`: the id of the site. The id is used as the key to join the output.sites.csv table +- `depth_to_water_ft_below_ground_surface`: depth to water below ground surface in ft +- `date_measured`: date of measurement in YYYY-MM-DD format +- `time_measured`: time of measurement if it exists + +**output.timeseries.csv - analytes** +- `source`: the organization/sources for the site +- `id`: the id of the site. The id is used as the key to join the output.sites.csv table +- `parameter`: the name of the analyte whose measurements are reported in the table. This corresponds the requested analyte +- `parameter_value`: value of the measurement +- `parameter_units`: units of the measurement +- `date_measured`: date of measurement in YYYY-MM-DD format +- `time_measured`: time of measurement if it exists + +#### Table Headers: Separated + +The files for the individual sites contain the same headers as **output.timeseries.csv** from the unified time series tables. + +**output.combined.csv - waterlevels** +- `source`: the organization/source for the site +- `id`: the id of the site. The id is used as the key to join the output.timeseries.csv table +- `name`: the colloquial name for the site if it exists +- `latitude`: latitude in decimal degrees +- `longitude`: the longitude in decimal degrees +- `elevation` ground surface elevation of the site in feet +- `elevation_units`: the units of the ground surface elevation. Defaults to ft +- `horizontal_datum`: horizontal datum of the latitude and longitude. Defaults to WGS84 +- `vertical_datum`: the vertical datum of the elevation +- `usgs_site_id`: USGS site id if it exists +- `alternate_site_id`: alternate site id if it exists +- `formation`: geologic formation in which the well terminates if it exists +- `aquifer`: aquifer from which the well draws water if it exists +- `well_depth`: depth of well if it exists +- `depth_to_water_ft_below_ground_surface`: depth to water below ground surface in ft +- `date_measured`: date of measurement in YYYY-MM-DD format +- `time_measured`: time of measurement if it exists + +**output.combined.csv - analytes** +- `source`: the organization/source for the site +- `id`: the id of the site. The id is used as the key to join the output.timeseries.csv table +- `name`: the colloquial name for the site if it exists +- `latitude`: latitude in decimal degrees +- `longitude`: the longitude in decimal degrees +- `elevation` ground surface elevation of the site in feet +- `elevation_units`: the units of the ground surface elevation. Defaults to ft +- `horizontal_datum`: horizontal datum of the latitude and longitude. Defaults to WGS84 +- `vertical_datum`: the vertical datum of the elevation +- `usgs_site_id`: USGS site id if it exists +- `alternate_site_id`: alternate site id if it exists +- `formation`: geologic formation in which the well terminates if it exists +- `aquifer`: aquifer from which the well draws water if it exists +- `well_depth`: depth of well if it exists +- `parameter`: the name of the analyte whose measurements are reported in the table. This corresponds the requested analyte +- `parameter_value`: value of the measurement +- `parameter_units`: units of the measurement +- `date_measured`: date of measurement in YYYY-MM-DD format +- `time_measured`: time of measurement if it exists + +### Summary Data + +If neither of the above flags are specified, a summary table called `output.csv` is exported. The summary table consists of location information as well as summary statistics for the parameter of interest for every location that has observations. + +#### Table Headers: Summary + +**output.csv - waterlevels and analytes** +- `source`: the organization/source for the site +- `id`: the id of the site. The id is used as the key to join the output.timeseries.csv table +- `location`: the colloquial name for the site if it exists +- `usgs_site_id`: USGS site id if it exists +- `alternate_site_id`: alternate site id if it exists +- `latitude`: latitude in decimal degrees +- `longitude`: the longitude in decimal degrees +- `horizontal_datum`: horizontal datum of the latitude and longitude. Defaults to WGS84 +- `elevation` ground surface elevation of the site in feet +- `elevation_units`: the units of the ground surface elevation. Defaults to ft +- `well_depth`: depth of well if it exists +- `well_depth_units`: units of well depth. Defaults to ft +- `parameter`: the name of the analyte whose measurements are reported in the table. This corresponds the requested analyte +- `parameter_value`: value of the measurement +- `parameter_units`: units of the measurement +- `nrecords`: the number of records for the site +- `min`: the minimum record for the site +- `max`: the maximum record for the site +- `mean`: the mean value for the records at the site +- `most_recent_date`: date of most recent record +- `most_recent_time`: time of most recent record if it exists +- `most_recent_value` the value of the most recent record +- `most_recent_units`: the units of the most recent record \ No newline at end of file diff --git a/backend/config.py b/backend/config.py index 68515f7..69af728 100644 --- a/backend/config.py +++ b/backend/config.py @@ -18,9 +18,10 @@ import time from datetime import datetime, timedelta -import click import shapely.wkt +from backend.logging import Loggable + from .bounding_polygons import get_county_polygon from .connectors.nmbgmr.source import ( NMBGMRSiteSource, @@ -91,7 +92,7 @@ def get_source(source): return None -class Config(object): +class Config(Loggable): site_limit: int = 0 dry: bool = False @@ -136,6 +137,9 @@ class Config(object): use_geojson: bool = False def __init__(self, model=None, payload=None): + # need to initialize logger + super().__init__() + self.bbox = {} if model: if model.wkt: @@ -315,19 +319,20 @@ def now_ms(self, days=0): def report(self): def _report_attributes(title, attrs): - click.secho( - f"---- {title} --------------------------------------------------", - fg="yellow", - ) + s = f"---- {title} --------------------------------------------------" + self.log(s) + for k in attrs: v = getattr(self, k) - click.secho(f"{k}: {v}", fg="yellow") - click.secho("", fg="yellow") + s = f"{k}: {v}" + self.log(s) + + s = "" + self.log(s) + + s = "---- Begin configuration -------------------------------------\n" + self.log(s) - click.secho( - "---- Begin configuration -------------------------------------\n", - fg="yellow", - ) sources = [f"use_source_{s}" for s in SOURCE_KEYS] attrs = [ "start_date", @@ -351,30 +356,30 @@ def _report_attributes(title, attrs): "output_dir", "output_name", "output_summary", + "output_single_timeseries", "output_horizontal_datum", "output_elevation_units", ), ) - click.secho( - "---- End configuration -------------------------------------", fg="yellow" - ) + s = "---- End configuration -------------------------------------\n" + self.log(s) def validate(self): if not self._validate_bbox(): - click.secho("Invalid bounding box", fg="red") + self.warn("Invalid bounding box") sys.exit(2) if not self._validate_county(): - click.secho("Invalid county", fg="red") + self.warn("Invalid county") sys.exit(2) if not self._validate_date(self.start_date): - click.secho(f"Invalid start date {self.start_date}", fg="red") + self.warn(f"Invalid start date {self.start_date}") sys.exit(2) if not self._validate_date(self.end_date): - click.secho("Invalid end date", fg="red") + self.warn(f"Invalid end date {self.end_date}") sys.exit(2) def _extract_date(self, d): diff --git a/backend/connectors/README.md b/backend/connectors/README.md index 01384e9..4281414 100644 --- a/backend/connectors/README.md +++ b/backend/connectors/README.md @@ -8,6 +8,8 @@ The following are necessary for adding a source: - the `use_source_` flag needs to be added to the `analyte_sources` method in the `Config` class in **/backend/config.py** if analytes are available for that source - the `use_source_` flag needs to be added to the `water_level_sources` method in the `Config` class in **/backend/config.py** if water levels are available for that source +**IMPORTANT: add tests for the source** + For the sake of discription, the example source is called Faux. # /backend/connectors/faux @@ -30,7 +32,7 @@ The following methods need to be defined for Faux. See `BaseSiteSource` for doc The following methods need to be defined for Faux. See `BaseAnalyteSource` for doc strings for each of the methods: - `get_records` -- `_extract_parent_records` +- `_extract_site_records` - `_extract_parameter_units` - `_extract_most_recent` - `_extract_parameter_result` @@ -45,7 +47,7 @@ The following method is optional: The following methods need to be defined for Faux. See `BaseWaterLevelSource` for doc strings for each of the methods: - `get_records` -- `_extract_parent_records` +- `_extract_site_records` - `_extract_parameter_units` - `_extract_most_recent` - `_extract_parameter_result` diff --git a/backend/connectors/bor/source.py b/backend/connectors/bor/source.py index 2bfb772..1007bed 100644 --- a/backend/connectors/bor/source.py +++ b/backend/connectors/bor/source.py @@ -78,6 +78,9 @@ def _extract_parameter_results(self, rs): def _extract_parameter_units(self, records): return [ri["attributes"]["resultAttributes"]["units"] for ri in records] + def _extract_parameter_dates(self, records): + return [parse_dt(ri["attributes"]["dateTime"]) for ri in records] + def _extract_most_recent(self, rs): record = get_most_recent(rs, "attributes.dateTime") @@ -87,9 +90,9 @@ def _extract_most_recent(self, rs): "units": record["attributes"]["resultAttributes"]["units"], } - def _extract_parent_records(self, records, parent_record): + def _extract_site_records(self, records, site_record): return [ - ri for ri in records if ri["attributes"]["locationId"] == parent_record.id + ri for ri in records if ri["attributes"]["locationId"] == site_record.id ] def _reorder_catalog_items(self, items): @@ -98,12 +101,10 @@ def _reorder_catalog_items(self, items): items = items[self._catalog_item_idx :] + items[: self._catalog_item_idx] return items - def get_records(self, parent_record): + def get_records(self, site_record): code = get_analyte_search_param(self.config.analyte, BOR_ANALYTE_MAPPING) - for i, item in enumerate( - self._reorder_catalog_items(parent_record.catalogItems) - ): + for i, item in enumerate(self._reorder_catalog_items(site_record.catalogItems)): data = self._execute_json_request(f'https://data.usbr.gov{item["id"]}') if not data: diff --git a/backend/connectors/ckan/source.py b/backend/connectors/ckan/source.py index e70475a..da3f373 100644 --- a/backend/connectors/ckan/source.py +++ b/backend/connectors/ckan/source.py @@ -112,12 +112,12 @@ def _parse_response(self, resp): class OSERoswellWaterLevelSource(OSERoswellSource, BaseWaterLevelSource): transformer_klass = OSERoswellWaterLevelTransformer - def get_records(self, parent_record): - return self._parse_response(parent_record, self.get_response()) + def get_records(self, site_record): + return self._parse_response(site_record, self.get_response()) - def _parse_response(self, parent_record, resp): + def _parse_response(self, site_record, resp): records = resp.json()["result"]["records"] - return [record for record in records if record["Site_ID"] == parent_record.id] + return [record for record in records if record["Site_ID"] == site_record.id] def _extract_parameter_results(self, records): return [float(r["DTWGS"]) for r in records] @@ -126,11 +126,17 @@ def _extract_most_recent(self, records): record = get_most_recent(records, tag="Date") return {"value": record["DTWGS"], "datetime": record["Date"], "units": FEET} + def _extract_parameter_dates(self, records: list) -> list: + return [r["Date"] for r in records] + def _extract_parameter_record(self, record): record[DTW] = float(record["DTWGS"]) record[DT_MEASURED] = record["Date"] record[DTW_UNITS] = FEET return record + def _clean_records(self, records: list) -> list: + return [r for r in records if r["DTWGS"] is not None and r["Date"] is not None] + # ============= EOF ============================================= diff --git a/backend/connectors/ckan/transformer.py b/backend/connectors/ckan/transformer.py index 111171a..e56d609 100644 --- a/backend/connectors/ckan/transformer.py +++ b/backend/connectors/ckan/transformer.py @@ -48,17 +48,17 @@ def _transform(self, record): class OSERoswellWaterLevelTransformer(WaterLevelTransformer): source_tag = "CKAN/OSERoswell" - # def _transform_hook(self, record, config, parent_record): + # def _transform_hook(self, record, config, site_record): # rec = { - # "id": parent_record.id, + # "id": site_record.id, # "source": "CKAN/OSERoswell", - # "location": parent_record.name, - # "usgs_site_id": parent_record.id, - # "latitude": parent_record.latitude, - # "longitude": parent_record.longitude, - # "elevation": parent_record.elevation, + # "location": site_record.name, + # "usgs_site_id": site_record.id, + # "latitude": site_record.latitude, + # "longitude": site_record.longitude, + # "elevation": site_record.elevation, # "elevation_units": "ft", - # "well_depth": parent_record.well_depth, + # "well_depth": site_record.well_depth, # "well_depth_units": "ft", # } # if config.output_summary_waterlevel_stats: diff --git a/backend/connectors/isc_seven_rivers/source.py b/backend/connectors/isc_seven_rivers/source.py index fe32656..46dbc7c 100644 --- a/backend/connectors/isc_seven_rivers/source.py +++ b/backend/connectors/isc_seven_rivers/source.py @@ -47,6 +47,23 @@ ) +def get_date_range(config): + params = {} + + def to_milliseconds(dt): + return int(dt.timestamp() * 1000) + + if config.start_date: + params["start"] = to_milliseconds(config.start_dt) + if config.end_date: + params["end"] = to_milliseconds(config.end_dt) + return params + + +def get_datetime(record): + return datetime.fromtimestamp(record["dateTime"] / 1000) + + def _make_url(endpoint): return f"https://nmisc-wf.gladata.com/api/{endpoint}" @@ -109,12 +126,15 @@ def _extract_parameter_results(self, records): def _extract_parameter_units(self, records): return [r["units"] for r in records] - def get_records(self, parent_record): + def _extract_parameter_dates(self, records: list) -> list: + return [get_datetime(r) for r in records] + + def get_records(self, site_record): config = self.config analyte_id = self._get_analyte_id(config.analyte) if analyte_id: params = { - "monitoringPointId": parent_record.id, + "monitoringPointId": site_record.id, "analyteId": analyte_id, "start": 0, "end": config.now_ms(days=1), @@ -126,29 +146,12 @@ def get_records(self, parent_record): ) -def get_date_range(config): - params = {} - - def to_milliseconds(dt): - return int(dt.timestamp() * 1000) - - if config.start_date: - params["start"] = to_milliseconds(config.start_dt) - if config.end_date: - params["end"] = to_milliseconds(config.end_dt) - return params - - -def get_datetime(record): - return datetime.fromtimestamp(record["dateTime"] / 1000) - - class ISCSevenRiversWaterLevelSource(BaseWaterLevelSource): transformer_klass = ISCSevenRiversWaterLevelTransformer - def get_records(self, parent_record): + def get_records(self, site_record): params = { - "id": parent_record.id, + "id": site_record.id, "start": 0, "end": self.config.now_ms(days=1), } @@ -173,6 +176,9 @@ def _extract_parameter_results(self, records): r["depthToWaterFeet"] for r in records if not r["invalid"] and not r["dry"] ] + def _extract_parameter_dates(self, records: list) -> list: + return [get_datetime(r) for r in records] + def _extract_most_recent(self, records): record = get_most_recent(records, "dateTime") t = get_datetime(record) diff --git a/backend/connectors/isc_seven_rivers/transformer.py b/backend/connectors/isc_seven_rivers/transformer.py index e68aba3..dddbe9d 100644 --- a/backend/connectors/isc_seven_rivers/transformer.py +++ b/backend/connectors/isc_seven_rivers/transformer.py @@ -53,14 +53,14 @@ class ISCSevenRiversAnalyteTransformer(AnalyteTransformer): class ISCSevenRiversWaterLevelTransformer(WaterLevelTransformer): source_tag = "ISCSevenRivers" - # def _transform_hook(self, record, config, parent_record): + # def _transform_hook(self, record, config, site_record): # rec = { # "source": "ISCSevenRivers", - # "id": parent_record.id, - # "location": parent_record.name, - # "latitude": parent_record.latitude, - # "longitude": parent_record.longitude, - # "elevation": parent_record.elevation, + # "id": site_record.id, + # "location": site_record.name, + # "latitude": site_record.latitude, + # "longitude": site_record.longitude, + # "elevation": site_record.elevation, # "elevation_units": "ft", # } # if config.output_summary_waterlevel_stats: diff --git a/backend/connectors/mappings.py b/backend/connectors/mappings.py index 5231476..2fe4267 100644 --- a/backend/connectors/mappings.py +++ b/backend/connectors/mappings.py @@ -22,6 +22,7 @@ ARSENIC, NITRATE, CALCIUM, + SILICA, SODIUM, POTASSIUM, MAGNESIUM, @@ -31,22 +32,24 @@ ) # DWB =============================================================================== +# the mapping below is the corresponding "@iot.id" for the ObservedProperties DWB_ANALYTE_MAPPING: dict = { ARSENIC: 3, - BICARBONATE: None, - CALCIUM: None, + BICARBONATE: 22, # BICARBONATE AS HCO3 + CALCIUM: 11, CARBONATE: None, CHLORIDE: 15, FLUORIDE: 19, - MAGNESIUM: None, + MAGNESIUM: 23, NITRATE: 35, - POTASSIUM: None, - SODIUM: None, + POTASSIUM: 33, + SILICA: 37, + SODIUM: 38, SULFATE: 41, TDS: 90, # "Uranium-238": 386, URANIUM: 385, # "Combined Uranium" - PH: None, + PH: 81, } # ISC Seven Rivers =============================================================================== """ @@ -95,6 +98,7 @@ MAGNESIUM: "Magnesium", NITRATE: "Nitrate", POTASSIUM: "Potassium", + SILICA: "SiO2", SODIUM: "Sodium", SULFATE: "Sulfate", TDS: "TDS calc", @@ -137,6 +141,7 @@ MAGNESIUM: "Magnesium", NITRATE: "Nitrate (as N)", POTASSIUM: "Potassium", + SILICA: "Silica", SODIUM: "Sodium", SULFATE: "Sulfate", TDS: "Total Dissolved Solids", @@ -155,6 +160,7 @@ MAGNESIUM: ["Magnesium"], NITRATE: ["Nitrate", "Nitrate-N", "Nitrate as N"], POTASSIUM: ["Potassium"], + SILICA: ["Silica"], SODIUM: ["Sodium"], SULFATE: [ "Sulfate", @@ -171,8 +177,8 @@ """ Temp DO -ALK HCO3 -ALK CO3 +ALK HCO3 <-- this is a measure of alkalinity, and not necessarily the same as Bicarbonate +ALK CO3 <-- this is a measure of alkalinity, and not necessarily the same as Carbonate ALK OH ALK P ALK @@ -224,14 +230,15 @@ """ BOR_ANALYTE_MAPPING: dict = { ARSENIC: "As", - BICARBONATE: "ALK HCO3", + BICARBONATE: None, CALCIUM: "Ca", - CARBONATE: "ALK CO3", + CARBONATE: None, CHLORIDE: "Cl", FLUORIDE: "F", MAGNESIUM: "Mg", NITRATE: "NO3", POTASSIUM: "K", + SILICA: "SiO2", SODIUM: "Na", SULFATE: "SO4", TDS: "TDS", diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index 08734f9..ba58c0a 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -51,8 +51,8 @@ def _make_url(endpoint): if os.getenv("DEBUG") == "1": - return f"http://localhost:8000/{endpoint}" - return f"https://waterdata.nmt.edu/{endpoint}" + return f"http://localhost:8000/latest/{endpoint}" + return f"https://waterdata.nmt.edu/latest/{endpoint}" class NMBGMRSiteSource(BaseSiteSource): @@ -68,7 +68,7 @@ def health(self): def get_records(self): config = self.config - params = {} + params = {"site_type": "Groundwater other than spring (well)", "expand": False} if config.has_bounds(): params["wkt"] = config.bounding_wkt() @@ -83,20 +83,36 @@ def get_records(self): params["parameter"] = "Manual groundwater levels" # tags="features" because the response object is a GeoJSON - return self._execute_json_request( + sites = self._execute_json_request( _make_url("locations"), params, tag="features", timeout=30 ) + return sites + + # loop through the responses and add well information for each location + # this may be slow because of the number of sites that need to be queried + # but it is necessary to get the well information. With further + # development, this could be faster if one can batch the requests + # to /wells + # for site in sites: + # well_info = self._execute_json_request( + # _make_url("/wells"), + # params={"pointid": site["properties"]["point_id"]}, + # tag="", + # ) + # site["properties"]["formation"] = well_info["formation"] + # site["properties"]["well_depth"] = well_info["well_depth_ftbgs"] + # site["properties"]["well_depth_units"] = "ft" class NMBGMRAnalyteSource(BaseAnalyteSource): transformer_klass = NMBGMRAnalyteTransformer - def get_records(self, parent_record): + def get_records(self, site_record): analyte = get_analyte_search_param(self.config.analyte, NMBGMR_ANALYTE_MAPPING) records = self._execute_json_request( _make_url("waterchemistry"), params={ - "pointid": ",".join(make_site_list(parent_record)), + "pointid": ",".join(make_site_list(site_record)), "analyte": analyte, }, tag="", @@ -107,8 +123,8 @@ def get_records(self, parent_record): return records_sorted_by_pointid - def _extract_parent_records(self, records, parent_record): - return records.get(parent_record.id, []) + def _extract_site_records(self, records, site_record): + return records.get(site_record.id, []) def _extract_parameter_units(self, records): return [r["Units"] for r in records] @@ -124,6 +140,9 @@ def _extract_most_recent(self, records): def _extract_parameter_results(self, records): return [r["SampleValue"] for r in records] + def _extract_parameter_dates(self, records: list) -> list: + return [r["info"]["CollectionDate"] for r in records] + def _extract_parameter_record(self, record): record[PARAMETER] = self.config.analyte record[PARAMETER_VALUE] = record["SampleValue"] @@ -153,18 +172,21 @@ def _extract_most_recent(self, records): "units": FEET, } + def _extract_parameter_dates(self, records: list) -> list: + return [(r["DateMeasured"], r["TimeMeasured"]) for r in records] + def _extract_parameter_results(self, records): return [r["DepthToWaterBGS"] for r in records] - def _extract_parent_records(self, records, parent_record): - return [ri for ri in records if ri["Well"]["PointID"] == parent_record.id] + def _extract_site_records(self, records, site_record): + return [ri for ri in records if ri["Well"]["PointID"] == site_record.id] - def get_records(self, parent_record): + def get_records(self, site_record): # if self.config.latest_water_level_only: - # params = {"pointids": parent_record.id} + # params = {"pointids": site_record.id} # url = _make_url("waterlevels/latest") # else: - params = {"pointid": ",".join(make_site_list(parent_record))} + params = {"pointid": ",".join(make_site_list(site_record))} # just use manual waterlevels temporarily url = _make_url("waterlevels/manual") diff --git a/backend/connectors/nmbgmr/transformer.py b/backend/connectors/nmbgmr/transformer.py index 693b728..b84f6b2 100644 --- a/backend/connectors/nmbgmr/transformer.py +++ b/backend/connectors/nmbgmr/transformer.py @@ -51,19 +51,19 @@ class NMBGMRAnalyteTransformer(AnalyteTransformer): class NMBGMRWaterLevelTransformer(WaterLevelTransformer): source_tag = "NMBGMR" - # def _transform_hook(self, record, config, parent_record): + # def _transform_hook(self, record, config, site_record): # rec = { # "source": "NMBGMR", - # "id": parent_record.id, - # "location": parent_record.name, - # "usgs_site_id": parent_record.usgs_site_id, - # "alternate_site_id": parent_record.alternate_site_id, - # "latitude": parent_record.latitude, - # "longitude": parent_record.longitude, - # "well_depth": parent_record.well_depth, - # "well_depth_units": parent_record.well_depth_units, - # "elevation": parent_record.elevation, - # "elevation_units": parent_record.elevation_units, + # "id": site_record.id, + # "location": site_record.name, + # "usgs_site_id": site_record.usgs_site_id, + # "alternate_site_id": site_record.alternate_site_id, + # "latitude": site_record.latitude, + # "longitude": site_record.longitude, + # "well_depth": site_record.well_depth, + # "well_depth_units": site_record.well_depth_units, + # "elevation": site_record.elevation, + # "elevation_units": site_record.elevation_units, # } # # if config.output_summary_waterlevel_stats: diff --git a/backend/connectors/nmenv/source.py b/backend/connectors/nmenv/source.py index da6f684..6366c8f 100644 --- a/backend/connectors/nmenv/source.py +++ b/backend/connectors/nmenv/source.py @@ -21,7 +21,7 @@ ) from backend.connectors.st_connector import STSiteSource, STAnalyteSource from backend.constants import PARAMETER, PARAMETER_UNITS, DT_MEASURED, PARAMETER_VALUE -from backend.source import get_analyte_search_param +from backend.source import get_analyte_search_param, get_most_recent URL = "https://nmenv.newmexicowaterdata.org/FROST-Server/v1.1/" @@ -64,8 +64,20 @@ class DWBAnalyteSource(STAnalyteSource): url = URL transformer_klass = DWBAnalyteTransformer - def _parse_result(self, result): - return float(result.split(" ")[0]) + def _parse_result( + self, result, result_dt=None, result_id=None, result_location=None + ): + if "< mrl" in result.lower(): + if self.config.output_summary: + self.warn( + f"Non-detect found: {result} for {result_location} on {result_dt} (observation {result_id}). Setting to 0 for summary." + ) + return 0.0 + else: + # return the results for timeseries, regardless of format (None/Null/non-detect) + return result + else: + return float(result.split(" ")[0]) def get_records(self, site, *args, **kw): service = self.get_service() @@ -92,16 +104,47 @@ def get_records(self, site, *args, **kw): return rs def _extract_parameter_record(self, record): + # this is only used for time series record[PARAMETER_VALUE] = self._parse_result(record["observation"].result) record[PARAMETER_UNITS] = record["datastream"].unit_of_measurement.symbol record[DT_MEASURED] = record["observation"].phenomenon_time return record def _extract_parameter_results(self, records): - return [self._parse_result(r["observation"].result) for r in records] + # this is only used in summary output + return [ + self._parse_result( + r["observation"].result, + r["observation"].phenomenon_time, + r["observation"].id, + r["location"].id, + ) + for r in records + ] def _extract_parameter_units(self, records): + # this is only used in summary output return [r["datastream"].unit_of_measurement.symbol for r in records] + def _extract_parameter_dates(self, records: list) -> list: + return [r["observation"].phenomenon_time for r in records] + + def _extract_most_recent(self, records): + # this is only used in summary output + record = get_most_recent( + records, tag=lambda x: x["observation"].phenomenon_time + ) + + return { + "value": self._parse_result( + record["observation"].result, + record["observation"].phenomenon_time, + record["observation"].id, + record["location"].id, + ), + "datetime": record["observation"].phenomenon_time, + "units": record["datastream"].unit_of_measurement.symbol, + } + # ============= EOF ============================================= diff --git a/backend/connectors/nmose/__init__.py b/backend/connectors/nmose/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/connectors/nmose/source.py b/backend/connectors/nmose/source.py new file mode 100644 index 0000000..5cb7a3e --- /dev/null +++ b/backend/connectors/nmose/source.py @@ -0,0 +1,2 @@ +import os +from backend.source import BaseSiteSource diff --git a/backend/connectors/nmose/transformer.py b/backend/connectors/nmose/transformer.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/connectors/st2/source.py b/backend/connectors/st2/source.py index f600b8f..23f7db9 100644 --- a/backend/connectors/st2/source.py +++ b/backend/connectors/st2/source.py @@ -88,12 +88,19 @@ def _extract_parameter_record(self, record): def _extract_parameter_results(self, records): return [r["observation"].result for r in records] - def get_records(self, parent_record, *args, **kw): + def _extract_parameter_dates(self, records: list) -> list: + return [r["observation"].phenomenon_time for r in records] + + def _clean_records(self, records: list) -> list: + rs = [r for r in records if r["observation"].result is not None] + return rs + + def get_records(self, site_record, *args, **kw): service = self.get_service() config = self.config records = [] - for t in self._get_things(service, parent_record): + for t in self._get_things(service, site_record): if t.name == "Water Well": for di in t.datastreams: @@ -112,7 +119,7 @@ def get_records(self, parent_record, *args, **kw): records.append( { "thing": t, - "location": parent_record, + "location": site_record, "datastream": di, "observation": obs, } diff --git a/backend/connectors/st2/transformer.py b/backend/connectors/st2/transformer.py index 352e44b..e41a170 100644 --- a/backend/connectors/st2/transformer.py +++ b/backend/connectors/st2/transformer.py @@ -58,15 +58,15 @@ class EBIDSiteTransformer(STSiteTransformer): # class ST2WaterLevelTransformer(WaterLevelTransformer): # source_tag = "ST2" -# def _transform_hook(self, record, config, parent_record, *args, **kw): +# def _transform_hook(self, record, config, site_record, *args, **kw): # rec = { # "source": self.source_id, -# "id": parent_record.id, -# "location": parent_record.name, -# "latitude": parent_record.latitude, -# "longitude": parent_record.longitude, -# "surface_elevation_ft": parent_record.elevation, -# "well_depth_ft_below_ground_surface": parent_record.well_depth, +# "id": site_record.id, +# "location": site_record.name, +# "latitude": site_record.latitude, +# "longitude": site_record.longitude, +# "surface_elevation_ft": site_record.elevation, +# "well_depth_ft_below_ground_surface": site_record.well_depth, # } # # if config.output_summary_waterlevel_stats: diff --git a/backend/connectors/st_connector.py b/backend/connectors/st_connector.py index 19d127c..92cac48 100644 --- a/backend/connectors/st_connector.py +++ b/backend/connectors/st_connector.py @@ -16,7 +16,9 @@ from datetime import datetime import frost_sta_client as fsc +from shapely import MultiPolygon, Polygon, unary_union +from backend.bounding_polygons import get_state_polygon from backend.source import ( BaseSiteSource, BaseWaterLevelSource, @@ -99,6 +101,17 @@ def get_records(self, *args, **kw): poly = config.bounding_wkt(as_wkt=False) # if poly is a MULTIPOLYGON convert to POLYGON + if type(poly) == MultiPolygon: + if len(poly.geoms) == 1: + poly = unary_union(poly) + else: + # HUC4 1508 has 2 polygons, one of them is outside of NM + state_boundary = get_state_polygon("NM") + for geom in poly: + if state_boundary.contains(geom): + poly = geom + break + fs.append(f"st_within(location, geography'{poly}')") fi = make_dt_filter( diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index 0e1a14d..b1c330a 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -127,13 +127,13 @@ def get_records(self): class NWISWaterLevelSource(BaseWaterLevelSource): transformer_klass = NWISWaterLevelTransformer - def get_records(self, parent_record): + def get_records(self, site_record): params = { "format": "json", "siteType": "GW", "siteStatus": "all", "parameterCd": "72019", - "sites": ",".join(make_site_list(parent_record)), + "sites": ",".join(make_site_list(site_record)), } config = self.config @@ -155,8 +155,8 @@ def get_records(self, parent_record): self.log(f"Retrieved {len(records)} records") return records - def _extract_parent_records(self, records, parent_record): - return [ri for ri in records if ri["site_code"] == parent_record.id] + def _extract_site_records(self, records, site_record): + return [ri for ri in records if ri["site_code"] == site_record.id] def _clean_records(self, records): return [r for r in records if r["value"] is not None and r["value"].strip()] @@ -164,6 +164,9 @@ def _clean_records(self, records): def _extract_parameter_results(self, records): return [float(r["value"]) for r in records] + def _extract_parameter_dates(self, records: list) -> list: + return [r["datetime_measured"] for r in records] + def _extract_most_recent(self, records): record = get_most_recent(records, "datetime_measured") return { diff --git a/backend/connectors/usgs/transformer.py b/backend/connectors/usgs/transformer.py index 40d0acc..1f61cf5 100644 --- a/backend/connectors/usgs/transformer.py +++ b/backend/connectors/usgs/transformer.py @@ -52,18 +52,18 @@ def _transform(self, record): class NWISWaterLevelTransformer(WaterLevelTransformer): source_tag = "USGS-NWIS" - # def _transform_hook(self, record, config, parent_record): + # def _transform_hook(self, record, config, site_record): # rec = { # "source": "USGS-NWIS", - # "id": parent_record.id, - # "location": parent_record.name, - # "usgs_site_id": parent_record.id, - # "latitude": parent_record.latitude, - # "longitude": parent_record.longitude, - # "elevation": parent_record.elevation, - # "elevation_units": parent_record.elevation_units, - # "well_depth": parent_record.well_depth, - # "well_depth_units": parent_record.well_depth_units, + # "id": site_record.id, + # "location": site_record.name, + # "usgs_site_id": site_record.id, + # "latitude": site_record.latitude, + # "longitude": site_record.longitude, + # "elevation": site_record.elevation, + # "elevation_units": site_record.elevation_units, + # "well_depth": site_record.well_depth, + # "well_depth_units": site_record.well_depth_units, # # "date": record["datetime"], # # "value": record["lev_va"], # # "units": "ft", diff --git a/backend/connectors/wqp/source.py b/backend/connectors/wqp/source.py index 3213fc2..5b4007e 100644 --- a/backend/connectors/wqp/source.py +++ b/backend/connectors/wqp/source.py @@ -74,7 +74,12 @@ def health(self): def get_records(self): config = self.config - params = {"mimeType": "tsv", "siteType": "Well"} + params = { + "mimeType": "tsv", + "siteType": "Well", + "sampleMedia": "Water", + "statecode": "US:35", + } if config.has_bounds(): params["bBox"] = ",".join([str(b) for b in config.bbox_bounding_points()]) @@ -101,11 +106,9 @@ def _extract_parameter_record(self, record): record[DT_MEASURED] = record["ActivityStartDate"] return record - def _extract_parent_records(self, records, parent_record): + def _extract_site_records(self, records, site_record): return [ - ri - for ri in records - if ri["MonitoringLocationIdentifier"] == parent_record.id + ri for ri in records if ri["MonitoringLocationIdentifier"] == site_record.id ] def _extract_parameter_results(self, records): @@ -117,6 +120,9 @@ def _clean_records(self, records): def _extract_parameter_units(self, records): return [ri["ResultMeasure/MeasureUnitCode"] for ri in records] + def _extract_parameter_dates(self, records): + return [ri["ActivityStartDate"] for ri in records] + def _extract_most_recent(self, records): ri = get_most_recent(records, "ActivityStartDate") return { @@ -125,8 +131,8 @@ def _extract_most_recent(self, records): "units": ri["ResultMeasure/MeasureUnitCode"], } - def get_records(self, parent_record): - sites = make_site_list(parent_record) + def get_records(self, site_record): + sites = make_site_list(site_record) params = { "siteid": sites, diff --git a/backend/constants.py b/backend/constants.py index 0ccd1ec..2064a87 100644 --- a/backend/constants.py +++ b/backend/constants.py @@ -24,6 +24,7 @@ MAGNESIUM = "Magnesium" NITRATE = "Nitrate" POTASSIUM = "Potassium" +SILICA = "Silica" SODIUM = "Sodium" SULFATE = "Sulfate" URANIUM = "Uranium" @@ -60,6 +61,7 @@ MAGNESIUM, NITRATE, POTASSIUM, + SILICA, SODIUM, SULFATE, TDS, diff --git a/backend/geo_utils.py b/backend/geo_utils.py index cdd284f..930f76d 100644 --- a/backend/geo_utils.py +++ b/backend/geo_utils.py @@ -18,6 +18,8 @@ PROJECTIONS = {} TRANSFORMS = {} +ALLOWED_DATUMS = ["NAD27", "NAD83", "WGS84"] + def datum_transform(x, y, in_datum, out_datum): """ diff --git a/backend/logging.py b/backend/logging.py new file mode 100644 index 0000000..8a91519 --- /dev/null +++ b/backend/logging.py @@ -0,0 +1,67 @@ +# =============================================================================== +# Copyright 2024 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +import logging +from logging.handlers import RotatingFileHandler + +import click + + +class Loggable: + def __init__(self): + self.logger = logging.getLogger(self.__class__.__name__) + + def log(self, msg, level=None, fg="yellow"): + if level is None: + level = logging.INFO + + click.secho(f"{self.__class__.__name__:30s}{msg}", fg=fg) + self.logger.log(level, msg) + + def warn(self, msg, fg="red"): + self.log(msg, fg=fg, level=logging.WARNING) + + def debug(self, msg): + self.log(msg, level=logging.DEBUG, fg="blue") + + +def setup_logging(level=None, log_format=None, path=None): + + if level is None: + level = logging.DEBUG + if log_format is None: + log_format = ( + "%(name)-40s: %(asctime)s %(levelname)-9s (%(threadName)-10s) %(message)s" + ) + + root = logging.getLogger() + root.setLevel(level) + + if path is None: + path = "die.log" + + # shandler = logging.StreamHandler() + rhandler = RotatingFileHandler(path, maxBytes=1e8, backupCount=50) + + handlers = [rhandler] + + fmt = logging.Formatter(log_format) + for hi in handlers: + hi.setLevel(level) + hi.setFormatter(fmt) + root.addHandler(hi) + + +# ============= EOF ============================================= diff --git a/backend/persister.py b/backend/persister.py index b1a8a6b..296ee2a 100644 --- a/backend/persister.py +++ b/backend/persister.py @@ -18,11 +18,10 @@ import os import shutil -import click import pandas as pd import geopandas as gpd -from backend.record import SiteRecord +from backend.logging import Loggable try: from google.cloud import storage @@ -30,11 +29,6 @@ print("google cloud storage not available") -class Loggable: - def log(self, msg, fg="yellow"): - click.secho(f"{self.__class__.__name__:30s}{msg}", fg=fg) - - class BasePersister(Loggable): extension: str # output_id: str @@ -44,6 +38,8 @@ def __init__(self): self.combined = [] self.timeseries = [] self.sites = [] + + super().__init__() # self.keys = record_klass.keys def load(self, records: list): @@ -139,11 +135,12 @@ def write_memory(path, func, records): def dump_single_timeseries(writer, timeseries): + headers_have_not_been_written = True for i, (site, records) in enumerate(timeseries): - for j, record in enumerate(records): - if i == 0: + if i == 0 and headers_have_not_been_written: writer.writerow(record.keys) + headers_have_not_been_written = False writer.writerow(record.to_row()) diff --git a/backend/record.py b/backend/record.py index fc531b8..487d644 100644 --- a/backend/record.py +++ b/backend/record.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from backend.constants import DTW, PARAMETER, PARAMETER_VALUE, FEET +from backend.constants import DTW, PARAMETER, PARAMETER_VALUE, PARAMETER_UNITS, FEET class BaseRecord: @@ -82,8 +82,8 @@ class WaterLevelRecord(BaseRecord): class AnalyteRecord(BaseRecord): keys: tuple = ( - # "source", - # "id", + "source", + "id", # "location", # "latitude", # "longitude", @@ -91,6 +91,7 @@ class AnalyteRecord(BaseRecord): # "well_depth_ft_below_ground_surface", PARAMETER, PARAMETER_VALUE, + PARAMETER_UNITS, "date_measured", "time_measured", ) @@ -107,6 +108,7 @@ class SummaryRecord(BaseRecord): "alternate_site_id", "latitude", "longitude", + "horizontal_datum", "elevation", "elevation_units", "well_depth", diff --git a/backend/source.py b/backend/source.py index 9d120d2..24f6346 100644 --- a/backend/source.py +++ b/backend/source.py @@ -33,6 +33,7 @@ PARAMETER_UNITS, PARAMETER_VALUE, ) +from backend.logging import Loggable from backend.persister import BasePersister, CSVPersister from backend.record import ( AnalyteRecord, @@ -124,7 +125,7 @@ def get_analyte_search_param(parameter: str, mapping: dict) -> str: ) -class BaseSource: +class BaseSource(Loggable): """ The BaseSource class is a base class for all sources, whether it be a site source or a parameter source. @@ -174,6 +175,7 @@ class BaseSource: def __init__(self, config=None): self.transformer = self.transformer_klass() self.set_config(config) + super().__init__() @property def tag(self): @@ -195,38 +197,42 @@ def discover(self, *args, **kw): # Methods Already Implemented # ========================================================================== - def warn(self, msg): - """ - Prints warning messages to the console in red - - Parameters - ---------- - msg : str - the message to print - - Returns - ------- - None - """ - self.log(msg, fg="red") - - def log(self, msg, fg="yellow"): - """ - Prints the message to the console in yellow - - Parameters - ---------- - msg : str - the message to print - - fg : str - the color of the message, defaults to yellow - - Returns - ------- - None - """ - click.secho(f"{self.__class__.__name__:25s} -- {msg}", fg=fg) + # def warn(self, msg): + # """ + # Prints warning messages to the console in red + # + # Parameters + # ---------- + # msg : str + # the message to print + # + # Returns + # ------- + # None + # """ + # s = self.log(msg, fg="red") + # self.config.warnings.append(s) + + # def log(self, msg, fg="yellow"): + # """ + # Prints the message to the console in yellow + # + # Parameters + # ---------- + # msg : str + # the message to print + # + # fg : str + # the color of the message, defaults to yellow + # + # Returns + # ------- + # None + # """ + # s = f"{self.__class__.__name__:25s} -- {msg}" + # click.secho(s, fg=fg) + # self.config.logs.append(s) + # return s def _execute_text_request(self, url: str, params=None, **kw) -> str: """ @@ -326,7 +332,7 @@ def get_records(self, *args, **kw) -> dict: Parameters ---------- If parameter records: - parent_record : dict + site_record : dict the site record for the location whose parameter records are to be retrieved If site records: @@ -478,7 +484,7 @@ def read(self, *args, **kw) -> List[SiteRecord]: else: self.warn("No site records returned") - def _transform_sites(self, records: list) -> list: + def _transform_sites(self, records: list) -> List[SiteRecord]: """ Transforms site records into the standardized format. @@ -489,8 +495,8 @@ def _transform_sites(self, records: list) -> list: Returns ------- - list - a list of transformed site records + list[SiteRecord] + a list of transformed site records as SiteRecords """ transformed_records = [] for record in records: @@ -564,7 +570,7 @@ class BaseParameterSource(BaseSource): Returns a dictionary of parameter records where the keys are the site ids and the values are a list of the parameter records - _extract_parent_records + _extract_site_records Returns all records for a single site as a list of records _extract_most_recent @@ -603,7 +609,7 @@ class BaseParameterSource(BaseSource): # ========================================================================== def read( - self, parent_record: BaseSiteSource, use_summarize: bool + self, site_record: SiteRecord, use_summarize: bool, start_ind: int, end_ind: int ) -> List[ AnalyteRecord | AnalyteSummaryRecord @@ -621,7 +627,7 @@ def read( Parameters ---------- - parent_record : BaseSiteSource + site_record : SiteRecord the site record(s) for the location whose parameter records are to be retrieved use_summarize : bool @@ -632,57 +638,82 @@ def read( list[AnalyteRecord | AnalyteSummaryRecord | WaterLevelRecord | WaterLevelSummaryRecord] a list of transformed parameter records """ - if isinstance(parent_record, list): + if isinstance(site_record, list): self.log( - f"Gathering {self.name} summary for multiple records. {len(parent_record)}" + f"Gathering {self.name} summary for {len(site_record)} sites. {start_ind}-{end_ind}" ) else: - self.log( - f"{parent_record.id} ({parent_record.id}): Gathering {self.name} summary" - ) + self.log(f"{site_record.id}: Gathering {self.name} data") - all_analyte_records = self.get_records(parent_record) + all_analyte_records = self.get_records(site_record) if all_analyte_records: - if not isinstance(parent_record, list): - parent_record = [parent_record] + if not isinstance(site_record, list): + site_record = [site_record] # return values ret = [] # iterate over each site record and extract the parameter records for each site - for site in parent_record: - site_records = self._extract_parent_records(all_analyte_records, site) + for site in site_record: + site_records = self._extract_site_records(all_analyte_records, site) if not site_records: - self.warn(f"{site.name}: No parent records found") + self.warn(f"{site.id}: No records found") continue - # get cleaned records if _clean_records is defined by the source + # get cleaned records if _clean_records is defined by the source. This usually removes Nones/Null cleaned = self._clean_records(site_records) if not cleaned: - self.warn(f"{site.name} No clean records found") + self.warn(f"{site.id} No clean records found") continue - items = self._extract_parameter_results(cleaned) - units = self._extract_parameter_units(cleaned) - items = [ - convert_units(float(result), unit, self._get_output_units()) - for result, unit in zip(items, units) - ] + if use_summarize: + + # doesn't need to be returned, but can be used to debug/for development + kept_items = [] + skipped_items = [] + + results = self._extract_parameter_results(cleaned) + units = self._extract_parameter_units(cleaned) + dates = self._extract_parameter_dates(cleaned) - if items is not None: - n = len(items) - self.log(f"{site.name}: Retrieved {self.name}: {n}") + for r, u, d in zip(results, units, dates): + try: + converted_result, warning_msg = convert_units( + float(r), + u, + self._get_output_units(), + self.config.analyte, + d, + ) + if warning_msg == "": + kept_items.append(converted_result) + else: + msg = f"{warning_msg} for {site.id}" + self.warn(msg) + skipped_items.append((site.id, r, u)) + except TypeError: + skipped_items.append((site.id, r, u)) + except ValueError: + skipped_items.append((site.id, r, u)) + + if len(skipped_items) > 0: + self.warn( + f"Skipped results because of formatting: {skipped_items}" + ) + + # if items is None or empty, no records were found or all results were None + if kept_items is not None and len(kept_items): + n = len(kept_items) + # self.log(f"{site.id}: Retrieved {self.name}: {n}") - # create the summaries if use_summarize is True, otherwise returned the cleaned and sorted records - if use_summarize: most_recent_result = self._extract_most_recent(cleaned) if not most_recent_result: continue rec = { "nrecords": n, - "min": min(items), - "max": max(items), - "mean": sum(items) / n, + "min": min(kept_items), + "max": max(kept_items), + "mean": sum(kept_items) / n, "most_recent_datetime": most_recent_result["datetime"], "most_recent_value": most_recent_result["value"], "most_recent_units": most_recent_result["units"], @@ -691,23 +722,32 @@ def read( rec, site, ) - ret.append(transformed_record) - else: - cleaned_sorted = [ - self.transformer.do_transform( - self._extract_parameter(record), site - ) - for record in cleaned - ] - cleaned_sorted = sorted(cleaned_sorted, key=self._sort_func) - ret.append((site, cleaned_sorted)) - + if transformed_record is None: + continue + else: + ret.append(transformed_record) + else: + cleaned_sorted = [ + self.transformer.do_transform( + self._extract_parameter(record), site + ) + for record in cleaned + if self.transformer.do_transform( + self._extract_parameter(record), site + ) + is not None + ] + if len(cleaned_sorted) == 0: + self.warn(f"{site.id}: No clean records found") + continue + cleaned_sorted = sorted(cleaned_sorted, key=self._sort_func) + ret.append((site, cleaned_sorted)) return ret else: - if isinstance(parent_record, list): - names = [str(r.id) for r in parent_record] + if isinstance(site_record, list): + names = [str(r.id) for r in site_record] else: - names = [str(parent_record.id)] + names = [str(site_record.id)] name = ",".join(names) self.warn(f"{name}: No records found") @@ -759,7 +799,7 @@ def _get_output_units(self) -> str: # Methods That Need to be Implemented For Each Source # ========================================================================== - def _extract_parent_records(self, records: dict, parent_record: dict) -> list: + def _extract_site_records(self, records: dict, site_record: dict) -> list: """ Returns all records for a single site as a list of records (which are dictionaries). @@ -768,7 +808,7 @@ def _extract_parent_records(self, records: dict, parent_record: dict) -> list: records : dict a dictionary of lists, where the keys are site ids and the values are parameter records - parent_record : dict + site_record : dict the site record for the location whose parameter records are to be retrieved Returns @@ -776,11 +816,11 @@ def _extract_parent_records(self, records: dict, parent_record: dict) -> list: list a list of records for the site """ - if parent_record.chunk_size == 1: + if site_record.chunk_size == 1: return records raise NotImplementedError( - f"{self.__class__.__name__} Must implement _extract_parent_records" + f"{self.__class__.__name__} Must implement _extract_site_records" ) def _clean_records(self, records: list) -> list: @@ -837,9 +877,27 @@ def _extract_parameter_units(self, records: list) -> list: f"{self.__class__.__name__} Must implement _extract_parameter_units" ) + def _extract_parameter_dates(self, records: list) -> list: + """ + Returns the dates of the parameter records as a list, in the same order as the records themselves + + Parameters + ---------- + records: list + a list of parameter records + + Returns + ------- + list + a list of dates for the parameter records in the same order as the records + """ + raise NotImplementedError( + f"{self.__class__.__name__} Must implement _extract_parameter_dates" + ) + def _extract_parameter_record(self, record: dict) -> dict: """ - Returns a parameter record with standardized fields added. + Returns a parameter record with standardized fields added. This is only used for time series, not summary outputs For an analyte, the fields are - backend.constants.PARAMETER @@ -867,7 +925,7 @@ def _extract_parameter_record(self, record: dict) -> dict: def _extract_parameter_results(self, records: list) -> list: """ - Returns the parameter results as a list from the records, in the same order as the records themselves + Returns the parameter results as a list from the records, in the same order as the records themselves. This is only used for summary outputs, not time serie Parameters ---------- @@ -885,7 +943,7 @@ def _extract_parameter_results(self, records: list) -> list: def _extract_parameter(self, record: dict) -> dict: """ - Extracts a parameter record from a list of records + Extracts a parameter record from a list of records. This is only used for time series, not summary outputs Parameters ---------- @@ -957,4 +1015,12 @@ def _validate_record(self, record): raise ValueError(f"Invalid record. Missing {k}") +class BaseFileSource(BaseSource): + """ + Base class for all file sources + """ + + name = "files" + + # ============= EOF ============================================= diff --git a/backend/transformer.py b/backend/transformer.py index b780630..e7acdc7 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -13,8 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== +import click import pprint -from datetime import datetime +from datetime import datetime, date, timedelta import shapely from shapely import Point @@ -27,8 +28,10 @@ TONS_PER_ACRE_FOOT, MICROGRAMS_PER_LITER, DT_MEASURED, + PARAMETER_UNITS, ) -from backend.geo_utils import datum_transform +from backend.geo_utils import datum_transform, ALLOWED_DATUMS +from backend.logging import Loggable from backend.record import ( WaterLevelSummaryRecord, WaterLevelRecord, @@ -117,7 +120,11 @@ def transform_length_units( def convert_units( - input_value: int | float | str, input_units: str, output_units: str + input_value: int | float | str, + input_units: str, + output_units: str, + analyte: str, + dt: str = None, ) -> float: """ Converts the following units for any parameter value: @@ -127,6 +134,7 @@ def convert_units( - ppm to mg/L - ton/ac-ft to mg/L - ug/L to mg/L + - mg/L CaCO3 to mg/L length: - ft to m @@ -143,11 +151,20 @@ def convert_units( output_units: str The output unit of the value + analyte: str + The analyte to convert + + dt: str + The date of the record + Returns -------- float The converted value """ + warning = "" + conversion_factor = None + input_value = float(input_value) input_units = input_units.lower() output_units = output_units.lower() @@ -157,11 +174,40 @@ def convert_units( ppm = PARTS_PER_MILLION.lower() tpaf = TONS_PER_ACRE_FOOT.lower() + """ + # edge cases for Bicarbonate and Calcium + # BOR, WQP + + https://aqua-chem.com/water-chemistry-caco3-equivalents/ + https://industrialh2osolutions.com/conversions-and-guides-water-chemistry-caco3-equivalents/ + """ + if ( + input_units in ["mg/l caco3", "mg/l caco3**"] + and output_units == mgl + and analyte == "Bicarbonate" + ): + # 1/0.82 + conversion_factor = 1.22 + elif ( + input_units in ["mg/l caco3", "mg/l caco3**"] + and output_units == mgl + and analyte == "Calcium" + ): + # 1/2.5 + conversion_factor = 0.4 + elif ( + input_units in ["mg/l caco3", "mg/l caco3**"] + and output_units == mgl + and analyte != "Bicarbonate" + ): + # this will catch if the input units are mg/l caco3 and the analyte is not bicarbonate or calcium so that the developer can determine the appropriate conversion factor(s) + conversion_factor = None + if input_units == output_units: - return input_value + conversion_factor = 1 if input_units == tpaf and output_units == mgl: - return input_value * 735.47 + conversion_factor = 735.47 if ( input_units == mgl @@ -169,10 +215,10 @@ def convert_units( or input_units == ppm and output_units == mgl ): - return input_value * 1.0 + conversion_factor = 1.0 if input_units == ugl and output_units == mgl: - return input_value * 0.001 + conversion_factor = 0.001 ft = FEET.lower() m = METERS.lower() @@ -183,12 +229,15 @@ def convert_units( input_units = m if input_units == ft and output_units == m: - return input_value * 0.3048 + conversion_factor = 0.3048 if input_units == m and output_units == ft: - return input_value * 3.28084 + conversion_factor = 3.28084 - print(f"Failed to convert {input_value} {input_units} to {output_units}") - return input_value + if conversion_factor: + return input_value * conversion_factor, warning + else: + warning = f"Failed to convert {input_value} {input_units} to {output_units} for {analyte} on {dt}" + return input_value, warning def standardize_datetime(dt): @@ -211,12 +260,20 @@ def standardize_datetime(dt): "%Y/%m/%d %H:%M:%S", "%Y/%m/%d %H:%M", "%Y/%m/%d", + "%m/%d/%Y", ]: try: dt = datetime.strptime(dt.split(".")[0], fmt) break except ValueError as e: - pass + try: + # Ft Sumner (OSE Roswell) reports Excel date numbers + num_days_to_add = int(dt) + base_date = date(1900, 1, 1) + dt = base_date + timedelta(days=num_days_to_add) + break + except ValueError as e: + pass else: raise ValueError(f"Failed to parse datetime {dt}") @@ -235,7 +292,7 @@ def standardize_datetime(dt): return dt.strftime("%Y-%m-%d"), tt -class BaseTransformer: +class BaseTransformer(Loggable): """ Base class for transforming records. Transformers are used in BaseSiteSource and BaseParameterSource to transform records @@ -333,8 +390,13 @@ def do_transform( if not record: return - if not self.contained(record["longitude"], record["latitude"]): - return + # ensure that a site or summary record is contained within the boundaing polygon + if "longitude" in record and "latitude" in record: + if not self.contained(record["longitude"], record["latitude"]): + self.warn( + f"Skipping site {record['id']}. It is not within the defined geographic bounds" + ) + return self._post_transform(record, *args, **kw) @@ -363,8 +425,19 @@ def do_transform( if isinstance(record, (SiteRecord, SummaryRecord)): y = float(record.latitude) x = float(record.longitude) + + if x == 0 or y == 0: + self.warn(f"Skipping site {record.id}. Latitude or Longitude is 0") + return None + input_horizontal_datum = record.horizontal_datum + if input_horizontal_datum not in ALLOWED_DATUMS: + self.warn( + f"Skipping site {record.id}. Datum {input_horizontal_datum} cannot be processed" + ) + return None + output_elevation_units = "" well_depth_units = "" output_horizontal_datum = "WGS84" @@ -399,6 +472,40 @@ def do_transform( record.update(well_depth=well_depth) record.update(well_depth_units=well_depth_unit) + # update the units to the output unit for analyte records + # this is done after converting the units to the output unit for the analyte records + # convert the parameter value to the output unit specified in the config + elif isinstance(record, (AnalyteRecord)): + r = record.parameter_value + u = record.parameter_units + dt = record.date_measured + warning_msg = "" + try: + converted_result, warning_msg = convert_units( + float(r), + u, + self.config.analyte_output_units, + self.config.analyte, + dt, + ) + if warning_msg != "": + msg = f"{warning_msg} for {record.id}" + self.warn(msg) + except TypeError: + msg = f"Keeping {r} for {record.id} on {record.date_measured} for time series data" + self.warn(msg) + converted_result = r + except ValueError: + msg = f"Keeping {r} for {record.id} on {record.date_measured} for time series data" + self.warn(msg) + converted_result = r + + if warning_msg == "": + record.update(parameter_value=converted_result) + record.update(parameter_units=self.config.analyte_output_units) + else: + record = None + return record def contained( @@ -435,6 +542,41 @@ def contained( return True + # def warn(self, msg): + # """ + # Prints warning messages to the console in red + # + # Parameters + # ---------- + # msg : str + # the message to print + # + # Returns + # ------- + # None + # """ + # self.log(msg, fg="red") + # self.config.warnings.append(msg) + + # def log(self, msg, fg="yellow"): + # """ + # Prints the message to the console in yellow + # + # Parameters + # ---------- + # msg : str + # the message to print + # + # fg : str + # the color of the message, defaults to yellow + # + # Returns + # ------- + # None + # """ + # click.secho(f"{self.__class__.__name__:25s} -- {msg}", fg=fg) + # self.config.logs.append(f"{self.__class__.__name__:25s} -- {msg}") + # ========================================================================== # Methods That Need to be Implemented For Each SiteTransformer # ========================================================================== @@ -493,6 +635,11 @@ def _transform(self, *args, **kw) -> dict: site_record: dict The site record associated with the parameter record + + Returns + -------- + dict + The record with the standard fields added and populated """ raise NotImplementedError( f"{self.__class__.__name__} must implement _transform" @@ -536,10 +683,7 @@ def _transform(self, record, site_record): f"{self.__class__.__name__} source_tag is not set" ) - rec = { - "source": self.source_tag, - "id": site_record.id, - } + rec = {} if self.config.output_summary: self._transform_most_recents(record) @@ -552,6 +696,7 @@ def _transform(self, record, site_record): "alternate_site_id": site_record.alternate_site_id, "latitude": site_record.latitude, "longitude": site_record.longitude, + "horizontal_datum": site_record.horizontal_datum, "elevation": site_record.elevation, "elevation_units": site_record.elevation_units, "well_depth": site_record.well_depth, @@ -561,6 +706,16 @@ def _transform(self, record, site_record): } ) rec.update(record) + + """ + Some analyte records, like BOR, have a field called "id" that is the record's ID. + To allow for the record's "id" to be the site's "id", the record's "id" needs to be updated at the end. + """ + source_id = { + "source": self.source_tag, + "id": site_record.id, + } + rec.update(source_id) return rec def _transform_most_recents(self, record): @@ -570,7 +725,10 @@ def _transform_most_recents(self, record): record["most_recent_time"] = tt p, u = self._get_parameter() record["most_recent_value"] = convert_units( - record["most_recent_value"], record["most_recent_units"], u + record["most_recent_value"], + record["most_recent_units"], + u, + self.config.analyte, ) record["most_recent_units"] = u diff --git a/backend/unifier.py b/backend/unifier.py index 09da609..6fe3815 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -16,6 +16,7 @@ import shapely from backend.config import Config, get_source +from backend.logging import setup_logging from backend.persister import CSVPersister, GeoJSONPersister, CloudStoragePersister from backend.source import BaseSiteSource @@ -40,7 +41,7 @@ def health_check(source: BaseSiteSource) -> bool: def unify_sites(config): - print("Unifying sites") + print("Unifying sites\n") # def func(config, persister): # for source in config.site_sources(): @@ -51,8 +52,8 @@ def unify_sites(config): def unify_analytes(config): - print("Unifying analytes") - config.report() + print("Unifying analytes\n") + # config.report() -- report is done in cli.py, no need to do it twice config.validate() if not config.dry: @@ -62,9 +63,9 @@ def unify_analytes(config): def unify_waterlevels(config): - print("Unifying waterlevels") + print("Unifying waterlevels\n") - config.report() + # config.report() -- report is done in cli.py, no need to do it twice config.validate() if not config.dry: @@ -112,36 +113,55 @@ def _perister_factory(config): def _site_wrapper(site_source, parameter_source, persister, config): try: + # TODO: fully develop checks/discoveries below + # if not site_source.check(): + # print(f"Skipping {site_source}. check failed") - if site_source.check(): - print(f"Skipping {site_source}. check failed") + # schemas = site_source.discover() + # if not schemas: + # print(f"No schemas found for {site_source}") - schemas = site_source.discover() - if not schemas: - print(f"No schemas found for {site_source}") - - # in the future make discover required - # return + # in the future make discover required + # return use_summarize = config.output_summary site_limit = config.site_limit sites = site_source.read() + if not sites: - print(f"No sites found for {site_source}") return - for i, sites in enumerate(site_source.chunks(sites)): - if site_limit and i > site_limit: + sites_with_records_count = 0 + start_ind = 1 + end_ind = 0 + first_flag = True + for sites in site_source.chunks(sites): + if site_limit and sites_with_records_count == site_limit: break + if type(sites) == list: + if first_flag: + end_ind += len(sites) + first_flag = False + else: + start_ind = end_ind + 1 + end_ind += len(sites) + if use_summarize: - summary_records = parameter_source.read(sites, use_summarize) + summary_records = parameter_source.read( + sites, use_summarize, start_ind, end_ind + ) if summary_records: persister.records.extend(summary_records) else: - results = parameter_source.read(sites, use_summarize) - if results is None: + results = parameter_source.read( + sites, use_summarize, start_ind, end_ind + ) + # no records are returned if there is no site record for parameter + # or if the record isn't clean (doesn't have the correct fields) + # don't count these sites to apply to site_limit + if results is None or len(results) == 0: continue if config.output_single_timeseries: @@ -156,13 +176,14 @@ def _site_wrapper(site_source, parameter_source, persister, config): else: persister.timeseries.append((site, records)) persister.sites.append(site) + sites_with_records_count += 1 except BaseException: import traceback exc = traceback.format_exc() - print(exc) - print(f"Failed to unify {site_source}") + config.warn(exc) + config.warn(f"Failed to unify {site_source}") def _unify_parameter( @@ -181,6 +202,7 @@ def _unify_parameter( else: persister.dump_combined(f"{config.output_path}.combined") persister.dump_timeseries(f"{config.output_path}_timeseries") + persister.finalize(config.output_name) @@ -311,8 +333,9 @@ def get_datastreams(): # root.setLevel(logging.DEBUG) # shandler = logging.StreamHandler() # get_sources(Config()) - # waterlevel_unification_test() - analyte_unification_test() + setup_logging() + waterlevel_unification_test() + # analyte_unification_test() # print(health_check("nwis")) # generate_site_bounds() diff --git a/frontend/cli.py b/frontend/cli.py index 1dab0f9..208ca68 100644 --- a/frontend/cli.py +++ b/frontend/cli.py @@ -21,6 +21,10 @@ from backend.constants import ANALYTE_CHOICES from backend.unifier import unify_sites, unify_waterlevels, unify_analytes +from backend.logging import setup_logging + +setup_logging() + @click.group() def cli(): @@ -133,6 +137,23 @@ def cli(): ), ] +TIMESERIES_OPTIONS = [ + click.option( + "--separated_timeseries", + is_flag=True, + default=False, + show_default=True, + help="Output separate timeseries files for every site", + ), + click.option( + "--unified_timeseries", + is_flag=True, + default=False, + show_default=True, + help="Output single timeseries file, which includes all sites", + ), +] + def add_options(options): def _add_options(func): @@ -155,19 +176,14 @@ def wells(bbox, county): @cli.command() -@click.option( - "--timeseries", - is_flag=True, - default=False, - show_default=True, - help="Include timeseries data", -) +@add_options(TIMESERIES_OPTIONS) @add_options(DT_OPTIONS) @add_options(SPATIAL_OPTIONS) @add_options(SOURCE_OPTIONS) @add_options(DEBUG_OPTIONS) def waterlevels( - timeseries, + separated_timeseries, + unified_timeseries, start_date, end_date, bbox, @@ -184,8 +200,13 @@ def waterlevels( site_limit, dry, ): + if separated_timeseries or unified_timeseries: + timeseries = True + else: + timeseries = False config = setup_config("waterlevels", timeseries, bbox, county, site_limit, dry) + config.output_single_timeseries = unified_timeseries config.use_source_nmbgmr = no_amp config.use_source_nwis = no_nwis config.use_source_pvacd = no_pvacd @@ -210,20 +231,15 @@ def waterlevels( @cli.command() @click.argument("analyte", type=click.Choice(ANALYTE_CHOICES)) -@click.option( - "--timeseries", - is_flag=True, - default=False, - show_default=True, - help="Include timeseries data", -) +@add_options(TIMESERIES_OPTIONS) @add_options(DT_OPTIONS) @add_options(SPATIAL_OPTIONS) @add_options(SOURCE_OPTIONS) @add_options(DEBUG_OPTIONS) def analytes( analyte, - timeseries, + separated_timeseries, + unified_timeseries, start_date, end_date, bbox, @@ -240,11 +256,16 @@ def analytes( site_limit, dry, ): + if separated_timeseries or unified_timeseries: + timeseries = True + else: + timeseries = False config = setup_config( f"analytes ({analyte})", timeseries, bbox, county, site_limit, dry ) config.analyte = analyte + config.output_single_timeseries = unified_timeseries config.use_source_nmbgmr = no_amp config.use_source_nwis = no_nwis config.use_source_pvacd = no_pvacd diff --git a/release.sh b/release.sh index 746bc53..5c5c4ad 100755 --- a/release.sh +++ b/release.sh @@ -1,9 +1,9 @@ git stash git checkout main git pull -git merge feature/jir +git merge dev/jir git tag $1 git push git push origin $1 -git checkout feature/jir +git checkout dev/jir git stash pop diff --git a/requirements.txt b/requirements.txt index 9e67900..4e9f7c5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,5 @@ pandas geopandas frost_sta_client google-cloud-storage -pytest \ No newline at end of file +pytest +urllib3>=2.2.0,<3.0.0 \ No newline at end of file diff --git a/setup.py b/setup.py index 6ff673b..8313973 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ setup( name="nmuwd", - version="0.0.23", + version="0.2.0", author="Jake Ross", description="New Mexico Water Data Integration Engine", long_description=long_description, diff --git a/tests/test_unifier.py b/tests/test_unifier.py index a2cd59a..3947ef6 100644 --- a/tests/test_unifier.py +++ b/tests/test_unifier.py @@ -120,6 +120,7 @@ def _test_waterlevels_timeseries( d = _setup_waterlevels(tmp_path, cfg, source) combined = d / "output.combined.csv" timeseries = d / "output_timeseries" + print(combined_flag) print("combined", combined.is_file(), combined_flag) assert combined.is_file() == combined_flag @@ -129,13 +130,15 @@ def _test_waterlevels_timeseries( return combined, timeseries -def _test_waterelevels_timeseries_date_range(tmp_path, cfg, source): +def _test_waterelevels_timeseries_date_range( + tmp_path, cfg, source, timeseries_flag=True, combined_flag=False +): combined, timeseries = _test_waterlevels_timeseries( tmp_path, cfg, source, - timeseries_flag=True, - combined_flag=False, + timeseries_flag=timeseries_flag, + combined_flag=combined_flag, ) for p in timeseries.iterdir(): @@ -351,11 +354,13 @@ def test_unify_waterlevels_ose_roswell_summary(tmp_path, waterlevel_summary_cfg) # Waterlevel timeseries tests ========================================================================================= def test_unify_waterlevels_nwis_timeseries(tmp_path, waterlevel_timeseries_cfg): + # there are one or more locations within the bounding box that have only + # one record, so there is a combined file _test_waterlevels_timeseries( tmp_path, waterlevel_timeseries_cfg, "nwis", - combined_flag=False, + combined_flag=True, timeseries_flag=True, ) @@ -365,6 +370,8 @@ def test_unify_waterlevels_amp_timeseries(tmp_path, waterlevel_timeseries_cfg): def test_unify_waterlevels_pvacd_timeseries(tmp_path, waterlevel_timeseries_cfg): + # all locations within the bounding box have more than one record + # so there is no combined file _test_waterlevels_timeseries( tmp_path, waterlevel_timeseries_cfg, @@ -377,6 +384,8 @@ def test_unify_waterlevels_pvacd_timeseries(tmp_path, waterlevel_timeseries_cfg) def test_unify_waterlevels_isc_seven_rivers_timeseries( tmp_path, waterlevel_timeseries_cfg ): + # all locations within the bounding box have more than one record + # so there is no combined file _test_waterlevels_timeseries( tmp_path, waterlevel_timeseries_cfg, @@ -400,22 +409,40 @@ def test_waterlevels_nwis_summary_date_range(tmp_path, waterlevel_summary_cfg): # Waterlevel timeseries date range ==================================================================================== def test_waterlevels_nwis_timeseries_date_range(tmp_path, waterlevel_timeseries_cfg): + # there are one or more locations within the bounding box and date range + # that have only one record, so there is a combined file _test_waterelevels_timeseries_date_range( - tmp_path, waterlevel_timeseries_cfg, "nwis" + tmp_path, + waterlevel_timeseries_cfg, + "nwis", + timeseries_flag=True, + combined_flag=True, ) def test_waterlevels_isc_seven_rivers_timeseries_date_range( tmp_path, waterlevel_timeseries_cfg ): + # all locations within the bounding box and date rangehave more than one + # record so there is no combined file _test_waterelevels_timeseries_date_range( - tmp_path, waterlevel_timeseries_cfg, "iscsevenrivers" + tmp_path, + waterlevel_timeseries_cfg, + "iscsevenrivers", + timeseries_flag=True, + combined_flag=False, ) def test_waterlevels_pvacd_timeseries_date_range(tmp_path, waterlevel_timeseries_cfg): + # all locations within the bounding box and date rangehave more than one + # record so there is no combined file _test_waterelevels_timeseries_date_range( - tmp_path, waterlevel_timeseries_cfg, "pvacd" + tmp_path, + waterlevel_timeseries_cfg, + "pvacd", + timeseries_flag=True, + combined_flag=False, ) @@ -429,6 +456,9 @@ def test_unify_analytes_amp_summary(tmp_path, analyte_summary_cfg): def test_unify_analytes_bor_summary(tmp_path, analyte_summary_cfg): + # BOR locations are found within Otero County + analyte_summary_cfg.county = "otero" + analyte_summary_cfg.bbox = None _test_analytes_summary(tmp_path, analyte_summary_cfg, "bor")