diff --git a/.agent-plan.md b/.agent-plan.md index 9226797..de08608 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -6,7 +6,7 @@ ## Current System State -**v0.5.0 in progress — Milestones 7–11 complete.** Full simulation engine + render/bundle + exposure filtering + CLI commands + validation harness implemented. 581 tests passing. +**v0.5.0 in progress — Milestones 7–11 complete, v4-M1 in PR.** Full simulation engine + render/bundle + exposure filtering + CLI commands + validation harness implemented. v4 engine changes + build pipeline ready. 609 tests passing. --- @@ -16,7 +16,7 @@ The primary focus is producing a v4 lead scoring dataset that fixes the issues f See `docs/v4/design.md` for full details. -### v4-M0: Planning + spike ⬜ (this PR) +### v4-M0: Planning + spike ✓ (PR #19, #20) - [x] `docs/v4/design.md` — consolidated requirements, contract, engine changes, implementation plan - [x] `docs/v4/validation_spec.md` — automated validation checks @@ -24,21 +24,23 @@ See `docs/v4/design.md` for full details. - [x] `scripts/spike_category_signal.py` — spike experiment validating category signal approach - [x] Updated `CLAUDE.md`, `AGENTS.md`, `.agent-plan.md` -### v4-M1: Engine + build pipeline ⬜ +### v4-M1: Engine + build pipeline ✓ (in PR) Engine changes: -- [ ] Add `category_latent_correlations` to `difficulty_profiles.yaml` (intro profile) -- [ ] Apply correlations in `simulation/population.py` after initial latent sampling -- [ ] Add `snapshot_day` parameter to `render/snapshots.py` with windowed aggregation -- [ ] Add new features: `touches_week_1`, `days_since_first_touch`, `expected_acv`, `total_touches_all` -- [ ] Add `FeatureSpec` entries to `schema/features.py` -- [ ] Tests for all engine changes (backward compat + v4 mode) +- [x] Add `category_latent_correlations` to `difficulty_profiles.yaml` (intro profile, scale 1.8) +- [x] Apply correlations in `simulation/population.py` via `_apply_category_latent_correlations()` +- [x] Wire correlations through `api/generator.py` → `build_population()` +- [x] Add `snapshot_day` parameter to `render/snapshots.py` with windowed aggregation +- [x] Add new features: `touches_week_1`, `days_since_first_touch`, `expected_acv`, `total_touches_all` +- [x] Add `opportunity_created` feature (tracks ANY opp, not just open ones) +- [x] Add `FeatureSpec` entries to `schema/features.py` +- [x] 19 new tests (16 windowed snapshot + 3 category-latent correlations) Build pipeline: -- [ ] `scripts/build_v4_snapshot.py` — day-21 snapshot + leakage trap + structured missingness + subsampling -- [ ] `scripts/validate_v4_dataset.py` — full validation per `docs/v4/validation_spec.md` -- [ ] End-to-end: generate bundle → build CSV → validate → all checks pass -- [ ] LR AUC 0.65–0.90 (without trap); ≥0.03 boost with trap +- [x] `scripts/build_v4_snapshot.py` — day-14 snapshot + leakage trap + structured missingness + subsampling +- [x] `scripts/validate_v4_dataset.py` — full validation per `docs/v4/validation_spec.md` +- [x] End-to-end: generate bundle → build CSV → validate → all 7 mandatory checks pass +- [x] LR AUC 0.659 (without trap); 0.03+ boost with trap ### v4-M2: Documentation + release ⬜ diff --git a/leadforge/api/bundle.py b/leadforge/api/bundle.py index 5874596..b5fcc2e 100644 --- a/leadforge/api/bundle.py +++ b/leadforge/api/bundle.py @@ -30,12 +30,18 @@ from leadforge.core.models import WorldBundle -def write_bundle(bundle: WorldBundle, path: str) -> None: +def write_bundle( + bundle: WorldBundle, + path: str, + generation_timestamp: str | None = None, +) -> None: """Write *bundle* to disk at *path*. Args: bundle: Fully populated :class:`~leadforge.core.models.WorldBundle`. path: Destination directory (created if absent). + generation_timestamp: ISO-8601 UTC timestamp. Defaults to now. + Pass a fixed value to produce byte-identical manifests. Raises: RuntimeError: if any of ``bundle.simulation_result``, @@ -90,5 +96,6 @@ def write_bundle(bundle: WorldBundle, path: str) -> None: table_row_counts=table_row_counts, task_row_counts={CONVERTED_WITHIN_90_DAYS.task_id: task_row_counts}, bundle_root=root, + generation_timestamp=generation_timestamp, ) write_manifest(manifest, root) diff --git a/leadforge/api/generator.py b/leadforge/api/generator.py index 39a719c..a002bc1 100644 --- a/leadforge/api/generator.py +++ b/leadforge/api/generator.py @@ -166,7 +166,27 @@ def generate( ) world_graph = sample_hidden_graph(config.seed) - population = build_population(config, narrative, world_graph) + + # Load category-latent correlations from difficulty profile if available. + from leadforge.api.recipes import Recipe + from leadforge.recipes.registry import load_recipe + + category_latent_correlations = None + try: + raw = load_recipe(config.recipe_id) + recipe = Recipe.from_dict(raw) + profiles = recipe.load_difficulty_profiles() + profile = profiles.get(config.difficulty.value, {}) + category_latent_correlations = profile.get("category_latent_correlations") + except (FileNotFoundError, KeyError): + category_latent_correlations = None + + population = build_population( + config, + narrative, + world_graph, + category_latent_correlations=category_latent_correlations, + ) result = simulate_world(config, population, world_graph) spec = WorldSpec(config=config, narrative=narrative) diff --git a/leadforge/core/models.py b/leadforge/core/models.py index d4fe66b..cd0585b 100644 --- a/leadforge/core/models.py +++ b/leadforge/core/models.py @@ -108,7 +108,7 @@ class WorldBundle: simulation_result: SimulationResult | None = None world_graph: WorldGraph | None = None - def save(self, path: str) -> None: + def save(self, path: str, generation_timestamp: str | None = None) -> None: """Write the full output bundle to *path*. Creates the directory if it does not exist. The bundle layout @@ -124,6 +124,8 @@ def save(self, path: str) -> None: Args: path: Destination directory (created if absent). + generation_timestamp: ISO-8601 UTC timestamp. Defaults to now. + Pass a fixed value to produce byte-identical manifests. Raises: RuntimeError: if :attr:`simulation_result`, :attr:`population`, @@ -133,4 +135,4 @@ def save(self, path: str) -> None: """ from leadforge.api.bundle import write_bundle - write_bundle(self, path) + write_bundle(self, path, generation_timestamp=generation_timestamp) diff --git a/leadforge/recipes/b2b_saas_procurement_v1/difficulty_profiles.yaml b/leadforge/recipes/b2b_saas_procurement_v1/difficulty_profiles.yaml index 89900da..ab1fb86 100644 --- a/leadforge/recipes/b2b_saas_procurement_v1/difficulty_profiles.yaml +++ b/leadforge/recipes/b2b_saas_procurement_v1/difficulty_profiles.yaml @@ -20,6 +20,32 @@ intro: conversion_rate_range: [0.30, 0.45] # Strength of buying-committee-friction effects committee_friction: 0.10 + # Correlate observable categories with latent traits to produce category-level + # conversion signal. Each entry maps an observable field to a latent trait and + # per-value additive boosts (applied after initial latent sampling in + # build_population). Values validated by spike experiment (scale 1.8). + category_latent_correlations: + seniority: + latent_trait: latent_contact_authority + boosts: + individual_contributor: -0.27 + manager: -0.09 + director: 0.09 + vp: 0.22 + c_suite: 0.36 + estimated_revenue_band: + latent_trait: latent_account_fit + boosts: + "$1M-$10M": -0.18 + "$10M-$50M": 0.0 + "$50M-$200M": 0.18 + "$200M+": 0.32 + lead_source: + latent_trait: latent_engagement_propensity + boosts: + sdr_outbound: -0.14 + inbound_marketing: 0.09 + partner_referral: 0.22 intermediate: description: > diff --git a/leadforge/render/snapshots.py b/leadforge/render/snapshots.py index f43fbd6..c2d8df8 100644 --- a/leadforge/render/snapshots.py +++ b/leadforge/render/snapshots.py @@ -22,6 +22,7 @@ TouchRow, ) from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES +from leadforge.simulation.population import REVENUE_BAND_MIDPOINTS if TYPE_CHECKING: from leadforge.simulation.engine import SimulationResult @@ -54,6 +55,7 @@ def build_snapshot( result: SimulationResult, population: PopulationResult, horizon_days: int = 90, + snapshot_day: int | None = None, ) -> pd.DataFrame: """Build the lead snapshot DataFrame from simulation output. @@ -62,17 +64,41 @@ def build_snapshot( simulation horizon. The snapshot anchor date is ``lead_created_at + timedelta(days=horizon_days)``. + When *snapshot_day* is set, event aggregations are filtered to events + within ``[lead_created_at, lead_created_at + snapshot_day]``. This + enables windowed feature computation (e.g. day-21 snapshots for v4 + datasets). The default ``None`` preserves existing behavior (full + horizon). + Args: result: Output of :func:`~leadforge.simulation.engine.simulate_world`. population: Output of :func:`~leadforge.simulation.population.build_population`. horizon_days: Simulation horizon length. Defaults to 90. + snapshot_day: Optional windowed snapshot day. When set, only events + with timestamps ``<= lead_created_at + timedelta(days=snapshot_day)`` + are included (midnight-exclusive by construction, since the + simulation engine uses daily steps). Default ``None`` means use + *horizon_days*. Returns: A ``pd.DataFrame`` with the columns specified in :data:`~leadforge.schema.features.LEAD_SNAPSHOT_FEATURES` and dtypes matching the feature spec. Row order matches ``result.leads``. """ + effective_window = snapshot_day if snapshot_day is not None else horizon_days + + # ------------------------------------------------------------------- + # Build base lead DataFrame first (needed for per-lead date filtering). + # ------------------------------------------------------------------- + lead_df = pd.DataFrame([lead.to_dict() for lead in result.leads]) + lead_dates = pd.to_datetime(lead_df["lead_created_at"]) + lead_df["anchor_date"] = lead_dates + pd.Timedelta(days=horizon_days) + lead_df["snapshot_cutoff"] = lead_dates + pd.Timedelta(days=effective_window) + + # Build a lead_id → snapshot_cutoff lookup for event filtering. + cutoff_map = dict(zip(lead_df["lead_id"], lead_df["snapshot_cutoff"], strict=False)) + # ------------------------------------------------------------------- # Aggregate event tables by lead_id using pandas for efficiency. # Empty event lists fall back to the entity's canonical empty DataFrame @@ -85,8 +111,19 @@ def build_snapshot( if result.touches else TouchRow.empty_dataframe() ) + + # Apply snapshot window filter to touches. + if len(td) > 0 and snapshot_day is not None: + td["_ts"] = pd.to_datetime(td["touch_timestamp"]) + td["_cutoff"] = td["lead_id"].map(cutoff_map) + td_windowed = td[td["_ts"] <= td["_cutoff"]].copy() + td_full = td # Keep full set for total_touches_all + else: + td_windowed = td + td_full = td + touch_agg = ( - td.groupby("lead_id") + td_windowed.groupby("lead_id") .agg( touch_count=("touch_id", "count"), inbound_touch_count=( @@ -102,12 +139,55 @@ def build_snapshot( .reset_index() ) + # touches_week_1: count touches within first 7 days of lead creation. + if len(td_windowed) > 0: + if "_ts" not in td_windowed.columns: + td_windowed = td_windowed.copy() + td_windowed["_ts"] = pd.to_datetime(td_windowed["touch_timestamp"]) + td_windowed_copy = td_windowed.copy() + td_windowed_copy["_lead_date"] = td_windowed_copy["lead_id"].map( + dict(zip(lead_df["lead_id"], lead_dates, strict=False)) + ) + td_windowed_copy["_day"] = ( + td_windowed_copy["_ts"] - td_windowed_copy["_lead_date"] + ).dt.days + week1 = td_windowed_copy[td_windowed_copy["_day"] <= 7] + touches_week_1 = week1.groupby("lead_id").size().reset_index(name="touches_week_1") + + # days_since_first_touch: snapshot_day - first_touch_day + first_touch_day = ( + td_windowed_copy.groupby("lead_id")["_day"] + .min() + .reset_index() + .rename(columns={"_day": "_first_touch_day"}) + ) + else: + touches_week_1 = pd.DataFrame(columns=["lead_id", "touches_week_1"]) + first_touch_day = pd.DataFrame(columns=["lead_id", "_first_touch_day"]) + + # total_touches_all: count over full horizon (leakage trap when windowed, + # equals touch_count when using full horizon). + if len(td_full) > 0: + total_touches_all = ( + td_full.groupby("lead_id")["touch_id"] + .count() + .reset_index() + .rename(columns={"touch_id": "total_touches_all"}) + ) + else: + total_touches_all = pd.DataFrame(columns=["lead_id", "total_touches_all"]) + # Session aggregates sd = ( pd.DataFrame([s.to_dict() for s in result.sessions]) if result.sessions else SessionRow.empty_dataframe() ) + if len(sd) > 0 and snapshot_day is not None: + sd["_ts"] = pd.to_datetime(sd["session_timestamp"]) + sd["_cutoff"] = sd["lead_id"].map(cutoff_map) + sd = sd[sd["_ts"] <= sd["_cutoff"]] + sess_agg = ( sd.groupby("lead_id") .agg( @@ -125,45 +205,73 @@ def build_snapshot( if result.sales_activities else SalesActivityRow.empty_dataframe() ) + if len(ad) > 0 and snapshot_day is not None: + ad["_ts"] = pd.to_datetime(ad["activity_timestamp"]) + ad["_cutoff"] = ad["lead_id"].map(cutoff_map) + ad = ad[ad["_ts"] <= ad["_cutoff"]] + act_agg = ad.groupby("lead_id").agg(activity_count=("activity_id", "count")).reset_index() - # Opportunity join: find open (unclosed) opportunity per lead. + # Opportunity join: find opportunity created by snapshot cutoff. od = ( pd.DataFrame([o.to_dict() for o in result.opportunities]) if result.opportunities else OpportunityRow.empty_dataframe() ) + if len(od) > 0 and snapshot_day is not None: + od["_created"] = pd.to_datetime(od["created_at"]) + od["_cutoff"] = od["lead_id"].map(cutoff_map) + od = od[od["_created"] <= od["_cutoff"]] + + # Track ANY opportunity created (regardless of close outcome) for opportunity_created flag. + any_opps = od[["lead_id"]].drop_duplicates() + any_opps["opportunity_created"] = True + open_opps = od[od["close_outcome"].isna()][["lead_id", "estimated_acv"]] open_opps = open_opps.groupby("lead_id").first().reset_index() open_opps = open_opps.rename(columns={"estimated_acv": "opportunity_estimated_acv"}) open_opps["has_open_opportunity"] = True # ------------------------------------------------------------------- - # Build base lead DataFrame and join aggregates. - # ------------------------------------------------------------------- - lead_df = pd.DataFrame([lead.to_dict() for lead in result.leads]) - - # Compute snapshot anchor date (per lead, vectorised). - lead_df["anchor_date"] = pd.to_datetime(lead_df["lead_created_at"]) + pd.Timedelta( - days=horizon_days - ) - # Join aggregates (left join preserves all leads). + # ------------------------------------------------------------------- lead_df = lead_df.merge(touch_agg, on="lead_id", how="left") lead_df = lead_df.merge(sess_agg, on="lead_id", how="left") lead_df = lead_df.merge(act_agg, on="lead_id", how="left") + lead_df = lead_df.merge(any_opps, on="lead_id", how="left") lead_df = lead_df.merge(open_opps, on="lead_id", how="left") + lead_df = lead_df.merge(touches_week_1, on="lead_id", how="left") + lead_df = lead_df.merge(first_touch_day, on="lead_id", how="left") + lead_df = lead_df.merge(total_touches_all, on="lead_id", how="left") # Fill missing event aggregate counts with zero; has_open_opportunity with False. # opportunity_estimated_acv and days_since_last_touch intentionally stay NaN. - lead_df[_INT_AGG_COLS] = lead_df[_INT_AGG_COLS].fillna(0) + int_agg_cols = [c for c in _INT_AGG_COLS if c in lead_df.columns] + lead_df[int_agg_cols] = lead_df[int_agg_cols].fillna(0) + lead_df["touches_week_1"] = lead_df["touches_week_1"].fillna(0) + if "total_touches_all" in lead_df.columns: + lead_df["total_touches_all"] = pd.to_numeric( + lead_df["total_touches_all"], errors="coerce" + ).fillna(0) + opp_created_mask = lead_df["opportunity_created"].notna() + lead_df["opportunity_created"] = lead_df["opportunity_created"].where( + opp_created_mask, other=False + ) opp_mask = lead_df["has_open_opportunity"].notna() lead_df["has_open_opportunity"] = lead_df["has_open_opportunity"].where(opp_mask, other=False) # Compute days_since_last_touch fully vectorised. # pd.to_datetime returns NaT for nulls; (Timestamp - NaT) yields NaN naturally. last_ts = pd.to_datetime(lead_df["last_touch_timestamp"]) - lead_df["days_since_last_touch"] = (lead_df["anchor_date"] - last_ts).dt.days + lead_df["days_since_last_touch"] = (lead_df["snapshot_cutoff"] - last_ts).dt.days + + # Compute days_since_first_touch: snapshot_day - first_touch_day. + if "_first_touch_day" in lead_df.columns: + lead_df["days_since_first_touch"] = effective_window - lead_df["_first_touch_day"] + else: + lead_df["days_since_first_touch"] = pd.NA + + # Compute expected_acv: opportunity ACV if available, else revenue band midpoint. # ------------------------------------------------------------------- # Join account and contact features via vectorised merge (not apply). @@ -175,6 +283,10 @@ def build_snapshot( lead_df = lead_df.merge(acct_df, on="account_id", how="left") lead_df = lead_df.merge(cont_df, on="contact_id", how="left") + # expected_acv: opportunity ACV where available, else revenue band midpoint. + band_midpoint = lead_df["estimated_revenue_band"].map(REVENUE_BAND_MIDPOINTS) + lead_df["expected_acv"] = lead_df["opportunity_estimated_acv"].fillna(band_midpoint) + # ------------------------------------------------------------------- # Select, order, and cast columns — single authoritative dtype pass. # ------------------------------------------------------------------- diff --git a/leadforge/schema/features.py b/leadforge/schema/features.py index fbf9900..1c6f71a 100644 --- a/leadforge/schema/features.py +++ b/leadforge/schema/features.py @@ -174,6 +174,19 @@ class FeatureSpec: "Sum of session durations (seconds) before snapshot.", "engagement", ), + # -- Momentum features -- + FeatureSpec( + "touches_week_1", + "Int64", + "Number of touches in the first 7 days after lead creation.", + "engagement", + ), + FeatureSpec( + "days_since_first_touch", + "Float64", + "Days between first touch and snapshot cutoff (NaN if no touches).", + "engagement", + ), # -- Sales activity features -- FeatureSpec( "activity_count", @@ -184,7 +197,13 @@ class FeatureSpec: FeatureSpec( "days_since_last_touch", "Float64", - "Days elapsed between most recent touch and snapshot anchor date.", + "Days elapsed between most recent touch and snapshot cutoff.", + "sales", + ), + FeatureSpec( + "opportunity_created", + "boolean", + "Whether any opportunity was created by snapshot date (open or closed).", "sales", ), FeatureSpec( @@ -199,6 +218,22 @@ class FeatureSpec: "Estimated ACV of the most recent open opportunity (NaN if none).", "sales", ), + FeatureSpec( + "expected_acv", + "Float64", + "Expected ACV: opportunity ACV if available by snapshot, else " + "revenue band midpoint heuristic (NaN if neither available).", + "sales", + ), + # -- Leakage trap -- + FeatureSpec( + "total_touches_all", + "Int64", + "Total touches over full 90-day window. LEAKAGE TRAP: uses " + "post-snapshot data. Included for pedagogical purposes only.", + "engagement", + leakage_risk=True, + ), # -- Target -- FeatureSpec( "converted_within_90_days", diff --git a/leadforge/simulation/population.py b/leadforge/simulation/population.py index 9f84e0b..77bf260 100644 --- a/leadforge/simulation/population.py +++ b/leadforge/simulation/population.py @@ -76,6 +76,14 @@ class PopulationResult: _REVENUE_BANDS = ("$1M-$10M", "$10M-$50M", "$50M-$200M", "$200M+") _REVENUE_BAND_WEIGHTS = (0.25, 0.40, 0.25, 0.10) +# Revenue band → ACV midpoint heuristic. Used by snapshots.py for expected_acv. +REVENUE_BAND_MIDPOINTS: dict[str, int] = { + "$1M-$10M": 25_000, + "$10M-$50M": 55_000, + "$50M-$200M": 85_000, + "$200M+": 140_000, +} + _PROCESS_MATURITY_BANDS = ("low", "medium", "high") _PROCESS_MATURITY_BAND_WEIGHTS = (0.30, 0.45, 0.25) _PROCESS_MATURITY_MEANS = {"low": 0.25, "medium": 0.50, "high": 0.75} @@ -134,6 +142,7 @@ def build_population( config: GenerationConfig, narrative: NarrativeSpec, world_graph: WorldGraph, + category_latent_correlations: dict[str, dict] | None = None, ) -> PopulationResult: """Generate accounts, contacts, leads, and their latent states. @@ -185,7 +194,7 @@ def build_population( rng=root.child("population_leads"), ) - return PopulationResult( + result = PopulationResult( accounts=accounts, contacts=contacts, leads=leads, @@ -196,6 +205,11 @@ def build_population( ), ) + if category_latent_correlations: + _apply_category_latent_correlations(result, category_latent_correlations) + + return result + # --------------------------------------------------------------------------- # Account generation @@ -394,6 +408,94 @@ def _sample_latent(rng: random.Random, mean: float = 0.50, std: float = 0.20) -> return max(0.0, min(1.0, rng.gauss(mean, std))) +def _apply_category_latent_correlations( + result: PopulationResult, + correlations: dict[str, dict], +) -> None: + """Shift latent traits based on observable category values. + + Each entry in *correlations* maps an observable field name to a dict with + ``latent_trait`` (the latent key to adjust) and ``boosts`` (a mapping of + category value → additive shift). The shift is applied in-place, clamped + to [0, 1]. + + Observable fields are resolved from the appropriate entity list: + + - Account fields (``estimated_revenue_band``, ``industry``, etc.) adjust + account-level latents. + - Contact fields (``seniority``, ``role_function``, etc.) adjust + contact-level latents. + - Lead fields (``lead_source``, etc.) adjust the linked contact's latents. + """ + lat = result.latent_state + + # Known fields per entity type for dispatch. + account_fields = { + "estimated_revenue_band", + "industry", + "region", + "employee_band", + "process_maturity_band", + } + contact_fields = {"seniority", "role_function", "buyer_role"} + + for field_name, spec in correlations.items(): + if not isinstance(spec, dict): + raise InvalidConfigError( + f"category_latent_correlations[{field_name!r}]: " + f"expected a dict, got {type(spec).__name__}" + ) + if "latent_trait" not in spec: + raise InvalidConfigError( + f"category_latent_correlations[{field_name!r}]: missing required key 'latent_trait'" + ) + if "boosts" not in spec: + raise InvalidConfigError( + f"category_latent_correlations[{field_name!r}]: missing required key 'boosts'" + ) + trait = spec["latent_trait"] + boosts = spec["boosts"] + if not isinstance(boosts, dict): + raise InvalidConfigError( + f"category_latent_correlations[{field_name!r}].boosts: " + f"expected a dict, got {type(boosts).__name__}" + ) + + if field_name in account_fields: + for account in result.accounts: + value = getattr(account, field_name, None) + boost = boosts.get(str(value), 0.0) if value is not None else 0.0 + if boost and account.account_id in lat.account_latents: + traits = lat.account_latents[account.account_id] + if trait in traits: + traits[trait] = max(0.0, min(1.0, traits[trait] + boost)) + + elif field_name in contact_fields: + for contact in result.contacts: + value = getattr(contact, field_name, None) + boost = boosts.get(str(value), 0.0) if value is not None else 0.0 + if boost and contact.contact_id in lat.contact_latents: + traits = lat.contact_latents[contact.contact_id] + if trait in traits: + traits[trait] = max(0.0, min(1.0, traits[trait] + boost)) + + else: + # Lead-level fields (e.g. lead_source) — adjust linked contact latents. + # Deduplicate by contact_id: use the first lead's value to avoid + # stacking boosts when multiple leads share a contact. + seen_contacts: set[str] = set() + for lead in result.leads: + if lead.contact_id in seen_contacts: + continue + seen_contacts.add(lead.contact_id) + value = getattr(lead, field_name, None) + boost = boosts.get(str(value), 0.0) if value is not None else 0.0 + if boost and lead.contact_id in lat.contact_latents: + traits = lat.contact_latents[lead.contact_id] + if trait in traits: + traits[trait] = max(0.0, min(1.0, traits[trait] + boost)) + + def _channel_weights(narrative: NarrativeSpec) -> tuple[list[str], list[float]]: """Return (channels, weights) lists ordered as in the GTM spec. diff --git a/pyproject.toml b/pyproject.toml index 39db58a..97f3cd0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,9 @@ dev = [ "pre-commit>=3.7", "types-pyyaml>=6.0", ] +scripts = [ + "scikit-learn>=1.3", +] [project.scripts] leadforge = "leadforge.cli.main:app" diff --git a/scripts/build_v4_snapshot.py b/scripts/build_v4_snapshot.py new file mode 100644 index 0000000..7e228de --- /dev/null +++ b/scripts/build_v4_snapshot.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 +"""Build the v4 lead scoring intro CSV (generates the bundle internally). + +Usage: + python scripts/build_v4_snapshot.py OUTPUT_CSV + +Produces a 1000-row × 18-column CSV at ~30% conversion rate with: +- Day-14 windowed features +- Structured missingness (MAR for web_sessions, seniority) +- Leakage trap (total_touches_all using full 90-day data) +- Stratified subsampling +""" + +from __future__ import annotations + +import sys +from pathlib import Path + +import numpy as np +import pandas as pd + +from leadforge.api.generator import Generator +from leadforge.render.snapshots import build_snapshot + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- +SEED = 42 +N_LEADS = 5000 +SNAPSHOT_DAY = 14 +SUBSAMPLE_N = 1000 +TARGET_RATE = 0.30 + +# v4 column set: 17 features + 1 target = 18 columns. +_FINAL_COLUMNS = [ + "industry", + "region", + "company_size", + "company_revenue", + "contact_role", + "seniority", + "lead_source", + "opportunity_created", + "demo_completed", + "expected_acv", + "inbound_touches", + "outbound_touches", + "touches_week_1", + "web_sessions", + "sales_activities", + "days_since_last_touch", + "total_touches_all", + "converted", +] + +# Snapshot column → v4 column renaming. +_RENAME_MAP = { + "employee_band": "company_size", + "estimated_revenue_band": "company_revenue", + "role_function": "contact_role", + "inbound_touch_count": "inbound_touches", + "outbound_touch_count": "outbound_touches", + "session_count": "web_sessions", + "activity_count": "sales_activities", + "converted_within_90_days": "converted", +} + + +# --------------------------------------------------------------------------- +# Pipeline steps +# --------------------------------------------------------------------------- + + +def generate_bundle(seed: int = SEED, n_leads: int = N_LEADS) -> pd.DataFrame: + """Generate a full bundle and return the day-14 snapshot.""" + gen = Generator.from_recipe( + "b2b_saas_procurement_v1", + seed=seed, + exposure_mode="research_instructor", + n_leads=n_leads, + difficulty="intro", + ) + bundle = gen.generate() + return build_snapshot( + bundle.simulation_result, + bundle.population, + snapshot_day=SNAPSHOT_DAY, + ) + + +def derive_binary_features(df: pd.DataFrame) -> pd.DataFrame: + """Derive binary features for the v4 column set.""" + df = df.copy() + # opportunity_created comes from the snapshot (any opp, open or closed). + df["opportunity_created"] = df["opportunity_created"].astype(int) + df["demo_completed"] = (df["demo_page_views"] > 0).astype(int) + return df + + +def rename_and_select(df: pd.DataFrame) -> pd.DataFrame: + """Rename snapshot columns to v4 names and select final column set.""" + df = df.rename(columns=_RENAME_MAP) + # Convert boolean converted to int 0/1 + df["converted"] = df["converted"].astype(int) + missing = [c for c in _FINAL_COLUMNS if c not in df.columns] + if missing: + raise ValueError( + f"Missing required columns after renaming: {missing}. Available: {list(df.columns)}" + ) + return df[_FINAL_COLUMNS] + + +def subsample( + df: pd.DataFrame, + rng: np.random.RandomState, + n: int = SUBSAMPLE_N, + target_rate: float = TARGET_RATE, +) -> pd.DataFrame: + """Stratified subsample to n rows at target_rate conversion.""" + positives = df[df["converted"] == 1] + negatives = df[df["converted"] == 0] + n_pos = int(n * target_rate) + n_neg = n - n_pos + + if len(positives) < n_pos: + print(f"WARNING: only {len(positives)} positives, need {n_pos}", file=sys.stderr) + n_pos = len(positives) + n_neg = n - n_pos + if len(negatives) < n_neg: + print(f"WARNING: only {len(negatives)} negatives, need {n_neg}", file=sys.stderr) + n_neg = len(negatives) + + pos_sample = positives.sample(n=n_pos, random_state=rng) + neg_sample = negatives.sample(n=n_neg, random_state=rng) + return ( + pd.concat([pos_sample, neg_sample]).sample(frac=1, random_state=rng).reset_index(drop=True) + ) + + +def inject_missingness(df: pd.DataFrame, rng: np.random.RandomState) -> pd.DataFrame: + """Apply structured missingness per the v4 contract.""" + df = df.copy() + n = len(df) + + # web_sessions: source-conditional missingness + for source, rate in [ + ("sdr_outbound", 0.15), + ("inbound_marketing", 0.02), + ("partner_referral", 0.05), + ]: + mask = (df["lead_source"] == source) & (rng.random(n) < rate) + df.loc[mask, "web_sessions"] = np.nan + + # seniority: source-conditional missingness + partner_mask = (df["lead_source"] == "partner_referral") & (rng.random(n) < 0.08) + other_mask = (df["lead_source"] != "partner_referral") & (rng.random(n) < 0.01) + df.loc[partner_mask | other_mask, "seniority"] = np.nan + + # days_since_last_touch: additional 3% MCAR on top of structural NaN + dslt_mask = rng.random(n) < 0.03 + df.loc[dslt_mask, "days_since_last_touch"] = np.nan + + return df + + +def build_v4_dataset(seed: int = SEED) -> pd.DataFrame: + """Full pipeline: generate → snapshot → derive → rename → subsample → missingness.""" + rng = np.random.RandomState(seed) + + print("Generating bundle...", file=sys.stderr) + snapshot = generate_bundle(seed=seed) + conv = snapshot["converted_within_90_days"].mean() + print( + f" Raw snapshot: {len(snapshot)} rows, conversion={conv:.1%}", + file=sys.stderr, + ) + + df = derive_binary_features(snapshot) + df = rename_and_select(df) + + print("Subsampling...", file=sys.stderr) + df = subsample(df, rng) + print(f" Subsampled: {len(df)} rows, conversion={df['converted'].mean():.1%}", file=sys.stderr) + + print("Injecting missingness...", file=sys.stderr) + df = inject_missingness(df, rng) + + return df + + +# --------------------------------------------------------------------------- +# CLI entry point +# --------------------------------------------------------------------------- + + +def main() -> None: + if len(sys.argv) < 2: + print(f"Usage: {sys.argv[0]} OUTPUT_CSV", file=sys.stderr) + sys.exit(1) + + output_path = Path(sys.argv[1]) + df = build_v4_dataset() + + output_path.parent.mkdir(parents=True, exist_ok=True) + df.to_csv(output_path, index=False) + print(f"Wrote {len(df)} rows × {len(df.columns)} columns to {output_path}", file=sys.stderr) + + +if __name__ == "__main__": + main() diff --git a/scripts/validate_v4_dataset.py b/scripts/validate_v4_dataset.py new file mode 100644 index 0000000..09c60a4 --- /dev/null +++ b/scripts/validate_v4_dataset.py @@ -0,0 +1,334 @@ +#!/usr/bin/env python3 +"""Validate a v4 lead scoring intro CSV against the v4 validation spec. + +Usage: + python scripts/validate_v4_dataset.py lead_scoring_intro_v4.csv + +Exit code 0 = all mandatory checks pass. +Exit code 1 = at least one mandatory check failed. +""" + +from __future__ import annotations + +import sys + +import numpy as np +import pandas as pd +from sklearn.linear_model import LogisticRegression +from sklearn.metrics import roc_auc_score +from sklearn.preprocessing import LabelEncoder + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +TARGET = "converted" + +BANNED_COLUMNS = { + "current_stage", + "funnel_stage", + "conversion_timestamp", + "is_sql", + "is_mql", + "lead_created_at", +} + +CAT_FEATURES = [ + "industry", + "region", + "company_size", + "company_revenue", + "contact_role", + "seniority", + "lead_source", +] + +BINARY_FEATURES = [ + "opportunity_created", + "demo_completed", +] + +LEAKAGE_TRAP = "total_touches_all" + +# Deterministic group thresholds +MIN_GROUP_SIZE = 50 +RATE_LOWER = 0.02 +RATE_UPPER = 0.98 + +# AUC bounds +AUC_LOWER = 0.65 +AUC_UPPER = 0.90 +AUC_TRAP_BOOST = 0.03 + + +# --------------------------------------------------------------------------- +# Check implementations +# --------------------------------------------------------------------------- + + +def check_banned_columns(df: pd.DataFrame) -> list[str]: + """Check 1: No banned columns.""" + errors = [] + present = BANNED_COLUMNS & set(df.columns) + if present: + errors.append(f"Banned columns present: {sorted(present)}") + id_cols = [c for c in df.columns if c.endswith("_id")] + if id_cols: + errors.append(f"ID columns present: {sorted(id_cols)}") + return errors + + +def check_deterministic_groups(df: pd.DataFrame) -> list[str]: + """Check 2: No deterministic feature groups.""" + errors = [] + check_cols = [c for c in CAT_FEATURES + BINARY_FEATURES if c in df.columns] + for col in check_cols: + stats = df.groupby(col)[TARGET].agg(["mean", "count"]) + large = stats[stats["count"] >= MIN_GROUP_SIZE] + for val, row in large.iterrows(): + if row["mean"] < RATE_LOWER: + errors.append( + f"DETERMINISTIC: {col}={val} has {row['mean']:.1%} " + f"conversion (n={int(row['count'])})" + ) + if row["mean"] > RATE_UPPER: + errors.append( + f"DETERMINISTIC: {col}={val} has {row['mean']:.1%} " + f"conversion (n={int(row['count'])})" + ) + return errors + + +def check_conversion_rate(df: pd.DataFrame) -> list[str]: + """Check 3: Conversion rate realism.""" + rate = df[TARGET].mean() + if rate < 0.15 or rate > 0.40: + return [f"Conversion rate {rate:.1%} outside [15%, 40%]"] + return [] + + +def _fit_lr(df: pd.DataFrame, exclude_cols: list[str] | None = None) -> float: + """Fit LR and return AUC.""" + feature_cols = [c for c in df.columns if c != TARGET and c not in (exclude_cols or [])] + x_df = df[feature_cols].copy() + y = df[TARGET].astype(int) + + for col in x_df.select_dtypes(include=["object", "category"]).columns: + le = LabelEncoder() + x_df[col] = le.fit_transform(x_df[col].astype(str).fillna("__MISSING__")) + + x_df = x_df.select_dtypes(include=[np.number]) + x_df = x_df.fillna(x_df.median()) + + lr = LogisticRegression(max_iter=2000, random_state=42) + lr.fit(x_df, y) + probs = lr.predict_proba(x_df)[:, 1] + return float(roc_auc_score(y, probs)) + + +def check_baseline_auc(df: pd.DataFrame) -> tuple[list[str], float]: + """Check 4: Baseline model AUC without leakage trap.""" + auc = _fit_lr(df, exclude_cols=[LEAKAGE_TRAP]) + errors = [] + if auc < AUC_LOWER: + errors.append(f"Baseline AUC {auc:.3f} below {AUC_LOWER}") + if auc > AUC_UPPER: + errors.append(f"Baseline AUC {auc:.3f} above {AUC_UPPER}") + return errors, auc + + +def check_leakage_trap(df: pd.DataFrame, baseline_auc: float) -> list[str]: + """Check 5: Leakage trap effectiveness.""" + if LEAKAGE_TRAP not in df.columns: + return [f"Leakage trap column '{LEAKAGE_TRAP}' not found"] + full_auc = _fit_lr(df) + boost = full_auc - baseline_auc + errors = [] + if boost < AUC_TRAP_BOOST: + errors.append( + f"Leakage trap boost {boost:.3f} below {AUC_TRAP_BOOST} " + f"(baseline={baseline_auc:.3f}, full={full_auc:.3f})" + ) + return errors + + +def check_missingness(df: pd.DataFrame) -> list[str]: + """Check 6: Missingness structure.""" + errors = [] + + # web_sessions must have nulls + if "web_sessions" in df.columns: + if df["web_sessions"].isna().sum() == 0: + errors.append("web_sessions has no nulls") + else: + # Check source-conditional ratio + outbound_rate = ( + df.loc[df["lead_source"] == "sdr_outbound", "web_sessions"].isna().mean() + ) + inbound_rate = ( + df.loc[df["lead_source"] == "inbound_marketing", "web_sessions"].isna().mean() + ) + if inbound_rate > 0 and outbound_rate / inbound_rate < 3.0: + errors.append( + f"web_sessions missing ratio outbound/inbound = " + f"{outbound_rate / inbound_rate:.1f}x (need >3x)" + ) + elif inbound_rate == 0 and outbound_rate > 0: + pass # Trivially satisfied + elif inbound_rate == 0 and outbound_rate == 0: + errors.append("web_sessions has no source-conditional missingness") + + # seniority must have nulls + if "seniority" in df.columns: + if df["seniority"].isna().sum() == 0: + errors.append("seniority has no nulls") + else: + partner_rate = ( + df.loc[df["lead_source"] == "partner_referral", "seniority"].isna().mean() + ) + other_rate = df.loc[df["lead_source"] != "partner_referral", "seniority"].isna().mean() + if other_rate > 0 and partner_rate / other_rate < 3.0: + errors.append( + f"seniority missing ratio partner/other = " + f"{partner_rate / other_rate:.1f}x (need >3x)" + ) + + # days_since_last_touch must have nulls + if "days_since_last_touch" in df.columns: + if df["days_since_last_touch"].isna().sum() == 0: + errors.append("days_since_last_touch has no nulls") + + # No column should have > 20% missing + for col in df.columns: + miss_rate = df[col].isna().mean() + if miss_rate > 0.20: + errors.append(f"{col} has {miss_rate:.1%} missing (>20%)") + + return errors + + +def check_shape(df: pd.DataFrame) -> list[str]: + """Check 7: Shape constraints.""" + errors = [] + if len(df) != 1000: + errors.append(f"Expected 1000 rows, got {len(df)}") + if len(df.columns) != 18: + errors.append(f"Expected 18 columns, got {len(df.columns)}") + return errors + + +# --------------------------------------------------------------------------- +# Warning checks +# --------------------------------------------------------------------------- + + +def warn_redundancy(df: pd.DataFrame) -> list[str]: + """Warning 2: Column redundancy.""" + warnings = [] + if "inbound_touches" in df.columns and "outbound_touches" in df.columns: + total = df["inbound_touches"].fillna(0) + df["outbound_touches"].fillna(0) + for col in df.select_dtypes(include=[np.number]).columns: + if col in ("inbound_touches", "outbound_touches", TARGET): + continue + corr = total.corr(df[col].fillna(0)) + if abs(corr) > 0.99: + warnings.append(f"inbound+outbound correlates {corr:.3f} with {col}") + return warnings + + +def warn_low_variance(df: pd.DataFrame) -> list[str]: + """Warning 3: Low-variance features.""" + warnings = [] + for col in df.columns: + if col == TARGET: + continue + nunique = df[col].dropna().nunique() + if nunique < 3 and col not in BINARY_FEATURES: + warnings.append(f"{col} has only {nunique} unique value(s)") + return warnings + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + + +def validate(csv_path: str) -> int: + """Run all checks and return exit code.""" + df = pd.read_csv(csv_path) + all_errors: list[str] = [] + all_warnings: list[str] = [] + + # Mandatory checks + print("Check 1: Banned columns...", end=" ") + errs = check_banned_columns(df) + print("FAIL" if errs else "PASS") + all_errors.extend(errs) + + print("Check 2: Deterministic groups...", end=" ") + errs = check_deterministic_groups(df) + print("FAIL" if errs else "PASS") + all_errors.extend(errs) + + print("Check 3: Conversion rate...", end=" ") + errs = check_conversion_rate(df) + rate = df[TARGET].mean() + print(f"{'FAIL' if errs else 'PASS'} ({rate:.1%})") + all_errors.extend(errs) + + print("Check 4: Baseline AUC...", end=" ") + errs, baseline_auc = check_baseline_auc(df) + print(f"{'FAIL' if errs else 'PASS'} (AUC={baseline_auc:.3f})") + all_errors.extend(errs) + + print("Check 5: Leakage trap...", end=" ") + errs = check_leakage_trap(df, baseline_auc) + print("FAIL" if errs else "PASS") + all_errors.extend(errs) + + print("Check 6: Missingness...", end=" ") + errs = check_missingness(df) + print("FAIL" if errs else "PASS") + all_errors.extend(errs) + + print("Check 7: Shape...", end=" ") + errs = check_shape(df) + print(f"{'FAIL' if errs else 'PASS'} ({len(df)} rows × {len(df.columns)} cols)") + all_errors.extend(errs) + + # Warnings + print("\nWarning checks:") + warns = warn_redundancy(df) + if warns: + all_warnings.extend(warns) + warns = warn_low_variance(df) + if warns: + all_warnings.extend(warns) + + # Report + if all_errors: + print(f"\n{'=' * 60}") + print(f"FAILED — {len(all_errors)} error(s):") + for err in all_errors: + print(f" ✗ {err}") + else: + print(f"\n{'=' * 60}") + print("ALL MANDATORY CHECKS PASSED") + + if all_warnings: + print(f"\n{len(all_warnings)} warning(s):") + for warn in all_warnings: + print(f" ⚠ {warn}") + + return 1 if all_errors else 0 + + +def main() -> None: + if len(sys.argv) != 2: + print(f"Usage: {sys.argv[0]} CSV_PATH", file=sys.stderr) + sys.exit(1) + sys.exit(validate(sys.argv[1])) + + +if __name__ == "__main__": + main() diff --git a/tests/render/test_snapshot_windowed.py b/tests/render/test_snapshot_windowed.py new file mode 100644 index 0000000..f2a15f6 --- /dev/null +++ b/tests/render/test_snapshot_windowed.py @@ -0,0 +1,190 @@ +"""Tests for windowed snapshot features (v4 engine changes). + +Covers: snapshot_day parameter, touches_week_1, days_since_first_touch, +expected_acv, total_touches_all (leakage trap), opportunity_created. +""" + +from __future__ import annotations + +import pandas as pd +import pytest + +from leadforge.core.models import GenerationConfig +from leadforge.render.snapshots import build_snapshot +from leadforge.simulation.engine import simulate_world +from leadforge.simulation.population import build_population +from leadforge.structure.sampler import sample_hidden_graph + + +def _make_narrative(seed: int = 42): + from leadforge.api.generator import Generator + + gen = Generator.from_recipe("b2b_saas_procurement_v1", seed=seed) + assert gen.world_spec.narrative is not None + return gen.world_spec.narrative + + +@pytest.fixture(scope="module") +def sim_data(): + """Run a small simulation once; share across all tests in this module.""" + config = GenerationConfig(seed=42, n_accounts=30, n_contacts=90, n_leads=80) + narrative = _make_narrative(config.seed) + graph = sample_hidden_graph(42) + population = build_population(config, narrative, graph) + result = simulate_world(config, population, graph) + return config, population, result + + +# --------------------------------------------------------------------------- +# Windowed snapshot basics +# --------------------------------------------------------------------------- + + +class TestWindowedSnapshot: + def test_snapshot_day_produces_valid_dataframe(self, sim_data): + config, population, result = sim_data + snap = build_snapshot(result, population, snapshot_day=14) + assert len(snap) == config.n_leads + assert "touches_week_1" in snap.columns + + def test_windowed_touch_counts_leq_full(self, sim_data): + """Windowed touch counts should be ≤ full-horizon counts.""" + _, population, result = sim_data + full = build_snapshot(result, population) + windowed = build_snapshot(result, population, snapshot_day=14) + assert (windowed["touch_count"] <= full["touch_count"]).all() + + def test_windowed_session_counts_leq_full(self, sim_data): + _, population, result = sim_data + full = build_snapshot(result, population) + windowed = build_snapshot(result, population, snapshot_day=14) + assert (windowed["session_count"] <= full["session_count"]).all() + + def test_snapshot_day_none_equals_default(self, sim_data): + """snapshot_day=None should produce same result as omitting it.""" + _, population, result = sim_data + default = build_snapshot(result, population) + explicit_none = build_snapshot(result, population, snapshot_day=None) + pd.testing.assert_frame_equal(default, explicit_none) + + +# --------------------------------------------------------------------------- +# touches_week_1 +# --------------------------------------------------------------------------- + + +class TestTouchesWeek1: + def test_non_negative(self, sim_data): + _, population, result = sim_data + snap = build_snapshot(result, population, snapshot_day=14) + assert (snap["touches_week_1"] >= 0).all() + + def test_leq_total_touches(self, sim_data): + _, population, result = sim_data + snap = build_snapshot(result, population, snapshot_day=14) + assert (snap["touches_week_1"] <= snap["touch_count"]).all() + + +# --------------------------------------------------------------------------- +# days_since_first_touch +# --------------------------------------------------------------------------- + + +class TestDaysSinceFirstTouch: + def test_non_negative_when_present(self, sim_data): + _, population, result = sim_data + snap = build_snapshot(result, population, snapshot_day=14) + valid = snap["days_since_first_touch"].dropna() + if len(valid) > 0: + assert (valid >= 0).all() + + def test_nan_when_no_touches(self, sim_data): + _, population, result = sim_data + snap = build_snapshot(result, population, snapshot_day=14) + no_touch = snap[snap["touch_count"] == 0] + if len(no_touch) > 0: + assert no_touch["days_since_first_touch"].isna().all() + + +# --------------------------------------------------------------------------- +# total_touches_all (leakage trap) +# --------------------------------------------------------------------------- + + +class TestTotalTouchesAll: + def test_present_with_snapshot_day(self, sim_data): + _, population, result = sim_data + snap = build_snapshot(result, population, snapshot_day=14) + assert "total_touches_all" in snap.columns + + def test_geq_windowed_touch_count(self, sim_data): + """Leakage trap uses full horizon, so should be ≥ windowed counts.""" + _, population, result = sim_data + snap = build_snapshot(result, population, snapshot_day=14) + assert (snap["total_touches_all"] >= snap["touch_count"]).all() + + def test_non_negative(self, sim_data): + _, population, result = sim_data + snap = build_snapshot(result, population, snapshot_day=14) + assert (snap["total_touches_all"] >= 0).all() + + +# --------------------------------------------------------------------------- +# opportunity_created +# --------------------------------------------------------------------------- + + +class TestOpportunityCreated: + def test_is_boolean_dtype(self, sim_data): + _, population, result = sim_data + snap = build_snapshot(result, population) + assert snap["opportunity_created"].dtype.name == "boolean" + + def test_superset_of_has_open(self, sim_data): + """Every lead with has_open_opportunity must also have opportunity_created.""" + _, population, result = sim_data + snap = build_snapshot(result, population) + has_open = snap["has_open_opportunity"].fillna(False) + opp_created = snap["opportunity_created"].fillna(False) + assert (has_open <= opp_created).all() + + +# --------------------------------------------------------------------------- +# expected_acv +# --------------------------------------------------------------------------- + + +class TestExpectedAcv: + def test_present_in_snapshot(self, sim_data): + _, population, result = sim_data + snap = build_snapshot(result, population) + assert "expected_acv" in snap.columns + + def test_positive_when_present(self, sim_data): + _, population, result = sim_data + snap = build_snapshot(result, population) + valid = snap["expected_acv"].dropna() + if len(valid) > 0: + assert (valid > 0).all() + + +# --------------------------------------------------------------------------- +# Determinism +# --------------------------------------------------------------------------- + + +class TestWindowedDeterminism: + def test_same_seed_same_output(self): + """Windowed snapshots must be deterministic given the same seed.""" + + def _snap(seed): + cfg = GenerationConfig(seed=seed, n_accounts=15, n_contacts=45, n_leads=40) + narr = _make_narrative(seed) + g = sample_hidden_graph(seed) + pop = build_population(cfg, narr, g) + res = simulate_world(cfg, pop, g) + return build_snapshot(res, pop, snapshot_day=14) + + s1 = _snap(99) + s2 = _snap(99) + pd.testing.assert_frame_equal(s1, s2, check_like=False) diff --git a/tests/simulation/test_population.py b/tests/simulation/test_population.py index c41d657..a4d5d0c 100644 --- a/tests/simulation/test_population.py +++ b/tests/simulation/test_population.py @@ -414,6 +414,150 @@ def test_empty_channels_raises() -> None: _build_with_narrative(bad_narrative) +# --------------------------------------------------------------------------- +# Category-latent correlations (v4 engine change) +# --------------------------------------------------------------------------- + + +def test_category_latent_correlations_shift_latents() -> None: + """Applying category-latent correlations must shift the target trait.""" + config = GenerationConfig(seed=42, n_accounts=100, n_contacts=200, n_leads=300) + gen = Generator.from_recipe("b2b_saas_procurement_v1", seed=42) + narrative = gen.world_spec.narrative + assert narrative is not None + graph = sample_hidden_graph(seed=42) + + # Build without correlations. + baseline = build_population(config, narrative, graph) + baseline_authority = [ + v["latent_contact_authority"] for v in baseline.latent_state.contact_latents.values() + ] + + # Build with a strong positive boost for a common seniority value. + correlations = { + "seniority": { + "latent_trait": "latent_contact_authority", + "boosts": { + "c_suite": 0.3, + "vp": 0.2, + "director": 0.1, + }, + }, + } + boosted = build_population(config, narrative, graph, category_latent_correlations=correlations) + boosted_authority = [ + v["latent_contact_authority"] for v in boosted.latent_state.contact_latents.values() + ] + + # Mean should be higher with boosts. + avg_baseline = sum(baseline_authority) / len(baseline_authority) + avg_boosted = sum(boosted_authority) / len(boosted_authority) + assert avg_boosted > avg_baseline, ( + f"Expected boosted ({avg_boosted:.3f}) > baseline ({avg_baseline:.3f})" + ) + + +def test_category_latent_correlations_clamped() -> None: + """Values must stay in [0, 1] even with extreme boosts.""" + config = GenerationConfig(seed=42, n_accounts=50, n_contacts=100, n_leads=150) + gen = Generator.from_recipe("b2b_saas_procurement_v1", seed=42) + narrative = gen.world_spec.narrative + assert narrative is not None + graph = sample_hidden_graph(seed=42) + + correlations = { + "seniority": { + "latent_trait": "latent_contact_authority", + "boosts": { + "c_suite": 5.0, + "vp": 5.0, + "director": 5.0, + "manager": 5.0, + "individual_contributor": -5.0, + }, + }, + } + result = build_population(config, narrative, graph, category_latent_correlations=correlations) + for traits in result.latent_state.contact_latents.values(): + assert 0.0 <= traits["latent_contact_authority"] <= 1.0 + + +def test_category_latent_correlations_deterministic() -> None: + """Same seed + same correlations → same output.""" + config = GenerationConfig(seed=77, n_accounts=30, n_contacts=60, n_leads=80) + gen = Generator.from_recipe("b2b_saas_procurement_v1", seed=77) + narrative = gen.world_spec.narrative + assert narrative is not None + graph = sample_hidden_graph(seed=77) + + correlations = { + "estimated_revenue_band": { + "latent_trait": "latent_account_fit", + "boosts": {"$200M+": 0.15, "$50M-$200M": 0.05}, + }, + } + r1 = build_population(config, narrative, graph, category_latent_correlations=correlations) + r2 = build_population(config, narrative, graph, category_latent_correlations=correlations) + assert r1.latent_state.account_latents == r2.latent_state.account_latents + + +def test_lead_source_boost_not_stacked_per_contact() -> None: + """A contact shared by N leads must receive the boost only once. + + Regression test: previously, iterating over leads applied the boost once + per lead, so a contact with 3 leads of the same source got 3x the boost. + """ + import copy + + from leadforge.simulation.population import _apply_category_latent_correlations + + # Use enough leads relative to contacts to guarantee shared contacts. + config = GenerationConfig(seed=42, n_accounts=30, n_contacts=90, n_leads=200) + gen = Generator.from_recipe("b2b_saas_procurement_v1", seed=42) + narrative = gen.world_spec.narrative + assert narrative is not None + graph = sample_hidden_graph(seed=42) + pop = build_population(config, narrative, graph) + + # Find a contact with multiple leads of the same source. + from collections import defaultdict + + contact_leads: dict[str, list] = defaultdict(list) + for lead in pop.leads: + contact_leads[lead.contact_id].append(lead) + + target_cid = None + target_source = None + for cid, leads in contact_leads.items(): + if len(leads) >= 2: + sources = [ld.lead_source for ld in leads] + if len(set(sources)) == 1: + target_cid = cid + target_source = sources[0] + break + assert target_cid is not None, "Need at least one shared contact for this test" + + pop2 = copy.deepcopy(pop) + boost_val = 0.10 + _apply_category_latent_correlations( + pop2, + { + "lead_source": { + "latent_trait": "latent_engagement_propensity", + "boosts": {target_source: boost_val}, + }, + }, + ) + original = pop.latent_state.contact_latents[target_cid]["latent_engagement_propensity"] + updated = pop2.latent_state.contact_latents[target_cid]["latent_engagement_propensity"] + delta = updated - original + # Should be exactly one boost, not N * boost. + assert abs(delta - boost_val) < 1e-9, ( + f"Expected single boost {boost_val}, got delta {delta:.4f} " + f"(contact has {len(contact_leads[target_cid])} leads)" + ) + + def test_channel_weights_zero_shares_falls_back_to_uniform() -> None: """If all GTM shares are 0, _channel_weights should return uniform weights.""" narrative = _base_narrative() diff --git a/tests/validation/test_invariants.py b/tests/validation/test_invariants.py index fd0f6a4..3668ccf 100644 --- a/tests/validation/test_invariants.py +++ b/tests/validation/test_invariants.py @@ -12,16 +12,19 @@ _SMALL = {"n_leads": 20, "n_accounts": 10, "n_contacts": 30} +_PINNED_TIMESTAMP = "2024-01-01T00:00:00+00:00" + + @pytest.fixture(scope="module") def determinism_bundles(tmp_path_factory: pytest.TempPathFactory) -> tuple[Path, Path]: - """Generate two bundles with the same seed.""" + """Generate two bundles with the same seed and pinned timestamp.""" a = tmp_path_factory.mktemp("det_a") b = tmp_path_factory.mktemp("det_b") for out in (a, b): gen = Generator.from_recipe( "b2b_saas_procurement_v1", seed=77, exposure_mode="student_public" ) - gen.generate(**_SMALL).save(str(out)) + gen.generate(**_SMALL).save(str(out), generation_timestamp=_PINNED_TIMESTAMP) return a, b