diff --git a/.agent-plan.md b/.agent-plan.md index beee260..c568ed1 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -25,11 +25,12 @@ Goal: ship a best-in-class educational synthetic CRM lead-scoring dataset family - [x] Lock dataset release name `leadforge-lead-scoring-v1` (already locked via PR #61's milestone rename + roadmap edits; G1.1 reaffirmed) ### Phase 2 — Snapshot-safe relational export -- [ ] `leadforge/render/relational_snapshot_safe.py` (new) -- [ ] `leadforge/validation/relational_leakage.py` (new) -- [ ] `BUNDLE_SCHEMA_VERSION` 4 → 5; manifest gains `relational_snapshot_safe` -- [ ] Drop `converted_within_90_days` / `conversion_timestamp` from public `leads`; drop `close_outcome` / `closed_at` from public `opportunities`; omit `customers` / `subscriptions` from public bundles -- [ ] Hash-determinism preserved on regenerated bundles +- [x] `leadforge/render/relational_snapshot_safe.py` (new) — PR 2.1: `to_dataframes_snapshot_safe(dfs, *, snapshot_day)` projects the full-horizon dict from `to_dataframes` onto the public-bundle shape (drops `BANNED_LEAD_COLUMNS` from leads, `BANNED_OPP_COLUMNS` from opportunities, filters event tables per-lead by `lead_created_at + snapshot_day`, omits `customers`/`subscriptions`, passes accounts/contacts unchanged). +- [x] `leadforge/validation/relational_leakage.py` (new) — PR 2.1: owns the snapshot-safe contract constants (`BANNED_LEAD_COLUMNS`, `BANNED_OPP_COLUMNS`, `BANNED_TABLES`, `SNAPSHOT_FILTERED_TABLES`); ships `LeakageFinding`/`LeakageReport`/`RelationalLeakageError(LeadforgeError)` plus five probes (`probe_banned_columns`, `probe_banned_tables`, `probe_deterministic_reconstruction`, `probe_snapshot_window`, opt-in `probe_bonus_model_auc`) and two orchestrators (`run_all_probes`, `run_all_probes_on_dataframes`). The bonus-model probe is opt-in: orchestrators skip it unless the caller passes `bonus_model_max_auc=...` (PR 3.3 will calibrate per-tier bands). 
`deterministic_relational_reconstruction` lifted from `scripts/probe_relational_leakage.py`; the script now re-exports it from the package. +- [ ] `BUNDLE_SCHEMA_VERSION` 4 → 5; manifest gains `relational_snapshot_safe` — PR 2.2. +- [ ] Wire `relational_snapshot_safe` through `leadforge/exposure/filters.py` and `leadforge/api/bundle.py`; plumb the leakage validator into `leadforge/validation/bundle_checks.py` — PR 2.2. +- [ ] Drop `converted_within_90_days` / `conversion_timestamp` from public `leads`; drop `close_outcome` / `closed_at` from public `opportunities`; omit `customers` / `subscriptions` from public bundles — PR 2.2 (the structural rules are already enforced module-side; PR 2.2 turns them on for actual writes). +- [ ] Hash-determinism preserved on regenerated bundles — PR 2.2. ### Phase 3 — Release validation hardening - [ ] `leadforge/validation/{release_quality,leakage_probes,reporting}.py` (new) diff --git a/leadforge/render/relational_snapshot_safe.py b/leadforge/render/relational_snapshot_safe.py new file mode 100644 index 0000000..7d6378e --- /dev/null +++ b/leadforge/render/relational_snapshot_safe.py @@ -0,0 +1,153 @@ +"""Snapshot-safe relational export for ``student_public`` bundles. + +:func:`to_dataframes_snapshot_safe` projects the full-horizon dict +returned by :func:`leadforge.render.relational.to_dataframes` onto the +shape published in public bundles. The transformation strips every +known channel through which ``converted_within_90_days`` is +reconstructible from joins (see +``docs/release/v1_current_state_audit.md``): + +* ``leads``: drops :data:`BANNED_LEAD_COLUMNS`. +* ``opportunities``: drops :data:`BANNED_OPP_COLUMNS` and filters rows + per-lead to ``created_at <= lead_created_at + snapshot_day``. +* ``touches`` / ``sessions`` / ``sales_activities``: filtered per-lead + on their respective timestamp column to the same boundary. 
This is + defence-in-depth — the alpha bundles already pass G4.4 because the + simulation horizon bounds event timestamps, but the public contract + is the snapshot window, not the horizon. +* ``customers`` / ``subscriptions`` (:data:`BANNED_TABLES`): omitted + entirely from the output dict; they exist only for converted leads, + so their presence is the leak. +* ``accounts`` / ``contacts``: passed through unchanged (firmographic + / personographic, time-invariant). + +The ``research_instructor`` mode keeps using +:func:`leadforge.render.relational.to_dataframes` for the full-horizon +export. The contract constants live in +:mod:`leadforge.validation.relational_leakage` (validator owns the +definition of "leakage"); this module re-exports them for ergonomics. +""" + +from __future__ import annotations + +from collections.abc import Mapping + +import pandas as pd + +from leadforge.validation.relational_leakage import ( + BANNED_LEAD_COLUMNS, + BANNED_OPP_COLUMNS, + BANNED_TABLES, + SNAPSHOT_FILTERED_TABLES, +) + +__all__ = [ + "BANNED_LEAD_COLUMNS", + "BANNED_OPP_COLUMNS", + "BANNED_TABLES", + "SNAPSHOT_FILTERED_TABLES", + "to_dataframes_snapshot_safe", +] + +_ANCHOR_COL = "_lead_anchor_ts" + + +def to_dataframes_snapshot_safe( + dfs: Mapping[str, pd.DataFrame], + *, + snapshot_day: int, +) -> dict[str, pd.DataFrame]: + """Project the full-horizon relational dict onto the snapshot-safe form. + + Args: + dfs: Output of :func:`leadforge.render.relational.to_dataframes`. + Must contain ``leads``; other tables are optional and + missing keys are silently skipped. Input frames are never + mutated. + snapshot_day: Number of days after ``lead_created_at`` beyond + which event rows are dropped. This is independent of + ``label_window_days`` (which gates the task splits). + + Returns: + A new dict containing — in canonical order — ``accounts``, + ``contacts``, ``leads``, ``touches``, ``sessions``, + ``sales_activities``, ``opportunities``. 
``customers`` and + ``subscriptions`` are absent. + + Raises: + ValueError: if ``snapshot_day`` is negative, ``leads`` is absent + or lacks the anchor columns, ``leads.lead_id`` is not unique, or + ``leads.lead_created_at`` has null / unparseable values. + """ + if snapshot_day < 0: + raise ValueError(f"snapshot_day must be non-negative, got {snapshot_day}") + if "leads" not in dfs: + raise ValueError("dfs must contain a 'leads' frame") + + out: dict[str, pd.DataFrame] = {} + + for name in ("accounts", "contacts"): + if name in dfs: + out[name] = dfs[name] + + leads = _drop_columns(dfs["leads"], BANNED_LEAD_COLUMNS) + out["leads"] = leads + anchor = _build_anchor(leads) + horizon = pd.Timedelta(days=snapshot_day) + + for name, ts_col in SNAPSHOT_FILTERED_TABLES: + if name not in dfs: + continue + df = dfs[name] + if name == "opportunities": + df = _drop_columns(df, BANNED_OPP_COLUMNS) + out[name] = _filter_to_snapshot_window(df, anchor, ts_col, horizon) + + return out + + +def _drop_columns(df: pd.DataFrame, columns: tuple[str, ...]) -> pd.DataFrame: + cols_to_drop = [c for c in columns if c in df.columns] + if not cols_to_drop: + return df + return df.drop(columns=cols_to_drop) + + +def _build_anchor(leads: pd.DataFrame) -> pd.DataFrame: + missing = [c for c in ("lead_id", "lead_created_at") if c not in leads.columns] + if missing: + raise ValueError(f"leads is missing required columns: {missing}") + # Duplicate lead_ids would broadcast in the per-lead merge below and + # silently inflate event-table row counts. Match the same invariant + # asserted by ``deterministic_relational_reconstruction``. 
+ if not leads["lead_id"].is_unique: + raise ValueError("leads.lead_id must be unique") + anchor = leads[["lead_id", "lead_created_at"]].rename(columns={"lead_created_at": _ANCHOR_COL}) + anchor[_ANCHOR_COL] = pd.to_datetime(anchor[_ANCHOR_COL], errors="coerce") + # NaT here would silently drop every event for the affected leads via + # the ``ts <= NaT`` -> NaN -> fillna(False) path downstream — exactly + # the kind of silent data-quality erosion a public-bundle exporter + # must refuse to ship. + nat_mask = anchor[_ANCHOR_COL].isna() + if nat_mask.any(): + sample = anchor.loc[nat_mask, "lead_id"].head(5).tolist() + raise ValueError( + f"leads.lead_created_at has {int(nat_mask.sum())} unparseable / null " + f"value(s); offending lead_id sample: {sample}" + ) + return anchor + + +def _filter_to_snapshot_window( + events: pd.DataFrame, + anchor: pd.DataFrame, + ts_col: str, + horizon: pd.Timedelta, +) -> pd.DataFrame: + if len(events) == 0: + return events + merged = events.merge(anchor, on="lead_id", how="left") + ts = pd.to_datetime(merged[ts_col]) + cutoff = merged[_ANCHOR_COL] + horizon + keep = (ts <= cutoff).fillna(False).to_numpy() + return events.loc[keep].reset_index(drop=True) diff --git a/leadforge/validation/relational_leakage.py b/leadforge/validation/relational_leakage.py new file mode 100644 index 0000000..3299e02 --- /dev/null +++ b/leadforge/validation/relational_leakage.py @@ -0,0 +1,719 @@ +"""Probes that detect public-bundle reconstruction of ``converted_within_90_days``. + +The audit in ``docs/release/v1_current_state_audit.md`` enumerates five +deterministic paths (A-E) by which alpha public bundles reconstruct the +target via joins. This module is the validator that asserts the +snapshot-safe contract — encoded in :data:`BANNED_LEAD_COLUMNS`, +:data:`BANNED_OPP_COLUMNS`, :data:`BANNED_TABLES`, and +:data:`SNAPSHOT_FILTERED_TABLES` — is in place on any bundle claiming to +be ``student_public``. 
The matching writer-side enforcement lives in +:mod:`leadforge.render.relational_snapshot_safe` and imports the same +constants from this module. + +Five probes, each producing zero or more :class:`LeakageFinding`: + +* :func:`probe_banned_columns` — public ``leads`` and ``opportunities`` + must not contain :data:`BANNED_LEAD_COLUMNS` or + :data:`BANNED_OPP_COLUMNS` respectively. +* :func:`probe_banned_tables` — public bundles must not include + :data:`BANNED_TABLES`. +* :func:`probe_deterministic_reconstruction` — paths B / C / D from the + audit must produce zero positive predictions. **Path A is not + checked here** — it is the column-presence violation already covered + by :func:`probe_banned_columns`. +* :func:`probe_snapshot_window` — every event-table row must satisfy + ``timestamp <= lead_created_at + snapshot_day``. +* :func:`probe_bonus_model_auc` — *opt-in* honest-feature baseline: + trains LR + HistGBM on the legitimate aggregates ``n_opps`` / + ``max_acv`` / ``mean_acv`` (plus ``n_customers`` / + ``n_subscriptions`` if present) and asserts CV AUC stays below an + explicit ``max_auc``. The orchestrators skip this probe unless the + caller passes ``bonus_model_max_auc=...``. + +:func:`run_all_probes` is the file-based orchestrator (designed to be +called from :func:`leadforge.validation.bundle_checks.validate_bundle`). +:func:`run_all_probes_on_dataframes` is the same orchestrator without +the disk read, for unit tests against in-memory bundles. + +The :func:`deterministic_relational_reconstruction` function is the +single source of truth for the join graph that defines paths A-E. The +companion script ``scripts/probe_relational_leakage.py`` re-exports it +from here so the alpha-bundle audit and the validator agree by +construction. 
+""" + +from __future__ import annotations + +from collections.abc import Iterable, Mapping +from dataclasses import dataclass +from pathlib import Path +from typing import Final + +import pandas as pd + +from leadforge.core.exceptions import LeadforgeError + +# --------------------------------------------------------------------------- +# Snapshot-safe contract — the single source of truth for "what is leakage". +# leadforge.render.relational_snapshot_safe imports these so the writer +# and the validator share one definition. +# --------------------------------------------------------------------------- + +#: Columns dropped from public ``leads.parquet``. +BANNED_LEAD_COLUMNS: Final[tuple[str, ...]] = ( + "converted_within_90_days", + "conversion_timestamp", +) + +#: Columns dropped from public ``opportunities.parquet``. +BANNED_OPP_COLUMNS: Final[tuple[str, ...]] = ( + "close_outcome", + "closed_at", +) + +#: Tables omitted from public bundles entirely. +BANNED_TABLES: Final[tuple[str, ...]] = ("customers", "subscriptions") + +#: Tables filtered per-lead by their timestamp column to +#: ``lead_created_at + snapshot_day``. ``opportunities`` is included +#: even though it is an entity table, because its ``created_at`` +#: anchors when the entity becomes observable in the funnel. +SNAPSHOT_FILTERED_TABLES: Final[tuple[tuple[str, str], ...]] = ( + ("touches", "touch_timestamp"), + ("sessions", "session_timestamp"), + ("sales_activities", "activity_timestamp"), + ("opportunities", "created_at"), +) + +#: Channel labels carried on :class:`LeakageFinding.channel`. Constants +#: rather than an enum because findings serialise straight to JSON. 
+CHANNEL_BANNED_COLUMN: Final[str] = "banned_column" +CHANNEL_BANNED_TABLE: Final[str] = "banned_table" +CHANNEL_JOIN_RECONSTRUCTION: Final[str] = "join_reconstruction" +CHANNEL_SNAPSHOT_WINDOW: Final[str] = "snapshot_window" +CHANNEL_BONUS_MODEL: Final[str] = "bonus_model" + +_PUBLIC_TABLES: Final[tuple[str, ...]] = ( + "accounts", + "contacts", + "leads", + "touches", + "sessions", + "sales_activities", + "opportunities", +) + + +@dataclass(frozen=True) +class LeakageFinding: + """One leakage-channel violation surfaced by a probe.""" + + channel: str + detail: str + message: str + + +@dataclass(frozen=True) +class LeakageReport: + """Aggregate result of a probe run. Empty :attr:`findings` means OK.""" + + findings: tuple[LeakageFinding, ...] + + @property + def ok(self) -> bool: + return len(self.findings) == 0 + + def raise_if_failing(self) -> None: + """Raise :class:`RelationalLeakageError` if any probe reported a finding.""" + if not self.ok: + raise RelationalLeakageError(self) + + +class RelationalLeakageError(LeadforgeError): + """Raised by :meth:`LeakageReport.raise_if_failing` on any finding. + + Carries the originating :class:`LeakageReport` on ``self.report`` so + callers (e.g. ``leadforge validate``) can render the full set of + findings rather than just the first one. + """ + + def __init__(self, report: LeakageReport) -> None: + self.report = report + rendered = "\n".join(f" - [{f.channel}] {f.detail}: {f.message}" for f in report.findings) + super().__init__( + f"public bundle leaks `converted_within_90_days` " + f"({len(report.findings)} finding(s)):\n{rendered}" + ) + + +# --------------------------------------------------------------------------- +# Deterministic reconstruction — the join graph that defines paths A-E. +# Lifted from the PR 1.1 audit script; the script now re-exports this +# function from here so there is one implementation. 
+# --------------------------------------------------------------------------- + + +def deterministic_relational_reconstruction( + leads: pd.DataFrame, + opportunities: pd.DataFrame, + customers: pd.DataFrame, + subscriptions: pd.DataFrame, +) -> pd.DataFrame: + """Reconstruct ``converted_within_90_days`` from public relational joins. + + Returns a DataFrame indexed by ``lead_id`` with five boolean columns, + one per reconstruction path (A-E). Path E is the union of B, C, D and + is the headline relational-leakage prediction. + + No hidden state, no model fit — pure joins. + + Empty ``customers``/``subscriptions`` frames are accepted (the + post-fix expected state); the corresponding paths simply return + all-False. + + Raises: + ValueError: if ``leads.lead_id`` contains duplicates. A validator + cannot operate safely on non-unique keys. + """ + if not leads["lead_id"].is_unique: + raise ValueError("leads.lead_id must be unique") + + leads_idx = leads.set_index("lead_id", drop=False) + + # Path A — the label itself, if present in public leads. + # Plain ``astype(bool)`` would map NaN to True; route through pandas' + # nullable boolean dtype so missing values fill cleanly to False without + # triggering object-downcast warnings. + if "converted_within_90_days" in leads.columns: + path_a = leads_idx["converted_within_90_days"].astype("boolean").fillna(False).astype(bool) + else: + path_a = pd.Series(False, index=leads_idx.index, name="converted_within_90_days") + + # Path B — any opportunity with close_outcome == "closed_won". + if "close_outcome" in opportunities.columns and len(opportunities) > 0: + won_leads = set( + opportunities.loc[opportunities["close_outcome"] == "closed_won", "lead_id"] + ) + else: + won_leads = set() + path_b = leads_idx["lead_id"].isin(won_leads) + + # Path C — lead has any joined customer (via opportunity_id -> opportunity.lead_id). 
+ if len(opportunities) > 0: + opp_to_lead = dict( + zip(opportunities["opportunity_id"], opportunities["lead_id"], strict=False) + ) + else: + opp_to_lead = {} + customer_leads = { + opp_to_lead[opp_id] for opp_id in customers["opportunity_id"] if opp_id in opp_to_lead + } + path_c = leads_idx["lead_id"].isin(customer_leads) + + # Path D — lead has any joined subscription (sub -> customer -> opportunity -> lead). + if len(customers) > 0: + cust_to_opp = dict(zip(customers["customer_id"], customers["opportunity_id"], strict=False)) + else: + cust_to_opp = {} + sub_leads: set[str] = set() + for cust_id in subscriptions["customer_id"]: + opp_id = cust_to_opp.get(cust_id) + if opp_id is None: + continue + lead_id = opp_to_lead.get(opp_id) + if lead_id is not None: + sub_leads.add(lead_id) + path_d = leads_idx["lead_id"].isin(sub_leads) + + # Path E — deterministic OR of B, C, D (the headline join-only path). + path_e = path_b | path_c | path_d + + return pd.DataFrame( + { + "path_a_direct_label": path_a.values, + "path_b_opportunity_won": path_b.values, + "path_c_customer_exists": path_c.values, + "path_d_subscription_exists": path_d.values, + "path_e_or_b_c_d": path_e.values, + }, + index=leads_idx.index, + ) + + +# --------------------------------------------------------------------------- +# Probes +# --------------------------------------------------------------------------- + + +def probe_banned_columns(tables: Mapping[str, pd.DataFrame]) -> list[LeakageFinding]: + """Public ``leads``/``opportunities`` must not carry banned columns. + + Detects Path A (label column directly readable from ``leads``) and + the ``opportunities.close_outcome`` / ``closed_at`` channels — i.e. + leakage that any caller can spot by listing column names, no joins + required. 
+ """ + findings: list[LeakageFinding] = [] + for table_name, banned in ( + ("leads", BANNED_LEAD_COLUMNS), + ("opportunities", BANNED_OPP_COLUMNS), + ): + df = tables.get(table_name) + if df is None: + continue + for col in banned: + if col in df.columns: + findings.append( + LeakageFinding( + channel=CHANNEL_BANNED_COLUMN, + detail=f"{table_name}.{col}", + message=( + f"public {table_name}.parquet must not contain " + f"the banned column '{col}'" + ), + ) + ) + return findings + + +def probe_banned_tables(table_names: Iterable[str]) -> list[LeakageFinding]: + """Public bundles must not include conversion-conditional tables.""" + present = set(table_names) + return [ + LeakageFinding( + channel=CHANNEL_BANNED_TABLE, + detail=name, + message=( + f"public bundles must not include '{name}.parquet' " + "(it exists only for converted leads, so its presence " + "reconstructs the label)" + ), + ) + for name in BANNED_TABLES + if name in present + ] + + +def probe_deterministic_reconstruction( + tables: Mapping[str, pd.DataFrame], +) -> list[LeakageFinding]: + """Audit paths B / C / D must produce zero positive predictions. + + This probe focuses exclusively on the **join-graph** reconstruction: + + * B — at least one opportunity with ``close_outcome == "closed_won"``; + * C — a joinable customer row reachable via ``opportunity_id``; + * D — a joinable subscription row reachable via ``customer_id``. + + Path A (direct read of ``leads.converted_within_90_days``) is *not* + checked here — it is the column-presence violation already raised by + :func:`probe_banned_columns`. Re-emitting it here would double-count + one defect across two channels. Tests assert this delegation + explicitly so that future maintainers don't widen the scope by + accident. 
+ """ + leads = tables.get("leads") + if leads is None or len(leads) == 0: + return [] + + opportunities = tables.get( + "opportunities", + _empty_frame({"opportunity_id": "string", "lead_id": "string"}), + ) + customers = tables.get( + "customers", + _empty_frame({"customer_id": "string", "opportunity_id": "string", "account_id": "string"}), + ) + subscriptions = tables.get( + "subscriptions", + _empty_frame({"subscription_id": "string", "customer_id": "string"}), + ) + + paths = deterministic_relational_reconstruction(leads, opportunities, customers, subscriptions) + + findings: list[LeakageFinding] = [] + for path_col, label in ( + ("path_b_opportunity_won", "B (opportunity.close_outcome == 'closed_won')"), + ("path_c_customer_exists", "C (joined customer exists)"), + ("path_d_subscription_exists", "D (joined subscription exists)"), + ): + positive = int(paths[path_col].sum()) + if positive > 0: + findings.append( + LeakageFinding( + channel=CHANNEL_JOIN_RECONSTRUCTION, + detail=path_col, + message=( + f"path {label} produced {positive}/{len(paths)} " + "positive predictions; a snapshot-safe public " + "bundle must produce zero" + ), + ) + ) + return findings + + +def probe_snapshot_window( + tables: Mapping[str, pd.DataFrame], snapshot_day: int +) -> list[LeakageFinding]: + """Every event-table row must satisfy ``timestamp <= lead_created_at + snapshot_day``.""" + if snapshot_day < 0: + raise ValueError(f"snapshot_day must be non-negative, got {snapshot_day}") + leads = tables.get("leads") + if leads is None or len(leads) == 0: + return [] + if "lead_id" not in leads.columns or "lead_created_at" not in leads.columns: + raise ValueError("leads must contain 'lead_id' and 'lead_created_at' columns") + if not leads["lead_id"].is_unique: + raise ValueError("leads.lead_id must be unique") + + anchor = leads[["lead_id", "lead_created_at"]].copy() + anchor["lead_created_at"] = pd.to_datetime(anchor["lead_created_at"], errors="coerce") + # NaT in the anchor would 
propagate to NaT cutoffs, then ``ts > NaT`` + # is NaN, and the violation count's ``fillna(False)`` would silently + # drop those rows — masking a data-quality bug in the bundle. Refuse + # to operate on a malformed anchor, same posture as the duplicate- + # lead_id check above. + nat_mask = anchor["lead_created_at"].isna() + if nat_mask.any(): + sample = anchor.loc[nat_mask, "lead_id"].head(5).tolist() + raise ValueError( + f"leads.lead_created_at has {int(nat_mask.sum())} unparseable / null " + f"value(s); offending lead_id sample: {sample}" + ) + horizon = pd.Timedelta(days=snapshot_day) + + findings: list[LeakageFinding] = [] + for name, ts_col in SNAPSHOT_FILTERED_TABLES: + df = tables.get(name) + if df is None or len(df) == 0 or ts_col not in df.columns: + continue + merged = df[["lead_id", ts_col]].merge(anchor, on="lead_id", how="left") + # An event row whose lead_id has no match in leads gets NaT for + # ``lead_created_at`` after the left-merge; that row's cutoff is + # NaT and the violation count would silently miss it. An orphan + # event row is a structural FK violation (and a leakage attack + # surface — a tampered bundle could insert post-snapshot events + # tied to lead_ids absent from the public leads table). Refuse + # to bless it. 
+ orphan_mask = merged["lead_created_at"].isna() + if orphan_mask.any(): + sample = merged.loc[orphan_mask, "lead_id"].head(5).tolist() + raise ValueError( + f"{name}.parquet has {int(orphan_mask.sum())} row(s) referencing " + f"lead_id(s) absent from leads; sample: {sample}" + ) + ts = pd.to_datetime(merged[ts_col]) + cutoff = merged["lead_created_at"] + horizon + violations = int((ts > cutoff).fillna(False).sum()) + if violations > 0: + findings.append( + LeakageFinding( + channel=CHANNEL_SNAPSHOT_WINDOW, + detail=f"{name}.{ts_col}", + message=( + f"{violations}/{len(df)} rows in {name}.parquet " + f"have {ts_col} > lead_created_at + {snapshot_day}d" + ), + ) + ) + return findings + + +def probe_bonus_model_auc( + tables: Mapping[str, pd.DataFrame], + *, + max_auc: float, + seed: int = 42, + label: pd.Series | None = None, +) -> list[LeakageFinding]: + """Opt-in honest-feature baseline: 5-fold CV LR + HistGBM AUC. + + Trains on per-lead aggregates (``n_opps`` / ``max_acv`` / + ``mean_acv``, plus ``n_customers`` / ``n_subscriptions`` if those + tables are present) and asserts the mean cross-validated AUC stays + below ``max_auc``. + + Caller responsibilities: + + * ``max_auc`` is required. PR 2.1 ships this probe with no + calibrated threshold; PR 3.3 will land per-tier bands. + * ``label`` must be a :class:`pandas.Series` indexed by ``lead_id`` + (``index.name == "lead_id"``) **and** cover every lead in the + bundle. Both are enforced — a misaligned or partial label would + silently neutralise the probe (via the binary-cardinality gate + or NaN folds), which defeats the validator's purpose. 
+ + The probe skips silently (no findings, no error) when: + + * scikit-learn is not installed; + * ``leads`` is missing or empty; + * the label is unavailable (no ``label`` argument and the public + bundle has correctly redacted ``converted_within_90_days``); + * the label has fewer than two classes after alignment; + * the smaller class has fewer members than the minimum needed for + stratified CV (``n_splits >= 2``). + """ + try: + from sklearn.ensemble import HistGradientBoostingClassifier + from sklearn.linear_model import LogisticRegression + from sklearn.metrics import roc_auc_score + from sklearn.model_selection import StratifiedKFold + from sklearn.pipeline import Pipeline + from sklearn.preprocessing import StandardScaler + except ImportError: + return [] + + leads = tables.get("leads") + if leads is None or len(leads) == 0: + return [] + + y_series = _resolve_label(leads, label) + if y_series is None: + return [] + + features = _build_relational_features(leads, tables) + if features.empty or len(features.columns) == 0: + return [] + + aligned = y_series.reindex(features.index) + if aligned.isna().any(): + missing = int(aligned.isna().sum()) + raise ValueError( + f"label is missing values for {missing} lead_id(s) present in the " + "bundle; supply a complete label or omit it to read from leads" + ) + y = aligned.astype(int) + if y.nunique(dropna=True) < 2: + return [] + + # Stratified CV needs at least n_splits members in each class. If the + # smaller class is below that, the probe can't run — skip silently + # (this is a sample-size constraint, not a leakage finding). 
+ min_class = int(y.value_counts().min()) + n_splits = min(5, min_class) + if n_splits < 2: + return [] + + models: dict[str, Pipeline] = { + "logistic_regression": Pipeline( + [("scaler", StandardScaler()), ("clf", LogisticRegression(max_iter=1000))] + ), + "hist_gbm": Pipeline([("clf", HistGradientBoostingClassifier(random_state=seed))]), + } + skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=seed) + + findings: list[LeakageFinding] = [] + for name, pipe in models.items(): + aucs: list[float] = [] + for train_idx, test_idx in skf.split(features.values, y.values): + x_tr, x_te = features.values[train_idx], features.values[test_idx] + y_tr, y_te = y.values[train_idx], y.values[test_idx] + pipe.fit(x_tr, y_tr) + proba = pipe.predict_proba(x_te)[:, 1] + aucs.append(float(roc_auc_score(y_te, proba))) + auc_mean = sum(aucs) / len(aucs) + if auc_mean > max_auc: + findings.append( + LeakageFinding( + channel=CHANNEL_BONUS_MODEL, + detail=name, + message=( + f"{n_splits}-fold CV AUC {auc_mean:.3f} on join-derived " + f"features exceeds max_auc={max_auc:.3f}; honest " + "aggregates carry stronger signal than the band allows" + ), + ) + ) + return findings + + +# --------------------------------------------------------------------------- +# Orchestrators +# --------------------------------------------------------------------------- + + +def run_all_probes_on_dataframes( + tables: Mapping[str, pd.DataFrame], + *, + snapshot_day: int, + bonus_model_max_auc: float | None = None, + label: pd.Series | None = None, +) -> LeakageReport: + """Run every structural probe; run the bonus probe iff ``bonus_model_max_auc`` is set.""" + findings: list[LeakageFinding] = [] + findings += probe_banned_columns(tables) + findings += probe_banned_tables(tables.keys()) + findings += probe_deterministic_reconstruction(tables) + findings += probe_snapshot_window(tables, snapshot_day=snapshot_day) + if bonus_model_max_auc is not None: + findings += probe_bonus_model_auc(tables, 
max_auc=bonus_model_max_auc, label=label) + return LeakageReport(findings=tuple(findings)) + + +def run_all_probes( + bundle_dir: Path, + *, + snapshot_day: int, + bonus_model_max_auc: float | None = None, + label: pd.Series | None = None, +) -> LeakageReport: + """Run every structural probe against ``bundle_dir/tables/*.parquet``. + + Args: + bundle_dir: Bundle root (must contain ``tables/leads.parquet``). + snapshot_day: Snapshot window for the timestamp probe. The + caller (typically ``validate_bundle``) is expected to read + it from ``manifest.json``. + bonus_model_max_auc: Pass a numeric threshold to enable the + opt-in :func:`probe_bonus_model_auc`. ``None`` (default) + skips it — the calibrated band ships in PR 3.3. + label: Optional ground-truth labels for the bonus probe when + ``leads.converted_within_90_days`` has been redacted. Must + be indexed by ``lead_id`` (see :func:`probe_bonus_model_auc`). + Ignored when ``bonus_model_max_auc`` is ``None``. + + Raises: + FileNotFoundError: if ``bundle_dir/tables/`` is missing or + ``leads.parquet`` is not present. 
+ """ + tables_dir = bundle_dir / "tables" + if not tables_dir.is_dir(): + raise FileNotFoundError(f"missing tables/ under {bundle_dir}") + if not (tables_dir / "leads.parquet").exists(): + raise FileNotFoundError(f"missing required leads.parquet under {tables_dir}") + + tables: dict[str, pd.DataFrame] = {} + for name in (*_PUBLIC_TABLES, *BANNED_TABLES): + path = tables_dir / f"{name}.parquet" + if path.exists(): + tables[name] = pd.read_parquet(path) + return run_all_probes_on_dataframes( + tables, + snapshot_day=snapshot_day, + bonus_model_max_auc=bonus_model_max_auc, + label=label, + ) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _empty_frame(dtype_map: dict[str, str]) -> pd.DataFrame: + return pd.DataFrame({c: pd.Series(dtype=d) for c, d in dtype_map.items()}) + + +def _resolve_label( + leads: pd.DataFrame, + label: pd.Series | None, +) -> pd.Series | None: + """Pick a label series to score against, or ``None`` to skip the probe. + + A caller-supplied ``label`` must be indexed by ``lead_id`` + (``index.name == "lead_id"``). Without that guarantee a misaligned + label would silently skip the probe via the binary-cardinality gate + downstream — exactly the kind of hidden no-op a leakage validator + must not have. + """ + if label is not None: + if label.index.name != "lead_id": + raise ValueError( + "label must be a pandas.Series indexed by lead_id " + f"(got index.name={label.index.name!r})" + ) + return label.astype("boolean").fillna(False).astype(int) + if "converted_within_90_days" in leads.columns: + return ( + leads.set_index("lead_id")["converted_within_90_days"] + .astype("boolean") + .fillna(False) + .astype(int) + ) + return None + + +def _build_relational_features( + leads: pd.DataFrame, + tables: Mapping[str, pd.DataFrame], +) -> pd.DataFrame: + """Per-lead aggregates from joinable public/optional relational tables. 
+ + Honest features only — no aggregate of ``close_outcome``. Customers + and subscriptions counts are included only when the corresponding + tables exist (i.e. on a tampered bundle); on a clean public bundle + they default to 0 and the model can discard the column. + """ + opps = tables.get("opportunities") + customers = tables.get("customers") + subscriptions = tables.get("subscriptions") + + feats = leads[["lead_id"]].copy() + + if opps is not None and len(opps) > 0: + agg: dict[str, tuple[str, str]] = {"n_opps": ("opportunity_id", "count")} + if "estimated_acv" in opps.columns: + agg["max_acv"] = ("estimated_acv", "max") + agg["mean_acv"] = ("estimated_acv", "mean") + opp_agg = opps.groupby("lead_id").agg(**agg).reset_index() + feats = feats.merge(opp_agg, on="lead_id", how="left") + opp_to_lead = dict(zip(opps["opportunity_id"], opps["lead_id"], strict=False)) + else: + opp_to_lead = {} + + if customers is not None and len(customers) > 0: + cust = customers.copy() + cust["lead_id"] = cust["opportunity_id"].map(opp_to_lead) + cust_agg = cust.groupby("lead_id").size().rename("n_customers").reset_index() + feats = feats.merge(cust_agg, on="lead_id", how="left") + cust_to_opp = dict(zip(customers["customer_id"], customers["opportunity_id"], strict=False)) + else: + cust_to_opp = {} + + if subscriptions is not None and len(subscriptions) > 0: + subs = subscriptions.copy() + subs["opportunity_id"] = subs["customer_id"].map(cust_to_opp) + subs["lead_id"] = subs["opportunity_id"].map(opp_to_lead) + sub_agg = subs.groupby("lead_id").size().rename("n_subscriptions").reset_index() + feats = feats.merge(sub_agg, on="lead_id", how="left") + + fill_defaults: dict[str, float] = { + "n_opps": 0.0, + "max_acv": 0.0, + "mean_acv": 0.0, + "n_customers": 0.0, + "n_subscriptions": 0.0, + } + for col, default in fill_defaults.items(): + if col in feats.columns: + feats[col] = feats[col].fillna(default) + else: + feats[col] = default + + feature_cols = list(fill_defaults.keys()) + 
return feats.set_index("lead_id")[feature_cols].astype(float) + + +__all__ = [ + "BANNED_LEAD_COLUMNS", + "BANNED_OPP_COLUMNS", + "BANNED_TABLES", + "CHANNEL_BANNED_COLUMN", + "CHANNEL_BANNED_TABLE", + "CHANNEL_BONUS_MODEL", + "CHANNEL_JOIN_RECONSTRUCTION", + "CHANNEL_SNAPSHOT_WINDOW", + "LeakageFinding", + "LeakageReport", + "RelationalLeakageError", + "SNAPSHOT_FILTERED_TABLES", + "deterministic_relational_reconstruction", + "probe_banned_columns", + "probe_banned_tables", + "probe_bonus_model_auc", + "probe_deterministic_reconstruction", + "probe_snapshot_window", + "run_all_probes", + "run_all_probes_on_dataframes", +] diff --git a/scripts/probe_relational_leakage.py b/scripts/probe_relational_leakage.py index 88abc7e..4aa08af 100644 --- a/scripts/probe_relational_leakage.py +++ b/scripts/probe_relational_leakage.py @@ -68,98 +68,18 @@ import pandas as pd +# Re-export from the canonical package location. PR 2.1 lifted this +# function into ``leadforge/validation/relational_leakage.py``; the +# script keeps it accessible at ``probe_module.deterministic_relational_reconstruction`` +# (and at the CLI level) so callers and existing tests remain stable. +from leadforge.validation.relational_leakage import ( + deterministic_relational_reconstruction, +) + REQUIRED_TABLES = ("leads", "opportunities") DEFAULT_HORIZON_DAYS = 90 -def deterministic_relational_reconstruction( - leads: pd.DataFrame, - opportunities: pd.DataFrame, - customers: pd.DataFrame, - subscriptions: pd.DataFrame, -) -> pd.DataFrame: - """Reconstruct ``converted_within_90_days`` from public relational joins. - - Returns a DataFrame indexed by ``lead_id`` with five boolean columns, - one per reconstruction path (A-E). Path E is the union of B, C, D and - is the headline relational-leakage prediction. - - No hidden state, no model fit — pure joins. Designed to be lifted into - ``leadforge/validation/leakage_probes.py`` (PR 3.1) as the relational - leakage probe. 
- - Empty ``customers``/``subscriptions`` frames are accepted (Phase 2 - success state); the corresponding paths simply return all-False. - - Raises: - ValueError: if ``leads.lead_id`` contains duplicates. A validator - cannot operate safely on non-unique keys. - """ - if not leads["lead_id"].is_unique: - raise ValueError("leads.lead_id must be unique") - - leads_idx = leads.set_index("lead_id", drop=False) - - # Path A — the label itself, if present in public leads. - # Plain ``astype(bool)`` would map NaN to True; route through pandas' - # nullable boolean dtype so missing values fill cleanly to False without - # triggering object-downcast warnings. - if "converted_within_90_days" in leads.columns: - path_a = leads_idx["converted_within_90_days"].astype("boolean").fillna(False).astype(bool) - else: - path_a = pd.Series(False, index=leads_idx.index, name="converted_within_90_days") - - # Path B — any opportunity with close_outcome == "closed_won". - if "close_outcome" in opportunities.columns and len(opportunities) > 0: - won_leads = set( - opportunities.loc[opportunities["close_outcome"] == "closed_won", "lead_id"] - ) - else: - won_leads = set() - path_b = leads_idx["lead_id"].isin(won_leads) - - # Path C — lead has any joined customer (via opportunity_id -> opportunity.lead_id). - if len(opportunities) > 0: - opp_to_lead = dict( - zip(opportunities["opportunity_id"], opportunities["lead_id"], strict=False) - ) - else: - opp_to_lead = {} - customer_leads = { - opp_to_lead[opp_id] for opp_id in customers["opportunity_id"] if opp_id in opp_to_lead - } - path_c = leads_idx["lead_id"].isin(customer_leads) - - # Path D — lead has any joined subscription (sub -> customer -> opportunity -> lead). 
- if len(customers) > 0: - cust_to_opp = dict(zip(customers["customer_id"], customers["opportunity_id"], strict=False)) - else: - cust_to_opp = {} - sub_leads: set[str] = set() - for cust_id in subscriptions["customer_id"]: - opp_id = cust_to_opp.get(cust_id) - if opp_id is None: - continue - lead_id = opp_to_lead.get(opp_id) - if lead_id is not None: - sub_leads.add(lead_id) - path_d = leads_idx["lead_id"].isin(sub_leads) - - # Path E — deterministic OR of B, C, D (the headline join-only path). - path_e = path_b | path_c | path_d - - return pd.DataFrame( - { - "path_a_direct_label": path_a.values, - "path_b_opportunity_won": path_b.values, - "path_c_customer_exists": path_c.values, - "path_d_subscription_exists": path_d.values, - "path_e_or_b_c_d": path_e.values, - }, - index=leads_idx.index, - ) - - def _binary_metrics(y_true: pd.Series, y_pred: pd.Series) -> dict[str, float]: """Accuracy / precision / recall / F1 / counts for boolean predictions. diff --git a/tests/render/test_relational_snapshot_safe.py b/tests/render/test_relational_snapshot_safe.py new file mode 100644 index 0000000..09dcdfd --- /dev/null +++ b/tests/render/test_relational_snapshot_safe.py @@ -0,0 +1,304 @@ +"""Tests for ``leadforge/render/relational_snapshot_safe.py``.""" + +from __future__ import annotations + +import pandas as pd +import pytest + +from leadforge.render.relational_snapshot_safe import ( + BANNED_LEAD_COLUMNS, + BANNED_OPP_COLUMNS, + BANNED_TABLES, + SNAPSHOT_FILTERED_TABLES, + to_dataframes_snapshot_safe, +) + +# --------------------------------------------------------------------------- +# Synthetic fixtures +# +# Two leads, anchored on the same date. Events span 0d, 5d, 12d, 35d after +# anchor — a snapshot_day=10 must keep day-0 / day-5 and drop day-12 / day-35. 
+# --------------------------------------------------------------------------- + +ANCHOR = pd.Timestamp("2026-01-01") + + +def _ts(offset_days: int) -> str: + return (ANCHOR + pd.Timedelta(days=offset_days)).isoformat() + + +def _full_horizon_dict() -> dict[str, pd.DataFrame]: + accounts = pd.DataFrame( + [ + {"account_id": "acct_1", "industry": "saas"}, + {"account_id": "acct_2", "industry": "saas"}, + ] + ) + contacts = pd.DataFrame( + [ + {"contact_id": "c_1", "account_id": "acct_1", "seniority": "vp"}, + {"contact_id": "c_2", "account_id": "acct_2", "seniority": "manager"}, + ] + ) + leads = pd.DataFrame( + [ + { + "lead_id": "lead_1", + "account_id": "acct_1", + "contact_id": "c_1", + "lead_created_at": ANCHOR.isoformat(), + "current_stage": "closed_won", + "is_sql": True, + "converted_within_90_days": True, + "conversion_timestamp": _ts(40), + }, + { + "lead_id": "lead_2", + "account_id": "acct_2", + "contact_id": "c_2", + "lead_created_at": ANCHOR.isoformat(), + "current_stage": "discovery", + "is_sql": False, + "converted_within_90_days": False, + "conversion_timestamp": None, + }, + ] + ) + touches = pd.DataFrame( + [ + {"touch_id": "t_1", "lead_id": "lead_1", "touch_timestamp": _ts(0)}, + {"touch_id": "t_2", "lead_id": "lead_1", "touch_timestamp": _ts(5)}, + {"touch_id": "t_3", "lead_id": "lead_1", "touch_timestamp": _ts(12)}, + {"touch_id": "t_4", "lead_id": "lead_2", "touch_timestamp": _ts(35)}, + ] + ) + sessions = pd.DataFrame( + [ + {"session_id": "s_1", "lead_id": "lead_1", "session_timestamp": _ts(0)}, + {"session_id": "s_2", "lead_id": "lead_1", "session_timestamp": _ts(20)}, + ] + ) + sales_activities = pd.DataFrame( + [ + {"activity_id": "a_1", "lead_id": "lead_1", "activity_timestamp": _ts(7)}, + {"activity_id": "a_2", "lead_id": "lead_2", "activity_timestamp": _ts(15)}, + ] + ) + opportunities = pd.DataFrame( + [ + { + "opportunity_id": "opp_1", + "lead_id": "lead_1", + "created_at": _ts(8), + "stage": "closed_won", + "estimated_acv": 50_000, 
+ "close_outcome": "closed_won", + "closed_at": _ts(40), + }, + { + "opportunity_id": "opp_2", + "lead_id": "lead_1", + "created_at": _ts(30), + "stage": "negotiation", + "estimated_acv": 60_000, + "close_outcome": None, + "closed_at": None, + }, + ] + ) + customers = pd.DataFrame( + [ + { + "customer_id": "cu_1", + "opportunity_id": "opp_1", + "account_id": "acct_1", + "customer_start_at": _ts(40), + } + ] + ) + subscriptions = pd.DataFrame( + [ + { + "subscription_id": "sub_1", + "customer_id": "cu_1", + "plan_name": "starter", + "subscription_start_at": _ts(40), + "subscription_status": "active", + } + ] + ) + return { + "accounts": accounts, + "contacts": contacts, + "leads": leads, + "touches": touches, + "sessions": sessions, + "sales_activities": sales_activities, + "opportunities": opportunities, + "customers": customers, + "subscriptions": subscriptions, + } + + +# --------------------------------------------------------------------------- +# Property tests +# --------------------------------------------------------------------------- + + +def test_drops_banned_columns_from_leads_and_opportunities() -> None: + out = to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=10) + for col in BANNED_LEAD_COLUMNS: + assert col not in out["leads"].columns, f"leads should not contain banned column {col}" + for col in BANNED_OPP_COLUMNS: + assert col not in out["opportunities"].columns, ( + f"opportunities should not contain banned column {col}" + ) + + +def test_omits_banned_tables_entirely() -> None: + out = to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=10) + for name in BANNED_TABLES: + assert name not in out, f"output dict should not contain banned table {name}" + + +def test_event_tables_filtered_to_snapshot_window() -> None: + out = to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=10) + + # touches: kept day-0 & day-5 (lead_1); dropped day-12 (lead_1) and day-35 (lead_2). 
+ assert sorted(out["touches"]["touch_id"]) == ["t_1", "t_2"] + # sessions: kept day-0; dropped day-20. + assert sorted(out["sessions"]["session_id"]) == ["s_1"] + # sales_activities: kept day-7 (lead_1); dropped day-15 (lead_2). + assert sorted(out["sales_activities"]["activity_id"]) == ["a_1"] + # opportunities: kept day-8 (lead_1); dropped day-30 (lead_1). + assert sorted(out["opportunities"]["opportunity_id"]) == ["opp_1"] + + +def test_snapshot_window_invariant_holds_per_lead() -> None: + out = to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=10) + leads_anchor = out["leads"].set_index("lead_id")["lead_created_at"].apply(pd.Timestamp) + horizon = pd.Timedelta(days=10) + + for name, ts_col in SNAPSHOT_FILTERED_TABLES: + df = out[name] + if df.empty: + continue + for _, row in df.iterrows(): + anchor = leads_anchor[row["lead_id"]] + assert pd.Timestamp(row[ts_col]) <= anchor + horizon, ( + f"{name}.{ts_col} for lead {row['lead_id']} exceeds window" + ) + + +def test_accounts_and_contacts_pass_through_unchanged() -> None: + src = _full_horizon_dict() + out = to_dataframes_snapshot_safe(src, snapshot_day=10) + pd.testing.assert_frame_equal(out["accounts"], src["accounts"]) + pd.testing.assert_frame_equal(out["contacts"], src["contacts"]) + + +def test_idempotent_on_already_safe_input() -> None: + once = to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=10) + twice = to_dataframes_snapshot_safe(once, snapshot_day=10) + assert set(once.keys()) == set(twice.keys()) + for name in once: + pd.testing.assert_frame_equal(once[name], twice[name]) + + +def test_deterministic_across_two_calls() -> None: + a = to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=10) + b = to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=10) + assert set(a.keys()) == set(b.keys()) + for name in a: + pd.testing.assert_frame_equal(a[name], b[name]) + + +def test_does_not_mutate_input_frames() -> None: + src = _full_horizon_dict() + leads_before 
= src["leads"].copy(deep=True) + opps_before = src["opportunities"].copy(deep=True) + touches_before = src["touches"].copy(deep=True) + + to_dataframes_snapshot_safe(src, snapshot_day=10) + + pd.testing.assert_frame_equal(src["leads"], leads_before) + pd.testing.assert_frame_equal(src["opportunities"], opps_before) + pd.testing.assert_frame_equal(src["touches"], touches_before) + + +def test_canonical_output_table_order() -> None: + out = to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=10) + assert list(out.keys()) == [ + "accounts", + "contacts", + "leads", + "touches", + "sessions", + "sales_activities", + "opportunities", + ] + + +def test_handles_missing_optional_tables() -> None: + src = _full_horizon_dict() + minimal = {"leads": src["leads"], "opportunities": src["opportunities"]} + out = to_dataframes_snapshot_safe(minimal, snapshot_day=10) + assert "leads" in out + assert "opportunities" in out + assert "touches" not in out + assert "accounts" not in out + + +def test_zero_snapshot_day_keeps_only_anchor_day_events() -> None: + src = _full_horizon_dict() + out = to_dataframes_snapshot_safe(src, snapshot_day=0) + # Only the day-0 touches and sessions survive. + assert sorted(out["touches"]["touch_id"]) == ["t_1"] + assert sorted(out["sessions"]["session_id"]) == ["s_1"] + # No sales_activities or opportunities at day-0. 
+ assert out["sales_activities"].empty + assert out["opportunities"].empty + + +def test_negative_snapshot_day_raises() -> None: + with pytest.raises(ValueError, match="non-negative"): + to_dataframes_snapshot_safe(_full_horizon_dict(), snapshot_day=-1) + + +def test_missing_leads_raises() -> None: + with pytest.raises(ValueError, match="must contain a 'leads'"): + to_dataframes_snapshot_safe({}, snapshot_day=10) + + +def test_leads_missing_anchor_columns_raises() -> None: + bad = {"leads": pd.DataFrame([{"lead_id": "lead_1"}])} + with pytest.raises(ValueError, match="missing required columns"): + to_dataframes_snapshot_safe(bad, snapshot_day=10) + + +def test_duplicate_lead_id_raises() -> None: + """Per-lead snapshot filter would broadcast on duplicates; matches the + same invariant asserted by ``deterministic_relational_reconstruction``.""" + src = _full_horizon_dict() + src["leads"] = pd.concat([src["leads"], src["leads"].iloc[[0]]], ignore_index=True) + with pytest.raises(ValueError, match="lead_id must be unique"): + to_dataframes_snapshot_safe(src, snapshot_day=10) + + +def test_nat_lead_created_at_raises() -> None: + """A NaT/null anchor would silently drop every event for the affected + lead via ``ts <= NaT`` -> NaN -> fillna(False). 
Must raise instead.""" + src = _full_horizon_dict() + src["leads"] = src["leads"].copy() + src["leads"].loc[0, "lead_created_at"] = None + with pytest.raises(ValueError, match="unparseable / null"): + to_dataframes_snapshot_safe(src, snapshot_day=10) + + +def test_unparseable_lead_created_at_raises() -> None: + """Garbage anchor strings would coerce to NaT and silently misbehave.""" + src = _full_horizon_dict() + src["leads"] = src["leads"].copy() + src["leads"].loc[0, "lead_created_at"] = "not-a-date" + with pytest.raises(ValueError, match="unparseable / null"): + to_dataframes_snapshot_safe(src, snapshot_day=10) diff --git a/tests/validation/test_relational_leakage.py b/tests/validation/test_relational_leakage.py new file mode 100644 index 0000000..989fb78 --- /dev/null +++ b/tests/validation/test_relational_leakage.py @@ -0,0 +1,594 @@ +"""Tests for ``leadforge/validation/relational_leakage.py``. + +Each probe is exercised against two synthetic minimal bundles: + +* a *clean* bundle, produced by running the same source frames through + :func:`leadforge.render.relational_snapshot_safe.to_dataframes_snapshot_safe`, + on which every probe must produce zero findings; +* a *tampered* bundle, in which one leakage channel at a time is + re-introduced, on which the matching probe must fire with a finding + that pins the channel and the offending detail. 
+""" + +from __future__ import annotations + +from pathlib import Path + +import pandas as pd +import pytest + +from leadforge.render.relational_snapshot_safe import to_dataframes_snapshot_safe +from leadforge.validation.relational_leakage import ( + CHANNEL_BANNED_COLUMN, + CHANNEL_BANNED_TABLE, + CHANNEL_BONUS_MODEL, + CHANNEL_JOIN_RECONSTRUCTION, + LeakageFinding, + LeakageReport, + RelationalLeakageError, + deterministic_relational_reconstruction, + probe_banned_columns, + probe_banned_tables, + probe_bonus_model_auc, + probe_deterministic_reconstruction, + probe_snapshot_window, + run_all_probes, + run_all_probes_on_dataframes, +) + +ANCHOR = pd.Timestamp("2026-01-01") +SNAPSHOT_DAY = 10 + + +def _ts(offset_days: int) -> str: + return (ANCHOR + pd.Timedelta(days=offset_days)).isoformat() + + +def _full_horizon_bundle(*, n_each: int = 25) -> dict[str, pd.DataFrame]: + """A balanced bundle with 2*n_each leads — half converted, half not. + + Both converted (``lead_C_*``) and unconverted (``lead_U_*``) leads + own a single opportunity; only converted leads own a customer and a + subscription. This mirrors the realistic shape — opportunities for + everyone, conversion-conditional only customers/subscriptions — so + that the structural probes have something to bite on without + accidentally turning ``n_opps`` into a perfect predictor for the + opt-in bonus-model probe. 
+ """ + leads_rows: list[dict] = [] + opps_rows: list[dict] = [] + cust_rows: list[dict] = [] + sub_rows: list[dict] = [] + touches_rows: list[dict] = [] + sessions_rows: list[dict] = [] + activities_rows: list[dict] = [] + + for i in range(n_each): + cid = f"lead_C_{i:03d}" + ucid = f"lead_U_{i:03d}" + leads_rows.append( + { + "lead_id": cid, + "account_id": f"acct_C_{i:03d}", + "contact_id": f"con_C_{i:03d}", + "lead_created_at": ANCHOR.isoformat(), + "current_stage": "closed_won", + "is_sql": True, + "converted_within_90_days": True, + "conversion_timestamp": _ts(40), + } + ) + leads_rows.append( + { + "lead_id": ucid, + "account_id": f"acct_U_{i:03d}", + "contact_id": f"con_U_{i:03d}", + "lead_created_at": ANCHOR.isoformat(), + "current_stage": "discovery", + "is_sql": False, + "converted_within_90_days": False, + "conversion_timestamp": None, + } + ) + # ACV drawn from the same distribution for both classes — keeps + # ``max_acv`` / ``mean_acv`` uninformative for the bonus probe + # so its enabled-but-clean test path is stable. 
+ acv = 30_000 + (i % 10) * 2_000 + opps_rows.append( + { + "opportunity_id": f"opp_C_{i:03d}", + "lead_id": cid, + "created_at": _ts(5), + "stage": "closed_won", + "estimated_acv": acv, + "close_outcome": "closed_won", + "closed_at": _ts(40), + } + ) + opps_rows.append( + { + "opportunity_id": f"opp_U_{i:03d}", + "lead_id": ucid, + "created_at": _ts(6), + "stage": "negotiation", + "estimated_acv": acv, + "close_outcome": None, + "closed_at": None, + } + ) + cust_rows.append( + { + "customer_id": f"cu_{i:03d}", + "opportunity_id": f"opp_C_{i:03d}", + "account_id": f"acct_C_{i:03d}", + "customer_start_at": _ts(40), + } + ) + sub_rows.append( + { + "subscription_id": f"sub_{i:03d}", + "customer_id": f"cu_{i:03d}", + "plan_name": "starter", + "subscription_start_at": _ts(40), + "subscription_status": "active", + } + ) + touches_rows.append({"touch_id": f"t_C_{i:03d}", "lead_id": cid, "touch_timestamp": _ts(2)}) + sessions_rows.append( + {"session_id": f"s_C_{i:03d}", "lead_id": cid, "session_timestamp": _ts(3)} + ) + activities_rows.append( + {"activity_id": f"a_C_{i:03d}", "lead_id": cid, "activity_timestamp": _ts(7)} + ) + touches_rows.append( + {"touch_id": f"t_U_{i:03d}", "lead_id": ucid, "touch_timestamp": _ts(2)} + ) + + return { + "accounts": pd.DataFrame( + [{"account_id": r["account_id"]} for r in leads_rows] + ).drop_duplicates(), + "contacts": pd.DataFrame( + [{"contact_id": r["contact_id"], "account_id": r["account_id"]} for r in leads_rows] + ).drop_duplicates(), + "leads": pd.DataFrame(leads_rows), + "touches": pd.DataFrame(touches_rows), + "sessions": pd.DataFrame(sessions_rows), + "sales_activities": pd.DataFrame(activities_rows), + "opportunities": pd.DataFrame(opps_rows), + "customers": pd.DataFrame(cust_rows), + "subscriptions": pd.DataFrame(sub_rows), + } + + +def _clean_bundle() -> dict[str, pd.DataFrame]: + """Snapshot-safe bundle — every probe should report zero findings.""" + return to_dataframes_snapshot_safe(_full_horizon_bundle(), 
snapshot_day=SNAPSHOT_DAY) + + +def _label_for(bundle: dict[str, pd.DataFrame]) -> pd.Series: + """Held-back ground truth for the bonus-model probe on a clean bundle.""" + src = _full_horizon_bundle() + return src["leads"].set_index("lead_id")["converted_within_90_days"] + + +# --------------------------------------------------------------------------- +# Clean bundle — every probe must be silent. +# --------------------------------------------------------------------------- + + +def test_clean_bundle_passes_all_probes() -> None: + """Default orchestrator (structural probes only) must report zero findings.""" + tables = _clean_bundle() + report = run_all_probes_on_dataframes(tables, snapshot_day=SNAPSHOT_DAY) + assert report.ok, [f"[{f.channel}] {f.detail}: {f.message}" for f in report.findings] + + +def test_clean_bundle_passes_with_bonus_probe_enabled() -> None: + """Bonus probe is opt-in; when enabled with a generous threshold, the + clean bundle still reports zero findings — exercising the full code + path without flaking on synthetic-data noise.""" + pytest.importorskip("sklearn") + tables = _clean_bundle() + report = run_all_probes_on_dataframes( + tables, + snapshot_day=SNAPSHOT_DAY, + bonus_model_max_auc=0.95, + label=_label_for(tables), + ) + assert report.ok, [f"[{f.channel}] {f.detail}: {f.message}" for f in report.findings] + + +def test_clean_bundle_individual_probes_silent() -> None: + tables = _clean_bundle() + assert probe_banned_columns(tables) == [] + assert probe_banned_tables(tables.keys()) == [] + assert probe_deterministic_reconstruction(tables) == [] + assert probe_snapshot_window(tables, snapshot_day=SNAPSHOT_DAY) == [] + + +# --------------------------------------------------------------------------- +# Banned columns (Path A). 
+# --------------------------------------------------------------------------- + + +def test_banned_column_in_leads_fires() -> None: + tables = _clean_bundle() + tables["leads"] = tables["leads"].assign(converted_within_90_days=[True] * len(tables["leads"])) + findings = probe_banned_columns(tables) + assert any( + f.channel == CHANNEL_BANNED_COLUMN and f.detail == "leads.converted_within_90_days" + for f in findings + ) + + +def test_banned_column_in_opportunities_fires() -> None: + tables = _clean_bundle() + tables["opportunities"] = tables["opportunities"].assign( + close_outcome=["closed_won"] * len(tables["opportunities"]) + ) + findings = probe_banned_columns(tables) + assert any( + f.channel == CHANNEL_BANNED_COLUMN and f.detail == "opportunities.close_outcome" + for f in findings + ) + + +# --------------------------------------------------------------------------- +# Banned tables. +# --------------------------------------------------------------------------- + + +def test_banned_table_customers_present_fires() -> None: + tables = _clean_bundle() + src = _full_horizon_bundle() + tables["customers"] = src["customers"] + findings = probe_banned_tables(tables.keys()) + assert any(f.channel == CHANNEL_BANNED_TABLE and f.detail == "customers" for f in findings) + + +def test_banned_table_subscriptions_present_fires() -> None: + tables = _clean_bundle() + src = _full_horizon_bundle() + tables["subscriptions"] = src["subscriptions"] + findings = probe_banned_tables(tables.keys()) + assert any(f.channel == CHANNEL_BANNED_TABLE and f.detail == "subscriptions" for f in findings) + + +# --------------------------------------------------------------------------- +# Deterministic paths B / C / D. 
+# --------------------------------------------------------------------------- + + +def test_deterministic_path_b_fires_when_close_outcome_present() -> None: + tables = _clean_bundle() + src = _full_horizon_bundle() + tables["opportunities"] = src["opportunities"] # restores close_outcome + findings = probe_deterministic_reconstruction(tables) + assert any(f.detail == "path_b_opportunity_won" for f in findings) + + +def test_deterministic_path_c_fires_when_customers_present() -> None: + tables = _clean_bundle() + src = _full_horizon_bundle() + tables["customers"] = src["customers"] + findings = probe_deterministic_reconstruction(tables) + assert any(f.detail == "path_c_customer_exists" for f in findings) + + +def test_deterministic_path_d_fires_when_subscriptions_present() -> None: + tables = _clean_bundle() + src = _full_horizon_bundle() + tables["customers"] = src["customers"] + tables["subscriptions"] = src["subscriptions"] + findings = probe_deterministic_reconstruction(tables) + assert any(f.detail == "path_d_subscription_exists" for f in findings) + + +def test_deterministic_probe_does_not_flag_path_a() -> None: + """Path A is covered by probe_banned_columns; the deterministic probe + must remain silent on it to avoid double-counting the same finding.""" + tables = _clean_bundle() + tables["leads"] = tables["leads"].assign(converted_within_90_days=[True] * len(tables["leads"])) + findings = probe_deterministic_reconstruction(tables) + assert all(f.detail != "path_a_direct_label" for f in findings) + + +# --------------------------------------------------------------------------- +# Snapshot window. 
+# --------------------------------------------------------------------------- + + +def test_snapshot_window_fires_on_late_event() -> None: + tables = _clean_bundle() + leaked = pd.DataFrame( + [ + { + "touch_id": "t_late", + "lead_id": tables["leads"]["lead_id"].iloc[0], + "touch_timestamp": _ts(SNAPSHOT_DAY + 5), + } + ] + ) + tables["touches"] = pd.concat([tables["touches"], leaked], ignore_index=True) + findings = probe_snapshot_window(tables, snapshot_day=SNAPSHOT_DAY) + assert any(f.detail == "touches.touch_timestamp" for f in findings) + + +def test_snapshot_window_silent_on_clean_bundle() -> None: + tables = _clean_bundle() + assert probe_snapshot_window(tables, snapshot_day=SNAPSHOT_DAY) == [] + + +def test_snapshot_window_negative_day_raises() -> None: + tables = _clean_bundle() + with pytest.raises(ValueError, match="non-negative"): + probe_snapshot_window(tables, snapshot_day=-1) + + +def test_snapshot_window_duplicate_lead_id_raises() -> None: + """Duplicate lead_ids would broadcast in the merge; matches the + same invariant asserted by deterministic_relational_reconstruction.""" + tables = _clean_bundle() + tables["leads"] = pd.concat([tables["leads"], tables["leads"].iloc[[0]]], ignore_index=True) + with pytest.raises(ValueError, match="lead_id must be unique"): + probe_snapshot_window(tables, snapshot_day=SNAPSHOT_DAY) + + +def test_snapshot_window_nat_lead_created_at_raises() -> None: + """NaT in the anchor propagates to NaT cutoffs, masking violations + via the fillna(False) in the count expression. 
The probe must + refuse to operate on a malformed anchor.""" + tables = _clean_bundle() + tables["leads"] = tables["leads"].copy() + tables["leads"].loc[0, "lead_created_at"] = None + with pytest.raises(ValueError, match="unparseable / null"): + probe_snapshot_window(tables, snapshot_day=SNAPSHOT_DAY) + + +def test_snapshot_window_orphan_event_raises() -> None: + """An event row whose lead_id is absent from leads gets a NaT cutoff + after the left-merge; the count would silently miss it. Treat + orphan events as a structural violation and raise.""" + tables = _clean_bundle() + orphan = pd.DataFrame( + [{"touch_id": "t_orphan", "lead_id": "lead_does_not_exist", "touch_timestamp": _ts(2)}] + ) + tables["touches"] = pd.concat([tables["touches"], orphan], ignore_index=True) + with pytest.raises(ValueError, match="touches.parquet has 1 row.*absent from leads"): + probe_snapshot_window(tables, snapshot_day=SNAPSHOT_DAY) + + +# --------------------------------------------------------------------------- +# Bonus-model probe. 
+# --------------------------------------------------------------------------- + + +def test_bonus_model_probe_fires_when_customers_reintroduced() -> None: + """Re-adding customers to a clean bundle gives the model n_customers as + a perfect predictor — AUC should saturate near 1.0 and exceed any sane + band (we use a deliberately tight 0.95 to guard against flakiness on + small synthetic data).""" + pytest.importorskip("sklearn") + tables = _clean_bundle() + src = _full_horizon_bundle() + tables["customers"] = src["customers"] + tables["subscriptions"] = src["subscriptions"] + + label = _label_for(tables) + findings = probe_bonus_model_auc(tables, max_auc=0.95, label=label) + assert findings, "expected the bonus model to detect the customers leak" + assert all(f.channel == CHANNEL_BONUS_MODEL for f in findings) + + +def test_bonus_model_probe_skipped_without_label() -> None: + """If `leads.converted_within_90_days` is redacted (clean bundle) and + no `label` is supplied, the probe has nothing to score against and + must skip cleanly with zero findings, not error.""" + pytest.importorskip("sklearn") + tables = _clean_bundle() + assert probe_bonus_model_auc(tables, max_auc=0.65, label=None) == [] + + +def test_bonus_model_probe_rejects_misaligned_label() -> None: + """A label not indexed by lead_id would silently misalign and skip the + probe via the binary-cardinality gate — defeating the validator. 
+ The probe must reject it loudly instead.""" + pytest.importorskip("sklearn") + tables = _clean_bundle() + src = _full_horizon_bundle() + bad_label = src["leads"]["converted_within_90_days"].reset_index(drop=True) + assert bad_label.index.name != "lead_id" + with pytest.raises(ValueError, match="indexed by lead_id"): + probe_bonus_model_auc(tables, max_auc=0.65, label=bad_label) + + +def test_bonus_model_probe_rejects_partial_label() -> None: + """A label that covers only a subset of bundle lead_ids would + introduce NaN after reindex; the resulting astype(int) cast used to + crash with an opaque error. The probe must raise a clear + ValueError naming the gap instead.""" + pytest.importorskip("sklearn") + tables = _clean_bundle() + full_label = _label_for(tables) + partial = full_label.iloc[:-5] + with pytest.raises(ValueError, match="missing values for"): + probe_bonus_model_auc(tables, max_auc=0.65, label=partial) + + +def test_bonus_model_probe_skips_when_class_too_small_for_cv() -> None: + """When the smaller class has fewer than 2 members, + StratifiedKFold cannot run; the probe must skip silently rather + than raise a sklearn-internal error.""" + pytest.importorskip("sklearn") + tables = _clean_bundle() + leads = tables["leads"].head(6).copy() + tables["leads"] = leads + label = pd.Series( + [True, True, True, True, True, False], + index=leads["lead_id"].to_numpy(), + name="converted_within_90_days", + ) + label.index.name = "lead_id" + assert probe_bonus_model_auc(tables, max_auc=0.65, label=label) == [] + + +def test_bonus_model_probe_uses_smaller_n_splits_when_class_count_lt_5() -> None: + """When 2 <= min_class_count < 5, the probe must size n_splits + accordingly — exercised by labelling only 3 positives in a 12-lead + bundle (n_splits should drop to 3).""" + pytest.importorskip("sklearn") + tables = _clean_bundle() + src = _full_horizon_bundle() + leads = tables["leads"].head(12).copy() + tables["leads"] = leads + tables["customers"] = src["customers"] 
# leak signal so AUC saturates + tables["subscriptions"] = src["subscriptions"] + + truth = [True, True, True] + [False] * 9 + label = pd.Series(truth, index=leads["lead_id"].to_numpy(), name="converted_within_90_days") + label.index.name = "lead_id" + findings = probe_bonus_model_auc(tables, max_auc=0.5, label=label) + assert findings, "expected the bonus model to fire — n_splits should adapt to min_class=3" + assert all(f.message.startswith("3-fold") for f in findings), ( + "n_splits must downshift to 3 when the minority class has only 3 members" + ) + + +def test_orchestrator_skips_bonus_probe_by_default() -> None: + """run_all_probes_on_dataframes(... bonus_model_max_auc=None) must skip + the bonus probe even when customers/subscriptions would trigger it. + This is the post-PR-2.1 default; PR 3.3 calibrates and turns it on.""" + pytest.importorskip("sklearn") + tables = _clean_bundle() + src = _full_horizon_bundle() + tables["customers"] = src["customers"] + tables["subscriptions"] = src["subscriptions"] + + # The bonus probe would fire here (customers+subs reintroduced), but + # the structural probes will fire first regardless. Filter to bonus- + # channel findings to assert the bonus probe is the one that didn't run. + report = run_all_probes_on_dataframes(tables, snapshot_day=SNAPSHOT_DAY) + assert all(f.channel != CHANNEL_BONUS_MODEL for f in report.findings) + + +def test_orchestrator_runs_bonus_probe_when_enabled() -> None: + pytest.importorskip("sklearn") + tables = _clean_bundle() + src = _full_horizon_bundle() + tables["customers"] = src["customers"] + tables["subscriptions"] = src["subscriptions"] + + report = run_all_probes_on_dataframes( + tables, + snapshot_day=SNAPSHOT_DAY, + bonus_model_max_auc=0.95, + label=_label_for(tables), + ) + assert any(f.channel == CHANNEL_BONUS_MODEL for f in report.findings) + + +# --------------------------------------------------------------------------- +# Orchestrator + report ergonomics. 
# ---------------------------------------------------------------------------


def test_orchestrator_aggregates_findings_across_channels() -> None:
    """A bundle leaking on several channels reports every one of them.

    The label column is put back on ``leads`` and the ``customers`` table
    is re-added; the report must then fail and include banned-column,
    banned-table and join-reconstruction findings.
    """
    bundle = _clean_bundle()
    full_horizon = _full_horizon_bundle()
    # Two leaks at the same time: the label column on leads AND customers.
    leads = bundle["leads"]
    bundle["leads"] = leads.assign(converted_within_90_days=[True] * len(leads))
    bundle["customers"] = full_horizon["customers"]

    report = run_all_probes_on_dataframes(bundle, snapshot_day=SNAPSHOT_DAY)
    assert not report.ok
    seen_channels = {finding.channel for finding in report.findings}
    for expected_channel in (
        CHANNEL_BANNED_COLUMN,
        CHANNEL_BANNED_TABLE,
        CHANNEL_JOIN_RECONSTRUCTION,
    ):
        assert expected_channel in seen_channels


def test_report_raise_if_failing_carries_report() -> None:
    """``raise_if_failing`` raises an error that exposes the originating report."""
    bundle = _clean_bundle()
    leads = bundle["leads"]
    bundle["leads"] = leads.assign(converted_within_90_days=[True] * len(leads))
    failing_report = run_all_probes_on_dataframes(bundle, snapshot_day=SNAPSHOT_DAY)
    with pytest.raises(RelationalLeakageError) as caught:
        failing_report.raise_if_failing()
    error = caught.value
    assert error.report is failing_report
    assert "banned_column" in str(error)


def test_report_ok_property() -> None:
    """``ok`` is True for a report without findings and False once it has one."""
    empty_report = LeakageReport(findings=())
    assert empty_report.ok is True
    finding = LeakageFinding(channel=CHANNEL_BANNED_COLUMN, detail="leads.x", message="...")
    single_finding_report = LeakageReport(findings=(finding,))
    assert single_finding_report.ok is False


# ---------------------------------------------------------------------------
# File-based orchestrator.
# ---------------------------------------------------------------------------


def _write_bundle_to_disk(tables: dict[str, pd.DataFrame], root: Path) -> Path:
    """Write every table to ``<root>/tables/<name>.parquet`` and return ``root``."""
    parquet_dir = root / "tables"
    parquet_dir.mkdir(parents=True, exist_ok=True)
    for table_name in tables:
        target = parquet_dir / f"{table_name}.parquet"
        tables[table_name].to_parquet(target, index=False)
    return root


def test_run_all_probes_reads_bundle_from_disk(tmp_path: Path) -> None:
    """A clean bundle written to disk passes the file-based orchestrator."""
    clean_tables = _clean_bundle()
    bundle_dir = _write_bundle_to_disk(clean_tables, tmp_path / "clean")
    report = run_all_probes(bundle_dir, snapshot_day=SNAPSHOT_DAY)
    assert report.ok, [
        f"[{finding.channel}] {finding.detail}: {finding.message}" for finding in report.findings
    ]


def test_run_all_probes_detects_disk_bundle_leakage(tmp_path: Path) -> None:
    """The full-horizon bundle, written to disk as-is, fails on three channels."""
    full_horizon = _full_horizon_bundle()
    bundle_dir = _write_bundle_to_disk(full_horizon, tmp_path / "leaky")
    report = run_all_probes(bundle_dir, snapshot_day=SNAPSHOT_DAY)
    assert not report.ok
    seen_channels = {finding.channel for finding in report.findings}
    for expected_channel in (
        CHANNEL_BANNED_COLUMN,
        CHANNEL_BANNED_TABLE,
        CHANNEL_JOIN_RECONSTRUCTION,
    ):
        assert expected_channel in seen_channels


def test_run_all_probes_missing_tables_dir_raises(tmp_path: Path) -> None:
    """A bundle directory with no ``tables/`` subdirectory is rejected."""
    with pytest.raises(FileNotFoundError, match="missing tables/"):
        run_all_probes(tmp_path, snapshot_day=SNAPSHOT_DAY)


def test_run_all_probes_missing_leads_raises(tmp_path: Path) -> None:
    """An empty ``tables/`` directory is rejected with an error naming the leads file."""
    empty_tables_dir = tmp_path / "tables"
    empty_tables_dir.mkdir()
    with pytest.raises(FileNotFoundError, match="leads.parquet"):
        run_all_probes(tmp_path, snapshot_day=SNAPSHOT_DAY)


# ---------------------------------------------------------------------------
# Lifted function — sanity check that the package and the script agree.
# ---------------------------------------------------------------------------


def test_deterministic_function_matches_script_export() -> None:
    """The script re-exports the lifted function; calling each must yield
    identical output on the same inputs.

    The script is loaded from its file path under a throwaway module name.
    That name is removed from ``sys.modules`` again when the test finishes
    (pass or fail) so the temporary module does not leak into the rest of
    the test session.
    """
    import importlib.util
    import sys

    module_name = "probe_relational_leakage_check"
    script_path = Path(__file__).resolve().parents[2] / "scripts" / "probe_relational_leakage.py"
    spec = importlib.util.spec_from_file_location(module_name, script_path)
    assert spec is not None
    assert spec.loader is not None
    module = importlib.util.module_from_spec(spec)
    # Register the module before executing it, following the importlib
    # recipe for importing a source file directly.
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)

        src = _full_horizon_bundle()
        from_package = deterministic_relational_reconstruction(
            src["leads"], src["opportunities"], src["customers"], src["subscriptions"]
        )
        from_script = module.deterministic_relational_reconstruction(
            src["leads"], src["opportunities"], src["customers"], src["subscriptions"]
        )
        pd.testing.assert_frame_equal(from_package, from_script)
    finally:
        # Undo the registration so later tests see an unmodified sys.modules.
        sys.modules.pop(module_name, None)