Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .agent-plan.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,19 @@ Documentation + CI:
- [x] `leadforge/narrative/dataset_card.py` — renders task name and label window from `world_spec.config` instead of hard-coded literals
- [x] 10 new tests (3 dataset card + 7 config resolution); total 750 passing

### Thread primary_task through pipeline (PR #40, closes #37)

- [x] `leadforge/schema/tasks.py` — `task_manifest_for_config()` factory builds `TaskManifest` from `primary_task` and `label_window_days`
- [x] `leadforge/api/bundle.py` — uses `task_manifest_for_config(config.primary_task, config.label_window_days)` for task splits and manifest key (replaces hardcoded `CONVERTED_WITHIN_90_DAYS.task_id`)
- [x] `leadforge/api/generator.py` — `primary_task` and `label_window_days` kwargs on `Generator.from_recipe()` (Layer 1 precedence)
- [x] `leadforge/api/recipes.py` — `primary_task` and `label_window_days` kwargs on `resolve_config()` (Layer 1 precedence)
- [x] `leadforge/render/manifests.py` — manifest includes `primary_task` and `label_window_days` fields
- [x] `leadforge/validation/realism.py` — reads task train path from manifest (not hardcoded); helper `_first_task_train_path()`
- [x] `leadforge/validation/drift.py` — reads task train path from bundle manifest via `_find_task_train()`; falls back to default path
- [x] `leadforge/pipelines/build_v5.py` — `rename_and_select()` accepts `label_column` kwarg (defaults to `"converted_within_90_days"`)
- [x] `leadforge/pipelines/build_v6.py` — same `label_column` kwarg on `rename_and_select()`
- [x] 16 new tests: `task_manifest_for_config`, bundle layout, manifest keys, validation with custom task, pipeline rename with custom label column; total 773 passing

### Parquet metadata row counts (PR #37, closes #17)

- [x] `leadforge/validation/bundle_checks.py` — `_check_task_splits()` uses `pq.read_metadata().num_rows` instead of `pd.read_parquet()`; `_check_leakage()` uses `pq.read_schema().names` instead of `pd.read_parquet(columns=[])`
Expand Down
7 changes: 4 additions & 3 deletions leadforge/api/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from leadforge.render.tasks import write_task_splits
from leadforge.schema.dictionaries import write_feature_dictionary
from leadforge.schema.tables import write_parquet
from leadforge.schema.tasks import CONVERTED_WITHIN_90_DAYS
from leadforge.schema.tasks import task_manifest_for_config

