Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions orchestration/assets/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"""
import tempfile
import traceback
from collections.abc import Iterator
from pathlib import Path

import dagster as dg
Expand Down Expand Up @@ -63,15 +64,15 @@
]


def _product_params(product: dict) -> list:
def _product_params(product: dict) -> list[str]:
    """Return the DIE parameter name(s) that *product* unifies.

    A product whose ``output_type`` is ``"ogc_major_chemistry"`` maps to a
    fresh copy of the module-level ``_MAJOR_CHEMISTRY`` suite; every other
    product maps to a one-element list holding its ``"parameter"`` entry.

    Raises ``KeyError`` when a non-major-chemistry product has no
    ``"parameter"`` key.
    """
    is_major_chemistry = product.get("output_type") == "ogc_major_chemistry"
    # list(...) copies the suite so callers cannot mutate the shared constant.
    return list(_MAJOR_CHEMISTRY) if is_major_chemistry else [product["parameter"]]


def _product_source_keys(product: dict) -> list:
def _product_source_keys(product: dict) -> list[str]:
"""Source keys that apply to this product: the union of its parameters'
agencies, filtered by the product's include/exclude list."""
agencies: list = []
Expand All @@ -93,7 +94,9 @@ def _in_name(source_key: str) -> str:
return f"src_{source_key.replace('-', '_')}"


def _build_source_asset(product: dict, source_key: str, group: str):
def _build_source_asset(
product: dict, source_key: str, group: str
) -> tuple[dg.AssetsDefinition, dg.AssetKey]:
"""Build the asset that unifies a single source for *product*.

Returns ``(asset_def, asset_key)``. The asset never raises: on failure it
Expand All @@ -109,9 +112,13 @@ def _build_source_asset(product: dict, source_key: str, group: str):
group_name=group,
check_specs=[dg.AssetCheckSpec(name=_CHECK_NAME, asset=src_key)],
)
def _source_asset(context: dg.AssetExecutionContext, die_config: DIEConfigResource):
def _source_asset(
context: dg.AssetExecutionContext, die_config: DIEConfigResource
) -> Iterator[dg.Output | dg.AssetCheckResult]:
error = ""
records, sites, timeseries = [], [], []
records: list[dict] = []
sites: list[dict] = []
timeseries: list[list[dict]] = []
try:
# One unification pass per parameter. Single-parameter products run
# once; the major-chemistry product runs once per analyte and
Expand Down Expand Up @@ -163,7 +170,12 @@ def _source_asset(context: dg.AssetExecutionContext, die_config: DIEConfigResour
return _source_asset, src_key


def _build_combine_asset(product: dict, source_keys: list, source_asset_keys: list, group: str):
def _build_combine_asset(
product: dict,
source_keys: list[str],
source_asset_keys: list[dg.AssetKey],
group: str,
) -> dg.AssetsDefinition:
"""Build the combine asset (keyed ``[product_id]``) for *product*.

Depends on every source asset (wired via ``ins``), merges their
Expand Down Expand Up @@ -260,7 +272,7 @@ def _geojson_to_geopackage(geojson_path: Path, layer_name: str, out_dir: Path):
return gpkg_path, (minx, miny, maxx, maxy)


def _build_geoserver_asset(product: dict, group: str):
def _build_geoserver_asset(product: dict, group: str) -> dg.AssetsDefinition:
"""Build the GeoServer publish asset (keyed ``[product_id, "geoserver"]``).

Depends on the combine asset for ordering only (``deps``, no data passed):
Expand Down Expand Up @@ -320,7 +332,7 @@ def _geoserver_asset(
return _geoserver_asset


def build_product_assets(product: dict) -> list:
def build_product_assets(product: dict) -> list[dg.AssetsDefinition]:
"""Return the full asset list for *product*: one source asset per applicable
source, the combine asset, and the geoserver publish asset (see module
docstring for the graph shape). Assets are grouped ``waterlevels`` or
Expand Down
18 changes: 14 additions & 4 deletions orchestration/definitions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from collections.abc import Iterator
from pathlib import Path
from typing import TYPE_CHECKING

import dagster as dg
import yaml
Expand All @@ -9,6 +11,12 @@
from orchestration.resources.geoserver import GeoServerResource
from orchestration.assets.products import build_product_assets

if TYPE_CHECKING:
# define_asset_job returns this; it isn't a public dagster export.
from dagster._core.definitions.unresolved_asset_job_definition import (
UnresolvedAssetJobDefinition,
)

_PRODUCTS_PATH = Path(__file__).parent / "config" / "products.yaml"

_SUPPORTED_OUTPUT_TYPES = {
Expand All @@ -23,7 +31,7 @@ def _load_products() -> dict:
return yaml.safe_load(_PRODUCTS_PATH.read_text())


def _build_assets(products_config: dict) -> list:
def _build_assets(products_config: dict) -> list[dg.AssetsDefinition]:
assets = []
for product in products_config["products"]:
if product.get("output_type") in _SUPPORTED_OUTPUT_TYPES:
Expand All @@ -37,13 +45,13 @@ def _product_selection(pid: str) -> dg.AssetSelection:
return dg.AssetSelection.keys(dg.AssetKey([pid, "geoserver"])).upstream()


def _products(products_config: dict):
def _products(products_config: dict) -> Iterator[dict]:
    """Yield each entry of ``products_config["products"]`` whose
    ``"output_type"`` is a member of ``_SUPPORTED_OUTPUT_TYPES``.

    Entries with a missing or unsupported ``"output_type"`` are skipped.
    Order follows the config list.
    """
    for entry in products_config["products"]:
        if entry.get("output_type") not in _SUPPORTED_OUTPUT_TYPES:
            continue
        yield entry


def _build_jobs(products_config: dict) -> dict:
def _build_jobs(products_config: dict) -> dict[str, "UnresolvedAssetJobDefinition"]:
"""One asset job per product, selecting that product's whole graph.
Returns {product_id: job} so schedules can target the job."""
jobs = {}
Expand All @@ -57,7 +65,9 @@ def _build_jobs(products_config: dict) -> dict:
return jobs


def _build_schedules(products_config: dict, jobs: dict) -> list:
def _build_schedules(
products_config: dict, jobs: dict[str, "UnresolvedAssetJobDefinition"]
) -> list[dg.ScheduleDefinition]:
schedules = []
for product in _products(products_config):
pid = product["id"]
Expand Down
Loading