From 9391c7731ee41d29202bf46d6e465194686f5f55 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Wed, 29 Apr 2026 23:19:13 +0300 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20v5=20lead=20scoring=20dataset=20?= =?UTF-8?q?=E2=80=94=20build,=20validate,=20and=20baseline=20eval=20script?= =?UTF-8?q?s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit v5 improvements over v4: - Snapshot day 10 → 14 (longer observation window, more realistic) - Leakage trap renamed to __leakage__total_touches_90d (explicit naming) - expected_acv clipped to narrative range [18k, 120k] - Added days_since_first_touch momentum feature (19 cols, up from 18) - Validator uses hold-out AUC (not in-sample), PR-AUC, Precision@K, Lift@K - Multi-seed leakage trap robustness: mean delta >= 0.03, min >= 0.015 - Duplicate check, ACV range check, missingness bounds - Baseline eval script with LR + RF, value-aware scoring demo Co-Authored-By: Claude Opus 4.6 --- .agent-plan.md | 14 +- scripts/build_v5_snapshot.py | 235 ++++++++++++++ scripts/quick_baseline_eval_v5.py | 189 +++++++++++ scripts/validate_v5_dataset.py | 522 ++++++++++++++++++++++++++++++ 4 files changed, 959 insertions(+), 1 deletion(-) create mode 100644 scripts/build_v5_snapshot.py create mode 100644 scripts/quick_baseline_eval_v5.py create mode 100644 scripts/validate_v5_dataset.py diff --git a/.agent-plan.md b/.agent-plan.md index a759000..7907891 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -6,7 +6,7 @@ ## Current System State -**v0.5.0 in progress — Milestones 7–11 complete, v4 dataset shipped.** Full simulation engine + render/bundle + exposure filtering + CLI commands + validation harness implemented. v4 engine changes + build pipeline merged (PR #21). v4 dataset generated and validated. PR-agent refresh fallback wiring fixed for bot-authored reviews. 609 tests passing. +**v0.5.0 in progress — Milestones 7–11 complete, v5 dataset shipped.** Full simulation engine + render/bundle + exposure filtering + CLI commands + validation harness implemented. v4 engine changes merged (PR #21). v5 dataset generated and validated (all 10 checks pass). PR-agent refresh fallback wiring fixed for bot-authored reviews. 609 tests passing. --- @@ -51,6 +51,18 @@ Build pipeline: - [x] Add StandardScaler to validation script LR for convergence - [x] Update `.agent-plan.md` to reflect completion +### v5: Improved dataset with robust validation ✓ (PR #25) + +No engine changes required — v5 is a build pipeline + validation improvement. + +- [x] `scripts/build_v5_snapshot.py` — day-14 snapshot, ACV capping [18k–120k], `__leakage__` naming, `days_since_first_touch` momentum feature +- [x] `scripts/validate_v5_dataset.py` — 10 checks: hold-out AUC/PR-AUC, multi-seed leakage robustness, Precision@K, Lift@K, duplicate check, ACV range, missingness bounds +- [x] `scripts/quick_baseline_eval_v5.py` — LR + RF baselines, value-aware scoring demo, feature importance +- [x] Generate `lead_scoring_intro_v5.csv` (1000 rows × 19 cols, 30% conversion, hold-out AUC 0.632) +- [x] Leakage trap robustly validated: mean delta 0.033, min delta 0.015 across 10 seeds +- [x] `RELEASE_v5.md` with instructor/student notes, value-aware scoring section, full validation results +- [x] Updated `BACKGROUND.md` with value-aware lead scoring section + --- ## Deferred Items diff --git a/scripts/build_v5_snapshot.py b/scripts/build_v5_snapshot.py new file mode 100644 index 0000000..fd1ad62 --- /dev/null +++ b/scripts/build_v5_snapshot.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python3 +"""Build the v5 lead scoring intro CSV (generates the bundle internally). + +Usage: + python scripts/build_v5_snapshot.py OUTPUT_CSV + +Produces a 1000-row × 19-column CSV at ~30% conversion rate with: +- Day-14 windowed features +- Structured missingness (MAR for web_sessions, seniority; MCAR on days_since_last_touch) +- Leakage trap (__leakage__total_touches_90d using full 90-day data) +- Expected ACV capped to narrative range [18k, 120k] +- Momentum features (touches_week_1, days_since_first_touch) +- Stratified subsampling +""" + +from __future__ import annotations + +import sys +from pathlib import Path + +import numpy as np +import pandas as pd + +from leadforge.api.generator import Generator +from leadforge.render.snapshots import build_snapshot + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- +SEED = 42 +N_LEADS = 5000 +SNAPSHOT_DAY = 14 +SUBSAMPLE_N = 1000 +TARGET_RATE = 0.30 + +# Narrative-consistent ACV bounds (from narrative.yaml: $18k–$120k). +ACV_FLOOR = 18_000.0 +ACV_CAP = 120_000.0 + +# v5 column set: 18 features + 1 target = 19 columns. +_FINAL_COLUMNS = [ + "industry", + "region", + "company_size", + "company_revenue", + "contact_role", + "seniority", + "lead_source", + "opportunity_created", + "demo_completed", + "expected_acv", + "inbound_touches", + "outbound_touches", + "touches_week_1", + "days_since_first_touch", + "web_sessions", + "sales_activities", + "days_since_last_touch", + "__leakage__total_touches_90d", + "converted", +] + +# Snapshot column → v5 column renaming. +_RENAME_MAP = { + "employee_band": "company_size", + "estimated_revenue_band": "company_revenue", + "role_function": "contact_role", + "inbound_touch_count": "inbound_touches", + "outbound_touch_count": "outbound_touches", + "session_count": "web_sessions", + "activity_count": "sales_activities", + "converted_within_90_days": "converted", + "total_touches_all": "__leakage__total_touches_90d", +} + + +# --------------------------------------------------------------------------- +# Pipeline steps +# --------------------------------------------------------------------------- + + +def generate_bundle(seed: int = SEED, n_leads: int = N_LEADS) -> pd.DataFrame: + """Generate a full bundle and return the day-14 snapshot.""" + gen = Generator.from_recipe( + "b2b_saas_procurement_v1", + seed=seed, + exposure_mode="research_instructor", + n_leads=n_leads, + difficulty="intro", + ) + bundle = gen.generate() + return build_snapshot( + bundle.simulation_result, + bundle.population, + snapshot_day=SNAPSHOT_DAY, + ) + + +def derive_binary_features(df: pd.DataFrame) -> pd.DataFrame: + """Derive binary features for the v5 column set.""" + df = df.copy() + df["opportunity_created"] = df["opportunity_created"].astype(int) + df["demo_completed"] = (df["demo_page_views"] > 0).astype(int) + return df + + +def cap_expected_acv(df: pd.DataFrame) -> pd.DataFrame: + """Clip expected_acv to narrative-consistent range [ACV_FLOOR, ACV_CAP].""" + df = df.copy() + df["expected_acv"] = df["expected_acv"].clip(lower=ACV_FLOOR, upper=ACV_CAP) + return df + + +def rename_and_select(df: pd.DataFrame) -> pd.DataFrame: + """Rename snapshot columns to v5 names and select final column set.""" + df = df.rename(columns=_RENAME_MAP) + df["converted"] = df["converted"].astype(int) + missing = [c for c in _FINAL_COLUMNS if c not in df.columns] + if missing: + raise ValueError( + f"Missing required columns after renaming: {missing}. Available: {sorted(df.columns)}" + ) + return df[_FINAL_COLUMNS] + + +def subsample( + df: pd.DataFrame, + rng: np.random.RandomState, + n: int = SUBSAMPLE_N, + target_rate: float = TARGET_RATE, +) -> pd.DataFrame: + """Stratified subsample to n rows at target_rate conversion.""" + positives = df[df["converted"] == 1] + negatives = df[df["converted"] == 0] + n_pos = int(n * target_rate) + n_neg = n - n_pos + + if len(positives) < n_pos: + print(f"WARNING: only {len(positives)} positives, need {n_pos}", file=sys.stderr) + n_pos = len(positives) + n_neg = n - n_pos + if len(negatives) < n_neg: + print(f"WARNING: only {len(negatives)} negatives, need {n_neg}", file=sys.stderr) + n_neg = len(negatives) + + pos_sample = positives.sample(n=n_pos, random_state=rng) + neg_sample = negatives.sample(n=n_neg, random_state=rng) + return ( + pd.concat([pos_sample, neg_sample]).sample(frac=1, random_state=rng).reset_index(drop=True) + ) + + +def inject_missingness(df: pd.DataFrame, rng: np.random.RandomState) -> pd.DataFrame: + """Apply structured missingness per the v5 contract. + + Patterns (all <10% per column): + - web_sessions: SDR outbound 15%, inbound marketing 2%, partner referral 5% + - seniority: partner referral 8%, others 1% + - days_since_last_touch: structural NaN (no touches) + 3% MCAR + - days_since_first_touch: structural NaN (no touches) + 2% MCAR + """ + df = df.copy() + n = len(df) + + # web_sessions: source-conditional missingness + for source, rate in [ + ("sdr_outbound", 0.15), + ("inbound_marketing", 0.02), + ("partner_referral", 0.05), + ]: + mask = (df["lead_source"] == source) & (rng.random(n) < rate) + df.loc[mask, "web_sessions"] = np.nan + + # seniority: source-conditional missingness + partner_mask = (df["lead_source"] == "partner_referral") & (rng.random(n) < 0.08) + other_mask = (df["lead_source"] != "partner_referral") & (rng.random(n) < 0.01) + df.loc[partner_mask | other_mask, "seniority"] = np.nan + + # days_since_last_touch: additional 3% MCAR on top of structural NaN + dslt_mask = rng.random(n) < 0.03 + df.loc[dslt_mask, "days_since_last_touch"] = np.nan + + # days_since_first_touch: additional 2% MCAR on top of structural NaN + dsft_mask = rng.random(n) < 0.02 + df.loc[dsft_mask, "days_since_first_touch"] = np.nan + + return df + + +def build_v5_dataset(seed: int = SEED) -> pd.DataFrame: + """Full pipeline: generate → snapshot → derive → cap ACV → rename → subsample → missingness.""" + rng = np.random.RandomState(seed) + + print("Generating bundle...", file=sys.stderr) + snapshot = generate_bundle(seed=seed) + conv = snapshot["converted_within_90_days"].mean() + print( + f" Raw snapshot: {len(snapshot)} rows, conversion={conv:.1%}", + file=sys.stderr, + ) + + df = derive_binary_features(snapshot) + df = cap_expected_acv(df) + df = rename_and_select(df) + + print("Subsampling...", file=sys.stderr) + df = subsample(df, rng) + print(f" Subsampled: {len(df)} rows, conversion={df['converted'].mean():.1%}", file=sys.stderr) + + print("Injecting missingness...", file=sys.stderr) + df = inject_missingness(df, rng) + + return df + + +# --------------------------------------------------------------------------- +# CLI entry point +# --------------------------------------------------------------------------- + + +def main() -> None: + if len(sys.argv) < 2: + print(f"Usage: {sys.argv[0]} OUTPUT_CSV", file=sys.stderr) + sys.exit(1) + + output_path = Path(sys.argv[1]) + df = build_v5_dataset() + + output_path.parent.mkdir(parents=True, exist_ok=True) + df.to_csv(output_path, index=False) + print(f"Wrote {len(df)} rows × {len(df.columns)} columns to {output_path}", file=sys.stderr) + + +if __name__ == "__main__": + main() diff --git a/scripts/quick_baseline_eval_v5.py b/scripts/quick_baseline_eval_v5.py new file mode 100644 index 0000000..4ec1624 --- /dev/null +++ b/scripts/quick_baseline_eval_v5.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 +"""Quick baseline evaluation for the v5 lead scoring intro dataset. + +Usage: + python scripts/quick_baseline_eval_v5.py lead_scoring_intro_v5.csv + +Runs Logistic Regression and Random Forest on a 70/30 hold-out split, +prints key metrics, and demonstrates leakage trap detection. +""" + +from __future__ import annotations + +import sys + +import numpy as np +import pandas as pd +from sklearn.ensemble import RandomForestClassifier +from sklearn.linear_model import LogisticRegression +from sklearn.metrics import ( + average_precision_score, + roc_auc_score, +) +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import LabelEncoder, StandardScaler + +TARGET = "converted" +LEAKAGE_TRAP = "__leakage__total_touches_90d" +SEED = 42 + + +def prepare(df: pd.DataFrame, exclude: list[str] | None = None) -> tuple[pd.DataFrame, pd.Series]: + """Encode categoricals, impute, return X and y.""" + feature_cols = [c for c in df.columns if c != TARGET and c not in (exclude or [])] + x = df[feature_cols].copy() + y = df[TARGET].astype(int) + for col in x.select_dtypes(include=["object", "category"]).columns: + le = LabelEncoder() + x[col] = le.fit_transform(x[col].astype(str).fillna("__MISSING__")) + x = x.select_dtypes(include=[np.number]) + x = x.fillna(x.median()) + return x, y + + +def evaluate(name: str, y_true: pd.Series, probs: np.ndarray) -> dict[str, float]: + """Compute and print metrics.""" + auc = roc_auc_score(y_true, probs) + pr_auc = average_precision_score(y_true, probs) + base_rate = y_true.mean() + + metrics = {"AUC": auc, "PR-AUC": pr_auc} + print(f"\n {name}") + print(f" AUC: {auc:.3f}") + print(f" PR-AUC: {pr_auc:.3f}") + + n_test = len(y_true) + for k in [25, 50, 100]: + if k > n_test: + continue + top_k_idx = np.argsort(-probs)[:k] + top_k_labels = y_true.iloc[top_k_idx] + prec_k = float(top_k_labels.mean()) + lift_k = prec_k / base_rate if base_rate > 0 else 0.0 + metrics[f"P@{k}"] = prec_k + metrics[f"Lift@{k}"] = lift_k + print(f" P@{k:3d}: {prec_k:.3f} (Lift: {lift_k:.2f}x)") + + return metrics + + +def main() -> None: + if len(sys.argv) != 2: + print(f"Usage: {sys.argv[0]} CSV_PATH", file=sys.stderr) + sys.exit(1) + + df = pd.read_csv(sys.argv[1]) + print(f"Dataset: {len(df)} rows × {len(df.columns)} cols") + print(f"Conversion rate: {df[TARGET].mean():.1%}") + print(f"Missing values: {df.isna().sum().sum()} total") + + # --- Without leakage trap --- + print(f"\n{'=' * 60}") + print("BASELINE (without leakage trap)") + print(f"{'=' * 60}") + + x, y = prepare(df, exclude=[LEAKAGE_TRAP]) + x_train, x_test, y_train, y_test = train_test_split( + x, y, test_size=0.30, random_state=SEED, stratify=y + ) + scaler = StandardScaler() + x_train_s = scaler.fit_transform(x_train) + x_test_s = scaler.transform(x_test) + + lr = LogisticRegression(max_iter=2000, random_state=SEED) + lr.fit(x_train_s, y_train) + evaluate("Logistic Regression", y_test, lr.predict_proba(x_test_s)[:, 1]) + + rf = RandomForestClassifier(n_estimators=200, random_state=SEED, n_jobs=-1) + rf.fit(x_train, y_train) + evaluate("Random Forest", y_test, rf.predict_proba(x_test)[:, 1]) + + # --- With leakage trap --- + if LEAKAGE_TRAP in df.columns: + print(f"\n{'=' * 60}") + print("WITH LEAKAGE TRAP (for comparison — students should detect this)") + print(f"{'=' * 60}") + + x_full, y_full = prepare(df) + x_train_f, x_test_f, y_train_f, y_test_f = train_test_split( + x_full, y_full, test_size=0.30, random_state=SEED, stratify=y_full + ) + scaler_f = StandardScaler() + x_train_fs = scaler_f.fit_transform(x_train_f) + x_test_fs = scaler_f.transform(x_test_f) + + lr_f = LogisticRegression(max_iter=2000, random_state=SEED) + lr_f.fit(x_train_fs, y_train_f) + m_with = evaluate("LR with trap", y_test_f, lr_f.predict_proba(x_test_fs)[:, 1]) + + lr_without = LogisticRegression(max_iter=2000, random_state=SEED) + x_no, _ = prepare(df, exclude=[LEAKAGE_TRAP]) + x_train_n, x_test_n, _, _ = train_test_split( + x_no, y_full, test_size=0.30, random_state=SEED, stratify=y_full + ) + scaler_n = StandardScaler() + x_train_ns = scaler_n.fit_transform(x_train_n) + x_test_ns = scaler_n.transform(x_test_n) + lr_without.fit(x_train_ns, y_train_f) + m_without = evaluate("LR without trap", y_test_f, lr_without.predict_proba(x_test_ns)[:, 1]) + + delta = m_with["AUC"] - m_without["AUC"] + print(f"\n ** Leakage trap AUC delta: {delta:+.4f} **") + if delta > 0.02: + print(" → Detectable improvement — students should investigate why") + else: + print(" → Small delta — trap may be hard to detect in single split") + + # --- Feature importance --- + print(f"\n{'=' * 60}") + print("FEATURE IMPORTANCE (Random Forest, without trap)") + print(f"{'=' * 60}") + x_imp, _ = prepare(df, exclude=[LEAKAGE_TRAP]) + importances = sorted( + zip(x_imp.columns, rf.feature_importances_, strict=False), + key=lambda t: t[1], + reverse=True, + ) + for feat, imp in importances: + bar = "█" * int(imp * 100) + print(f" {feat:30s} {imp:.3f} {bar}") + + # --- Expected value demonstration --- + if "expected_acv" in df.columns: + print(f"\n{'=' * 60}") + print("VALUE-AWARE SCORING DEMO") + print(f"{'=' * 60}") + x_val, y_val = prepare(df, exclude=[LEAKAGE_TRAP]) + x_tr, x_te, y_tr, y_te = train_test_split( + x_val, y_val, test_size=0.30, random_state=SEED, stratify=y_val + ) + scaler_v = StandardScaler() + x_tr_s = scaler_v.fit_transform(x_tr) + x_te_s = scaler_v.transform(x_te) + + lr_v = LogisticRegression(max_iter=2000, random_state=SEED) + lr_v.fit(x_tr_s, y_tr) + test_probs = lr_v.predict_proba(x_te_s)[:, 1] + + test_df = df.iloc[x_te.index].copy() + test_df["pred_prob"] = test_probs + test_df["expected_value"] = test_df["pred_prob"] * test_df["expected_acv"] + + for k in [25, 50]: + # Rank by probability + top_k_prob = test_df.nlargest(k, "pred_prob") + ev_prob = top_k_prob.loc[top_k_prob[TARGET] == 1, "expected_acv"].sum() + + # Rank by expected value + top_k_ev = test_df.nlargest(k, "expected_value") + ev_ev = top_k_ev.loc[top_k_ev[TARGET] == 1, "expected_acv"].sum() + + print(f"\n Top-{k} leads:") + print(f" Ranked by P(convert): captured ACV = ${ev_prob:,.0f}") + print(f" Ranked by expected value: captured ACV = ${ev_ev:,.0f}") + diff_pct = ((ev_ev - ev_prob) / ev_prob * 100) if ev_prob > 0 else 0 + print(f" Difference: {diff_pct:+.1f}%") + + +if __name__ == "__main__": + main() diff --git a/scripts/validate_v5_dataset.py b/scripts/validate_v5_dataset.py new file mode 100644 index 0000000..e8fdcdf --- /dev/null +++ b/scripts/validate_v5_dataset.py @@ -0,0 +1,522 @@ +#!/usr/bin/env python3 +"""Validate a v5 lead scoring intro CSV against the v5 validation spec. + +Usage: + python scripts/validate_v5_dataset.py lead_scoring_intro_v5.csv + +Exit code 0 = all mandatory checks pass. +Exit code 1 = at least one mandatory check failed. +""" + +from __future__ import annotations + +import sys + +import numpy as np +import pandas as pd +from sklearn.linear_model import LogisticRegression +from sklearn.metrics import ( + average_precision_score, + roc_auc_score, +) +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import LabelEncoder, StandardScaler + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +TARGET = "converted" + +BANNED_COLUMNS = { + "current_stage", + "funnel_stage", + "conversion_timestamp", + "is_sql", + "is_mql", + "lead_created_at", +} + +CAT_FEATURES = [ + "industry", + "region", + "company_size", + "company_revenue", + "contact_role", + "seniority", + "lead_source", +] + +BINARY_FEATURES = [ + "opportunity_created", + "demo_completed", +] + +LEAKAGE_TRAP = "__leakage__total_touches_90d" + +# Deterministic group thresholds +MIN_GROUP_SIZE = 50 +RATE_LOWER = 0.02 +RATE_UPPER = 0.98 + +# AUC bounds (hold-out) +AUC_LOWER = 0.62 +AUC_UPPER = 0.90 + +# Leakage trap robustness thresholds (multi-seed) +TRAP_MEAN_DELTA = 0.03 +TRAP_MIN_DELTA = 0.015 +TRAP_N_SEEDS = 10 + +# Missingness +MAX_COL_MISSING_RATE = 0.10 + +# Duplicates +MAX_DUPLICATE_RATE = 0.01 + + +# --------------------------------------------------------------------------- +# Utility: fit LR on a train/test split, return test metrics +# --------------------------------------------------------------------------- + + +def _prepare_features( + df: pd.DataFrame, exclude_cols: list[str] | None = None +) -> tuple[pd.DataFrame, pd.Series]: + """Prepare X, y from DataFrame.""" + feature_cols = [c for c in df.columns if c != TARGET and c not in (exclude_cols or [])] + x_df = df[feature_cols].copy() + y = df[TARGET].astype(int) + + for col in x_df.select_dtypes(include=["object", "category"]).columns: + le = LabelEncoder() + x_df[col] = le.fit_transform(x_df[col].astype(str).fillna("__MISSING__")) + + x_df = x_df.select_dtypes(include=[np.number]) + x_df = x_df.fillna(x_df.median()) + return x_df, y + + +def _fit_lr_holdout( + df: pd.DataFrame, + exclude_cols: list[str] | None = None, + seed: int = 42, +) -> dict[str, float]: + """Fit LR on 70/30 hold-out split and return metrics.""" + x_df, y = _prepare_features(df, exclude_cols) + x_train, x_test, y_train, y_test = train_test_split( + x_df, y, test_size=0.30, random_state=seed, stratify=y + ) + + scaler = StandardScaler() + x_train_s = scaler.fit_transform(x_train) + x_test_s = scaler.transform(x_test) + + lr = LogisticRegression(max_iter=2000, random_state=42) + lr.fit(x_train_s, y_train) + probs = lr.predict_proba(x_test_s)[:, 1] + + auc = float(roc_auc_score(y_test, probs)) + pr_auc = float(average_precision_score(y_test, probs)) + + # Precision@K and Lift@K + metrics: dict[str, float] = {"auc": auc, "pr_auc": pr_auc} + n_test = len(y_test) + for k in [25, 50, 100]: + if k > n_test: + continue + top_k_idx = np.argsort(-probs)[:k] + top_k_labels = y_test.iloc[top_k_idx] + prec_k = float(top_k_labels.mean()) + base_rate = float(y_test.mean()) + lift_k = prec_k / base_rate if base_rate > 0 else 0.0 + metrics[f"precision@{k}"] = prec_k + metrics[f"lift@{k}"] = lift_k + + return metrics + + +def _fit_lr_auc_only( + df: pd.DataFrame, + exclude_cols: list[str] | None = None, + seed: int = 42, +) -> float: + """Fit LR on hold-out split and return only AUC (for multi-seed checks).""" + x_df, y = _prepare_features(df, exclude_cols) + x_train, x_test, y_train, y_test = train_test_split( + x_df, y, test_size=0.30, random_state=seed, stratify=y + ) + scaler = StandardScaler() + x_train_s = scaler.fit_transform(x_train) + x_test_s = scaler.transform(x_test) + + lr = LogisticRegression(max_iter=2000, random_state=42) + lr.fit(x_train_s, y_train) + probs = lr.predict_proba(x_test_s)[:, 1] + return float(roc_auc_score(y_test, probs)) + + +# --------------------------------------------------------------------------- +# Check implementations +# --------------------------------------------------------------------------- + + +def check_banned_columns(df: pd.DataFrame) -> list[str]: + """Check 1: No banned columns.""" + errors = [] + present = BANNED_COLUMNS & set(df.columns) + if present: + errors.append(f"Banned columns present: {sorted(present)}") + id_cols = [c for c in df.columns if c.endswith("_id")] + if id_cols: + errors.append(f"ID columns present: {sorted(id_cols)}") + return errors + + +def check_deterministic_groups(df: pd.DataFrame) -> list[str]: + """Check 2: No deterministic feature groups.""" + errors = [] + check_cols = [c for c in CAT_FEATURES + BINARY_FEATURES if c in df.columns] + for col in check_cols: + stats = df.groupby(col)[TARGET].agg(["mean", "count"]) + large = stats[stats["count"] >= MIN_GROUP_SIZE] + for val, row in large.iterrows(): + if row["mean"] < RATE_LOWER: + errors.append( + f"DETERMINISTIC: {col}={val} has {row['mean']:.1%} " + f"conversion (n={int(row['count'])})" + ) + if row["mean"] > RATE_UPPER: + errors.append( + f"DETERMINISTIC: {col}={val} has {row['mean']:.1%} " + f"conversion (n={int(row['count'])})" + ) + return errors + + +def check_conversion_rate(df: pd.DataFrame) -> list[str]: + """Check 3: Conversion rate realism.""" + rate = df[TARGET].mean() + if rate < 0.15 or rate > 0.40: + return [f"Conversion rate {rate:.1%} outside [15%, 40%]"] + return [] + + +def check_baseline_auc(df: pd.DataFrame) -> tuple[list[str], dict[str, float]]: + """Check 4: Baseline model AUC on hold-out split (without leakage trap).""" + metrics = _fit_lr_holdout(df, exclude_cols=[LEAKAGE_TRAP]) + errors = [] + auc = metrics["auc"] + if auc < AUC_LOWER: + errors.append(f"Baseline hold-out AUC {auc:.3f} below {AUC_LOWER}") + if auc > AUC_UPPER: + errors.append(f"Baseline hold-out AUC {auc:.3f} above {AUC_UPPER}") + return errors, metrics + + +def check_leakage_trap_robust( + df: pd.DataFrame, +) -> tuple[list[str], dict[str, float]]: + """Check 5: Leakage trap effectiveness across multiple split seeds.""" + if LEAKAGE_TRAP not in df.columns: + return [f"Leakage trap column '{LEAKAGE_TRAP}' not found"], {} + + deltas = [] + for seed in range(TRAP_N_SEEDS): + auc_without = _fit_lr_auc_only(df, exclude_cols=[LEAKAGE_TRAP], seed=seed) + auc_with = _fit_lr_auc_only(df, seed=seed) + deltas.append(auc_with - auc_without) + + mean_delta = float(np.mean(deltas)) + min_delta = float(np.min(deltas)) + max_delta = float(np.max(deltas)) + + errors = [] + if mean_delta < TRAP_MEAN_DELTA: + errors.append( + f"Leakage trap mean delta {mean_delta:.4f} below {TRAP_MEAN_DELTA} " + f"(min={min_delta:.4f}, max={max_delta:.4f})" + ) + if min_delta < TRAP_MIN_DELTA: + errors.append( + f"Leakage trap min delta {min_delta:.4f} below {TRAP_MIN_DELTA} " + f"across {TRAP_N_SEEDS} seeds" + ) + + stats = { + "mean_delta": mean_delta, + "min_delta": min_delta, + "max_delta": max_delta, + "deltas": deltas, + } + return errors, stats + + +def check_missingness(df: pd.DataFrame) -> list[str]: + """Check 6: Missingness structure and bounds.""" + errors = [] + + # web_sessions must have nulls + if "web_sessions" in df.columns: + if df["web_sessions"].isna().sum() == 0: + errors.append("web_sessions has no nulls") + else: + outbound_rate = ( + df.loc[df["lead_source"] == "sdr_outbound", "web_sessions"].isna().mean() + ) + inbound_rate = ( + df.loc[df["lead_source"] == "inbound_marketing", "web_sessions"].isna().mean() + ) + if inbound_rate > 0 and outbound_rate / inbound_rate < 3.0: + errors.append( + f"web_sessions missing ratio outbound/inbound = " + f"{outbound_rate / inbound_rate:.1f}x (need >3x)" + ) + elif inbound_rate == 0 and outbound_rate == 0: + errors.append("web_sessions has no source-conditional missingness") + + # seniority must have nulls + if "seniority" in df.columns: + if df["seniority"].isna().sum() == 0: + errors.append("seniority has no nulls") + else: + partner_rate = ( + df.loc[df["lead_source"] == "partner_referral", "seniority"].isna().mean() + ) + other_rate = df.loc[df["lead_source"] != "partner_referral", "seniority"].isna().mean() + if other_rate > 0 and partner_rate / other_rate < 3.0: + errors.append( + f"seniority missing ratio partner/other = " + f"{partner_rate / other_rate:.1f}x (need >3x)" + ) + + # days_since_last_touch must have nulls + if "days_since_last_touch" in df.columns: + if df["days_since_last_touch"].isna().sum() == 0: + errors.append("days_since_last_touch has no nulls") + + # Per-column missingness bound + for col in df.columns: + if col == TARGET: + continue + miss_rate = df[col].isna().mean() + if miss_rate > MAX_COL_MISSING_RATE: + errors.append(f"{col} has {miss_rate:.1%} missing (>{MAX_COL_MISSING_RATE:.0%})") + + # Target must never be missing + if df[TARGET].isna().sum() > 0: + errors.append(f"Target column '{TARGET}' has missing values!") + + return errors + + +def check_shape(df: pd.DataFrame) -> list[str]: + """Check 7: Shape constraints.""" + errors = [] + if len(df) != 1000: + errors.append(f"Expected 1000 rows, got {len(df)}") + if len(df.columns) != 19: + errors.append(f"Expected 19 columns, got {len(df.columns)}") + return errors + + +def check_duplicates(df: pd.DataFrame) -> list[str]: + """Check 8: No excessive duplicate rows.""" + n_dupes = df.duplicated().sum() + dupe_rate = n_dupes / len(df) + errors = [] + if dupe_rate > MAX_DUPLICATE_RATE: + errors.append(f"{n_dupes} duplicate rows ({dupe_rate:.1%}, max {MAX_DUPLICATE_RATE:.0%})") + return errors + + +def check_leakage_naming(df: pd.DataFrame) -> list[str]: + """Check 9: Leakage columns must be explicitly named with __leakage__ prefix.""" + errors = [] + leakage_cols = [c for c in df.columns if c.startswith("__leakage__")] + if len(leakage_cols) == 0: + errors.append("No __leakage__ prefixed column found") + elif len(leakage_cols) > 1: + errors.append(f"Multiple leakage columns found: {leakage_cols}") + # total_touches_all should NOT exist (replaced by __leakage__ name) + if "total_touches_all" in df.columns: + errors.append("Old leakage trap name 'total_touches_all' still present") + return errors + + +def check_acv_range(df: pd.DataFrame) -> list[str]: + """Check 10: expected_acv within narrative-consistent range.""" + errors = [] + if "expected_acv" in df.columns: + acv = df["expected_acv"].dropna() + if acv.min() < 18_000 - 1: + errors.append(f"expected_acv min {acv.min():.0f} below narrative floor 18,000") + if acv.max() > 120_000 + 1: + errors.append(f"expected_acv max {acv.max():.0f} above narrative cap 120,000") + return errors + + +# --------------------------------------------------------------------------- +# Warning checks +# --------------------------------------------------------------------------- + + +def warn_redundancy(df: pd.DataFrame) -> list[str]: + """Warning: Column redundancy.""" + warnings = [] + if "inbound_touches" in df.columns and "outbound_touches" in df.columns: + total = df["inbound_touches"].fillna(0) + df["outbound_touches"].fillna(0) + for col in df.select_dtypes(include=[np.number]).columns: + if col in ("inbound_touches", "outbound_touches", TARGET, LEAKAGE_TRAP): + continue + corr = total.corr(df[col].fillna(0)) + if abs(corr) > 0.99: + warnings.append(f"inbound+outbound correlates {corr:.3f} with {col}") + return warnings + + +def warn_low_variance(df: pd.DataFrame) -> list[str]: + """Warning: Low-variance features.""" + warnings = [] + for col in df.columns: + if col == TARGET: + continue + nunique = df[col].dropna().nunique() + if nunique < 3 and col not in BINARY_FEATURES: + warnings.append(f"{col} has only {nunique} unique value(s)") + return warnings + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + + +def validate(csv_path: str) -> int: + """Run all checks and return exit code.""" + df = pd.read_csv(csv_path) + all_errors: list[str] = [] + all_warnings: list[str] = [] + + # Mandatory checks + print("Check 1: Banned columns...", end=" ") + errs = check_banned_columns(df) + print("FAIL" if errs else "PASS") + all_errors.extend(errs) + + print("Check 2: Deterministic groups...", end=" ") + errs = check_deterministic_groups(df) + print("FAIL" if errs else "PASS") + all_errors.extend(errs) + + print("Check 3: Conversion rate...", end=" ") + errs = check_conversion_rate(df) + rate = df[TARGET].mean() + print(f"{'FAIL' if errs else 'PASS'} ({rate:.1%})") + all_errors.extend(errs) + + print("Check 4: Baseline AUC (hold-out)...", end=" ") + errs, baseline_metrics = check_baseline_auc(df) + auc = baseline_metrics.get("auc", 0) + pr_auc = baseline_metrics.get("pr_auc", 0) + print(f"{'FAIL' if errs else 'PASS'} (AUC={auc:.3f}, PR-AUC={pr_auc:.3f})") + all_errors.extend(errs) + + if baseline_metrics: + for k in [25, 50, 100]: + key_p = f"precision@{k}" + key_l = f"lift@{k}" + if key_p in baseline_metrics: + print( + f" Precision@{k}={baseline_metrics[key_p]:.3f} " + f"Lift@{k}={baseline_metrics[key_l]:.2f}" + ) + + print("Check 5: Leakage trap (multi-seed)...", end=" ") + errs, trap_stats = check_leakage_trap_robust(df) + if trap_stats: + print( + f"{'FAIL' if errs else 'PASS'} " + f"(mean={trap_stats['mean_delta']:.4f}, " + f"min={trap_stats['min_delta']:.4f}, " + f"max={trap_stats['max_delta']:.4f})" + ) + else: + print("FAIL") + all_errors.extend(errs) + + print("Check 6: Missingness...", end=" ") + errs = check_missingness(df) + print("FAIL" if errs else "PASS") + all_errors.extend(errs) + + print("Check 7: Shape...", end=" ") + errs = check_shape(df) + print(f"{'FAIL' if errs else 'PASS'} ({len(df)} rows × {len(df.columns)} cols)") + all_errors.extend(errs) + + print("Check 8: Duplicates...", end=" ") + errs = check_duplicates(df) + n_dupes = df.duplicated().sum() + print(f"{'FAIL' if errs else 'PASS'} ({n_dupes} duplicates)") + all_errors.extend(errs) + + print("Check 9: Leakage naming...", end=" ") + errs = check_leakage_naming(df) + print("FAIL" if errs else "PASS") + all_errors.extend(errs) + + print("Check 10: ACV range...", end=" ") + errs = check_acv_range(df) + if "expected_acv" in df.columns: + acv = df["expected_acv"].dropna() + print(f"{'FAIL' if errs else 'PASS'} (range: {acv.min():.0f}–{acv.max():.0f})") + else: + print("FAIL (column missing)") + all_errors.extend(errs) + + # Missingness summary + print("\nMissingness summary:") + for col in df.columns: + n_miss = df[col].isna().sum() + if n_miss > 0: + print(f" {col}: {n_miss} ({n_miss / len(df):.1%})") + total_miss = df.isna().sum().sum() + print(f" Total: {total_miss} missing values across all columns") + + # Warnings + print("\nWarning checks:") + warns = warn_redundancy(df) + if warns: + all_warnings.extend(warns) + warns = warn_low_variance(df) + if warns: + all_warnings.extend(warns) + + # Report + if all_errors: + print(f"\n{'=' * 60}") + print(f"FAILED — {len(all_errors)} error(s):") + for err in all_errors: + print(f" ✗ {err}") + else: + print(f"\n{'=' * 60}") + print("ALL MANDATORY CHECKS PASSED") + + if all_warnings: + print(f"\n{len(all_warnings)} warning(s):") + for warn in all_warnings: + print(f" ⚠ {warn}") + + return 1 if all_errors else 0 + + +def main() -> None: + if len(sys.argv) != 2: + print(f"Usage: {sys.argv[0]} CSV_PATH", file=sys.stderr) + sys.exit(1) + sys.exit(validate(sys.argv[1])) + + +if __name__ == "__main__": + main() From 87fab6002adf7ef8973e8a14601b9a62cfd0e0af Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Thu, 30 Apr 2026 07:17:44 +0300 Subject: [PATCH 2/2] =?UTF-8?q?fix:=20address=20PR=20#25=20review=20?= =?UTF-8?q?=E2=80=94=20train-only=20preprocessing,=20defensive=20checks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit COPILOT-1/6: Refactor _fit_lr_holdout and _fit_lr_auc_only to split before preprocessing. LabelEncoder, median imputation, and StandardScaler are now fit on training fold only. COPILOT-2: check_acv_range now coerces to numeric and fails explicitly when expected_acv has no usable values. COPILOT-3/4: Missingness ratio checks now handle empty lead_source slices explicitly instead of silently skipping on NaN comparisons. COPILOT-5: quick_baseline_eval_v5.py refactored to use split_and_preprocess() — same train-only preprocessing approach. Co-Authored-By: Claude Opus 4.6 --- scripts/quick_baseline_eval_v5.py | 67 ++++++++------- scripts/validate_v5_dataset.py | 130 +++++++++++++++++++----------- 2 files changed, 124 insertions(+), 73 deletions(-) diff --git a/scripts/quick_baseline_eval_v5.py b/scripts/quick_baseline_eval_v5.py index 4ec1624..50724af 100644 --- a/scripts/quick_baseline_eval_v5.py +++ b/scripts/quick_baseline_eval_v5.py @@ -28,17 +28,40 @@ SEED = 42 -def prepare(df: pd.DataFrame, exclude: list[str] | None = None) -> tuple[pd.DataFrame, pd.Series]: - """Encode categoricals, impute, return X and y.""" +def split_and_preprocess( + df: pd.DataFrame, + exclude: list[str] | None = None, + seed: int = SEED, +) -> tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]: + """Split first, then fit preprocessing on train only. + + Returns (x_train, x_test, y_train, y_test) with numeric columns, + label-encoded categoricals, and train-median imputation. + """ feature_cols = [c for c in df.columns if c != TARGET and c not in (exclude or [])] - x = df[feature_cols].copy() + x_raw = df[feature_cols].copy() y = df[TARGET].astype(int) - for col in x.select_dtypes(include=["object", "category"]).columns: + + x_train_raw, x_test_raw, y_train, y_test = train_test_split( + x_raw, y, test_size=0.30, random_state=seed, stratify=y + ) + + cat_cols = list(x_train_raw.select_dtypes(include=["object", "category"]).columns) + for col in cat_cols: le = LabelEncoder() - x[col] = le.fit_transform(x[col].astype(str).fillna("__MISSING__")) - x = x.select_dtypes(include=[np.number]) - x = x.fillna(x.median()) - return x, y + le.fit(x_train_raw[col].astype(str).fillna("__MISSING__")) + x_train_raw[col] = le.transform(x_train_raw[col].astype(str).fillna("__MISSING__")) + test_vals = x_test_raw[col].astype(str).fillna("__MISSING__") + test_vals = test_vals.where(test_vals.isin(le.classes_), "__MISSING__") + x_test_raw[col] = le.transform(test_vals) + + x_train = x_train_raw.select_dtypes(include=[np.number]).copy() + x_test = x_test_raw[x_train.columns].copy() + train_medians = x_train.median() + x_train = x_train.fillna(train_medians) + x_test = x_test.fillna(train_medians) + + return x_train, x_test, y_train, y_test def evaluate(name: str, y_true: pd.Series, probs: np.ndarray) -> dict[str, float]: @@ -82,10 +105,8 @@ def main() -> None: print("BASELINE (without leakage trap)") print(f"{'=' * 60}") - x, y = prepare(df, exclude=[LEAKAGE_TRAP]) - x_train, x_test, y_train, y_test = train_test_split( - x, y, test_size=0.30, random_state=SEED, stratify=y - ) + x_train, x_test, y_train, y_test = split_and_preprocess(df, exclude=[LEAKAGE_TRAP]) + scaler = StandardScaler() x_train_s = scaler.fit_transform(x_train) x_test_s = scaler.transform(x_test) @@ -104,10 +125,7 @@ def main() -> None: print("WITH LEAKAGE TRAP (for comparison — students should detect this)") print(f"{'=' * 60}") - x_full, y_full = prepare(df) - x_train_f, x_test_f, y_train_f, y_test_f = train_test_split( - x_full, y_full, test_size=0.30, random_state=SEED, stratify=y_full - ) + x_train_f, x_test_f, y_train_f, y_test_f = split_and_preprocess(df) scaler_f = StandardScaler() x_train_fs = scaler_f.fit_transform(x_train_f) x_test_fs = scaler_f.transform(x_test_f) @@ -116,14 +134,11 @@ def main() -> None: lr_f.fit(x_train_fs, y_train_f) m_with = evaluate("LR with trap", y_test_f, lr_f.predict_proba(x_test_fs)[:, 1]) - lr_without = LogisticRegression(max_iter=2000, random_state=SEED) - x_no, _ = prepare(df, exclude=[LEAKAGE_TRAP]) - x_train_n, x_test_n, _, _ = train_test_split( - x_no, y_full, test_size=0.30, random_state=SEED, stratify=y_full - ) + x_train_n, x_test_n, _, _ = split_and_preprocess(df, exclude=[LEAKAGE_TRAP]) scaler_n = StandardScaler() x_train_ns = scaler_n.fit_transform(x_train_n) x_test_ns = scaler_n.transform(x_test_n) + lr_without = LogisticRegression(max_iter=2000, random_state=SEED) lr_without.fit(x_train_ns, y_train_f) m_without = evaluate("LR without trap", y_test_f, lr_without.predict_proba(x_test_ns)[:, 1]) @@ -138,9 +153,8 @@ def main() -> None: print(f"\n{'=' * 60}") print("FEATURE IMPORTANCE (Random Forest, without trap)") print(f"{'=' * 60}") - x_imp, _ = prepare(df, exclude=[LEAKAGE_TRAP]) importances = sorted( - zip(x_imp.columns, rf.feature_importances_, strict=False), + zip(x_train.columns, rf.feature_importances_, strict=False), key=lambda t: t[1], reverse=True, ) @@ -153,10 +167,9 @@ def main() -> None: print(f"\n{'=' * 60}") print("VALUE-AWARE SCORING DEMO") print(f"{'=' * 60}") - x_val, y_val = prepare(df, exclude=[LEAKAGE_TRAP]) - x_tr, x_te, y_tr, y_te = train_test_split( - x_val, y_val, test_size=0.30, random_state=SEED, stratify=y_val - ) + + # Reuse the baseline split (same seed, same rows) + x_tr, x_te, y_tr, y_te = split_and_preprocess(df, exclude=[LEAKAGE_TRAP]) scaler_v = StandardScaler() x_tr_s = scaler_v.fit_transform(x_tr) x_te_s = scaler_v.transform(x_te) diff --git a/scripts/validate_v5_dataset.py b/scripts/validate_v5_dataset.py index e8fdcdf..8949a21 100644 --- a/scripts/validate_v5_dataset.py +++ b/scripts/validate_v5_dataset.py @@ -80,21 +80,54 @@ # --------------------------------------------------------------------------- -def _prepare_features( - df: pd.DataFrame, exclude_cols: list[str] | None = None -) -> tuple[pd.DataFrame, pd.Series]: - """Prepare X, y from DataFrame.""" - feature_cols = [c for c in df.columns if c != TARGET and c not in (exclude_cols or [])] - x_df = df[feature_cols].copy() +def _split_and_preprocess( + df: pd.DataFrame, + exclude_cols: list[str] | None = None, + seed: int = 42, +) -> tuple[np.ndarray, np.ndarray, pd.Series, pd.Series]: + """Split first, then fit preprocessing on train only. + + Returns scaled (x_train, x_test, y_train, y_test). Label encoding, + median imputation, and standard scaling are all fit on the training fold + so that test-fold metrics are truly out-of-sample. + """ + exclude = set(exclude_cols or []) + feature_cols = [c for c in df.columns if c != TARGET and c not in exclude] + + x_raw = df[feature_cols].copy() y = df[TARGET].astype(int) - for col in x_df.select_dtypes(include=["object", "category"]).columns: + x_train_raw, x_test_raw, y_train, y_test = train_test_split( + x_raw, y, test_size=0.30, random_state=seed, stratify=y + ) + + # Encode categoricals: fit LabelEncoder on train, transform both. + cat_cols = list(x_train_raw.select_dtypes(include=["object", "category"]).columns) + encoders: dict[str, LabelEncoder] = {} + for col in cat_cols: le = LabelEncoder() - x_df[col] = le.fit_transform(x_df[col].astype(str).fillna("__MISSING__")) + le.fit(x_train_raw[col].astype(str).fillna("__MISSING__")) + encoders[col] = le + x_train_raw[col] = le.transform(x_train_raw[col].astype(str).fillna("__MISSING__")) + # Unseen test categories get mapped to "__MISSING__" + test_vals = x_test_raw[col].astype(str).fillna("__MISSING__") + test_vals = test_vals.where(test_vals.isin(le.classes_), "__MISSING__") + # Ensure __MISSING__ is in classes (it always is since we fillna above) + x_test_raw[col] = le.transform(test_vals) + + # Select numeric columns and impute with train medians. + x_train_num = x_train_raw.select_dtypes(include=[np.number]).copy() + x_test_num = x_test_raw[x_train_num.columns].copy() + train_medians = x_train_num.median() + x_train_num = x_train_num.fillna(train_medians) + x_test_num = x_test_num.fillna(train_medians) + + # Scale. + scaler = StandardScaler() + x_train_s = scaler.fit_transform(x_train_num) + x_test_s = scaler.transform(x_test_num) - x_df = x_df.select_dtypes(include=[np.number]) - x_df = x_df.fillna(x_df.median()) - return x_df, y + return x_train_s, x_test_s, y_train, y_test def _fit_lr_holdout( @@ -103,14 +136,7 @@ def _fit_lr_holdout( seed: int = 42, ) -> dict[str, float]: """Fit LR on 70/30 hold-out split and return metrics.""" - x_df, y = _prepare_features(df, exclude_cols) - x_train, x_test, y_train, y_test = train_test_split( - x_df, y, test_size=0.30, random_state=seed, stratify=y - ) - - scaler = StandardScaler() - x_train_s = scaler.fit_transform(x_train) - x_test_s = scaler.transform(x_test) + x_train_s, x_test_s, y_train, y_test = _split_and_preprocess(df, exclude_cols, seed) lr = LogisticRegression(max_iter=2000, random_state=42) lr.fit(x_train_s, y_train) @@ -142,14 +168,7 @@ def _fit_lr_auc_only( seed: int = 42, ) -> float: """Fit LR on hold-out split and return only AUC (for multi-seed checks).""" - x_df, y = _prepare_features(df, exclude_cols) - x_train, x_test, y_train, y_test = train_test_split( - x_df, y, test_size=0.30, random_state=seed, stratify=y - ) - scaler = StandardScaler() - x_train_s = scaler.fit_transform(x_train) - x_test_s = scaler.transform(x_test) - + x_train_s, x_test_s, y_train, y_test = _split_and_preprocess(df, exclude_cols, seed) lr = LogisticRegression(max_iter=2000, random_state=42) lr.fit(x_train_s, y_train) probs = lr.predict_proba(x_test_s)[:, 1] @@ -261,34 +280,50 @@ def check_missingness(df: pd.DataFrame) -> list[str]: if df["web_sessions"].isna().sum() == 0: errors.append("web_sessions has no nulls") else: - outbound_rate = ( - df.loc[df["lead_source"] == "sdr_outbound", "web_sessions"].isna().mean() - ) - inbound_rate = ( - df.loc[df["lead_source"] == "inbound_marketing", "web_sessions"].isna().mean() - ) - if inbound_rate > 0 and outbound_rate / inbound_rate < 3.0: + outbound_mask = df["lead_source"] == "sdr_outbound" + inbound_mask = df["lead_source"] == "inbound_marketing" + if not outbound_mask.any(): errors.append( - f"web_sessions missing ratio outbound/inbound = " - f"{outbound_rate / inbound_rate:.1f}x (need >3x)" + "web_sessions missingness check requires at least one sdr_outbound row" ) - elif inbound_rate == 0 and outbound_rate == 0: - errors.append("web_sessions has no source-conditional missingness") + elif not inbound_mask.any(): + errors.append( + "web_sessions missingness check requires at least one inbound_marketing row" + ) + else: + outbound_rate = df.loc[outbound_mask, "web_sessions"].isna().mean() + inbound_rate = df.loc[inbound_mask, "web_sessions"].isna().mean() + if inbound_rate > 0 and outbound_rate / inbound_rate < 3.0: + errors.append( + f"web_sessions missing ratio outbound/inbound = " + f"{outbound_rate / inbound_rate:.1f}x (need >3x)" + ) + elif inbound_rate == 0 and outbound_rate == 0: + errors.append("web_sessions has no source-conditional missingness") # seniority must have nulls if "seniority" in df.columns: if df["seniority"].isna().sum() == 0: errors.append("seniority has no nulls") else: - partner_rate = ( - df.loc[df["lead_source"] == "partner_referral", "seniority"].isna().mean() - ) - other_rate = df.loc[df["lead_source"] != "partner_referral", "seniority"].isna().mean() - if other_rate > 0 and partner_rate / other_rate < 3.0: + partner_mask = df["lead_source"] == "partner_referral" + other_mask = ~partner_mask + if not partner_mask.any(): + errors.append( + "seniority missingness check requires at least one partner_referral row" + ) + elif not other_mask.any(): errors.append( - f"seniority missing ratio partner/other = " - f"{partner_rate / other_rate:.1f}x (need >3x)" + "seniority missingness check requires at least one non-partner_referral row" ) + else: + partner_rate = df.loc[partner_mask, "seniority"].isna().mean() + other_rate = df.loc[other_mask, "seniority"].isna().mean() + if other_rate > 0 and partner_rate / other_rate < 3.0: + errors.append( + f"seniority missing ratio partner/other = " + f"{partner_rate / other_rate:.1f}x (need >3x)" + ) # days_since_last_touch must have nulls if "days_since_last_touch" in df.columns: @@ -348,7 +383,10 @@ def check_acv_range(df: pd.DataFrame) -> list[str]: """Check 10: expected_acv within narrative-consistent range.""" errors = [] if "expected_acv" in df.columns: - acv = df["expected_acv"].dropna() + acv = pd.to_numeric(df["expected_acv"], errors="coerce").dropna() + if acv.empty: + errors.append("expected_acv contains no usable numeric values") + return errors if acv.min() < 18_000 - 1: errors.append(f"expected_acv min {acv.min():.0f} below narrative floor 18,000") if acv.max() > 120_000 + 1: