diff --git a/.github/workflows/dagster-cloud-branch-deployments.yml b/.github/workflows/dagster-cloud-branch-deployments.yml new file mode 100644 index 0000000..34f38e3 --- /dev/null +++ b/.github/workflows/dagster-cloud-branch-deployments.yml @@ -0,0 +1,75 @@ +name: Dagster+ Serverless Branch Deployments + +on: + pull_request: + types: [opened, synchronize, reopened, closed] + +concurrency: + group: ${{ github.ref }}/branch_deployments + cancel-in-progress: true + +env: + DAGSTER_CLOUD_URL: ${{ secrets.DAGSTER_CLOUD_URL }} + DAGSTER_CLOUD_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_API_TOKEN }} + ENABLE_FAST_DEPLOYS: 'true' + PYTHON_VERSION: '3.10' + DAGSTER_CLOUD_FILE: 'dagster_cloud.yaml' + +jobs: + dagster_cloud_default_deploy: + name: Dagster Serverless Deploy + runs-on: ubuntu-22.04 + outputs: + build_info: ${{ steps.parse-workspace.outputs.build_info }} + steps: + - name: Prerun Checks + id: prerun + uses: dagster-io/dagster-cloud-action/actions/utils/prerun@v0.1 + + - name: Launch Docker Deploy + if: steps.prerun.outputs.result == 'docker-deploy' + id: parse-workspace + uses: dagster-io/dagster-cloud-action/actions/utils/parse_workspace@v0.1 + with: + dagster_cloud_file: $DAGSTER_CLOUD_FILE + + - name: Checkout for Python Executable Deploy + if: steps.prerun.outputs.result == 'pex-deploy' + uses: actions/checkout@v6 + with: + ref: ${{ github.head_ref }} + path: project-repo + + - name: Python Executable Deploy + if: steps.prerun.outputs.result == 'pex-deploy' + uses: dagster-io/dagster-cloud-action/actions/build_deploy_python_executable@v0.1 + with: + dagster_cloud_file: "$GITHUB_WORKSPACE/project-repo/$DAGSTER_CLOUD_FILE" + build_output_dir: "$GITHUB_WORKSPACE/build" + python_version: "${{ env.PYTHON_VERSION }}" + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + dagster_cloud_docker_deploy: + name: Docker Deploy + runs-on: ubuntu-22.04 + if: needs.dagster_cloud_default_deploy.outputs.build_info + needs: dagster_cloud_default_deploy + strategy: + fail-fast: false + matrix: + location: ${{ fromJSON(needs.dagster_cloud_default_deploy.outputs.build_info) }} + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + ref: ${{ github.head_ref }} + - name: Build and deploy to Dagster+ serverless + uses: dagster-io/dagster-cloud-action/actions/serverless_branch_deploy@v0.1 + with: + dagster_cloud_api_token: ${{ secrets.DAGSTER_CLOUD_API_TOKEN }} + location: ${{ toJson(matrix.location) }} + base_image: "python:${{ env.PYTHON_VERSION }}-slim" + organization_id: ${{ secrets.ORGANIZATION_ID }} + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/dagster-cloud-deploy.yml b/.github/workflows/dagster-cloud-deploy.yml new file mode 100644 index 0000000..7dd39ad --- /dev/null +++ b/.github/workflows/dagster-cloud-deploy.yml @@ -0,0 +1,78 @@ +name: Dagster+ Serverless Deploy + +on: + push: + branches: + - "main" + +concurrency: + group: ${{ github.ref }}/deploy + cancel-in-progress: true + +env: + DAGSTER_CLOUD_URL: ${{ secrets.DAGSTER_CLOUD_URL }} + DAGSTER_CLOUD_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_API_TOKEN }} + # PEX fast deploy. No system GDAL needed (only shapely, which ships a wheel + # with bundled GEOS), so a Docker build would be pure overhead. + ENABLE_FAST_DEPLOYS: 'true' + PYTHON_VERSION: '3.10' + DAGSTER_CLOUD_FILE: 'dagster_cloud.yaml' + +jobs: + dagster_cloud_default_deploy: + name: Dagster Serverless Deploy + runs-on: ubuntu-22.04 + outputs: + build_info: ${{ steps.parse-workspace.outputs.build_info }} + steps: + - name: Prerun Checks + id: prerun + uses: dagster-io/dagster-cloud-action/actions/utils/prerun@v0.1 + + - name: Launch Docker Deploy + if: steps.prerun.outputs.result == 'docker-deploy' + id: parse-workspace + uses: dagster-io/dagster-cloud-action/actions/utils/parse_workspace@v0.1 + with: + dagster_cloud_file: $DAGSTER_CLOUD_FILE + + - name: Checkout for Python Executable Deploy + if: steps.prerun.outputs.result == 'pex-deploy' + uses: actions/checkout@v6 + with: + ref: ${{ github.head_ref }} + path: project-repo + + - name: Python Executable Deploy + if: steps.prerun.outputs.result == 'pex-deploy' + uses: dagster-io/dagster-cloud-action/actions/build_deploy_python_executable@v0.1 + with: + dagster_cloud_file: "$GITHUB_WORKSPACE/project-repo/$DAGSTER_CLOUD_FILE" + build_output_dir: "$GITHUB_WORKSPACE/build" + python_version: "${{ env.PYTHON_VERSION }}" + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + dagster_cloud_docker_deploy: + name: Docker Deploy + runs-on: ubuntu-22.04 + if: needs.dagster_cloud_default_deploy.outputs.build_info + needs: dagster_cloud_default_deploy + strategy: + fail-fast: false + matrix: + location: ${{ fromJSON(needs.dagster_cloud_default_deploy.outputs.build_info) }} + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + ref: ${{ github.head_ref }} + - name: Build and deploy to Dagster+ serverless + uses: dagster-io/dagster-cloud-action/actions/serverless_prod_deploy@v0.1 + with: + dagster_cloud_api_token: ${{ secrets.DAGSTER_CLOUD_API_TOKEN }} + location: ${{ toJson(matrix.location) }} + base_image: "python:${{ env.PYTHON_VERSION }}-slim" + organization_id: ${{ secrets.ORGANIZATION_ID }} + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/backend/bounding_polygons.py b/backend/bounding_polygons.py index 8a9ccd6..1bd53a6 100644 --- a/backend/bounding_polygons.py +++ b/backend/bounding_polygons.py @@ -15,113 +15,14 @@ # =============================================================================== import json import os -from pprint import pprint import click import httpx -from shapely import Polygon, box from shapely.geometry import shape from backend.geo_utils import transform_srid, SRID_WGS84, SRID_UTM_ZONE_13N -# polygon retrivial functions -# multiple polygons -def get_congressional_district_boundaries(state, district): - pass - - -def get_tribal_boundaries(state=None): - state, statefp = _get_statefp(state) - - # use the processes service to get all tribal boundaries that intersect the state - def func(): - payload = { - "inputs": { - "collection": f"aiannh", - "url": f"https://geoconnex.us/ref/states/{statefp}", - } - } - resp = httpx.post( - "https://reference.geoconnex.us/processes/intersector/execution", - json=payload, - ) - return resp.json() - - obj = _get_cached_object(f"{state}.aiannh", f"{state} AIANNH", func) - - return obj - - -def get_state_hucs_boundaries(state=None, level=8): - state, statefp = _get_statefp(state) - - # use the processes service to get all hucs from this level that intersect the state of NM - def func(): - payload = { - "inputs": { - "collection": f"hu{level:02n}", - "url": f"https://geoconnex.us/ref/states/{statefp}", - } - } - resp = httpx.post( - "https://reference.geoconnex.us/processes/intersector/execution", - json=payload, - ) - return resp.json() - - obj = _get_cached_object(f"{state}.hucs.{level}", f"{state} HU{level:02n}", func) - - return obj - - -def get_state_pwss_boundaries(state=None): - state, statefp = _get_statefp(state) - obj = _get_cached_object( - f"{state}.pws", - f"{state} PWSs", - f"https://reference.geoconnex.us/collections/pws/items?f=json&state_code={state}", - ) - - return obj - - -# single polygons - - -def get_pws_polygon(pwsid, as_wkt=True): - obj = _get_cached_object( - pwsid, - pwsid, - f"https://reference.geoconnex.us/collections/pws/items/{pwsid}?f=json", - ) - return _make_shape(obj, as_wkt) - - -def get_huc_polygon(huc, as_wkt=True): - if len(huc) == 2: - collection = "hu02" - elif len(huc) == 4: - collection = "hu04" - elif len(huc) == 6: - collection = "hu06" - elif len(huc) == 8: - collection = "hu08" - elif len(huc) == 10: - collection = "hu10" - else: - _warning(f"Invalid HUC {huc}. length must be 2, 4, 6, 8, or 10") - return - - obj = _get_cached_object( - huc, - huc, - f"https://reference.geoconnex.us/collections/{collection}/items/{huc}?f=json", - ) - - return _make_shape(obj, as_wkt) - - def get_county_polygon(name, as_wkt=True): if ":" in name: state, county = name.split(":") diff --git a/backend/config.py b/backend/config.py index 1709b55..d349881 100644 --- a/backend/config.py +++ b/backend/config.py @@ -16,7 +16,6 @@ import os import sys from datetime import datetime, timedelta -from enum import Enum import shapely.wkt import yaml diff --git a/backend/connectors/bor/source.py b/backend/connectors/bor/source.py index 7ef532e..506da1c 100644 --- a/backend/connectors/bor/source.py +++ b/backend/connectors/bor/source.py @@ -13,10 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -import pprint -from json import JSONDecodeError -import httpx from backend.connectors.bor.transformer import BORSiteTransformer, BORAnalyteTransformer from backend.connectors.mappings import BOR_ANALYTE_MAPPING @@ -27,12 +24,9 @@ SOURCE_PARAMETER_NAME, SOURCE_PARAMETER_UNITS, DT_MEASURED, - EARLIEST, - LATEST, ) from backend.source import ( - BaseSource, BaseSiteSource, BaseAnalyteSource, get_terminal_record, diff --git a/backend/connectors/bor/transformer.py b/backend/connectors/bor/transformer.py index 8692b52..4540069 100644 --- a/backend/connectors/bor/transformer.py +++ b/backend/connectors/bor/transformer.py @@ -13,13 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -import pprint -import json -from backend.record import SiteRecord from backend.transformer import ( - BaseTransformer, - WaterLevelTransformer, SiteTransformer, AnalyteTransformer, ) diff --git a/backend/connectors/ckan/source.py b/backend/connectors/ckan/source.py index 680c914..4297f63 100644 --- a/backend/connectors/ckan/source.py +++ b/backend/connectors/ckan/source.py @@ -15,7 +15,6 @@ # =============================================================================== from itertools import groupby -import httpx from backend.connectors import ( OSE_ROSWELL_HONDO_BOUNDING_POLYGON, @@ -34,7 +33,6 @@ from backend.constants import ( FEET, DTW, - DTW_UNITS, DT_MEASURED, PARAMETER_NAME, PARAMETER_UNITS, @@ -43,7 +41,6 @@ SOURCE_PARAMETER_UNITS, ) from backend.source import ( - BaseSource, BaseSiteSource, BaseWaterLevelSource, get_terminal_record, diff --git a/backend/connectors/ckan/transformer.py b/backend/connectors/ckan/transformer.py index e9ba1ee..48a1c1d 100644 --- a/backend/connectors/ckan/transformer.py +++ b/backend/connectors/ckan/transformer.py @@ -13,15 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -import pprint -from backend.record import SiteRecord -from backend.transformer import BaseTransformer, WaterLevelTransformer, SiteTransformer +from backend.transformer import WaterLevelTransformer, SiteTransformer class OSERoswellSiteTransformer(SiteTransformer): def _transform(self, record): - # pprint.pprint(record) lat = float(record["DD_lat"]) lng = float(record["DD_lon"]) # if not self.contained(lng, lat): diff --git a/backend/connectors/isc_seven_rivers/source.py b/backend/connectors/isc_seven_rivers/source.py index d456c18..c6f5dce 100644 --- a/backend/connectors/isc_seven_rivers/source.py +++ b/backend/connectors/isc_seven_rivers/source.py @@ -15,7 +15,6 @@ # =============================================================================== from datetime import datetime -import httpx from backend.connectors import ISC_SEVEN_RIVERS_BOUNDING_POLYGON from backend.connectors.mappings import ISC_SEVEN_RIVERS_ANALYTE_MAPPING @@ -28,8 +27,6 @@ PARAMETER_UNITS, SOURCE_PARAMETER_NAME, SOURCE_PARAMETER_UNITS, - EARLIEST, - LATEST, ) from backend.connectors.isc_seven_rivers.transformer import ( ISCSevenRiversSiteTransformer, @@ -37,7 +34,6 @@ ISCSevenRiversAnalyteTransformer, ) from backend.source import ( - BaseSource, BaseSiteSource, BaseWaterLevelSource, BaseAnalyteSource, diff --git a/backend/connectors/isc_seven_rivers/transformer.py b/backend/connectors/isc_seven_rivers/transformer.py index dddbe9d..3340651 100644 --- a/backend/connectors/isc_seven_rivers/transformer.py +++ b/backend/connectors/isc_seven_rivers/transformer.py @@ -13,12 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -import shapely.wkt -from shapely import Point -from backend.record import SiteRecord from backend.transformer import ( - BaseTransformer, WaterLevelTransformer, SiteTransformer, AnalyteTransformer, diff --git a/backend/connectors/nmbgmr/source.py b/backend/connectors/nmbgmr/source.py index 5823bee..766bba5 100644 --- a/backend/connectors/nmbgmr/source.py +++ b/backend/connectors/nmbgmr/source.py @@ -32,8 +32,6 @@ PARAMETER_VALUE, SOURCE_PARAMETER_NAME, SOURCE_PARAMETER_UNITS, - EARLIEST, - LATEST, ) from backend.source import ( BaseWaterLevelSource, diff --git a/backend/connectors/nmbgmr/transformer.py b/backend/connectors/nmbgmr/transformer.py index db7bf67..8b32c70 100644 --- a/backend/connectors/nmbgmr/transformer.py +++ b/backend/connectors/nmbgmr/transformer.py @@ -13,10 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from backend.constants import DTW -from backend.record import SiteRecord from backend.transformer import ( - BaseTransformer, WaterLevelTransformer, SiteTransformer, AnalyteTransformer, diff --git a/backend/connectors/nmenv/source.py b/backend/connectors/nmenv/source.py index 3ae2170..7252551 100644 --- a/backend/connectors/nmenv/source.py +++ b/backend/connectors/nmenv/source.py @@ -33,7 +33,6 @@ URL = "https://nmenv.newmexicowaterdata.org/FROST-Server/v1.1/" -import sys class DWBSiteSource(STSiteSource): diff --git a/backend/connectors/nmenv/transformer.py b/backend/connectors/nmenv/transformer.py index 80f534a..dde755f 100644 --- a/backend/connectors/nmenv/transformer.py +++ b/backend/connectors/nmenv/transformer.py @@ -14,7 +14,7 @@ # limitations under the License. # =============================================================================== from backend.connectors.st_connector import STSiteTransformer -from backend.transformer import SiteTransformer, AnalyteTransformer +from backend.transformer import AnalyteTransformer class DWBSiteTransformer(STSiteTransformer): diff --git a/backend/connectors/nmose/transformer.py b/backend/connectors/nmose/transformer.py index 8f26ebb..b519758 100644 --- a/backend/connectors/nmose/transformer.py +++ b/backend/connectors/nmose/transformer.py @@ -1,4 +1,4 @@ -from backend.transformer import BaseTransformer, SiteTransformer +from backend.transformer import SiteTransformer class NMOSEPODSiteTransformer(SiteTransformer): diff --git a/backend/connectors/st2/source.py b/backend/connectors/st2/source.py index ee947f9..3467a6d 100644 --- a/backend/connectors/st2/source.py +++ b/backend/connectors/st2/source.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -import datetime from functools import partial from backend.connectors import ( @@ -41,7 +40,6 @@ ) from backend.constants import ( DTW, - DTW_UNITS, DT_MEASURED, PARAMETER_NAME, PARAMETER_VALUE, @@ -49,7 +47,6 @@ SOURCE_PARAMETER_NAME, SOURCE_PARAMETER_UNITS, ) -from backend.source import BaseSiteSource, BaseWaterLevelSource, get_terminal_record URL = "https://st2.newmexicowaterdata.org/FROST-Server/v1.1" diff --git a/backend/connectors/st2/transformer.py b/backend/connectors/st2/transformer.py index 5417306..4a2a1ca 100644 --- a/backend/connectors/st2/transformer.py +++ b/backend/connectors/st2/transformer.py @@ -13,16 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -import pprint -import sys from backend.connectors.st_connector import STSiteTransformer -from backend.record import SiteRecord from backend.converter import StandardUnitConverter from backend.transformer import ( - BaseTransformer, WaterLevelTransformer, - SiteTransformer, ) diff --git a/backend/connectors/st_connector.py b/backend/connectors/st_connector.py index 38aeeb3..ed9f10e 100644 --- a/backend/connectors/st_connector.py +++ b/backend/connectors/st_connector.py @@ -20,7 +20,6 @@ from shapely import MultiPolygon, unary_union from backend.bounding_polygons import get_state_polygon -from backend.constants import EARLIEST, LATEST from backend.source import ( BaseSiteSource, BaseWaterLevelSource, diff --git a/backend/connectors/usgs/transformer.py b/backend/connectors/usgs/transformer.py index 35a5ee0..41fa8d5 100644 --- a/backend/connectors/usgs/transformer.py +++ b/backend/connectors/usgs/transformer.py @@ -13,8 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from backend.record import SiteRecord -from backend.transformer import BaseTransformer, WaterLevelTransformer, SiteTransformer +from backend.transformer import WaterLevelTransformer, SiteTransformer class NWISSiteTransformer(SiteTransformer): diff --git a/backend/connectors/wqp/source.py b/backend/connectors/wqp/source.py index 63fc74f..e324fef 100644 --- a/backend/connectors/wqp/source.py +++ b/backend/connectors/wqp/source.py @@ -13,9 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -import pprint -import httpx from backend.connectors import NM_STATE_BOUNDING_POLYGON from backend.connectors.mappings import WQP_ANALYTE_MAPPING @@ -26,8 +24,6 @@ SOURCE_PARAMETER_NAME, SOURCE_PARAMETER_UNITS, DT_MEASURED, - EARLIEST, - LATEST, TDS, WATERLEVELS, SPECIFIC_CONDUCTANCE, diff --git a/backend/connectors/wqp/transformer.py b/backend/connectors/wqp/transformer.py index a2cf623..9acffbe 100644 --- a/backend/connectors/wqp/transformer.py +++ b/backend/connectors/wqp/transformer.py @@ -13,11 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -import pprint -from backend.record import SiteRecord from backend.transformer import ( - BaseTransformer, SiteTransformer, AnalyteTransformer, WaterLevelTransformer, @@ -26,7 +23,6 @@ class WQPSiteTransformer(SiteTransformer): def _transform(self, record): - # pprint.pprint(record) provider = record["ProviderName"] rec = { "source": f"WQP/{provider}", diff --git a/backend/converter.py b/backend/converter.py index 42337b7..3c404f1 100644 --- a/backend/converter.py +++ b/backend/converter.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from typing import Protocol, runtime_checkable - from backend.constants import ( MILLIGRAMS_PER_LITER, PARTS_PER_MILLION, @@ -26,19 +24,6 @@ ) -@runtime_checkable -class UnitConverter(Protocol): - def convert( - self, - input_value: float, - input_units: str, - output_units: str, - source_parameter_name: str, - die_parameter_name: str, - dt: str | None = None, - ) -> tuple[float, float | None, str]: ... - - class StandardUnitConverter: def convert( self, diff --git a/backend/geo_utils.py b/backend/geo_utils.py index 43b81dd..eb6e850 100644 --- a/backend/geo_utils.py +++ b/backend/geo_utils.py @@ -16,7 +16,6 @@ import pyproj from shapely.ops import transform -PROJECTIONS: dict = {} TRANSFORMS: dict = {} ALLOWED_DATUMS = ["NAD27", "NAD83", "WGS84"] @@ -75,56 +74,4 @@ def datum_transform(x, y, in_datum, out_datum): return lng, lat -def utm_to_lonlat(e, n, zone=13): - """ - Converts easting and northing into longitude and latitude - - Parameters - -------- - e: float - easting - n: float - northing - - Returns - -------- - tuple - (longitude, latitude) - """ - name = f"utm{zone}" - if name not in PROJECTIONS: - pr = pyproj.Proj(proj="utm", zone=int(zone), ellps="WGS84") - PROJECTIONS[name] = pr - pr = PROJECTIONS[name] - lonlat = pr(e, n, inverse=True) - return lonlat - - -def lonlat_to_utm(lon, lat, zone=13): - """ - Converts longitude and latitude into easting and northing - - Parameters - -------- - lon: float - longitude in decimal degrees - lat: float - latitude in decimal degrees - - - Returns - -------- - tuple - (easting, northing) - """ - name = "lonlat" - if name not in PROJECTIONS: - pr = pyproj.Proj(proj="utm", ellps="WGS84", zone=zone) - PROJECTIONS[name] = pr - - pr = PROJECTIONS[name] - easting_northing = pr(lon, lat) - return easting_northing - - # ============= EOF ============================================= diff --git a/backend/logger.py b/backend/logger.py index b07a396..ae23cf0 100644 --- a/backend/logger.py +++ b/backend/logger.py @@ -48,26 +48,6 @@ def make_logger(name: str) -> Logger: return Logger(name) -class Loggable: - """Deprecated — do not subclass. Use make_logger() instead.""" - - def __init__(self): - self.logger = logging.getLogger(self.__class__.__name__) - - def log(self, msg, level=None, fg="yellow", **kwargs): - if level is None: - level = logging.INFO - - click.secho(f"{self.__class__.__name__:40s}{msg}", fg=fg) - self.logger.log(level, msg, **kwargs) - - def warn(self, msg, fg="red", **kwargs): - self.log(msg, fg=fg, level=logging.WARNING, **kwargs) - - def debug(self, msg): - self.log(msg, level=logging.DEBUG, fg="blue") - - def setup_logging(level=None, log_format=None, path=None): # _managed_handlers is mutated in place (clear/append), never reassigned, # so no `global` declaration is needed. diff --git a/backend/persisters/geoserver.py b/backend/persisters/geoserver.py index c76fa14..bd73a88 100644 --- a/backend/persisters/geoserver.py +++ b/backend/persisters/geoserver.py @@ -5,11 +5,9 @@ # You may not use this file except in compliance with the License. # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # =============================================================================== -import json -import os import time from itertools import groupby -from typing import Any, Type +from typing import Any from shapely.geometry.multipoint import MultiPoint from shapely.geometry.point import Point from sqlalchemy.dialects.postgresql import JSONB, insert @@ -22,7 +20,6 @@ Column, ForeignKey, create_engine, - UUID, String, Integer, Float, diff --git a/backend/persisters/strategies.py b/backend/persisters/strategies.py index c0dff2b..fe40296 100644 --- a/backend/persisters/strategies.py +++ b/backend/persisters/strategies.py @@ -1,11 +1,5 @@ import io import os -from typing import Protocol - - -class OutputStrategy(Protocol): - def write_bytes(self, path: str, content: bytes) -> None: ... - def make_directory(self, path: str) -> None: ... class LocalFileStrategy: diff --git a/backend/source.py b/backend/source.py index d54a495..cbfe158 100644 --- a/backend/source.py +++ b/backend/source.py @@ -14,7 +14,7 @@ # limitations under the License. # =============================================================================== from json import JSONDecodeError -from typing import Any, Literal, Optional, Union, List, Callable, Dict, cast +from typing import Any, Optional, Union, List, Callable, Dict, cast import httpx import shapely.wkt @@ -274,20 +274,6 @@ def health(self) -> bool: raise NotImplementedError(f"test not implemented by {self.__class__.__name__}") -class BaseContainerSource(BaseSource): - def __init__(self, *args, **kw): - super().__init__(*args, **kw) - - def check(self): - pass - - def discover(self, *args, **kw): - pass - - def read(self, *args, **kw): - pass - - class BaseSiteSource(BaseSource): chunk_size = 1 bounding_polygon = None @@ -496,8 +482,4 @@ def _extract_source_parameter_units(self, records): return [FEET for _ in records] -class BaseFileSource(BaseSource): - name = "files" - - # ============= EOF ============================================= diff --git a/backend/transformer.py b/backend/transformer.py index 646bd2d..32c89c5 100644 --- a/backend/transformer.py +++ b/backend/transformer.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -import click -import pprint from datetime import datetime, date, timedelta import shapely @@ -22,13 +20,8 @@ from backend.bounding_polygons import NM_BOUNDARY_BUFFERED from backend.constants import ( - MILLIGRAMS_PER_LITER, - PARTS_PER_MILLION, - PARTS_PER_BILLION, FEET, METERS, - TONS_PER_ACRE_FOOT, - MICROGRAMS_PER_LITER, DT_MEASURED, DTW, EARLIEST, @@ -120,19 +113,6 @@ def transform_length_units( return value, out_unit -def convert_units( - input_value: int | float | str, - input_units: str, - output_units: str, - source_parameter_name: str, - die_parameter_name: str, - dt: str | None = None, -) -> tuple[float, float | None, str]: - """Deprecated: use StandardUnitConverter().convert() instead.""" - from backend.converter import StandardUnitConverter - return StandardUnitConverter().convert(float(input_value), input_units, output_units, source_parameter_name, die_parameter_name, dt) - - def standardize_datetime(dt, record_id): if isinstance(dt, tuple): dt = [di for di in dt if di is not None] diff --git a/backend/unifier.py b/backend/unifier.py index 739f6a7..a52f0c6 100644 --- a/backend/unifier.py +++ b/backend/unifier.py @@ -15,8 +15,8 @@ # =============================================================================== import shapely -from backend.config import Config, get_source, OutputFormat -from backend.logger import make_logger, setup_logging +from backend.config import Config, get_source +from backend.logger import make_logger _log = make_logger("unifier") from backend.constants import WATERLEVELS @@ -270,17 +270,6 @@ def _unify_parameter( persister.finalize(config.output_name) -def get_sources_in_polygon(polygon): - # polygon = shapely.wkt.loads(polygon) - sources = get_sources() - rets = [] - for source in sources: - _log.log(str(source)) - if source.intersects(polygon): - rets.append(source.tag) - return rets - - def get_county_bounds(county): config = Config() config.county = county @@ -330,84 +319,4 @@ def get_sources(config=None): return sources -def generate_site_bounds(): - source = get_source("bernco") - source.generate_bounding_polygon() - - -def analyte_unification_test(): - cfg = Config() - cfg.county = "chaves" - cfg.county = "eddy" - - cfg.analyte = "TDS" - cfg.output_summary = True - - # analyte testing - cfg.use_source_wqp = False - # cfg.use_source_nmbgmr = False - cfg.use_source_iscsevenrivers = False - cfg.use_source_bor = False - cfg.use_source_dwb = False - cfg.site_limit = 10 - - unify_analytes(cfg) - - -def waterlevel_unification_test(): - cfg = Config() - cfg.county = "chaves" - # cfg.county = "eddy" - # cfg.bbox = "-104.5 32.5,-104 33" - # cfg.start_date = "2020-01-01" - # cfg.end_date = "2020-5-01" - cfg.output_summary = False - cfg.output_name = "test00112233" - # cfg.output_summary = True - cfg.output_single_timeseries = True - - cfg.use_source_nwis = False - cfg.use_source_nmbgmr = False - cfg.use_source_iscsevenrivers = False - cfg.use_source_pvacd = False - # cfg.use_source_oseroswell = False - cfg.use_source_bernco = False - cfg.use_source_iscsevenrivers = False - cfg.use_source_nmose_isc_seven_rivers = False - cfg.use_source_ebid = False - # cfg.site_limit = 10 - - unify_waterlevels(cfg) - - -def get_datastream(siteid): - import httpx - - resp = httpx.get( - f"https://st2.newmexicowaterdata.org/FROST-Server/v1.1/Locations({siteid})?$expand=Things/Datastreams" - ) - obj = resp.json() - return obj["Things"][0]["Datastreams"][0] - - -def get_datastreams(): - s = get_source("pvacd") - for si in s.read_sites(): - ds = get_datastream(si.id) - _log.log(f"{si} {si.id} {ds['@iot.id']}") - - -# if __name__ == "__main__": -# test_waterlevel_unification() -# root = logging.getLogger() -# root.setLevel(logging.DEBUG) -# shandler = logging.StreamHandler() -# get_sources(Config()) -# setup_logging() -# site_unification_test() -# waterlevel_unification_test() -# analyte_unification_test() -# print(health_check("nwis")) -# generate_site_bounds() - # ============= EOF ============================================= diff --git a/dagster_cloud.yaml b/dagster_cloud.yaml new file mode 100644 index 0000000..7896743 --- /dev/null +++ b/dagster_cloud.yaml @@ -0,0 +1,9 @@ +locations: + - location_name: die-orchestration + code_source: + module_name: orchestration.definitions + build: + # PEX fast-deploy build root. orchestration/pyproject.toml declares the + # dagster deps and the `nmuwd` path-dependency (tool.uv.sources); the + # builder recurses into the repo root to collect backend/ + its deps. + directory: orchestration diff --git a/frontend/api/app.py b/frontend/api/app.py index bddd34d..5b7a06e 100644 --- a/frontend/api/app.py +++ b/frontend/api/app.py @@ -15,9 +15,7 @@ # =============================================================================== import hashlib import json -import multiprocessing import os -import time from typing import Optional from fastapi import FastAPI, HTTPException diff --git a/orchestration/assets/wells.py b/orchestration/assets/wells.py deleted file mode 100644 index e62acca..0000000 --- a/orchestration/assets/wells.py +++ /dev/null @@ -1,47 +0,0 @@ -import tempfile -from pathlib import Path - -import dagster as dg - -from backend.unifier import unify_sites -from backend.persisters.ogc_features import dump_summary_collection -from orchestration.resources.die_config import DIEConfigResource -from orchestration.resources.gcs import GCSResource - - -def build_wells_asset(product: dict): - @dg.asset(name=product["id"], group_name="wells") - def _wells_asset( - die_config: DIEConfigResource, - gcs: GCSResource, - ) -> dg.MaterializeResult: - config = die_config.get_config(product) - config.sites_only = True - - with tempfile.TemporaryDirectory() as tmpdir: - unify_sites(config) - - # Collect sites from persister (set by unify_sites via _unify_parameter) - from backend.persister import BasePersister - sites = config._persister.sites if hasattr(config, "_persister") else [] - - out = Path(tmpdir) / "collection.geojson" - meta = { - "id": product["id"], - "title": product.get("title", product["id"]), - "description": product.get("description", ""), - } - dump_summary_collection(str(out), sites, meta) - - info = gcs.upload_product(str(out), product["id"]) - - return dg.MaterializeResult( - metadata={ - "feature_count": dg.MetadataValue.int(info["feature_count"]), - "dated_uri": dg.MetadataValue.url(info["dated_uri"]), - "latest_uri": dg.MetadataValue.url(info["latest_uri"]), - "file_size_bytes": dg.MetadataValue.int(info["file_size_bytes"]), - } - ) - - return _wells_asset diff --git a/orchestration/deploy_serverless.sh b/orchestration/deploy_serverless.sh new file mode 100755 index 0000000..bd76476 --- /dev/null +++ b/orchestration/deploy_serverless.sh @@ -0,0 +1,41 @@ +#!/bin/bash +# Manual deploy of the DIE code location to Dagster+ Serverless. +# +# Same target as the GitHub Action (.github/workflows/dagster-cloud-deploy.yml): +# a PEX fast-deploy built from dagster_cloud.yaml. Use this to push from a laptop +# without going through CI. +# +# Prerequisites: +# - Docker running locally (build-method=docker builds manylinux-compatible +# wheels so the deploy works regardless of host OS). +# - uv installed (the dagster-cloud CLI is pulled in on the fly via `uv run`). +# +# Required env vars: +# DAGSTER_CLOUD_ORGANIZATION your Dagster+ org name (e.g. "nmwd") +# DAGSTER_CLOUD_API_TOKEN a Dagster+ user/agent token +# Optional: +# DEPLOYMENT target deployment (default: prod) +# +# Usage: +# DAGSTER_CLOUD_ORGANIZATION=nmwd DAGSTER_CLOUD_API_TOKEN=*** \ +# orchestration/deploy_serverless.sh +set -euo pipefail + +: "${DAGSTER_CLOUD_ORGANIZATION:?set DAGSTER_CLOUD_ORGANIZATION to your Dagster+ org}" +: "${DAGSTER_CLOUD_API_TOKEN:?set DAGSTER_CLOUD_API_TOKEN to a Dagster+ token}" +DEPLOYMENT="${DEPLOYMENT:-prod}" + +# Run from repo root so dagster_cloud.yaml + the orchestration build dir resolve. +REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)" +cd "$REPO_ROOT" + +echo "Deploying location 'die-orchestration' to org '$DAGSTER_CLOUD_ORGANIZATION' deployment '$DEPLOYMENT'..." + +uv run --with dagster-cloud -- \ + dagster-cloud serverless deploy-python-executable \ + --organization "$DAGSTER_CLOUD_ORGANIZATION" \ + --deployment "$DEPLOYMENT" \ + --location-file dagster_cloud.yaml \ + --location-name die-orchestration \ + --python-version 3.10 \ + --build-method docker diff --git a/orchestration/resources/gcs.py b/orchestration/resources/gcs.py index 0400033..f9a1ae9 100644 --- a/orchestration/resources/gcs.py +++ b/orchestration/resources/gcs.py @@ -1,4 +1,3 @@ -import os from datetime import datetime, timezone from pathlib import Path from typing import Optional