Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions orchestration/assets/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,27 @@ def _combine_asset(

info = gcs.upload_product(str(out), pid)

return dg.MaterializeResult(
metadata={
"feature_count": dg.MetadataValue.int(info["feature_count"]),
"dated_uri": dg.MetadataValue.url(info["dated_uri"]),
"latest_uri": dg.MetadataValue.url(info["latest_uri"]),
"source_count": dg.MetadataValue.int(len(sources)),
}
)
metadata: dict = {
"feature_count": dg.MetadataValue.int(info["feature_count"]),
"latest_uri": dg.MetadataValue.url(info["latest_uri"]),
"source_count": dg.MetadataValue.int(len(sources)),
# True when content matched what's already in GCS (no new upload).
"skipped_unchanged": dg.MetadataValue.bool(bool(info.get("skipped"))),
}
# dated_uri is None when the upload was skipped as unchanged.
if info.get("dated_uri"):
metadata["dated_uri"] = dg.MetadataValue.url(info["dated_uri"])
# Change-recency signal for tuning run frequency: how long the data has
# been static. A large, growing value means the schedule can be relaxed
# (e.g. daily -> monthly); 0 means it changed this run.
if info.get("last_changed"):
metadata["last_changed"] = dg.MetadataValue.text(info["last_changed"])
if info.get("days_since_last_change") is not None:
metadata["days_since_last_change"] = dg.MetadataValue.int(
info["days_since_last_change"]
)

return dg.MaterializeResult(metadata=metadata)

return _combine_asset

Expand Down
79 changes: 70 additions & 9 deletions orchestration/resources/gcs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import hashlib
import json
import os
from datetime import datetime, timezone
Expand All @@ -13,6 +14,30 @@
_GCS_AVAILABLE = False


_CONTENT_HASH_KEY = "content_hash"
_LAST_CHANGED_KEY = "last_changed" # YYYY-MM-DD the content last actually changed


def _days_between(start: str, end: str) -> Optional[int]:
"""Whole days between two YYYY-MM-DD strings, or None if unparseable."""
try:
a = datetime.strptime(start, "%Y-%m-%d")
b = datetime.strptime(end, "%Y-%m-%d")
return (b - a).days
except (TypeError, ValueError):
return None


def _content_hash(local_path: str) -> str:
"""Stable SHA-256 of a product's GeoJSON content, ignoring the volatile
`timeStamp` so that re-running with unchanged data yields the same hash."""
with open(local_path, encoding="utf-8") as f:
data = json.load(f)
data.pop("timeStamp", None)
payload = json.dumps(data, sort_keys=True, default=str).encode("utf-8")
return hashlib.sha256(payload).hexdigest()


def _storage_client():
"""Build a GCS client. Dagster+ serverless has no Application Default
Credentials, so prefer an explicit service-account key from a Dagster+
Expand Down Expand Up @@ -56,13 +81,20 @@ def upload_product(
run_date: Optional[str] = None,
) -> dict:
"""
Upload *local_path* as both a dated archive and latest.geojson.
Upload *local_path* as both a dated archive and latest.geojson — unless
the content is identical to what's already in GCS, in which case the
upload is skipped to avoid duplicate dated archives.

Dedup compares a content hash (ignoring the volatile timeStamp) against
the hash stored on the current latest.geojson's metadata.

Returns dict with:
dated_uri: gs://bucket/products/{product_id}/{date}.geojson
dated_uri: gs://bucket/products/{product_id}/{date}.geojson (None if skipped)
latest_uri: gs://bucket/products/{product_id}/latest.geojson
feature_count: int
file_size_bytes: int
run_date: str
skipped: bool — True when content matched and nothing was written
"""
if run_date is None:
run_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
Expand All @@ -74,25 +106,54 @@ def upload_product(
latest_key = f"{self.products_prefix}/{product_id}/latest.geojson"

file_size = Path(local_path).stat().st_size
new_hash = _content_hash(local_path)

with open(local_path, encoding="utf-8") as f:
data = json.load(f)
feature_count = data.get("numberReturned", len(data.get("features", [])))

latest_uri = f"gs://{self.bucket_name}/{latest_key}"

# Skip if the existing latest.geojson has the same content hash. Carry
# forward the last-changed date so we can report how long the data has
# been static (a signal for tuning run frequency — e.g. data unchanged
# for months means the schedule can safely drop to monthly).
latest_blob = bucket.blob(latest_key)
if latest_blob.exists():
latest_blob.reload()
existing_meta = latest_blob.metadata or {}
if existing_meta.get(_CONTENT_HASH_KEY) == new_hash:
last_changed = existing_meta.get(_LAST_CHANGED_KEY, run_date)
return {
"dated_uri": None,
"latest_uri": latest_uri,
"feature_count": feature_count,
"file_size_bytes": file_size,
"run_date": run_date,
"skipped": True,
"last_changed": last_changed,
"days_since_last_change": _days_between(last_changed, run_date),
}

# Content changed (or first upload) — last_changed is now.
dated_blob = bucket.blob(dated_key)
dated_blob.metadata = {_CONTENT_HASH_KEY: new_hash, _LAST_CHANGED_KEY: run_date}
dated_blob.upload_from_filename(local_path, content_type="application/geo+json")

# §V: atomic latest — copy from the just-uploaded dated blob, not
# another upload that could race with a concurrent reader.
# another upload that could race with a concurrent reader. copy_blob
# carries the content_hash/last_changed metadata to latest.
bucket.copy_blob(dated_blob, bucket, latest_key)

import json
with open(local_path, encoding="utf-8") as f:
data = json.load(f)
feature_count = data.get("numberReturned", len(data.get("features", [])))

return {
"dated_uri": f"gs://{self.bucket_name}/{dated_key}",
"latest_uri": f"gs://{self.bucket_name}/{latest_key}",
"latest_uri": latest_uri,
"feature_count": feature_count,
"file_size_bytes": file_size,
"run_date": run_date,
"skipped": False,
"last_changed": run_date,
"days_since_last_change": 0,
}


Expand Down
Loading