diff --git a/.github/workflows/publish-to-pypi.yml b/.github/workflows/publish-to-pypi.yml index a07045c..ce4c0a5 100644 --- a/.github/workflows/publish-to-pypi.yml +++ b/.github/workflows/publish-to-pypi.yml @@ -1,12 +1,15 @@ name: Publish Python 🐍 distributions 📦 to PyPI and TestPyPI on: - push: - tags: - - '*' + pull_request: + branches: + - main + types: + - closed jobs: - build-n-publish: + build-and-publish-if-merged: + if: github.event.pull_request.merged == true name: Build and publish Python 🐍 distributions 📦 to PyPI and TestPyPI runs-on: ubuntu-latest permissions: @@ -36,3 +39,5 @@ jobs: - name: Publish distribution 📦 to PyPI if: startsWith(github.ref, 'refs/tags') uses: pypa/gh-action-pypi-publish@release/v1 + with: + attestations: false diff --git a/.gitignore b/.gitignore index 5c03936..cc04cc7 100644 --- a/.gitignore +++ b/.gitignore @@ -171,10 +171,4 @@ cython_debug/ #.idea/ # outputs -output_timeseries -output.combined.csv -output.csv -output.sites.csv -output.timeseries.csv -output.logs.txt -output.warnings.txt \ No newline at end of file +output* \ No newline at end of file diff --git a/README.md b/README.md index 5a9dd9a..0a05895 100644 --- a/README.md +++ b/README.md @@ -18,90 +18,151 @@ pip install nmuwd ## Sources Data comes from the following sources. We are continuously adding new sources as we learn of them and they become available. If you have data that you would like to be part of the Data Integration Engine please get in touch at newmexicowaterdata@nmt.edu. 
- - [Bureau of Reclamation](https://data.usbr.gov/) - - [USGS (NWIS)](https://waterdata.usgs.gov/nwis) - - [ST2 (NMWDI)](https://st2.newmexicowaterdata.org/FROST-Server/v1.1/) - - Pecos Valley Artesian Conservancy District - - Bernalillo County - - New Mexico Environment Department Drinking Water Bureau - - [NM Water Data CKAN catalog](https://catalog.newmexicowaterdata.org/) - - OSE Roswell District Office - - ISC Seven Rivers - - [New Mexico Bureau of Geology and Mineral Resources (AMP)](https://waterdata.nmt.edu/) - - [Water Quality Portal](https://www.waterqualitydata.us/) - - USGS - - EPA - - and over 400 state, federal, tribal, and local agencies - - -### Source Inclusion & Exclusion -The Data Integration Engine enables the user to obtain groundwater level and groundwater quality data from a variety of sources. Data from sources are included in the output unless specifically excluded. The following flags are available to exclude a specific data source: - -- `--no-amp` to exclude New Mexico Bureau of Geology and Mineral Resources Aquifer Mapping Program (AMP) data -- `--no-bor` to exclude Bureaof of Reclamation data -- `--no-nwis` to exclude USGS NWIS data -- `--no-pvacd` to exclude Pecos Valley Artesian Convservancy District (PVACD) data -- `--no-isc-seven-rivers` to exclude Interstate Stream Commission (ISC) Seven Rivers data -- `--no-wqp` to exclude Water Quality Portal (WQP) data -- `--no-ckan` to exclude NM OSE Roswell data that is hosted on CKAN -- `--no-dwb` to exclude New Mexico Environment Department Drinking Water Bureau (DWB) data -- `--no-bernco` to exclude Bernalillo County (BernCo) data - -### Water Levels - -To obtain groundwater levels, use - +- [Bernalillo County (BernCo)](https://st2.newmexicowaterdata.org/FROST-Server/v1.1/Locations?$filter=properties/agency%20eq%20%27BernCo%27) + - Available data: `water levels` +- [Bureau of Reclamation (BoR)](https://data.usbr.gov/) + - Available data: `water quality` +- [New Mexico Bureau of Geology and 
Mineral Resources (NMBGMR) Aquifer Mapping Program (AMP)](https://waterdata.nmt.edu/) + - Available data: `water levels`, `water quality` +- [New Mexico Environment Department Drinking Water Bureau (NMED DWB)](https://nmenv.newmexicowaterdata.org/FROST-Server/v1.1/) + - Available data: `water quality` +- [New Mexico Office of the State Engineer ISC Seven Rivers (NMOSE ISC Seven Rivers)](https://nmisc-wf.gladata.com/api/getMonitoringPoints.ashx) + - Available data: `water levels`, `water quality` +- [New Mexico Office of the State Engineer Roswell District Office (NMOSE Roswell)](https://catalog.newmexicowaterdata.org/dataset/pecos_region_manual_groundwater_levels) + - Available data: `water levels` +- [Pecos Valley Artesian Conservancy District (PVACD)](https://st2.newmexicowaterdata.org/FROST-Server/v1.1/Locations?$filter=properties/agency%20eq%20%27PVACD%27) + - Available data: `water levels` +- [USGS (NWIS)](https://waterdata.usgs.gov/nwis) + - Available data: `water levels` +- [Water Quality Portal (WQP)](https://www.waterqualitydata.us/) + - Available data: `water quality` + +## Usage + +### Parameter Data + +To obtain parameter summary or time series data, use ``` -weave waterlevels +die weave {parameter} ``` -followed by the desired output type, source filters, date filters, geographic filters, and excluded data sources. - -#### Available Data Sources -The following data sources are available for groundwater levels: - -- amp -- bor -- ckan -- dwb -- isc-seven-rivers -- nwis -- pvacd -- bernco +where `{parameter}` is the name of the parameter whose data is to be retrieved, followed by the desired output type, excluded data sources, date filters, and geographic filters. `{parameter}` is case-insensitive. 
+ + +#### Available Parameters +The following parameters are currently available for retrieval: +- waterlevels +- arsenic +- bicarbonate +- calcium +- carbonate +- chloride +- magnesium +- nitrate +- ph +- potassium +- silica +- sodium +- sulfate +- tds +- uranium + +### Output +The `--output` option is required and used to set the output type: -### Water Quality -To obtain groundwater quality, use +``` +--output summary +``` +- A summary table consisting of location information as well as summary statistics for the parameter of interest for every location that has observations. ``` -weave analytes {analyte} +--output timeseries_unified ``` +- A single table consisting of time series data for all locations for the parameter of interest. +- A single table of site data that contains information such as latitude, longitude, and elevation -where `{analyte}` is the name of the analyte whose data is to be retrieved. +``` +--output timeseries_separated +``` +- Separate time series tables for all locations for the parameter of interest. +- A single table of site data that contains information such as latitude, longitude, and elevation + +The data is saved to a directory titled `output` in the current working directory. If the directory `output` already exists, then the output directory will be called `output_1`. If enumerated output directories already exist, then the output directory will be called `output_{n}` where `n` is equal to the greatest existing integer suffix +1. + +A log of the inputs and processes, called `die.log`, is also saved to the output directory. + +#### Summary Table + +| field/header | description | data type | always present | +| :----------- | :---------- | :-------- | :------------- | +| source | the organization/source for the site | string | Y | +| id | the id of the site. 
The id is used as the key to join the site and timeseries tables | string | Y | +| location | the colloquial name for the site | string | Y | +| usgs_site_id | USGS site id | string | N | +| alternate_site_id | alternate site id | string | N | +| latitude | latitude in decimal degrees | float | Y | +| longitude | longitude in decimal degrees | float | Y | +| horizontal_datum | horizontal datum of the latitude and longitude. Defaults to WGS84 | string | Y | +| elevation | ground surface elevation of the site | float | Y | +| elevation_units | the units of the ground surface elevation. Defaults to ft | string | Y | +| well_depth | depth of well | float | N | +| well_depth_units | units of well depth. Defaults to ft | string | N | +| parameter | the name of the parameter whose measurements are reported in the table | string | Y | +| parameter_units | units of the observation | string | Y | +| nrecords | number of records at the site for the parameter | integer | Y | +| min | the minimum observation | float | Y | +| max | the maximum observation | float | Y | +| mean | the mean value of the observations | float | Y | +| most_recent_date | date of most recent record in YYYY-MM-DD | string | Y | +| most_recent_time | time of most recent record in HH:MM:SS or HH:MM:SS.mmm | string | N | +| most_recent_value | value of the most recent record | float | Y | +| most_recent_units | units of the most recent record | string | Y | + + +#### Sites Table + +| field/header | description | data type | always present | +| :----------- | :---------- | :-------- | :------------- | +| source | the organization/source for the site | string | Y | +| id | the id of the site. 
The id is used as the key to join the site and timeseries tables | string | Y | +| name | the colloquial name for the site | string | Y | +| latitude | latitude in decimal degrees | float | Y | +| longitude | longitude in decimal degrees | float | Y | +| elevation | ground surface elevation of the site | float | Y | +| elevation_units | the units of the ground surface elevation. Defaults to ft | string | Y | +| horizontal_datum | horizontal datum of the latitude and longitude. Defaults to WGS84 | string | Y | +| vertical_datum | vertical datum of the elevation | string | N | +| usgs_site_id | USGS site id | string | N | +| alternate_site_id | alternate site id | string | N | +| formation | geologic formation in which the well terminates | string | N | +| aquifer | aquifer from which the well draws water | string | N | +| well_depth | depth of well | float | N | + + +#### Time Series Table(s) + +| field/header | description | data type | always present | +| :----------- | :---------- | :-------- | :------------- | +| source | the organization/source for the site | string | Y | +| id | the id of the site. The id is used as the key to join the site and timeseries tables | string | Y | +| parameter | the name of the parameter whose measurements are reported in the table | string | Y | +| parameter_value | value of the observation | float | Y | +| parameter_units | units of the observation | string | Y | +| date_measured | date of measurement in YYYY-MM-DD | string | Y | +| time_measured | time of measurement in HH:MM:SS or HH:MM:SS.mmm | string | N | -#### Available Analytes -The following analytes are currently available for retrieval: -- Arsenic -- Bicarbonate -- Calcium -- Carbonate -- Chloride -- Magnesium -- Nitrate -- pH -- Potassium -- Silica -- Sodium -- Sulfate -- TDS -- Uranium +### Source Inclusion & Exclusion +The Data Integration Engine enables the user to obtain groundwater level and groundwater quality data from a variety of sources. 
Data from sources are automatically included in the output if available unless specifically excluded. The following flags are available to exclude specific data sources: -#### Available Data Sources -The follow data sources are available for analytes, though not every source has measurements for every analyte: -- bor -- wqp -- isc-seven-rivers -- amp -- dwb +- `--no-bernco` to exclude Bernalillo County (BernCo) data +- `--no-bor` to exclude Bureau of Reclamation (BoR) data +- `--no-nmbgmr-amp` to exclude New Mexico Bureau of Geology and Mineral Resources (NMBGMR) Aquifer Mapping Program (AMP) data +- `--no-nmed-dwb` to exclude New Mexico Environment Department (NMED) Drinking Water Bureau (DWB) data +- `--no-nmose-isc-seven-rivers` to exclude New Mexico Office of State Engineer (NMOSE) Interstate Stream Commission (ISC) Seven Rivers data +- `--no-nmose-roswell` to exclude New Mexico Office of State Engineer (NMOSE) Roswell data +- `--no-nwis` to exclude USGS NWIS data +- `--no-pvacd` to exclude Pecos Valley Artesian Conservancy District (PVACD) data +- `--no-wqp` to exclude Water Quality Portal (WQP) data ### Geographic Filters @@ -127,121 +188,22 @@ The following flags can be used to filter by dates: --end-date YYYY-MM-DD ``` -## Output -The data is saved to the current working directory. A log of the inputs and processes, called `die.log`, is also saved to the current working directory. If a subsquent process is run and the log from the previous process has not been moved or stored elsewhere, the log for the subsequent process will be appended to the existing log. - -### Timeseries Data -The flag `--separated_timeseries` exports timeseries for every location in their own file in the directory output_series (e.g. `AB-0002.csv`, `AB-0003.csv`). Locations with only one observation are gathered and exported to the file `output.combined.csv`. - -The flag `--unified_timeseries` exports all timeseries for all locations in one file titled `output.timeseries.csv`. 
It also exports a file titled `output.sites.csv` that contains site information, such as latitude, longitude, and elevation. - -#### Table Headers: Unified - -The table headers for unified timeseries data are as follows: - -**output.sites.csv** -- `source`: the organization/source for the site -- `id`: the id of the site. The id is used as the key to join the output.timeseries.csv table -- `name`: the colloquial name for the site if it exists -- `latitude`: latitude in decimal degrees -- `longitude`: the longitude in decimal degrees -- `elevation` ground surface elevation of the site in feet -- `elevation_units`: the units of the ground surface elevation. Defaults to ft -- `horizontal_datum`: horizontal datum of the latitude and longitude. Defaults to WGS84 -- `vertical_datum`: the vertical datum of the elevation -- `usgs_site_id`: USGS site id if it exists -- `alternate_site_id`: alternate site id if it exists -- `formation`: geologic formation in which the well terminates if it exists -- `aquifer`: aquifer from which the well draws water if it exists -- `well_depth`: depth of well if it exists - -**output.timeseries.csv - waterlevels** -- `source`: the organization/sources for the site -- `id`: the id of the site. The id is used as the key to join the output.sites.csv table -- `depth_to_water_ft_below_ground_surface`: depth to water below ground surface in ft -- `date_measured`: date of measurement in YYYY-MM-DD format -- `time_measured`: time of measurement if it exists - -**output.timeseries.csv - analytes** -- `source`: the organization/sources for the site -- `id`: the id of the site. The id is used as the key to join the output.sites.csv table -- `parameter`: the name of the analyte whose measurements are reported in the table. 
This corresponds the requested analyte -- `parameter_value`: value of the measurement -- `parameter_units`: units of the measurement -- `date_measured`: date of measurement in YYYY-MM-DD format -- `time_measured`: time of measurement if it exists - -#### Table Headers: Separated - -The files for the individual sites contain the same headers as **output.timeseries.csv** from the unified time series tables. - -**output.combined.csv - waterlevels** -- `source`: the organization/source for the site -- `id`: the id of the site. The id is used as the key to join the output.timeseries.csv table -- `name`: the colloquial name for the site if it exists -- `latitude`: latitude in decimal degrees -- `longitude`: the longitude in decimal degrees -- `elevation` ground surface elevation of the site in feet -- `elevation_units`: the units of the ground surface elevation. Defaults to ft -- `horizontal_datum`: horizontal datum of the latitude and longitude. Defaults to WGS84 -- `vertical_datum`: the vertical datum of the elevation -- `usgs_site_id`: USGS site id if it exists -- `alternate_site_id`: alternate site id if it exists -- `formation`: geologic formation in which the well terminates if it exists -- `aquifer`: aquifer from which the well draws water if it exists -- `well_depth`: depth of well if it exists -- `depth_to_water_ft_below_ground_surface`: depth to water below ground surface in ft -- `date_measured`: date of measurement in YYYY-MM-DD format -- `time_measured`: time of measurement if it exists - -**output.combined.csv - analytes** -- `source`: the organization/source for the site -- `id`: the id of the site. The id is used as the key to join the output.timeseries.csv table -- `name`: the colloquial name for the site if it exists -- `latitude`: latitude in decimal degrees -- `longitude`: the longitude in decimal degrees -- `elevation` ground surface elevation of the site in feet -- `elevation_units`: the units of the ground surface elevation. 
Defaults to ft -- `horizontal_datum`: horizontal datum of the latitude and longitude. Defaults to WGS84 -- `vertical_datum`: the vertical datum of the elevation -- `usgs_site_id`: USGS site id if it exists -- `alternate_site_id`: alternate site id if it exists -- `formation`: geologic formation in which the well terminates if it exists -- `aquifer`: aquifer from which the well draws water if it exists -- `well_depth`: depth of well if it exists -- `parameter`: the name of the analyte whose measurements are reported in the table. This corresponds the requested analyte -- `parameter_value`: value of the measurement -- `parameter_units`: units of the measurement -- `date_measured`: date of measurement in YYYY-MM-DD format -- `time_measured`: time of measurement if it exists - -### Summary Data - -If neither of the above flags are specified, a summary table called `output.csv` is exported. The summary table consists of location information as well as summary statistics for the parameter of interest for every location that has observations. - -#### Table Headers: Summary - -**output.csv - waterlevels and analytes** -- `source`: the organization/source for the site -- `id`: the id of the site. The id is used as the key to join the output.timeseries.csv table -- `location`: the colloquial name for the site if it exists -- `usgs_site_id`: USGS site id if it exists -- `alternate_site_id`: alternate site id if it exists -- `latitude`: latitude in decimal degrees -- `longitude`: the longitude in decimal degrees -- `horizontal_datum`: horizontal datum of the latitude and longitude. Defaults to WGS84 -- `elevation` ground surface elevation of the site in feet -- `elevation_units`: the units of the ground surface elevation. Defaults to ft -- `well_depth`: depth of well if it exists -- `well_depth_units`: units of well depth. Defaults to ft -- `parameter`: the name of the analyte whose measurements are reported in the table. 
This corresponds the requested analyte -- `parameter_value`: value of the measurement -- `parameter_units`: units of the measurement -- `nrecords`: the number of records for the site -- `min`: the minimum record for the site -- `max`: the maximum record for the site -- `mean`: the mean value for the records at the site -- `most_recent_date`: date of most recent record -- `most_recent_time`: time of most recent record if it exists -- `most_recent_value` the value of the most recent record -- `most_recent_units`: the units of the most recent record \ No newline at end of file +### Source Enumeration [In Development] + +Use + +``` +die sources {parameter} +``` + +to print the sources that report that parameter to the terminal. + +### Wells [In Development] + +Use + +``` +die wells +``` + +to print wells to the terminal. \ No newline at end of file diff --git a/backend/config.py b/backend/config.py index 69af728..59d474e 100644 --- a/backend/config.py +++ b/backend/config.py @@ -57,37 +57,37 @@ from .connectors.wqp.source import WQPSiteSource, WQPAnalyteSource SOURCE_KEYS = ( - "nmbgmr", - "wqp", - "iscsevenrivers", + "bernco", + "bor", + "nmbgmr_amp", + "nmed_dwb", + "nmose_isc_seven_rivers", + "nmose_roswell", "nwis", - "oseroswell", "pvacd", - "bor", - "dwb", - "bernco", + "wqp", ) def get_source(source): - if source == "nmbgmr": + if source == "bernco": + return BernCoSiteSource() + elif source == "bor": + return BORSiteSource() + elif source == "nmbgmr_amp": return NMBGMRSiteSource() - elif source == "wqp": - return WQPSiteSource() - elif source == "iscsevenrivers": + elif source == "nmed_dwb": + return DWBSiteSource() + elif source == "nmose_isc_seven_rivers": return ISCSevenRiversSiteSource() + elif source == "nmose_roswell": + return OSERoswellSiteSource(HONDO_RESOURCE_ID) elif source == "nwis": return NWISSiteSource() - elif source == "oseroswell": - return OSERoswellSiteSource(HONDO_RESOURCE_ID) elif source == "pvacd": return PVACDSiteSource() - elif source 
== "bor": - return BORSiteSource() - elif source == "dwb": - return DWBSiteSource() - elif source == "bernco": - return BernCoSiteSource() + elif source == "wqp": + return WQPSiteSource() return None @@ -106,27 +106,29 @@ class Config(Loggable): wkt: str = "" # sources - use_source_nmbgmr: bool = True - use_source_wqp: bool = True - use_source_iscsevenrivers: bool = True + use_source_bernco: bool = True + use_source_bor: bool = True + use_source_nmbgmr_amp: bool = True + use_source_nmed_dwb: bool = True + use_source_nmose_isc_seven_rivers: bool = True + use_source_nmose_roswell: bool = True use_source_nwis: bool = True - use_source_oseroswell: bool = True use_source_pvacd: bool = True - use_source_bor: bool = True - use_source_dwb: bool = True - use_source_bernco: bool = True + use_source_wqp: bool = True - analyte: str = "" + # parameter + parameter: str = "" # output use_cloud_storage: bool = False - output_dir: str = "" + output_dir: str = "." output_name: str = "output" output_horizontal_datum: str = WGS84 output_elevation_units: str = FEET output_well_depth_units: str = FEET output_summary: bool = False - output_single_timeseries: bool = False + output_timeseries_unified: bool = False + output_timeseries_separated: bool = False latest_water_level_only: bool = False @@ -157,13 +159,16 @@ def __init__(self, model=None, payload=None): self.wkt = payload.get("wkt", "") self.county = payload.get("county", "") self.output_summary = payload.get("output_summary", False) + self.output_timeseries_unified = payload.get( + "output_timeseries_unified", False + ) + self.output_timeseries_separated = payload.get( + "output_timeseries_separated", False + ) self.output_name = payload.get("output_name", "output") self.start_date = payload.get("start_date", "") self.end_date = payload.get("end_date", "") - self.analyte = payload.get("analyte", "") - self.output_single_timeseries = payload.get( - "output_single_timeseries", False - ) + self.parameter = payload.get("parameter", 
"") for s in SOURCE_KEYS: setattr(self, f"use_source_{s}", s in payload.get("sources", [])) @@ -171,34 +176,29 @@ def __init__(self, model=None, payload=None): def analyte_sources(self): sources = [] - # if self.use_source_wqp: - # sources.append((WQPSiteSource, WQPAnalyteSource)) if self.use_source_bor: sources.append((BORSiteSource(), BORAnalyteSource())) if self.use_source_wqp: sources.append((WQPSiteSource(), WQPAnalyteSource())) - if self.use_source_iscsevenrivers: + if self.use_source_nmose_isc_seven_rivers: sources.append((ISCSevenRiversSiteSource(), ISCSevenRiversAnalyteSource())) - if self.use_source_nmbgmr: + if self.use_source_nmbgmr_amp: sources.append((NMBGMRSiteSource(), NMBGMRAnalyteSource())) - if self.use_source_dwb: + if self.use_source_nmed_dwb: sources.append((DWBSiteSource(), DWBAnalyteSource())) for s, ss in sources: s.set_config(self) ss.set_config(self) - # s.config = self - # ss.config = self - return sources def water_level_sources(self): sources = [] - if self.use_source_nmbgmr: + if self.use_source_nmbgmr_amp: sources.append((NMBGMRSiteSource(), NMBGMRWaterLevelSource())) - if self.use_source_iscsevenrivers: + if self.use_source_nmose_isc_seven_rivers: sources.append( (ISCSevenRiversSiteSource(), ISCSevenRiversWaterLevelSource()) ) @@ -206,7 +206,7 @@ def water_level_sources(self): if self.use_source_nwis: sources.append((NWISSiteSource(), NWISWaterLevelSource())) - if self.use_source_oseroswell: + if self.use_source_nmose_roswell: sources.append( ( OSERoswellSiteSource(HONDO_RESOURCE_ID), @@ -227,11 +227,8 @@ def water_level_sources(self): ) if self.use_source_pvacd: sources.append((PVACDSiteSource(), PVACDWaterLevelSource())) - # sources.append((EBIDSiteSource, EBIDWaterLevelSource)) if self.use_source_bernco: sources.append((BernCoSiteSource(), BernCoWaterLevelSource())) - # if self.use_source_bor: - # sources.append((BORSiteSource(), BORWaterLevelSource())) for s, ss in sources: s.set_config(self) @@ -239,36 +236,6 @@ def 
water_level_sources(self): return sources - # def site_sources(self): - # sources = [ - # NMBGMRSiteSource(), - # WQPSiteSource(), - # ISCSevenRiversSiteSource(), - # NWISSiteSource(), - # DWBSiteSource(), - # BORSiteSource(), - # PVACDSiteSource(), - # EBIDSiteSource(), - # OSERoswellSiteSource(HONDO_RESOURCE_ID), - # OSERoswellSiteSource(FORT_SUMNER_RESOURCE_ID), - # OSERoswellSiteSource(ROSWELL_RESOURCE_ID), - # ] - # - # # if self.use_source_nmbgmr: - # # sources.append(NMBGMRSiteSource) - # # if self.use_source_isc_seven_rivers: - # # sources.append(ISCSevenRiversSiteSource) - # # if self.use_source_ose_roswell: - # # sources.append(OSERoswellSiteSource) - # # if self.use_source_nwis: - # # sources.append(USGSSiteSource) - # # if self.use_source_st2: - # # sources.append(PVACDSiteSource) - # # sources.append(EBIDSiteSource) - # # if self.use_source_bor: - # # sources.append(BORSiteSource) - # return sources - def bbox_bounding_points(self, bbox=None): if bbox is None: bbox = self.bbox @@ -340,7 +307,7 @@ def _report_attributes(title, attrs): "county", "bbox", "wkt", - "analyte", + "parameter", "site_limit", ] + sources # inputs @@ -353,10 +320,10 @@ def _report_attributes(title, attrs): _report_attributes( "Outputs", ( - "output_dir", - "output_name", + "output_path", "output_summary", - "output_single_timeseries", + "output_timeseries_unified", + "output_timeseries_separated", "output_horizontal_datum", "output_elevation_units", ), @@ -415,6 +382,45 @@ def _validate_county(self): return True + def _update_output_name(self): + """ + Generate a unique output name based on existing directories in the output directory. + + If there are no directories with the string "output" in their name, the output name will be "output". + + If there is a directory called "output", then output_name will be "output_1". 
+ + If there are directories called "output_{n}" where n is an integer, then output_name will be "output_{m+1}" + where m is the highest integer in the existing directories. + """ + output_name = self.output_name + + # find if there are already directories with the string "output" their names + output_names = [ + name + for name in os.listdir(self.output_dir) + if os.path.isdir(name) and output_name in name + ] + + if len(output_names) > 0: + max_count = 0 + # find the highest number appended to directories with "output" in their name + counts = [ + name.split("_")[-1] + for name in output_names + if name.split("_")[-1].isdigit() + ] + counts = [int(count) for count in counts] + if len(counts) > 0: + max_count = max(counts) + output_name = f"{output_name}_{max_count + 1}" + + self.output_name = output_name + + def _make_output_path(self): + if not os.path.exists(self.output_path): + os.mkdir(self.output_path) + @property def start_dt(self): return self._extract_date(self.start_date) diff --git a/backend/connectors/bor/source.py b/backend/connectors/bor/source.py index 1007bed..190e268 100644 --- a/backend/connectors/bor/source.py +++ b/backend/connectors/bor/source.py @@ -44,6 +44,9 @@ class BORSiteSource(BaseSiteSource): transformer_klass = BORSiteTransformer + def __repr__(self): + return "BORSiteSource" + def health(self): try: self.get_records() @@ -66,6 +69,9 @@ class BORAnalyteSource(BaseAnalyteSource): transformer_klass = BORAnalyteTransformer _catalog_item_idx = None + def __repr__(self): + return "BORAnalyteSource" + def _extract_parameter_record(self, record): record[PARAMETER_VALUE] = record["attributes"]["result"] record[PARAMETER_UNITS] = record["attributes"]["resultAttributes"]["units"] @@ -102,9 +108,14 @@ def _reorder_catalog_items(self, items): return items def get_records(self, site_record): - code = get_analyte_search_param(self.config.analyte, BOR_ANALYTE_MAPPING) + code = get_analyte_search_param(self.config.parameter, BOR_ANALYTE_MAPPING) + + 
catalog_record_data = self._execute_json_request( + f"https://data.usbr.gov{site_record.catalogRecords[0]['id']}" + ) + catalog_items = catalog_record_data["relationships"]["catalogItems"]["data"] - for i, item in enumerate(self._reorder_catalog_items(site_record.catalogItems)): + for i, item in enumerate(self._reorder_catalog_items(catalog_items)): data = self._execute_json_request(f'https://data.usbr.gov{item["id"]}') if not data: diff --git a/backend/connectors/bor/transformer.py b/backend/connectors/bor/transformer.py index 2c4f263..4dd81a3 100644 --- a/backend/connectors/bor/transformer.py +++ b/backend/connectors/bor/transformer.py @@ -14,6 +14,7 @@ # limitations under the License. # =============================================================================== import pprint +import json from backend.record import SiteRecord, WaterLevelRecord, AnalyteSummaryRecord from backend.transformer import ( @@ -57,7 +58,6 @@ def _transform(self, record): "well_depth": WELL_DEPTHS.get(props["_id"]), "well_depth_units": "ft", "catalogRecords": record["relationships"]["catalogRecords"]["data"], - "catalogItems": record["relationships"]["catalogItems"]["data"], } return rec diff --git a/backend/connectors/ckan/source.py b/backend/connectors/ckan/source.py index da3f373..90d599f 100644 --- a/backend/connectors/ckan/source.py +++ b/backend/connectors/ckan/source.py @@ -31,7 +31,15 @@ OSERoswellSiteTransformer, OSERoswellWaterLevelTransformer, ) -from backend.constants import FEET, DTW, DTW_UNITS, DT_MEASURED +from backend.constants import ( + FEET, + DTW, + DTW_UNITS, + DT_MEASURED, + PARAMETER, + PARAMETER_UNITS, + PARAMETER_VALUE, +) from backend.source import ( BaseSource, BaseSiteSource, @@ -92,6 +100,9 @@ def __init__(self, resource_id, **kw): elif resource_id == ROSWELL_RESOURCE_ID: self.bounding_polygon = OSE_ROSWELL_ROSWELL_BOUNDING_POLYGON + def __repr__(self): + return "NMOSERoswellSiteSource" + def health(self): params = self._get_params() params["limit"] = 1 @@ 
-112,6 +123,9 @@ def _parse_response(self, resp): class OSERoswellWaterLevelSource(OSERoswellSource, BaseWaterLevelSource): transformer_klass = OSERoswellWaterLevelTransformer + def __repr__(self): + return "NMOSERoswellWaterLevelSource" + def get_records(self, site_record): return self._parse_response(site_record, self.get_response()) @@ -130,9 +144,10 @@ def _extract_parameter_dates(self, records: list) -> list: return [r["Date"] for r in records] def _extract_parameter_record(self, record): - record[DTW] = float(record["DTWGS"]) + record[PARAMETER] = DTW + record[PARAMETER_VALUE] = float(record["DTWGS"]) + record[PARAMETER_UNITS] = FEET record[DT_MEASURED] = record["Date"] - record[DTW_UNITS] = FEET return record def _clean_records(self, records: list) -> list: diff --git a/backend/connectors/isc_seven_rivers/source.py b/backend/connectors/isc_seven_rivers/source.py index 46dbc7c..550f961 100644 --- a/backend/connectors/isc_seven_rivers/source.py +++ b/backend/connectors/isc_seven_rivers/source.py @@ -29,6 +29,7 @@ DT_MEASURED, DTW_UNITS, DTW, + PARAMETER, PARAMETER_VALUE, PARAMETER_UNITS, ) @@ -72,6 +73,9 @@ class ISCSevenRiversSiteSource(BaseSiteSource): transformer_klass = ISCSevenRiversSiteTransformer bounding_polygon = ISC_SEVEN_RIVERS_BOUNDING_POLYGON + def __repr__(self): + return "ISCSevenRiversSiteSource" + def health(self): try: self.get_records() @@ -90,6 +94,9 @@ class ISCSevenRiversAnalyteSource(BaseAnalyteSource): transformer_klass = ISCSevenRiversAnalyteTransformer _analyte_ids = None + def __repr__(self): + return "ISCSevenRiversAnalyteSource" + def _get_analyte_id(self, analyte): """ """ if self._analyte_ids is None: @@ -103,6 +110,7 @@ def _get_analyte_id(self, analyte): return self._analyte_ids.get(analyte) def _extract_parameter_record(self, record): + record[PARAMETER] = self.config.parameter record[PARAMETER_VALUE] = record["result"] record[PARAMETER_UNITS] = record["units"] record[DT_MEASURED] = get_datetime(record) @@ -131,7 +139,7 @@ def 
_extract_parameter_dates(self, records: list) -> list: def get_records(self, site_record): config = self.config - analyte_id = self._get_analyte_id(config.analyte) + analyte_id = self._get_analyte_id(config.parameter) if analyte_id: params = { "monitoringPointId": site_record.id, @@ -166,8 +174,9 @@ def _clean_records(self, records): return [r for r in records if r["depthToWaterFeet"] is not None] def _extract_parameter_record(self, record): - record[DTW] = record["depthToWaterFeet"] - record[DTW_UNITS] = FEET + record[PARAMETER] = DTW + record[PARAMETER_VALUE] = record["depthToWaterFeet"] + record[PARAMETER_UNITS] = FEET record[DT_MEASURED] = get_datetime(record) return record diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index ba58c0a..90e5a4c 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -60,6 +60,9 @@ class NMBGMRSiteSource(BaseSiteSource): chunk_size = 100 bounding_polygon = NM_STATE_BOUNDING_POLYGON + def __repr__(self): + return "NMBGMRSiteSource" + def health(self): resp = self._execute_json_request( _make_url("locations"), tag="features", params={"limit": 1} @@ -75,9 +78,9 @@ def get_records(self): if config.site_limit: params["limit"] = config.site_limit - if config.analyte: + if config.parameter.lower() != "waterlevels": params["parameter"] = get_analyte_search_param( - config.analyte, NMBGMR_ANALYTE_MAPPING + config.parameter, NMBGMR_ANALYTE_MAPPING ) else: params["parameter"] = "Manual groundwater levels" @@ -107,8 +110,13 @@ def get_records(self): class NMBGMRAnalyteSource(BaseAnalyteSource): transformer_klass = NMBGMRAnalyteTransformer + def __repr__(self): + return "NMBGMRAnalyteSource" + def get_records(self, site_record): - analyte = get_analyte_search_param(self.config.analyte, NMBGMR_ANALYTE_MAPPING) + analyte = get_analyte_search_param( + self.config.parameter, NMBGMR_ANALYTE_MAPPING + ) records = self._execute_json_request( _make_url("waterchemistry"), 
params={ @@ -144,7 +152,7 @@ def _extract_parameter_dates(self, records: list) -> list: return [r["info"]["CollectionDate"] for r in records] def _extract_parameter_record(self, record): - record[PARAMETER] = self.config.analyte + record[PARAMETER] = self.config.parameter record[PARAMETER_VALUE] = record["SampleValue"] record[PARAMETER_UNITS] = record["Units"] record[DT_MEASURED] = record["info"]["CollectionDate"] @@ -154,14 +162,18 @@ def _extract_parameter_record(self, record): class NMBGMRWaterLevelSource(BaseWaterLevelSource): transformer_klass = NMBGMRWaterLevelTransformer + def __repr__(self): + return "NMBGMRWaterLevelSource" + def _clean_records(self, records): # remove records with no depth to water value return [r for r in records if r["DepthToWaterBGS"] is not None] def _extract_parameter_record(self, record, *args, **kw): - record[DTW] = record["DepthToWaterBGS"] + record[PARAMETER] = DTW + record[PARAMETER_VALUE] = record["DepthToWaterBGS"] + record[PARAMETER_UNITS] = FEET record[DT_MEASURED] = (record["DateMeasured"], record["TimeMeasured"]) - record[DTW_UNITS] = FEET return record def _extract_most_recent(self, records): diff --git a/backend/connectors/nmenv/source.py b/backend/connectors/nmenv/source.py index 6366c8f..991e1a0 100644 --- a/backend/connectors/nmenv/source.py +++ b/backend/connectors/nmenv/source.py @@ -25,12 +25,17 @@ URL = "https://nmenv.newmexicowaterdata.org/FROST-Server/v1.1/" +import sys + class DWBSiteSource(STSiteSource): url = URL transformer_klass = DWBSiteTransformer bounding_polygon = NM_STATE_BOUNDING_POLYGON + def __repr__(self): + return "DWBSiteSource" + def health(self): return self.get_records(top=10, analyte="TDS") @@ -39,7 +44,7 @@ def get_records(self, *args, **kw): if "analyte" in kw: analyte = kw["analyte"] elif self.config: - analyte = self.config.analyte + analyte = self.config.parameter analyte = get_analyte_search_param(analyte, DWB_ANALYTE_MAPPING) if analyte is None: @@ -64,10 +69,13 @@ class 
DWBAnalyteSource(STAnalyteSource): url = URL transformer_klass = DWBAnalyteTransformer + def __repr__(self): + return "DWBAnalyteSource" + def _parse_result( self, result, result_dt=None, result_id=None, result_location=None ): - if "< mrl" in result.lower(): + if "< mrl" in result.lower() or "< mdl" in result.lower(): if self.config.output_summary: self.warn( f"Non-detect found: {result} for {result_location} on {result_dt} (observation {result_id}). Setting to 0 for summary." @@ -82,7 +90,7 @@ def _parse_result( def get_records(self, site, *args, **kw): service = self.get_service() - analyte = get_analyte_search_param(self.config.analyte, DWB_ANALYTE_MAPPING) + analyte = get_analyte_search_param(self.config.parameter, DWB_ANALYTE_MAPPING) ds = service.datastreams() q = ds.query() q = q.expand("Thing/Locations, ObservedProperty, Observations") diff --git a/backend/connectors/st2/source.py b/backend/connectors/st2/source.py index 23f7db9..c7004bb 100644 --- a/backend/connectors/st2/source.py +++ b/backend/connectors/st2/source.py @@ -31,7 +31,14 @@ STWaterLevelSource, make_dt_filter, ) -from backend.constants import DTW, DTW_UNITS, DT_MEASURED +from backend.constants import ( + DTW, + DTW_UNITS, + DT_MEASURED, + PARAMETER, + PARAMETER_VALUE, + PARAMETER_UNITS, +) from backend.source import BaseSiteSource, BaseWaterLevelSource, get_most_recent URL = "https://st2.newmexicowaterdata.org/FROST-Server/v1.1" @@ -53,17 +60,26 @@ class PVACDSiteSource(ST2SiteSource): agency = "PVACD" bounding_polygon = PVACD_BOUNDING_POLYGON + def __repr__(self): + return "PVACDSiteSource" + class EBIDSiteSource(ST2SiteSource): transformer_klass = EBIDSiteTransformer agency = "EBID" + def __repr__(self): + return "EBIDSiteSource" + class BernCoSiteSource(ST2SiteSource): agency = "BernCo" transformer_klass = BernCoSiteTransformer bounding_polygon = BERNCO_BOUNDING_POLYGON + def __repr__(self): + return "BernCoSiteSource" + class ST2WaterLevelSource(STWaterLevelSource): url = URL @@ -80,8 
+96,9 @@ def _extract_most_recent(self, records): } def _extract_parameter_record(self, record): - record[DTW] = record["observation"].result - record[DTW_UNITS] = record["datastream"].unit_of_measurement.symbol + record[PARAMETER] = DTW + record[PARAMETER_VALUE] = record["observation"].result + record[PARAMETER_UNITS] = record["datastream"].unit_of_measurement.symbol record[DT_MEASURED] = record["observation"].phenomenon_time return record @@ -134,15 +151,24 @@ class PVACDWaterLevelSource(ST2WaterLevelSource): transformer_klass = PVACDWaterLevelTransformer agency = "PVACD" + def __repr__(self): + return "PVACDWaterLevelSource" + class EBIDWaterLevelSource(ST2WaterLevelSource): transformer_klass = EBIDWaterLevelTransformer agency = "EBID" + def __repr__(self): + return "EBIDWaterLevelSource" + class BernCoWaterLevelSource(ST2WaterLevelSource): agency = "BernCo" transformer_klass = BernCoWaterLevelTransformer + def __repr__(self): + return "BernCoWaterLevelSource" + # ============= EOF ============================================= diff --git a/backend/connectors/usgs/source.py b/backend/connectors/usgs/source.py index b1c330a..d8583f5 100644 --- a/backend/connectors/usgs/source.py +++ b/backend/connectors/usgs/source.py @@ -17,7 +17,15 @@ import httpx from backend.connectors import NM_STATE_BOUNDING_POLYGON -from backend.constants import FEET, DTW, DTW_UNITS, DT_MEASURED +from backend.constants import ( + FEET, + DTW, + DTW_UNITS, + DT_MEASURED, + PARAMETER, + PARAMETER_VALUE, + PARAMETER_UNITS, +) from backend.connectors.usgs.transformer import ( NWISSiteTransformer, NWISWaterLevelTransformer, @@ -81,6 +89,9 @@ class NWISSiteSource(BaseSiteSource): chunk_size = 500 bounding_polygon = NM_STATE_BOUNDING_POLYGON + def __repr__(self): + return "NWISSiteSource" + @property def tag(self): return "nwis" @@ -127,6 +138,9 @@ def get_records(self): class NWISWaterLevelSource(BaseWaterLevelSource): transformer_klass = NWISWaterLevelTransformer + def __repr__(self): + return 
"NWISWaterLevelSource" + def get_records(self, site_record): params = { "format": "json", @@ -177,8 +191,9 @@ def _extract_most_recent(self, records): } def _extract_parameter_record(self, record): - record[DTW] = float(record["value"]) - record[DTW_UNITS] = FEET + record[PARAMETER] = DTW + record[PARAMETER_VALUE] = float(record["value"]) + record[PARAMETER_UNITS] = FEET # record[DT_MEASURED] = (record["date_measured"], record["time_measured"]) record[DT_MEASURED] = record["datetime_measured"] return record diff --git a/backend/connectors/wqp/source.py b/backend/connectors/wqp/source.py index 5b4007e..8fd9e89 100644 --- a/backend/connectors/wqp/source.py +++ b/backend/connectors/wqp/source.py @@ -62,6 +62,9 @@ class WQPSiteSource(BaseSiteSource): bounding_polygon = NM_STATE_BOUNDING_POLYGON + def __repr__(self): + return "WQPSiteSource" + def health(self): try: r = httpx.get( @@ -83,9 +86,9 @@ def get_records(self): if config.has_bounds(): params["bBox"] = ",".join([str(b) for b in config.bbox_bounding_points()]) - if config.analyte: + if config.parameter.lower() != "waterlevels": params["characteristicName"] = get_analyte_search_param( - config.analyte, WQP_ANALYTE_MAPPING + config.parameter, WQP_ANALYTE_MAPPING ) params.update(get_date_range(config)) @@ -100,6 +103,9 @@ def get_records(self): class WQPAnalyteSource(BaseAnalyteSource): transformer_klass = WQPAnalyteTransformer + def __repr__(self): + return "WQPAnalyteSource" + def _extract_parameter_record(self, record): record[PARAMETER_VALUE] = record["ResultMeasureValue"] record[PARAMETER_UNITS] = record["ResultMeasure/MeasureUnitCode"] @@ -138,7 +144,7 @@ def get_records(self, site_record): "siteid": sites, "mimeType": "tsv", "characteristicName": get_analyte_search_param( - self.config.analyte, WQP_ANALYTE_MAPPING + self.config.parameter, WQP_ANALYTE_MAPPING ), } params.update(get_date_range(self.config)) diff --git a/backend/constants.py b/backend/constants.py index 2064a87..97e8ff6 100644 --- 
a/backend/constants.py +++ b/backend/constants.py @@ -14,23 +14,24 @@ # limitations under the License. # =============================================================================== -TDS = "TDS" -ARSENIC = "Arsenic" -BICARBONATE = "Bicarbonate" -CALCIUM = "Calcium" -CARBONATE = "Carbonate" -CHLORIDE = "Chloride" -FLUORIDE = "Fluoride" -MAGNESIUM = "Magnesium" -NITRATE = "Nitrate" -POTASSIUM = "Potassium" -SILICA = "Silica" -SODIUM = "Sodium" -SULFATE = "Sulfate" -URANIUM = "Uranium" +TDS = "tds" +ARSENIC = "arsenic" +BICARBONATE = "bicarbonate" +CALCIUM = "calcium" +CARBONATE = "carbonate" +CHLORIDE = "chloride" +FLUORIDE = "fluoride" +MAGNESIUM = "magnesium" +NITRATE = "nitrate" +POTASSIUM = "potassium" +SILICA = "silica" +SODIUM = "sodium" +SULFATE = "sulfate" +URANIUM = "uranium" +WATERLEVELS = "waterlevels" -PH = "pH" +PH = "ph" MILLIGRAMS_PER_LITER = "mg/L" @@ -43,29 +44,32 @@ DT_MEASURED = "datetime_measured" -DTW = "depth_to_water_ft_below_ground_surface" +DTW = "depth_to_water_below_ground_surface" DTW_UNITS = FEET PARAMETER = "parameter" PARAMETER_UNITS = "parameter_units" PARAMETER_VALUE = "parameter_value" +ANALYTE_OPTIONS = sorted( + [ + ARSENIC, + BICARBONATE, + CALCIUM, + CARBONATE, + CHLORIDE, + # FLUORIDE, + MAGNESIUM, + NITRATE, + POTASSIUM, + SILICA, + SODIUM, + SULFATE, + TDS, + URANIUM, + PH, + ] +) -ANALYTE_CHOICES = [ - ARSENIC, - BICARBONATE, - CALCIUM, - CARBONATE, - CHLORIDE, - # FLUORIDE, - MAGNESIUM, - NITRATE, - POTASSIUM, - SILICA, - SODIUM, - SULFATE, - TDS, - URANIUM, - PH, -] +PARAMETER_OPTIONS = [WATERLEVELS] + ANALYTE_OPTIONS # ============= EOF ============================================= diff --git a/backend/logging.py b/backend/logging.py index 8a91519..2175854 100644 --- a/backend/logging.py +++ b/backend/logging.py @@ -15,6 +15,7 @@ # =============================================================================== import logging from logging.handlers import RotatingFileHandler +import os import click @@ -27,7 +28,7 @@ def 
log(self, msg, level=None, fg="yellow"): if level is None: level = logging.INFO - click.secho(f"{self.__class__.__name__:30s}{msg}", fg=fg) + click.secho(f"{self.__class__.__name__:40s}{msg}", fg=fg) self.logger.log(level, msg) def warn(self, msg, fg="red"): @@ -51,6 +52,8 @@ def setup_logging(level=None, log_format=None, path=None): if path is None: path = "die.log" + else: + path = os.path.join(path, "die.log") # shandler = logging.StreamHandler() rhandler = RotatingFileHandler(path, maxBytes=1e8, backupCount=50) diff --git a/backend/persister.py b/backend/persister.py index 296ee2a..38e8493 100644 --- a/backend/persister.py +++ b/backend/persister.py @@ -30,12 +30,16 @@ class BasePersister(Loggable): + """ + Class to persist the data to a file or cloud storage. + If persisting to a file, the output directory is created by config._make_output_path() + """ + extension: str # output_id: str def __init__(self): self.records = [] - self.combined = [] self.timeseries = [] self.sites = [] @@ -48,51 +52,46 @@ def load(self, records: list): def finalize(self, output_name: str): pass - def dump_timeseries(self, root: str): - if self.timeseries: - if os.path.isdir(root): - self.log(f"root {root} already exists", fg="red") - shutil.rmtree(root) - - self._make_root_directory(root) - - for site, records in self.timeseries: - path = os.path.join(root, str(site.id).replace(" ", "_")) - path = self.add_extension(path) - self.log(f"dumping {site.id} to {os.path.abspath(path)}") - self._write(path, records) - - self._write( - os.path.join(root, self.add_extension("sites")), - [s[0] for s in self.timeseries], - ) + def dump_sites(self, path: str): + if self.sites: + path = os.path.join(path, "sites") + path = self.add_extension(path) + self.log(f"dumping sites to {os.path.abspath(path)}") + self._write(path, self.sites) else: - self.log("no timeseries records to dump", fg="red") + self.log("no sites to dump", fg="red") - def dump_combined(self, path: str): - if self.combined: + def 
dump_summary(self, path: str): + if self.records: + path = os.path.join(path, "summary") path = self.add_extension(path) - - self.log(f"dumping combined to {os.path.abspath(path)}") - self._dump_combined(path, self.combined) + self.log(f"dumping summary to {os.path.abspath(path)}") + self._write(path, self.records) else: - self.log("no combined records to dump", fg="red") + self.log("no records to dump", fg="red") - def dump_single_timeseries(self, path: str): + def dump_timeseries_unified(self, path: str): if self.timeseries: + path = os.path.join(path, "timeseries_unified") path = self.add_extension(path) - self.log(f"dumping single timeseries to {os.path.abspath(path)}") - self._dump_single_timeseries(path, self.timeseries) + self.log(f"dumping unified timeseries to {os.path.abspath(path)}") + self._dump_timeseries_unified(path, self.timeseries) else: self.log("no timeseries records to dump", fg="red") - def dump_sites(self, path: str): - if self.sites: - path = self.add_extension(path) - self.log(f"dumping sites to {os.path.abspath(path)}") - self._write(path, self.sites) + def dump_timeseries_separated(self, path: str): + if self.timeseries: + # make timeseries path inside of config.output_path to which + # the individual site timeseries will be dumped + timeseries_path = os.path.join(path, "timeseries") + self._make_output_directory(timeseries_path) + for site, records in self.timeseries: + path = os.path.join(timeseries_path, str(site.id).replace(" ", "_")) + path = self.add_extension(path) + self.log(f"dumping {site.id} to {os.path.abspath(path)}") + self._write(path, records) else: - self.log("no sites to dump", fg="red") + self.log("no timeseries records to dump", fg="red") def save(self, path: str): if self.records: @@ -113,14 +112,11 @@ def add_extension(self, path: str): def _write(self, path: str, records): raise NotImplementedError - def _dump_combined(self, path: str, combined: list): - raise NotImplementedError - - def _dump_single_timeseries(self, 
path: str, timeseries: list): + def _dump_timeseries_unified(self, path: str, timeseries: list): raise NotImplementedError - def _make_root_directory(self, root: str): - os.mkdir(root) + def _make_output_directory(self, output_directory: str): + os.mkdir(output_directory) def write_file(path, func, records): @@ -134,7 +130,7 @@ def write_memory(path, func, records): return f.getvalue() -def dump_single_timeseries(writer, timeseries): +def dump_timeseries_unified(writer, timeseries): headers_have_not_been_written = True for i, (site, records) in enumerate(timeseries): for j, record in enumerate(records): @@ -151,13 +147,6 @@ def dump_sites(writer, records): writer.writerow(site.to_row()) -def dump_combined(writer, combined): - for i, (site, record) in enumerate(combined): - if i == 0: - writer.writerow(site.keys + record.keys) - writer.writerow(site.to_row() + record.to_row()) - - class CloudStoragePersister(BasePersister): extension = "csv" _content: list @@ -191,7 +180,7 @@ def finalize(self, output_name: str): blob = bucket.blob(path) blob.upload_from_string(cnt) - def _make_root_directory(self, root: str): + def _make_output_directory(self, output_directory: str): # prevent making root directory, because we are not saving to disk pass @@ -202,12 +191,8 @@ def _write(self, path: str, records: list): def _add_content(self, path: str, content: str): self._content.append((path, content)) - def _dump_single_timeseries(self, path: str, timeseries: list): - content = write_memory(path, dump_single_timeseries, timeseries) - self._add_content(path, content) - - def _dump_combined(self, path: str, combined: list): - content = write_memory(path, dump_combined, combined) + def _dump_timeseries_unified(self, path: str, timeseries: list): + content = write_memory(path, dump_timeseries_unified, timeseries) self._add_content(path, content) @@ -217,11 +202,8 @@ class CSVPersister(BasePersister): def _write(self, path: str, records: list): write_file(path, dump_sites, records) - 
def _dump_single_timeseries(self, path: str, timeseries: list): - write_file(path, dump_single_timeseries, timeseries) - - def _dump_combined(self, path: str, combined: list): - write_file(path, dump_combined, combined) + def _dump_timeseries_unified(self, path: str, timeseries: list): + write_file(path, dump_timeseries_unified, timeseries) class GeoJSONPersister(BasePersister): diff --git a/backend/record.py b/backend/record.py index 487d644..a230812 100644 --- a/backend/record.py +++ b/backend/record.py @@ -30,23 +30,28 @@ def get(attr): # if v is None and self.defaults: # v = self.defaults.get(attr) v = self.__getattr__(attr) - for key, sigfigs in ( + + field_sigfigs = [ ("elevation", 2), - ("depth_to_water_ft_below_ground_surface", 2), - ("surface_elevation_ft", 2), - ("well_depth_ft_below_ground_surface", 2), ("well_depth", 2), ("latitude", 6), ("longitude", 6), ("min", 2), ("max", 2), ("mean", 2), - ): - if v is not None and key == attr: + ] + + # both analyte and water level tables have the same fields, but the + # rounding should only occur for water level tables + if isinstance(self, WaterLevelRecord): + field_sigfigs.append((PARAMETER_VALUE, 2)) + + for field, sigfigs in field_sigfigs: + if v is not None and field == attr: try: v = round(v, sigfigs) except TypeError as e: - print(key, attr) + print(field, attr) raise e break return v @@ -72,7 +77,9 @@ class WaterLevelRecord(BaseRecord): # "longitude", # "surface_elevation_ft", # "well_depth_ft_below_ground_surface", - DTW, + PARAMETER, + PARAMETER_VALUE, + PARAMETER_UNITS, "date_measured", "time_measured", ) diff --git a/backend/source.py b/backend/source.py index 24f6346..1de8f09 100644 --- a/backend/source.py +++ b/backend/source.py @@ -586,18 +586,10 @@ class BaseParameterSource(BaseSource): _extract_parameter_record Returns a parameter record with standardized fields added. 
- For an analyte, the fields are - - backend.constants.PARAMETER - backend.constants.PARAMETER_VALUE - backend.constants.PARAMETER_UNITS - For a water level, the fields are - - - backend.constants.DTW - - backend.constants.DTW_UNITS - - backend.constants.DT_MEASURED - _extract_parameter_results Returns the parameter results as a list from the records, in the same order as the records themselves """ @@ -682,7 +674,7 @@ def read( float(r), u, self._get_output_units(), - self.config.analyte, + self.config.parameter, d, ) if warning_msg == "": @@ -988,7 +980,7 @@ def _get_output_units(self): return self.config.analyte_output_units def _validate_record(self, record): - record[PARAMETER] = self.config.analyte + record[PARAMETER] = self.config.parameter for k in (PARAMETER_VALUE, PARAMETER_UNITS, DT_MEASURED): if k not in record: raise ValueError(f"Invalid record. Missing {k}") @@ -1010,7 +1002,7 @@ def _extract_parameter_units(self, records): return [FEET for _ in records] def _validate_record(self, record): - for k in (DTW, DTW_UNITS, DT_MEASURED): + for k in (PARAMETER_VALUE, PARAMETER_UNITS, DT_MEASURED): if k not in record: raise ValueError(f"Invalid record. 
Missing {k}") diff --git a/backend/transformer.py b/backend/transformer.py index e7acdc7..eb43ea3 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -29,6 +29,7 @@ MICROGRAMS_PER_LITER, DT_MEASURED, PARAMETER_UNITS, + DTW, ) from backend.geo_utils import datum_transform, ALLOWED_DATUMS from backend.logging import Loggable @@ -125,7 +126,7 @@ def convert_units( output_units: str, analyte: str, dt: str = None, -) -> float: +) -> tuple[float, str]: """ Converts the following units for any parameter value: @@ -159,8 +160,8 @@ def convert_units( Returns -------- - float - The converted value + tuple[float, str] + The converted value and warning message is conversion failed """ warning = "" conversion_factor = None @@ -485,7 +486,7 @@ def do_transform( float(r), u, self.config.analyte_output_units, - self.config.analyte, + self.config.parameter, dt, ) if warning_msg != "": @@ -724,12 +725,16 @@ def _transform_most_recents(self, record): record["most_recent_date"] = dt record["most_recent_time"] = tt p, u = self._get_parameter() - record["most_recent_value"] = convert_units( + + most_recent_value, warning_msg = convert_units( record["most_recent_value"], record["most_recent_units"], u, - self.config.analyte, + self.config.parameter, ) + + # all failed conversions are skipped and handled in source.read(), so no need to duplicate here + record["most_recent_value"] = most_recent_value record["most_recent_units"] = u @@ -759,7 +764,7 @@ def _get_parameter(self) -> tuple: tuple The parameter and units for the water level records """ - return "DTW BGS", self.config.waterlevel_output_units + return DTW, self.config.waterlevel_output_units class AnalyteTransformer(ParameterTransformer): @@ -788,7 +793,7 @@ def _get_parameter(self) -> tuple: tuple The parameter and units for the analyte records """ - return self.config.analyte, self.config.analyte_output_units + return self.config.parameter, self.config.analyte_output_units # ============= EOF 
============================================= diff --git a/backend/unifier.py b/backend/unifier.py index 6fe3815..9523da9 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -164,18 +164,10 @@ def _site_wrapper(site_source, parameter_source, persister, config): if results is None or len(results) == 0: continue - if config.output_single_timeseries: - for site, records in results: - persister.timeseries.append((site, records)) - persister.sites.append(site) - else: - # combine sites that only have one record - for site, records in results: - if len(records) == 1: - persister.combined.append((site, records[0])) - else: - persister.timeseries.append((site, records)) - persister.sites.append(site) + for site, records in results: + persister.timeseries.append((site, records)) + persister.sites.append(site) + sites_with_records_count += 1 except BaseException: @@ -190,18 +182,18 @@ def _unify_parameter( config, sources, ): - use_summarize = config.output_summary persister = _perister_factory(config) for site_source, parameter_source in sources: _site_wrapper(site_source, parameter_source, persister, config) - if use_summarize: - persister.save(config.output_path) - elif config.output_single_timeseries: - persister.dump_sites(f"{config.output_path}.sites") - persister.dump_single_timeseries(f"{config.output_path}.timeseries") - else: - persister.dump_combined(f"{config.output_path}.combined") - persister.dump_timeseries(f"{config.output_path}_timeseries") + + if config.output_summary: + persister.dump_summary(config.output_path) + elif config.output_timeseries_unified: + persister.dump_timeseries_unified(config.output_path) + persister.dump_sites(config.output_path) + else: # config.output_timeseries_separated + persister.dump_timeseries_separated(config.output_path) + persister.dump_sites(config.output_path) persister.finalize(config.output_name) @@ -252,13 +244,16 @@ def get_sources(config=None): config = Config() sources = [] - if config.analyte: - allsources = 
config.analyte_sources() - else: + if config.parameter.lower() == "waterlevels": allsources = config.water_level_sources() + else: + allsources = config.analyte_sources() for source, _ in allsources: - if source.intersects(config.bounding_wkt()): + if config.wkt or config.bbox or config.county: + if source.intersects(config.bounding_wkt()): + sources.append(source) + else: sources.append(source) return sources diff --git a/backend/worker.py b/backend/worker.py index 5e3384b..049c0ee 100644 --- a/backend/worker.py +++ b/backend/worker.py @@ -62,8 +62,7 @@ def sources_handler(): if polygon: config.wkt = polygon - if parameter: - config.analyte = parameter + config.parameter = parameter sources = get_sources(config) return make_cors_response({"sources": [s.tag for s in sources]}) diff --git a/frontend/cli.py b/frontend/cli.py index 208ca68..a38e429 100644 --- a/frontend/cli.py +++ b/frontend/cli.py @@ -18,12 +18,12 @@ import click from backend.config import Config -from backend.constants import ANALYTE_CHOICES +from backend.constants import PARAMETER_OPTIONS from backend.unifier import unify_sites, unify_waterlevels, unify_analytes from backend.logging import setup_logging -setup_logging() +# setup_logging() @click.group() @@ -31,69 +31,69 @@ def cli(): pass -SOURCE_OPTIONS = [ +ALL_SOURCE_OPTIONS = [ click.option( - "--no-amp", + "--no-bernco", is_flag=True, default=True, show_default=True, - help="Include/Exclude AMP data. Default is to include", + help="Exclude Bernalillo County Water Authority data. Default is to include", ), click.option( - "--no-nwis", + "--no-bor", is_flag=True, default=True, show_default=True, - help="Exclude NWIS data. Default is to include", + help="Exclude BoR data. Default is to include", ), click.option( - "--no-pvacd", + "--no-nmbgmr-amp", is_flag=True, default=True, show_default=True, - help="Exclude PVACD data. Default is to include", + help="Exclude NMBGMR AMP data. 
Default is to include", ), click.option( - "--no-isc-seven-rivers", + "--no-nmed-dwb", is_flag=True, default=True, show_default=True, - help="Exclude ISC Seven Rivers data. Default is to include", + help="Exclude NMED DWB data. Default is to include", ), click.option( - "--no-bor", + "--no-nmose-isc-seven-rivers", is_flag=True, default=True, show_default=True, - help="Exclude BOR data. Default is to include", + help="Exclude NMOSE ISC Seven Rivers data. Default is to include", ), click.option( - "--no-wqp", + "--no-nmose-roswell", is_flag=True, default=True, show_default=True, - help="Exclude WQP data. Default is to include", + help="Exclude NMOSE Roswell data. Default is to include", ), click.option( - "--no-ckan", + "--no-nwis", is_flag=True, default=True, show_default=True, - help="Exclude CKAN data. Default is to include", + help="Exclude NWIS data. Default is to include", ), click.option( - "--no-dwb", + "--no-pvacd", is_flag=True, default=True, show_default=True, - help="Exclude DWB data. Default is to include", + help="Exclude PVACD data. Default is to include", ), click.option( - "--no-bernco", + "--no-wqp", is_flag=True, default=True, show_default=True, - help="Exclude Bernalillo County Water Authority data. Default is to include", + help="Exclude WQP data. 
Default is to include", ), ] @@ -154,6 +154,15 @@ def cli(): ), ] +OUTPUT_OPTIONS = [ + click.option( + "--output", + type=click.Choice(["summary", "timeseries_unified", "timeseries_separated"]), + required=True, + help="Output summary file, single unified timeseries file, or separated timeseries files", + ) +] + def add_options(options): def _add_options(func): @@ -165,58 +174,95 @@ def _add_options(func): @cli.command() -@add_options(SPATIAL_OPTIONS) -def wells(bbox, county): - """ - Get locations - """ - - config = setup_config("sites", bbox, county) - unify_sites(config) - - -@cli.command() -@add_options(TIMESERIES_OPTIONS) +@click.argument( + "weave", + type=click.Choice(PARAMETER_OPTIONS, case_sensitive=False), + required=True, +) +@add_options(OUTPUT_OPTIONS) @add_options(DT_OPTIONS) @add_options(SPATIAL_OPTIONS) -@add_options(SOURCE_OPTIONS) +@add_options(ALL_SOURCE_OPTIONS) @add_options(DEBUG_OPTIONS) -def waterlevels( - separated_timeseries, - unified_timeseries, +def weave( + weave, + output, start_date, end_date, bbox, county, - no_amp, + no_bernco, + no_bor, # has no water levels + no_nmbgmr_amp, + no_nmed_dwb, # has no water levels + no_nmose_isc_seven_rivers, + no_nmose_roswell, no_nwis, no_pvacd, - no_isc_seven_rivers, - no_bor, - no_wqp, - no_ckan, - no_dwb, - no_bernco, + no_wqp, # has no water levels site_limit, dry, ): - if separated_timeseries or unified_timeseries: - timeseries = True + """ + Get parameter timeseries or summary data + """ + parameter = weave + # instantiate config and set up parameter + config = setup_config(f"{parameter}", bbox, county, site_limit, dry) + config.parameter = parameter + + # make sure config.output_name is properly set + config._update_output_name() + + # make output_path now so that die.log can be written to it live + config._make_output_path() + + # setup logging here so that the path can be set to config.output_path + setup_logging(path=config.output_path) + + # output type + if output == "summary": + 
summary = True + timeseries_unified = False + timeseries_separated = False + elif output == "timeseries_unified": + summary = False + timeseries_unified = True + timeseries_separated = False + elif output == "timeseries_separated": + summary = False + timeseries_unified = False + timeseries_separated = True + + config.output_summary = summary + config.output_timeseries_unified = timeseries_unified + config.output_timeseries_separated = timeseries_separated + + # sources + if parameter.lower() == "waterlevels": + config.use_source_bernco = no_bernco + config.use_source_nmbgmr_amp = no_nmbgmr_amp + config.use_source_nmose_isc_seven_rivers = no_nmose_isc_seven_rivers + config.use_source_nmose_roswell = no_nmose_roswell + config.use_source_nwis = no_nwis + config.use_source_pvacd = no_pvacd + + config.use_source_bor = False + config.use_source_nmed_dwb = False + config.use_source_wqp = False else: - timeseries = False - config = setup_config("waterlevels", timeseries, bbox, county, site_limit, dry) - - config.output_single_timeseries = unified_timeseries - config.use_source_nmbgmr = no_amp - config.use_source_nwis = no_nwis - config.use_source_pvacd = no_pvacd - config.use_source_iscsevenrivers = no_isc_seven_rivers - config.use_source_bor = no_bor - config.use_source_wqp = no_wqp - config.use_source_oseroswell = no_ckan - config.use_source_dwb = no_dwb - config.use_source_bernco = no_bernco - + config.use_source_bor = no_bor + config.use_source_nmbgmr_amp = no_nmbgmr_amp + config.use_source_nmed_dwb = no_nmed_dwb + config.use_source_nmose_isc_seven_rivers = no_nmose_isc_seven_rivers + config.use_source_wqp = no_wqp + + config.use_source_bernco = False + config.use_source_nmose_roswell = False + config.use_source_nwis = False + config.use_source_pvacd = False + + # dates config.start_date = start_date config.end_date = end_date @@ -226,71 +272,30 @@ def waterlevels( if not click.confirm("Do you want to continue?", default=True): return - unify_waterlevels(config) + if 
parameter.lower() == "waterlevels": + unify_waterlevels(config) + else: + unify_analytes(config) @cli.command() -@click.argument("analyte", type=click.Choice(ANALYTE_CHOICES)) -@add_options(TIMESERIES_OPTIONS) -@add_options(DT_OPTIONS) @add_options(SPATIAL_OPTIONS) -@add_options(SOURCE_OPTIONS) -@add_options(DEBUG_OPTIONS) -def analytes( - analyte, - separated_timeseries, - unified_timeseries, - start_date, - end_date, - bbox, - county, - no_amp, - no_nwis, - no_pvacd, - no_isc_seven_rivers, - no_bor, - no_wqp, - no_ckan, - no_dwb, - no_bernco, - site_limit, - dry, -): - if separated_timeseries or unified_timeseries: - timeseries = True - else: - timeseries = False - config = setup_config( - f"analytes ({analyte})", timeseries, bbox, county, site_limit, dry - ) - config.analyte = analyte - - config.output_single_timeseries = unified_timeseries - config.use_source_nmbgmr = no_amp - config.use_source_nwis = no_nwis - config.use_source_pvacd = no_pvacd - config.use_source_iscsevenrivers = no_isc_seven_rivers - config.use_source_bor = no_bor - config.use_source_wqp = no_wqp - config.use_source_oseroswell = no_ckan - config.use_source_dwb = no_dwb - config.use_source_bernco = no_bernco - - config.start_date = start_date - config.end_date = end_date - - if not dry: - config.report() - # prompt user to continue - if not click.confirm("Do you want to continue?", default=True): - return - - unify_analytes(config) +def wells(bbox, county): + """ + Get locations + """ + config = setup_config("sites", bbox, county) + unify_sites(config) @cli.command() +@click.argument( + "sources", + type=click.Choice(PARAMETER_OPTIONS, case_sensitive=False), + required=True, +) @add_options(SPATIAL_OPTIONS) -def sources(bbox, county): +def sources(sources, bbox, county): """ List available sources """ @@ -302,12 +307,14 @@ def sources(bbox, county): elif bbox: config.bbox = bbox + parameter = sources + config.parameter = parameter sources = get_sources(config) for s in sources: click.echo(s) 
-def setup_config(tag, timeseries, bbox, county, site_limit, dry): +def setup_config(tag, bbox, county, site_limit, dry): config = Config() if county: click.echo(f"Getting {tag} for county {county}") @@ -317,7 +324,6 @@ def setup_config(tag, timeseries, bbox, county, site_limit, dry): # bbox = -105.396826 36.219290, -106.024162 35.384307 config.bbox = bbox - config.output_summary = not timeseries config.site_limit = site_limit config.dry = dry diff --git a/setup.py b/setup.py index 8313973..42279b6 100644 --- a/setup.py +++ b/setup.py @@ -21,12 +21,12 @@ setup( name="nmuwd", - version="0.2.0", + version="0.3.1", author="Jake Ross", description="New Mexico Water Data Integration Engine", long_description=long_description, long_description_content_type="text/markdown", - url="https://github.com/DataIntegrationGroup/PyWeaver", + url="https://github.com/DataIntegrationGroup/DataIntegrationEngine", classifiers=[ "Programming Language :: Python :: 3", "Operating System :: OS Independent", @@ -34,7 +34,7 @@ install_requires=["click", "httpx", "geopandas", "frost_sta_client"], entry_points={ "console_scripts": [ - "weave = frontend.cli:cli", + "die = frontend.cli:cli", ], }, packages=["frontend", "backend"]