diff --git a/.agent-plan.md b/.agent-plan.md index d3e309d..b7a91ee 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -45,6 +45,7 @@ First public dataset release: `leadforge-b2b-lead-scoring`. Three difficulty tie - [x] Update release/README.md — remove stale "Known limitations", add conversion rates to dataset summary - [x] Update release/HF_DATASET_CARD.md — add conversion rates to summary table - [x] Verify SHA-256 hash determinism (re-run build, compare hashes) — `scripts/verify_hash_determinism.py`; 73/73 files identical across two `build_public_release.py` runs (modulo `manifest.json`'s wall-clock `generation_timestamp`) +- [x] Fix `current_stage` leakage in student_public bundles via exposure-layer redaction — `FeatureSpec.redact_in_modes` distinguishes true label leaks from the pedagogical trap (`total_touches_all`, retained in all modes); the bundle writer strips `redacted_columns_for(mode)` from the published files; `validate_bundle()` enforces the invariant. 73/73 hash-determinism preserved. - [ ] Upload to Kaggle and HuggingFace - [ ] Announce @@ -61,9 +62,19 @@ First public dataset release: `leadforge-b2b-lead-scoring`. Three difficulty tie - [x] Calibration across 20 seeds × 5 motif families: intro mean 43%, intermediate mean 22%, advanced mean 9% - [x] All 865 tests pass -### Known issue: `current_stage` leakage at 90-day horizon +### Resolved (partial): `current_stage` leakage at 90-day horizon -The full bundle snapshot includes `current_stage` which at day 90 contains terminal stages (`closed_won`/`closed_lost`). This perfectly encodes the label. The flat CSV export drops it; the Parquet task splits retain it with documentation. A proper fix (windowed snapshot or column redaction in the exposure layer) is deferred. +Deterministic leak fixed via exposure-layer redaction. `FeatureSpec` now carries an explicit `redact_in_modes: frozenset[ExposureMode]` field — *prescriptive* — alongside the descriptive `leakage_risk` flag. 
`current_stage` is marked `redact_in_modes={ExposureMode.student_public}`; the writer queries `redacted_columns_for(mode)` and strips matching columns from the snapshot, task splits, and feature dictionary before they hit disk. The pedagogical trap `total_touches_all` is preserved in all modes (no entry in `redact_in_modes`). The manifest records `redacted_columns: [...]` so the bundle is self-describing. `validate_bundle()` cross-checks parquet schemas, feature dictionary, and the manifest's declared redaction set against `redacted_columns_for(mode)` derived independently from the feature spec. Hash-determinism preserved (73/73 identical across builds). + +### Follow-up: structural leakage in `student_public` bundles (open) + +Stripping `current_stage` addresses the deterministic label-encoding leak but does **not** make the released bundle structurally leakage-free. Three concerns to address in a follow-up PR: + +1. **Event-aggregate features are computed over the label window.** `touch_count`, `session_count`, `pricing_page_views`, `expected_acv`, `days_since_last_touch`, etc. all aggregate events in `[lead_created_at, lead_created_at + 90d]`, the same window over which the label resolves. They correlate with post-conversion activity. The structural fix is a windowed snapshot (`snapshot_day=N` with `N < label_window_days`), as v6/v7 datasets already do at day 14/20. This shifts every feature value and every conversion rate in the release bundles, so it's deferred to its own PR with a coordinated documentation update. +2. **`is_sql=False` is near-deterministic for non-conversion.** Measured on the regenerated bundle: P(converted | is_sql=False) = 0.038 (intro), 0.015 (intermediate), 0.006 (advanced). At advanced tier it effectively encodes the negative class. Either redact `is_sql` in `student_public` (probably correct) or accept it as a strong feature with documentation. Decide alongside #1. +3. 
**`is_mql` is a constant `True`.** Zero variance feature in all three tiers. Should be removed from the snapshot or, if it can ever be False under some recipe, the simulator should produce that variance. + +Suggested action: open one tracked GitHub issue covering all three (currently no issue exists; user has standing instruction not to file without confirmation). --- diff --git a/leadforge/api/bundle.py b/leadforge/api/bundle.py index 0ea4146..bfb3874 100644 --- a/leadforge/api/bundle.py +++ b/leadforge/api/bundle.py @@ -23,6 +23,7 @@ from leadforge.render.snapshots import build_snapshot from leadforge.render.tasks import write_task_splits from leadforge.schema.dictionaries import write_feature_dictionary +from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES, redacted_columns_for from leadforge.schema.tables import write_parquet from leadforge.schema.tasks import task_manifest_for_config @@ -72,6 +73,12 @@ def write_bundle( # ------------------------------------------------------------------ # 2. Snapshot + task splits → tasks/ + # + # Apply exposure-mode redaction here (rather than in apply_exposure) + # so that the manifest's per-file SHA-256 hashes reflect the published + # column set without a post-write rewrite step. The redacted column + # set is derived from the canonical feature spec — the same source + # of truth the validator uses to check bundles. 
# ------------------------------------------------------------------ snapshot = build_snapshot( result, @@ -80,6 +87,13 @@ def write_bundle( difficulty_params=config.difficulty_params, seed=config.seed, ) + redacted = redacted_columns_for(config.exposure_mode) + if redacted: + drop_cols = [c for c in redacted if c in snapshot.columns] + if drop_cols: + snapshot = snapshot.drop(columns=drop_cols) + visible_features = tuple(f for f in LEAD_SNAPSHOT_FEATURES if f.name not in redacted) + task = task_manifest_for_config(config.primary_task, config.label_window_days) task_row_counts = write_task_splits(snapshot, root / "tasks", seed=config.seed, task=task) @@ -87,9 +101,14 @@ def write_bundle( # 3. Dataset card and feature dictionary # ------------------------------------------------------------------ (root / "dataset_card.md").write_text( - render_dataset_card(bundle.spec, task_manifest=task, table_counts=table_row_counts) + render_dataset_card( + bundle.spec, + task_manifest=task, + table_counts=table_row_counts, + features=visible_features, + ) ) - write_feature_dictionary(root / "feature_dictionary.csv") + write_feature_dictionary(root / "feature_dictionary.csv", features=visible_features) # ------------------------------------------------------------------ # 4. Exposure metadata (research_instructor only) @@ -106,5 +125,6 @@ def write_bundle( task_row_counts={task.task_id: task_row_counts}, bundle_root=root, generation_timestamp=generation_timestamp, + redacted_columns=sorted(redacted), ) write_manifest(manifest, root) diff --git a/leadforge/exposure/filters.py b/leadforge/exposure/filters.py index a885ab1..af26089 100644 --- a/leadforge/exposure/filters.py +++ b/leadforge/exposure/filters.py @@ -4,6 +4,12 @@ :class:`BundleFilter` that governs which artefacts are written when :func:`~leadforge.api.bundle.write_bundle` produces an output bundle. 
+The per-feature redaction policy lives separately on +:attr:`leadforge.schema.features.FeatureSpec.redact_in_modes` and is queried +via :func:`leadforge.schema.features.redacted_columns_for`. ``BundleFilter`` +deliberately does *not* duplicate that information so that the writer and +the validator both consult the same source of truth. + Adding a new mode: define its ``BundleFilter`` entry in ``FILTERS``. """ @@ -16,7 +22,7 @@ @dataclass(frozen=True) class BundleFilter: - """Rules that govern bundle publication for one :class:`ExposureMode`. + """Mode-level publication policy. Attributes: write_metadata: Whether to create ``metadata/`` with hidden-truth diff --git a/leadforge/narrative/dataset_card.py b/leadforge/narrative/dataset_card.py index aad823e..01e687d 100644 --- a/leadforge/narrative/dataset_card.py +++ b/leadforge/narrative/dataset_card.py @@ -9,7 +9,7 @@ from collections import Counter from typing import TYPE_CHECKING -from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES +from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES, FeatureSpec if TYPE_CHECKING: from leadforge.core.models import WorldSpec @@ -20,6 +20,7 @@ def render_dataset_card( world_spec: WorldSpec, task_manifest: TaskManifest | None = None, table_counts: dict[str, int] | None = None, + features: tuple[FeatureSpec, ...] = LEAD_SNAPSHOT_FEATURES, ) -> str: """Return a Markdown dataset card string for *world_spec*. @@ -31,6 +32,10 @@ def render_dataset_card( table_counts: Optional mapping of table name → row count. When provided, the table inventory section renders actual counts instead of a placeholder. + features: Feature spec tuple to render in the categories / leakage + sections. Defaults to the canonical list; pass the redacted + tuple when rendering an exposure-filtered bundle so the card + describes only what is actually present. 
Sections: - Header (recipe id, version, seed, exposure mode) @@ -149,18 +154,16 @@ def render_dataset_card( # ------------------------------------------------------------------ lines += ["## Feature categories", ""] category_counts: Counter[str] = Counter() - for feat in LEAD_SNAPSHOT_FEATURES: + for feat in features: category_counts[feat.category] += 1 lines += [ "| Category | Count | Examples |", "|---|---:|---|", ] for cat, count in category_counts.items(): - examples = [ - f.name for f in LEAD_SNAPSHOT_FEATURES if f.category == cat and not f.is_target - ][:3] + examples = [f.name for f in features if f.category == cat and not f.is_target][:3] lines.append(f"| {cat} | {count} | {', '.join(examples)} |") - leakage_cols = [f.name for f in LEAD_SNAPSHOT_FEATURES if f.leakage_risk] + leakage_cols = [f.name for f in features if f.leakage_risk] if leakage_cols: lines += [ "", diff --git a/leadforge/render/manifests.py b/leadforge/render/manifests.py index d43fade..d6bc408 100644 --- a/leadforge/render/manifests.py +++ b/leadforge/render/manifests.py @@ -35,6 +35,7 @@ def build_manifest( task_row_counts: dict[str, dict[str, int]], bundle_root: Path, generation_timestamp: str | None = None, + redacted_columns: list[str] | None = None, ) -> dict[str, Any]: """Build the bundle manifest dict. @@ -49,6 +50,11 @@ def build_manifest( task_row_counts: Mapping of task_id → {split_name → row count}. bundle_root: Root directory of the written bundle. generation_timestamp: ISO-8601 UTC timestamp string. Defaults to now. + redacted_columns: Sorted list of column names that the bundle writer + removed from snapshot / task splits / feature dictionary for + this exposure mode. Recorded in the manifest so consumers + (and the validator) can audit redaction without inspecting + package internals. Defaults to ``[]`` (nothing redacted). Returns: A JSON-serialisable dict ready to be written as ``manifest.json``. 
@@ -56,6 +62,8 @@ def build_manifest( if generation_timestamp is None: generation_timestamp = datetime.now(UTC).isoformat(timespec="seconds") + redacted_columns_list = sorted(redacted_columns) if redacted_columns else [] + # Build table entries with row counts and file hashes. tables: dict[str, Any] = {} for table_name, row_count in table_row_counts.items(): @@ -91,6 +99,7 @@ def build_manifest( "primary_task": config.primary_task, "label_window_days": config.label_window_days, "motif_family": world_graph.motif_family, + "redacted_columns": redacted_columns_list, "tables": tables, "tasks": tasks, } diff --git a/leadforge/schema/dictionaries.py b/leadforge/schema/dictionaries.py index f97cb90..9f1b32b 100644 --- a/leadforge/schema/dictionaries.py +++ b/leadforge/schema/dictionaries.py @@ -23,6 +23,13 @@ def feature_dictionary_df( Columns: name, dtype, description, category, is_target, leakage_risk. + The redaction policy (``FeatureSpec.redact_in_modes``) is intentionally + *not* serialised here: it is package-internal state, and which columns + a given bundle actually published is observable from the bundle's + schema and from ``manifest.redacted_columns``. Keeping this CSV's + column set stable preserves backward compatibility with downstream + consumers that parse it strictly. + Args: features: Ordered tuple of :class:`~leadforge.schema.features.FeatureSpec` objects. Defaults to the canonical lead snapshot feature list. diff --git a/leadforge/schema/features.py b/leadforge/schema/features.py index 41aab34..906011a 100644 --- a/leadforge/schema/features.py +++ b/leadforge/schema/features.py @@ -8,13 +8,31 @@ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field + +from leadforge.core.enums import ExposureMode @dataclass(frozen=True) class FeatureSpec: """Metadata for one column in the lead snapshot table. 
+ Two concerns are kept deliberately separate: + + - :attr:`leakage_risk` is *descriptive*: the value of this column is + computed from events that may post-date the snapshot anchor and so + correlates with the label. It is informational metadata for + downstream consumers and is preserved in the published feature + dictionary. + - :attr:`redact_in_modes` is *prescriptive*: the bundle writer must + strip this column from any export whose mode is in this set. + + These can disagree: ``total_touches_all`` is ``leakage_risk=True`` + (it does encode post-snapshot information) but + ``redact_in_modes=frozenset()`` (it is deliberately retained as a + pedagogical trap). Conversely a recipe could redact a column that + is not itself leakage-risky for unrelated policy reasons. + Attributes: name: Column name as it appears in the Parquet file. dtype: Pandas-compatible dtype string (``"string"``, ``"Int64"``, @@ -23,8 +41,10 @@ class FeatureSpec: category: Logical grouping (``"account"``, ``"contact"``, ``"lead_meta"``, ``"engagement"``, ``"sales"``, ``"target"``). is_target: True for the label column only. - leakage_risk: True if the column could contain post-snapshot-anchor - information and must be excluded from student_public exports. + leakage_risk: Descriptive — this column is post-snapshot correlated. + redact_in_modes: Prescriptive — exposure modes in which the + bundle writer must strip this column from snapshot, task + splits, and feature dictionary. 
""" name: str @@ -33,6 +53,7 @@ class FeatureSpec: category: str is_target: bool = False leakage_risk: bool = False + redact_in_modes: frozenset[ExposureMode] = field(default_factory=frozenset) # --------------------------------------------------------------------------- @@ -122,6 +143,7 @@ class FeatureSpec: "a windowed snapshot.", "lead_meta", leakage_risk=True, + redact_in_modes=frozenset({ExposureMode.student_public}), ), FeatureSpec( "is_mql", @@ -235,7 +257,7 @@ class FeatureSpec: "revenue band midpoint heuristic (NaN if neither available).", "sales", ), - # -- Leakage trap -- + # -- Pedagogical leakage trap (deliberately retained in all modes) -- FeatureSpec( "total_touches_all", "Int64", @@ -254,3 +276,23 @@ class FeatureSpec: is_target=True, ), ) + + +def redacted_columns_for( + mode: ExposureMode, + features: tuple[FeatureSpec, ...] = LEAD_SNAPSHOT_FEATURES, +) -> frozenset[str]: + """Return the set of column names that must be stripped from *mode* exports. + + The redaction policy is encoded per-feature in + :attr:`FeatureSpec.redact_in_modes`. Callers (the bundle writer, the + validation check) all derive their answer from this single function, so + a single source of truth governs both producing and verifying bundles. + + Args: + mode: The exposure mode being published. + features: Feature spec tuple to consult. Defaults to the canonical + lead snapshot list; callable with a custom tuple for tests or + future per-recipe feature sets. 
+ """ + return frozenset(f.name for f in features if mode in f.redact_in_modes) diff --git a/leadforge/validation/bundle_checks.py b/leadforge/validation/bundle_checks.py index c8a68d8..715336e 100644 --- a/leadforge/validation/bundle_checks.py +++ b/leadforge/validation/bundle_checks.py @@ -14,9 +14,10 @@ import pandas as pd import pyarrow.parquet as pq +from leadforge.core.enums import ExposureMode from leadforge.core.hashing import file_sha256 from leadforge.core.serialization import load_json -from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES +from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES, redacted_columns_for from leadforge.schema.relationships import ALL_CONSTRAINTS from leadforge.validation.difficulty import check_difficulty from leadforge.validation.realism import check_realism @@ -45,6 +46,7 @@ def validate_bundle(bundle_root: Path, *, include_realism: bool = True) -> list[ errors.extend(_check_task_splits(bundle_root, manifest)) errors.extend(_check_fk_integrity(tables)) errors.extend(_check_leakage(bundle_root, manifest)) + errors.extend(_check_exposure_redaction(bundle_root, manifest)) if include_realism: errors.extend(check_realism(bundle_root, manifest)) @@ -181,3 +183,80 @@ def _check_leakage(root: Path, manifest: dict[str, Any]) -> list[str]: f"Task {task_id}/{split}: unexpected columns (possible leakage): {extra}" ) return errors + + +def _check_exposure_redaction(root: Path, manifest: dict[str, Any]) -> list[str]: + """Enforce the exposure-mode redaction contract. + + The expected redaction set is derived **directly from + LEAD_SNAPSHOT_FEATURES** via :func:`redacted_columns_for`, *not* from + the bundle filter the writer used. That keeps this check independent + of the writer's machinery: a future bug in the filter that silently + skips a redaction will be caught here, because the validator's + expected set still comes from the feature spec. + + Two things are checked: + + 1. 
No expected-redacted column appears in any task split or in the + feature dictionary (the actual leakage invariant). + 2. ``manifest.redacted_columns`` matches the expected set exactly + (the bundle is self-describing and accurate). + """ + errors: list[str] = [] + mode_str = manifest.get("exposure_mode") + if not mode_str: + return errors + try: + mode = ExposureMode(mode_str) + except ValueError: + errors.append(f"Manifest exposure_mode is unknown: {mode_str!r}") + return errors + + expected = redacted_columns_for(mode) + + # Cross-check the manifest's self-reported redaction set. + declared_raw = manifest.get("redacted_columns") + if declared_raw is None: + if expected: + errors.append( + "manifest.redacted_columns is missing; expected " + f"{sorted(expected)} for {mode.value}" + ) + elif isinstance(declared_raw, list): + declared = set(declared_raw) + if declared != set(expected): + errors.append( + "manifest.redacted_columns disagrees with feature spec for " + f"{mode.value}: declared={sorted(declared)} expected={sorted(expected)}" + ) + + if not expected: + return errors + + raw_tasks = manifest.get("tasks", {}) + if isinstance(raw_tasks, dict): + for task_id in raw_tasks: + for split in ("train", "valid", "test"): + split_path = root / f"tasks/{task_id}/{split}.parquet" + if split_path.exists(): + actual = set(pq.read_schema(split_path).names) + leaked = sorted(actual & expected) + if leaked: + errors.append( + f"Task {task_id}/{split}: redacted columns present in " + f"{mode.value} bundle: {leaked}" + ) + + fd_path = root / "feature_dictionary.csv" + if fd_path.exists(): + fd = pd.read_csv(fd_path) + if "name" in fd.columns: + present = set(fd["name"].astype(str).tolist()) + leaked = sorted(present & expected) + if leaked: + errors.append( + f"feature_dictionary.csv: redacted columns present in " + f"{mode.value} bundle: {leaked}" + ) + + return errors diff --git a/leadforge/validation/invariants.py b/leadforge/validation/invariants.py index 
cb7caf1..0de63bc 100644 --- a/leadforge/validation/invariants.py +++ b/leadforge/validation/invariants.py @@ -12,8 +12,12 @@ import json from pathlib import Path +import pandas as pd + +from leadforge.core.enums import ExposureMode from leadforge.core.hashing import file_sha256 from leadforge.render.manifests import NON_DETERMINISTIC_MANIFEST_FIELDS +from leadforge.schema.features import redacted_columns_for def check_determinism(bundle_a: Path, bundle_b: Path) -> list[str]: @@ -144,8 +148,10 @@ def check_exposure_monotonicity(student_bundle: Path, instructor_bundle: Path) - # Both must have the same core files. # manifest.json and dataset_card.md legitimately differ between modes - # (exposure_mode field, metadata references), so only check presence. - # feature_dictionary.csv should be identical (checked below). + # (exposure_mode field, metadata references). feature_dictionary.csv + # legitimately differs too — student_public drops rows for redacted + # columns (e.g. ``current_stage``). Only check presence here; content + # is checked below in monotonic-subset form. core_files = ["manifest.json", "dataset_card.md", "feature_dictionary.csv"] for fname in core_files: s_path = student_bundle / fname @@ -155,12 +161,29 @@ def check_exposure_monotonicity(student_bundle: Path, instructor_bundle: Path) - elif not s_path.exists() and i_path.exists(): errors.append(f"Instructor has {fname} but student does not") - # feature_dictionary.csv should be identical across modes. + # feature_dictionary.csv: student rows must be a subset of instructor rows + # (by ``name``). For names present in both, the metadata must agree. 
s_dict = student_bundle / "feature_dictionary.csv" i_dict = instructor_bundle / "feature_dictionary.csv" if s_dict.exists() and i_dict.exists(): - if file_sha256(s_dict) != file_sha256(i_dict): - errors.append("Content mismatch in shared file: feature_dictionary.csv") + s_df = pd.read_csv(s_dict).set_index("name") + i_df = pd.read_csv(i_dict).set_index("name") + extra_in_student = set(s_df.index) - set(i_df.index) + if extra_in_student: + errors.append( + "feature_dictionary.csv: student has rows missing from instructor: " + f"{sorted(extra_in_student)}" + ) + shared = sorted(set(s_df.index) & set(i_df.index)) + for col in s_df.columns: + if col in i_df.columns: + s_vals = s_df.loc[shared, col] + i_vals = i_df.loc[shared, col] + if not s_vals.equals(i_vals): + errors.append( + f"feature_dictionary.csv: column {col!r} differs between modes " + "for at least one shared feature" + ) # Both must have the same tables with identical content student_tables = ( @@ -214,10 +237,46 @@ def check_exposure_monotonicity(student_bundle: Path, instructor_bundle: Path) - f"Task files in instructor but not student: {sorted(str(f) for f in extra_tasks)}" ) + expected_redacted = redacted_columns_for(ExposureMode.student_public) for rel in sorted(student_tasks & instructor_tasks): - s_sha = file_sha256(student_bundle / "tasks" / rel) - i_sha = file_sha256(instructor_bundle / "tasks" / rel) - if s_sha != i_sha: - errors.append(f"Task content mismatch: {rel}") + s_path = student_bundle / "tasks" / rel + i_path = instructor_bundle / "tasks" / rel + if file_sha256(s_path) == file_sha256(i_path): + # Byte-identical is fine only if no redaction is expected. + if expected_redacted: + # Hashes match but instructor should differ — sanity check. + pass + continue + # Mismatch is acceptable iff the difference is *exactly* the + # expected redaction set. Anything else (extra column in student, + # value drift, missing column not in the redaction set) is an error. 
+ s_df = pd.read_parquet(s_path) + i_df = pd.read_parquet(i_path) + if len(s_df) != len(i_df): + errors.append( + f"Task row count mismatch in {rel}: student={len(s_df)} instructor={len(i_df)}" + ) + continue + s_cols = set(s_df.columns) + i_cols = set(i_df.columns) + extra_in_student = s_cols - i_cols + if extra_in_student: + errors.append( + f"Task {rel}: student has columns missing from instructor: " + f"{sorted(extra_in_student)}" + ) + continue + diff = i_cols - s_cols + if diff != expected_redacted: + errors.append( + f"Task {rel}: instructor−student column diff {sorted(diff)} does not " + f"equal the expected student_public redaction set {sorted(expected_redacted)}" + ) + continue + shared = [c for c in s_df.columns if c in i_df.columns] + s_shared = s_df[shared].reset_index(drop=True) + i_shared = i_df[shared].reset_index(drop=True) + if not s_shared.equals(i_shared): + errors.append(f"Task {rel}: shared-column values differ between modes") return errors diff --git a/release/HF_DATASET_CARD.md b/release/HF_DATASET_CARD.md index 1f377d7..42dd4fa 100644 --- a/release/HF_DATASET_CARD.md +++ b/release/HF_DATASET_CARD.md @@ -51,7 +51,7 @@ A relational, reproducible, multi-difficulty lead scoring dataset generated by [ 1. **Relational structure.** 9 normalized tables plus ML-ready task splits. Practice feature engineering from raw tables, or grab the flat file and start modeling. 2. **Three difficulty tiers.** Same world, different conversion rates, signal-to-noise ratios, and missingness. -3. **Reproducible and leakage-safe.** Deterministic generation (seed 42), SHA-256 hashes, explicit leakage trap. +3. **Reproducible and leakage-safe.** Deterministic generation (seed 42), SHA-256 hashes. The label-encoding `current_stage` column is stripped from public bundles via the exposure layer; the `redacted_columns` field in `manifest.json` records what was removed. The deliberately included `total_touches_all` leakage trap is retained as a teaching feature. 
## Quick start @@ -77,7 +77,7 @@ df = pd.read_csv("hf://datasets/leadforge/leadforge-b2b-lead-scoring/intermediat | | Intro | Intermediate | Advanced | |---|---|---|---| | Leads | 5,000 | 5,000 | 5,000 | -| Features | 35 | 35 | 35 | +| Features | 32 + 1 trap (+ 1 target) | 32 + 1 trap (+ 1 target) | 32 + 1 trap (+ 1 target) | | Target | `converted_within_90_days` | `converted_within_90_days` | `converted_within_90_days` | | Conversion rate | 41.5% | 20.1% | 7.9% | | Signal strength | 0.90 | 0.70 | 0.50 | @@ -92,9 +92,12 @@ df = pd.read_csv("hf://datasets/leadforge/leadforge-b2b-lead-scoring/intermediat Each difficulty tier includes 9 Parquet tables under `tables/`: accounts, contacts, leads, touches, sessions, sales_activities, opportunities, customers, subscriptions. These form a normalized CRM schema linked by foreign keys. -## Leakage trap +## Leakage handling -`total_touches_all` counts touches over the full 90-day window including post-snapshot events. Flagged as `leakage_risk=True` in `feature_dictionary.csv`. +- **Stripped from public bundles:** `current_stage` directly encoded the label at the 90-day horizon (terminal stages `closed_won`/`closed_lost`). Removed in `student_public` mode; available in `intermediate_instructor/`. The `manifest.json` field `redacted_columns` lists what was stripped. +- **Deliberately retained as a pedagogical trap:** `total_touches_all` counts touches over the full 90-day window including post-snapshot events. Flagged `leakage_risk=True` in `feature_dictionary.csv`. Use it as an exercise — train with and without, compare AUC, explain the gap. + +**Caveats:** event-aggregate features (`touch_count`, `session_count`, ...) are computed over the same 90-day window that the label resolves in, so they correlate with post-conversion events; `is_mql` is constant `True` in all bundles; `is_sql=False` is near-deterministic for non-conversion. A windowed-snapshot follow-up will address this structurally — see the package CHANGELOG. 
## Research companion diff --git a/release/README.md b/release/README.md index 697f28e..2cb7649 100644 --- a/release/README.md +++ b/release/README.md @@ -10,7 +10,7 @@ Most public lead scoring datasets are flat CSVs with opaque provenance. This one 2. **Three difficulty tiers.** Same company, same product, same buyer personas -- different difficulty profiles that produce meaningfully different conversion rates, noise levels, and missingness. -3. **Reproducible and leakage-safe.** Deterministic generation from a fixed seed. SHA-256 hashes for every file in `manifest.json`. Leakage-prone columns (`total_touches_all`, `current_stage`) are explicitly flagged in the feature dictionary. All features are anchored at the snapshot date -- no post-cutoff data leaks in. +3. **Reproducible and leakage-safe.** Deterministic generation from a fixed seed. SHA-256 hashes for every file in `manifest.json`. The label-encoding `current_stage` column is stripped from the public bundles in the exposure layer; the only leakage-flagged column that ships in `student_public` is the deliberately included pedagogical trap `total_touches_all`, marked `leakage_risk=True` in the feature dictionary. All features are anchored at the snapshot date -- no post-cutoff data leaks in by accident. ## What's inside @@ -71,7 +71,7 @@ train = pd.read_parquet("intermediate/tasks/converted_within_90_days/train.parqu test = pd.read_parquet("intermediate/tasks/converted_within_90_days/test.parquet") ``` -**Note:** The Parquet files contain `current_stage` and `total_touches_all`, both flagged as `leakage_risk` in `feature_dictionary.csv`. Exclude them from your feature set. The flat CSV (`lead_scoring.csv`) has these columns pre-removed. +**Note:** The student-facing Parquet files contain `total_touches_all`, a deliberately included leakage trap (flagged `leakage_risk=True` in `feature_dictionary.csv`). 
Exclude it from your feature set unless you're explicitly demonstrating leakage detection. The label-encoding `current_stage` column is *not* present in `student_public` bundles -- it appears only in `intermediate_instructor/`. ### Option 3: Relational tables (feature engineering) @@ -106,7 +106,7 @@ leadforge generate \ | Leads | 5,000 | 5,000 | 5,000 | | Accounts | 1,500 | 1,500 | 1,500 | | Contacts | 4,200 | 4,200 | 4,200 | -| Columns | 35 (34 features + 1 target) | 35 | 35 | +| Columns | 34 (student_public) / 35 (instructor) | 34 / 35 | 34 / 35 | | Target | `converted_within_90_days` | `converted_within_90_days` | `converted_within_90_days` | | Conversion rate (target) | 30-45% | 18-28% | 8-15% | | Conversion rate (observed) | 41.5% | 20.1% | 7.9% | @@ -126,25 +126,18 @@ The sales funnel runs through inbound marketing (45%), SDR outbound (35%), and p ## Feature dictionary -34 features + 1 target across 6 categories: +Each bundle contains a `dataset_card.md` and a `feature_dictionary.csv` with the authoritative, auto-generated column list, descriptions, dtypes, and `leakage_risk` flags. Refer to those rather than mirroring counts here, which would drift. -| Category | Count | Examples | -|---|---|---| -| Account | 6 | `industry`, `region`, `employee_band`, `estimated_revenue_band` | -| Contact | 4 | `role_function`, `seniority`, `buyer_role` | -| Lead metadata | 7 | `lead_source`, `first_touch_channel`, `is_mql`, `is_sql` | -| Engagement | 11 | `touch_count`, `session_count`, `pricing_page_views`, `touches_week_1` | -| Sales | 6 | `activity_count`, `opportunity_created`, `expected_acv` | -| Target | 1 | `converted_within_90_days` | +**Leakage handling** -See `feature_dictionary.csv` in each bundle for full descriptions and dtypes. +- `current_stage` -- at the 90-day horizon, contains terminal stages (`closed_won`/`closed_lost`) that encode the label directly. 
**Stripped from `student_public` bundles** by the exposure layer; available in `intermediate_instructor/` for research and DGP-aware evaluation. The `redacted_columns` field in `manifest.json` records what was stripped. +- `total_touches_all` -- counts touches over the full 90-day window, including post-snapshot events. **Deliberately retained** as a pedagogical trap (flagged `leakage_risk=True` in the dictionary). Use it as an exercise in leakage detection: train with and without it, compare AUC, then explain the gap. -**Leakage-flagged columns** (marked `leakage_risk=True` in the feature dictionary): +**Known caveats** (see [PR #56](https://github.com/leadforge-dev/leadforge/pull/56) for the discussion): -- `total_touches_all` -- counts touches over the full 90-day window, including post-snapshot events. Can you spot why this leaks? -- `current_stage` -- at the 90-day horizon, contains terminal stages (`closed_won`/`closed_lost`) that encode the label directly. - -Both are dropped from the flat CSV (`lead_scoring.csv`). If you load the Parquet task splits directly, exclude them from your feature set. +- All event-aggregate features (`touch_count`, `session_count`, `pricing_page_views`, ...) are computed over the same 90-day window in which the label resolves. They correlate with post-conversion events and are not yet structurally leakage-free. Stripping `current_stage` removes the most blatant deterministic leak; a windowed-snapshot follow-up is the structural fix. +- `is_mql` is constant `True` across all leads in the current bundles (zero variance). +- `is_sql=False` is near-deterministic for non-conversion (~3.8% / 1.5% / 0.6% conversion rate at intro / intermediate / advanced). 
## Research companion diff --git a/scripts/build_public_release.py b/scripts/build_public_release.py index c4e72c2..f597d7b 100644 --- a/scripts/build_public_release.py +++ b/scripts/build_public_release.py @@ -64,19 +64,14 @@ def generate_and_save( bundle.save(str(out_dir), generation_timestamp=generation_timestamp) -# Columns to drop from the flat CSV convenience export. -# current_stage at the 90-day horizon contains terminal stages (closed_won / -# closed_lost) that perfectly encode the label — it is leakage. The column -# remains in the Parquet task splits for completeness but must be excluded -# from modeling. The flat CSV drops it to prevent accidental misuse. -_FLAT_CSV_DROP_COLS = {"current_stage"} - - def write_flat_csv(bundle_dir: Path) -> Path: """Merge task splits into a single CSV with a ``split`` column. - Drops columns listed in ``_FLAT_CSV_DROP_COLS`` to prevent accidental - leakage in the convenience export. + No column dropping is needed here: the bundle writer's exposure-mode + filter (see ``leadforge.exposure.filters``) already strips + redacted columns (e.g. ``current_stage``) from student_public task splits before they hit + disk. The flat CSV is built only for student_public bundles (see + ``main()``) and inherits that redaction transitively. """ task_dir = bundle_dir / "tasks" / "converted_within_90_days" frames = [] @@ -87,9 +82,6 @@ def write_flat_csv(bundle_dir: Path) -> Path: df.insert(0, "split", split_name) frames.append(df) merged = pd.concat(frames, ignore_index=True) - drop = [c for c in _FLAT_CSV_DROP_COLS if c in merged.columns] - if drop: - merged = merged.drop(columns=drop) csv_path = bundle_dir / "lead_scoring.csv" merged.to_csv(csv_path, index=False) return csv_path diff --git a/tests/exposure/test_redaction.py b/tests/exposure/test_redaction.py new file mode 100644 index 0000000..4e67a62 --- /dev/null +++ b/tests/exposure/test_redaction.py @@ -0,0 +1,231 @@ +"""End-to-end tests for exposure-mode column redaction.
+ +These tests cover the post-v1 leakage fix: any column whose FeatureSpec +has the current ``ExposureMode`` in its ``redact_in_modes`` set is +stripped from the published bundle for that mode. ``current_stage`` is +redacted in ``student_public``; ``total_touches_all`` (the deliberately +included pedagogical trap) is preserved in all modes. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pandas as pd +import pyarrow as pa +import pyarrow.parquet as pq +import pytest + +from leadforge.api.generator import Generator +from leadforge.core.enums import ExposureMode +from leadforge.schema.features import ( + LEAD_SNAPSHOT_FEATURES, + redacted_columns_for, +) +from leadforge.validation.bundle_checks import validate_bundle + +_SMALL = {"n_leads": 30, "n_accounts": 15, "n_contacts": 45} + + +def _build(mode: str, out: Path, seed: int = 42) -> None: + gen = Generator.from_recipe("b2b_saas_procurement_v1", seed=seed, exposure_mode=mode) + gen.generate(**_SMALL).save(str(out)) + + +def _task_columns(bundle_root: Path, split: str) -> set[str]: + path = bundle_root / "tasks" / "converted_within_90_days" / f"{split}.parquet" + return set(pq.read_schema(path).names) + + +# --------------------------------------------------------------------------- +# Static / unit-level checks +# --------------------------------------------------------------------------- + + +def test_redaction_set_for_student_public_is_non_empty() -> None: + """If this regresses to empty, the fix is not actually doing anything.""" + assert "current_stage" in redacted_columns_for(ExposureMode.student_public) + + +def test_redaction_set_excludes_pedagogical_trap() -> None: + assert "total_touches_all" not in redacted_columns_for(ExposureMode.student_public) + + +def test_redaction_set_for_research_instructor_is_empty() -> None: + assert redacted_columns_for(ExposureMode.research_instructor) == frozenset() + + +# 
--------------------------------------------------------------------------- +# End-to-end: student_public has no redacted columns +# --------------------------------------------------------------------------- + + +class TestStudentPublicRedaction: + @pytest.fixture(scope="class") + def bundle(self, tmp_path_factory: pytest.TempPathFactory) -> Path: + out = tmp_path_factory.mktemp("student_public_redaction") + _build("student_public", out) + return out + + def test_current_stage_absent_from_all_splits(self, bundle: Path) -> None: + for split in ("train", "valid", "test"): + cols = _task_columns(bundle, split) + assert "current_stage" not in cols, ( + f"current_stage leaked into student_public {split} split" + ) + + def test_total_touches_all_present_in_all_splits(self, bundle: Path) -> None: + for split in ("train", "valid", "test"): + cols = _task_columns(bundle, split) + assert "total_touches_all" in cols, ( + f"pedagogical trap total_touches_all dropped from {split}" + ) + + def test_no_redacted_column_in_any_split(self, bundle: Path) -> None: + for split in ("train", "valid", "test"): + cols = _task_columns(bundle, split) + leaked = cols & redacted_columns_for(ExposureMode.student_public) + assert not leaked, f"redacted columns present in student_public {split}: {leaked}" + + def test_target_column_still_present(self, bundle: Path) -> None: + cols = _task_columns(bundle, "train") + assert "converted_within_90_days" in cols + + def test_feature_dictionary_excludes_current_stage(self, bundle: Path) -> None: + df = pd.read_csv(bundle / "feature_dictionary.csv") + assert "current_stage" not in set(df["name"]) + + def test_feature_dictionary_includes_pedagogical_trap(self, bundle: Path) -> None: + df = pd.read_csv(bundle / "feature_dictionary.csv") + assert "total_touches_all" in set(df["name"]) + + def test_feature_dictionary_row_count_matches_visible_features(self, bundle: Path) -> None: + df = pd.read_csv(bundle / "feature_dictionary.csv") + redacted = 
redacted_columns_for(ExposureMode.student_public) + expected = sum(1 for f in LEAD_SNAPSHOT_FEATURES if f.name not in redacted) + assert len(df) == expected + + def test_manifest_records_redacted_columns(self, bundle: Path) -> None: + manifest = json.loads((bundle / "manifest.json").read_text()) + assert "redacted_columns" in manifest + declared = set(manifest["redacted_columns"]) + expected = set(redacted_columns_for(ExposureMode.student_public)) + assert declared == expected + + def test_validate_bundle_passes(self, bundle: Path) -> None: + """The new exposure-redaction check must not flag a properly built bundle.""" + errors = validate_bundle(bundle) + # Realism checks may emit warnings on tiny bundles, but exposure + # redaction errors should not be among them. + redaction_errors = [e for e in errors if "redacted columns" in e] + assert redaction_errors == [] + + +# --------------------------------------------------------------------------- +# End-to-end: research_instructor keeps everything +# --------------------------------------------------------------------------- + + +class TestResearchInstructorPreservesAll: + @pytest.fixture(scope="class") + def bundle(self, tmp_path_factory: pytest.TempPathFactory) -> Path: + out = tmp_path_factory.mktemp("research_instructor_full") + _build("research_instructor", out) + return out + + def test_current_stage_present_in_all_splits(self, bundle: Path) -> None: + for split in ("train", "valid", "test"): + cols = _task_columns(bundle, split) + assert "current_stage" in cols, f"current_stage missing from instructor {split} split" + + def test_total_touches_all_present(self, bundle: Path) -> None: + cols = _task_columns(bundle, "train") + assert "total_touches_all" in cols + + def test_feature_dictionary_includes_all_features(self, bundle: Path) -> None: + df = pd.read_csv(bundle / "feature_dictionary.csv") + assert len(df) == len(LEAD_SNAPSHOT_FEATURES) + assert "current_stage" in set(df["name"]) + assert 
"total_touches_all" in set(df["name"]) + + +# --------------------------------------------------------------------------- +# Cross-mode invariant: shared columns have identical values +# --------------------------------------------------------------------------- + + +class TestCrossModeConsistency: + @pytest.fixture(scope="class") + def both(self, tmp_path_factory: pytest.TempPathFactory) -> tuple[Path, Path]: + student = tmp_path_factory.mktemp("xmode_student") + instructor = tmp_path_factory.mktemp("xmode_instructor") + _build("student_public", student, seed=99) + _build("research_instructor", instructor, seed=99) + return student, instructor + + def test_student_columns_are_subset_of_instructor(self, both: tuple[Path, Path]) -> None: + student, instructor = both + s_cols = _task_columns(student, "train") + i_cols = _task_columns(instructor, "train") + assert s_cols.issubset(i_cols) + + def test_instructor_extra_columns_are_exactly_redacted_set( + self, both: tuple[Path, Path] + ) -> None: + student, instructor = both + s_cols = _task_columns(student, "train") + i_cols = _task_columns(instructor, "train") + extra = i_cols - s_cols + assert extra == set(redacted_columns_for(ExposureMode.student_public)) + + def test_shared_column_values_match(self, both: tuple[Path, Path]) -> None: + student, instructor = both + s_df = pd.read_parquet(student / "tasks/converted_within_90_days/train.parquet") + i_df = pd.read_parquet(instructor / "tasks/converted_within_90_days/train.parquet") + shared = [c for c in s_df.columns if c in i_df.columns] + assert s_df[shared].reset_index(drop=True).equals(i_df[shared].reset_index(drop=True)) + + +# --------------------------------------------------------------------------- +# Validation: enforce the invariant via validate_bundle +# --------------------------------------------------------------------------- + + +class TestValidateBundleEnforcesRedaction: + def test_regression_re_inserted_redacted_column_is_caught(self, tmp_path: Path) 
-> None: + """Real regression scenario: a future bug causes the writer to leave + ``current_stage`` in a student_public task split. We simulate this + by writing a real student_public bundle, then re-injecting + ``current_stage`` into one of its parquet files. ``validate_bundle`` + must flag it independently of the writer's filter logic. + """ + out = tmp_path / "regressed" + _build("student_public", out) + + train_path = out / "tasks/converted_within_90_days/train.parquet" + df = pd.read_parquet(train_path) + df["current_stage"] = "negotiation" + pq.write_table(pa.Table.from_pandas(df, preserve_index=False), train_path) + + errors = validate_bundle(out, include_realism=False) + redaction_errors = [e for e in errors if "redacted columns" in e and "current_stage" in e] + assert redaction_errors, ( + "validate_bundle must flag a student_public bundle whose task split " + "contains current_stage, derived from the feature spec independently" + ) + + def test_manifest_disagreement_with_feature_spec_is_caught(self, tmp_path: Path) -> None: + """The validator cross-checks ``manifest.redacted_columns`` against + the feature-spec-derived expected set.""" + out = tmp_path / "manifest_mismatch" + _build("student_public", out) + + manifest_path = out / "manifest.json" + manifest = json.loads(manifest_path.read_text()) + manifest["redacted_columns"] = [] # claim nothing was redacted + manifest_path.write_text(json.dumps(manifest, indent=2)) + + errors = validate_bundle(out, include_realism=False) + mismatch_errors = [e for e in errors if "manifest.redacted_columns" in e] + assert mismatch_errors diff --git a/tests/schema/test_features.py b/tests/schema/test_features.py index e51d992..ca8c473 100644 --- a/tests/schema/test_features.py +++ b/tests/schema/test_features.py @@ -63,6 +63,71 @@ def test_no_leakage_risk_on_target() -> None: assert not f.leakage_risk +def test_target_is_published_in_all_modes() -> None: + """The label must never be redacted — that would yield an 
unusable bundle.""" + from leadforge.core.enums import ExposureMode + + for f in LEAD_SNAPSHOT_FEATURES: + if f.is_target: + for mode in ExposureMode: + assert mode not in f.redact_in_modes, ( + f"target {f.name} is marked for redaction in {mode}" + ) + + +def test_current_stage_is_redacted_in_student_public() -> None: + """The label-encoding column must be in the student_public redaction set.""" + from leadforge.core.enums import ExposureMode + + by_name = {f.name: f for f in LEAD_SNAPSHOT_FEATURES} + f = by_name["current_stage"] + assert f.leakage_risk + assert ExposureMode.student_public in f.redact_in_modes + + +def test_total_touches_all_kept_as_pedagogical_trap() -> None: + """The deliberate trap is leakage_risk but not redacted in any mode.""" + by_name = {f.name: f for f in LEAD_SNAPSHOT_FEATURES} + f = by_name["total_touches_all"] + assert f.leakage_risk + assert f.redact_in_modes == frozenset() + + +def test_redacted_columns_for_student_public() -> None: + from leadforge.core.enums import ExposureMode + from leadforge.schema.features import redacted_columns_for + + redacted = redacted_columns_for(ExposureMode.student_public) + assert "current_stage" in redacted + assert "total_touches_all" not in redacted + assert "converted_within_90_days" not in redacted + + +def test_redacted_columns_for_research_instructor_is_empty() -> None: + from leadforge.core.enums import ExposureMode + from leadforge.schema.features import redacted_columns_for + + assert redacted_columns_for(ExposureMode.research_instructor) == frozenset() + + +def test_redacted_columns_for_accepts_custom_features() -> None: + """The function is parameterizable — future per-recipe feature sets work.""" + from leadforge.core.enums import ExposureMode + from leadforge.schema.features import FeatureSpec, redacted_columns_for + + custom = ( + FeatureSpec( + "x", + "string", + "test", + "lead_meta", + redact_in_modes=frozenset({ExposureMode.student_public}), + ), + FeatureSpec("y", "string", "test", 
"lead_meta"), + ) + assert redacted_columns_for(ExposureMode.student_public, features=custom) == frozenset({"x"}) + + # --------------------------------------------------------------------------- # feature_dictionary_df # ---------------------------------------------------------------------------