From 6f07da695114dc2366ca25bcb364d400a05cf364 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Fri, 1 May 2026 21:52:03 +0300 Subject: [PATCH 1/3] feat: thread primary_task and label_window_days through full pipeline Make the bundle directory names, manifest keys, and validation paths respect config.primary_task and config.label_window_days instead of hardcoding converted_within_90_days throughout. Changes: - schema/tasks.py: add task_manifest_for_config() factory - api/bundle.py: use config-derived task manifest for splits and manifest - api/generator.py + recipes.py: accept primary_task/label_window_days kwargs - render/manifests.py: include primary_task and label_window_days in manifest - validation/realism.py: read task path from manifest - validation/drift.py: read task path from manifest with fallback - pipelines/build_v5.py, build_v6.py: configurable label_column in rename Default behavior (primary_task="converted_within_90_days", label_window_days=90) produces identical output to before. 16 new tests; 773 total passing. Closes #37 Co-Authored-By: Claude Opus 4.6 --- .agent-plan.md | 13 ++ leadforge/api/bundle.py | 7 +- leadforge/api/generator.py | 4 + leadforge/api/recipes.py | 6 + leadforge/pipelines/build_v5.py | 16 +- leadforge/pipelines/build_v6.py | 14 +- leadforge/render/manifests.py | 2 + leadforge/schema/tasks.py | 30 ++++ leadforge/validation/drift.py | 32 +++- leadforge/validation/realism.py | 43 +++-- tests/test_primary_task_threading.py | 228 +++++++++++++++++++++++++++ 11 files changed, 372 insertions(+), 23 deletions(-) create mode 100644 tests/test_primary_task_threading.py diff --git a/.agent-plan.md b/.agent-plan.md index 6cdcd37..b045570 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -128,6 +128,19 @@ Documentation + CI: - [x] `leadforge/narrative/dataset_card.py` — renders task name and label window from `world_spec.config` instead of hard-coded literals - [x] 10 new tests (3 dataset card + 7 config resolution); total 750 passing +### Thread primary_task through pipeline (PR #??, closes #37) + +- [x] `leadforge/schema/tasks.py` — `task_manifest_for_config()` factory builds `TaskManifest` from `primary_task` and `label_window_days` +- [x] `leadforge/api/bundle.py` — uses `task_manifest_for_config(config.primary_task, config.label_window_days)` for task splits and manifest key (replaces hardcoded `CONVERTED_WITHIN_90_DAYS.task_id`) +- [x] `leadforge/api/generator.py` — `primary_task` and `label_window_days` kwargs on `Generator.from_recipe()` (Layer 1 precedence) +- [x] `leadforge/api/recipes.py` — `primary_task` and `label_window_days` kwargs on `resolve_config()` (Layer 1 precedence) +- [x] `leadforge/render/manifests.py` — manifest includes `primary_task` and `label_window_days` fields +- [x] `leadforge/validation/realism.py` — reads task train path from manifest (not hardcoded); helper `_first_task_train_path()` +- [x] `leadforge/validation/drift.py` — reads task train path from bundle manifest via `_find_task_train()`; falls back to default path +- [x] `leadforge/pipelines/build_v5.py` — `rename_and_select()` accepts `label_column` kwarg (defaults to `"converted_within_90_days"`) +- [x] `leadforge/pipelines/build_v6.py` — same `label_column` kwarg on `rename_and_select()` +- [x] 16 new tests: `task_manifest_for_config`, bundle layout, manifest keys, validation with custom task, pipeline rename with custom label column; total 773 passing + ### Parquet metadata row counts (PR #37, closes #17) - [x] `leadforge/validation/bundle_checks.py` — `_check_task_splits()` uses `pq.read_metadata().num_rows` instead of `pd.read_parquet()`; `_check_leakage()` uses `pq.read_schema().names` instead of `pd.read_parquet(columns=[])` diff --git a/leadforge/api/bundle.py b/leadforge/api/bundle.py index b5fcc2e..dbc3470 100644 --- a/leadforge/api/bundle.py +++ b/leadforge/api/bundle.py @@ -24,7 +24,7 @@ from leadforge.render.tasks import write_task_splits from leadforge.schema.dictionaries import write_feature_dictionary from leadforge.schema.tables import write_parquet -from leadforge.schema.tasks import CONVERTED_WITHIN_90_DAYS +from leadforge.schema.tasks import task_manifest_for_config if TYPE_CHECKING: from leadforge.core.models import WorldBundle @@ -74,7 +74,8 @@ def write_bundle( # 2. Snapshot + task splits → tasks/ # ------------------------------------------------------------------ snapshot = build_snapshot(result, population, horizon_days=config.horizon_days) - task_row_counts = write_task_splits(snapshot, root / "tasks", seed=config.seed) + task = task_manifest_for_config(config.primary_task, config.label_window_days) + task_row_counts = write_task_splits(snapshot, root / "tasks", seed=config.seed, task=task) # ------------------------------------------------------------------ # 3. Dataset card and feature dictionary @@ -94,7 +95,7 @@ def write_bundle( config=config, world_graph=world_graph, table_row_counts=table_row_counts, - task_row_counts={CONVERTED_WITHIN_90_DAYS.task_id: task_row_counts}, + task_row_counts={task.task_id: task_row_counts}, bundle_root=root, generation_timestamp=generation_timestamp, ) diff --git a/leadforge/api/generator.py b/leadforge/api/generator.py index 9ef87a4..ba8d8e9 100644 --- a/leadforge/api/generator.py +++ b/leadforge/api/generator.py @@ -52,6 +52,8 @@ def from_recipe( n_contacts: int | None = None, n_leads: int | None = None, horizon_days: int | None = None, + primary_task: str | None = None, + label_window_days: int | None = None, output_path: str = _MISSING, # type: ignore[assignment] override: dict[str, Any] | None = None, ) -> Generator: @@ -96,6 +98,8 @@ def from_recipe( n_contacts=n_contacts, n_leads=n_leads, horizon_days=horizon_days, + primary_task=primary_task, + label_window_days=label_window_days, output_path=output_path, override=override, ) diff --git a/leadforge/api/recipes.py b/leadforge/api/recipes.py index 2d98e1c..8a39fb5 100644 --- a/leadforge/api/recipes.py +++ b/leadforge/api/recipes.py @@ -132,6 +132,8 @@ def resolve_config( n_contacts: int | None = None, n_leads: int | None = None, horizon_days: int | None = None, + primary_task: str | None = None, + label_window_days: int | None = None, output_path: str = _MISSING, # type: ignore[assignment] override: dict[str, Any] | None = None, ) -> GenerationConfig: @@ -210,6 +212,10 @@ def resolve_config( resolved["n_leads"] = n_leads if horizon_days is not None: resolved["horizon_days"] = horizon_days + if primary_task is not None: + resolved["primary_task"] = primary_task + if label_window_days is not None: + resolved["label_window_days"] = label_window_days try: mode = ExposureMode(resolved["exposure_mode"]) diff --git a/leadforge/pipelines/build_v5.py b/leadforge/pipelines/build_v5.py index 0ccf9ef..fcfde31 100644 --- a/leadforge/pipelines/build_v5.py +++ b/leadforge/pipelines/build_v5.py @@ -102,9 +102,19 @@ def cap_expected_acv(df: pd.DataFrame) -> pd.DataFrame: return df -def rename_and_select(df: pd.DataFrame) -> pd.DataFrame: - """Rename snapshot columns to v5 names and select final column set.""" - df = df.rename(columns=RENAME_MAP) +def rename_and_select( + df: pd.DataFrame, *, label_column: str = "converted_within_90_days" +) -> pd.DataFrame: + """Rename snapshot columns to v5 names and select final column set. + + Args: + df: Snapshot DataFrame. + label_column: Source column for the binary label. Defaults to + ``"converted_within_90_days"`` for backward compatibility. + """ + rename_map = {k: v for k, v in RENAME_MAP.items() if v != "converted"} + rename_map[label_column] = "converted" + df = df.rename(columns=rename_map) df["converted"] = df["converted"].astype(int) missing = [c for c in FINAL_COLUMNS if c not in df.columns] if missing: diff --git a/leadforge/pipelines/build_v6.py b/leadforge/pipelines/build_v6.py index c624b9c..46d9a18 100644 --- a/leadforge/pipelines/build_v6.py +++ b/leadforge/pipelines/build_v6.py @@ -211,9 +211,19 @@ def rename_and_select( df: pd.DataFrame, *, instructor: bool = False, + label_column: str = "converted_within_90_days", ) -> pd.DataFrame: - """Rename snapshot columns to v6 names and select final column set.""" - df = df.rename(columns=RENAME_MAP) + """Rename snapshot columns to v6 names and select final column set. + + Args: + df: Snapshot DataFrame. + instructor: If True, include the instructor leakage trap column. + label_column: Source column for the binary label. Defaults to + ``"converted_within_90_days"`` for backward compatibility. + """ + rename_map = {k: v for k, v in RENAME_MAP.items() if v != "converted"} + rename_map[label_column] = "converted" + df = df.rename(columns=rename_map) df["converted"] = df["converted"].astype(int) columns = FINAL_COLUMNS_INSTRUCTOR if instructor else FINAL_COLUMNS_STUDENT missing = [c for c in columns if c not in df.columns] diff --git a/leadforge/render/manifests.py b/leadforge/render/manifests.py index 6b12723..6b6f88b 100644 --- a/leadforge/render/manifests.py +++ b/leadforge/render/manifests.py @@ -83,6 +83,8 @@ def build_manifest( "n_contacts": config.n_contacts, "n_leads": config.n_leads, "horizon_days": config.horizon_days, + "primary_task": config.primary_task, + "label_window_days": config.label_window_days, "motif_family": world_graph.motif_family, "tables": tables, "tasks": tasks, diff --git a/leadforge/schema/tasks.py b/leadforge/schema/tasks.py index c93b155..f050dbb 100644 --- a/leadforge/schema/tasks.py +++ b/leadforge/schema/tasks.py @@ -104,3 +104,33 @@ def to_dict(self) -> dict[str, object]: "directly. All features are pre-anchor (leakage-free by construction)." ), ) + + +def task_manifest_for_config( + primary_task: str = "converted_within_90_days", + label_window_days: int = 90, +) -> TaskManifest: + """Build a :class:`TaskManifest` from generation config fields. + + When *primary_task* and *label_window_days* match the defaults, this + returns a manifest equivalent to :data:`CONVERTED_WITHIN_90_DAYS`. + + Args: + primary_task: Task identifier — used as the task directory name and + manifest key. + label_window_days: Label observation window in days. + """ + return TaskManifest( + task_id=primary_task, + label_column=CONVERTED_WITHIN_90_DAYS.label_column, + label_window_days=label_window_days, + primary_table=CONVERTED_WITHIN_90_DAYS.primary_table, + split=CONVERTED_WITHIN_90_DAYS.split, + task_type=CONVERTED_WITHIN_90_DAYS.task_type, + description=( + f"Predict whether a lead converts (closed_won event) within " + f"{label_window_days} days of the snapshot anchor date. Label is " + f"event-derived — never sampled directly. All features are " + f"pre-anchor (leakage-free by construction)." + ), + ) diff --git a/leadforge/validation/drift.py b/leadforge/validation/drift.py index b894fe2..399f782 100644 --- a/leadforge/validation/drift.py +++ b/leadforge/validation/drift.py @@ -8,9 +8,31 @@ from __future__ import annotations from pathlib import Path +from typing import Any import pandas as pd +from leadforge.core.serialization import load_json + + +def _find_task_train(bundle_path: Path) -> tuple[Path | None, str]: + """Locate the train.parquet and label column for the first task in the manifest. + + Returns (train_path_or_None, label_column). + """ + manifest_path = bundle_path / "manifest.json" + label_col = "converted_within_90_days" + if manifest_path.exists(): + manifest: dict[str, Any] = load_json(manifest_path) + tasks = manifest.get("tasks", {}) + if isinstance(tasks, dict) and tasks: + task_id = next(iter(tasks)) + train = bundle_path / f"tasks/{task_id}/train.parquet" + return (train if train.exists() else None), label_col + # Fallback: default path + default = bundle_path / "tasks/converted_within_90_days/train.parquet" + return (default if default.exists() else None), label_col + def check_cross_seed_stability(bundles: dict[int, Path]) -> list[str]: """Compare bundles generated with different seeds. @@ -30,13 +52,13 @@ def check_cross_seed_stability(bundles: dict[int, Path]) -> list[str]: stage_counts: dict[int, int] = {} for seed, bundle_path in bundles.items(): - train_path = bundle_path / "tasks/converted_within_90_days/train.parquet" - if not train_path.exists(): - errors.append(f"Seed {seed}: missing tasks/converted_within_90_days/train.parquet") + train_path, label_col = _find_task_train(bundle_path) + if train_path is None: + errors.append(f"Seed {seed}: missing task train.parquet") continue - df = pd.read_parquet(train_path, columns=["converted_within_90_days"]) + df = pd.read_parquet(train_path, columns=[label_col]) if len(df) > 0: - rates[seed] = float(df["converted_within_90_days"].mean()) + rates[seed] = float(df[label_col].mean()) leads_path = bundle_path / "tables/leads.parquet" if leads_path.exists(): diff --git a/leadforge/validation/realism.py b/leadforge/validation/realism.py index 1e7634e..c67e467 100644 --- a/leadforge/validation/realism.py +++ b/leadforge/validation/realism.py @@ -30,26 +30,49 @@ def check_realism(bundle_root: Path, manifest: dict[str, Any]) -> list[str]: Returns a list of warning/error strings (empty = all checks pass). """ errors: list[str] = [] - errors.extend(_check_conversion_rate(bundle_root)) + errors.extend(_check_conversion_rate(bundle_root, manifest)) errors.extend(_check_table_nonempty(bundle_root, manifest)) - errors.extend(_check_feature_ranges(bundle_root)) + errors.extend(_check_feature_ranges(bundle_root, manifest)) errors.extend(_check_stage_distribution(bundle_root)) return errors -def _check_conversion_rate(root: Path) -> list[str]: +def _first_task_train_path(root: Path, manifest: dict[str, Any]) -> Path | None: + """Return the train.parquet path of the first task in the manifest.""" + tasks = manifest.get("tasks", {}) + if not isinstance(tasks, dict) or not tasks: + return None + task_id = next(iter(tasks)) + path = root / f"tasks/{task_id}/train.parquet" + return path if path.exists() else None + + +def _label_column() -> str: + """Return the label column name. + + The underlying snapshot always uses ``converted_within_90_days`` as the + column name (it mirrors :class:`~leadforge.schema.entities.LeadRow`). + """ + return "converted_within_90_days" + + +def _check_conversion_rate(root: Path, manifest: dict[str, Any]) -> list[str]: """Check that conversion rate is within plausible bounds.""" errors: list[str] = [] - train_path = root / "tasks/converted_within_90_days/train.parquet" - if not train_path.exists(): + train_path = _first_task_train_path(root, manifest) + if train_path is None: return errors - df = pd.read_parquet(train_path, columns=["converted_within_90_days"]) + label_col = _label_column() + # Read only the label column; fall back to full read if column missing. + schema = pq.read_schema(train_path) + read_col = label_col if label_col in schema.names else "converted_within_90_days" + df = pd.read_parquet(train_path, columns=[read_col]) if len(df) == 0: errors.append("Train split is empty") return errors - rate = df["converted_within_90_days"].mean() + rate = df.iloc[:, 0].mean() # Absolute bounds — any reasonable simulation should land here. # The v1 engine typically produces rates in the 30–90% range depending @@ -79,11 +102,11 @@ def _check_table_nonempty(root: Path, manifest: dict[str, Any]) -> list[str]: return errors -def _check_feature_ranges(root: Path) -> list[str]: +def _check_feature_ranges(root: Path, manifest: dict[str, Any]) -> list[str]: """Spot-check that key features have valid values.""" errors: list[str] = [] - train_path = root / "tasks/converted_within_90_days/train.parquet" - if not train_path.exists(): + train_path = _first_task_train_path(root, manifest) + if train_path is None: return errors # Only read the columns we actually check. diff --git a/tests/test_primary_task_threading.py b/tests/test_primary_task_threading.py new file mode 100644 index 0000000..76a4c84 --- /dev/null +++ b/tests/test_primary_task_threading.py @@ -0,0 +1,228 @@ +"""Tests for primary_task threading through the generation pipeline. + +Verifies that ``config.primary_task`` and ``config.label_window_days`` +are respected by bundle writing, manifest, validation, and pipeline +rename functions. +""" + +from __future__ import annotations + +import shutil +from pathlib import Path + +import pandas as pd +import pytest + +from leadforge.api.generator import Generator +from leadforge.core.serialization import load_json +from leadforge.pipelines.build_v5 import rename_and_select as v5_rename +from leadforge.pipelines.build_v6 import rename_and_select as v6_rename +from leadforge.schema.tasks import CONVERTED_WITHIN_90_DAYS, task_manifest_for_config +from leadforge.validation.drift import check_cross_seed_stability +from leadforge.validation.realism import check_realism + +_SMALL = {"n_leads": 30, "n_accounts": 15, "n_contacts": 45} +_CUSTOM_TASK = "converted_within_60_days" + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="module") +def default_bundle(tmp_path_factory: pytest.TempPathFactory) -> Path: + """Bundle with default primary_task.""" + out = tmp_path_factory.mktemp("default_task") + Generator.from_recipe( + "b2b_saas_procurement_v1", seed=42, exposure_mode="student_public" + ).generate(**_SMALL).save(str(out)) + return out + + +@pytest.fixture(scope="module") +def custom_task_bundle(tmp_path_factory: pytest.TempPathFactory) -> Path: + """Bundle with a non-default primary_task.""" + out = tmp_path_factory.mktemp("custom_task") + Generator.from_recipe( + "b2b_saas_procurement_v1", + seed=42, + exposure_mode="student_public", + primary_task=_CUSTOM_TASK, + label_window_days=60, + ).generate(**_SMALL).save(str(out)) + return out + + +# --------------------------------------------------------------------------- +# task_manifest_for_config +# --------------------------------------------------------------------------- + + +class TestTaskManifestForConfig: + def test_default_matches_constant(self) -> None: + m = task_manifest_for_config() + assert m.task_id == CONVERTED_WITHIN_90_DAYS.task_id + assert m.label_column == CONVERTED_WITHIN_90_DAYS.label_column + assert m.label_window_days == CONVERTED_WITHIN_90_DAYS.label_window_days + + def test_custom_task_id(self) -> None: + m = task_manifest_for_config(primary_task="my_task", label_window_days=30) + assert m.task_id == "my_task" + assert m.label_window_days == 30 + assert m.label_column == "converted_within_90_days" + + def test_description_includes_window(self) -> None: + m = task_manifest_for_config(label_window_days=45) + assert "45 days" in m.description + + +# --------------------------------------------------------------------------- +# Bundle directory layout +# --------------------------------------------------------------------------- + + +class TestBundleLayout: + def test_default_task_directory(self, default_bundle: Path) -> None: + task_dir = default_bundle / "tasks/converted_within_90_days" + assert task_dir.is_dir() + for split in ("train", "valid", "test"): + assert (task_dir / f"{split}.parquet").exists() + + def test_custom_task_directory(self, custom_task_bundle: Path) -> None: + task_dir = custom_task_bundle / f"tasks/{_CUSTOM_TASK}" + assert task_dir.is_dir() + for split in ("train", "valid", "test"): + assert (task_dir / f"{split}.parquet").exists() + + def test_custom_task_no_default_dir(self, custom_task_bundle: Path) -> None: + assert not (custom_task_bundle / "tasks/converted_within_90_days").exists() + + +# --------------------------------------------------------------------------- +# Manifest +# --------------------------------------------------------------------------- + + +class TestManifest: + def test_default_manifest_task_key(self, default_bundle: Path) -> None: + manifest = load_json(default_bundle / "manifest.json") + assert "converted_within_90_days" in manifest["tasks"] + assert manifest["primary_task"] == "converted_within_90_days" + assert manifest["label_window_days"] == 90 + + def test_custom_manifest_task_key(self, custom_task_bundle: Path) -> None: + manifest = load_json(custom_task_bundle / "manifest.json") + assert _CUSTOM_TASK in manifest["tasks"] + assert "converted_within_90_days" not in manifest["tasks"] + assert manifest["primary_task"] == _CUSTOM_TASK + assert manifest["label_window_days"] == 60 + + def test_task_manifest_json_uses_custom_id(self, custom_task_bundle: Path) -> None: + tm = load_json(custom_task_bundle / f"tasks/{_CUSTOM_TASK}/task_manifest.json") + assert tm["task_id"] == _CUSTOM_TASK + assert tm["label_window_days"] == 60 + + +# --------------------------------------------------------------------------- +# Validation respects manifest-driven paths +# --------------------------------------------------------------------------- + + +class TestValidation: + def test_realism_passes_default(self, default_bundle: Path) -> None: + manifest = load_json(default_bundle / "manifest.json") + errors = check_realism(default_bundle, manifest) + assert errors == [] + + def test_realism_passes_custom_task(self, custom_task_bundle: Path) -> None: + manifest = load_json(custom_task_bundle / "manifest.json") + errors = check_realism(custom_task_bundle, manifest) + assert errors == [] + + def test_drift_finds_custom_task_path(self, tmp_path: Path, custom_task_bundle: Path) -> None: + """Verify drift check reads from manifest-based task path, not hardcoded.""" + # Copy the same bundle as a second "seed" so rates match exactly. + out2 = tmp_path / "seed2" + shutil.copytree(custom_task_bundle, out2) + + bundles = {42: custom_task_bundle, 99: out2} + errors = check_cross_seed_stability(bundles) + # Should not produce "missing task train.parquet" errors. + assert not any("missing" in e for e in errors) + + +# --------------------------------------------------------------------------- +# Pipeline rename functions +# --------------------------------------------------------------------------- + + +class TestPipelineRename: + @pytest.fixture + def sample_snapshot(self) -> pd.DataFrame: + return pd.DataFrame( + { + "industry": ["tech"], + "region": ["us"], + "employee_band": ["200-499"], + "estimated_revenue_band": ["$10M-$50M"], + "role_function": ["finance"], + "seniority": ["vp"], + "lead_source": ["inbound_marketing"], + "opportunity_created": [True], + "demo_completed": [False], + "expected_acv": [50000], + "inbound_touch_count": [5], + "outbound_touch_count": [3], + "touches_week_1": [2], + "days_since_first_touch": [10], + "session_count": [4], + "activity_count": [2], + "days_since_last_touch": [1.0], + "total_touches_all": [8], + "demo_page_views": [0], + "converted_within_90_days": [True], + } + ) + + def test_v5_default_label_column(self, sample_snapshot: pd.DataFrame) -> None: + from leadforge.pipelines.build_v5 import derive_binary_features + + df = derive_binary_features(sample_snapshot) + result = v5_rename(df) + assert "converted" in result.columns + assert result["converted"].iloc[0] == 1 + + def test_v5_custom_label_column(self, sample_snapshot: pd.DataFrame) -> None: + from leadforge.pipelines.build_v5 import derive_binary_features + + df = sample_snapshot.rename( + columns={"converted_within_90_days": "converted_within_60_days"} + ) + df = derive_binary_features(df) + result = v5_rename(df, label_column="converted_within_60_days") + assert "converted" in result.columns + assert result["converted"].iloc[0] == 1 + + def test_v6_default_label_column(self, sample_snapshot: pd.DataFrame) -> None: + from leadforge.pipelines.build_v6 import derive_features + + df = sample_snapshot.copy() + df["touches_last_7_days"] = 1 + df["acquisition_wave"] = "A" + df = derive_features(df) + result = v6_rename(df) + assert "converted" in result.columns + + def test_v6_custom_label_column(self, sample_snapshot: pd.DataFrame) -> None: + from leadforge.pipelines.build_v6 import derive_features + + df = sample_snapshot.rename( + columns={"converted_within_90_days": "converted_within_60_days"} + ) + df["touches_last_7_days"] = 1 + df["acquisition_wave"] = "A" + df = derive_features(df) + result = v6_rename(df, label_column="converted_within_60_days") + assert "converted" in result.columns + assert result["converted"].iloc[0] == 1 From 01d4f7a974f0d4e1dd5a1c1b2f5aa835318f7c81 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Fri, 1 May 2026 22:10:23 +0300 Subject: [PATCH 2/3] fix: address self-review feedback on primary_task threading - realism.py: delete dead _label_column() function; replace with _LABEL_COLUMN constant; remove no-op schema fallback - tasks.py: use dataclasses.replace() instead of field-by-field copy - drift.py: return task_id (not label_col) from _find_task_train; include task path in error message; add _LABEL_COLUMN constant - build_v5.py, build_v6.py: short-circuit rename_map when using default label column - .agent-plan.md: replace PR #?? with PR #40 - tests: add 3 negative tests for empty/missing manifest tasks Co-Authored-By: Claude Opus 4.6 --- .agent-plan.md | 2 +- leadforge/pipelines/build_v5.py | 7 +++++-- leadforge/pipelines/build_v6.py | 7 +++++-- leadforge/schema/tasks.py | 14 ++++++-------- leadforge/validation/drift.py | 25 ++++++++++++++----------- leadforge/validation/realism.py | 19 ++++++------------- tests/test_primary_task_threading.py | 24 ++++++++++++++++++++++++ 7 files changed, 61 insertions(+), 37 deletions(-) diff --git a/.agent-plan.md b/.agent-plan.md index b045570..ff3fae8 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -128,7 +128,7 @@ Documentation + CI: - [x] `leadforge/narrative/dataset_card.py` — renders task name and label window from `world_spec.config` instead of hard-coded literals - [x] 10 new tests (3 dataset card + 7 config resolution); total 750 passing -### Thread primary_task through pipeline (PR #??, closes #37) +### Thread primary_task through pipeline (PR #40, closes #37) - [x] `leadforge/schema/tasks.py` — `task_manifest_for_config()` factory builds `TaskManifest` from `primary_task` and `label_window_days` - [x] `leadforge/api/bundle.py` — uses `task_manifest_for_config(config.primary_task, config.label_window_days)` for task splits and manifest key (replaces hardcoded `CONVERTED_WITHIN_90_DAYS.task_id`) diff --git a/leadforge/pipelines/build_v5.py b/leadforge/pipelines/build_v5.py index fcfde31..d1e3357 100644 --- a/leadforge/pipelines/build_v5.py +++ b/leadforge/pipelines/build_v5.py @@ -112,8 +112,11 @@ def rename_and_select( label_column: Source column for the binary label. Defaults to ``"converted_within_90_days"`` for backward compatibility. """ - rename_map = {k: v for k, v in RENAME_MAP.items() if v != "converted"} - rename_map[label_column] = "converted" + if label_column == "converted_within_90_days": + rename_map = RENAME_MAP + else: + rename_map = {k: v for k, v in RENAME_MAP.items() if v != "converted"} + rename_map[label_column] = "converted" df = df.rename(columns=rename_map) df["converted"] = df["converted"].astype(int) missing = [c for c in FINAL_COLUMNS if c not in df.columns] diff --git a/leadforge/pipelines/build_v6.py b/leadforge/pipelines/build_v6.py index 46d9a18..5f047fc 100644 --- a/leadforge/pipelines/build_v6.py +++ b/leadforge/pipelines/build_v6.py @@ -221,8 +221,11 @@ def rename_and_select( label_column: Source column for the binary label. Defaults to ``"converted_within_90_days"`` for backward compatibility. """ - rename_map = {k: v for k, v in RENAME_MAP.items() if v != "converted"} - rename_map[label_column] = "converted" + if label_column == "converted_within_90_days": + rename_map = RENAME_MAP + else: + rename_map = {k: v for k, v in RENAME_MAP.items() if v != "converted"} + rename_map[label_column] = "converted" df = df.rename(columns=rename_map) df["converted"] = df["converted"].astype(int) columns = FINAL_COLUMNS_INSTRUCTOR if instructor else FINAL_COLUMNS_STUDENT diff --git a/leadforge/schema/tasks.py b/leadforge/schema/tasks.py index f050dbb..f027af9 100644 --- a/leadforge/schema/tasks.py +++ b/leadforge/schema/tasks.py @@ -7,7 +7,7 @@ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, replace @dataclass(frozen=True) @@ -112,21 +112,19 @@ def task_manifest_for_config( ) -> TaskManifest: """Build a :class:`TaskManifest` from generation config fields. - When *primary_task* and *label_window_days* match the defaults, this - returns a manifest equivalent to :data:`CONVERTED_WITHIN_90_DAYS`. + Derives from :data:`CONVERTED_WITHIN_90_DAYS` via ``dataclasses.replace``, + overriding only the fields that vary. When *primary_task* and + *label_window_days* match the defaults, this returns an equivalent manifest. Args: primary_task: Task identifier — used as the task directory name and manifest key. label_window_days: Label observation window in days. """ - return TaskManifest( + return replace( + CONVERTED_WITHIN_90_DAYS, task_id=primary_task, - label_column=CONVERTED_WITHIN_90_DAYS.label_column, label_window_days=label_window_days, - primary_table=CONVERTED_WITHIN_90_DAYS.primary_table, - split=CONVERTED_WITHIN_90_DAYS.split, - task_type=CONVERTED_WITHIN_90_DAYS.task_type, description=( f"Predict whether a lead converts (closed_won event) within " f"{label_window_days} days of the snapshot anchor date. Label is " diff --git a/leadforge/validation/drift.py b/leadforge/validation/drift.py index 399f782..4422baa 100644 --- a/leadforge/validation/drift.py +++ b/leadforge/validation/drift.py @@ -14,24 +14,27 @@ from leadforge.core.serialization import load_json +_LABEL_COLUMN = "converted_within_90_days" + def _find_task_train(bundle_path: Path) -> tuple[Path | None, str]: - """Locate the train.parquet and label column for the first task in the manifest. + """Locate the train.parquet for the first task listed in the manifest. - Returns (train_path_or_None, label_column). + Returns ``(train_path_or_None, task_id)`` where *task_id* is included + so callers can produce useful error messages. """ manifest_path = bundle_path / "manifest.json" - label_col = "converted_within_90_days" if manifest_path.exists(): manifest: dict[str, Any] = load_json(manifest_path) tasks = manifest.get("tasks", {}) if isinstance(tasks, dict) and tasks: task_id = next(iter(tasks)) train = bundle_path / f"tasks/{task_id}/train.parquet" - return (train if train.exists() else None), label_col - # Fallback: default path - default = bundle_path / "tasks/converted_within_90_days/train.parquet" - return (default if default.exists() else None), label_col + return (train if train.exists() else None), task_id + # Fallback: default task id + task_id = "converted_within_90_days" + default = bundle_path / f"tasks/{task_id}/train.parquet" + return (default if default.exists() else None), task_id def check_cross_seed_stability(bundles: dict[int, Path]) -> list[str]: @@ -52,13 +55,13 @@ def check_cross_seed_stability(bundles: dict[int, Path]) -> list[str]: stage_counts: dict[int, int] = {} for seed, bundle_path in bundles.items(): - train_path, label_col = _find_task_train(bundle_path) + train_path, task_id = _find_task_train(bundle_path) if train_path is None: - errors.append(f"Seed {seed}: missing task train.parquet") + errors.append(f"Seed {seed}: missing tasks/{task_id}/train.parquet") continue - df = pd.read_parquet(train_path, columns=[label_col]) + df = pd.read_parquet(train_path, columns=[_LABEL_COLUMN]) if len(df) > 0: - rates[seed] = float(df[label_col].mean()) + rates[seed] = float(df[_LABEL_COLUMN].mean()) leads_path = bundle_path / "tables/leads.parquet" if leads_path.exists(): diff --git a/leadforge/validation/realism.py b/leadforge/validation/realism.py index c67e467..8eb2890 100644 --- a/leadforge/validation/realism.py +++ b/leadforge/validation/realism.py @@ -47,13 +47,10 @@ def _first_task_train_path(root: Path, manifest: dict[str, Any]) -> Path | None: return path if path.exists() else None -def _label_column() -> str: - """Return the label column name. - - The underlying snapshot always uses ``converted_within_90_days`` as the - column name (it mirrors :class:`~leadforge.schema.entities.LeadRow`). - """ - return "converted_within_90_days" +# The label column in the snapshot is always ``converted_within_90_days`` +# (mirroring :class:`~leadforge.schema.entities.LeadRow`). The task *directory* +# may vary via ``config.primary_task``, but the column inside does not. +_LABEL_COLUMN = "converted_within_90_days" def _check_conversion_rate(root: Path, manifest: dict[str, Any]) -> list[str]: @@ -63,16 +60,12 @@ def _check_conversion_rate(root: Path, manifest: dict[str, Any]) -> list[str]: if train_path is None: return errors - label_col = _label_column() - # Read only the label column; fall back to full read if column missing. - schema = pq.read_schema(train_path) - read_col = label_col if label_col in schema.names else "converted_within_90_days" - df = pd.read_parquet(train_path, columns=[read_col]) + df = pd.read_parquet(train_path, columns=[_LABEL_COLUMN]) if len(df) == 0: errors.append("Train split is empty") return errors - rate = df.iloc[:, 0].mean() + rate = df[_LABEL_COLUMN].mean() # Absolute bounds — any reasonable simulation should land here. # The v1 engine typically produces rates in the 30–90% range depending diff --git a/tests/test_primary_task_threading.py b/tests/test_primary_task_threading.py index 76a4c84..b72998e 100644 --- a/tests/test_primary_task_threading.py +++ b/tests/test_primary_task_threading.py @@ -151,6 +151,30 @@ def test_drift_finds_custom_task_path(self, tmp_path: Path, custom_task_bundle: # Should not produce "missing task train.parquet" errors. assert not any("missing" in e for e in errors) + def test_realism_empty_tasks_in_manifest(self, default_bundle: Path) -> None: + """Realism checks gracefully skip when manifest has no tasks.""" + manifest: dict = {"tables": {}, "tasks": {}} + errors = check_realism(default_bundle, manifest) + # No crash; table-nonempty checks may fail but no task-related crash. + assert not any("traceback" in e.lower() for e in errors) + + def test_realism_missing_tasks_key(self, default_bundle: Path) -> None: + """Realism checks gracefully skip when manifest has no 'tasks' key.""" + manifest: dict = {"tables": {}} + errors = check_realism(default_bundle, manifest) + assert not any("traceback" in e.lower() for e in errors) + + def test_drift_missing_manifest(self, tmp_path: Path) -> None: + """Drift check falls back gracefully when manifest.json is absent.""" + # Create two minimal bundles with no manifest.json. + for name in ("a", "b"): + d = tmp_path / name + d.mkdir() + bundles = {1: tmp_path / "a", 2: tmp_path / "b"} + errors = check_cross_seed_stability(bundles) + # Should report missing files, not crash. + assert all("missing" in e for e in errors) + # --------------------------------------------------------------------------- # Pipeline rename functions From 59483eb2d8af861e27c867bf508d7d990493ce57 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Fri, 1 May 2026 22:21:33 +0300 Subject: [PATCH 3/3] fix: address Copilot review feedback - tasks.py: derive defaults from CONVERTED_WITHIN_90_DAYS constant (not hardcoded literals) to prevent silent drift - realism.py, drift.py: use manifest["primary_task"] to find the declared primary task instead of next(iter(tasks)) - manifests.py: bump BUNDLE_SCHEMA_VERSION to "2" (new fields) - build_v5.py, build_v6.py: validate label_column exists upfront before rename (clear ValueError instead of KeyError) - generator.py: document primary_task and label_window_days kwargs Co-Authored-By: Claude Opus 4.6 --- leadforge/api/generator.py | 5 +++++ leadforge/pipelines/build_v5.py | 4 ++++ leadforge/pipelines/build_v6.py | 4 ++++ leadforge/render/manifests.py | 2 +- leadforge/schema/tasks.py | 4 ++-- leadforge/validation/drift.py | 7 +++++-- leadforge/validation/realism.py | 5 +++-- 7 files changed, 24 insertions(+), 7 deletions(-) diff --git a/leadforge/api/generator.py b/leadforge/api/generator.py index ba8d8e9..30fe0c9 100644 --- a/leadforge/api/generator.py +++ b/leadforge/api/generator.py @@ -71,6 +71,11 @@ def from_recipe( n_contacts: Override recipe default contact count. n_leads: Override recipe default lead count. horizon_days: Override recipe default simulation horizon. + primary_task: Override recipe default task identifier (e.g. + ``"converted_within_60_days"``). Controls the task + directory name and manifest key. + label_window_days: Override recipe default label observation + window in days. output_path: Directory where the bundle will be saved. override: Optional dict of overrides (mirrors a ``--override`` file). Applied after recipe defaults but before explicit kwargs. diff --git a/leadforge/pipelines/build_v5.py b/leadforge/pipelines/build_v5.py index d1e3357..5cf12c5 100644 --- a/leadforge/pipelines/build_v5.py +++ b/leadforge/pipelines/build_v5.py @@ -112,6 +112,10 @@ def rename_and_select( label_column: Source column for the binary label. Defaults to ``"converted_within_90_days"`` for backward compatibility. """ + if label_column not in df.columns: + raise ValueError( + f"Label column {label_column!r} not found. Available: {sorted(df.columns)}" + ) if label_column == "converted_within_90_days": rename_map = RENAME_MAP else: diff --git a/leadforge/pipelines/build_v6.py b/leadforge/pipelines/build_v6.py index 5f047fc..4c65162 100644 --- a/leadforge/pipelines/build_v6.py +++ b/leadforge/pipelines/build_v6.py @@ -221,6 +221,10 @@ def rename_and_select( label_column: Source column for the binary label. Defaults to ``"converted_within_90_days"`` for backward compatibility. """ + if label_column not in df.columns: + raise ValueError( + f"Label column {label_column!r} not found. Available: {sorted(df.columns)}" + ) if label_column == "converted_within_90_days": rename_map = RENAME_MAP else: diff --git a/leadforge/render/manifests.py b/leadforge/render/manifests.py index 6b6f88b..03d6201 100644 --- a/leadforge/render/manifests.py +++ b/leadforge/render/manifests.py @@ -20,7 +20,7 @@ from leadforge.structure.graph import WorldGraph # Bump this whenever the bundle layout or manifest schema changes. -BUNDLE_SCHEMA_VERSION = "1" +BUNDLE_SCHEMA_VERSION = "2" def build_manifest( diff --git a/leadforge/schema/tasks.py b/leadforge/schema/tasks.py index f027af9..1560bda 100644 --- a/leadforge/schema/tasks.py +++ b/leadforge/schema/tasks.py @@ -107,8 +107,8 @@ def to_dict(self) -> dict[str, object]: def task_manifest_for_config( - primary_task: str = "converted_within_90_days", - label_window_days: int = 90, + primary_task: str = CONVERTED_WITHIN_90_DAYS.task_id, + label_window_days: int = CONVERTED_WITHIN_90_DAYS.label_window_days, ) -> TaskManifest: """Build a :class:`TaskManifest` from generation config fields. diff --git a/leadforge/validation/drift.py b/leadforge/validation/drift.py index 4422baa..4f9c951 100644 --- a/leadforge/validation/drift.py +++ b/leadforge/validation/drift.py @@ -18,7 +18,7 @@ def _find_task_train(bundle_path: Path) -> tuple[Path | None, str]: - """Locate the train.parquet for the first task listed in the manifest. + """Locate the primary task's train.parquet from the bundle manifest. Returns ``(train_path_or_None, task_id)`` where *task_id* is included so callers can produce useful error messages. @@ -28,7 +28,10 @@ def _find_task_train(bundle_path: Path) -> tuple[Path | None, str]: manifest: dict[str, Any] = load_json(manifest_path) tasks = manifest.get("tasks", {}) if isinstance(tasks, dict) and tasks: - task_id = next(iter(tasks)) + primary = manifest.get("primary_task") + task_id = ( + primary if isinstance(primary, str) and primary in tasks else next(iter(tasks)) + ) train = bundle_path / f"tasks/{task_id}/train.parquet" return (train if train.exists() else None), task_id # Fallback: default task id diff --git a/leadforge/validation/realism.py b/leadforge/validation/realism.py index 8eb2890..0c52f2d 100644 --- a/leadforge/validation/realism.py +++ b/leadforge/validation/realism.py @@ -38,11 +38,12 @@ def check_realism(bundle_root: Path, manifest: dict[str, Any]) -> list[str]: def _first_task_train_path(root: Path, manifest: dict[str, Any]) -> Path | None: - """Return the train.parquet path of the first task in the manifest.""" + """Return the train.parquet path of the primary task in the manifest.""" tasks = manifest.get("tasks", {}) if not isinstance(tasks, dict) or not tasks: return None - task_id = next(iter(tasks)) + primary = manifest.get("primary_task") + task_id = primary if isinstance(primary, str) and primary in tasks else next(iter(tasks)) path = root / f"tasks/{task_id}/train.parquet" return path if path.exists() else None