From 324bd4bcdd1412a5bd92664c092834f24464f0f6 Mon Sep 17 00:00:00 2001 From: jakeross Date: Fri, 26 Jun 2026 15:12:10 -0600 Subject: [PATCH] Annotate asset and builder return types The asset compute fns and builders were unannotated, so mypy/Dagster inferred Any. Add precise types: - _source_asset -> Iterator[dg.Output | dg.AssetCheckResult] (yields both). - _build_source_asset -> tuple[AssetsDefinition, AssetKey]; _build_combine_asset / _build_geoserver_asset -> AssetsDefinition; build_product_assets -> list[AssetsDefinition]. - definitions: _build_assets -> list[AssetsDefinition], _build_jobs -> dict[str, UnresolvedAssetJobDefinition] (TYPE_CHECKING import, not a public export), _build_schedules -> list[ScheduleDefinition], _products -> Iterator[dict]. - helpers: _product_params/_product_source_keys -> list[str]; annotate the source asset's record/site/timeseries accumulators. mypy clean on both files; defs load; 13 persister tests pass. Co-Authored-By: Claude Opus 4.8 --- orchestration/assets/products.py | 28 ++++++++++++++++++++-------- orchestration/definitions.py | 18 ++++++++++++++---- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index 22a3d74..e951c78 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -27,6 +27,7 @@ """ import tempfile import traceback +from collections.abc import Iterator from pathlib import Path import dagster as dg @@ -63,7 +64,7 @@ ] -def _product_params(product: dict) -> list: +def _product_params(product: dict) -> list[str]: """The DIE parameter(s) a product unifies. Single-parameter products yield one; the major-chemistry product yields the major-ion suite.""" if product.get("output_type") == "ogc_major_chemistry": @@ -71,7 +72,7 @@ def _product_params(product: dict) -> list: return [product["parameter"]] -def _product_source_keys(product: dict) -> list: +def _product_source_keys(product: dict) -> list[str]: """Source keys that apply to this product: the union of its parameters' agencies, filtered by the product's include/exclude list.""" agencies: list = [] @@ -93,7 +94,9 @@ def _in_name(source_key: str) -> str: return f"src_{source_key.replace('-', '_')}" -def _build_source_asset(product: dict, source_key: str, group: str): +def _build_source_asset( + product: dict, source_key: str, group: str +) -> tuple[dg.AssetsDefinition, dg.AssetKey]: """Build the asset that unifies a single source for *product*. Returns ``(asset_def, asset_key)``. The asset never raises: on failure it @@ -109,9 +112,13 @@ def _build_source_asset(product: dict, source_key: str, group: str): group_name=group, check_specs=[dg.AssetCheckSpec(name=_CHECK_NAME, asset=src_key)], ) - def _source_asset(context: dg.AssetExecutionContext, die_config: DIEConfigResource): + def _source_asset( + context: dg.AssetExecutionContext, die_config: DIEConfigResource + ) -> Iterator[dg.Output | dg.AssetCheckResult]: error = "" - records, sites, timeseries = [], [], [] + records: list[dict] = [] + sites: list[dict] = [] + timeseries: list[list[dict]] = [] try: # One unification pass per parameter. Single-parameter products run # once; the major-chemistry product runs once per analyte and @@ -163,7 +170,12 @@ def _source_asset(context: dg.AssetExecutionContext, die_config: DIEConfigResour return _source_asset, src_key -def _build_combine_asset(product: dict, source_keys: list, source_asset_keys: list, group: str): +def _build_combine_asset( + product: dict, + source_keys: list[str], + source_asset_keys: list[dg.AssetKey], + group: str, +) -> dg.AssetsDefinition: """Build the combine asset (keyed ``[product_id]``) for *product*. Depends on every source asset (wired via ``ins``), merges their @@ -260,7 +272,7 @@ def _geojson_to_geopackage(geojson_path: Path, layer_name: str, out_dir: Path): return gpkg_path, (minx, miny, maxx, maxy) -def _build_geoserver_asset(product: dict, group: str): +def _build_geoserver_asset(product: dict, group: str) -> dg.AssetsDefinition: """Build the GeoServer publish asset (keyed ``[product_id, "geoserver"]``). Depends on the combine asset for ordering only (``deps``, no data passed): @@ -320,7 +332,7 @@ def _geoserver_asset( return _geoserver_asset -def build_product_assets(product: dict) -> list: +def build_product_assets(product: dict) -> list[dg.AssetsDefinition]: """Return the full asset list for *product*: one source asset per applicable source, the combine asset, and the geoserver publish asset (see module docstring for the graph shape). Assets are grouped ``waterlevels`` or diff --git a/orchestration/definitions.py b/orchestration/definitions.py index 21d05cc..57178a2 100644 --- a/orchestration/definitions.py +++ b/orchestration/definitions.py @@ -1,4 +1,6 @@ +from collections.abc import Iterator from pathlib import Path +from typing import TYPE_CHECKING import dagster as dg import yaml @@ -9,6 +11,12 @@ from orchestration.resources.geoserver import GeoServerResource from orchestration.assets.products import build_product_assets +if TYPE_CHECKING: + # define_asset_job returns this; it isn't a public dagster export. + from dagster._core.definitions.unresolved_asset_job_definition import ( + UnresolvedAssetJobDefinition, + ) + _PRODUCTS_PATH = Path(__file__).parent / "config" / "products.yaml" _SUPPORTED_OUTPUT_TYPES = { @@ -23,7 +31,7 @@ def _load_products() -> dict: return yaml.safe_load(_PRODUCTS_PATH.read_text()) -def _build_assets(products_config: dict) -> list: +def _build_assets(products_config: dict) -> list[dg.AssetsDefinition]: assets = [] for product in products_config["products"]: if product.get("output_type") in _SUPPORTED_OUTPUT_TYPES: @@ -37,13 +45,13 @@ def _product_selection(pid: str) -> dg.AssetSelection: return dg.AssetSelection.keys(dg.AssetKey([pid, "geoserver"])).upstream() -def _products(products_config: dict): +def _products(products_config: dict) -> Iterator[dict]: for product in products_config["products"]: if product.get("output_type") in _SUPPORTED_OUTPUT_TYPES: yield product -def _build_jobs(products_config: dict) -> dict: +def _build_jobs(products_config: dict) -> dict[str, "UnresolvedAssetJobDefinition"]: """One asset job per product, selecting that product's whole graph. Returns {product_id: job} so schedules can target the job.""" jobs = {} @@ -57,7 +65,9 @@ def _build_jobs(products_config: dict) -> dict: return jobs -def _build_schedules(products_config: dict, jobs: dict) -> list: +def _build_schedules( + products_config: dict, jobs: dict[str, "UnresolvedAssetJobDefinition"] +) -> list[dg.ScheduleDefinition]: schedules = [] for product in _products(products_config): pid = product["id"]