if TYPE_CHECKING:
from leadforge.core.models import WorldBundle
Expand Down Expand Up @@ -74,7 +74,8 @@ def write_bundle(
# 2. Snapshot + task splits → tasks/
# ------------------------------------------------------------------
snapshot = build_snapshot(result, population, horizon_days=config.horizon_days)
task_row_counts = write_task_splits(snapshot, root / "tasks", seed=config.seed)
task = task_manifest_for_config(config.primary_task, config.label_window_days)
task_row_counts = write_task_splits(snapshot, root / "tasks", seed=config.seed, task=task)

# ------------------------------------------------------------------
# 3. Dataset card and feature dictionary
Expand All @@ -94,7 +95,7 @@ def write_bundle(
config=config,
world_graph=world_graph,
table_row_counts=table_row_counts,
task_row_counts={CONVERTED_WITHIN_90_DAYS.task_id: task_row_counts},
task_row_counts={task.task_id: task_row_counts},
bundle_root=root,
generation_timestamp=generation_timestamp,
)
Expand Down
9 changes: 9 additions & 0 deletions leadforge/api/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def from_recipe(
n_contacts: int | None = None,
n_leads: int | None = None,
horizon_days: int | None = None,
primary_task: str | None = None,
label_window_days: int | None = None,
output_path: str = _MISSING, # type: ignore[assignment]
override: dict[str, Any] | None = None,
) -> Generator:
Expand All @@ -69,6 +71,11 @@ def from_recipe(
n_contacts: Override recipe default contact count.
n_leads: Override recipe default lead count.
horizon_days: Override recipe default simulation horizon.
primary_task: Override recipe default task identifier (e.g.
``"converted_within_60_days"``). Controls the task
directory name and manifest key.
label_window_days: Override recipe default label observation
window in days.
output_path: Directory where the bundle will be saved.
override: Optional dict of overrides (mirrors a ``--override`` file).
Applied after recipe defaults but before explicit kwargs.
Expand Down Expand Up @@ -96,6 +103,8 @@ def from_recipe(
n_contacts=n_contacts,
n_leads=n_leads,
horizon_days=horizon_days,
primary_task=primary_task,
label_window_days=label_window_days,
output_path=output_path,
override=override,
)
Expand Down
6 changes: 6 additions & 0 deletions leadforge/api/recipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ def resolve_config(
n_contacts: int | None = None,
n_leads: int | None = None,
horizon_days: int | None = None,
primary_task: str | None = None,
label_window_days: int | None = None,
output_path: str = _MISSING, # type: ignore[assignment]
override: dict[str, Any] | None = None,
) -> GenerationConfig:
Expand Down Expand Up @@ -210,6 +212,10 @@ def resolve_config(
resolved["n_leads"] = n_leads
if horizon_days is not None:
resolved["horizon_days"] = horizon_days
if primary_task is not None:
resolved["primary_task"] = primary_task
if label_window_days is not None:
resolved["label_window_days"] = label_window_days

try:
mode = ExposureMode(resolved["exposure_mode"])
Expand Down
23 changes: 20 additions & 3 deletions leadforge/pipelines/build_v5.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,26 @@ def cap_expected_acv(df: pd.DataFrame) -> pd.DataFrame:
return df


def rename_and_select(df: pd.DataFrame) -> pd.DataFrame:
"""Rename snapshot columns to v5 names and select final column set."""
df = df.rename(columns=RENAME_MAP)
def rename_and_select(
df: pd.DataFrame, *, label_column: str = "converted_within_90_days"
) -> pd.DataFrame:
"""Rename snapshot columns to v5 names and select final column set.

Args:
df: Snapshot DataFrame.
label_column: Source column for the binary label. Defaults to
``"converted_within_90_days"`` for backward compatibility.
"""
if label_column not in df.columns:
raise ValueError(
f"Label column {label_column!r} not found. Available: {sorted(df.columns)}"
)
if label_column == "converted_within_90_days":
rename_map = RENAME_MAP
else:
rename_map = {k: v for k, v in RENAME_MAP.items() if v != "converted"}
rename_map[label_column] = "converted"
df = df.rename(columns=rename_map)
df["converted"] = df["converted"].astype(int)
missing = [c for c in FINAL_COLUMNS if c not in df.columns]
if missing:
Expand Down
21 changes: 19 additions & 2 deletions leadforge/pipelines/build_v6.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,26 @@ def rename_and_select(
df: pd.DataFrame,
*,
instructor: bool = False,
label_column: str = "converted_within_90_days",
) -> pd.DataFrame:
"""Rename snapshot columns to v6 names and select final column set."""
df = df.rename(columns=RENAME_MAP)
"""Rename snapshot columns to v6 names and select final column set.

Args:
df: Snapshot DataFrame.
instructor: If True, include the instructor leakage trap column.
label_column: Source column for the binary label. Defaults to
``"converted_within_90_days"`` for backward compatibility.
"""
if label_column not in df.columns:
raise ValueError(
f"Label column {label_column!r} not found. Available: {sorted(df.columns)}"
)
if label_column == "converted_within_90_days":
rename_map = RENAME_MAP
else:
rename_map = {k: v for k, v in RENAME_MAP.items() if v != "converted"}
rename_map[label_column] = "converted"
df = df.rename(columns=rename_map)
df["converted"] = df["converted"].astype(int)
columns = FINAL_COLUMNS_INSTRUCTOR if instructor else FINAL_COLUMNS_STUDENT
missing = [c for c in columns if c not in df.columns]
Expand Down
4 changes: 3 additions & 1 deletion leadforge/render/manifests.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from leadforge.structure.graph import WorldGraph

# Bump this whenever the bundle layout or manifest schema changes.
BUNDLE_SCHEMA_VERSION = "1"
BUNDLE_SCHEMA_VERSION = "2"


def build_manifest(
Expand Down Expand Up @@ -83,6 +83,8 @@ def build_manifest(
"n_contacts": config.n_contacts,
"n_leads": config.n_leads,
"horizon_days": config.horizon_days,
"primary_task": config.primary_task,
"label_window_days": config.label_window_days,
"motif_family": world_graph.motif_family,
Comment thread
shaypal5 marked this conversation as resolved.
"tables": tables,
"tasks": tasks,
Expand Down
30 changes: 29 additions & 1 deletion leadforge/schema/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from __future__ import annotations

from dataclasses import dataclass
from dataclasses import dataclass, replace


@dataclass(frozen=True)
Expand Down Expand Up @@ -104,3 +104,31 @@ def to_dict(self) -> dict[str, object]:
"directly. All features are pre-anchor (leakage-free by construction)."
),
)


def task_manifest_for_config(
primary_task: str = CONVERTED_WITHIN_90_DAYS.task_id,
label_window_days: int = CONVERTED_WITHIN_90_DAYS.label_window_days,
) -> TaskManifest:
"""Build a :class:`TaskManifest` from generation config fields.

Derives from :data:`CONVERTED_WITHIN_90_DAYS` via ``dataclasses.replace``,
overriding only the fields that vary. When *primary_task* and
*label_window_days* match the defaults, this returns an equivalent manifest.

Args:
primary_task: Task identifier — used as the task directory name and
manifest key.
label_window_days: Label observation window in days.
"""
return replace(
CONVERTED_WITHIN_90_DAYS,
task_id=primary_task,
label_window_days=label_window_days,
description=(
f"Predict whether a lead converts (closed_won event) within "
f"{label_window_days} days of the snapshot anchor date. Label is "
f"event-derived — never sampled directly. All features are "
f"pre-anchor (leakage-free by construction)."
),
)
38 changes: 33 additions & 5 deletions leadforge/validation/drift.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,37 @@
from __future__ import annotations

from pathlib import Path
from typing import Any

import pandas as pd

from leadforge.core.serialization import load_json

_LABEL_COLUMN = "converted_within_90_days"


def _find_task_train(bundle_path: Path) -> tuple[Path | None, str]:
"""Locate the primary task's train.parquet from the bundle manifest.

Returns ``(train_path_or_None, task_id)`` where *task_id* is included
so callers can produce useful error messages.
"""
manifest_path = bundle_path / "manifest.json"
if manifest_path.exists():
manifest: dict[str, Any] = load_json(manifest_path)
tasks = manifest.get("tasks", {})
if isinstance(tasks, dict) and tasks:
primary = manifest.get("primary_task")
task_id = (
primary if isinstance(primary, str) and primary in tasks else next(iter(tasks))
)
train = bundle_path / f"tasks/{task_id}/train.parquet"
return (train if train.exists() else None), task_id
Comment on lines +28 to +36

Copilot AI May 1, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_find_task_train() calls load_json() when manifest.json exists, but load_json() raises LeadforgeError on corrupt/unparseable JSON. That means check_cross_seed_stability() can now raise instead of returning an error string list (even though it already tries to be graceful when manifest is missing). Consider catching LeadforgeError (or Exception from load_json) here and falling back to the default task path (or returning (None, <task_id>) plus an error message).

Suggested change
manifest: dict[str, Any] = load_json(manifest_path)
tasks = manifest.get("tasks", {})
if isinstance(tasks, dict) and tasks:
primary = manifest.get("primary_task")
task_id = (
primary if isinstance(primary, str) and primary in tasks else next(iter(tasks))
)
train = bundle_path / f"tasks/{task_id}/train.parquet"
return (train if train.exists() else None), task_id
try:
manifest: dict[str, Any] = load_json(manifest_path)
tasks = manifest.get("tasks", {})
if isinstance(tasks, dict) and tasks:
primary = manifest.get("primary_task")
task_id = (
primary if isinstance(primary, str) and primary in tasks else next(iter(tasks))
)
train = bundle_path / f"tasks/{task_id}/train.parquet"
return (train if train.exists() else None), task_id
except Exception:
pass

Copilot uses AI. Check for mistakes.
# Fallback: default task id
task_id = "converted_within_90_days"
default = bundle_path / f"tasks/{task_id}/train.parquet"
return (default if default.exists() else None), task_id


def check_cross_seed_stability(bundles: dict[int, Path]) -> list[str]:
"""Compare bundles generated with different seeds.
Expand All @@ -30,13 +58,13 @@ def check_cross_seed_stability(bundles: dict[int, Path]) -> list[str]:
stage_counts: dict[int, int] = {}

for seed, bundle_path in bundles.items():
train_path = bundle_path / "tasks/converted_within_90_days/train.parquet"
if not train_path.exists():
errors.append(f"Seed {seed}: missing tasks/converted_within_90_days/train.parquet")
train_path, task_id = _find_task_train(bundle_path)
if train_path is None:
errors.append(f"Seed {seed}: missing tasks/{task_id}/train.parquet")
continue
df = pd.read_parquet(train_path, columns=["converted_within_90_days"])
df = pd.read_parquet(train_path, columns=[_LABEL_COLUMN])
if len(df) > 0:
rates[seed] = float(df["converted_within_90_days"].mean())
rates[seed] = float(df[_LABEL_COLUMN].mean())

leads_path = bundle_path / "tables/leads.parquet"
if leads_path.exists():
Expand Down
37 changes: 27 additions & 10 deletions leadforge/validation/realism.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,43 @@ def check_realism(bundle_root: Path, manifest: dict[str, Any]) -> list[str]:
Returns a list of warning/error strings (empty = all checks pass).
"""
errors: list[str] = []
errors.extend(_check_conversion_rate(bundle_root))
errors.extend(_check_conversion_rate(bundle_root, manifest))
errors.extend(_check_table_nonempty(bundle_root, manifest))
errors.extend(_check_feature_ranges(bundle_root))
errors.extend(_check_feature_ranges(bundle_root, manifest))
errors.extend(_check_stage_distribution(bundle_root))
return errors


def _check_conversion_rate(root: Path) -> list[str]:
def _first_task_train_path(root: Path, manifest: dict[str, Any]) -> Path | None:
"""Return the train.parquet path of the primary task in the manifest."""
tasks = manifest.get("tasks", {})
if not isinstance(tasks, dict) or not tasks:
return None
primary = manifest.get("primary_task")
task_id = primary if isinstance(primary, str) and primary in tasks else next(iter(tasks))
path = root / f"tasks/{task_id}/train.parquet"
return path if path.exists() else None


# The label column in the snapshot is always ``converted_within_90_days``
# (mirroring :class:`~leadforge.schema.entities.LeadRow`). The task *directory*
# may vary via ``config.primary_task``, but the column inside does not.
_LABEL_COLUMN = "converted_within_90_days"


def _check_conversion_rate(root: Path, manifest: dict[str, Any]) -> list[str]:
"""Check that conversion rate is within plausible bounds."""
errors: list[str] = []
train_path = root / "tasks/converted_within_90_days/train.parquet"
if not train_path.exists():
train_path = _first_task_train_path(root, manifest)
if train_path is None:
return errors

df = pd.read_parquet(train_path, columns=["converted_within_90_days"])
df = pd.read_parquet(train_path, columns=[_LABEL_COLUMN])
if len(df) == 0:
errors.append("Train split is empty")
return errors

rate = df["converted_within_90_days"].mean()
rate = df[_LABEL_COLUMN].mean()

# Absolute bounds — any reasonable simulation should land here.
# The v1 engine typically produces rates in the 30–90% range depending
Expand Down Expand Up @@ -79,11 +96,11 @@ def _check_table_nonempty(root: Path, manifest: dict[str, Any]) -> list[str]:
return errors


def _check_feature_ranges(root: Path) -> list[str]:
def _check_feature_ranges(root: Path, manifest: dict[str, Any]) -> list[str]:
"""Spot-check that key features have valid values."""
errors: list[str] = []
train_path = root / "tasks/converted_within_90_days/train.parquet"
if not train_path.exists():
train_path = _first_task_train_path(root, manifest)
if train_path is None:
return errors

# Only read the columns we actually check.
Expand Down
Loading
Loading