From c7be39477ea96254e03ab390ec810dc70f9a3e9e Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 27 Jun 2026 13:48:28 -0600 Subject: [PATCH 1/2] Skip GCS upload when product content is unchanged Every run wrote a new dated archive + overwrote latest.geojson even when the data was identical, duplicating datasets in GCS. Dedup by content hash: - _content_hash hashes the GeoJSON ignoring the volatile timeStamp, so an unchanged dataset hashes the same across runs. - upload_product stores the hash on the dated/latest blob metadata; on the next run it compares against latest.geojson's stored hash and, on a match, skips writing entirely (dated_uri=None, skipped=True). - combine asset surfaces skipped_unchanged in MaterializeResult metadata and omits dated_uri when skipped. Verified: hash is stable across timeStamp, changes with features; skip path writes nothing, upload path sets metadata + copies to latest. Co-Authored-By: Claude Opus 4.8 --- orchestration/assets/products.py | 20 ++++++----- orchestration/resources/gcs.py | 58 +++++++++++++++++++++++++++----- 2 files changed, 61 insertions(+), 17 deletions(-) diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index 77aa496..f365f44 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -234,14 +234,18 @@ def _combine_asset( info = gcs.upload_product(str(out), pid) - return dg.MaterializeResult( - metadata={ - "feature_count": dg.MetadataValue.int(info["feature_count"]), - "dated_uri": dg.MetadataValue.url(info["dated_uri"]), - "latest_uri": dg.MetadataValue.url(info["latest_uri"]), - "source_count": dg.MetadataValue.int(len(sources)), - } - ) + metadata: dict = { + "feature_count": dg.MetadataValue.int(info["feature_count"]), + "latest_uri": dg.MetadataValue.url(info["latest_uri"]), + "source_count": dg.MetadataValue.int(len(sources)), + # True when content matched what's already in GCS (no new upload). + "skipped_unchanged": dg.MetadataValue.bool(bool(info.get("skipped"))), + } + # dated_uri is None when the upload was skipped as unchanged. + if info.get("dated_uri"): + metadata["dated_uri"] = dg.MetadataValue.url(info["dated_uri"]) + + return dg.MaterializeResult(metadata=metadata) return _combine_asset diff --git a/orchestration/resources/gcs.py b/orchestration/resources/gcs.py index 9c77198..2365e68 100644 --- a/orchestration/resources/gcs.py +++ b/orchestration/resources/gcs.py @@ -1,3 +1,4 @@ +import hashlib import json import os from datetime import datetime, timezone @@ -13,6 +14,19 @@ _GCS_AVAILABLE = False +_CONTENT_HASH_KEY = "content_hash" + + +def _content_hash(local_path: str) -> str: + """Stable SHA-256 of a product's GeoJSON content, ignoring the volatile + `timeStamp` so that re-running with unchanged data yields the same hash.""" + with open(local_path, encoding="utf-8") as f: + data = json.load(f) + data.pop("timeStamp", None) + payload = json.dumps(data, sort_keys=True, default=str).encode("utf-8") + return hashlib.sha256(payload).hexdigest() + + def _storage_client(): """Build a GCS client. Dagster+ serverless has no Application Default Credentials, so prefer an explicit service-account key from a Dagster+ @@ -56,13 +70,20 @@ def upload_product( run_date: Optional[str] = None, ) -> dict: """ - Upload *local_path* as both a dated archive and latest.geojson. + Upload *local_path* as both a dated archive and latest.geojson — unless + the content is identical to what's already in GCS, in which case the + upload is skipped to avoid duplicate dated archives. + + Dedup compares a content hash (ignoring the volatile timeStamp) against + the hash stored on the current latest.geojson's metadata. Returns dict with: - dated_uri: gs://bucket/products/{product_id}/{date}.geojson + dated_uri: gs://bucket/products/{product_id}/{date}.geojson (None if skipped) latest_uri: gs://bucket/products/{product_id}/latest.geojson feature_count: int file_size_bytes: int + run_date: str + skipped: bool — True when content matched and nothing was written """ if run_date is None: run_date = datetime.now(timezone.utc).strftime("%Y-%m-%d") @@ -74,25 +95,44 @@ def upload_product( latest_key = f"{self.products_prefix}/{product_id}/latest.geojson" file_size = Path(local_path).stat().st_size + new_hash = _content_hash(local_path) + + with open(local_path, encoding="utf-8") as f: + data = json.load(f) + feature_count = data.get("numberReturned", len(data.get("features", []))) + + latest_uri = f"gs://{self.bucket_name}/{latest_key}" + + # Skip if the existing latest.geojson has the same content hash. + latest_blob = bucket.blob(latest_key) + if latest_blob.exists(): + latest_blob.reload() + if (latest_blob.metadata or {}).get(_CONTENT_HASH_KEY) == new_hash: + return { + "dated_uri": None, + "latest_uri": latest_uri, + "feature_count": feature_count, + "file_size_bytes": file_size, + "run_date": run_date, + "skipped": True, + } dated_blob = bucket.blob(dated_key) + dated_blob.metadata = {_CONTENT_HASH_KEY: new_hash} dated_blob.upload_from_filename(local_path, content_type="application/geo+json") # §V: atomic latest — copy from the just-uploaded dated blob, not - # another upload that could race with a concurrent reader. + # another upload that could race with a concurrent reader. copy_blob + # carries the content_hash metadata to latest for the next dedup check. bucket.copy_blob(dated_blob, bucket, latest_key) - import json - with open(local_path, encoding="utf-8") as f: - data = json.load(f) - feature_count = data.get("numberReturned", len(data.get("features", []))) - return { "dated_uri": f"gs://{self.bucket_name}/{dated_key}", - "latest_uri": f"gs://{self.bucket_name}/{latest_key}", + "latest_uri": latest_uri, "feature_count": feature_count, "file_size_bytes": file_size, "run_date": run_date, + "skipped": False, } From 9f06b3483536119ec83674c9acd8715bb1de261f Mon Sep 17 00:00:00 2001 From: jakeross Date: Sat, 27 Jun 2026 22:39:19 -0600 Subject: [PATCH 2/2] Track data change-recency to inform run frequency Persist last_changed (YYYY-MM-DD the content actually changed) in the GCS blob metadata alongside content_hash, and report days_since_last_change on every run. On a skipped (unchanged) run the last_changed date is carried forward and the day count grows; on a real change it resets to 0. Surfaced in the combine asset's MaterializeResult metadata (last_changed, days_since_last_change) so a product whose data has been static for, say, 60+ days is an obvious candidate to relax from a daily to a monthly schedule. Co-Authored-By: Claude Opus 4.8 --- orchestration/assets/products.py | 9 +++++++++ orchestration/resources/gcs.py | 29 +++++++++++++++++++++++++---- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/orchestration/assets/products.py b/orchestration/assets/products.py index f365f44..b1d43ca 100644 --- a/orchestration/assets/products.py +++ b/orchestration/assets/products.py @@ -244,6 +244,15 @@ def _combine_asset( # dated_uri is None when the upload was skipped as unchanged. if info.get("dated_uri"): metadata["dated_uri"] = dg.MetadataValue.url(info["dated_uri"]) + # Change-recency signal for tuning run frequency: how long the data has + # been static. A large, growing value means the schedule can be relaxed + # (e.g. daily -> monthly); 0 means it changed this run. + if info.get("last_changed"): + metadata["last_changed"] = dg.MetadataValue.text(info["last_changed"]) + if info.get("days_since_last_change") is not None: + metadata["days_since_last_change"] = dg.MetadataValue.int( + info["days_since_last_change"] + ) return dg.MaterializeResult(metadata=metadata) diff --git a/orchestration/resources/gcs.py b/orchestration/resources/gcs.py index 2365e68..119c808 100644 --- a/orchestration/resources/gcs.py +++ b/orchestration/resources/gcs.py @@ -15,6 +15,17 @@ _CONTENT_HASH_KEY = "content_hash" +_LAST_CHANGED_KEY = "last_changed" # YYYY-MM-DD the content last actually changed + + +def _days_between(start: str, end: str) -> Optional[int]: + """Whole days between two YYYY-MM-DD strings, or None if unparseable.""" + try: + a = datetime.strptime(start, "%Y-%m-%d") + b = datetime.strptime(end, "%Y-%m-%d") + return (b - a).days + except (TypeError, ValueError): + return None def _content_hash(local_path: str) -> str: @@ -103,11 +114,16 @@ def upload_product( latest_uri = f"gs://{self.bucket_name}/{latest_key}" - # Skip if the existing latest.geojson has the same content hash. + # Skip if the existing latest.geojson has the same content hash. Carry + # forward the last-changed date so we can report how long the data has + # been static (a signal for tuning run frequency — e.g. data unchanged + # for months means the schedule can safely drop to monthly). latest_blob = bucket.blob(latest_key) if latest_blob.exists(): latest_blob.reload() - if (latest_blob.metadata or {}).get(_CONTENT_HASH_KEY) == new_hash: + existing_meta = latest_blob.metadata or {} + if existing_meta.get(_CONTENT_HASH_KEY) == new_hash: + last_changed = existing_meta.get(_LAST_CHANGED_KEY, run_date) return { "dated_uri": None, "latest_uri": latest_uri, @@ -115,15 +131,18 @@ def upload_product( "file_size_bytes": file_size, "run_date": run_date, "skipped": True, + "last_changed": last_changed, + "days_since_last_change": _days_between(last_changed, run_date), } + # Content changed (or first upload) — last_changed is now. dated_blob = bucket.blob(dated_key) - dated_blob.metadata = {_CONTENT_HASH_KEY: new_hash} + dated_blob.metadata = {_CONTENT_HASH_KEY: new_hash, _LAST_CHANGED_KEY: run_date} dated_blob.upload_from_filename(local_path, content_type="application/geo+json") # §V: atomic latest — copy from the just-uploaded dated blob, not # another upload that could race with a concurrent reader. copy_blob - # carries the content_hash metadata to latest for the next dedup check. + # carries the content_hash/last_changed metadata to latest. bucket.copy_blob(dated_blob, bucket, latest_key) return { @@ -133,6 +152,8 @@ def upload_product( "file_size_bytes": file_size, "run_date": run_date, "skipped": False, + "last_changed": run_date, + "days_since_last_change": 0, }