From 9ebf57107809d39a4c34a010d08f9d380227eeee Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 5 May 2026 20:38:44 +0300 Subject: [PATCH 1/5] feat(render,validation): snapshot-safe relational export + leakage validator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR 2.1 of the v1 dataset release sequence. Module-level only — adds the structural fix audited against in PR 1.1, plus the matching validator that asserts the fix is in place on any bundle claiming to be student_public. PR 2.2 will wire these through bundle.py / exposure/filters.py and bump BUNDLE_SCHEMA_VERSION 4 → 5. leadforge/render/relational_snapshot_safe.py to_dataframes_snapshot_safe(dfs, *, snapshot_day) projects the full-horizon dict from leadforge.render.relational.to_dataframes onto the public-bundle shape: - leads: drops BANNED_LEAD_COLUMNS (converted_within_90_days, conversion_timestamp). - opportunities: drops BANNED_OPP_COLUMNS (close_outcome, closed_at) and filters per-lead by created_at <= lead_created_at + snapshot_day. - touches/sessions/sales_activities: filtered per-lead on each timestamp column to the same boundary (defence-in-depth — alpha bundles already pass G4.4 because the 90d horizon bounds events). - customers/subscriptions: omitted entirely (BANNED_TABLES). - accounts/contacts: pass-through. Pure function, no I/O, deterministic. leadforge/render/relational.py is unchanged (full-horizon export still backs research_instructor). leadforge/validation/relational_leakage.py Five probes feeding LeakageFinding / LeakageReport: - probe_banned_columns — Path A column scan. - probe_banned_tables — customers/subscriptions absent. - probe_deterministic_reconstruction — paths B/C/D zero-hit on the join graph. - probe_snapshot_window — event timestamps within lead_created_at + snapshot_day. - probe_bonus_model_auc — 5-fold CV LR + HistGBM on n_opps / max_acv / mean_acv; default ceiling 0.65 with a TODO(PR 3.3) for per-tier calibration. Two orchestrators (file-based and in-memory) plus RelationalLeakageError carrying the LeakageReport. Constants imported from the render module so writer + validator share one source of truth for "snapshot-safe". scripts/probe_relational_leakage.py deterministic_relational_reconstruction lifted into the package (where PR 3.1 was already slated to land it); the script now re-exports it from leadforge.validation.relational_leakage. No behavioural change — existing tests/scripts/test_probe_relational_leakage.py still passes against `probe_module.deterministic_relational_reconstruction`. Tests tests/render/test_relational_snapshot_safe.py — 14 property tests: banned cols absent; BANNED_TABLES absent from output dict; per-lead snapshot-window invariant holds; accounts/contacts pass-through; idempotent on already-safe input; deterministic across two calls; no input mutation; canonical output table order; missing-optional tolerance; snapshot_day=0 edge; negative-day raises; missing leads raises; missing anchor cols raises. tests/validation/test_relational_leakage.py — 23 probe tests over a 50-lead synthetic bundle (clean + tampered): every probe silent on the clean bundle (orchestrator + individual); each leakage channel re-introduced one at a time fires the matching probe with the expected detail; deterministic probe does not flag Path A (covered by banned_columns); bonus-model fires when customers re-introduced; bonus-model skips cleanly when label unavailable; orchestrator aggregates findings across channels; raise_if_failing carries the report; file-based run_all_probes reads/exits cleanly on missing tables/leads; package and script exports of deterministic_relational_reconstruction agree. All 1001 existing tests still pass; ruff + mypy clean. Refs: docs/release/v1_release_roadmap.md §"Phase 2"; docs/release/v1_release_design.md §"Snapshot-safe relational export"; docs/release/v1_current_state_audit.md §"Pointer to the structural fix" Co-Authored-By: Claude Opus 4.7 --- .agent-plan.md | 11 +- leadforge/render/relational_snapshot_safe.py | 151 +++++ leadforge/validation/relational_leakage.py | 613 ++++++++++++++++++ scripts/probe_relational_leakage.py | 96 +-- tests/render/test_relational_snapshot_safe.py | 276 ++++++++ tests/validation/test_relational_leakage.py | 451 +++++++++++++ 6 files changed, 1505 insertions(+), 93 deletions(-) create mode 100644 leadforge/render/relational_snapshot_safe.py create mode 100644 leadforge/validation/relational_leakage.py create mode 100644 tests/render/test_relational_snapshot_safe.py create mode 100644 tests/validation/test_relational_leakage.py diff --git a/.agent-plan.md b/.agent-plan.md index beee260..8945010 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -25,11 +25,12 @@ Goal: ship a best-in-class educational synthetic CRM lead-scoring dataset family - [x] Lock dataset release name `leadforge-lead-scoring-v1` (already locked via PR #61's milestone rename + roadmap edits; G1.1 reaffirmed) ### Phase 2 — Snapshot-safe relational export -- [ ] `leadforge/render/relational_snapshot_safe.py` (new) -- [ ] `leadforge/validation/relational_leakage.py` (new) -- [ ] `BUNDLE_SCHEMA_VERSION` 4 → 5; manifest gains `relational_snapshot_safe` -- [ ] Drop `converted_within_90_days` / `conversion_timestamp` from public `leads`; drop `close_outcome` / `closed_at` from public `opportunities`; omit `customers` / `subscriptions` from public bundles -- [ ] Hash-determinism preserved on regenerated bundles +- [x] `leadforge/render/relational_snapshot_safe.py` (new) — PR 2.1: `to_dataframes_snapshot_safe(dfs, *, snapshot_day)` projects the full-horizon dict from `to_dataframes` onto the public-bundle shape (drops `BANNED_LEAD_COLUMNS` from leads, `BANNED_OPP_COLUMNS` from opportunities, filters event tables per-lead by `lead_created_at + snapshot_day`, omits `customers`/`subscriptions`, passes accounts/contacts unchanged). +- [x] `leadforge/validation/relational_leakage.py` (new) — PR 2.1: `LeakageFinding`/`LeakageReport` dataclasses + five probes (`probe_banned_columns`, `probe_banned_tables`, `probe_deterministic_reconstruction`, `probe_snapshot_window`, `probe_bonus_model_auc`) + file-based and in-memory orchestrators (`run_all_probes`, `run_all_probes_on_dataframes`). `deterministic_relational_reconstruction` lifted from `scripts/probe_relational_leakage.py`; the script now re-exports it from the package. Bonus-model band defaults to 0.65 with a `# TODO(PR 3.3)` comment for per-tier calibration. +- [ ] `BUNDLE_SCHEMA_VERSION` 4 → 5; manifest gains `relational_snapshot_safe` — PR 2.2. +- [ ] Wire `relational_snapshot_safe` through `leadforge/exposure/filters.py` and `leadforge/api/bundle.py`; plumb the leakage validator into `leadforge/validation/bundle_checks.py` — PR 2.2. +- [ ] Drop `converted_within_90_days` / `conversion_timestamp` from public `leads`; drop `close_outcome` / `closed_at` from public `opportunities`; omit `customers` / `subscriptions` from public bundles — PR 2.2 (the structural rules are already enforced module-side; PR 2.2 turns them on for actual writes). +- [ ] Hash-determinism preserved on regenerated bundles — PR 2.2. ### Phase 3 — Release validation hardening - [ ] `leadforge/validation/{release_quality,leakage_probes,reporting}.py` (new) diff --git a/leadforge/render/relational_snapshot_safe.py b/leadforge/render/relational_snapshot_safe.py new file mode 100644 index 0000000..c7ca4f6 --- /dev/null +++ b/leadforge/render/relational_snapshot_safe.py @@ -0,0 +1,151 @@ +"""Snapshot-safe relational export for ``student_public`` bundles. + +:func:`to_dataframes_snapshot_safe` projects the full-horizon dict +returned by :func:`leadforge.render.relational.to_dataframes` onto the +shape published in public bundles. The transformation strips every +known channel through which ``converted_within_90_days`` is +reconstructible from joins (see +``docs/release/v1_current_state_audit.md``): + +* ``leads``: drops :data:`BANNED_LEAD_COLUMNS`. +* ``opportunities``: drops :data:`BANNED_OPP_COLUMNS` and filters rows + per-lead to ``created_at <= lead_created_at + snapshot_day``. +* ``touches`` / ``sessions`` / ``sales_activities``: filtered per-lead + on their respective timestamp column to the same boundary. This is + defence-in-depth — the alpha bundles already pass G4.4 because the + simulation horizon bounds event timestamps, but the public contract + is the snapshot window, not the horizon. +* ``customers`` / ``subscriptions`` (:data:`BANNED_TABLES`): omitted + entirely from the output dict; they exist only for converted leads, + so their presence is the leak. +* ``accounts`` / ``contacts``: passed through unchanged + (firmographic/personographic, time-invariant). + +PR 2.2 will wire this through :mod:`leadforge.api.bundle` and +:mod:`leadforge.exposure.filters` so ``student_public`` writes go +through this function while ``research_instructor`` continues to use +the full-horizon :func:`leadforge.render.relational.to_dataframes`. + +The constants below are re-used by +:mod:`leadforge.validation.relational_leakage` so the writer and the +validator share one source of truth for what "snapshot-safe" means. +""" + +from __future__ import annotations + +from collections.abc import Mapping + +import pandas as pd + +#: Columns dropped from public ``leads.parquet``. +BANNED_LEAD_COLUMNS: tuple[str, ...] = ( + "converted_within_90_days", + "conversion_timestamp", +) + +#: Columns dropped from public ``opportunities.parquet``. +BANNED_OPP_COLUMNS: tuple[str, ...] = ( + "close_outcome", + "closed_at", +) + +#: Tables omitted from public bundles entirely. +BANNED_TABLES: tuple[str, ...] = ("customers", "subscriptions") + +#: Event tables and the timestamp column used for per-lead snapshot +#: filtering. ``opportunities`` is treated as an event table here +#: (filtered by ``created_at``) because its existence is itself an +#: event in the funnel timeline. +EVENT_TABLES: tuple[tuple[str, str], ...] = ( + ("touches", "touch_timestamp"), + ("sessions", "session_timestamp"), + ("sales_activities", "activity_timestamp"), + ("opportunities", "created_at"), +) + +_ANCHOR_COL = "_lead_anchor_ts" + + +def to_dataframes_snapshot_safe( + dfs: Mapping[str, pd.DataFrame], + *, + snapshot_day: int, +) -> dict[str, pd.DataFrame]: + """Project the full-horizon relational dict onto the snapshot-safe form. + + Args: + dfs: Output of :func:`leadforge.render.relational.to_dataframes`. + Must contain ``leads``; other tables are optional and + missing keys are silently skipped. Input frames are never + mutated. + snapshot_day: Number of days after ``lead_created_at`` beyond + which event rows are dropped. This is independent of + ``label_window_days`` (which gates the task splits). + + Returns: + A new dict containing — in canonical order — ``accounts``, + ``contacts``, ``leads``, ``touches``, ``sessions``, + ``sales_activities``, ``opportunities``. ``customers`` and + ``subscriptions`` are absent. + + Raises: + ValueError: if ``snapshot_day`` is negative, or if ``leads`` is + absent from ``dfs``, or if ``leads`` lacks ``lead_id`` / + ``lead_created_at``. + """ + if snapshot_day < 0: + raise ValueError(f"snapshot_day must be non-negative, got {snapshot_day}") + if "leads" not in dfs: + raise ValueError("dfs must contain a 'leads' frame") + + out: dict[str, pd.DataFrame] = {} + + for name in ("accounts", "contacts"): + if name in dfs: + out[name] = dfs[name] + + leads = _drop_columns(dfs["leads"], BANNED_LEAD_COLUMNS) + out["leads"] = leads + anchor = _build_anchor(leads) + horizon = pd.Timedelta(days=snapshot_day) + + for name, ts_col in EVENT_TABLES: + if name not in dfs: + continue + df = dfs[name] + if name == "opportunities": + df = _drop_columns(df, BANNED_OPP_COLUMNS) + out[name] = _filter_to_snapshot_window(df, anchor, ts_col, horizon) + + return out + + +def _drop_columns(df: pd.DataFrame, columns: tuple[str, ...]) -> pd.DataFrame: + cols_to_drop = [c for c in columns if c in df.columns] + if not cols_to_drop: + return df + return df.drop(columns=cols_to_drop) + + +def _build_anchor(leads: pd.DataFrame) -> pd.DataFrame: + missing = [c for c in ("lead_id", "lead_created_at") if c not in leads.columns] + if missing: + raise ValueError(f"leads is missing required columns: {missing}") + anchor = leads[["lead_id", "lead_created_at"]].rename(columns={"lead_created_at": _ANCHOR_COL}) + anchor[_ANCHOR_COL] = pd.to_datetime(anchor[_ANCHOR_COL]) + return anchor + + +def _filter_to_snapshot_window( + events: pd.DataFrame, + anchor: pd.DataFrame, + ts_col: str, + horizon: pd.Timedelta, +) -> pd.DataFrame: + if len(events) == 0: + return events + merged = events.merge(anchor, on="lead_id", how="left") + ts = pd.to_datetime(merged[ts_col]) + cutoff = merged[_ANCHOR_COL] + horizon + keep = (ts <= cutoff).fillna(False).to_numpy() + return events.loc[keep].reset_index(drop=True) diff --git a/leadforge/validation/relational_leakage.py b/leadforge/validation/relational_leakage.py new file mode 100644 index 0000000..303736b --- /dev/null +++ b/leadforge/validation/relational_leakage.py @@ -0,0 +1,613 @@ +"""Probes that detect public-bundle reconstruction of ``converted_within_90_days``. + +The audit in ``docs/release/v1_current_state_audit.md`` enumerates four +deterministic paths (A-E) by which alpha public bundles reconstruct the +target via joins. The structural fix lives in +:mod:`leadforge.render.relational_snapshot_safe`; this module is the +matching validator that asserts the fix is in place on any bundle +claiming to be ``student_public``. + +Five probes, each producing zero or more :class:`LeakageFinding` +instances: + +* :func:`probe_banned_columns` — public ``leads`` and ``opportunities`` + tables must not contain :data:`~leadforge.render.relational_snapshot_safe.BANNED_LEAD_COLUMNS` + or :data:`~leadforge.render.relational_snapshot_safe.BANNED_OPP_COLUMNS` + respectively. +* :func:`probe_banned_tables` — public bundles must not include the + conversion-conditional tables ``customers`` or ``subscriptions``. +* :func:`probe_deterministic_reconstruction` — paths B / C / D from the + audit must produce zero positive predictions. +* :func:`probe_snapshot_window` — every event-table row must satisfy + ``timestamp <= lead_created_at + snapshot_day``. +* :func:`probe_bonus_model_auc` — optional honest-feature baseline: + trains LR + HistGBM on the legitimate aggregates ``n_opps`` / ``max_acv`` + / ``mean_acv`` (plus ``n_customers`` / ``n_subscriptions`` if present) + and asserts CV AUC stays below ``max_auc``. + +:func:`run_all_probes` is the file-based orchestrator that PR 2.2 will +call from :func:`leadforge.validation.bundle_checks.validate_bundle`. +:func:`run_all_probes_on_dataframes` is the same orchestrator without +the disk read, so unit tests can exercise the probes against synthetic +bundles built in-memory. + +The :func:`deterministic_relational_reconstruction` function is the +single source of truth for the join graph that defines paths A-E. The +companion script ``scripts/probe_relational_leakage.py`` re-exports it +unchanged so the alpha-bundle audit and the validator agree by +construction. +""" + +from __future__ import annotations + +from collections.abc import Iterable, Mapping +from dataclasses import dataclass +from pathlib import Path +from typing import Final + +import pandas as pd + +from leadforge.render.relational_snapshot_safe import ( + BANNED_LEAD_COLUMNS, + BANNED_OPP_COLUMNS, + BANNED_TABLES, + EVENT_TABLES, +) + +#: Channel labels carried on :class:`LeakageFinding.channel`. Constants +#: rather than an enum because findings serialise straight to JSON in +#: PR 3.2's reporting layer. +CHANNEL_BANNED_COLUMN: Final[str] = "banned_column" +CHANNEL_BANNED_TABLE: Final[str] = "banned_table" +CHANNEL_DETERMINISTIC_PATH: Final[str] = "deterministic_path" +CHANNEL_SNAPSHOT_WINDOW: Final[str] = "snapshot_window" +CHANNEL_BONUS_MODEL: Final[str] = "bonus_model" + +#: Default ceiling for the bonus-model AUC probe. Honest aggregates +#: (``n_opps`` / ACV) on the v0.1.0-alpha intermediate tier produce a +#: legitimate signal in the high-0.5s under the post-fix shape, so 0.65 +#: is a conservative placeholder until PR 3.3 calibrates a per-tier +#: band against measured baselines. +DEFAULT_MAX_BONUS_AUC: Final[float] = 0.65 +# TODO(PR 3.3): tighten this band against a measured honest-feature baseline. + +_PUBLIC_TABLES: Final[tuple[str, ...]] = ( + "accounts", + "contacts", + "leads", + "touches", + "sessions", + "sales_activities", + "opportunities", +) + + +@dataclass(frozen=True) +class LeakageFinding: + """One leakage-channel violation surfaced by a probe.""" + + channel: str + detail: str + message: str + + +@dataclass(frozen=True) +class LeakageReport: + """Aggregate result of a probe run. Empty :attr:`findings` means OK.""" + + findings: tuple[LeakageFinding, ...] + + @property + def ok(self) -> bool: + return len(self.findings) == 0 + + def raise_if_failing(self) -> None: + """Raise :class:`RelationalLeakageError` if any probe reported a finding.""" + if not self.ok: + raise RelationalLeakageError(self) + + +class RelationalLeakageError(AssertionError): + """Raised by :meth:`LeakageReport.raise_if_failing` on any finding. + + Carries the originating :class:`LeakageReport` on ``self.report`` so + callers (e.g. ``leadforge validate``) can render the full set of + findings in their output rather than just the first one. + """ + + def __init__(self, report: LeakageReport) -> None: + self.report = report + first_lines = "\n".join( + f" - [{f.channel}] {f.detail}: {f.message}" for f in report.findings + ) + super().__init__( + f"public bundle leaks `converted_within_90_days` " + f"({len(report.findings)} finding(s)):\n{first_lines}" + ) + + +# --------------------------------------------------------------------------- +# Deterministic reconstruction — the join graph that defines paths A-E. +# +# Lifted verbatim from ``scripts/probe_relational_leakage.py`` (PR 1.1) so +# the package and the script share one implementation. The script now +# re-exports this function from here. +# --------------------------------------------------------------------------- + + +def deterministic_relational_reconstruction( + leads: pd.DataFrame, + opportunities: pd.DataFrame, + customers: pd.DataFrame, + subscriptions: pd.DataFrame, +) -> pd.DataFrame: + """Reconstruct ``converted_within_90_days`` from public relational joins. + + Returns a DataFrame indexed by ``lead_id`` with five boolean columns, + one per reconstruction path (A-E). Path E is the union of B, C, D and + is the headline relational-leakage prediction. + + No hidden state, no model fit — pure joins. + + Empty ``customers``/``subscriptions`` frames are accepted (the + post-fix expected state); the corresponding paths simply return + all-False. + + Raises: + ValueError: if ``leads.lead_id`` contains duplicates. A validator + cannot operate safely on non-unique keys. + """ + if not leads["lead_id"].is_unique: + raise ValueError("leads.lead_id must be unique") + + leads_idx = leads.set_index("lead_id", drop=False) + + # Path A — the label itself, if present in public leads. + # Plain ``astype(bool)`` would map NaN to True; route through pandas' + # nullable boolean dtype so missing values fill cleanly to False without + # triggering object-downcast warnings. + if "converted_within_90_days" in leads.columns: + path_a = leads_idx["converted_within_90_days"].astype("boolean").fillna(False).astype(bool) + else: + path_a = pd.Series(False, index=leads_idx.index, name="converted_within_90_days") + + # Path B — any opportunity with close_outcome == "closed_won". + if "close_outcome" in opportunities.columns and len(opportunities) > 0: + won_leads = set( + opportunities.loc[opportunities["close_outcome"] == "closed_won", "lead_id"] + ) + else: + won_leads = set() + path_b = leads_idx["lead_id"].isin(won_leads) + + # Path C — lead has any joined customer (via opportunity_id -> opportunity.lead_id). + if len(opportunities) > 0: + opp_to_lead = dict( + zip(opportunities["opportunity_id"], opportunities["lead_id"], strict=False) + ) + else: + opp_to_lead = {} + customer_leads = { + opp_to_lead[opp_id] for opp_id in customers["opportunity_id"] if opp_id in opp_to_lead + } + path_c = leads_idx["lead_id"].isin(customer_leads) + + # Path D — lead has any joined subscription (sub -> customer -> opportunity -> lead). + if len(customers) > 0: + cust_to_opp = dict(zip(customers["customer_id"], customers["opportunity_id"], strict=False)) + else: + cust_to_opp = {} + sub_leads: set[str] = set() + for cust_id in subscriptions["customer_id"]: + opp_id = cust_to_opp.get(cust_id) + if opp_id is None: + continue + lead_id = opp_to_lead.get(opp_id) + if lead_id is not None: + sub_leads.add(lead_id) + path_d = leads_idx["lead_id"].isin(sub_leads) + + # Path E — deterministic OR of B, C, D (the headline join-only path). + path_e = path_b | path_c | path_d + + return pd.DataFrame( + { + "path_a_direct_label": path_a.values, + "path_b_opportunity_won": path_b.values, + "path_c_customer_exists": path_c.values, + "path_d_subscription_exists": path_d.values, + "path_e_or_b_c_d": path_e.values, + }, + index=leads_idx.index, + ) + + +# --------------------------------------------------------------------------- +# Probes +# --------------------------------------------------------------------------- + + +def probe_banned_columns(tables: Mapping[str, pd.DataFrame]) -> list[LeakageFinding]: + """Public ``leads``/``opportunities`` must not carry banned columns.""" + findings: list[LeakageFinding] = [] + for table_name, banned in ( + ("leads", BANNED_LEAD_COLUMNS), + ("opportunities", BANNED_OPP_COLUMNS), + ): + df = tables.get(table_name) + if df is None: + continue + for col in banned: + if col in df.columns: + findings.append( + LeakageFinding( + channel=CHANNEL_BANNED_COLUMN, + detail=f"{table_name}.{col}", + message=( + f"public {table_name}.parquet must not contain " + f"the banned column '{col}'" + ), + ) + ) + return findings + + +def probe_banned_tables(table_names: Iterable[str]) -> list[LeakageFinding]: + """Public bundles must not include conversion-conditional tables.""" + present = set(table_names) + return [ + LeakageFinding( + channel=CHANNEL_BANNED_TABLE, + detail=name, + message=( + f"public bundles must not include '{name}.parquet' " + "(it exists only for converted leads, so its presence " + "reconstructs the label)" + ), + ) + for name in BANNED_TABLES + if name in present + ] + + +def probe_deterministic_reconstruction( + tables: Mapping[str, pd.DataFrame], +) -> list[LeakageFinding]: + """Paths B / C / D from the audit must produce zero positive predictions. + + Path A is intentionally not checked here — it is fully covered by + :func:`probe_banned_columns` (Path A reads + ``leads.converted_within_90_days`` directly, which is a + :data:`BANNED_LEAD_COLUMNS` violation). + """ + leads = tables.get("leads") + if leads is None or len(leads) == 0: + return [] + + opportunities = tables.get( + "opportunities", + _empty_frame({"opportunity_id": "string", "lead_id": "string"}), + ) + customers = tables.get( + "customers", + _empty_frame({"customer_id": "string", "opportunity_id": "string", "account_id": "string"}), + ) + subscriptions = tables.get( + "subscriptions", + _empty_frame({"subscription_id": "string", "customer_id": "string"}), + ) + + paths = deterministic_relational_reconstruction(leads, opportunities, customers, subscriptions) + + findings: list[LeakageFinding] = [] + for path_col, label in ( + ("path_b_opportunity_won", "B (opportunity.close_outcome == 'closed_won')"), + ("path_c_customer_exists", "C (joined customer exists)"), + ("path_d_subscription_exists", "D (joined subscription exists)"), + ): + positive = int(paths[path_col].sum()) + if positive > 0: + findings.append( + LeakageFinding( + channel=CHANNEL_DETERMINISTIC_PATH, + detail=path_col, + message=( + f"path {label} produced {positive}/{len(paths)} " + "positive predictions; a snapshot-safe public " + "bundle must produce zero" + ), + ) + ) + return findings + + +def probe_snapshot_window( + tables: Mapping[str, pd.DataFrame], snapshot_day: int +) -> list[LeakageFinding]: + """Every event-table row must satisfy ``timestamp <= lead_created_at + snapshot_day``.""" + if snapshot_day < 0: + raise ValueError(f"snapshot_day must be non-negative, got {snapshot_day}") + leads = tables.get("leads") + if leads is None or len(leads) == 0: + return [] + if "lead_id" not in leads.columns or "lead_created_at" not in leads.columns: + raise ValueError("leads must contain 'lead_id' and 'lead_created_at' columns") + + anchor = leads[["lead_id", "lead_created_at"]].copy() + anchor["lead_created_at"] = pd.to_datetime(anchor["lead_created_at"]) + horizon = pd.Timedelta(days=snapshot_day) + + findings: list[LeakageFinding] = [] + for name, ts_col in EVENT_TABLES: + df = tables.get(name) + if df is None or len(df) == 0 or ts_col not in df.columns: + continue + merged = df[["lead_id", ts_col]].merge(anchor, on="lead_id", how="left") + ts = pd.to_datetime(merged[ts_col]) + cutoff = merged["lead_created_at"] + horizon + violations = int((ts > cutoff).fillna(False).sum()) + if violations > 0: + findings.append( + LeakageFinding( + channel=CHANNEL_SNAPSHOT_WINDOW, + detail=f"{name}.{ts_col}", + message=( + f"{violations}/{len(df)} rows in {name}.parquet " + f"have {ts_col} > lead_created_at + {snapshot_day}d" + ), + ) + ) + return findings + + +def probe_bonus_model_auc( + tables: Mapping[str, pd.DataFrame], + *, + max_auc: float = DEFAULT_MAX_BONUS_AUC, + seed: int = 42, + label: pd.Series | None = None, +) -> list[LeakageFinding]: + """5-fold CV LR + HistGBM AUC on honest relational aggregates. + + If the public bundle has been correctly redacted, ``leads`` no longer + carries ``converted_within_90_days`` — in that case the caller must + supply the held-back ``label`` (typically read from the task split) + so we can score against ground truth. When neither is available the + probe is skipped silently (no finding, no error): there is simply no + truth to compare against. + + Skipped (no finding) if scikit-learn is unavailable. + """ + try: + from sklearn.ensemble import HistGradientBoostingClassifier + from sklearn.linear_model import LogisticRegression + from sklearn.metrics import roc_auc_score + from sklearn.model_selection import StratifiedKFold + from sklearn.pipeline import Pipeline + from sklearn.preprocessing import StandardScaler + except ImportError: + return [] + + leads = tables.get("leads") + if leads is None or len(leads) == 0: + return [] + + y_series = _resolve_label(leads, label) + if y_series is None: + return [] + + features = _build_relational_features(leads, tables) + if features.empty or len(features.columns) == 0: + return [] + y = y_series.reindex(features.index).astype(int) + if y.nunique(dropna=True) < 2: + return [] + + models: dict[str, Pipeline] = { + "logistic_regression": Pipeline( + [("scaler", StandardScaler()), ("clf", LogisticRegression(max_iter=1000))] + ), + "hist_gbm": Pipeline([("clf", HistGradientBoostingClassifier(random_state=seed))]), + } + skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed) + + findings: list[LeakageFinding] = [] + for name, pipe in models.items(): + aucs: list[float] = [] + for train_idx, test_idx in skf.split(features.values, y.values): + x_tr, x_te = features.values[train_idx], features.values[test_idx] + y_tr, y_te = y.values[train_idx], y.values[test_idx] + pipe.fit(x_tr, y_tr) + proba = pipe.predict_proba(x_te)[:, 1] + aucs.append(float(roc_auc_score(y_te, proba))) + auc_mean = sum(aucs) / len(aucs) + if auc_mean > max_auc: + findings.append( + LeakageFinding( + channel=CHANNEL_BONUS_MODEL, + detail=name, + message=( + f"5-fold CV AUC {auc_mean:.3f} on join-derived features " + f"exceeds max_auc={max_auc:.3f}; honest aggregates " + "carry stronger signal than the band allows" + ), + ) + ) + return findings + + +# --------------------------------------------------------------------------- +# Orchestrators +# --------------------------------------------------------------------------- + + +def run_all_probes_on_dataframes( + tables: Mapping[str, pd.DataFrame], + *, + snapshot_day: int, + max_auc: float = DEFAULT_MAX_BONUS_AUC, + label: pd.Series | None = None, +) -> LeakageReport: + """Run every probe against an in-memory tables dict.""" + findings: list[LeakageFinding] = [] + findings += probe_banned_columns(tables) + findings += probe_banned_tables(tables.keys()) + findings += probe_deterministic_reconstruction(tables) + findings += probe_snapshot_window(tables, snapshot_day=snapshot_day) + findings += probe_bonus_model_auc(tables, max_auc=max_auc, label=label) + return LeakageReport(findings=tuple(findings)) + + +def run_all_probes( + bundle_dir: Path, + *, + snapshot_day: int, + max_auc: float = DEFAULT_MAX_BONUS_AUC, + label: pd.Series | None = None, +) -> LeakageReport: + """Run every probe against ``/tables/*.parquet``. + + Args: + bundle_dir: Bundle root (must contain ``tables/leads.parquet``). + snapshot_day: Snapshot window for the timestamp probe. + max_auc: Threshold for the bonus-model probe. + label: Optional ground-truth labels to feed the bonus-model + probe when ``leads.converted_within_90_days`` has been + redacted. Not loading them automatically (e.g. from the + task splits) keeps this module independent of task layout — + PR 2.2's wiring layer is the right place for that lookup. + + Raises: + FileNotFoundError: if ``/tables/`` is missing or + ``leads.parquet`` is not present. + """ + tables_dir = bundle_dir / "tables" + if not tables_dir.is_dir(): + raise FileNotFoundError(f"missing tables/ under {bundle_dir}") + if not (tables_dir / "leads.parquet").exists(): + raise FileNotFoundError(f"missing required leads.parquet under {tables_dir}") + + tables: dict[str, pd.DataFrame] = {} + for name in (*_PUBLIC_TABLES, *BANNED_TABLES): + path = tables_dir / f"{name}.parquet" + if path.exists(): + tables[name] = pd.read_parquet(path) + return run_all_probes_on_dataframes( + tables, snapshot_day=snapshot_day, max_auc=max_auc, label=label + ) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _empty_frame(dtype_map: dict[str, str]) -> pd.DataFrame: + return pd.DataFrame({c: pd.Series(dtype=d) for c, d in dtype_map.items()}) + + +def _resolve_label( + leads: pd.DataFrame, + label: pd.Series | None, +) -> pd.Series | None: + """Pick a label series to score against, or ``None`` to skip the probe. + + When ``label`` is supplied the caller is responsible for aligning it + to ``lead_id`` (either as the index name or in a way that + ``Series.reindex(features.index)`` resolves). When it is not + supplied we read ``leads.converted_within_90_days`` directly — this + branch is exercised by tampered bundles in tests. + """ + if label is not None: + return label.astype("boolean").fillna(False).astype(int) + if "converted_within_90_days" in leads.columns: + return ( + leads.set_index("lead_id")["converted_within_90_days"] + .astype("boolean") + .fillna(False) + .astype(int) + ) + return None + + +def _build_relational_features( + leads: pd.DataFrame, + tables: Mapping[str, pd.DataFrame], +) -> pd.DataFrame: + """Per-lead aggregates from joinable public/optional relational tables. + + Honest features only — no aggregate of ``close_outcome``. Customers + and subscriptions counts are included only when the corresponding + tables exist (i.e. on a tampered bundle); on a clean public bundle + they default to 0 and become uninformative columns the model can + discard. + """ + opps = tables.get("opportunities") + customers = tables.get("customers") + subscriptions = tables.get("subscriptions") + + feats = leads[["lead_id"]].copy() + + if opps is not None and len(opps) > 0: + agg: dict[str, tuple[str, str]] = {"n_opps": ("opportunity_id", "count")} + if "estimated_acv" in opps.columns: + agg["max_acv"] = ("estimated_acv", "max") + agg["mean_acv"] = ("estimated_acv", "mean") + opp_agg = opps.groupby("lead_id").agg(**agg).reset_index() + feats = feats.merge(opp_agg, on="lead_id", how="left") + opp_to_lead = dict(zip(opps["opportunity_id"], opps["lead_id"], strict=False)) + else: + opp_to_lead = {} + + if customers is not None and len(customers) > 0: + cust = customers.copy() + cust["lead_id"] = cust["opportunity_id"].map(opp_to_lead) + cust_agg = cust.groupby("lead_id").size().rename("n_customers").reset_index() + feats = feats.merge(cust_agg, on="lead_id", how="left") + cust_to_opp = dict(zip(customers["customer_id"], customers["opportunity_id"], strict=False)) + else: + cust_to_opp = {} + + if subscriptions is not None and len(subscriptions) > 0: + subs = subscriptions.copy() + subs["opportunity_id"] = subs["customer_id"].map(cust_to_opp) + subs["lead_id"] = subs["opportunity_id"].map(opp_to_lead) + sub_agg = subs.groupby("lead_id").size().rename("n_subscriptions").reset_index() + feats = feats.merge(sub_agg, on="lead_id", how="left") + + fill_defaults: dict[str, float] = { + "n_opps": 0.0, + "max_acv": 0.0, + "mean_acv": 0.0, + "n_customers": 0.0, + "n_subscriptions": 0.0, + } + for col, default in fill_defaults.items(): + if col in feats.columns: + feats[col] = feats[col].fillna(default) + else: + feats[col] = default + + feature_cols = list(fill_defaults.keys()) + return feats.set_index("lead_id")[feature_cols].astype(float) + + +__all__ = [ + "CHANNEL_BANNED_COLUMN", + "CHANNEL_BANNED_TABLE", + "CHANNEL_BONUS_MODEL", + "CHANNEL_DETERMINISTIC_PATH", + "CHANNEL_SNAPSHOT_WINDOW", + "DEFAULT_MAX_BONUS_AUC", + "LeakageFinding", + "LeakageReport", + "RelationalLeakageError", + "deterministic_relational_reconstruction", + "probe_banned_columns", + "probe_banned_tables", + "probe_bonus_model_auc", + "probe_deterministic_reconstruction", + "probe_snapshot_window", + "run_all_probes", + "run_all_probes_on_dataframes", +] diff --git a/scripts/probe_relational_leakage.py b/scripts/probe_relational_leakage.py index 88abc7e..4aa08af 100644 --- a/scripts/probe_relational_leakage.py +++ b/scripts/probe_relational_leakage.py @@ -68,98 +68,18 @@ import pandas as pd +# Re-export from the canonical package location. PR 2.1 lifted this +# function into ``leadforge/validation/relational_leakage.py``; the +# script keeps it accessible at ``probe_module.deterministic_relational_reconstruction`` +# (and at the CLI level) so callers and existing tests remain stable. +from leadforge.validation.relational_leakage import ( + deterministic_relational_reconstruction, +) + REQUIRED_TABLES = ("leads", "opportunities") DEFAULT_HORIZON_DAYS = 90 -def deterministic_relational_reconstruction( - leads: pd.DataFrame, - opportunities: pd.DataFrame, - customers: pd.DataFrame, - subscriptions: pd.DataFrame, -) -> pd.DataFrame: - """Reconstruct ``converted_within_90_days`` from public relational joins. - - Returns a DataFrame indexed by ``lead_id`` with five boolean columns, - one per reconstruction path (A-E). Path E is the union of B, C, D and - is the headline relational-leakage prediction. - - No hidden state, no model fit — pure joins. Designed to be lifted into - ``leadforge/validation/leakage_probes.py`` (PR 3.1) as the relational - leakage probe. - - Empty ``customers``/``subscriptions`` frames are accepted (Phase 2 - success state); the corresponding paths simply return all-False. - - Raises: - ValueError: if ``leads.lead_id`` contains duplicates. A validator - cannot operate safely on non-unique keys. - """ - if not leads["lead_id"].is_unique: - raise ValueError("leads.lead_id must be unique") - - leads_idx = leads.set_index("lead_id", drop=False) - - # Path A — the label itself, if present in public leads. - # Plain ``astype(bool)`` would map NaN to True; route through pandas' - # nullable boolean dtype so missing values fill cleanly to False without - # triggering object-downcast warnings. - if "converted_within_90_days" in leads.columns: - path_a = leads_idx["converted_within_90_days"].astype("boolean").fillna(False).astype(bool) - else: - path_a = pd.Series(False, index=leads_idx.index, name="converted_within_90_days") - - # Path B — any opportunity with close_outcome == "closed_won". - if "close_outcome" in opportunities.columns and len(opportunities) > 0: - won_leads = set( - opportunities.loc[opportunities["close_outcome"] == "closed_won", "lead_id"] - ) - else: - won_leads = set() - path_b = leads_idx["lead_id"].isin(won_leads) - - # Path C — lead has any joined customer (via opportunity_id -> opportunity.lead_id). - if len(opportunities) > 0: - opp_to_lead = dict( - zip(opportunities["opportunity_id"], opportunities["lead_id"], strict=False) - ) - else: - opp_to_lead = {} - customer_leads = { - opp_to_lead[opp_id] for opp_id in customers["opportunity_id"] if opp_id in opp_to_lead - } - path_c = leads_idx["lead_id"].isin(customer_leads) - - # Path D — lead has any joined subscription (sub -> customer -> opportunity -> lead). - if len(customers) > 0: - cust_to_opp = dict(zip(customers["customer_id"], customers["opportunity_id"], strict=False)) - else: - cust_to_opp = {} - sub_leads: set[str] = set() - for cust_id in subscriptions["customer_id"]: - opp_id = cust_to_opp.get(cust_id) - if opp_id is None: - continue - lead_id = opp_to_lead.get(opp_id) - if lead_id is not None: - sub_leads.add(lead_id) - path_d = leads_idx["lead_id"].isin(sub_leads) - - # Path E — deterministic OR of B, C, D (the headline join-only path). - path_e = path_b | path_c | path_d - - return pd.DataFrame( - { - "path_a_direct_label": path_a.values, - "path_b_opportunity_won": path_b.values, - "path_c_customer_exists": path_c.values, - "path_d_subscription_exists": path_d.values, - "path_e_or_b_c_d": path_e.values, - }, - index=leads_idx.index, - ) - - def _binary_metrics(y_true: pd.Series, y_pred: pd.Series) -> dict[str, float]: """Accuracy / precision / recall / F1 / counts for boolean predictions. diff --git a/tests/render/test_relational_snapshot_safe.py b/tests/render/test_relational_snapshot_safe.py new file mode 100644 index 0000000..97ae325 --- /dev/null +++ b/tests/render/test_relational_snapshot_safe.py @@ -0,0 +1,276 @@ +"""Tests for ``leadforge/render/relational_snapshot_safe.py``.""" + +from __future__ import annotations + +import pandas as pd +import pytest + +from leadforge.render.relational_snapshot_safe import ( + BANNED_LEAD_COLUMNS, + BANNED_OPP_COLUMNS, + BANNED_TABLES, + EVENT_TABLES, + to_dataframes_snapshot_safe, +) + +# --------------------------------------------------------------------------- +# Synthetic fixtures +# +# Two leads, anchored on the same date. Events span 0d, 5d, 12d, 35d after +# anchor — a snapshot_day=10 must keep day-0 / day-5 and drop day-12 / day-35. +# --------------------------------------------------------------------------- + +ANCHOR = pd.Timestamp("2026-01-01") + + +def _ts(offset_days: int) -> str: + return (ANCHOR + pd.Timedelta(days=offset_days)).isoformat() + + +def _full_horizon_dict() -> dict[str, pd.DataFrame]: + accounts = pd.DataFrame( + [ + {"account_id": "acct_1", "industry": "saas"}, + {"account_id": "acct_2", "industry": "saas"}, + ] + ) + contacts = pd.DataFrame( + [ + {"contact_id": "c_1", "account_id": "acct_1", "seniority": "vp"}, + {"contact_id": "c_2", "account_id": "acct_2", "seniority": "manager"}, + ] + ) + leads = pd.DataFrame( + [ + { + "lead_id": "lead_1", + "account_id": "acct_1", + "contact_id": "c_1", + "lead_created_at": ANCHOR.isoformat(), + "current_stage": "closed_won", + "is_sql": True, + "converted_within_90_days": True, + "conversion_timestamp": _ts(40), + }, + { + "lead_id": "lead_2", + "account_id": "acct_2", + "contact_id": "c_2", + "lead_created_at": ANCHOR.isoformat(), + "current_stage": "discovery", + "is_sql": False, + "converted_within_90_days": False, + "conversion_timestamp": None, + }, + ] + ) + touches = pd.DataFrame( + [ + {"touch_id": "t_1", "lead_id": "lead_1", "touch_timestamp": _ts(0)}, + {"touch_id": "t_2", "lead_id": "lead_1", "touch_timestamp": _ts(5)}, + {"touch_id": "t_3", "lead_id": "lead_1", "touch_timestamp": _ts(12)}, + {"touch_id": "t_4", "lead_id": "lead_2", "touch_timestamp": _ts(35)}, + ] + ) + sessions = pd.DataFrame( + [ + {"session_id": "s_1", "lead_id": "lead_1", "session_timestamp": _ts(0)}, + {"session_id": "s_2", "lead_id": "lead_1", "session_timestamp": _ts(20)}, + ] + ) + sales_activities = pd.DataFrame( + [ + {"activity_id": "a_1", "lead_id": "lead_1", "activity_timestamp": _ts(7)}, + {"activity_id": "a_2", "lead_id": "lead_2", "activity_timestamp": _ts(15)}, + ] + ) + opportunities = pd.DataFrame( + [ + { + "opportunity_id": "opp_1", + "lead_id": "lead_1", + "created_at": _ts(8), + "stage": "closed_won", + "estimated_acv": 50_000, + "close_outcome": "closed_won", + "closed_at": _ts(40), + }, + { + "opportunity_id": "opp_2", + "lead_id": "lead_1", + "created_at": _ts(30), + "stage": "negotiation", + "estimated_acv": 60_000, + "close_outcome": None, + "closed_at": None, + }, + ] + ) + customers = pd.DataFrame( + [ + { + "customer_id": "cu_1", + "opportunity_id": "opp_1", + "account_id": "acct_1", + "customer_start_at": _ts(40), + } + ] + ) + subscriptions = pd.DataFrame( + [ + { + "subscription_id": "sub_1", + "customer_id": "cu_1", + "plan_name": "starter", + "subscription_start_at": _ts(40), + "subscription_status": "active", + } + ] + ) + return { + "accounts": accounts, + "contacts": contacts, + "leads": leads, + "touches": touches, + "sessions": sessions, + "sales_activities": sales_activities, + "opportunities": opportunities, + "customers": customers, + "subscriptions": subscriptions, + } + + +# --------------------------------------------------------------------------- +# Property tests +# --------------------------------------------------------------------------- + + +def test_drops_banned_columns_from_leads_and_opportunities() -> None: + out = to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=10) + for col in BANNED_LEAD_COLUMNS: + assert col not in out["leads"].columns, f"leads should not contain banned column {col}" + for col in BANNED_OPP_COLUMNS: + assert col not in out["opportunities"].columns, ( + f"opportunities should not contain banned column {col}" + ) + + +def test_omits_banned_tables_entirely() -> None: + out = to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=10) + for name in BANNED_TABLES: + assert name not in out, f"output dict should not contain banned table {name}" + + +def test_event_tables_filtered_to_snapshot_window() -> None: + out = to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=10) + + # touches: kept day-0 & day-5 (lead_1); dropped day-12 (lead_1) and day-35 (lead_2). + assert sorted(out["touches"]["touch_id"]) == ["t_1", "t_2"] + # sessions: kept day-0; dropped day-20. + assert sorted(out["sessions"]["session_id"]) == ["s_1"] + # sales_activities: kept day-7 (lead_1); dropped day-15 (lead_2). + assert sorted(out["sales_activities"]["activity_id"]) == ["a_1"] + # opportunities: kept day-8 (lead_1); dropped day-30 (lead_1). + assert sorted(out["opportunities"]["opportunity_id"]) == ["opp_1"] + + +def test_snapshot_window_invariant_holds_per_lead() -> None: + out = to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=10) + leads_anchor = out["leads"].set_index("lead_id")["lead_created_at"].apply(pd.Timestamp) + horizon = pd.Timedelta(days=10) + + for name, ts_col in EVENT_TABLES: + df = out[name] + if df.empty: + continue + for _, row in df.iterrows(): + anchor = leads_anchor[row["lead_id"]] + assert pd.Timestamp(row[ts_col]) <= anchor + horizon, ( + f"{name}.{ts_col} for lead {row['lead_id']} exceeds window" + ) + + +def test_accounts_and_contacts_pass_through_unchanged() -> None: + src = _full_horizon_dict() + out = to_dataframes_snapshot_safe(src, snapshot_day=10) + pd.testing.assert_frame_equal(out["accounts"], src["accounts"]) + pd.testing.assert_frame_equal(out["contacts"], src["contacts"]) + + +def test_idempotent_on_already_safe_input() -> None: + once = to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=10) + twice = to_dataframes_snapshot_safe(once, snapshot_day=10) + assert set(once.keys()) == set(twice.keys()) + for name in once: + pd.testing.assert_frame_equal(once[name], twice[name]) + + +def test_deterministic_across_two_calls() -> None: + a = to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=10) + b = to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=10) + assert set(a.keys()) == set(b.keys()) + for name in a: + pd.testing.assert_frame_equal(a[name], b[name]) + + +def test_does_not_mutate_input_frames() -> None: + src = _full_horizon_dict() + leads_before = src["leads"].copy(deep=True) + opps_before = src["opportunities"].copy(deep=True) + touches_before = src["touches"].copy(deep=True) + + to_dataframes_snapshot_safe(src, snapshot_day=10) + + pd.testing.assert_frame_equal(src["leads"], leads_before) + pd.testing.assert_frame_equal(src["opportunities"], opps_before) + pd.testing.assert_frame_equal(src["touches"], touches_before) + + +def test_canonical_output_table_order() -> None: + out = to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=10) + assert list(out.keys()) == [ + "accounts", + "contacts", + "leads", + "touches", + "sessions", + "sales_activities", + "opportunities", + ] + + +def test_handles_missing_optional_tables() -> None: + src = _full_horizon_dict() + minimal = {"leads": src["leads"], "opportunities": src["opportunities"]} + out = to_dataframes_snapshot_safe(minimal, snapshot_day=10) + assert "leads" in out + assert "opportunities" in out + assert "touches" not in out + assert "accounts" not in out + + +def test_zero_snapshot_day_keeps_only_anchor_day_events() -> None: + src = _full_horizon_dict() + out = to_dataframes_snapshot_safe(src, snapshot_day=0) + # Only the day-0 touches and sessions survive. + assert sorted(out["touches"]["touch_id"]) == ["t_1"] + assert sorted(out["sessions"]["session_id"]) == ["s_1"] + # No sales_activities or opportunities at day-0. + assert out["sales_activities"].empty + assert out["opportunities"].empty + + +def test_negative_snapshot_day_raises() -> None: + with pytest.raises(ValueError, match="non-negative"): + to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=-1) + + +def test_missing_leads_raises() -> None: + with pytest.raises(ValueError, match="must contain a 'leads'"): + to_dataframes_snapshot_safe({}, snapshot_day=10) + + +def test_leads_missing_anchor_columns_raises() -> None: + bad = {"leads": pd.DataFrame([{"lead_id": "lead_1"}])} + with pytest.raises(ValueError, match="missing required columns"): + to_dataframes_snapshot_safe(bad, snapshot_day=10) diff --git a/tests/validation/test_relational_leakage.py b/tests/validation/test_relational_leakage.py new file mode 100644 index 0000000..1724cba --- /dev/null +++ b/tests/validation/test_relational_leakage.py @@ -0,0 +1,451 @@ +"""Tests for ``leadforge/validation/relational_leakage.py``. + +Each probe is exercised against two synthetic minimal bundles: + +* a *clean* bundle, produced by running the same source frames through + :func:`leadforge.render.relational_snapshot_safe.to_dataframes_snapshot_safe`, + on which every probe must produce zero findings; +* a *tampered* bundle, in which one leakage channel at a time is + re-introduced, on which the matching probe must fire with a finding + that pins the channel and the offending detail. +""" + +from __future__ import annotations + +from pathlib import Path + +import pandas as pd +import pytest + +from leadforge.render.relational_snapshot_safe import to_dataframes_snapshot_safe +from leadforge.validation.relational_leakage import ( + CHANNEL_BANNED_COLUMN, + CHANNEL_BANNED_TABLE, + CHANNEL_BONUS_MODEL, + CHANNEL_DETERMINISTIC_PATH, + LeakageFinding, + LeakageReport, + RelationalLeakageError, + deterministic_relational_reconstruction, + probe_banned_columns, + probe_banned_tables, + probe_bonus_model_auc, + probe_deterministic_reconstruction, + probe_snapshot_window, + run_all_probes, + run_all_probes_on_dataframes, +) + +ANCHOR = pd.Timestamp("2026-01-01") +SNAPSHOT_DAY = 10 + + +def _ts(offset_days: int) -> str: + return (ANCHOR + pd.Timedelta(days=offset_days)).isoformat() + + +def _full_horizon_bundle(*, n_each: int = 25) -> dict[str, pd.DataFrame]: + """A balanced bundle with 2*n_each leads — half converted, half not. + + Both converted (``lead_C_*``) and unconverted (``lead_U_*``) leads + own a single opportunity; only converted leads own a customer and a + subscription. Mirroring the realistic shape — opportunities for + everyone, conversion-conditional only customers/subscriptions — + keeps post-redaction honest aggregates (``n_opps``, ``max_acv``, + ``mean_acv``) non-discriminative, so the bonus-model probe stays + below the default 0.65 ceiling on the clean bundle. + """ + leads_rows: list[dict] = [] + opps_rows: list[dict] = [] + cust_rows: list[dict] = [] + sub_rows: list[dict] = [] + touches_rows: list[dict] = [] + sessions_rows: list[dict] = [] + activities_rows: list[dict] = [] + + for i in range(n_each): + cid = f"lead_C_{i:03d}" + ucid = f"lead_U_{i:03d}" + leads_rows.append( + { + "lead_id": cid, + "account_id": f"acct_C_{i:03d}", + "contact_id": f"con_C_{i:03d}", + "lead_created_at": ANCHOR.isoformat(), + "current_stage": "closed_won", + "is_sql": True, + "converted_within_90_days": True, + "conversion_timestamp": _ts(40), + } + ) + leads_rows.append( + { + "lead_id": ucid, + "account_id": f"acct_U_{i:03d}", + "contact_id": f"con_U_{i:03d}", + "lead_created_at": ANCHOR.isoformat(), + "current_stage": "discovery", + "is_sql": False, + "converted_within_90_days": False, + "conversion_timestamp": None, + } + ) + # ACV drawn from the same distribution for both classes so the + # post-redaction model has no signal beyond label noise — without + # this the bonus-model probe trips even on a snapshot-safe bundle. + acv = 30_000 + (i % 10) * 2_000 + opps_rows.append( + { + "opportunity_id": f"opp_C_{i:03d}", + "lead_id": cid, + "created_at": _ts(5), + "stage": "closed_won", + "estimated_acv": acv, + "close_outcome": "closed_won", + "closed_at": _ts(40), + } + ) + opps_rows.append( + { + "opportunity_id": f"opp_U_{i:03d}", + "lead_id": ucid, + "created_at": _ts(6), + "stage": "negotiation", + "estimated_acv": acv, + "close_outcome": None, + "closed_at": None, + } + ) + cust_rows.append( + { + "customer_id": f"cu_{i:03d}", + "opportunity_id": f"opp_C_{i:03d}", + "account_id": f"acct_C_{i:03d}", + "customer_start_at": _ts(40), + } + ) + sub_rows.append( + { + "subscription_id": f"sub_{i:03d}", + "customer_id": f"cu_{i:03d}", + "plan_name": "starter", + "subscription_start_at": _ts(40), + "subscription_status": "active", + } + ) + touches_rows.append({"touch_id": f"t_C_{i:03d}", "lead_id": cid, "touch_timestamp": _ts(2)}) + sessions_rows.append( + {"session_id": f"s_C_{i:03d}", "lead_id": cid, "session_timestamp": _ts(3)} + ) + activities_rows.append( + {"activity_id": f"a_C_{i:03d}", "lead_id": cid, "activity_timestamp": _ts(7)} + ) + touches_rows.append( + {"touch_id": f"t_U_{i:03d}", "lead_id": ucid, "touch_timestamp": _ts(2)} + ) + + return { + "accounts": pd.DataFrame( + [{"account_id": r["account_id"]} for r in leads_rows] + ).drop_duplicates(), + "contacts": pd.DataFrame( + [{"contact_id": r["contact_id"], "account_id": r["account_id"]} for r in leads_rows] + ).drop_duplicates(), + "leads": pd.DataFrame(leads_rows), + "touches": pd.DataFrame(touches_rows), + "sessions": pd.DataFrame(sessions_rows), + "sales_activities": pd.DataFrame(activities_rows), + "opportunities": pd.DataFrame(opps_rows), + "customers": pd.DataFrame(cust_rows), + "subscriptions": pd.DataFrame(sub_rows), + } + + +def _clean_bundle() -> dict[str, pd.DataFrame]: + """Snapshot-safe bundle — every probe should report zero findings.""" + return to_dataframes_snapshot_safe(_full_horizon_bundle(), snapshot_day=SNAPSHOT_DAY) + + +def _label_for(bundle: dict[str, pd.DataFrame]) -> pd.Series: + """Held-back ground truth for the bonus-model probe on a clean bundle.""" + src = _full_horizon_bundle() + return src["leads"].set_index("lead_id")["converted_within_90_days"] + + +# --------------------------------------------------------------------------- +# Clean bundle — every probe must be silent. +# --------------------------------------------------------------------------- + + +def test_clean_bundle_passes_all_probes() -> None: + tables = _clean_bundle() + report = run_all_probes_on_dataframes( + tables, snapshot_day=SNAPSHOT_DAY, label=_label_for(tables) + ) + assert report.ok, [f"[{f.channel}] {f.detail}: {f.message}" for f in report.findings] + + +def test_clean_bundle_individual_probes_silent() -> None: + tables = _clean_bundle() + assert probe_banned_columns(tables) == [] + assert probe_banned_tables(tables.keys()) == [] + assert probe_deterministic_reconstruction(tables) == [] + assert probe_snapshot_window(tables, snapshot_day=SNAPSHOT_DAY) == [] + + +# --------------------------------------------------------------------------- +# Banned columns (Path A). +# --------------------------------------------------------------------------- + + +def test_banned_column_in_leads_fires() -> None: + tables = _clean_bundle() + tables["leads"] = tables["leads"].assign(converted_within_90_days=[True] * len(tables["leads"])) + findings = probe_banned_columns(tables) + assert any( + f.channel == CHANNEL_BANNED_COLUMN and f.detail == "leads.converted_within_90_days" + for f in findings + ) + + +def test_banned_column_in_opportunities_fires() -> None: + tables = _clean_bundle() + tables["opportunities"] = tables["opportunities"].assign( + close_outcome=["closed_won"] * len(tables["opportunities"]) + ) + findings = probe_banned_columns(tables) + assert any( + f.channel == CHANNEL_BANNED_COLUMN and f.detail == "opportunities.close_outcome" + for f in findings + ) + + +# --------------------------------------------------------------------------- +# Banned tables. +# --------------------------------------------------------------------------- + + +def test_banned_table_customers_present_fires() -> None: + tables = _clean_bundle() + src = _full_horizon_bundle() + tables["customers"] = src["customers"] + findings = probe_banned_tables(tables.keys()) + assert any(f.channel == CHANNEL_BANNED_TABLE and f.detail == "customers" for f in findings) + + +def test_banned_table_subscriptions_present_fires() -> None: + tables = _clean_bundle() + src = _full_horizon_bundle() + tables["subscriptions"] = src["subscriptions"] + findings = probe_banned_tables(tables.keys()) + assert any(f.channel == CHANNEL_BANNED_TABLE and f.detail == "subscriptions" for f in findings) + + +# --------------------------------------------------------------------------- +# Deterministic paths B / C / D. +# --------------------------------------------------------------------------- + + +def test_deterministic_path_b_fires_when_close_outcome_present() -> None: + tables = _clean_bundle() + src = _full_horizon_bundle() + tables["opportunities"] = src["opportunities"] # restores close_outcome + findings = probe_deterministic_reconstruction(tables) + assert any(f.detail == "path_b_opportunity_won" for f in findings) + + +def test_deterministic_path_c_fires_when_customers_present() -> None: + tables = _clean_bundle() + src = _full_horizon_bundle() + tables["customers"] = src["customers"] + findings = probe_deterministic_reconstruction(tables) + assert any(f.detail == "path_c_customer_exists" for f in findings) + + +def test_deterministic_path_d_fires_when_subscriptions_present() -> None: + tables = _clean_bundle() + src = _full_horizon_bundle() + tables["customers"] = src["customers"] + tables["subscriptions"] = src["subscriptions"] + findings = probe_deterministic_reconstruction(tables) + assert any(f.detail == "path_d_subscription_exists" for f in findings) + + +def test_deterministic_probe_does_not_flag_path_a() -> None: + """Path A is covered by probe_banned_columns; the deterministic probe + must remain silent on it to avoid double-counting the same finding.""" + tables = _clean_bundle() + tables["leads"] = tables["leads"].assign(converted_within_90_days=[True] * len(tables["leads"])) + findings = probe_deterministic_reconstruction(tables) + assert all(f.detail != "path_a_direct_label" for f in findings) + + +# --------------------------------------------------------------------------- +# Snapshot window. +# --------------------------------------------------------------------------- + + +def test_snapshot_window_fires_on_late_event() -> None: + tables = _clean_bundle() + leaked = pd.DataFrame( + [ + { + "touch_id": "t_late", + "lead_id": tables["leads"]["lead_id"].iloc[0], + "touch_timestamp": _ts(SNAPSHOT_DAY + 5), + } + ] + ) + tables["touches"] = pd.concat([tables["touches"], leaked], ignore_index=True) + findings = probe_snapshot_window(tables, snapshot_day=SNAPSHOT_DAY) + assert any(f.detail == "touches.touch_timestamp" for f in findings) + + +def test_snapshot_window_silent_on_clean_bundle() -> None: + tables = _clean_bundle() + assert probe_snapshot_window(tables, snapshot_day=SNAPSHOT_DAY) == [] + + +def test_snapshot_window_negative_day_raises() -> None: + tables = _clean_bundle() + with pytest.raises(ValueError, match="non-negative"): + probe_snapshot_window(tables, snapshot_day=-1) + + +# --------------------------------------------------------------------------- +# Bonus-model probe. +# --------------------------------------------------------------------------- + + +def test_bonus_model_probe_fires_when_customers_reintroduced() -> None: + """Re-adding customers to a clean bundle gives the model n_customers as + a perfect predictor — AUC should saturate near 1.0 and exceed any sane + band (we use a deliberately tight 0.95 to guard against flakiness on + small synthetic data).""" + pytest.importorskip("sklearn") + tables = _clean_bundle() + src = _full_horizon_bundle() + tables["customers"] = src["customers"] + tables["subscriptions"] = src["subscriptions"] + + label = _label_for(tables) + findings = probe_bonus_model_auc(tables, max_auc=0.95, label=label) + assert findings, "expected the bonus model to detect the customers leak" + assert all(f.channel == CHANNEL_BONUS_MODEL for f in findings) + + +def test_bonus_model_probe_skipped_without_label() -> None: + """If `leads.converted_within_90_days` is redacted (clean bundle) and + no `label` is supplied, the probe has nothing to score against and + must skip cleanly with zero findings, not error.""" + pytest.importorskip("sklearn") + tables = _clean_bundle() + assert probe_bonus_model_auc(tables, max_auc=0.65, label=None) == [] + + +# --------------------------------------------------------------------------- +# Orchestrator + report ergonomics. +# --------------------------------------------------------------------------- + + +def test_orchestrator_aggregates_findings_across_channels() -> None: + tables = _clean_bundle() + src = _full_horizon_bundle() + # Re-introduce two channels at once: the label column AND customers. + tables["leads"] = tables["leads"].assign(converted_within_90_days=[True] * len(tables["leads"])) + tables["customers"] = src["customers"] + + report = run_all_probes_on_dataframes(tables, snapshot_day=SNAPSHOT_DAY) + assert not report.ok + channels = {f.channel for f in report.findings} + assert CHANNEL_BANNED_COLUMN in channels + assert CHANNEL_BANNED_TABLE in channels + assert CHANNEL_DETERMINISTIC_PATH in channels + + +def test_report_raise_if_failing_carries_report() -> None: + tables = _clean_bundle() + tables["leads"] = tables["leads"].assign(converted_within_90_days=[True] * len(tables["leads"])) + report = run_all_probes_on_dataframes(tables, snapshot_day=SNAPSHOT_DAY) + with pytest.raises(RelationalLeakageError) as excinfo: + report.raise_if_failing() + assert excinfo.value.report is report + assert "banned_column" in str(excinfo.value) + + +def test_report_ok_property() -> None: + assert LeakageReport(findings=()).ok is True + f = LeakageFinding(channel=CHANNEL_BANNED_COLUMN, detail="leads.x", message="...") + assert LeakageReport(findings=(f,)).ok is False + + +# --------------------------------------------------------------------------- +# File-based orchestrator. +# --------------------------------------------------------------------------- + + +def _write_bundle_to_disk(tables: dict[str, pd.DataFrame], root: Path) -> Path: + tables_dir = root / "tables" + tables_dir.mkdir(parents=True, exist_ok=True) + for name, df in tables.items(): + df.to_parquet(tables_dir / f"{name}.parquet", index=False) + return root + + +def test_run_all_probes_reads_bundle_from_disk(tmp_path: Path) -> None: + bundle_dir = _write_bundle_to_disk(_clean_bundle(), tmp_path / "clean") + report = run_all_probes( + bundle_dir, snapshot_day=SNAPSHOT_DAY, label=_label_for(_clean_bundle()) + ) + assert report.ok, [f"[{f.channel}] {f.detail}: {f.message}" for f in report.findings] + + +def test_run_all_probes_detects_disk_bundle_leakage(tmp_path: Path) -> None: + src = _full_horizon_bundle() + bundle_dir = _write_bundle_to_disk(src, tmp_path / "leaky") + report = run_all_probes(bundle_dir, snapshot_day=SNAPSHOT_DAY) + assert not report.ok + channels = {f.channel for f in report.findings} + assert CHANNEL_BANNED_COLUMN in channels + assert CHANNEL_BANNED_TABLE in channels + assert CHANNEL_DETERMINISTIC_PATH in channels + + +def test_run_all_probes_missing_tables_dir_raises(tmp_path: Path) -> None: + with pytest.raises(FileNotFoundError, match="missing tables/"): + run_all_probes(tmp_path, snapshot_day=SNAPSHOT_DAY) + + +def test_run_all_probes_missing_leads_raises(tmp_path: Path) -> None: + (tmp_path / "tables").mkdir() + with pytest.raises(FileNotFoundError, match="leads.parquet"): + run_all_probes(tmp_path, snapshot_day=SNAPSHOT_DAY) + + +# --------------------------------------------------------------------------- +# Lifted function — sanity check that the package and the script agree. +# --------------------------------------------------------------------------- + + +def test_deterministic_function_matches_script_export() -> None: + """The script re-exports the lifted function; calling each must yield + identical output on the same inputs.""" + import importlib.util + import sys + + script_path = Path(__file__).resolve().parents[2] / "scripts" / "probe_relational_leakage.py" + spec = importlib.util.spec_from_file_location("probe_relational_leakage_check", script_path) + assert spec is not None + assert spec.loader is not None + module = importlib.util.module_from_spec(spec) + sys.modules["probe_relational_leakage_check"] = module + spec.loader.exec_module(module) + + src = _full_horizon_bundle() + a = deterministic_relational_reconstruction( + src["leads"], src["opportunities"], src["customers"], src["subscriptions"] + ) + b = module.deterministic_relational_reconstruction( + src["leads"], src["opportunities"], src["customers"], src["subscriptions"] + ) + pd.testing.assert_frame_equal(a, b) From 2922eb88b5f4cc72008155fee28b75b6eeca4d2c Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 5 May 2026 20:41:58 +0300 Subject: [PATCH 2/5] fix(render,validation): assert lead_id uniqueness before per-lead merge Self-review followup to PR 2.1. Both ``_build_anchor`` (in ``relational_snapshot_safe``) and ``probe_snapshot_window`` (in ``relational_leakage``) build a per-lead anchor by selecting ``leads[['lead_id','lead_created_at']]`` and merging it back onto each event table. If ``leads.lead_id`` were not unique, the left-merge would broadcast and silently inflate event-table row counts (export) or produce a mask whose length doesn't match the event frame (probe). ``deterministic_relational_reconstruction`` already asserts the same invariant; align both new modules with it so a duplicate-key bug surfaces with a clear ValueError instead of a quiet wrong answer. Adds two regression tests (one per module) that pass duplicated leads and assert the matching ValueError. Tests now total 1003 passing; ruff + mypy still clean. Co-Authored-By: Claude Opus 4.7 --- leadforge/render/relational_snapshot_safe.py | 5 +++++ leadforge/validation/relational_leakage.py | 2 ++ tests/render/test_relational_snapshot_safe.py | 9 +++++++++ tests/validation/test_relational_leakage.py | 9 +++++++++ 4 files changed, 25 insertions(+) diff --git a/leadforge/render/relational_snapshot_safe.py b/leadforge/render/relational_snapshot_safe.py index c7ca4f6..1c10c8e 100644 --- a/leadforge/render/relational_snapshot_safe.py +++ b/leadforge/render/relational_snapshot_safe.py @@ -131,6 +131,11 @@ def _build_anchor(leads: pd.DataFrame) -> pd.DataFrame: missing = [c for c in ("lead_id", "lead_created_at") if c not in leads.columns] if missing: raise ValueError(f"leads is missing required columns: {missing}") + # Duplicate lead_ids would broadcast in the per-lead merge below and + # silently inflate event-table row counts. Match the same invariant + # asserted by ``deterministic_relational_reconstruction``. + if not leads["lead_id"].is_unique: + raise ValueError("leads.lead_id must be unique") anchor = leads[["lead_id", "lead_created_at"]].rename(columns={"lead_created_at": _ANCHOR_COL}) anchor[_ANCHOR_COL] = pd.to_datetime(anchor[_ANCHOR_COL]) return anchor diff --git a/leadforge/validation/relational_leakage.py b/leadforge/validation/relational_leakage.py index 303736b..f8ba666 100644 --- a/leadforge/validation/relational_leakage.py +++ b/leadforge/validation/relational_leakage.py @@ -332,6 +332,8 @@ def probe_snapshot_window( return [] if "lead_id" not in leads.columns or "lead_created_at" not in leads.columns: raise ValueError("leads must contain 'lead_id' and 'lead_created_at' columns") + if not leads["lead_id"].is_unique: + raise ValueError("leads.lead_id must be unique") anchor = leads[["lead_id", "lead_created_at"]].copy() anchor["lead_created_at"] = pd.to_datetime(anchor["lead_created_at"]) diff --git a/tests/render/test_relational_snapshot_safe.py b/tests/render/test_relational_snapshot_safe.py index 97ae325..ec9ad82 100644 --- a/tests/render/test_relational_snapshot_safe.py +++ b/tests/render/test_relational_snapshot_safe.py @@ -274,3 +274,12 @@ def test_leads_missing_anchor_columns_raises() -> None: bad = {"leads": pd.DataFrame([{"lead_id": "lead_1"}])} with pytest.raises(ValueError, match="missing required columns"): to_dataframes_snapshot_safe(bad, snapshot_day=10) + + +def test_duplicate_lead_id_raises() -> None: + """Per-lead snapshot filter would broadcast on duplicates; matches the + same invariant asserted by ``deterministic_relational_reconstruction``.""" + src = _full_horizon_dict() + src["leads"] = pd.concat([src["leads"], src["leads"].iloc[[0]]], ignore_index=True) + with pytest.raises(ValueError, match="lead_id must be unique"): + to_dataframes_snapshot_safe(src, snapshot_day=10) diff --git a/tests/validation/test_relational_leakage.py b/tests/validation/test_relational_leakage.py index 1724cba..a027896 100644 --- a/tests/validation/test_relational_leakage.py +++ b/tests/validation/test_relational_leakage.py @@ -312,6 +312,15 @@ def test_snapshot_window_negative_day_raises() -> None: probe_snapshot_window(tables, snapshot_day=-1) +def test_snapshot_window_duplicate_lead_id_raises() -> None: + """Duplicate lead_ids would broadcast in the merge; matches the + same invariant asserted by deterministic_relational_reconstruction.""" + tables = _clean_bundle() + tables["leads"] = pd.concat([tables["leads"], tables["leads"].iloc[[0]]], ignore_index=True) + with pytest.raises(ValueError, match="lead_id must be unique"): + probe_snapshot_window(tables, snapshot_day=SNAPSHOT_DAY) + + # --------------------------------------------------------------------------- # Bonus-model probe. # --------------------------------------------------------------------------- From df15a243761f094d3ab44a579379a1edfc70d2e7 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 5 May 2026 20:56:36 +0300 Subject: [PATCH 3/5] refactor(render,validation): address PR 2.1 self-review (P0+P1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Self-review-driven cleanup of the structural/leakage modules from 9ebf571 / 2922eb8. Behaviour-preserving for the structural probes; the bonus-model probe goes from default-on to opt-in, which is the substantive correctness change. P0 — substantive fixes * Bonus-model probe is now opt-in. Orchestrators (``run_all_probes`` / ``run_all_probes_on_dataframes``) gain ``bonus_model_max_auc: float | None = None`` and skip the probe unless that threshold is set. The probe function itself now requires ``max_auc`` as a keyword (no ``DEFAULT_MAX_BONUS_AUC``). Rationale: PR 3.3 owns the calibrated per-tier band; PR 2.1 must not default-on an uncalibrated 0.65 ceiling that would fail every alpha bundle when PR 2.2 wires it into validate_bundle. * ``_resolve_label`` rejects misaligned labels with a clear ``ValueError`` instead of letting them silently no-op via the binary-cardinality skip — a leakage validator that hides bugs by design defeats its own purpose. * ``RelationalLeakageError`` now inherits from ``LeadforgeError`` instead of ``AssertionError`` (which gets stripped under ``python -O`` and is the wrong base class for a domain error). P1 — architecture / naming * Contract constants (``BANNED_LEAD_COLUMNS`` / ``BANNED_OPP_COLUMNS`` / ``BANNED_TABLES`` / ``SNAPSHOT_FILTERED_TABLES``) move from the render module to the validator. The validator owns the definition of "leakage"; the writer enforces it and imports the contract. This flips one render→validation import direction we had backwards. * ``EVENT_TABLES`` → ``SNAPSHOT_FILTERED_TABLES``. The original name implied "all event tables", but ``opportunities`` is an entity table that we per-lead-filter by ``created_at``; the new name says what the constant is for. * ``CHANNEL_DETERMINISTIC_PATH`` → ``CHANNEL_JOIN_RECONSTRUCTION``. Path A is a deterministic reconstruction too — it is just a column read. The probe deliberately scopes itself to join-graph paths (B/C/D) and delegates Path A to ``probe_banned_columns``; the channel constant now matches. Tests * Two new tests cover the new behaviours: - ``test_bonus_model_probe_rejects_misaligned_label`` — the ``_resolve_label`` guard fires loudly on a non-lead_id index. - ``test_orchestrator_skips_bonus_probe_by_default`` / ``test_orchestrator_runs_bonus_probe_when_enabled`` — opt-in behaviour at the orchestrator level. * Existing clean-bundle tests no longer pass ``label`` to the structural orchestrator (it is unused without ``bonus_model_max_auc``); the bonus-enabled test path is exercised separately. 1007 tests pass (was 1003); ruff + mypy clean across leadforge/. Co-Authored-By: Claude Opus 4.7 --- .agent-plan.md | 2 +- leadforge/render/relational_snapshot_safe.py | 62 ++--- leadforge/validation/relational_leakage.py | 233 +++++++++++------- tests/render/test_relational_snapshot_safe.py | 4 +- tests/validation/test_relational_leakage.py | 88 +++++-- 5 files changed, 246 insertions(+), 143 deletions(-) diff --git a/.agent-plan.md b/.agent-plan.md index 8945010..c568ed1 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -26,7 +26,7 @@ Goal: ship a best-in-class educational synthetic CRM lead-scoring dataset family ### Phase 2 — Snapshot-safe relational export - [x] `leadforge/render/relational_snapshot_safe.py` (new) — PR 2.1: `to_dataframes_snapshot_safe(dfs, *, snapshot_day)` projects the full-horizon dict from `to_dataframes` onto the public-bundle shape (drops `BANNED_LEAD_COLUMNS` from leads, `BANNED_OPP_COLUMNS` from opportunities, filters event tables per-lead by `lead_created_at + snapshot_day`, omits `customers`/`subscriptions`, passes accounts/contacts unchanged). -- [x] `leadforge/validation/relational_leakage.py` (new) — PR 2.1: `LeakageFinding`/`LeakageReport` dataclasses + five probes (`probe_banned_columns`, `probe_banned_tables`, `probe_deterministic_reconstruction`, `probe_snapshot_window`, `probe_bonus_model_auc`) + file-based and in-memory orchestrators (`run_all_probes`, `run_all_probes_on_dataframes`). `deterministic_relational_reconstruction` lifted from `scripts/probe_relational_leakage.py`; the script now re-exports it from the package. Bonus-model band defaults to 0.65 with a `# TODO(PR 3.3)` comment for per-tier calibration. +- [x] `leadforge/validation/relational_leakage.py` (new) — PR 2.1: owns the snapshot-safe contract constants (`BANNED_LEAD_COLUMNS`, `BANNED_OPP_COLUMNS`, `BANNED_TABLES`, `SNAPSHOT_FILTERED_TABLES`); ships `LeakageFinding`/`LeakageReport`/`RelationalLeakageError(LeadforgeError)` plus five probes (`probe_banned_columns`, `probe_banned_tables`, `probe_deterministic_reconstruction`, `probe_snapshot_window`, opt-in `probe_bonus_model_auc`) and two orchestrators (`run_all_probes`, `run_all_probes_on_dataframes`). The bonus-model probe is opt-in: orchestrators skip it unless the caller passes `bonus_model_max_auc=...` (PR 3.3 will calibrate per-tier bands). `deterministic_relational_reconstruction` lifted from `scripts/probe_relational_leakage.py`; the script now re-exports it from the package. - [ ] `BUNDLE_SCHEMA_VERSION` 4 → 5; manifest gains `relational_snapshot_safe` — PR 2.2. - [ ] Wire `relational_snapshot_safe` through `leadforge/exposure/filters.py` and `leadforge/api/bundle.py`; plumb the leakage validator into `leadforge/validation/bundle_checks.py` — PR 2.2. - [ ] Drop `converted_within_90_days` / `conversion_timestamp` from public `leads`; drop `close_outcome` / `closed_at` from public `opportunities`; omit `customers` / `subscriptions` from public bundles — PR 2.2 (the structural rules are already enforced module-side; PR 2.2 turns them on for actual writes). diff --git a/leadforge/render/relational_snapshot_safe.py b/leadforge/render/relational_snapshot_safe.py index 1c10c8e..b4e2573 100644 --- a/leadforge/render/relational_snapshot_safe.py +++ b/leadforge/render/relational_snapshot_safe.py @@ -18,17 +18,14 @@ * ``customers`` / ``subscriptions`` (:data:`BANNED_TABLES`): omitted entirely from the output dict; they exist only for converted leads, so their presence is the leak. -* ``accounts`` / ``contacts``: passed through unchanged - (firmographic/personographic, time-invariant). - -PR 2.2 will wire this through :mod:`leadforge.api.bundle` and -:mod:`leadforge.exposure.filters` so ``student_public`` writes go -through this function while ``research_instructor`` continues to use -the full-horizon :func:`leadforge.render.relational.to_dataframes`. - -The constants below are re-used by -:mod:`leadforge.validation.relational_leakage` so the writer and the -validator share one source of truth for what "snapshot-safe" means. +* ``accounts`` / ``contacts``: passed through unchanged (firmographic + / personographic, time-invariant). + +The ``research_instructor`` mode keeps using +:func:`leadforge.render.relational.to_dataframes` for the full-horizon +export. The contract constants live in +:mod:`leadforge.validation.relational_leakage` (validator owns the +definition of "leakage"); this module re-exports them for ergonomics. """ from __future__ import annotations @@ -37,31 +34,20 @@ import pandas as pd -#: Columns dropped from public ``leads.parquet``. -BANNED_LEAD_COLUMNS: tuple[str, ...] = ( - "converted_within_90_days", - "conversion_timestamp", +from leadforge.validation.relational_leakage import ( + BANNED_LEAD_COLUMNS, + BANNED_OPP_COLUMNS, + BANNED_TABLES, + SNAPSHOT_FILTERED_TABLES, ) -#: Columns dropped from public ``opportunities.parquet``. -BANNED_OPP_COLUMNS: tuple[str, ...] = ( - "close_outcome", - "closed_at", -) - -#: Tables omitted from public bundles entirely. -BANNED_TABLES: tuple[str, ...] = ("customers", "subscriptions") - -#: Event tables and the timestamp column used for per-lead snapshot -#: filtering. ``opportunities`` is treated as an event table here -#: (filtered by ``created_at``) because its existence is itself an -#: event in the funnel timeline. -EVENT_TABLES: tuple[tuple[str, str], ...] = ( - ("touches", "touch_timestamp"), - ("sessions", "session_timestamp"), - ("sales_activities", "activity_timestamp"), - ("opportunities", "created_at"), -) +__all__ = [ + "BANNED_LEAD_COLUMNS", + "BANNED_OPP_COLUMNS", + "BANNED_TABLES", + "SNAPSHOT_FILTERED_TABLES", + "to_dataframes_snapshot_safe", +] _ANCHOR_COL = "_lead_anchor_ts" @@ -89,9 +75,9 @@ def to_dataframes_snapshot_safe( ``subscriptions`` are absent. Raises: - ValueError: if ``snapshot_day`` is negative, or if ``leads`` is - absent from ``dfs``, or if ``leads`` lacks ``lead_id`` / - ``lead_created_at``. + ValueError: if ``snapshot_day`` is negative, ``leads`` is + absent, ``leads`` lacks the anchor columns, or + ``leads.lead_id`` is not unique. """ if snapshot_day < 0: raise ValueError(f"snapshot_day must be non-negative, got {snapshot_day}") @@ -109,7 +95,7 @@ def to_dataframes_snapshot_safe( anchor = _build_anchor(leads) horizon = pd.Timedelta(days=snapshot_day) - for name, ts_col in EVENT_TABLES: + for name, ts_col in SNAPSHOT_FILTERED_TABLES: if name not in dfs: continue df = dfs[name] diff --git a/leadforge/validation/relational_leakage.py b/leadforge/validation/relational_leakage.py index f8ba666..eac2071 100644 --- a/leadforge/validation/relational_leakage.py +++ b/leadforge/validation/relational_leakage.py @@ -2,39 +2,43 @@ The audit in ``docs/release/v1_current_state_audit.md`` enumerates four deterministic paths (A-E) by which alpha public bundles reconstruct the -target via joins. The structural fix lives in -:mod:`leadforge.render.relational_snapshot_safe`; this module is the -matching validator that asserts the fix is in place on any bundle -claiming to be ``student_public``. +target via joins. This module is the validator that asserts the +snapshot-safe contract — encoded in :data:`BANNED_LEAD_COLUMNS`, +:data:`BANNED_OPP_COLUMNS`, :data:`BANNED_TABLES`, and +:data:`SNAPSHOT_FILTERED_TABLES` — is in place on any bundle claiming to +be ``student_public``. The matching writer-side enforcement lives in +:mod:`leadforge.render.relational_snapshot_safe` and imports the same +constants from this module. -Five probes, each producing zero or more :class:`LeakageFinding` -instances: +Five probes, each producing zero or more :class:`LeakageFinding`: * :func:`probe_banned_columns` — public ``leads`` and ``opportunities`` - tables must not contain :data:`~leadforge.render.relational_snapshot_safe.BANNED_LEAD_COLUMNS` - or :data:`~leadforge.render.relational_snapshot_safe.BANNED_OPP_COLUMNS` - respectively. -* :func:`probe_banned_tables` — public bundles must not include the - conversion-conditional tables ``customers`` or ``subscriptions``. + must not contain :data:`BANNED_LEAD_COLUMNS` or + :data:`BANNED_OPP_COLUMNS` respectively. +* :func:`probe_banned_tables` — public bundles must not include + :data:`BANNED_TABLES`. * :func:`probe_deterministic_reconstruction` — paths B / C / D from the - audit must produce zero positive predictions. + audit must produce zero positive predictions. **Path A is not + checked here** — it is the column-presence violation already covered + by :func:`probe_banned_columns`. * :func:`probe_snapshot_window` — every event-table row must satisfy ``timestamp <= lead_created_at + snapshot_day``. -* :func:`probe_bonus_model_auc` — optional honest-feature baseline: - trains LR + HistGBM on the legitimate aggregates ``n_opps`` / ``max_acv`` - / ``mean_acv`` (plus ``n_customers`` / ``n_subscriptions`` if present) - and asserts CV AUC stays below ``max_auc``. - -:func:`run_all_probes` is the file-based orchestrator that PR 2.2 will -call from :func:`leadforge.validation.bundle_checks.validate_bundle`. +* :func:`probe_bonus_model_auc` — *opt-in* honest-feature baseline: + trains LR + HistGBM on the legitimate aggregates ``n_opps`` / + ``max_acv`` / ``mean_acv`` (plus ``n_customers`` / + ``n_subscriptions`` if present) and asserts CV AUC stays below an + explicit ``max_auc``. The orchestrators skip this probe unless the + caller passes ``bonus_model_max_auc=...``. + +:func:`run_all_probes` is the file-based orchestrator (designed to be +called from :func:`leadforge.validation.bundle_checks.validate_bundle`). :func:`run_all_probes_on_dataframes` is the same orchestrator without -the disk read, so unit tests can exercise the probes against synthetic -bundles built in-memory. +the disk read, for unit tests against in-memory bundles. The :func:`deterministic_relational_reconstruction` function is the single source of truth for the join graph that defines paths A-E. The companion script ``scripts/probe_relational_leakage.py`` re-exports it -unchanged so the alpha-bundle audit and the validator agree by +from here so the alpha-bundle audit and the validator agree by construction. """ @@ -47,30 +51,48 @@ import pandas as pd -from leadforge.render.relational_snapshot_safe import ( - BANNED_LEAD_COLUMNS, - BANNED_OPP_COLUMNS, - BANNED_TABLES, - EVENT_TABLES, +from leadforge.core.exceptions import LeadforgeError + +# --------------------------------------------------------------------------- +# Snapshot-safe contract — the single source of truth for "what is leakage". +# leadforge.render.relational_snapshot_safe imports these so the writer +# and the validator share one definition. +# --------------------------------------------------------------------------- + +#: Columns dropped from public ``leads.parquet``. +BANNED_LEAD_COLUMNS: Final[tuple[str, ...]] = ( + "converted_within_90_days", + "conversion_timestamp", +) + +#: Columns dropped from public ``opportunities.parquet``. +BANNED_OPP_COLUMNS: Final[tuple[str, ...]] = ( + "close_outcome", + "closed_at", +) + +#: Tables omitted from public bundles entirely. +BANNED_TABLES: Final[tuple[str, ...]] = ("customers", "subscriptions") + +#: Tables filtered per-lead by their timestamp column to +#: ``lead_created_at + snapshot_day``. ``opportunities`` is included +#: even though it is an entity table, because its ``created_at`` +#: anchors when the entity becomes observable in the funnel. +SNAPSHOT_FILTERED_TABLES: Final[tuple[tuple[str, str], ...]] = ( + ("touches", "touch_timestamp"), + ("sessions", "session_timestamp"), + ("sales_activities", "activity_timestamp"), + ("opportunities", "created_at"), ) #: Channel labels carried on :class:`LeakageFinding.channel`. Constants -#: rather than an enum because findings serialise straight to JSON in -#: PR 3.2's reporting layer. +#: rather than an enum because findings serialise straight to JSON. CHANNEL_BANNED_COLUMN: Final[str] = "banned_column" CHANNEL_BANNED_TABLE: Final[str] = "banned_table" -CHANNEL_DETERMINISTIC_PATH: Final[str] = "deterministic_path" +CHANNEL_JOIN_RECONSTRUCTION: Final[str] = "join_reconstruction" CHANNEL_SNAPSHOT_WINDOW: Final[str] = "snapshot_window" CHANNEL_BONUS_MODEL: Final[str] = "bonus_model" -#: Default ceiling for the bonus-model AUC probe. Honest aggregates -#: (``n_opps`` / ACV) on the v0.1.0-alpha intermediate tier produce a -#: legitimate signal in the high-0.5s under the post-fix shape, so 0.65 -#: is a conservative placeholder until PR 3.3 calibrates a per-tier -#: band against measured baselines. -DEFAULT_MAX_BONUS_AUC: Final[float] = 0.65 -# TODO(PR 3.3): tighten this band against a measured honest-feature baseline. - _PUBLIC_TABLES: Final[tuple[str, ...]] = ( "accounts", "contacts", @@ -107,31 +129,27 @@ def raise_if_failing(self) -> None: raise RelationalLeakageError(self) -class RelationalLeakageError(AssertionError): +class RelationalLeakageError(LeadforgeError): """Raised by :meth:`LeakageReport.raise_if_failing` on any finding. Carries the originating :class:`LeakageReport` on ``self.report`` so callers (e.g. ``leadforge validate``) can render the full set of - findings in their output rather than just the first one. + findings rather than just the first one. """ def __init__(self, report: LeakageReport) -> None: self.report = report - first_lines = "\n".join( - f" - [{f.channel}] {f.detail}: {f.message}" for f in report.findings - ) + rendered = "\n".join(f" - [{f.channel}] {f.detail}: {f.message}" for f in report.findings) super().__init__( f"public bundle leaks `converted_within_90_days` " - f"({len(report.findings)} finding(s)):\n{first_lines}" + f"({len(report.findings)} finding(s)):\n{rendered}" ) # --------------------------------------------------------------------------- # Deterministic reconstruction — the join graph that defines paths A-E. -# -# Lifted verbatim from ``scripts/probe_relational_leakage.py`` (PR 1.1) so -# the package and the script share one implementation. The script now -# re-exports this function from here. +# Lifted from the PR 1.1 audit script; the script now re-exports this +# function from here so there is one implementation. # --------------------------------------------------------------------------- @@ -228,7 +246,13 @@ def deterministic_relational_reconstruction( def probe_banned_columns(tables: Mapping[str, pd.DataFrame]) -> list[LeakageFinding]: - """Public ``leads``/``opportunities`` must not carry banned columns.""" + """Public ``leads``/``opportunities`` must not carry banned columns. + + Detects Path A (label column directly readable from ``leads``) and + the ``opportunities.close_outcome`` / ``closed_at`` channels — i.e. + leakage that any caller can spot by listing column names, no joins + required. + """ findings: list[LeakageFinding] = [] for table_name, banned in ( ("leads", BANNED_LEAD_COLUMNS), @@ -273,12 +297,20 @@ def probe_banned_tables(table_names: Iterable[str]) -> list[LeakageFinding]: def probe_deterministic_reconstruction( tables: Mapping[str, pd.DataFrame], ) -> list[LeakageFinding]: - """Paths B / C / D from the audit must produce zero positive predictions. + """Audit paths B / C / D must produce zero positive predictions. + + This probe focuses exclusively on the **join-graph** reconstruction: + + * B — at least one opportunity with ``close_outcome == "closed_won"``; + * C — a joinable customer row reachable via ``opportunity_id``; + * D — a joinable subscription row reachable via ``customer_id``. - Path A is intentionally not checked here — it is fully covered by - :func:`probe_banned_columns` (Path A reads - ``leads.converted_within_90_days`` directly, which is a - :data:`BANNED_LEAD_COLUMNS` violation). + Path A (direct read of ``leads.converted_within_90_days``) is *not* + checked here — it is the column-presence violation already raised by + :func:`probe_banned_columns`. Re-emitting it here would double-count + one defect across two channels. Tests assert this delegation + explicitly so that future maintainers don't widen the scope by + accident. """ leads = tables.get("leads") if leads is None or len(leads) == 0: @@ -309,7 +341,7 @@ def probe_deterministic_reconstruction( if positive > 0: findings.append( LeakageFinding( - channel=CHANNEL_DETERMINISTIC_PATH, + channel=CHANNEL_JOIN_RECONSTRUCTION, detail=path_col, message=( f"path {label} produced {positive}/{len(paths)} " @@ -340,7 +372,7 @@ def probe_snapshot_window( horizon = pd.Timedelta(days=snapshot_day) findings: list[LeakageFinding] = [] - for name, ts_col in EVENT_TABLES: + for name, ts_col in SNAPSHOT_FILTERED_TABLES: df = tables.get(name) if df is None or len(df) == 0 or ts_col not in df.columns: continue @@ -365,20 +397,33 @@ def probe_snapshot_window( def probe_bonus_model_auc( tables: Mapping[str, pd.DataFrame], *, - max_auc: float = DEFAULT_MAX_BONUS_AUC, + max_auc: float, seed: int = 42, label: pd.Series | None = None, ) -> list[LeakageFinding]: - """5-fold CV LR + HistGBM AUC on honest relational aggregates. + """Opt-in honest-feature baseline: 5-fold CV LR + HistGBM AUC. - If the public bundle has been correctly redacted, ``leads`` no longer - carries ``converted_within_90_days`` — in that case the caller must - supply the held-back ``label`` (typically read from the task split) - so we can score against ground truth. When neither is available the - probe is skipped silently (no finding, no error): there is simply no - truth to compare against. + Trains on per-lead aggregates (``n_opps`` / ``max_acv`` / + ``mean_acv``, plus ``n_customers`` / ``n_subscriptions`` if those + tables are present) and asserts the mean cross-validated AUC stays + below ``max_auc``. - Skipped (no finding) if scikit-learn is unavailable. + Caller responsibilities: + + * ``max_auc`` is required. PR 2.1 ships this probe with no + calibrated threshold; PR 3.3 will land per-tier bands. + * ``label`` must be a :class:`pandas.Series` indexed by ``lead_id`` + (``index.name == "lead_id"``). This is enforced — a misaligned + label would silently skip the probe via the binary-cardinality + gate, which defeats the validator's purpose. + + The probe skips silently (no findings, no error) when: + + * scikit-learn is not installed; + * ``leads`` is missing or empty; + * the label is unavailable (no ``label`` argument and the public + bundle has correctly redacted ``converted_within_90_days``); + * the label has fewer than two classes after alignment. """ try: from sklearn.ensemble import HistGradientBoostingClassifier @@ -447,16 +492,17 @@ def run_all_probes_on_dataframes( tables: Mapping[str, pd.DataFrame], *, snapshot_day: int, - max_auc: float = DEFAULT_MAX_BONUS_AUC, + bonus_model_max_auc: float | None = None, label: pd.Series | None = None, ) -> LeakageReport: - """Run every probe against an in-memory tables dict.""" + """Run every structural probe; run the bonus probe iff ``bonus_model_max_auc`` is set.""" findings: list[LeakageFinding] = [] findings += probe_banned_columns(tables) findings += probe_banned_tables(tables.keys()) findings += probe_deterministic_reconstruction(tables) findings += probe_snapshot_window(tables, snapshot_day=snapshot_day) - findings += probe_bonus_model_auc(tables, max_auc=max_auc, label=label) + if bonus_model_max_auc is not None: + findings += probe_bonus_model_auc(tables, max_auc=bonus_model_max_auc, label=label) return LeakageReport(findings=tuple(findings)) @@ -464,20 +510,23 @@ def run_all_probes( bundle_dir: Path, *, snapshot_day: int, - max_auc: float = DEFAULT_MAX_BONUS_AUC, + bonus_model_max_auc: float | None = None, label: pd.Series | None = None, ) -> LeakageReport: - """Run every probe against ``/tables/*.parquet``. + """Run every structural probe against ``/tables/*.parquet``. Args: bundle_dir: Bundle root (must contain ``tables/leads.parquet``). - snapshot_day: Snapshot window for the timestamp probe. - max_auc: Threshold for the bonus-model probe. - label: Optional ground-truth labels to feed the bonus-model - probe when ``leads.converted_within_90_days`` has been - redacted. Not loading them automatically (e.g. from the - task splits) keeps this module independent of task layout — - PR 2.2's wiring layer is the right place for that lookup. + snapshot_day: Snapshot window for the timestamp probe. The + caller (typically ``validate_bundle``) is expected to read + it from ``manifest.json``. + bonus_model_max_auc: Pass a numeric threshold to enable the + opt-in :func:`probe_bonus_model_auc`. ``None`` (default) + skips it — the calibrated band ships in PR 3.3. + label: Optional ground-truth labels for the bonus probe when + ``leads.converted_within_90_days`` has been redacted. Must + be indexed by ``lead_id`` (see :func:`probe_bonus_model_auc`). + Ignored when ``bonus_model_max_auc`` is ``None``. Raises: FileNotFoundError: if ``/tables/`` is missing or @@ -495,7 +544,10 @@ def run_all_probes( if path.exists(): tables[name] = pd.read_parquet(path) return run_all_probes_on_dataframes( - tables, snapshot_day=snapshot_day, max_auc=max_auc, label=label + tables, + snapshot_day=snapshot_day, + bonus_model_max_auc=bonus_model_max_auc, + label=label, ) @@ -514,13 +566,18 @@ def _resolve_label( ) -> pd.Series | None: """Pick a label series to score against, or ``None`` to skip the probe. - When ``label`` is supplied the caller is responsible for aligning it - to ``lead_id`` (either as the index name or in a way that - ``Series.reindex(features.index)`` resolves). When it is not - supplied we read ``leads.converted_within_90_days`` directly — this - branch is exercised by tampered bundles in tests. + A caller-supplied ``label`` must be indexed by ``lead_id`` + (``index.name == "lead_id"``). Without that guarantee a misaligned + label would silently skip the probe via the binary-cardinality gate + downstream — exactly the kind of hidden no-op a leakage validator + must not have. """ if label is not None: + if label.index.name != "lead_id": + raise ValueError( + "label must be a pandas.Series indexed by lead_id " + f"(got index.name={label.index.name!r})" + ) return label.astype("boolean").fillna(False).astype(int) if "converted_within_90_days" in leads.columns: return ( @@ -541,8 +598,7 @@ def _build_relational_features( Honest features only — no aggregate of ``close_outcome``. Customers and subscriptions counts are included only when the corresponding tables exist (i.e. on a tampered bundle); on a clean public bundle - they default to 0 and become uninformative columns the model can - discard. + they default to 0 and the model can discard the column. """ opps = tables.get("opportunities") customers = tables.get("customers") @@ -595,15 +651,18 @@ def _build_relational_features( __all__ = [ + "BANNED_LEAD_COLUMNS", + "BANNED_OPP_COLUMNS", + "BANNED_TABLES", "CHANNEL_BANNED_COLUMN", "CHANNEL_BANNED_TABLE", "CHANNEL_BONUS_MODEL", - "CHANNEL_DETERMINISTIC_PATH", + "CHANNEL_JOIN_RECONSTRUCTION", "CHANNEL_SNAPSHOT_WINDOW", - "DEFAULT_MAX_BONUS_AUC", "LeakageFinding", "LeakageReport", "RelationalLeakageError", + "SNAPSHOT_FILTERED_TABLES", "deterministic_relational_reconstruction", "probe_banned_columns", "probe_banned_tables", diff --git a/tests/render/test_relational_snapshot_safe.py b/tests/render/test_relational_snapshot_safe.py index ec9ad82..f582b50 100644 --- a/tests/render/test_relational_snapshot_safe.py +++ b/tests/render/test_relational_snapshot_safe.py @@ -9,7 +9,7 @@ BANNED_LEAD_COLUMNS, BANNED_OPP_COLUMNS, BANNED_TABLES, - EVENT_TABLES, + SNAPSHOT_FILTERED_TABLES, to_dataframes_snapshot_safe, ) @@ -179,7 +179,7 @@ def test_snapshot_window_invariant_holds_per_lead() -> None: leads_anchor = out["leads"].set_index("lead_id")["lead_created_at"].apply(pd.Timestamp) horizon = pd.Timedelta(days=10) - for name, ts_col in EVENT_TABLES: + for name, ts_col in SNAPSHOT_FILTERED_TABLES: df = out[name] if df.empty: continue diff --git a/tests/validation/test_relational_leakage.py b/tests/validation/test_relational_leakage.py index a027896..a8ff136 100644 --- a/tests/validation/test_relational_leakage.py +++ b/tests/validation/test_relational_leakage.py @@ -22,7 +22,7 @@ CHANNEL_BANNED_COLUMN, CHANNEL_BANNED_TABLE, CHANNEL_BONUS_MODEL, - CHANNEL_DETERMINISTIC_PATH, + CHANNEL_JOIN_RECONSTRUCTION, LeakageFinding, LeakageReport, RelationalLeakageError, @@ -49,11 +49,11 @@ def _full_horizon_bundle(*, n_each: int = 25) -> dict[str, pd.DataFrame]: Both converted (``lead_C_*``) and unconverted (``lead_U_*``) leads own a single opportunity; only converted leads own a customer and a - subscription. Mirroring the realistic shape — opportunities for - everyone, conversion-conditional only customers/subscriptions — - keeps post-redaction honest aggregates (``n_opps``, ``max_acv``, - ``mean_acv``) non-discriminative, so the bonus-model probe stays - below the default 0.65 ceiling on the clean bundle. + subscription. This mirrors the realistic shape — opportunities for + everyone, conversion-conditional only customers/subscriptions — so + that the structural probes have something to bite on without + accidentally turning ``n_opps`` into a perfect predictor for the + opt-in bonus-model probe. """ leads_rows: list[dict] = [] opps_rows: list[dict] = [] @@ -90,9 +90,9 @@ def _full_horizon_bundle(*, n_each: int = 25) -> dict[str, pd.DataFrame]: "conversion_timestamp": None, } ) - # ACV drawn from the same distribution for both classes so the - # post-redaction model has no signal beyond label noise — without - # this the bonus-model probe trips even on a snapshot-safe bundle. + # ACV drawn from the same distribution for both classes — keeps + # ``max_acv`` / ``mean_acv`` uninformative for the bonus probe + # so its enabled-but-clean test path is stable. acv = 30_000 + (i % 10) * 2_000 opps_rows.append( { @@ -178,9 +178,23 @@ def _label_for(bundle: dict[str, pd.DataFrame]) -> pd.Series: def test_clean_bundle_passes_all_probes() -> None: + """Default orchestrator (structural probes only) must report zero findings.""" + tables = _clean_bundle() + report = run_all_probes_on_dataframes(tables, snapshot_day=SNAPSHOT_DAY) + assert report.ok, [f"[{f.channel}] {f.detail}: {f.message}" for f in report.findings] + + +def test_clean_bundle_passes_with_bonus_probe_enabled() -> None: + """Bonus probe is opt-in; when enabled with a generous threshold, the + clean bundle still reports zero findings — exercising the full code + path without flaking on synthetic-data noise.""" + pytest.importorskip("sklearn") tables = _clean_bundle() report = run_all_probes_on_dataframes( - tables, snapshot_day=SNAPSHOT_DAY, label=_label_for(tables) + tables, + snapshot_day=SNAPSHOT_DAY, + bonus_model_max_auc=0.95, + label=_label_for(tables), ) assert report.ok, [f"[{f.channel}] {f.detail}: {f.message}" for f in report.findings] @@ -352,6 +366,52 @@ def test_bonus_model_probe_skipped_without_label() -> None: assert probe_bonus_model_auc(tables, max_auc=0.65, label=None) == [] +def test_bonus_model_probe_rejects_misaligned_label() -> None: + """A label not indexed by lead_id would silently misalign and skip the + probe via the binary-cardinality gate — defeating the validator. + The probe must reject it loudly instead.""" + pytest.importorskip("sklearn") + tables = _clean_bundle() + src = _full_horizon_bundle() + bad_label = src["leads"]["converted_within_90_days"].reset_index(drop=True) + assert bad_label.index.name != "lead_id" + with pytest.raises(ValueError, match="indexed by lead_id"): + probe_bonus_model_auc(tables, max_auc=0.65, label=bad_label) + + +def test_orchestrator_skips_bonus_probe_by_default() -> None: + """run_all_probes_on_dataframes(... bonus_model_max_auc=None) must skip + the bonus probe even when customers/subscriptions would trigger it. + This is the post-PR-2.1 default; PR 3.3 calibrates and turns it on.""" + pytest.importorskip("sklearn") + tables = _clean_bundle() + src = _full_horizon_bundle() + tables["customers"] = src["customers"] + tables["subscriptions"] = src["subscriptions"] + + # The bonus probe would fire here (customers+subs reintroduced), but + # the structural probes will fire first regardless. Filter to bonus- + # channel findings to assert the bonus probe is the one that didn't run. + report = run_all_probes_on_dataframes(tables, snapshot_day=SNAPSHOT_DAY) + assert all(f.channel != CHANNEL_BONUS_MODEL for f in report.findings) + + +def test_orchestrator_runs_bonus_probe_when_enabled() -> None: + pytest.importorskip("sklearn") + tables = _clean_bundle() + src = _full_horizon_bundle() + tables["customers"] = src["customers"] + tables["subscriptions"] = src["subscriptions"] + + report = run_all_probes_on_dataframes( + tables, + snapshot_day=SNAPSHOT_DAY, + bonus_model_max_auc=0.95, + label=_label_for(tables), + ) + assert any(f.channel == CHANNEL_BONUS_MODEL for f in report.findings) + + # --------------------------------------------------------------------------- # Orchestrator + report ergonomics. # --------------------------------------------------------------------------- @@ -369,7 +429,7 @@ def test_orchestrator_aggregates_findings_across_channels() -> None: channels = {f.channel for f in report.findings} assert CHANNEL_BANNED_COLUMN in channels assert CHANNEL_BANNED_TABLE in channels - assert CHANNEL_DETERMINISTIC_PATH in channels + assert CHANNEL_JOIN_RECONSTRUCTION in channels def test_report_raise_if_failing_carries_report() -> None: @@ -403,9 +463,7 @@ def _write_bundle_to_disk(tables: dict[str, pd.DataFrame], root: Path) -> Path: def test_run_all_probes_reads_bundle_from_disk(tmp_path: Path) -> None: bundle_dir = _write_bundle_to_disk(_clean_bundle(), tmp_path / "clean") - report = run_all_probes( - bundle_dir, snapshot_day=SNAPSHOT_DAY, label=_label_for(_clean_bundle()) - ) + report = run_all_probes(bundle_dir, snapshot_day=SNAPSHOT_DAY) assert report.ok, [f"[{f.channel}] {f.detail}: {f.message}" for f in report.findings] @@ -417,7 +475,7 @@ def test_run_all_probes_detects_disk_bundle_leakage(tmp_path: Path) -> None: channels = {f.channel for f in report.findings} assert CHANNEL_BANNED_COLUMN in channels assert CHANNEL_BANNED_TABLE in channels - assert CHANNEL_DETERMINISTIC_PATH in channels + assert CHANNEL_JOIN_RECONSTRUCTION in channels def test_run_all_probes_missing_tables_dir_raises(tmp_path: Path) -> None: From 6929a941abf8f83b292d1b51085b6116f0a01f13 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 5 May 2026 21:02:41 +0300 Subject: [PATCH 4/5] fix(validation): address Copilot review on probe_bonus_model_auc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two legitimate edge cases raised on PR #63: 1. Partial labels would crash with an opaque error. ``y_series.reindex(features.index).astype(int)`` raises ``IntCastingNaNError`` when the supplied ``label`` is missing any ``lead_id`` present in the bundle, because reindex introduces NaN and ``astype(int)`` chokes. Now the probe raises a clear ``ValueError`` that names the gap and tells the caller to either complete the label or omit it. 2. ``StratifiedKFold(n_splits=5)`` would crash on small or highly imbalanced bundles. When the smaller class has fewer than 5 members, sklearn raises ``ValueError`` from ``skf.split``. Now the probe sizes ``n_splits = min(5, min_class_count)`` and skips silently (no finding, no error) when even ``n_splits=2`` is impossible — that is a sample-size constraint, not a leakage finding. Two new tests cover the partial-label rejection, and two cover the dynamic-n_splits / too-small-to-CV paths. Finding messages now report the actual fold count (e.g. ``3-fold CV AUC...``) instead of a hard-coded ``5-fold``. 1010 tests pass (was 1007); ruff + mypy clean. Co-Authored-By: Claude Opus 4.7 --- leadforge/validation/relational_leakage.py | 37 +++++++++++---- tests/validation/test_relational_leakage.py | 52 +++++++++++++++++++++ 2 files changed, 80 insertions(+), 9 deletions(-) diff --git a/leadforge/validation/relational_leakage.py b/leadforge/validation/relational_leakage.py index eac2071..f825ab4 100644 --- a/leadforge/validation/relational_leakage.py +++ b/leadforge/validation/relational_leakage.py @@ -413,9 +413,10 @@ def probe_bonus_model_auc( * ``max_auc`` is required. PR 2.1 ships this probe with no calibrated threshold; PR 3.3 will land per-tier bands. * ``label`` must be a :class:`pandas.Series` indexed by ``lead_id`` - (``index.name == "lead_id"``). This is enforced — a misaligned - label would silently skip the probe via the binary-cardinality - gate, which defeats the validator's purpose. + (``index.name == "lead_id"``) **and** cover every lead in the + bundle. Both are enforced — a misaligned or partial label would + silently neutralise the probe (via the binary-cardinality gate + or NaN folds), which defeats the validator's purpose. The probe skips silently (no findings, no error) when: @@ -423,7 +424,9 @@ def probe_bonus_model_auc( * ``leads`` is missing or empty; * the label is unavailable (no ``label`` argument and the public bundle has correctly redacted ``converted_within_90_days``); - * the label has fewer than two classes after alignment. + * the label has fewer than two classes after alignment; + * the smaller class has fewer members than the minimum needed for + stratified CV (``n_splits >= 2``). """ try: from sklearn.ensemble import HistGradientBoostingClassifier @@ -446,17 +449,33 @@ def probe_bonus_model_auc( features = _build_relational_features(leads, tables) if features.empty or len(features.columns) == 0: return [] - y = y_series.reindex(features.index).astype(int) + + aligned = y_series.reindex(features.index) + if aligned.isna().any(): + missing = int(aligned.isna().sum()) + raise ValueError( + f"label is missing values for {missing} lead_id(s) present in the " + "bundle; supply a complete label or omit it to read from leads" + ) + y = aligned.astype(int) if y.nunique(dropna=True) < 2: return [] + # Stratified CV needs at least n_splits members in each class. If the + # smaller class is below that, the probe can't run — skip silently + # (this is a sample-size constraint, not a leakage finding). + min_class = int(y.value_counts().min()) + n_splits = min(5, min_class) + if n_splits < 2: + return [] + models: dict[str, Pipeline] = { "logistic_regression": Pipeline( [("scaler", StandardScaler()), ("clf", LogisticRegression(max_iter=1000))] ), "hist_gbm": Pipeline([("clf", HistGradientBoostingClassifier(random_state=seed))]), } - skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed) + skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=seed) findings: list[LeakageFinding] = [] for name, pipe in models.items(): @@ -474,9 +493,9 @@ def probe_bonus_model_auc( channel=CHANNEL_BONUS_MODEL, detail=name, message=( - f"5-fold CV AUC {auc_mean:.3f} on join-derived features " - f"exceeds max_auc={max_auc:.3f}; honest aggregates " - "carry stronger signal than the band allows" + f"{n_splits}-fold CV AUC {auc_mean:.3f} on join-derived " + f"features exceeds max_auc={max_auc:.3f}; honest " + "aggregates carry stronger signal than the band allows" ), ) ) diff --git a/tests/validation/test_relational_leakage.py b/tests/validation/test_relational_leakage.py index a8ff136..90ccd2b 100644 --- a/tests/validation/test_relational_leakage.py +++ b/tests/validation/test_relational_leakage.py @@ -379,6 +379,58 @@ def test_bonus_model_probe_rejects_misaligned_label() -> None: probe_bonus_model_auc(tables, max_auc=0.65, label=bad_label) +def test_bonus_model_probe_rejects_partial_label() -> None: + """A label that covers only a subset of bundle lead_ids would + introduce NaN after reindex; the resulting astype(int) cast used to + crash with an opaque error. The probe must raise a clear + ValueError naming the gap instead.""" + pytest.importorskip("sklearn") + tables = _clean_bundle() + full_label = _label_for(tables) + partial = full_label.iloc[:-5] + with pytest.raises(ValueError, match="missing values for"): + probe_bonus_model_auc(tables, max_auc=0.65, label=partial) + + +def test_bonus_model_probe_skips_when_class_too_small_for_cv() -> None: + """When the smaller class has fewer than 2 members, + StratifiedKFold cannot run; the probe must skip silently rather + than raise a sklearn-internal error.""" + pytest.importorskip("sklearn") + tables = _clean_bundle() + leads = tables["leads"].head(6).copy() + tables["leads"] = leads + label = pd.Series( + [True, True, True, True, True, False], + index=leads["lead_id"].to_numpy(), + name="converted_within_90_days", + ) + label.index.name = "lead_id" + assert probe_bonus_model_auc(tables, max_auc=0.65, label=label) == [] + + +def test_bonus_model_probe_uses_smaller_n_splits_when_class_count_lt_5() -> None: + """When 2 <= min_class_count < 5, the probe must size n_splits + accordingly — exercised by labelling only 3 positives in a 12-lead + bundle (n_splits should drop to 3).""" + pytest.importorskip("sklearn") + tables = _clean_bundle() + src = _full_horizon_bundle() + leads = tables["leads"].head(12).copy() + tables["leads"] = leads + tables["customers"] = src["customers"] # leak signal so AUC saturates + tables["subscriptions"] = src["subscriptions"] + + truth = [True, True, True] + [False] * 9 + label = pd.Series(truth, index=leads["lead_id"].to_numpy(), name="converted_within_90_days") + label.index.name = "lead_id" + findings = probe_bonus_model_auc(tables, max_auc=0.5, label=label) + assert findings, "expected the bonus model to fire — n_splits should adapt to min_class=3" + assert all(f.message.startswith("3-fold") for f in findings), ( + "n_splits must downshift to 3 when the minority class has only 3 members" + ) + + def test_orchestrator_skips_bonus_probe_by_default() -> None: """run_all_probes_on_dataframes(... bonus_model_max_auc=None) must skip the bonus probe even when customers/subscriptions would trigger it. From ce2b3d34a0e16be638d9854b74310566bca80203 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 5 May 2026 21:25:24 +0300 Subject: [PATCH 5/5] fix(render,validation): refuse to operate on malformed anchors / orphan events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address two new Copilot review threads on PR #63 (both legitimate): * COPILOT-3 (relational_snapshot_safe.py:126) — ``_build_anchor`` parsed ``lead_created_at`` with ``pd.to_datetime`` but never validated the result. A NaT anchor would propagate through ``ts <= NaT`` -> NaN -> ``fillna(False)`` and silently drop every event for the affected lead. Now coerce to NaT explicitly with ``errors="coerce"`` and raise ``ValueError`` (with a 5-lead sample of offending lead_ids) if any NaT remains. Mirrors the same posture as the existing duplicate-lead_id check: structural defects fail loudly rather than producing a quietly wrong output. * COPILOT-4 (relational_leakage.py:382) — ``probe_snapshot_window`` had the same NaT-anchor issue, plus a related orphan-event blind spot: an event row whose ``lead_id`` is absent from ``leads`` gets a NaT cutoff after the left-merge, and the ``fillna(False)`` in the violation count silently treats it as a non-violation. An orphan event is a structural FK violation AND a leakage attack surface — a tampered bundle could insert post-snapshot events tied to lead_ids absent from the public leads table to bypass the snapshot filter. Both cases now raise ``ValueError`` with offending lead_id samples. Tests added: * ``test_nat_lead_created_at_raises`` / ``test_unparseable_lead_created_at_raises`` — render-side anchor. * ``test_snapshot_window_nat_lead_created_at_raises`` / ``test_snapshot_window_orphan_event_raises`` — validator-side. Did not refactor ``_build_anchor`` into a shared helper between the two modules; PR 2.1 keeps render and validation independent in their anchor handling, and PR 3.1 (which already plans to unify the leakage probes) can DRY if the duplication starts to matter. 1014 tests pass (was 1010); ruff + mypy clean. Co-Authored-By: Claude Opus 4.7 --- leadforge/render/relational_snapshot_safe.py | 13 ++++++++- leadforge/validation/relational_leakage.py | 28 ++++++++++++++++++- tests/render/test_relational_snapshot_safe.py | 19 +++++++++++++ tests/validation/test_relational_leakage.py | 24 ++++++++++++++++ 4 files changed, 82 insertions(+), 2 deletions(-) diff --git a/leadforge/render/relational_snapshot_safe.py b/leadforge/render/relational_snapshot_safe.py index b4e2573..7d6378e 100644 --- a/leadforge/render/relational_snapshot_safe.py +++ b/leadforge/render/relational_snapshot_safe.py @@ -123,7 +123,18 @@ def _build_anchor(leads: pd.DataFrame) -> pd.DataFrame: if not leads["lead_id"].is_unique: raise ValueError("leads.lead_id must be unique") anchor = leads[["lead_id", "lead_created_at"]].rename(columns={"lead_created_at": _ANCHOR_COL}) - anchor[_ANCHOR_COL] = pd.to_datetime(anchor[_ANCHOR_COL]) + anchor[_ANCHOR_COL] = pd.to_datetime(anchor[_ANCHOR_COL], errors="coerce") + # NaT here would silently drop every event for the affected leads via + # the ``ts <= NaT`` -> NaN -> fillna(False) path downstream — exactly + # the kind of silent data-quality erosion a public-bundle exporter + # must refuse to ship. + nat_mask = anchor[_ANCHOR_COL].isna() + if nat_mask.any(): + sample = anchor.loc[nat_mask, "lead_id"].head(5).tolist() + raise ValueError( + f"leads.lead_created_at has {int(nat_mask.sum())} unparseable / null " + f"value(s); offending lead_id sample: {sample}" + ) return anchor diff --git a/leadforge/validation/relational_leakage.py b/leadforge/validation/relational_leakage.py index f825ab4..3299e02 100644 --- a/leadforge/validation/relational_leakage.py +++ b/leadforge/validation/relational_leakage.py @@ -368,7 +368,19 @@ def probe_snapshot_window( raise ValueError("leads.lead_id must be unique") anchor = leads[["lead_id", "lead_created_at"]].copy() - anchor["lead_created_at"] = pd.to_datetime(anchor["lead_created_at"]) + anchor["lead_created_at"] = pd.to_datetime(anchor["lead_created_at"], errors="coerce") + # NaT in the anchor would propagate to NaT cutoffs, then ``ts > NaT`` + # is NaN, and the violation count's ``fillna(False)`` would silently + # drop those rows — masking a data-quality bug in the bundle. Refuse + # to operate on a malformed anchor, same posture as the duplicate- + # lead_id check above. + nat_mask = anchor["lead_created_at"].isna() + if nat_mask.any(): + sample = anchor.loc[nat_mask, "lead_id"].head(5).tolist() + raise ValueError( + f"leads.lead_created_at has {int(nat_mask.sum())} unparseable / null " + f"value(s); offending lead_id sample: {sample}" + ) horizon = pd.Timedelta(days=snapshot_day) findings: list[LeakageFinding] = [] @@ -377,6 +389,20 @@ def probe_snapshot_window( if df is None or len(df) == 0 or ts_col not in df.columns: continue merged = df[["lead_id", ts_col]].merge(anchor, on="lead_id", how="left") + # An event row whose lead_id has no match in leads gets NaT for + # ``lead_created_at`` after the left-merge; that row's cutoff is + # NaT and the violation count would silently miss it. An orphan + # event row is a structural FK violation (and a leakage attack + # surface — a tampered bundle could insert post-snapshot events + # tied to lead_ids absent from the public leads table). Refuse + # to bless it. + orphan_mask = merged["lead_created_at"].isna() + if orphan_mask.any(): + sample = merged.loc[orphan_mask, "lead_id"].head(5).tolist() + raise ValueError( + f"{name}.parquet has {int(orphan_mask.sum())} row(s) referencing " + f"lead_id(s) absent from leads; sample: {sample}" + ) ts = pd.to_datetime(merged[ts_col]) cutoff = merged["lead_created_at"] + horizon violations = int((ts > cutoff).fillna(False).sum()) diff --git a/tests/render/test_relational_snapshot_safe.py b/tests/render/test_relational_snapshot_safe.py index f582b50..09dcdfd 100644 --- a/tests/render/test_relational_snapshot_safe.py +++ b/tests/render/test_relational_snapshot_safe.py @@ -283,3 +283,22 @@ def test_duplicate_lead_id_raises() -> None: src["leads"] = pd.concat([src["leads"], src["leads"].iloc[[0]]], ignore_index=True) with pytest.raises(ValueError, match="lead_id must be unique"): to_dataframes_snapshot_safe(src, snapshot_day=10) + + +def test_nat_lead_created_at_raises() -> None: + """A NaT/null anchor would silently drop every event for the affected + lead via ``ts <= NaT`` -> NaN -> fillna(False). Must raise instead.""" + src = _full_horizon_dict() + src["leads"] = src["leads"].copy() + src["leads"].loc[0, "lead_created_at"] = None + with pytest.raises(ValueError, match="unparseable / null"): + to_dataframes_snapshot_safe(src, snapshot_day=10) + + +def test_unparseable_lead_created_at_raises() -> None: + """Garbage anchor strings would coerce to NaT and silently misbehave.""" + src = _full_horizon_dict() + src["leads"] = src["leads"].copy() + src["leads"].loc[0, "lead_created_at"] = "not-a-date" + with pytest.raises(ValueError, match="unparseable / null"): + to_dataframes_snapshot_safe(src, snapshot_day=10) diff --git a/tests/validation/test_relational_leakage.py b/tests/validation/test_relational_leakage.py index 90ccd2b..989fb78 100644 --- a/tests/validation/test_relational_leakage.py +++ b/tests/validation/test_relational_leakage.py @@ -335,6 +335,30 @@ def test_snapshot_window_duplicate_lead_id_raises() -> None: probe_snapshot_window(tables, snapshot_day=SNAPSHOT_DAY) +def test_snapshot_window_nat_lead_created_at_raises() -> None: + """NaT in the anchor propagates to NaT cutoffs, masking violations + via the fillna(False) in the count expression. The probe must + refuse to operate on a malformed anchor.""" + tables = _clean_bundle() + tables["leads"] = tables["leads"].copy() + tables["leads"].loc[0, "lead_created_at"] = None + with pytest.raises(ValueError, match="unparseable / null"): + probe_snapshot_window(tables, snapshot_day=SNAPSHOT_DAY) + + +def test_snapshot_window_orphan_event_raises() -> None: + """An event row whose lead_id is absent from leads gets a NaT cutoff + after the left-merge; the count would silently miss it. Treat + orphan events as a structural violation and raise.""" + tables = _clean_bundle() + orphan = pd.DataFrame( + [{"touch_id": "t_orphan", "lead_id": "lead_does_not_exist", "touch_timestamp": _ts(2)}] + ) + tables["touches"] = pd.concat([tables["touches"], orphan], ignore_index=True) + with pytest.raises(ValueError, match="touches.parquet has 1 row.*absent from leads"): + probe_snapshot_window(tables, snapshot_day=SNAPSHOT_DAY) + + # --------------------------------------------------------------------------- # Bonus-model probe. # ---------------------------------------------------------------------------