diff --git a/.agent-plan.md b/.agent-plan.md index fd6a800..cd9459a 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -78,6 +78,13 @@ No engine changes required — v5 is a build pipeline + validation improvement. - [x] `tests/scripts/test_validate_cli.py` — 6 tests covering CLI entrypoint (exit codes, `--out-json`, `--emit-release-snippet`, `--enforce-1000`, missing args) - [x] Tests cover edge cases: insufficient positives/negatives in subsampling, missingness rate bounds, source-conditional missingness variation, immutability of inputs, determinism given seed +### Extract pipeline functions into package (PR #29) + +- [x] `leadforge/pipelines/__init__.py` + `leadforge/pipelines/build_v5.py` — pipeline functions (`subsample`, `inject_missingness`, `derive_binary_features`, `cap_expected_acv`, `rename_and_select`, `boost_leakage_trap`) + constants extracted from script +- [x] `scripts/build_v5_snapshot.py` — now a thin CLI wrapper importing from `leadforge.pipelines.build_v5`; orchestration functions (`generate_bundle`, `build_v5_dataset`) remain in script +- [x] `tests/scripts/test_build_v5_snapshot.py` — imports from `leadforge.pipelines.build_v5` directly (no more `importlib` hack) +- [x] All 705 tests pass; lint clean + --- ## Deferred Items diff --git a/leadforge/pipelines/__init__.py b/leadforge/pipelines/__init__.py new file mode 100644 index 0000000..2b907ad --- /dev/null +++ b/leadforge/pipelines/__init__.py @@ -0,0 +1 @@ +"""Pipeline modules for building derived datasets from leadforge bundles.""" diff --git a/leadforge/pipelines/build_v5.py b/leadforge/pipelines/build_v5.py new file mode 100644 index 0000000..0819c01 --- /dev/null +++ b/leadforge/pipelines/build_v5.py @@ -0,0 +1,195 @@ +"""Pipeline functions for building the v5 lead scoring intro CSV. + +This module contains the reusable data transformation steps. The CLI +orchestration (bundle generation, file I/O) lives in +``scripts/build_v5_snapshot.py``. +""" + +from __future__ import annotations + +import sys + +import numpy as np +import pandas as pd + +__all__ = [ + "ACV_CAP", + "ACV_FLOOR", + "FINAL_COLUMNS", + "N_LEADS", + "RENAME_MAP", + "SEED", + "SNAPSHOT_DAY", + "SUBSAMPLE_N", + "TARGET_RATE", + "boost_leakage_trap", + "cap_expected_acv", + "derive_binary_features", + "inject_missingness", + "rename_and_select", + "subsample", +] + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- +SEED = 42 +N_LEADS = 5000 +SNAPSHOT_DAY = 10 +SUBSAMPLE_N = 1000 +TARGET_RATE = 0.30 + +# Narrative-consistent ACV bounds (from narrative.yaml: $18k–$120k). +ACV_FLOOR = 18_000.0 +ACV_CAP = 120_000.0 + +# v5 column set: 18 features + 1 target = 19 columns. +FINAL_COLUMNS = [ + "industry", + "region", + "company_size", + "company_revenue", + "contact_role", + "seniority", + "lead_source", + "opportunity_created", + "demo_completed", + "expected_acv", + "inbound_touches", + "outbound_touches", + "touches_week_1", + "days_since_first_touch", + "web_sessions", + "sales_activities", + "days_since_last_touch", + "__leakage__total_touches_90d", + "converted", +] + +# Snapshot column → v5 column renaming. +RENAME_MAP = { + "employee_band": "company_size", + "estimated_revenue_band": "company_revenue", + "role_function": "contact_role", + "inbound_touch_count": "inbound_touches", + "outbound_touch_count": "outbound_touches", + "session_count": "web_sessions", + "activity_count": "sales_activities", + "converted_within_90_days": "converted", + "total_touches_all": "__leakage__total_touches_90d", +} + + +# --------------------------------------------------------------------------- +# Pipeline steps +# --------------------------------------------------------------------------- + + +def derive_binary_features(df: pd.DataFrame) -> pd.DataFrame: + """Derive binary features for the v5 column set.""" + df = df.copy() + df["opportunity_created"] = df["opportunity_created"].astype(int) + df["demo_completed"] = (df["demo_page_views"] > 0).astype(int) + return df + + +def cap_expected_acv(df: pd.DataFrame) -> pd.DataFrame: + """Clip expected_acv to narrative-consistent range [ACV_FLOOR, ACV_CAP].""" + df = df.copy() + df["expected_acv"] = df["expected_acv"].clip(lower=ACV_FLOOR, upper=ACV_CAP) + return df + + +def rename_and_select(df: pd.DataFrame) -> pd.DataFrame: + """Rename snapshot columns to v5 names and select final column set.""" + df = df.rename(columns=RENAME_MAP) + df["converted"] = df["converted"].astype(int) + missing = [c for c in FINAL_COLUMNS if c not in df.columns] + if missing: + raise ValueError( + f"Missing required columns after renaming: {missing}. Available: {sorted(df.columns)}" + ) + return df[FINAL_COLUMNS] + + +def subsample( + df: pd.DataFrame, + rng: np.random.RandomState, + n: int = SUBSAMPLE_N, + target_rate: float = TARGET_RATE, +) -> pd.DataFrame: + """Stratified subsample to n rows at target_rate conversion.""" + positives = df[df["converted"] == 1] + negatives = df[df["converted"] == 0] + n_pos = int(n * target_rate) + n_neg = n - n_pos + + if len(positives) < n_pos: + print(f"WARNING: only {len(positives)} positives, need {n_pos}", file=sys.stderr) + n_pos = len(positives) + n_neg = n - n_pos + if len(negatives) < n_neg: + print(f"WARNING: only {len(negatives)} negatives, need {n_neg}", file=sys.stderr) + n_neg = len(negatives) + + pos_sample = positives.sample(n=n_pos, random_state=rng) + neg_sample = negatives.sample(n=n_neg, random_state=rng) + return ( + pd.concat([pos_sample, neg_sample]).sample(frac=1, random_state=rng).reset_index(drop=True) + ) + + +def inject_missingness(df: pd.DataFrame, rng: np.random.RandomState) -> pd.DataFrame: + """Apply structured missingness per the v5 contract. + + Conditional rates per source (overall per-column rate stays <10%): + - web_sessions: SDR outbound 15%, inbound marketing 2%, partner referral 5% + - seniority: partner referral 8%, others 1% + - days_since_last_touch: structural NaN (no touches) + 3% MCAR + - days_since_first_touch: structural NaN (no touches) + 2% MCAR + """ + df = df.copy() + n = len(df) + + # web_sessions: source-conditional missingness + for source, rate in [ + ("sdr_outbound", 0.15), + ("inbound_marketing", 0.02), + ("partner_referral", 0.05), + ]: + mask = (df["lead_source"] == source) & (rng.random(n) < rate) + df.loc[mask, "web_sessions"] = np.nan + + # seniority: source-conditional missingness + partner_mask = (df["lead_source"] == "partner_referral") & (rng.random(n) < 0.08) + other_mask = (df["lead_source"] != "partner_referral") & (rng.random(n) < 0.01) + df.loc[partner_mask | other_mask, "seniority"] = np.nan + + # days_since_last_touch: additional 3% MCAR on top of structural NaN + dslt_mask = rng.random(n) < 0.03 + df.loc[dslt_mask, "days_since_last_touch"] = np.nan + + # days_since_first_touch: additional 2% MCAR on top of structural NaN + dsft_mask = rng.random(n) < 0.02 + df.loc[dsft_mask, "days_since_first_touch"] = np.nan + + return df + + +def boost_leakage_trap(df: pd.DataFrame, rng: np.random.RandomState) -> pd.DataFrame: + """Amplify the leakage trap signal to ensure robust detectability. + + Adds target-correlated noise to ``__leakage__total_touches_90d`` so + that converted leads accumulate extra post-snapshot touches. This + simulates a realistic scenario where the feature aggregates engagement + activity that occurs *after* the conversion decision is made. + """ + df = df.copy() + trap_col = "__leakage__total_touches_90d" + n = len(df) + converted = df["converted"].values + # Converted leads: add a Poisson(1)-distributed number of extra + # "post-conversion" touches (typically small, but unbounded) + boost = converted * rng.poisson(1, size=n) + df[trap_col] = df[trap_col] + boost + return df diff --git a/scripts/build_v5_snapshot.py b/scripts/build_v5_snapshot.py index 5bf8a37..fed29ff 100644 --- a/scripts/build_v5_snapshot.py +++ b/scripts/build_v5_snapshot.py @@ -22,65 +22,26 @@ import pandas as pd from leadforge.api.generator import Generator +from leadforge.pipelines.build_v5 import ( + N_LEADS, + SEED, + SNAPSHOT_DAY, + boost_leakage_trap, + cap_expected_acv, + derive_binary_features, + inject_missingness, + rename_and_select, + subsample, +) from leadforge.render.snapshots import build_snapshot # --------------------------------------------------------------------------- -# Constants -# --------------------------------------------------------------------------- -SEED = 42 -N_LEADS = 5000 -SNAPSHOT_DAY = 10 -SUBSAMPLE_N = 1000 -TARGET_RATE = 0.30 - -# Narrative-consistent ACV bounds (from narrative.yaml: $18k–$120k). -ACV_FLOOR = 18_000.0 -ACV_CAP = 120_000.0 - -# v5 column set: 18 features + 1 target = 19 columns. -_FINAL_COLUMNS = [ - "industry", - "region", - "company_size", - "company_revenue", - "contact_role", - "seniority", - "lead_source", - "opportunity_created", - "demo_completed", - "expected_acv", - "inbound_touches", - "outbound_touches", - "touches_week_1", - "days_since_first_touch", - "web_sessions", - "sales_activities", - "days_since_last_touch", - "__leakage__total_touches_90d", - "converted", -] - -# Snapshot column → v5 column renaming. -_RENAME_MAP = { - "employee_band": "company_size", - "estimated_revenue_band": "company_revenue", - "role_function": "contact_role", - "inbound_touch_count": "inbound_touches", - "outbound_touch_count": "outbound_touches", - "session_count": "web_sessions", - "activity_count": "sales_activities", - "converted_within_90_days": "converted", - "total_touches_all": "__leakage__total_touches_90d", -} - - -# --------------------------------------------------------------------------- -# Pipeline steps +# Orchestration (stays in script — depends on Generator) # --------------------------------------------------------------------------- def generate_bundle(seed: int = SEED, n_leads: int = N_LEADS) -> pd.DataFrame: - """Generate a full bundle and return the day-14 snapshot.""" + """Generate a full bundle and return the day-10 snapshot.""" gen = Generator.from_recipe( "b2b_saas_procurement_v1", seed=seed, @@ -96,116 +57,6 @@ def generate_bundle(seed: int = SEED, n_leads: int = N_LEADS) -> pd.DataFrame: ) -def derive_binary_features(df: pd.DataFrame) -> pd.DataFrame: - """Derive binary features for the v5 column set.""" - df = df.copy() - df["opportunity_created"] = df["opportunity_created"].astype(int) - df["demo_completed"] = (df["demo_page_views"] > 0).astype(int) - return df - - -def cap_expected_acv(df: pd.DataFrame) -> pd.DataFrame: - """Clip expected_acv to narrative-consistent range [ACV_FLOOR, ACV_CAP].""" - df = df.copy() - df["expected_acv"] = df["expected_acv"].clip(lower=ACV_FLOOR, upper=ACV_CAP) - return df - - -def rename_and_select(df: pd.DataFrame) -> pd.DataFrame: - """Rename snapshot columns to v5 names and select final column set.""" - df = df.rename(columns=_RENAME_MAP) - df["converted"] = df["converted"].astype(int) - missing = [c for c in _FINAL_COLUMNS if c not in df.columns] - if missing: - raise ValueError( - f"Missing required columns after renaming: {missing}. Available: {sorted(df.columns)}" - ) - return df[_FINAL_COLUMNS] - - -def subsample( - df: pd.DataFrame, - rng: np.random.RandomState, - n: int = SUBSAMPLE_N, - target_rate: float = TARGET_RATE, -) -> pd.DataFrame: - """Stratified subsample to n rows at target_rate conversion.""" - positives = df[df["converted"] == 1] - negatives = df[df["converted"] == 0] - n_pos = int(n * target_rate) - n_neg = n - n_pos - - if len(positives) < n_pos: - print(f"WARNING: only {len(positives)} positives, need {n_pos}", file=sys.stderr) - n_pos = len(positives) - n_neg = n - n_pos - if len(negatives) < n_neg: - print(f"WARNING: only {len(negatives)} negatives, need {n_neg}", file=sys.stderr) - n_neg = len(negatives) - - pos_sample = positives.sample(n=n_pos, random_state=rng) - neg_sample = negatives.sample(n=n_neg, random_state=rng) - return ( - pd.concat([pos_sample, neg_sample]).sample(frac=1, random_state=rng).reset_index(drop=True) - ) - - -def inject_missingness(df: pd.DataFrame, rng: np.random.RandomState) -> pd.DataFrame: - """Apply structured missingness per the v5 contract. - - Patterns (all <10% per column): - - web_sessions: SDR outbound 15%, inbound marketing 2%, partner referral 5% - - seniority: partner referral 8%, others 1% - - days_since_last_touch: structural NaN (no touches) + 3% MCAR - - days_since_first_touch: structural NaN (no touches) + 2% MCAR - """ - df = df.copy() - n = len(df) - - # web_sessions: source-conditional missingness - for source, rate in [ - ("sdr_outbound", 0.15), - ("inbound_marketing", 0.02), - ("partner_referral", 0.05), - ]: - mask = (df["lead_source"] == source) & (rng.random(n) < rate) - df.loc[mask, "web_sessions"] = np.nan - - # seniority: source-conditional missingness - partner_mask = (df["lead_source"] == "partner_referral") & (rng.random(n) < 0.08) - other_mask = (df["lead_source"] != "partner_referral") & (rng.random(n) < 0.01) - df.loc[partner_mask | other_mask, "seniority"] = np.nan - - # days_since_last_touch: additional 3% MCAR on top of structural NaN - dslt_mask = rng.random(n) < 0.03 - df.loc[dslt_mask, "days_since_last_touch"] = np.nan - - # days_since_first_touch: additional 2% MCAR on top of structural NaN - dsft_mask = rng.random(n) < 0.02 - df.loc[dsft_mask, "days_since_first_touch"] = np.nan - - return df - - -def boost_leakage_trap(df: pd.DataFrame, rng: np.random.RandomState) -> pd.DataFrame: - """Amplify the leakage trap signal to ensure robust detectability. - - Adds target-correlated noise to ``__leakage__total_touches_90d`` so - that converted leads accumulate extra post-snapshot touches. This - simulates a realistic scenario where the feature aggregates engagement - activity that occurs *after* the conversion decision is made. - """ - df = df.copy() - trap_col = "__leakage__total_touches_90d" - n = len(df) - converted = df["converted"].values - # Converted leads: add a Poisson(1)-distributed number of extra - # "post-conversion" touches (typically small, but unbounded) - boost = converted * rng.poisson(1, size=n) - df[trap_col] = df[trap_col] + boost - return df - - def build_v5_dataset(seed: int = SEED) -> pd.DataFrame: """Full pipeline: generate → derive → cap ACV → rename → subsample → boost → missingness.""" rng = np.random.RandomState(seed) diff --git a/tests/scripts/test_build_v5_snapshot.py b/tests/scripts/test_build_v5_snapshot.py index 0772fb1..1a59b4b 100644 --- a/tests/scripts/test_build_v5_snapshot.py +++ b/tests/scripts/test_build_v5_snapshot.py @@ -1,37 +1,23 @@ -"""Tests for scripts/build_v5_snapshot.py pipeline functions.""" +"""Tests for leadforge.pipelines.build_v5 pipeline functions.""" from __future__ import annotations -import importlib.util -from pathlib import Path - import numpy as np import pandas as pd import pytest -# --------------------------------------------------------------------------- -# Import the script module (not in a package, so use importlib) -# --------------------------------------------------------------------------- -_SCRIPT_PATH = Path(__file__).resolve().parents[2] / "scripts" / "build_v5_snapshot.py" - -spec = importlib.util.spec_from_file_location("build_v5_snapshot", _SCRIPT_PATH) -assert spec is not None -assert spec.loader is not None -build_v5 = importlib.util.module_from_spec(spec) -spec.loader.exec_module(build_v5) - -# Re-export for convenience -subsample = build_v5.subsample -inject_missingness = build_v5.inject_missingness -derive_binary_features = build_v5.derive_binary_features -cap_expected_acv = build_v5.cap_expected_acv -rename_and_select = build_v5.rename_and_select -boost_leakage_trap = build_v5.boost_leakage_trap -ACV_FLOOR = build_v5.ACV_FLOOR -ACV_CAP = build_v5.ACV_CAP -_FINAL_COLUMNS = build_v5._FINAL_COLUMNS -_RENAME_MAP = build_v5._RENAME_MAP - +from leadforge.pipelines.build_v5 import ( + ACV_CAP, + ACV_FLOOR, + FINAL_COLUMNS, + RENAME_MAP, + boost_leakage_trap, + cap_expected_acv, + derive_binary_features, + inject_missingness, + rename_and_select, + subsample, +) # --------------------------------------------------------------------------- # Helpers @@ -157,7 +143,7 @@ def test_does_not_modify_input(self): class TestRenameAndSelect: def test_output_columns_match_final(self): df = _make_v5_df() - assert list(df.columns) == _FINAL_COLUMNS + assert list(df.columns) == FINAL_COLUMNS def test_converted_is_int(self): df = _make_v5_df() @@ -177,18 +163,18 @@ def test_rename_mapping_applied(self): df = derive_binary_features(snapshot) df = cap_expected_acv(df) result = rename_and_select(df) - for new_name in _RENAME_MAP.values(): + for new_name in RENAME_MAP.values(): assert new_name in result.columns def test_extra_columns_are_dropped(self): - """Columns not in _FINAL_COLUMNS should be silently dropped.""" + """Columns not in FINAL_COLUMNS should be silently dropped.""" snapshot = _make_snapshot() snapshot["extra_col"] = 999 df = derive_binary_features(snapshot) df = cap_expected_acv(df) result = rename_and_select(df) assert "extra_col" not in result.columns - assert list(result.columns) == _FINAL_COLUMNS + assert list(result.columns) == FINAL_COLUMNS # --------------------------------------------------------------------------- @@ -293,7 +279,7 @@ def test_other_columns_not_affected(self): "days_since_last_touch", "days_since_first_touch", } - for col in _FINAL_COLUMNS: + for col in FINAL_COLUMNS: if col not in miss_cols: orig_nan = df[col].isna().sum() new_nan = result[col].isna().sum()