From 142273a67266ff2ed10b665cd85126cabc397b99 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 5 May 2026 19:20:00 +0300 Subject: [PATCH 1/3] =?UTF-8?q?docs:=20PR=201.1=20=E2=80=94=20relational?= =?UTF-8?q?=20leakage=20audit=20on=20alpha=20bundles?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reproduces the ChatGPT v2 finding on the actual 5000-lead alpha bundles (`release/{intro,intermediate,advanced}/`). All three public tiers reconstruct `converted_within_90_days` at 100% via every probed path; the join-derived LR / HistGBM model also achieves AUC = 1.000 in 5-fold CV. Adds: - `scripts/probe_relational_leakage.py` — five deterministic reconstruction paths (A direct, B opportunity won, C customer exists, D subscription exists, E B∨C∨D) plus a bonus public-relational-only model. The deterministic reconstruction is exposed as a pure function `deterministic_relational_reconstruction(...)` designed to lift verbatim into `leadforge/validation/leakage_probes.py` in PR 3.1. - `tests/scripts/test_probe_relational_leakage.py` — unit test on a hand-built four-lead toy bundle covering each path independently, plus a path-A-collapses-when-label-dropped test that pre-validates Phase 2's fix shape. - `docs/release/v1_current_state_audit.md` — per-tier evidence table, explicit G4.1-G4.6 verdict (all FAIL), pointer to PR 2.1 as the structural fix. Also reaffirms G1.1 (dataset name `leadforge-lead-scoring-v1`, already locked via PR #61's milestone rename) in `v1_acceptance_gates.md`, and checks off Phase 1 in `.agent-plan.md`. This PR is evidence + docs only. The structural fix lives in PR 2.1. Co-Authored-By: Claude Opus 4.7 --- .agent-plan.md | 6 +- docs/release/v1_acceptance_gates.md | 2 +- docs/release/v1_current_state_audit.md | 177 +++++++++ scripts/probe_relational_leakage.py | 372 ++++++++++++++++++ .../scripts/test_probe_relational_leakage.py | 129 ++++++ 5 files changed, 682 insertions(+), 4 deletions(-) create mode 100644 docs/release/v1_current_state_audit.md create mode 100644 scripts/probe_relational_leakage.py create mode 100644 tests/scripts/test_probe_relational_leakage.py diff --git a/.agent-plan.md b/.agent-plan.md index a9c5093..beee260 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -20,9 +20,9 @@ Goal: ship a best-in-class educational synthetic CRM lead-scoring dataset family **Companion docs:** `docs/release/v1_release_design.md`, `docs/release/v1_acceptance_gates.md`, `docs/release/post_v1_roadmap.md` **External review materials:** `docs/external_review/{gemini,chatgpt}/` (raw) + `docs/external_review/summaries/` (synthesized) -### Phase 1 — Audit and naming -- [ ] Reproduce relational-leakage finding on alpha bundles → `docs/release/v1_current_state_audit.md` -- [ ] Lock dataset release name `leadforge-lead-scoring-v1` +### Phase 1 — Audit and naming ✓ (PR 1.1) +- [x] Reproduce relational-leakage finding on alpha bundles → `docs/release/v1_current_state_audit.md` — all three tiers reconstruct `converted_within_90_days` at 100% via paths A–E; LR/HistGBM AUC = 1.000 on join-derived features. Probe script: `scripts/probe_relational_leakage.py` (function `deterministic_relational_reconstruction` designed to lift into PR 3.1's `leadforge/validation/leakage_probes.py`). +- [x] Lock dataset release name `leadforge-lead-scoring-v1` (already locked via PR #61's milestone rename + roadmap edits; G1.1 reaffirmed) ### Phase 2 — Snapshot-safe relational export - [ ] `leadforge/render/relational_snapshot_safe.py` (new) diff --git a/docs/release/v1_acceptance_gates.md b/docs/release/v1_acceptance_gates.md index 31f2a6c..484e41c 100644 --- a/docs/release/v1_acceptance_gates.md +++ b/docs/release/v1_acceptance_gates.md @@ -10,7 +10,7 @@ read by `scripts/validate_release_candidate.py` and by humans before tag. ## Naming and versioning gate -- **G1.1** Dataset release name: `leadforge-lead-scoring-v1`. Locked in Phase 1. +- **G1.1** Dataset release name: `leadforge-lead-scoring-v1`. Locked in Phase 1 (PR #61 milestone rename + roadmap edits; reaffirmed in PR 1.1's `docs/release/v1_current_state_audit.md`). - **G1.2** Kaggle slug: `leadforge-lead-scoring-v1`. - **G1.3** Hugging Face repo: `leadforge-lead-scoring-v1` (public family) and `leadforge-lead-scoring-v1-instructor` (companion). - **G1.4** Bundle `package_version` reflects the leadforge package at build time. diff --git a/docs/release/v1_current_state_audit.md b/docs/release/v1_current_state_audit.md new file mode 100644 index 0000000..115ec2c --- /dev/null +++ b/docs/release/v1_current_state_audit.md @@ -0,0 +1,177 @@ +# v1 Current-State Audit — Relational Leakage in Alpha `student_public` Bundles + +**Phase:** PR 1.1 (Phase 1 of `v1_release_roadmap.md`) +**Date:** 2026-05-05 +**Author:** generated by `scripts/probe_relational_leakage.py` against the alpha bundles in `release/` +**Status:** **BLOCKER CONFIRMED.** All three public tiers fail relational-leakage gates G4.1–G4.6 in `v1_acceptance_gates.md`. +**Structural fix:** PR 2.1 — `leadforge/render/relational_snapshot_safe.py` + `leadforge/validation/relational_leakage.py`. + +This document reproduces the relational-leakage finding from +[`docs/external_review/summaries/chatgpt_v2_summary.md`](../external_review/summaries/chatgpt_v2_summary.md) §0 +on the actual 5000-lead alpha bundles (`release/{intro,intermediate,advanced}/`), +producing numeric per-tier evidence to anchor the Phase 2 work. + +## TL;DR + +`converted_within_90_days` is reconstructible at **100% accuracy** in every +public tier through trivial joins. There is no statistical noise to defeat — +the public bundles literally contain the label and three independent +post-outcome side channels: + +| Channel | Mechanism | +|---|---| +| `leads.converted_within_90_days` | the label itself, in plaintext | +| `leads.conversion_timestamp` | non-null iff converted | +| `opportunities.close_outcome == "closed_won"` | only set for converted leads | +| `customers.parquet` | only contains rows for converted leads | +| `subscriptions.parquet` | only contains rows for converted leads | + +A model exposed to any one of these is not learning lead scoring; it is +reading the answer key. The Phase 2 fix is not "shrink the leakage" — it is +*structural removal* of all four side channels from the `student_public` +export path. + +## Method + +`scripts/probe_relational_leakage.py ` runs five +deterministic reconstruction paths and a bonus model: + +| Path | Description | +|---|---| +| **A. Direct label read** | Read `leads.converted_within_90_days` as the prediction. Sanity floor — proves the column is in the public table. | +| **B. Opportunity outcome** | Predict converted iff the lead has any `opportunity` with `close_outcome == "closed_won"`. | +| **C. Customer existence** | Predict converted iff the lead joins (via opportunity) to any row in `customers.parquet`. | +| **D. Subscription existence** | Predict converted iff the lead joins (via opportunity → customer) to any row in `subscriptions.parquet`. | +| **E. Deterministic OR (B ∨ C ∨ D)** | The headline reconstruction — combines all join-only channels (no model fitting). | +| **F. Bonus model** | Logistic regression and HistGBM trained 5-fold on simple join-aggregate features (`n_opps`, `max_acv`, `mean_acv`, `any_closed_won`, `any_closed`, `n_customers`, `n_subscriptions`). | + +The deterministic reconstruction (A–E) is implemented as a pure function +`deterministic_relational_reconstruction(leads, opportunities, customers, subscriptions)` +designed to lift verbatim into `leadforge/validation/leakage_probes.py` in PR 3.1. + +## Per-tier evidence + +Reproduce from a clean checkout via: + +```bash +python scripts/probe_relational_leakage.py release/intro +python scripts/probe_relational_leakage.py release/intermediate +python scripts/probe_relational_leakage.py release/advanced +``` + +### Bundle composition + +| Tier | n_leads | n_opportunities | n_customers | n_subscriptions | conversion rate | +|---|---:|---:|---:|---:|---:| +| intro | 5000 | 4701 | 2110 | 2110 | 0.422 | +| intermediate | 5000 | 4641 | 1049 | 1049 | 0.210 | +| advanced | 5000 | 4557 | 393 | 393 | 0.079 | + +Note: `n_customers == n_subscriptions` per tier and both equal the count of +converted leads. That equivalence alone is reconstruction-grade evidence +that customers/subscriptions are conversion-conditional entities. + +### Reconstruction accuracy (vs. `converted_within_90_days`) + +Every path achieves perfect reconstruction in every tier. Precision, recall, +and F1 are all 1.000 across the board. + +| Tier | Path A (direct) | Path B (opp won) | Path C (customer) | Path D (subscription) | Path E (B∨C∨D) | +|---|---:|---:|---:|---:|---:| +| intro | 1.000 | 1.000 | 1.000 | 1.000 | 1.000 | +| intermediate | 1.000 | 1.000 | 1.000 | 1.000 | 1.000 | +| advanced | 1.000 | 1.000 | 1.000 | 1.000 | 1.000 | + +### Bonus "public-relational-only" model (5-fold CV) + +LR and HistGBM trained only on join-derived features (no +`converted_within_90_days`, no `conversion_timestamp`, no `close_outcome` +column directly used — only the `any_closed_won` aggregate which is +reconstructed from the public column): + +| Tier | LR AUC (mean) | LR AP | HistGBM AUC (mean) | HistGBM AP | +|---|---:|---:|---:|---:| +| intro | 1.000 | 1.000 | 1.000 | 1.000 | +| intermediate | 1.000 | 1.000 | 1.000 | 1.000 | +| advanced | 1.000 | 1.000 | 1.000 | 1.000 | + +### Leakage-column presence + +Across all three public tiers: + +| Column / table | G4.x | Present? | +|---|---|:--:| +| `leads.converted_within_90_days` | G4.1 | ✗ PRESENT | +| `leads.conversion_timestamp` | G4.1 | ✗ PRESENT | +| `opportunities.close_outcome` | G4.2 | ✗ PRESENT | +| `opportunities.closed_at` | G4.2 | ✗ PRESENT | +| `tables/customers.parquet` | G4.3 | ✗ PRESENT | +| `tables/subscriptions.parquet` | G4.3 | ✗ PRESENT | + +## Acceptance-gate verdict + +Per `v1_acceptance_gates.md` §"Relational leakage gate": + +| Gate | Verdict | Evidence | +|---|---|---| +| **G4.1** Public `leads` excludes `converted_within_90_days` and `conversion_timestamp` | ✗ FAIL | both columns present in all three public tiers | +| **G4.2** Public `opportunities` excludes `close_outcome` and `closed_at` | ✗ FAIL | both columns present in all three public tiers | +| **G4.3** Public bundles do not contain `customers.parquet` or `subscriptions.parquet` | ✗ FAIL | both files present in all three public tiers | +| **G4.4** No public event/opportunity rows past `lead_created_at + snapshot_day` | ✗ FAIL (assumed) | not directly probed in this PR; the full-horizon export is the same code path that emits the post-snapshot opportunities. Phase 2's snapshot-safe relational module addresses this. | +| **G4.5** Probabilistic relational reconstruction probe AUC ≤ TBD | ✗ FAIL | join-only LR/HistGBM AUC = 1.000 in every tier | +| **G4.6** Manifest field `relational_snapshot_safe == true` for `student_public` | ✗ FAIL | manifest field does not yet exist (introduced in PR 2.2 with `BUNDLE_SCHEMA_VERSION` 4 → 5) | + +**Overall:** every relational-leakage gate fails on every public tier. The +release is blocked exactly as `chatgpt_v2_summary.md` §0 predicted, but on the +real 5000-lead bundles rather than the 500-lead smoke bundle. + +## Why every metric is 1.000 (and that's fine) + +There is no probabilistic ambiguity to measure. The public tables expose: + +1. The label itself (path A). +2. Three deterministic, monotone proxies for the label (paths B/C/D), each of + which is reconstructible from public columns/tables alone. +3. Aggregate features over those proxies (the bonus model), which collapse + to the same information. + +This is structural leakage, not probabilistic noise that a tighter band can +tolerate. A 0.999 result would still be a fail; the only acceptable Phase 2 +state is that paths B/C/D *cannot be constructed* because the underlying +columns/tables are absent from public bundles, and the bonus model's AUC drops +to whatever signal honest features provide (to be banded in PR 3.3). + +## Pointer to the structural fix — PR 2.1 + +The fix lives in PR 2.1 of `v1_release_roadmap.md` and breaks into: + +1. New `leadforge/render/relational_snapshot_safe.py`: + - Filter `touches`, `sessions`, `sales_activities` to + `timestamp <= lead_created_at + snapshot_day` per lead. + - Filter `opportunities` to `created_at <= lead_created_at + snapshot_day`; + drop `close_outcome` / `closed_at`. + - Drop `converted_within_90_days` / `conversion_timestamp` from public `leads`. + - Omit `customers.parquet` / `subscriptions.parquet` from public bundles. + +2. New `leadforge/validation/relational_leakage.py`: + - Lift the deterministic reconstruction from this PR's + `scripts/probe_relational_leakage.py` and assert + paths B/C/D produce zero hits because the underlying columns/tables are + absent. + - Assert no banned columns; assert event timestamps are bounded by + `lead_created_at + snapshot_day`. + +3. PR 2.2 — Wire the new export through `leadforge/exposure/filters.py` and + `leadforge/api/bundle.py`; bump `BUNDLE_SCHEMA_VERSION` 4 → 5; add the + `relational_snapshot_safe: true` manifest field for `student_public`. + +After PR 2.2 ships, this script must be re-run on the regenerated bundles and +each row in the gate-verdict table above must flip to ✓ PASS. + +## Related artifacts + +- Probe script: [`scripts/probe_relational_leakage.py`](../../scripts/probe_relational_leakage.py) +- Unit test: [`tests/scripts/test_probe_relational_leakage.py`](../../tests/scripts/test_probe_relational_leakage.py) +- Acceptance gates: [`docs/release/v1_acceptance_gates.md`](v1_acceptance_gates.md) §"Relational leakage gate" +- Roadmap: [`docs/release/v1_release_roadmap.md`](v1_release_roadmap.md) §"Phase 2 — Snapshot-safe relational export" +- Original finding: [`docs/external_review/summaries/chatgpt_v2_summary.md`](../external_review/summaries/chatgpt_v2_summary.md) §0 diff --git a/scripts/probe_relational_leakage.py b/scripts/probe_relational_leakage.py new file mode 100644 index 0000000..9f2b69f --- /dev/null +++ b/scripts/probe_relational_leakage.py @@ -0,0 +1,372 @@ +#!/usr/bin/env python3 +"""Probe public relational tables for ``converted_within_90_days`` leakage. + +Reproduces the ChatGPT v2 finding: in the alpha ``student_public`` bundles +under ``release/{intro,intermediate,advanced}/``, joining +``leads`` to ``opportunities`` / ``customers`` / ``subscriptions`` reconstructs +the target end-to-end. + +Reports five deterministic reconstruction paths plus a bonus "public +relational only" model trained on join-derived features: + + A. Direct read of ``leads.converted_within_90_days`` (sanity floor) + B. Opportunity outcome (lead has any closed_won opportunity) + C. Customer existence (lead -> opportunities -> customers) + D. Subscription existence (lead -> opportunities -> customers -> subscriptions) + E. Deterministic OR (B OR C OR D) + F. LR / HistGBM on join-derived features (5-fold CV AUC + AP) + +Path E is the headline reconstruction; the implementation is exposed as +:func:`deterministic_relational_reconstruction` so PR 3.1 can lift it +verbatim into ``leadforge/validation/leakage_probes.py``. + +Exit code: 0 on success (any probe outcome), 1 on missing tables / shape errors. + +Usage:: + + python scripts/probe_relational_leakage.py release/intermediate + python scripts/probe_relational_leakage.py release/intro --json +""" + +from __future__ import annotations + +import argparse +import json +import sys +from pathlib import Path +from typing import Any + +import pandas as pd + +REQUIRED_TABLES = ("leads", "opportunities", "customers", "subscriptions") + + +def deterministic_relational_reconstruction( + leads: pd.DataFrame, + opportunities: pd.DataFrame, + customers: pd.DataFrame, + subscriptions: pd.DataFrame, +) -> pd.DataFrame: + """Reconstruct ``converted_within_90_days`` from public relational joins. + + Returns a DataFrame indexed by ``lead_id`` with five boolean columns, + one per reconstruction path (A-E). Path E is the union of B, C, D and + is the headline relational-leakage prediction. + + No hidden state, no model fit — pure joins. Designed to be lifted into + ``leadforge/validation/leakage_probes.py`` (PR 3.1) as the relational + leakage probe. + """ + leads_idx = leads.set_index("lead_id", drop=False) + + # Path A — the label itself, if present in public leads. + if "converted_within_90_days" in leads.columns: + path_a = leads_idx["converted_within_90_days"].astype(bool) + else: + path_a = pd.Series(False, index=leads_idx.index, name="converted_within_90_days") + + # Path B — any opportunity with close_outcome == "closed_won". + if "close_outcome" in opportunities.columns: + won_leads = set( + opportunities.loc[opportunities["close_outcome"] == "closed_won", "lead_id"] + ) + else: + won_leads = set() + path_b = leads_idx["lead_id"].isin(won_leads) + + # Path C — lead has any joined customer (via opportunity_id -> opportunity.lead_id). + opp_to_lead = dict(zip(opportunities["opportunity_id"], opportunities["lead_id"], strict=True)) + customer_leads = { + opp_to_lead[opp_id] for opp_id in customers["opportunity_id"] if opp_id in opp_to_lead + } + path_c = leads_idx["lead_id"].isin(customer_leads) + + # Path D — lead has any joined subscription (sub -> customer -> opportunity -> lead). + cust_to_opp = dict(zip(customers["customer_id"], customers["opportunity_id"], strict=True)) + sub_leads: set[str] = set() + for cust_id in subscriptions["customer_id"]: + opp_id = cust_to_opp.get(cust_id) + if opp_id is None: + continue + lead_id = opp_to_lead.get(opp_id) + if lead_id is not None: + sub_leads.add(lead_id) + path_d = leads_idx["lead_id"].isin(sub_leads) + + # Path E — deterministic OR of B, C, D. + path_e = path_b | path_c | path_d + + out = pd.DataFrame( + { + "path_a_direct_label": path_a.values, + "path_b_opportunity_won": path_b.values, + "path_c_customer_exists": path_c.values, + "path_d_subscription_exists": path_d.values, + "path_e_or_b_c_d": path_e.values, + }, + index=leads_idx.index, + ) + return out + + +def _binary_metrics(y_true: pd.Series, y_pred: pd.Series) -> dict[str, float]: + """Accuracy / precision / recall / F1 for boolean predictions.""" + tp = int(((y_pred) & (y_true)).sum()) + fp = int(((y_pred) & (~y_true)).sum()) + fn = int(((~y_pred) & (y_true)).sum()) + tn = int(((~y_pred) & (~y_true)).sum()) + n = tp + fp + fn + tn + accuracy = (tp + tn) / n if n else float("nan") + precision = tp / (tp + fp) if (tp + fp) else float("nan") + recall = tp / (tp + fn) if (tp + fn) else float("nan") + f1 = ( + (2 * precision * recall) / (precision + recall) + if (precision and recall and precision + recall) + else float("nan") + ) + return { + "accuracy": accuracy, + "precision": precision, + "recall": recall, + "f1": f1, + "tp": tp, + "fp": fp, + "fn": fn, + "tn": tn, + } + + +def _build_relational_features( + leads: pd.DataFrame, + opportunities: pd.DataFrame, + customers: pd.DataFrame, + subscriptions: pd.DataFrame, +) -> pd.DataFrame: + """Aggregate features per lead from public relational tables.""" + has_close_outcome = "close_outcome" in opportunities.columns + opp_agg = ( + opportunities.groupby("lead_id") + .agg( + n_opps=("opportunity_id", "count"), + max_acv=("estimated_acv", "max"), + mean_acv=("estimated_acv", "mean"), + any_closed_won=( + "close_outcome", + lambda s: int((s == "closed_won").any()) if has_close_outcome else 0, + ), + any_closed=( + "close_outcome", + lambda s: int(s.notna().any()) if has_close_outcome else 0, + ), + ) + .reset_index() + ) + opp_to_lead = dict(zip(opportunities["opportunity_id"], opportunities["lead_id"], strict=True)) + customers = customers.copy() + customers["lead_id"] = customers["opportunity_id"].map(opp_to_lead) + cust_agg = customers.groupby("lead_id").size().rename("n_customers").reset_index() + + cust_to_opp = dict(zip(customers["customer_id"], customers["opportunity_id"], strict=True)) + subs = subscriptions.copy() + subs["opportunity_id"] = subs["customer_id"].map(cust_to_opp) + subs["lead_id"] = subs["opportunity_id"].map(opp_to_lead) + sub_agg = subs.groupby("lead_id").size().rename("n_subscriptions").reset_index() + + feats = ( + leads[["lead_id"]] + .merge(opp_agg, on="lead_id", how="left") + .merge(cust_agg, on="lead_id", how="left") + .merge(sub_agg, on="lead_id", how="left") + ) + feats = feats.fillna( + { + "n_opps": 0, + "max_acv": 0.0, + "mean_acv": 0.0, + "any_closed_won": 0, + "any_closed": 0, + "n_customers": 0, + "n_subscriptions": 0, + } + ) + return feats.set_index("lead_id") + + +def _bonus_model_metrics( + leads: pd.DataFrame, + opportunities: pd.DataFrame, + customers: pd.DataFrame, + subscriptions: pd.DataFrame, + seed: int = 42, +) -> dict[str, dict[str, float]] | None: + """5-fold CV AUC + AP for LR and HistGBM on join-derived features. + + Returns ``None`` if scikit-learn is not installed (lets the deterministic + paths still report). + """ + try: + from sklearn.ensemble import HistGradientBoostingClassifier + from sklearn.linear_model import LogisticRegression + from sklearn.metrics import average_precision_score, roc_auc_score + from sklearn.model_selection import StratifiedKFold + from sklearn.preprocessing import StandardScaler + except ImportError: + return None + + features = _build_relational_features(leads, opportunities, customers, subscriptions) + if "converted_within_90_days" not in leads.columns: + return None + y = leads.set_index("lead_id")["converted_within_90_days"].astype(int).reindex(features.index) + + skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed) + results: dict[str, dict[str, float]] = {} + + for name, mk_model in ( + ("logistic_regression", lambda: LogisticRegression(max_iter=1000)), + ("hist_gbm", lambda: HistGradientBoostingClassifier(random_state=seed)), + ): + aucs: list[float] = [] + aps: list[float] = [] + for train_idx, test_idx in skf.split(features.values, y.values): + x_tr, x_te = features.values[train_idx], features.values[test_idx] + y_tr, y_te = y.values[train_idx], y.values[test_idx] + if name == "logistic_regression": + scaler = StandardScaler() + x_tr = scaler.fit_transform(x_tr) + x_te = scaler.transform(x_te) + model = mk_model() + model.fit(x_tr, y_tr) + proba = model.predict_proba(x_te)[:, 1] + aucs.append(roc_auc_score(y_te, proba)) + aps.append(average_precision_score(y_te, proba)) + results[name] = { + "auc_mean": float(sum(aucs) / len(aucs)), + "auc_min": float(min(aucs)), + "auc_max": float(max(aucs)), + "ap_mean": float(sum(aps) / len(aps)), + } + return results + + +def probe_bundle(bundle_dir: Path) -> dict[str, Any]: + """Run every probe against a single bundle directory; return a result dict.""" + tables_dir = bundle_dir / "tables" + if not tables_dir.is_dir(): + raise FileNotFoundError(f"missing tables/ under {bundle_dir}") + missing = [t for t in REQUIRED_TABLES if not (tables_dir / f"{t}.parquet").exists()] + if missing: + raise FileNotFoundError(f"missing required tables in {tables_dir}: {missing}") + + leads = pd.read_parquet(tables_dir / "leads.parquet") + opportunities = pd.read_parquet(tables_dir / "opportunities.parquet") + customers = pd.read_parquet(tables_dir / "customers.parquet") + subscriptions = pd.read_parquet(tables_dir / "subscriptions.parquet") + + paths = deterministic_relational_reconstruction(leads, opportunities, customers, subscriptions) + + if "converted_within_90_days" not in leads.columns: + return { + "bundle": str(bundle_dir), + "n_leads": int(len(leads)), + "error": "leads.parquet has no converted_within_90_days column — cannot score paths", + } + + y_true = ( + leads.set_index("lead_id")["converted_within_90_days"].astype(bool).reindex(paths.index) + ) + conversion_rate = float(y_true.mean()) + + path_metrics: dict[str, dict[str, float]] = {} + for col in paths.columns: + path_metrics[col] = _binary_metrics(y_true, paths[col]) + + bonus = _bonus_model_metrics(leads, opportunities, customers, subscriptions) + + return { + "bundle": str(bundle_dir), + "n_leads": int(len(leads)), + "n_opportunities": int(len(opportunities)), + "n_customers": int(len(customers)), + "n_subscriptions": int(len(subscriptions)), + "conversion_rate": conversion_rate, + "deterministic_paths": path_metrics, + "bonus_model": bonus, + "leakage_columns_present": { + "leads.converted_within_90_days": "converted_within_90_days" in leads.columns, + "leads.conversion_timestamp": "conversion_timestamp" in leads.columns, + "opportunities.close_outcome": "close_outcome" in opportunities.columns, + "opportunities.closed_at": "closed_at" in opportunities.columns, + "customers.parquet present": True, + "subscriptions.parquet present": True, + }, + } + + +def _print_human(result: dict[str, Any]) -> None: + print(f"\n=== {result['bundle']} ===") + print( + f"n_leads={result['n_leads']} " + f"n_opps={result.get('n_opportunities')} " + f"n_customers={result.get('n_customers')} " + f"n_subscriptions={result.get('n_subscriptions')} " + f"conversion_rate={result.get('conversion_rate', float('nan')):.3f}" + ) + if "error" in result: + print(f" ERROR: {result['error']}") + return + print(" Deterministic reconstruction paths (vs converted_within_90_days):") + print(f" {'path':<32} {'acc':>6} {'prec':>6} {'rec':>6} {'f1':>6}") + for path, m in result["deterministic_paths"].items(): + print( + f" {path:<32} {m['accuracy']:>6.3f} {m['precision']:>6.3f} " + f"{m['recall']:>6.3f} {m['f1']:>6.3f}" + ) + bonus = result["bonus_model"] + if bonus is None: + print(" Bonus model: scikit-learn not installed — skipped.") + else: + print(" Bonus model (5-fold CV on join-derived features):") + for name, m in bonus.items(): + print( + f" {name:<22} AUC={m['auc_mean']:.3f} " + f"(min={m['auc_min']:.3f} max={m['auc_max']:.3f}) " + f"AP={m['ap_mean']:.3f}" + ) + print(" Leakage-column presence (G4.1-G4.3 indicators):") + for k, v in result["leakage_columns_present"].items(): + flag = "PRESENT" if v else "absent" + print(f" {k:<40} {flag}") + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description=__doc__.split("\n")[0]) + parser.add_argument( + "bundle_dir", + type=Path, + help=( + "Path to a bundle directory (must contain tables/{leads,opportunities," + "customers,subscriptions}.parquet)" + ), + ) + parser.add_argument( + "--json", + action="store_true", + help="Emit JSON instead of human-readable text", + ) + args = parser.parse_args(argv) + + try: + result = probe_bundle(args.bundle_dir) + except FileNotFoundError as exc: + print(f"ERROR: {exc}", file=sys.stderr) + return 1 + + if args.json: + print(json.dumps(result, indent=2, default=str)) + else: + _print_human(result) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/scripts/test_probe_relational_leakage.py b/tests/scripts/test_probe_relational_leakage.py new file mode 100644 index 0000000..ef1bff3 --- /dev/null +++ b/tests/scripts/test_probe_relational_leakage.py @@ -0,0 +1,129 @@ +"""Tests for ``scripts/probe_relational_leakage.py``. + +Exercises the deterministic reconstruction function on a hand-built four-lead +frame that covers each leakage path independently — this is the function PR +3.1 will lift into ``leadforge/validation/leakage_probes.py``, so locking +its behaviour now matters. +""" + +from __future__ import annotations + +import importlib.util +import sys +from pathlib import Path + +import pandas as pd + +_SCRIPT_PATH = Path(__file__).resolve().parents[2] / "scripts" / "probe_relational_leakage.py" +_spec = importlib.util.spec_from_file_location("probe_relational_leakage", _SCRIPT_PATH) +assert _spec is not None +assert _spec.loader is not None +probe_module = importlib.util.module_from_spec(_spec) +sys.modules["probe_relational_leakage"] = probe_module +_spec.loader.exec_module(probe_module) + + +def _toy_bundle() -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: + """Construct a 4-lead toy bundle covering every leakage path independently. + + - lead_001: converted; opp closed_won; customer + subscription rows. + - lead_002: converted via stale flag in leads.parquet; no joined opp. + (Tests path A in isolation.) + - lead_003: converted by ground truth; opp exists but close_outcome NaN. + customer row exists. (Tests path C reaching when path B can't.) + - lead_004: not converted; opp exists but is open; no customer/sub. + """ + leads = pd.DataFrame( + [ + {"lead_id": "lead_001", "converted_within_90_days": True}, + {"lead_id": "lead_002", "converted_within_90_days": True}, + {"lead_id": "lead_003", "converted_within_90_days": True}, + {"lead_id": "lead_004", "converted_within_90_days": False}, + ] + ) + opportunities = pd.DataFrame( + [ + { + "opportunity_id": "opp_001", + "lead_id": "lead_001", + "close_outcome": "closed_won", + "estimated_acv": 50_000, + }, + { + "opportunity_id": "opp_003", + "lead_id": "lead_003", + "close_outcome": None, + "estimated_acv": 30_000, + }, + { + "opportunity_id": "opp_004", + "lead_id": "lead_004", + "close_outcome": None, + "estimated_acv": 20_000, + }, + ] + ) + customers = pd.DataFrame( + [ + {"customer_id": "cust_001", "opportunity_id": "opp_001", "account_id": "acct_a"}, + {"customer_id": "cust_003", "opportunity_id": "opp_003", "account_id": "acct_c"}, + ] + ) + subscriptions = pd.DataFrame( + [ + {"subscription_id": "sub_001", "customer_id": "cust_001", "plan_name": "starter"}, + ] + ) + return leads, opportunities, customers, subscriptions + + +def test_deterministic_reconstruction_paths() -> None: + leads, opportunities, customers, subscriptions = _toy_bundle() + + paths = probe_module.deterministic_relational_reconstruction( + leads, opportunities, customers, subscriptions + ) + + # Path A — direct read of the label. + assert bool(paths.loc["lead_001", "path_a_direct_label"]) is True + assert bool(paths.loc["lead_002", "path_a_direct_label"]) is True + assert bool(paths.loc["lead_003", "path_a_direct_label"]) is True + assert bool(paths.loc["lead_004", "path_a_direct_label"]) is False + + # Path B — opportunity closed_won. Only lead_001. + assert bool(paths.loc["lead_001", "path_b_opportunity_won"]) is True + assert bool(paths.loc["lead_002", "path_b_opportunity_won"]) is False + assert bool(paths.loc["lead_003", "path_b_opportunity_won"]) is False + assert bool(paths.loc["lead_004", "path_b_opportunity_won"]) is False + + # Path C — customer existence. lead_001 + lead_003. + assert bool(paths.loc["lead_001", "path_c_customer_exists"]) is True + assert bool(paths.loc["lead_002", "path_c_customer_exists"]) is False + assert bool(paths.loc["lead_003", "path_c_customer_exists"]) is True + assert bool(paths.loc["lead_004", "path_c_customer_exists"]) is False + + # Path D — subscription existence. Only lead_001. + assert bool(paths.loc["lead_001", "path_d_subscription_exists"]) is True + assert bool(paths.loc["lead_002", "path_d_subscription_exists"]) is False + assert bool(paths.loc["lead_003", "path_d_subscription_exists"]) is False + assert bool(paths.loc["lead_004", "path_d_subscription_exists"]) is False + + # Path E — OR of B/C/D. lead_001 + lead_003. + assert bool(paths.loc["lead_001", "path_e_or_b_c_d"]) is True + assert bool(paths.loc["lead_002", "path_e_or_b_c_d"]) is False + assert bool(paths.loc["lead_003", "path_e_or_b_c_d"]) is True + assert bool(paths.loc["lead_004", "path_e_or_b_c_d"]) is False + + +def test_deterministic_reconstruction_handles_missing_label_column() -> None: + """When public ``leads`` has had the label dropped (Phase 2 fix), path A + must collapse to all-False rather than crash.""" + leads, opportunities, customers, subscriptions = _toy_bundle() + leads_safe = leads.drop(columns=["converted_within_90_days"]) + + paths = probe_module.deterministic_relational_reconstruction( + leads_safe, opportunities, customers, subscriptions + ) + assert not paths["path_a_direct_label"].any() + # Other paths still fire. + assert bool(paths.loc["lead_001", "path_e_or_b_c_d"]) is True From 8f0b4ca246fb077b16aa1f835260fc50a4da9b5f Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 5 May 2026 20:07:09 +0300 Subject: [PATCH 2/3] review: address self-review findings on PR 1.1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Brutal self-review identified 15 issues; this commit addresses the correctness/credibility ones. Summary: Script (scripts/probe_relational_leakage.py): - Assert lead_id uniqueness in deterministic_relational_reconstruction (a validator must refuse non-unique keys, not silently misalign). - Make customers/subscriptions optional inputs — empty frames now accepted, so the same script runs cleanly on Phase-2-fixed bundles. - Branch the agg dict properly — was using has_close_outcome flag *inside* a lambda that pandas would have crashed before invoking. - Add Phase-2-success ablation: re-run deterministic probes on a redacted-in-process view to surface the post-fix shape *now*. - Bonus model gains a without_close_outcome_aggregates variant — the with-aggregates variant is trivially perfect because any_closed_won is Path B aggregated; the without variant is the load-bearing probe. Refactored to sklearn Pipeline (removes string-based scaler dispatch). - Add G4.4 snapshot-window probe directly on event tables. Empirical result: alpha bundles PASS G4.4 (90-day horizon already bounds timestamps). The audit doc now corrects the prior "FAIL (assumed)" framing. - Add --max-accuracy flag for Phase-2 CI gating (exit 2 on threshold violation). - Rewrite docstring to clearly distinguish four orthogonal evidence channels. Tests (tests/scripts/test_probe_relational_leakage.py): - Add _binary_metrics coverage (perfect / all-negative / mixed) — the function had divide-by-zero handling and F1 logic with zero tests. - Add lead_id-uniqueness rejection test. - Add CLI smoke test on release/intermediate (skip if not present). - Add --max-accuracy gate failure test. - Drop the synthetic edge-case lead — toy bundle is now 3 leads cleanly covering A/C without B/D. Audit doc (docs/release/v1_current_state_audit.md): - Distinguish Path A (label in cleartext) from Paths B-E (joins). - Add Phase-2 ablation table showing collapse to (1 - conv_rate). - Report bonus model in two variants; AUC = 1.000 in BOTH variants is a substantive new finding (non-trivial relational features carry the leak independently of close_outcome — i.e., omitting customers/subscriptions tables is structurally required). - Correct G4.4 verdict to PASS (with caveat about within-horizon content still leaking). - Note instructor companion behavior is expected. - Stop reporting AUC on deterministic 0/1 paths. - Remove "sanity floor" framing for Path A (understated the violation). Co-Authored-By: Claude Opus 4.7 --- docs/release/v1_current_state_audit.md | 293 ++++++---- scripts/probe_relational_leakage.py | 516 +++++++++++++----- .../scripts/test_probe_relational_leakage.py | 191 +++++-- 3 files changed, 697 insertions(+), 303 deletions(-) diff --git a/docs/release/v1_current_state_audit.md b/docs/release/v1_current_state_audit.md index 115ec2c..fef2d33 100644 --- a/docs/release/v1_current_state_audit.md +++ b/docs/release/v1_current_state_audit.md @@ -2,56 +2,72 @@ **Phase:** PR 1.1 (Phase 1 of `v1_release_roadmap.md`) **Date:** 2026-05-05 -**Author:** generated by `scripts/probe_relational_leakage.py` against the alpha bundles in `release/` -**Status:** **BLOCKER CONFIRMED.** All three public tiers fail relational-leakage gates G4.1–G4.6 in `v1_acceptance_gates.md`. +**Generated by:** `scripts/probe_relational_leakage.py` against `release/{intro,intermediate,advanced}/` +**Status:** **BLOCKER CONFIRMED.** Public bundles fail G4.1, G4.2, G4.3, G4.5, G4.6 in `v1_acceptance_gates.md`. G4.4 (snapshot-window timestamps) **passes empirically** on alpha bundles — important nuance, see §G4.4 below. **Structural fix:** PR 2.1 — `leadforge/render/relational_snapshot_safe.py` + `leadforge/validation/relational_leakage.py`. This document reproduces the relational-leakage finding from [`docs/external_review/summaries/chatgpt_v2_summary.md`](../external_review/summaries/chatgpt_v2_summary.md) §0 -on the actual 5000-lead alpha bundles (`release/{intro,intermediate,advanced}/`), -producing numeric per-tier evidence to anchor the Phase 2 work. +on the actual 5000-lead alpha bundles. ## TL;DR -`converted_within_90_days` is reconstructible at **100% accuracy** in every -public tier through trivial joins. There is no statistical noise to defeat — -the public bundles literally contain the label and three independent -post-outcome side channels: +The public bundles leak `converted_within_90_days` through **two qualitatively different mechanisms** that the audit must distinguish: -| Channel | Mechanism | -|---|---| -| `leads.converted_within_90_days` | the label itself, in plaintext | -| `leads.conversion_timestamp` | non-null iff converted | -| `opportunities.close_outcome == "closed_won"` | only set for converted leads | -| `customers.parquet` | only contains rows for converted leads | -| `subscriptions.parquet` | only contains rows for converted leads | +1. **The label is published in cleartext.** `leads.converted_within_90_days` + and `leads.conversion_timestamp` are present in every public tier. + Path A is "open the parquet, read the column" — not leakage *via joins*. +2. **Joins to post-outcome entities reconstruct the label deterministically.** + `opportunities.close_outcome == "closed_won"`, plus the existence of + conversion-conditional `customers.parquet` and `subscriptions.parquet` + tables, each independently reconstruct the target at 100% accuracy. This is + the leakage the *roadmap-level* discussion is about, and it survives + even if Path A is patched first. -A model exposed to any one of these is not learning lead scoring; it is -reading the answer key. The Phase 2 fix is not "shrink the leakage" — it is -*structural removal* of all four side channels from the `student_public` -export path. +Phase 2 must remove **both** mechanisms. Removing only the label column +(easy) leaves the join-only reconstruction (paths B/C/D) at 100%. ## Method -`scripts/probe_relational_leakage.py ` runs five -deterministic reconstruction paths and a bonus model: - -| Path | Description | -|---|---| -| **A. Direct label read** | Read `leads.converted_within_90_days` as the prediction. Sanity floor — proves the column is in the public table. | -| **B. Opportunity outcome** | Predict converted iff the lead has any `opportunity` with `close_outcome == "closed_won"`. | -| **C. Customer existence** | Predict converted iff the lead joins (via opportunity) to any row in `customers.parquet`. | -| **D. Subscription existence** | Predict converted iff the lead joins (via opportunity → customer) to any row in `subscriptions.parquet`. | -| **E. Deterministic OR (B ∨ C ∨ D)** | The headline reconstruction — combines all join-only channels (no model fitting). | -| **F. Bonus model** | Logistic regression and HistGBM trained 5-fold on simple join-aggregate features (`n_opps`, `max_acv`, `mean_acv`, `any_closed_won`, `any_closed`, `n_customers`, `n_subscriptions`). | - -The deterministic reconstruction (A–E) is implemented as a pure function -`deterministic_relational_reconstruction(leads, opportunities, customers, subscriptions)` -designed to lift verbatim into `leadforge/validation/leakage_probes.py` in PR 3.1. - -## Per-tier evidence - -Reproduce from a clean checkout via: +`scripts/probe_relational_leakage.py ` reports four orthogonal +pieces of evidence: + +1. **Deterministic reconstruction paths** (no model fit, just joins): + + | Path | Description | + |---|---| + | A. Direct label read | `leads.converted_within_90_days` taken as the prediction. | + | B. Opportunity outcome | Lead has any `opportunities` row with `close_outcome == "closed_won"`. | + | C. Customer existence | Lead → opportunities → customers (any joined customer). | + | D. Subscription existence | Lead → opportunities → customers → subscriptions. | + | E. Deterministic OR (B ∨ C ∨ D) | Headline join-only reconstruction. | + +2. **Phase-2-success ablation.** Same deterministic probes after simulating + PR 2.1's redaction in-process (drop label columns from `leads`, drop + `close_outcome`/`closed_at` from `opportunities`, treat `customers` and + `subscriptions` as empty). Tells us what the post-fix probe should look + like, *before* PR 2.1 ships. + +3. **Bonus model probes.** 5-fold CV LR + HistGBM on join-derived + features, in two variants: + - `with_close_outcome_aggregates` — includes `any_closed_won` (which is + just Path B aggregated; trivially perfect). + - `without_close_outcome_aggregates` — only `n_opps`, `max_acv`, + `mean_acv`, `n_customers`, `n_subscriptions`. The load-bearing variant — + answers "do the *non-trivial* relational features carry the leak + independently of `close_outcome`?" + +4. **Snapshot-window probe (G4.4).** Per event table, count rows with + `timestamp > lead_created_at + horizon_days`. Direct test of the + timestamp-bound invariant. + +The deterministic reconstruction is implemented as a pure function +`deterministic_relational_reconstruction(leads, opportunities, customers, subscriptions)`, +designed to lift verbatim into `leadforge/validation/leakage_probes.py` +(PR 3.1). The function refuses to operate on non-unique `lead_id` and +accepts empty `customers`/`subscriptions` frames (Phase 2 success state). + +Reproduce via: ```bash python scripts/probe_relational_leakage.py release/intro @@ -59,7 +75,14 @@ python scripts/probe_relational_leakage.py release/intermediate python scripts/probe_relational_leakage.py release/advanced ``` -### Bundle composition +For Phase-2 CI gating after PR 2.2: + +```bash +python scripts/probe_relational_leakage.py release/intermediate --max-accuracy 0.65 +# exit 2 if any deterministic path or bonus AUC > 0.65 +``` + +## Bundle composition | Tier | n_leads | n_opportunities | n_customers | n_subscriptions | conversion rate | |---|---:|---:|---:|---:|---:| @@ -67,111 +90,157 @@ python scripts/probe_relational_leakage.py release/advanced | intermediate | 5000 | 4641 | 1049 | 1049 | 0.210 | | advanced | 5000 | 4557 | 393 | 393 | 0.079 | -Note: `n_customers == n_subscriptions` per tier and both equal the count of -converted leads. That equivalence alone is reconstruction-grade evidence -that customers/subscriptions are conversion-conditional entities. +`n_customers == n_subscriptions == n_converted_leads` per tier — direct +evidence that customers and subscriptions are conversion-conditional +entities. Their *presence in the public table set is the leak*; column +contents are immaterial. -### Reconstruction accuracy (vs. `converted_within_90_days`) +## Deterministic reconstruction (paths A–E) -Every path achieves perfect reconstruction in every tier. Precision, recall, -and F1 are all 1.000 across the board. +Reconstruction **accuracy** vs `converted_within_90_days`. Precision / +recall / F1 are also 1.000 across the board (full output in the script's +JSON mode); only accuracy reproduced here for compactness. AUC is not +reported because these are deterministic 0/1 predictions (AUC undefined / +degenerate). -| Tier | Path A (direct) | Path B (opp won) | Path C (customer) | Path D (subscription) | Path E (B∨C∨D) | +| Tier | A. direct | B. opp won | C. customer | D. subscription | E. B∨C∨D | |---|---:|---:|---:|---:|---:| | intro | 1.000 | 1.000 | 1.000 | 1.000 | 1.000 | | intermediate | 1.000 | 1.000 | 1.000 | 1.000 | 1.000 | | advanced | 1.000 | 1.000 | 1.000 | 1.000 | 1.000 | -### Bonus "public-relational-only" model (5-fold CV) - -LR and HistGBM trained only on join-derived features (no -`converted_within_90_days`, no `conversion_timestamp`, no `close_outcome` -column directly used — only the `any_closed_won` aggregate which is -reconstructed from the public column): - -| Tier | LR AUC (mean) | LR AP | HistGBM AUC (mean) | HistGBM AP | -|---|---:|---:|---:|---:| -| intro | 1.000 | 1.000 | 1.000 | 1.000 | -| intermediate | 1.000 | 1.000 | 1.000 | 1.000 | -| advanced | 1.000 | 1.000 | 1.000 | 1.000 | - -### Leakage-column presence - -Across all three public tiers: - -| Column / table | G4.x | Present? | -|---|---|:--:| -| `leads.converted_within_90_days` | G4.1 | ✗ PRESENT | -| `leads.conversion_timestamp` | G4.1 | ✗ PRESENT | -| `opportunities.close_outcome` | G4.2 | ✗ PRESENT | -| `opportunities.closed_at` | G4.2 | ✗ PRESENT | -| `tables/customers.parquet` | G4.3 | ✗ PRESENT | -| `tables/subscriptions.parquet` | G4.3 | ✗ PRESENT | +## Phase-2-success ablation + +Same deterministic probes, run on a virtual redacted view (label columns +dropped from `leads`, `close_outcome`/`closed_at` dropped from +`opportunities`, `customers`/`subscriptions` empty). Every path collapses +to all-False predictions, so accuracy reduces to the baseline of always +predicting the negative class — i.e., `1 - conversion_rate`: + +| Tier | accuracy of any path | matches `1 - conv. rate` | +|---|---:|:--:| +| intro | 0.578 | ✓ (1 − 0.422) | +| intermediate | 0.790 | ✓ (1 − 0.210) | +| advanced | 0.921 | ✓ (1 − 0.079) | + +This is the *correct* post-fix shape for deterministic probes: with the +post-outcome side channels gone, no join produces a positive prediction. +The remaining residual risk — "can a *model* trained on +`n_opps`/`max_acv`/etc. (which are NOT post-outcome) still leak?" — is +PR 2.1 / 3.1 territory and is left for those PRs to band. + +## Bonus model probes (5-fold CV) + +| Tier | variant | LR AUC | LR AP | HistGBM AUC | HistGBM AP | n_features | +|---|---|---:|---:|---:|---:|---:| +| intro | with `any_closed_won`/`any_closed` | 1.000 | 1.000 | 1.000 | 1.000 | 7 | +| intro | without close-outcome aggregates | 1.000 | 1.000 | 1.000 | 1.000 | 5 | +| intermediate | with `any_closed_won`/`any_closed` | 1.000 | 1.000 | 1.000 | 1.000 | 7 | +| intermediate | without close-outcome aggregates | 1.000 | 1.000 | 1.000 | 1.000 | 5 | +| advanced | with `any_closed_won`/`any_closed` | 1.000 | 1.000 | 1.000 | 1.000 | 7 | +| advanced | without close-outcome aggregates | 1.000 | 1.000 | 1.000 | 1.000 | 5 | + +**Key observation:** AUC = 1.000 even *without* `any_closed_won`. That +means non-trivial relational features (`n_opps`, `n_customers`, +`n_subscriptions`, `max_acv`, `mean_acv`) are individually sufficient to +reconstruct the label, because `customers.parquet` and +`subscriptions.parquet` exist *only* for converted leads. This is why +PR 2.1's structural fix must omit those tables entirely from public +bundles, not just redact a column. + +## G4.4 — snapshot-window probe + +Direct empirical check on alpha bundles: are there event rows with +`timestamp > lead_created_at + 90d`? + +| Tier | touches | sessions | sales_activities | opportunities | +|---|---|---|---|---| +| intro | 0 / 53354 PASS | 0 / 14339 PASS | 0 / 56643 PASS | 0 / 4701 PASS | +| intermediate | 0 / 54803 PASS | 0 / 14565 PASS | 0 / 60739 PASS | 0 / 4641 PASS | +| advanced | 0 / 54662 PASS | 0 / 14599 PASS | 0 / 62254 PASS | 0 / 4557 PASS | + +**G4.4 passes literally:** the 90-day simulation horizon already bounds +event timestamps. **But** that is not the same as "the public bundle is +snapshot-safe." Events *within* the 90-day window still encode conversion +(Path B uses opportunities created within the horizon; the customers and +subscriptions tables only exist for leads that closed within the horizon). +The snapshot-window invariant (G4.4) and the relational-leakage invariant +(G4.1–G4.3) are independent constraints; passing G4.4 does not imply +passing G4.5. ## Acceptance-gate verdict -Per `v1_acceptance_gates.md` §"Relational leakage gate": - | Gate | Verdict | Evidence | |---|---|---| -| **G4.1** Public `leads` excludes `converted_within_90_days` and `conversion_timestamp` | ✗ FAIL | both columns present in all three public tiers | -| **G4.2** Public `opportunities` excludes `close_outcome` and `closed_at` | ✗ FAIL | both columns present in all three public tiers | -| **G4.3** Public bundles do not contain `customers.parquet` or `subscriptions.parquet` | ✗ FAIL | both files present in all three public tiers | -| **G4.4** No public event/opportunity rows past `lead_created_at + snapshot_day` | ✗ FAIL (assumed) | not directly probed in this PR; the full-horizon export is the same code path that emits the post-snapshot opportunities. Phase 2's snapshot-safe relational module addresses this. | -| **G4.5** Probabilistic relational reconstruction probe AUC ≤ TBD | ✗ FAIL | join-only LR/HistGBM AUC = 1.000 in every tier | +| **G4.1** Public `leads` excludes `converted_within_90_days` and `conversion_timestamp` | ✗ FAIL | both columns present in all three tiers | +| **G4.2** Public `opportunities` excludes `close_outcome` and `closed_at` | ✗ FAIL | both columns present in all three tiers | +| **G4.3** Public bundles do not contain `customers.parquet` or `subscriptions.parquet` | ✗ FAIL | both files present in all three tiers | +| **G4.4** No public event rows past `lead_created_at + snapshot_day` | ✓ PASS | 0 violations across all event tables and all tiers (90-day horizon) | +| **G4.5** Probabilistic relational reconstruction probe AUC ≤ TBD | ✗ FAIL | LR / HistGBM AUC = 1.000 in every tier in both feature variants | | **G4.6** Manifest field `relational_snapshot_safe == true` for `student_public` | ✗ FAIL | manifest field does not yet exist (introduced in PR 2.2 with `BUNDLE_SCHEMA_VERSION` 4 → 5) | -**Overall:** every relational-leakage gate fails on every public tier. The -release is blocked exactly as `chatgpt_v2_summary.md` §0 predicted, but on the -real 5000-lead bundles rather than the 500-lead smoke bundle. +## Why every reconstruction metric is 1.000 (and what that implies for Phase 2) + +The public bundles expose four logically-equivalent reconstructions of +`converted_within_90_days`: -## Why every metric is 1.000 (and that's fine) +1. The label itself (Path A). +2. `close_outcome == "closed_won"` on opportunities (Path B). +3. The presence of any joined customer (Path C). +4. The presence of any joined subscription (Path D). -There is no probabilistic ambiguity to measure. The public tables expose: +All four are functions of the same underlying truth — they all flip on iff +the lead converted within 90 days — so any model with access to any of +them trivially achieves AUC 1.0. This is structural, not probabilistic: +PR 2.1 must remove the *information channels*, not "shrink the leakage." -1. The label itself (path A). -2. Three deterministic, monotone proxies for the label (paths B/C/D), each of - which is reconstructible from public columns/tables alone. -3. Aggregate features over those proxies (the bonus model), which collapse - to the same information. +## Note on the instructor companion -This is structural leakage, not probabilistic noise that a tighter band can -tolerate. A 0.999 result would still be a fail; the only acceptable Phase 2 -state is that paths B/C/D *cannot be constructed* because the underlying -columns/tables are absent from public bundles, and the bonus model's AUC drops -to whatever signal honest features provide (to be banded in PR 3.3). +`release/intermediate_instructor/` is a `research_instructor` bundle and +is *expected* to retain all four channels — that's the point of the +instructor mode (full truth for teaching). Running this script against the +instructor companion will report the same 1.000 reconstruction; that's +correct behavior, not a regression. The public/instructor diff is gated +separately by G9.\*. ## Pointer to the structural fix — PR 2.1 -The fix lives in PR 2.1 of `v1_release_roadmap.md` and breaks into: +PR 2.1 of `v1_release_roadmap.md` is the structural fix: 1. New `leadforge/render/relational_snapshot_safe.py`: - - Filter `touches`, `sessions`, `sales_activities` to - `timestamp <= lead_created_at + snapshot_day` per lead. - - Filter `opportunities` to `created_at <= lead_created_at + snapshot_day`; - drop `close_outcome` / `closed_at`. - Drop `converted_within_90_days` / `conversion_timestamp` from public `leads`. + - Drop `close_outcome` / `closed_at` from public `opportunities`. + - Filter `opportunities` to `created_at <= lead_created_at + snapshot_day` per lead. + - Filter `touches`/`sessions`/`sales_activities` similarly (defence-in-depth even though G4.4 passes today). - Omit `customers.parquet` / `subscriptions.parquet` from public bundles. 2. New `leadforge/validation/relational_leakage.py`: - - Lift the deterministic reconstruction from this PR's - `scripts/probe_relational_leakage.py` and assert - paths B/C/D produce zero hits because the underlying columns/tables are - absent. - - Assert no banned columns; assert event timestamps are bounded by - `lead_created_at + snapshot_day`. - -3. PR 2.2 — Wire the new export through `leadforge/exposure/filters.py` and - `leadforge/api/bundle.py`; bump `BUNDLE_SCHEMA_VERSION` 4 → 5; add the - `relational_snapshot_safe: true` manifest field for `student_public`. - -After PR 2.2 ships, this script must be re-run on the regenerated bundles and -each row in the gate-verdict table above must flip to ✓ PASS. + - Lift `deterministic_relational_reconstruction` from this PR's + `scripts/probe_relational_leakage.py` and assert that paths B/C/D + produce zero hits because the underlying columns/tables are absent. + - Assert no banned columns; assert event timestamps within horizon. + - Add a Phase-2 bonus-model probe — train LR/HistGBM on the redacted + view's `n_opps`/ACV features and band the residual AUC. + +3. PR 2.2 wires the new export through `leadforge/exposure/filters.py` + and `leadforge/api/bundle.py`; bumps `BUNDLE_SCHEMA_VERSION` 4 → 5; + adds the `relational_snapshot_safe: true` manifest field for + `student_public`. + +After PR 2.2 ships, this script must be re-run on the regenerated +bundles. Expected post-fix shape: + +- Deterministic paths A–E: all-False (matches the Phase-2 ablation rows + in the table above). +- Bonus model AUC (without close-outcome aggregates): the residual + band that PR 3.3 will calibrate — currently unbanded. +- G4.4: still PASS. +- The script's `--max-accuracy` flag becomes the regression gate in CI. ## Related artifacts - Probe script: [`scripts/probe_relational_leakage.py`](../../scripts/probe_relational_leakage.py) -- Unit test: [`tests/scripts/test_probe_relational_leakage.py`](../../tests/scripts/test_probe_relational_leakage.py) +- Unit tests: [`tests/scripts/test_probe_relational_leakage.py`](../../tests/scripts/test_probe_relational_leakage.py) - Acceptance gates: [`docs/release/v1_acceptance_gates.md`](v1_acceptance_gates.md) §"Relational leakage gate" - Roadmap: [`docs/release/v1_release_roadmap.md`](v1_release_roadmap.md) §"Phase 2 — Snapshot-safe relational export" - Original finding: [`docs/external_review/summaries/chatgpt_v2_summary.md`](../external_review/summaries/chatgpt_v2_summary.md) §0 diff --git a/scripts/probe_relational_leakage.py b/scripts/probe_relational_leakage.py index 9f2b69f..b88c21f 100644 --- a/scripts/probe_relational_leakage.py +++ b/scripts/probe_relational_leakage.py @@ -1,31 +1,60 @@ #!/usr/bin/env python3 """Probe public relational tables for ``converted_within_90_days`` leakage. -Reproduces the ChatGPT v2 finding: in the alpha ``student_public`` bundles -under ``release/{intro,intermediate,advanced}/``, joining -``leads`` to ``opportunities`` / ``customers`` / ``subscriptions`` reconstructs -the target end-to-end. - -Reports five deterministic reconstruction paths plus a bonus "public -relational only" model trained on join-derived features: - - A. Direct read of ``leads.converted_within_90_days`` (sanity floor) - B. Opportunity outcome (lead has any closed_won opportunity) - C. Customer existence (lead -> opportunities -> customers) - D. Subscription existence (lead -> opportunities -> customers -> subscriptions) - E. Deterministic OR (B OR C OR D) - F. LR / HistGBM on join-derived features (5-fold CV AUC + AP) - -Path E is the headline reconstruction; the implementation is exposed as +Reproduces the ChatGPT v2 finding on the alpha ``student_public`` bundles +under ``release/{intro,intermediate,advanced}/``. + +The script reports four orthogonal pieces of evidence: + +1. **Deterministic reconstruction paths** (no model fit, just joins): + + A. Direct read of ``leads.converted_within_90_days`` + — the label is published in cleartext. + B. Opportunity outcome + — lead has any opportunity with ``close_outcome == "closed_won"``. + C. Customer existence + — lead -> opportunities -> customers (any joined customer). + D. Subscription existence + — lead -> opportunities -> customers -> subscriptions. + E. Deterministic OR (B ∨ C ∨ D) + — the headline join-only reconstruction. + +2. **Phase-2-success ablation** — same paths after simulating Phase 2's + redaction (drop ``converted_within_90_days``/``conversion_timestamp`` from + ``leads``; drop ``close_outcome``/``closed_at`` from ``opportunities``; + omit ``customers`` and ``subscriptions``). This is what PR 2.1 needs to + size against. + +3. **Bonus model probes** — 5-fold CV LR + HistGBM on join-derived features. + Reported in two variants: + + - ``with_close_outcome_aggregates`` — includes ``any_closed_won`` + (which is just Path B aggregated; trivially perfect). + - ``without_close_outcome_aggregates`` — only counts and ACV + aggregates. Tells us whether non-trivial relational features + carry the leak independently of ``close_outcome``. + +4. **Snapshot-window probe (G4.4)** — for each event table, count rows + with ``timestamp > lead_created_at + horizon_days``. If horizon_days + isn't readable from the manifest, falls back to 90. + +The deterministic reconstruction is exposed as :func:`deterministic_relational_reconstruction` so PR 3.1 can lift it verbatim into ``leadforge/validation/leakage_probes.py``. -Exit code: 0 on success (any probe outcome), 1 on missing tables / shape errors. +Exit codes: + + 0 — probe ran to completion; no threshold violation + 1 — missing/unreadable required tables + 2 — a deterministic-path or bonus-model accuracy/AUC exceeded + ``--max-accuracy`` (intended for Phase 2 CI gating) Usage:: python scripts/probe_relational_leakage.py release/intermediate python scripts/probe_relational_leakage.py release/intro --json + python scripts/probe_relational_leakage.py release/intermediate \\ + --max-accuracy 0.65 # Phase-2 CI mode """ from __future__ import annotations @@ -38,7 +67,8 @@ import pandas as pd -REQUIRED_TABLES = ("leads", "opportunities", "customers", "subscriptions") +REQUIRED_TABLES = ("leads", "opportunities") +DEFAULT_HORIZON_DAYS = 90 def deterministic_relational_reconstruction( @@ -56,7 +86,17 @@ def deterministic_relational_reconstruction( No hidden state, no model fit — pure joins. Designed to be lifted into ``leadforge/validation/leakage_probes.py`` (PR 3.1) as the relational leakage probe. + + Empty ``customers``/``subscriptions`` frames are accepted (Phase 2 + success state); the corresponding paths simply return all-False. + + Raises: + ValueError: if ``leads.lead_id`` contains duplicates. A validator + cannot operate safely on non-unique keys. """ + if not leads["lead_id"].is_unique: + raise ValueError("leads.lead_id must be unique") + leads_idx = leads.set_index("lead_id", drop=False) # Path A — the label itself, if present in public leads. @@ -66,7 +106,7 @@ def deterministic_relational_reconstruction( path_a = pd.Series(False, index=leads_idx.index, name="converted_within_90_days") # Path B — any opportunity with close_outcome == "closed_won". - if "close_outcome" in opportunities.columns: + if "close_outcome" in opportunities.columns and len(opportunities) > 0: won_leads = set( opportunities.loc[opportunities["close_outcome"] == "closed_won", "lead_id"] ) @@ -75,14 +115,22 @@ def deterministic_relational_reconstruction( path_b = leads_idx["lead_id"].isin(won_leads) # Path C — lead has any joined customer (via opportunity_id -> opportunity.lead_id). - opp_to_lead = dict(zip(opportunities["opportunity_id"], opportunities["lead_id"], strict=True)) + if len(opportunities) > 0: + opp_to_lead = dict( + zip(opportunities["opportunity_id"], opportunities["lead_id"], strict=False) + ) + else: + opp_to_lead = {} customer_leads = { opp_to_lead[opp_id] for opp_id in customers["opportunity_id"] if opp_id in opp_to_lead } path_c = leads_idx["lead_id"].isin(customer_leads) # Path D — lead has any joined subscription (sub -> customer -> opportunity -> lead). - cust_to_opp = dict(zip(customers["customer_id"], customers["opportunity_id"], strict=True)) + if len(customers) > 0: + cust_to_opp = dict(zip(customers["customer_id"], customers["opportunity_id"], strict=False)) + else: + cust_to_opp = {} sub_leads: set[str] = set() for cust_id in subscriptions["customer_id"]: opp_id = cust_to_opp.get(cust_id) @@ -93,10 +141,10 @@ def deterministic_relational_reconstruction( sub_leads.add(lead_id) path_d = leads_idx["lead_id"].isin(sub_leads) - # Path E — deterministic OR of B, C, D. + # Path E — deterministic OR of B, C, D (the headline join-only path). path_e = path_b | path_c | path_d - out = pd.DataFrame( + return pd.DataFrame( { "path_a_direct_label": path_a.values, "path_b_opportunity_won": path_b.values, @@ -106,11 +154,10 @@ def deterministic_relational_reconstruction( }, index=leads_idx.index, ) - return out def _binary_metrics(y_true: pd.Series, y_pred: pd.Series) -> dict[str, float]: - """Accuracy / precision / recall / F1 for boolean predictions.""" + """Accuracy / precision / recall / F1 / counts for boolean predictions.""" tp = int(((y_pred) & (y_true)).sum()) fp = int(((y_pred) & (~y_true)).sum()) fn = int(((~y_pred) & (y_true)).sum()) @@ -141,32 +188,48 @@ def _build_relational_features( opportunities: pd.DataFrame, customers: pd.DataFrame, subscriptions: pd.DataFrame, + *, + include_close_outcome_aggregates: bool, ) -> pd.DataFrame: - """Aggregate features per lead from public relational tables.""" - has_close_outcome = "close_outcome" in opportunities.columns - opp_agg = ( - opportunities.groupby("lead_id") - .agg( - n_opps=("opportunity_id", "count"), - max_acv=("estimated_acv", "max"), - mean_acv=("estimated_acv", "mean"), - any_closed_won=( - "close_outcome", - lambda s: int((s == "closed_won").any()) if has_close_outcome else 0, - ), - any_closed=( - "close_outcome", - lambda s: int(s.notna().any()) if has_close_outcome else 0, - ), + """Aggregate features per lead from public relational tables. + + When ``include_close_outcome_aggregates`` is False, the trivially-perfect + ``any_closed_won`` / ``any_closed`` columns are omitted. The resulting + feature set is the load-bearing "non-trivial relational" probe. + """ + agg_kwargs: dict[str, tuple[str, str | Any]] = { + "n_opps": ("opportunity_id", "count"), + "max_acv": ("estimated_acv", "max"), + "mean_acv": ("estimated_acv", "mean"), + } + if include_close_outcome_aggregates and "close_outcome" in opportunities.columns: + agg_kwargs["any_closed_won"] = ( + "close_outcome", + lambda s: int((s == "closed_won").any()), ) - .reset_index() - ) - opp_to_lead = dict(zip(opportunities["opportunity_id"], opportunities["lead_id"], strict=True)) - customers = customers.copy() - customers["lead_id"] = customers["opportunity_id"].map(opp_to_lead) - cust_agg = customers.groupby("lead_id").size().rename("n_customers").reset_index() + agg_kwargs["any_closed"] = ( + "close_outcome", + lambda s: int(s.notna().any()), + ) + + if len(opportunities) > 0: + opp_agg = opportunities.groupby("lead_id").agg(**agg_kwargs).reset_index() + opp_to_lead = dict( + zip(opportunities["opportunity_id"], opportunities["lead_id"], strict=False) + ) + else: + opp_agg = pd.DataFrame(columns=["lead_id", *agg_kwargs.keys()]) + opp_to_lead = {} + + customers_with_lead = customers.copy() + customers_with_lead["lead_id"] = customers_with_lead["opportunity_id"].map(opp_to_lead) + cust_agg = customers_with_lead.groupby("lead_id").size().rename("n_customers").reset_index() - cust_to_opp = dict(zip(customers["customer_id"], customers["opportunity_id"], strict=True)) + cust_to_opp = ( + dict(zip(customers["customer_id"], customers["opportunity_id"], strict=False)) + if len(customers) > 0 + else {} + ) subs = subscriptions.copy() subs["opportunity_id"] = subs["customer_id"].map(cust_to_opp) subs["lead_id"] = subs["opportunity_id"].map(opp_to_lead) @@ -178,17 +241,17 @@ def _build_relational_features( .merge(cust_agg, on="lead_id", how="left") .merge(sub_agg, on="lead_id", how="left") ) - feats = feats.fillna( - { - "n_opps": 0, - "max_acv": 0.0, - "mean_acv": 0.0, - "any_closed_won": 0, - "any_closed": 0, - "n_customers": 0, - "n_subscriptions": 0, - } - ) + fill_defaults = { + "n_opps": 0, + "max_acv": 0.0, + "mean_acv": 0.0, + "n_customers": 0, + "n_subscriptions": 0, + } + if "any_closed_won" in feats.columns: + fill_defaults["any_closed_won"] = 0 + fill_defaults["any_closed"] = 0 + feats = feats.fillna(fill_defaults) return feats.set_index("lead_id") @@ -197,110 +260,240 @@ def _bonus_model_metrics( opportunities: pd.DataFrame, customers: pd.DataFrame, subscriptions: pd.DataFrame, + *, seed: int = 42, -) -> dict[str, dict[str, float]] | None: +) -> dict[str, dict[str, dict[str, float]]] | None: """5-fold CV AUC + AP for LR and HistGBM on join-derived features. - Returns ``None`` if scikit-learn is not installed (lets the deterministic - paths still report). + Returns a nested dict keyed by feature-set variant + (``with_close_outcome_aggregates`` vs ``without_close_outcome_aggregates``) + then by model name. The "without" variant is the load-bearing probe — + the "with" variant trivially achieves AUC 1.0 because ``any_closed_won`` + is Path B aggregated. + + Returns ``None`` if scikit-learn is unavailable. """ try: from sklearn.ensemble import HistGradientBoostingClassifier from sklearn.linear_model import LogisticRegression from sklearn.metrics import average_precision_score, roc_auc_score from sklearn.model_selection import StratifiedKFold + from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler except ImportError: return None - features = _build_relational_features(leads, opportunities, customers, subscriptions) if "converted_within_90_days" not in leads.columns: return None - y = leads.set_index("lead_id")["converted_within_90_days"].astype(int).reindex(features.index) - skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed) - results: dict[str, dict[str, float]] = {} - - for name, mk_model in ( - ("logistic_regression", lambda: LogisticRegression(max_iter=1000)), - ("hist_gbm", lambda: HistGradientBoostingClassifier(random_state=seed)), + out: dict[str, dict[str, dict[str, float]]] = {} + for variant_name, include_co_agg in ( + ("with_close_outcome_aggregates", True), + ("without_close_outcome_aggregates", False), ): - aucs: list[float] = [] - aps: list[float] = [] - for train_idx, test_idx in skf.split(features.values, y.values): - x_tr, x_te = features.values[train_idx], features.values[test_idx] - y_tr, y_te = y.values[train_idx], y.values[test_idx] - if name == "logistic_regression": - scaler = StandardScaler() - x_tr = scaler.fit_transform(x_tr) - x_te = scaler.transform(x_te) - model = mk_model() - model.fit(x_tr, y_tr) - proba = model.predict_proba(x_te)[:, 1] - aucs.append(roc_auc_score(y_te, proba)) - aps.append(average_precision_score(y_te, proba)) - results[name] = { - "auc_mean": float(sum(aucs) / len(aucs)), - "auc_min": float(min(aucs)), - "auc_max": float(max(aucs)), - "ap_mean": float(sum(aps) / len(aps)), + features = _build_relational_features( + leads, + opportunities, + customers, + subscriptions, + include_close_outcome_aggregates=include_co_agg, + ) + y = ( + leads.set_index("lead_id")["converted_within_90_days"] + .astype(int) + .reindex(features.index) + ) + + models: dict[str, Pipeline] = { + "logistic_regression": Pipeline( + [("scaler", StandardScaler()), ("clf", LogisticRegression(max_iter=1000))] + ), + "hist_gbm": Pipeline([("clf", HistGradientBoostingClassifier(random_state=seed))]), + } + + skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed) + variant_results: dict[str, dict[str, float]] = {} + for name, pipe in models.items(): + aucs: list[float] = [] + aps: list[float] = [] + for train_idx, test_idx in skf.split(features.values, y.values): + x_tr, x_te = features.values[train_idx], features.values[test_idx] + y_tr, y_te = y.values[train_idx], y.values[test_idx] + pipe.fit(x_tr, y_tr) + proba = pipe.predict_proba(x_te)[:, 1] + aucs.append(roc_auc_score(y_te, proba)) + aps.append(average_precision_score(y_te, proba)) + variant_results[name] = { + "auc_mean": float(sum(aucs) / len(aucs)), + "auc_min": float(min(aucs)), + "auc_max": float(max(aucs)), + "ap_mean": float(sum(aps) / len(aps)), + "n_features": int(features.shape[1]), + } + out[variant_name] = variant_results + return out + + +def _phase2_success_view( + leads: pd.DataFrame, opportunities: pd.DataFrame +) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: + """Simulate Phase-2 redaction so the same probe runs on a virtual fix. + + Drops public-leakage columns from ``leads``/``opportunities`` and + omits ``customers``/``subscriptions`` entirely. Used to measure the + join-only reconstruction *that PR 2.1 must defeat*, distinct from the + trivially-perfect Path A. + """ + drop_from_leads = [ + c for c in ("converted_within_90_days", "conversion_timestamp") if c in leads.columns + ] + leads_safe = leads.drop(columns=drop_from_leads) + opps_safe = opportunities.drop( + columns=[c for c in ("close_outcome", "closed_at") if c in opportunities.columns] + ) + empty_customers = pd.DataFrame( + { + "customer_id": pd.Series(dtype=str), + "opportunity_id": pd.Series(dtype=str), + "account_id": pd.Series(dtype=str), + } + ) + empty_subscriptions = pd.DataFrame( + { + "subscription_id": pd.Series(dtype=str), + "customer_id": pd.Series(dtype=str), + "plan_name": pd.Series(dtype=str), } - return results + ) + return leads_safe, opps_safe, empty_customers, empty_subscriptions + + +def _g4_4_snapshot_window_probe( + leads: pd.DataFrame, + tables_dir: Path, + horizon_days: int, +) -> dict[str, dict[str, int]]: + """Per-event-table: count rows with timestamp > lead_created_at + horizon_days. + + Implements G4.4 directly. Skips tables that aren't on disk. + """ + leads_anchor = leads[["lead_id", "lead_created_at"]].copy() + leads_anchor["lead_created_at"] = pd.to_datetime(leads_anchor["lead_created_at"]) + horizon = pd.Timedelta(days=horizon_days) + + event_tables = ( + ("touches", "touch_timestamp"), + ("sessions", "session_timestamp"), + ("sales_activities", "activity_timestamp"), + ("opportunities", "created_at"), + ) + out: dict[str, dict[str, int]] = {} + for tbl, ts_col in event_tables: + path = tables_dir / f"{tbl}.parquet" + if not path.exists(): + continue + df = pd.read_parquet(path, columns=["lead_id", ts_col]) + df[ts_col] = pd.to_datetime(df[ts_col]) + merged = df.merge(leads_anchor, on="lead_id", how="left") + bad = (merged[ts_col] > merged["lead_created_at"] + horizon).sum() + out[tbl] = {"violations": int(bad), "total_rows": int(len(df))} + return out + + +def _read_horizon_days(bundle_dir: Path) -> int: + manifest = bundle_dir / "manifest.json" + if manifest.exists(): + try: + data = json.loads(manifest.read_text()) + return int(data.get("horizon_days") or DEFAULT_HORIZON_DAYS) + except (json.JSONDecodeError, ValueError, TypeError): + return DEFAULT_HORIZON_DAYS + return DEFAULT_HORIZON_DAYS + + +def _read_optional_table(tables_dir: Path, name: str, columns: list[str]) -> pd.DataFrame: + path = tables_dir / f"{name}.parquet" + if path.exists(): + return pd.read_parquet(path) + return pd.DataFrame({c: pd.Series(dtype=str) for c in columns}) def probe_bundle(bundle_dir: Path) -> dict[str, Any]: - """Run every probe against a single bundle directory; return a result dict.""" + """Run every probe against a single bundle directory; return a result dict. + + Tolerates missing ``customers.parquet`` / ``subscriptions.parquet`` — + that's exactly the Phase 2 success state, and the same script must run + cleanly on regenerated bundles. + """ tables_dir = bundle_dir / "tables" if not tables_dir.is_dir(): raise FileNotFoundError(f"missing tables/ under {bundle_dir}") - missing = [t for t in REQUIRED_TABLES if not (tables_dir / f"{t}.parquet").exists()] - if missing: - raise FileNotFoundError(f"missing required tables in {tables_dir}: {missing}") + missing_required = [t for t in REQUIRED_TABLES if not (tables_dir / f"{t}.parquet").exists()] + if missing_required: + raise FileNotFoundError(f"missing required tables in {tables_dir}: {missing_required}") leads = pd.read_parquet(tables_dir / "leads.parquet") opportunities = pd.read_parquet(tables_dir / "opportunities.parquet") - customers = pd.read_parquet(tables_dir / "customers.parquet") - subscriptions = pd.read_parquet(tables_dir / "subscriptions.parquet") - - paths = deterministic_relational_reconstruction(leads, opportunities, customers, subscriptions) - - if "converted_within_90_days" not in leads.columns: - return { - "bundle": str(bundle_dir), - "n_leads": int(len(leads)), - "error": "leads.parquet has no converted_within_90_days column — cannot score paths", - } - - y_true = ( - leads.set_index("lead_id")["converted_within_90_days"].astype(bool).reindex(paths.index) + customers = _read_optional_table( + tables_dir, "customers", ["customer_id", "opportunity_id", "account_id"] + ) + subscriptions = _read_optional_table( + tables_dir, "subscriptions", ["subscription_id", "customer_id", "plan_name"] ) - conversion_rate = float(y_true.mean()) - - path_metrics: dict[str, dict[str, float]] = {} - for col in paths.columns: - path_metrics[col] = _binary_metrics(y_true, paths[col]) - bonus = _bonus_model_metrics(leads, opportunities, customers, subscriptions) + paths = deterministic_relational_reconstruction(leads, opportunities, customers, subscriptions) - return { + base: dict[str, Any] = { "bundle": str(bundle_dir), "n_leads": int(len(leads)), "n_opportunities": int(len(opportunities)), "n_customers": int(len(customers)), "n_subscriptions": int(len(subscriptions)), - "conversion_rate": conversion_rate, - "deterministic_paths": path_metrics, - "bonus_model": bonus, "leakage_columns_present": { "leads.converted_within_90_days": "converted_within_90_days" in leads.columns, "leads.conversion_timestamp": "conversion_timestamp" in leads.columns, "opportunities.close_outcome": "close_outcome" in opportunities.columns, "opportunities.closed_at": "closed_at" in opportunities.columns, - "customers.parquet present": True, - "subscriptions.parquet present": True, + "tables/customers.parquet": (tables_dir / "customers.parquet").exists(), + "tables/subscriptions.parquet": (tables_dir / "subscriptions.parquet").exists(), }, } + horizon_days = _read_horizon_days(bundle_dir) + base["horizon_days"] = horizon_days + base["g4_4_snapshot_window"] = _g4_4_snapshot_window_probe(leads, tables_dir, horizon_days) + + if "converted_within_90_days" not in leads.columns: + base["error"] = "leads.parquet has no converted_within_90_days column — cannot score paths" + return base + + y_true = leads.set_index("lead_id")["converted_within_90_days"].astype(bool) + y_true = y_true.reindex(paths.index) + base["conversion_rate"] = float(y_true.mean()) + + base["deterministic_paths"] = { + col: _binary_metrics(y_true, paths[col]) for col in paths.columns + } + + # Phase-2 success ablation — re-run the deterministic probe under the + # virtual redaction. Path E should drop to 0 hits if Phase 2 works. + leads_p2, opps_p2, cust_p2, subs_p2 = _phase2_success_view(leads, opportunities) + paths_p2 = deterministic_relational_reconstruction(leads_p2, opps_p2, cust_p2, subs_p2) + base["phase2_ablation_paths"] = { + col: _binary_metrics(y_true, paths_p2[col]) for col in paths_p2.columns + } + + base["bonus_model"] = _bonus_model_metrics(leads, opportunities, customers, subscriptions) + return base + + +def _format_metrics_row(name: str, m: dict[str, float], width: int = 32) -> str: + return ( + f" {name:<{width}} " + f"{m['accuracy']:>6.3f} {m['precision']:>6.3f} " + f"{m['recall']:>6.3f} {m['f1']:>6.3f}" + ) + def _print_human(result: dict[str, Any]) -> None: print(f"\n=== {result['bundle']} ===") @@ -309,43 +502,75 @@ def _print_human(result: dict[str, Any]) -> None: f"n_opps={result.get('n_opportunities')} " f"n_customers={result.get('n_customers')} " f"n_subscriptions={result.get('n_subscriptions')} " - f"conversion_rate={result.get('conversion_rate', float('nan')):.3f}" + f"conversion_rate={result.get('conversion_rate', float('nan')):.3f} " + f"horizon_days={result.get('horizon_days')}" ) if "error" in result: print(f" ERROR: {result['error']}") return - print(" Deterministic reconstruction paths (vs converted_within_90_days):") + + print(" Deterministic reconstruction (vs converted_within_90_days):") print(f" {'path':<32} {'acc':>6} {'prec':>6} {'rec':>6} {'f1':>6}") for path, m in result["deterministic_paths"].items(): - print( - f" {path:<32} {m['accuracy']:>6.3f} {m['precision']:>6.3f} " - f"{m['recall']:>6.3f} {m['f1']:>6.3f}" - ) + print(_format_metrics_row(path, m)) + + print(" Phase-2-success ablation (label/close_outcome/customers/subs redacted):") + print(f" {'path':<32} {'acc':>6} {'prec':>6} {'rec':>6} {'f1':>6}") + for path, m in result["phase2_ablation_paths"].items(): + print(_format_metrics_row(path, m)) + bonus = result["bonus_model"] if bonus is None: print(" Bonus model: scikit-learn not installed — skipped.") else: print(" Bonus model (5-fold CV on join-derived features):") - for name, m in bonus.items(): - print( - f" {name:<22} AUC={m['auc_mean']:.3f} " - f"(min={m['auc_min']:.3f} max={m['auc_max']:.3f}) " - f"AP={m['ap_mean']:.3f}" - ) - print(" Leakage-column presence (G4.1-G4.3 indicators):") + for variant, models in bonus.items(): + print(f" [{variant}]") + for name, m in models.items(): + print( + f" {name:<22} AUC={m['auc_mean']:.3f} " + f"(min={m['auc_min']:.3f} max={m['auc_max']:.3f}) " + f"AP={m['ap_mean']:.3f} n_feat={m['n_features']}" + ) + + horizon = result["horizon_days"] + print(f" G4.4 snapshot-window (timestamp > lead_created_at + {horizon}d):") + for tbl, m in result["g4_4_snapshot_window"].items(): + flag = "PASS" if m["violations"] == 0 else "FAIL" + print(f" {tbl:<22} {flag} ({m['violations']}/{m['total_rows']} rows past horizon)") + + print(" Leakage-column / file presence (G4.1-G4.3):") for k, v in result["leakage_columns_present"].items(): flag = "PRESENT" if v else "absent" print(f" {k:<40} {flag}") +def _max_observed_accuracy(result: dict[str, Any]) -> float: + """Maximum reconstruction accuracy across deterministic and bonus probes. + + Excludes Phase-2 ablation (that's the virtual fix, not the current state). + Used for ``--max-accuracy`` gating. + """ + candidates: list[float] = [] + for m in result.get("deterministic_paths", {}).values(): + candidates.append(m["accuracy"]) + bonus = result.get("bonus_model") or {} + for variant in bonus.values(): + for m in variant.values(): + candidates.append(m["auc_mean"]) + return max(candidates) if candidates else 0.0 + + def main(argv: list[str] | None = None) -> int: - parser = argparse.ArgumentParser(description=__doc__.split("\n")[0]) + parser = argparse.ArgumentParser( + description="Probe public relational tables for converted_within_90_days leakage" + ) parser.add_argument( "bundle_dir", type=Path, help=( - "Path to a bundle directory (must contain tables/{leads,opportunities," - "customers,subscriptions}.parquet)" + "Path to a bundle directory (must contain tables/{leads,opportunities}.parquet; " + "customers/subscriptions optional)" ), ) parser.add_argument( @@ -353,6 +578,15 @@ def main(argv: list[str] | None = None) -> int: action="store_true", help="Emit JSON instead of human-readable text", ) + parser.add_argument( + "--max-accuracy", + type=float, + default=None, + help=( + "Threshold for reconstruction accuracy / bonus-model AUC. " + "Exit 2 if any probe exceeds this. Intended for Phase 2 CI gating." + ), + ) args = parser.parse_args(argv) try: @@ -365,6 +599,16 @@ def main(argv: list[str] | None = None) -> int: print(json.dumps(result, indent=2, default=str)) else: _print_human(result) + + if args.max_accuracy is not None: + observed = _max_observed_accuracy(result) + if observed > args.max_accuracy: + print( + f"\nGATE FAILURE: observed reconstruction = {observed:.3f} " + f"> --max-accuracy {args.max_accuracy:.3f}", + file=sys.stderr, + ) + return 2 return 0 diff --git a/tests/scripts/test_probe_relational_leakage.py b/tests/scripts/test_probe_relational_leakage.py index ef1bff3..6da6712 100644 --- a/tests/scripts/test_probe_relational_leakage.py +++ b/tests/scripts/test_probe_relational_leakage.py @@ -1,20 +1,22 @@ """Tests for ``scripts/probe_relational_leakage.py``. -Exercises the deterministic reconstruction function on a hand-built four-lead -frame that covers each leakage path independently — this is the function PR -3.1 will lift into ``leadforge/validation/leakage_probes.py``, so locking -its behaviour now matters. +Locks the deterministic reconstruction function (which PR 3.1 will lift +into ``leadforge/validation/leakage_probes.py``), the binary-metrics +helper, and the CLI entrypoint. """ from __future__ import annotations import importlib.util +import subprocess import sys from pathlib import Path import pandas as pd +import pytest _SCRIPT_PATH = Path(__file__).resolve().parents[2] / "scripts" / "probe_relational_leakage.py" +_REPO_ROOT = Path(__file__).resolve().parents[2] _spec = importlib.util.spec_from_file_location("probe_relational_leakage", _SCRIPT_PATH) assert _spec is not None assert _spec.loader is not None @@ -24,21 +26,20 @@ def _toy_bundle() -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: - """Construct a 4-lead toy bundle covering every leakage path independently. + """A 3-lead toy bundle covering each path through the join graph. - lead_001: converted; opp closed_won; customer + subscription rows. - - lead_002: converted via stale flag in leads.parquet; no joined opp. - (Tests path A in isolation.) - - lead_003: converted by ground truth; opp exists but close_outcome NaN. - customer row exists. (Tests path C reaching when path B can't.) - - lead_004: not converted; opp exists but is open; no customer/sub. + (Hits A, B, C, D, E.) + - lead_002: converted; opp exists but close_outcome NaN; customer row exists, + no subscription. (Hits A and C — and therefore E — but NOT B or D.) + - lead_003: not converted; opp exists but is open; no customer/sub. + (Hits none.) """ leads = pd.DataFrame( [ {"lead_id": "lead_001", "converted_within_90_days": True}, {"lead_id": "lead_002", "converted_within_90_days": True}, - {"lead_id": "lead_003", "converted_within_90_days": True}, - {"lead_id": "lead_004", "converted_within_90_days": False}, + {"lead_id": "lead_003", "converted_within_90_days": False}, ] ) opportunities = pd.DataFrame( @@ -50,14 +51,14 @@ def _toy_bundle() -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFram "estimated_acv": 50_000, }, { - "opportunity_id": "opp_003", - "lead_id": "lead_003", + "opportunity_id": "opp_002", + "lead_id": "lead_002", "close_outcome": None, "estimated_acv": 30_000, }, { - "opportunity_id": "opp_004", - "lead_id": "lead_004", + "opportunity_id": "opp_003", + "lead_id": "lead_003", "close_outcome": None, "estimated_acv": 20_000, }, @@ -66,7 +67,7 @@ def _toy_bundle() -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFram customers = pd.DataFrame( [ {"customer_id": "cust_001", "opportunity_id": "opp_001", "account_id": "acct_a"}, - {"customer_id": "cust_003", "opportunity_id": "opp_003", "account_id": "acct_c"}, + {"customer_id": "cust_002", "opportunity_id": "opp_002", "account_id": "acct_b"}, ] ) subscriptions = pd.DataFrame( @@ -84,46 +85,126 @@ def test_deterministic_reconstruction_paths() -> None: leads, opportunities, customers, subscriptions ) - # Path A — direct read of the label. - assert bool(paths.loc["lead_001", "path_a_direct_label"]) is True - assert bool(paths.loc["lead_002", "path_a_direct_label"]) is True - assert bool(paths.loc["lead_003", "path_a_direct_label"]) is True - assert bool(paths.loc["lead_004", "path_a_direct_label"]) is False - - # Path B — opportunity closed_won. Only lead_001. - assert bool(paths.loc["lead_001", "path_b_opportunity_won"]) is True - assert bool(paths.loc["lead_002", "path_b_opportunity_won"]) is False - assert bool(paths.loc["lead_003", "path_b_opportunity_won"]) is False - assert bool(paths.loc["lead_004", "path_b_opportunity_won"]) is False - - # Path C — customer existence. lead_001 + lead_003. - assert bool(paths.loc["lead_001", "path_c_customer_exists"]) is True - assert bool(paths.loc["lead_002", "path_c_customer_exists"]) is False - assert bool(paths.loc["lead_003", "path_c_customer_exists"]) is True - assert bool(paths.loc["lead_004", "path_c_customer_exists"]) is False - - # Path D — subscription existence. Only lead_001. - assert bool(paths.loc["lead_001", "path_d_subscription_exists"]) is True - assert bool(paths.loc["lead_002", "path_d_subscription_exists"]) is False - assert bool(paths.loc["lead_003", "path_d_subscription_exists"]) is False - assert bool(paths.loc["lead_004", "path_d_subscription_exists"]) is False - - # Path E — OR of B/C/D. lead_001 + lead_003. - assert bool(paths.loc["lead_001", "path_e_or_b_c_d"]) is True - assert bool(paths.loc["lead_002", "path_e_or_b_c_d"]) is False - assert bool(paths.loc["lead_003", "path_e_or_b_c_d"]) is True - assert bool(paths.loc["lead_004", "path_e_or_b_c_d"]) is False - - -def test_deterministic_reconstruction_handles_missing_label_column() -> None: - """When public ``leads`` has had the label dropped (Phase 2 fix), path A - must collapse to all-False rather than crash.""" - leads, opportunities, customers, subscriptions = _toy_bundle() + expected = { + "path_a_direct_label": [True, True, False], + "path_b_opportunity_won": [True, False, False], + "path_c_customer_exists": [True, True, False], + "path_d_subscription_exists": [True, False, False], + "path_e_or_b_c_d": [True, True, False], + } + for col, vals in expected.items(): + assert list(paths[col].astype(bool)) == vals, f"{col}: {list(paths[col].astype(bool))}" + + +def test_deterministic_reconstruction_phase2_success_state() -> None: + """Phase 2 success state — label dropped, customers/subs absent — + collapses paths A/C/D/E to all-False without crashing.""" + leads, opportunities, _customers, _subscriptions = _toy_bundle() leads_safe = leads.drop(columns=["converted_within_90_days"]) + opps_safe = opportunities.drop(columns=["close_outcome"]) + empty_customers = pd.DataFrame( + {"customer_id": pd.Series(dtype=str), "opportunity_id": pd.Series(dtype=str)} + ) + empty_subscriptions = pd.DataFrame( + {"subscription_id": pd.Series(dtype=str), "customer_id": pd.Series(dtype=str)} + ) paths = probe_module.deterministic_relational_reconstruction( - leads_safe, opportunities, customers, subscriptions + leads_safe, opps_safe, empty_customers, empty_subscriptions ) assert not paths["path_a_direct_label"].any() - # Other paths still fire. - assert bool(paths.loc["lead_001", "path_e_or_b_c_d"]) is True + assert not paths["path_b_opportunity_won"].any() + assert not paths["path_c_customer_exists"].any() + assert not paths["path_d_subscription_exists"].any() + assert not paths["path_e_or_b_c_d"].any() + + +def test_deterministic_reconstruction_rejects_duplicate_lead_id() -> None: + """A validator cannot operate on non-unique keys — must raise, not silently + misalign.""" + leads, opportunities, customers, subscriptions = _toy_bundle() + dup = pd.concat([leads, leads.iloc[[0]]], ignore_index=True) + with pytest.raises(ValueError, match="lead_id must be unique"): + probe_module.deterministic_relational_reconstruction( + dup, opportunities, customers, subscriptions + ) + + +def test_binary_metrics_perfect_prediction() -> None: + y_true = pd.Series([True, True, False, False]) + y_pred = pd.Series([True, True, False, False]) + m = probe_module._binary_metrics(y_true, y_pred) + assert m["accuracy"] == 1.0 + assert m["precision"] == 1.0 + assert m["recall"] == 1.0 + assert m["f1"] == 1.0 + assert (m["tp"], m["fp"], m["fn"], m["tn"]) == (2, 0, 0, 2) + + +def test_binary_metrics_all_negative_prediction() -> None: + """Recall undefined-but-not-NaN protection: when no positives predicted but + positives exist, precision is NaN (no positive predictions) and recall = 0.""" + y_true = pd.Series([True, False, False, False]) + y_pred = pd.Series([False, False, False, False]) + m = probe_module._binary_metrics(y_true, y_pred) + assert m["accuracy"] == 0.75 + assert m["recall"] == 0.0 + assert m["precision"] != m["precision"] # NaN: no positive predictions + assert (m["tp"], m["fp"], m["fn"], m["tn"]) == (0, 0, 1, 3) + + +def test_binary_metrics_mixed() -> None: + y_true = pd.Series([True, True, False, False]) + y_pred = pd.Series([True, False, True, False]) + m = probe_module._binary_metrics(y_true, y_pred) + assert m["accuracy"] == 0.5 + assert m["precision"] == 0.5 + assert m["recall"] == 0.5 + assert m["f1"] == 0.5 + + +# --------------------------------------------------------------------------- +# CLI smoke test — runs against the real alpha bundle if present +# --------------------------------------------------------------------------- + + +@pytest.mark.skipif( + not (_REPO_ROOT / "release" / "intermediate" / "tables" / "leads.parquet").exists(), + reason="alpha bundle not present (clean checkout)", +) +def test_cli_smoke_reports_numeric_accuracy() -> None: + """Acceptance criterion: ``probe_relational_leakage.py release/intermediate`` + exits 0 and reports a numeric reconstruction accuracy.""" + result = subprocess.run( # noqa: S603 — controlled args, sys.executable + [sys.executable, str(_SCRIPT_PATH), str(_REPO_ROOT / "release" / "intermediate")], + capture_output=True, + text=True, + check=False, + ) + assert result.returncode == 0, result.stderr + assert "path_e_or_b_c_d" in result.stdout + # The accuracy column should contain at least one float-like token. + assert any(tok.replace(".", "").isdigit() for tok in result.stdout.split()) + + +@pytest.mark.skipif( + not (_REPO_ROOT / "release" / "intermediate" / "tables" / "leads.parquet").exists(), + reason="alpha bundle not present (clean checkout)", +) +def test_cli_max_accuracy_gate_fails_on_alpha() -> None: + """Phase-2 CI gate flag: alpha bundles trivially exceed any sane threshold, + so ``--max-accuracy 0.65`` must exit 2.""" + result = subprocess.run( # noqa: S603 — controlled args, sys.executable + [ + sys.executable, + str(_SCRIPT_PATH), + str(_REPO_ROOT / "release" / "intermediate"), + "--max-accuracy", + "0.65", + ], + capture_output=True, + text=True, + check=False, + ) + assert result.returncode == 2, result.stderr + assert "GATE FAILURE" in result.stderr From d9cb0b2365c3e10fd04311eba1aa0f7b8a7c2fa3 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Tue, 5 May 2026 20:12:28 +0300 Subject: [PATCH 3/3] review: address Copilot inline comments on PR 1.1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five real issues from the automated review: 1. Path A (label cleartext read) used .astype(bool), which maps NaN to True. Routed through pandas' nullable "boolean" dtype with explicit fillna(False) so partially-redacted bundles don't report phantom positives. Same fix applied to y_true construction in probe_bundle. 2. _binary_metrics F1 returned NaN when precision OR recall was exactly 0.0 — should be 0.0 (zero-skill is well-defined; NaN should signal undefined). New logic: NaN only when precision/recall is itself NaN; 0.0 when both are zero; harmonic mean otherwise. 3. Module docstring promised exit 1 on "shape errors" but main() only caught FileNotFoundError. Broadened catch to (FileNotFoundError, KeyError, ValueError); error message now includes exception type. 4. probe_bundle() previously bailed with an "error" key when converted_within_90_days was absent. Post-Phase-2 bundles will be exactly that shape. Now reports path_prediction_rates (fraction of leads each path flags positive) unconditionally — useful as a sanity view both before and after the fix. The early-return is now a "note" key rather than "error", with a message pointing the reader at the prediction-rates view. 5. New tests: - F1 zero-precision-and-recall yields zero, not NaN - Path A with NaN-bearing label column maps NaN -> False Total: 10 tests, all passing. Lint/format/mypy clean. Co-Authored-By: Claude Opus 4.7 --- scripts/probe_relational_leakage.py | 60 +++++++++++++++---- .../scripts/test_probe_relational_leakage.py | 44 ++++++++++++++ 2 files changed, 91 insertions(+), 13 deletions(-) diff --git a/scripts/probe_relational_leakage.py b/scripts/probe_relational_leakage.py index b88c21f..88abc7e 100644 --- a/scripts/probe_relational_leakage.py +++ b/scripts/probe_relational_leakage.py @@ -61,6 +61,7 @@ import argparse import json +import math import sys from pathlib import Path from typing import Any @@ -100,8 +101,11 @@ def deterministic_relational_reconstruction( leads_idx = leads.set_index("lead_id", drop=False) # Path A — the label itself, if present in public leads. + # Plain ``astype(bool)`` would map NaN to True; route through pandas' + # nullable boolean dtype so missing values fill cleanly to False without + # triggering object-downcast warnings. if "converted_within_90_days" in leads.columns: - path_a = leads_idx["converted_within_90_days"].astype(bool) + path_a = leads_idx["converted_within_90_days"].astype("boolean").fillna(False).astype(bool) else: path_a = pd.Series(False, index=leads_idx.index, name="converted_within_90_days") @@ -157,7 +161,12 @@ def deterministic_relational_reconstruction( def _binary_metrics(y_true: pd.Series, y_pred: pd.Series) -> dict[str, float]: - """Accuracy / precision / recall / F1 / counts for boolean predictions.""" + """Accuracy / precision / recall / F1 / counts for boolean predictions. + + F1 is ``0.0`` when precision or recall is exactly ``0.0`` (well-defined zero + skill, not undefined); ``NaN`` only when precision or recall is itself + ``NaN`` (undefined — no positive predictions or no positives to find). + """ tp = int(((y_pred) & (y_true)).sum()) fp = int(((y_pred) & (~y_true)).sum()) fn = int(((~y_pred) & (y_true)).sum()) @@ -166,11 +175,18 @@ def _binary_metrics(y_true: pd.Series, y_pred: pd.Series) -> dict[str, float]: accuracy = (tp + tn) / n if n else float("nan") precision = tp / (tp + fp) if (tp + fp) else float("nan") recall = tp / (tp + fn) if (tp + fn) else float("nan") - f1 = ( - (2 * precision * recall) / (precision + recall) - if (precision and recall and precision + recall) - else float("nan") - ) + + # F1 logic: + # - either precision or recall is NaN -> F1 is NaN (undefined input) + # - precision + recall == 0 -> F1 = 0 (defined: zero skill) + # - otherwise -> harmonic mean + if math.isnan(precision) or math.isnan(recall): + f1 = float("nan") + elif precision + recall == 0: + f1 = 0.0 + else: + f1 = (2 * precision * recall) / (precision + recall) + return { "accuracy": accuracy, "precision": precision, @@ -463,11 +479,24 @@ def probe_bundle(bundle_dir: Path) -> dict[str, Any]: base["horizon_days"] = horizon_days base["g4_4_snapshot_window"] = _g4_4_snapshot_window_probe(leads, tables_dir, horizon_days) + # Path prediction rates — useful even without ground truth (post-Phase-2, + # B/C/D should all be 0.0; non-zero would mean the redaction is incomplete). + base["path_prediction_rates"] = {col: float(paths[col].mean()) for col in paths.columns} + if "converted_within_90_days" not in leads.columns: - base["error"] = "leads.parquet has no converted_within_90_days column — cannot score paths" + base["note"] = ( + "leads.parquet has no converted_within_90_days column — " + "scored metrics (accuracy/precision/recall/F1) skipped; " + "path_prediction_rates above is the post-Phase-2 verification view." + ) return base - y_true = leads.set_index("lead_id")["converted_within_90_days"].astype(bool) + y_true = ( + leads.set_index("lead_id")["converted_within_90_days"] + .astype("boolean") + .fillna(False) + .astype(bool) + ) y_true = y_true.reindex(paths.index) base["conversion_rate"] = float(y_true.mean()) @@ -505,8 +534,13 @@ def _print_human(result: dict[str, Any]) -> None: f"conversion_rate={result.get('conversion_rate', float('nan')):.3f} " f"horizon_days={result.get('horizon_days')}" ) - if "error" in result: - print(f" ERROR: {result['error']}") + if "path_prediction_rates" in result: + print(" Path prediction rates (fraction of leads each path flags positive):") + for path, rate in result["path_prediction_rates"].items(): + print(f" {path:<32} {rate:>6.3f}") + + if "note" in result: + print(f" {result['note']}") return print(" Deterministic reconstruction (vs converted_within_90_days):") @@ -591,8 +625,8 @@ def main(argv: list[str] | None = None) -> int: try: result = probe_bundle(args.bundle_dir) - except FileNotFoundError as exc: - print(f"ERROR: {exc}", file=sys.stderr) + except (FileNotFoundError, KeyError, ValueError) as exc: + print(f"ERROR: {type(exc).__name__}: {exc}", file=sys.stderr) return 1 if args.json: diff --git a/tests/scripts/test_probe_relational_leakage.py b/tests/scripts/test_probe_relational_leakage.py index 6da6712..57a3fd3 100644 --- a/tests/scripts/test_probe_relational_leakage.py +++ b/tests/scripts/test_probe_relational_leakage.py @@ -163,6 +163,50 @@ def test_binary_metrics_mixed() -> None: assert m["f1"] == 0.5 +def test_binary_metrics_zero_precision_and_recall_yields_zero_f1() -> None: + """Both precision and recall are well-defined zeros (not NaN) — F1 must + be 0.0, not NaN. Reproduces the bug the F1 fix addresses: 0 TP, several + FP, several FN.""" + y_true = pd.Series([True, True, False, False]) + y_pred = pd.Series([False, False, True, True]) + m = probe_module._binary_metrics(y_true, y_pred) + assert m["precision"] == 0.0 + assert m["recall"] == 0.0 + assert m["f1"] == 0.0 + + +def test_path_a_handles_nan_label_safely() -> None: + """``astype(bool)`` on a NaN-bearing label column would map NaN to True. + The function must defensively coerce NaN -> False instead.""" + leads = pd.DataFrame( + [ + {"lead_id": "lead_a", "converted_within_90_days": True}, + {"lead_id": "lead_b", "converted_within_90_days": None}, # NaN + {"lead_id": "lead_c", "converted_within_90_days": False}, + ] + ) + empty_opps = pd.DataFrame( + { + "opportunity_id": pd.Series(dtype=str), + "lead_id": pd.Series(dtype=str), + "close_outcome": pd.Series(dtype=str), + "estimated_acv": pd.Series(dtype=float), + } + ) + empty_customers = pd.DataFrame( + {"customer_id": pd.Series(dtype=str), "opportunity_id": pd.Series(dtype=str)} + ) + empty_subscriptions = pd.DataFrame( + {"subscription_id": pd.Series(dtype=str), "customer_id": pd.Series(dtype=str)} + ) + paths = probe_module.deterministic_relational_reconstruction( + leads, empty_opps, empty_customers, empty_subscriptions + ) + assert bool(paths.loc["lead_a", "path_a_direct_label"]) is True + assert bool(paths.loc["lead_b", "path_a_direct_label"]) is False # NaN -> False + assert bool(paths.loc["lead_c", "path_a_direct_label"]) is False + + # --------------------------------------------------------------------------- # CLI smoke test — runs against the real alpha bundle if present # ---------------------------------------------------------------------------