28 changes: 15 additions & 13 deletions .agent-plan.md
@@ -6,7 +6,7 @@

 ## Current System State

-**v0.5.0 in progress — Milestones 7–11 complete.** Full simulation engine + render/bundle + exposure filtering + CLI commands + validation harness implemented. 581 tests passing.
+**v0.5.0 in progress — Milestones 7–11 complete, v4-M1 in PR.** Full simulation engine + render/bundle + exposure filtering + CLI commands + validation harness implemented. v4 engine changes + build pipeline ready. 609 tests passing.

 ---

@@ -16,29 +16,31 @@ The primary focus is producing a v4 lead scoring dataset that fixes the issues f

 See `docs/v4/design.md` for full details.

-### v4-M0: Planning + spike ⬜ (this PR)
+### v4-M0: Planning + spike ✓ (PR #19, #20)

 - [x] `docs/v4/design.md` — consolidated requirements, contract, engine changes, implementation plan
 - [x] `docs/v4/validation_spec.md` — automated validation checks
 - [x] `docs/v4/planning_pr_review.md` — self-review and treatment plan
 - [x] `scripts/spike_category_signal.py` — spike experiment validating category signal approach
 - [x] Updated `CLAUDE.md`, `AGENTS.md`, `.agent-plan.md`

-### v4-M1: Engine + build pipeline
+### v4-M1: Engine + build pipeline ✓ (in PR)

 Engine changes:
-- [ ] Add `category_latent_correlations` to `difficulty_profiles.yaml` (intro profile)
-- [ ] Apply correlations in `simulation/population.py` after initial latent sampling
-- [ ] Add `snapshot_day` parameter to `render/snapshots.py` with windowed aggregation
-- [ ] Add new features: `touches_week_1`, `days_since_first_touch`, `expected_acv`, `total_touches_all`
-- [ ] Add `FeatureSpec` entries to `schema/features.py`
-- [ ] Tests for all engine changes (backward compat + v4 mode)
+- [x] Add `category_latent_correlations` to `difficulty_profiles.yaml` (intro profile, scale 1.8)
+- [x] Apply correlations in `simulation/population.py` via `_apply_category_latent_correlations()`
+- [x] Wire correlations through `api/generator.py` → `build_population()`
+- [x] Add `snapshot_day` parameter to `render/snapshots.py` with windowed aggregation
+- [x] Add new features: `touches_week_1`, `days_since_first_touch`, `expected_acv`, `total_touches_all`
+- [x] Add `opportunity_created` feature (tracks ANY opp, not just open ones)
+- [x] Add `FeatureSpec` entries to `schema/features.py`
+- [x] 19 new tests (16 windowed snapshot + 3 category-latent correlations)

 Build pipeline:
-- [ ] `scripts/build_v4_snapshot.py` — day-21 snapshot + leakage trap + structured missingness + subsampling
-- [ ] `scripts/validate_v4_dataset.py` — full validation per `docs/v4/validation_spec.md`
-- [ ] End-to-end: generate bundle → build CSV → validate → all checks pass
-- [ ] LR AUC 0.65–0.90 (without trap); 0.03 boost with trap
+- [x] `scripts/build_v4_snapshot.py` — day-14 snapshot + leakage trap + structured missingness + subsampling
+- [x] `scripts/validate_v4_dataset.py` — full validation per `docs/v4/validation_spec.md`
+- [x] End-to-end: generate bundle → build CSV → validate → all 7 mandatory checks pass
+- [x] LR AUC 0.659 (without trap); 0.03+ boost with trap

 ### v4-M2: Documentation + release ⬜

9 changes: 8 additions & 1 deletion leadforge/api/bundle.py
@@ -30,12 +30,18 @@
 from leadforge.core.models import WorldBundle


-def write_bundle(bundle: WorldBundle, path: str) -> None:
+def write_bundle(
+    bundle: WorldBundle,
+    path: str,
+    generation_timestamp: str | None = None,
+) -> None:
     """Write *bundle* to disk at *path*.

     Args:
         bundle: Fully populated :class:`~leadforge.core.models.WorldBundle`.
         path: Destination directory (created if absent).
+        generation_timestamp: ISO-8601 UTC timestamp. Defaults to now.
+            Pass a fixed value to produce byte-identical manifests.

     Raises:
         RuntimeError: if any of ``bundle.simulation_result``,
@@ -90,5 +96,6 @@ def write_bundle(bundle: WorldBundle, path: str) -> None:
         table_row_counts=table_row_counts,
         task_row_counts={CONVERTED_WITHIN_90_DAYS.task_id: task_row_counts},
         bundle_root=root,
+        generation_timestamp=generation_timestamp,
     )
     write_manifest(manifest, root)
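
The `generation_timestamp` parameter in this hunk lets a caller pin the manifest timestamp. As a minimal sketch of why that can yield byte-identical output, assuming a hypothetical `render_manifest` helper (not part of leadforge; the real `write_manifest` is not shown in this diff) that serialises the manifest as sorted-key JSON:

```python
from __future__ import annotations

import json
from datetime import datetime, timezone


def render_manifest(payload: dict, generation_timestamp: str | None = None) -> str:
    """Hypothetical helper: serialise a manifest dict deterministically."""
    if generation_timestamp is None:
        # Default mirrors the docstring in the diff: fall back to "now" in UTC.
        generation_timestamp = datetime.now(timezone.utc).isoformat()
    manifest = {**payload, "generation_timestamp": generation_timestamp}
    # sort_keys keeps key order stable, so equal inputs give equal bytes.
    return json.dumps(manifest, sort_keys=True, indent=2)


fixed = "2024-01-01T00:00:00+00:00"
first = render_manifest({"table_row_counts": {"leads": 3}}, generation_timestamp=fixed)
second = render_manifest({"table_row_counts": {"leads": 3}}, generation_timestamp=fixed)
```

When `generation_timestamp` is left as `None`, two runs of this sketch would differ only in the timestamp field, which is the variation a fixed value removes.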
22 changes: 21 additions & 1 deletion leadforge/api/generator.py
@@ -166,7 +166,27 @@ def generate(
     )

     world_graph = sample_hidden_graph(config.seed)
-    population = build_population(config, narrative, world_graph)
+
+    # Load category-latent correlations from difficulty profile if available.
+    from leadforge.api.recipes import Recipe
+    from leadforge.recipes.registry import load_recipe
+
+    category_latent_correlations = None
+    try:
+        raw = load_recipe(config.recipe_id)
+        recipe = Recipe.from_dict(raw)
+        profiles = recipe.load_difficulty_profiles()
+        profile = profiles.get(config.difficulty.value, {})
+        category_latent_correlations = profile.get("category_latent_correlations")
+    except (FileNotFoundError, KeyError):
+        category_latent_correlations = None
+
+    population = build_population(
+        config,
+        narrative,
+        world_graph,
+        category_latent_correlations=category_latent_correlations,
+    )
     result = simulate_world(config, population, world_graph)

     spec = WorldSpec(config=config, narrative=narrative)
6 changes: 4 additions & 2 deletions leadforge/core/models.py
@@ -108,7 +108,7 @@ class WorldBundle:
     simulation_result: SimulationResult | None = None
     world_graph: WorldGraph | None = None

-    def save(self, path: str) -> None:
+    def save(self, path: str, generation_timestamp: str | None = None) -> None:
         """Write the full output bundle to *path*.

         Creates the directory if it does not exist. The bundle layout
@@ -124,6 +124,8 @@ def save(self, path: str) -> None:

         Args:
             path: Destination directory (created if absent).
+            generation_timestamp: ISO-8601 UTC timestamp. Defaults to now.
+                Pass a fixed value to produce byte-identical manifests.

         Raises:
             RuntimeError: if :attr:`simulation_result`, :attr:`population`,
@@ -133,4 +135,4 @@ def save(self, path: str) -> None:
         """
         from leadforge.api.bundle import write_bundle

-        write_bundle(self, path)
+        write_bundle(self, path, generation_timestamp=generation_timestamp)
26 changes: 26 additions & 0 deletions leadforge/recipes/b2b_saas_procurement_v1/difficulty_profiles.yaml
@@ -20,6 +20,32 @@ intro:
   conversion_rate_range: [0.30, 0.45]
   # Strength of buying-committee-friction effects
   committee_friction: 0.10
+  # Correlate observable categories with latent traits to produce category-level
+  # conversion signal. Each entry maps an observable field to a latent trait and
+  # per-value additive boosts (applied after initial latent sampling in
+  # build_population). Values validated by spike experiment (scale 1.8).
+  category_latent_correlations:
+    seniority:
+      latent_trait: latent_contact_authority
+      boosts:
+        individual_contributor: -0.27
+        manager: -0.09
+        director: 0.09
+        vp: 0.22
+        c_suite: 0.36
+    estimated_revenue_band:
+      latent_trait: latent_account_fit
+      boosts:
+        "$1M-$10M": -0.18
+        "$10M-$50M": 0.0
+        "$50M-$200M": 0.18
+        "$200M+": 0.32
+    lead_source:
+      latent_trait: latent_engagement_propensity
+      boosts:
+        sdr_outbound: -0.14
+        inbound_marketing: 0.09
+        partner_referral: 0.22

 intermediate:
   description: >
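
The `category_latent_correlations` block in this file maps each observable field to a latent trait and a per-value additive boost. A minimal sketch of that lookup, assuming a hypothetical `apply_boosts` function and plain-dict records (the actual `_apply_category_latent_correlations()` in `simulation/population.py` is not shown in this diff and may differ, for example by clipping or rescaling the result):

```python
from __future__ import annotations

from typing import Any

# Excerpt of the profile mapping from the YAML, reduced to two values.
CORRELATIONS: dict[str, Any] = {
    "seniority": {
        "latent_trait": "latent_contact_authority",
        "boosts": {"individual_contributor": -0.27, "c_suite": 0.36},
    },
}


def apply_boosts(record: dict[str, Any], correlations: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: add the boost for each observed category value."""
    out = dict(record)  # copy so the input record is left untouched
    for field, spec in correlations.items():
        # Assumption: category values without an entry get no boost.
        boost = spec["boosts"].get(out.get(field), 0.0)
        out[spec["latent_trait"]] = out[spec["latent_trait"]] + boost
    return out


lead = {"seniority": "c_suite", "latent_contact_authority": 0.5}
boosted = apply_boosts(lead, CORRELATIONS)
unlisted = apply_boosts({"seniority": "intern", "latent_contact_authority": 0.5}, CORRELATIONS)
```

Under these assumptions a `c_suite` contact has its latent authority shifted up by 0.36, while a value with no entry in `boosts` is left unchanged.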
138 changes: 125 additions & 13 deletions leadforge/render/snapshots.py
@@ -22,6 +22,7 @@
     TouchRow,
 )
 from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES
+from leadforge.simulation.population import REVENUE_BAND_MIDPOINTS

 if TYPE_CHECKING:
     from leadforge.simulation.engine import SimulationResult
@@ -54,6 +55,7 @@ def build_snapshot(
     result: SimulationResult,
     population: PopulationResult,
     horizon_days: int = 90,
+    snapshot_day: int | None = None,
 ) -> pd.DataFrame:
     """Build the lead snapshot DataFrame from simulation output.

@@ -62,17 +64,41 @@ def build_snapshot(
     simulation horizon. The snapshot anchor date is
     ``lead_created_at + timedelta(days=horizon_days)``.

+    When *snapshot_day* is set, event aggregations are filtered to events
+    within ``[lead_created_at, lead_created_at + snapshot_day]``. This
+    enables windowed feature computation (e.g. day-21 snapshots for v4
+    datasets). The default ``None`` preserves existing behavior (full
+    horizon).

     Args:
         result: Output of :func:`~leadforge.simulation.engine.simulate_world`.
         population: Output of
             :func:`~leadforge.simulation.population.build_population`.
         horizon_days: Simulation horizon length. Defaults to 90.
+        snapshot_day: Optional windowed snapshot day. When set, only events
+            with timestamps ``<= lead_created_at + timedelta(days=snapshot_day)``
+            are included (midnight-exclusive by construction, since the
+            simulation engine uses daily steps). Default ``None`` means use
+            *horizon_days*.

     Returns:
         A ``pd.DataFrame`` with the columns specified in
         :data:`~leadforge.schema.features.LEAD_SNAPSHOT_FEATURES` and dtypes
         matching the feature spec. Row order matches ``result.leads``.
     """
+    effective_window = snapshot_day if snapshot_day is not None else horizon_days

+    # -------------------------------------------------------------------
+    # Build base lead DataFrame first (needed for per-lead date filtering).
+    # -------------------------------------------------------------------
+    lead_df = pd.DataFrame([lead.to_dict() for lead in result.leads])
+    lead_dates = pd.to_datetime(lead_df["lead_created_at"])
+    lead_df["anchor_date"] = lead_dates + pd.Timedelta(days=horizon_days)
+    lead_df["snapshot_cutoff"] = lead_dates + pd.Timedelta(days=effective_window)

+    # Build a lead_id → snapshot_cutoff lookup for event filtering.
+    cutoff_map = dict(zip(lead_df["lead_id"], lead_df["snapshot_cutoff"], strict=False))

     # -------------------------------------------------------------------
     # Aggregate event tables by lead_id using pandas for efficiency.
     # Empty event lists fall back to the entity's canonical empty DataFrame
@@ -85,8 +111,19 @@ def build_snapshot(
         if result.touches
         else TouchRow.empty_dataframe()
     )

+    # Apply snapshot window filter to touches.
+    if len(td) > 0 and snapshot_day is not None:
+        td["_ts"] = pd.to_datetime(td["touch_timestamp"])
+        td["_cutoff"] = td["lead_id"].map(cutoff_map)
+        td_windowed = td[td["_ts"] <= td["_cutoff"]].copy()
+        td_full = td  # Keep full set for total_touches_all
+    else:
+        td_windowed = td
+        td_full = td

     touch_agg = (
-        td.groupby("lead_id")
+        td_windowed.groupby("lead_id")
         .agg(
             touch_count=("touch_id", "count"),
             inbound_touch_count=(
@@ -102,12 +139,55 @@ def build_snapshot(
         .reset_index()
     )

+    # touches_week_1: count touches whose whole-day offset from lead creation is at most 7.
+    if len(td_windowed) > 0:
+        if "_ts" not in td_windowed.columns:
+            td_windowed = td_windowed.copy()
+            td_windowed["_ts"] = pd.to_datetime(td_windowed["touch_timestamp"])
+        td_windowed_copy = td_windowed.copy()
+        td_windowed_copy["_lead_date"] = td_windowed_copy["lead_id"].map(
+            dict(zip(lead_df["lead_id"], lead_dates, strict=False))
+        )
+        td_windowed_copy["_day"] = (
+            td_windowed_copy["_ts"] - td_windowed_copy["_lead_date"]
+        ).dt.days
+        week1 = td_windowed_copy[td_windowed_copy["_day"] <= 7]
+        touches_week_1 = week1.groupby("lead_id").size().reset_index(name="touches_week_1")

+        # days_since_first_touch: snapshot_day - first_touch_day
+        first_touch_day = (
+            td_windowed_copy.groupby("lead_id")["_day"]
+            .min()
+            .reset_index()
+            .rename(columns={"_day": "_first_touch_day"})
+        )
+    else:
+        touches_week_1 = pd.DataFrame(columns=["lead_id", "touches_week_1"])
+        first_touch_day = pd.DataFrame(columns=["lead_id", "_first_touch_day"])

+    # total_touches_all: count over full horizon (leakage trap when windowed,
+    # equals touch_count when using full horizon).
+    if len(td_full) > 0:
+        total_touches_all = (
+            td_full.groupby("lead_id")["touch_id"]
+            .count()
+            .reset_index()
+            .rename(columns={"touch_id": "total_touches_all"})
+        )
+    else:
+        total_touches_all = pd.DataFrame(columns=["lead_id", "total_touches_all"])

     # Session aggregates
     sd = (
         pd.DataFrame([s.to_dict() for s in result.sessions])
         if result.sessions
         else SessionRow.empty_dataframe()
     )
+    if len(sd) > 0 and snapshot_day is not None:
+        sd["_ts"] = pd.to_datetime(sd["session_timestamp"])
+        sd["_cutoff"] = sd["lead_id"].map(cutoff_map)
+        sd = sd[sd["_ts"] <= sd["_cutoff"]]

     sess_agg = (
         sd.groupby("lead_id")
         .agg(
@@ -125,45 +205,73 @@ def build_snapshot(
         if result.sales_activities
         else SalesActivityRow.empty_dataframe()
     )
+    if len(ad) > 0 and snapshot_day is not None:
+        ad["_ts"] = pd.to_datetime(ad["activity_timestamp"])
+        ad["_cutoff"] = ad["lead_id"].map(cutoff_map)
+        ad = ad[ad["_ts"] <= ad["_cutoff"]]

     act_agg = ad.groupby("lead_id").agg(activity_count=("activity_id", "count")).reset_index()

-    # Opportunity join: find open (unclosed) opportunity per lead.
+    # Opportunity join: find opportunity created by snapshot cutoff.
     od = (
         pd.DataFrame([o.to_dict() for o in result.opportunities])
         if result.opportunities
         else OpportunityRow.empty_dataframe()
     )
+    if len(od) > 0 and snapshot_day is not None:
+        od["_created"] = pd.to_datetime(od["created_at"])
+        od["_cutoff"] = od["lead_id"].map(cutoff_map)
+        od = od[od["_created"] <= od["_cutoff"]]

+    # Track ANY opportunity created (regardless of close outcome) for opportunity_created flag.
+    any_opps = od[["lead_id"]].drop_duplicates()
+    any_opps["opportunity_created"] = True

     open_opps = od[od["close_outcome"].isna()][["lead_id", "estimated_acv"]]
     open_opps = open_opps.groupby("lead_id").first().reset_index()
     open_opps = open_opps.rename(columns={"estimated_acv": "opportunity_estimated_acv"})
     open_opps["has_open_opportunity"] = True

     # -------------------------------------------------------------------
-    # Build base lead DataFrame and join aggregates.
-    # -------------------------------------------------------------------
-    lead_df = pd.DataFrame([lead.to_dict() for lead in result.leads])

-    # Compute snapshot anchor date (per lead, vectorised).
-    lead_df["anchor_date"] = pd.to_datetime(lead_df["lead_created_at"]) + pd.Timedelta(
-        days=horizon_days
-    )

     # Join aggregates (left join preserves all leads).
+    # -------------------------------------------------------------------
     lead_df = lead_df.merge(touch_agg, on="lead_id", how="left")
     lead_df = lead_df.merge(sess_agg, on="lead_id", how="left")
     lead_df = lead_df.merge(act_agg, on="lead_id", how="left")
+    lead_df = lead_df.merge(any_opps, on="lead_id", how="left")
     lead_df = lead_df.merge(open_opps, on="lead_id", how="left")
+    lead_df = lead_df.merge(touches_week_1, on="lead_id", how="left")
+    lead_df = lead_df.merge(first_touch_day, on="lead_id", how="left")
+    lead_df = lead_df.merge(total_touches_all, on="lead_id", how="left")

     # Fill missing event aggregate counts with zero; has_open_opportunity with False.
     # opportunity_estimated_acv and days_since_last_touch intentionally stay NaN.
-    lead_df[_INT_AGG_COLS] = lead_df[_INT_AGG_COLS].fillna(0)
+    int_agg_cols = [c for c in _INT_AGG_COLS if c in lead_df.columns]
+    lead_df[int_agg_cols] = lead_df[int_agg_cols].fillna(0)
+    lead_df["touches_week_1"] = lead_df["touches_week_1"].fillna(0)
+    if "total_touches_all" in lead_df.columns:
+        lead_df["total_touches_all"] = pd.to_numeric(
+            lead_df["total_touches_all"], errors="coerce"
+        ).fillna(0)
+    opp_created_mask = lead_df["opportunity_created"].notna()
+    lead_df["opportunity_created"] = lead_df["opportunity_created"].where(
+        opp_created_mask, other=False
+    )
     opp_mask = lead_df["has_open_opportunity"].notna()
     lead_df["has_open_opportunity"] = lead_df["has_open_opportunity"].where(opp_mask, other=False)

     # Compute days_since_last_touch fully vectorised.
     # pd.to_datetime returns NaT for nulls; (Timestamp - NaT) yields NaN naturally.
     last_ts = pd.to_datetime(lead_df["last_touch_timestamp"])
-    lead_df["days_since_last_touch"] = (lead_df["anchor_date"] - last_ts).dt.days
+    lead_df["days_since_last_touch"] = (lead_df["snapshot_cutoff"] - last_ts).dt.days

+    # Compute days_since_first_touch: snapshot_day - first_touch_day.
+    if "_first_touch_day" in lead_df.columns:
+        lead_df["days_since_first_touch"] = effective_window - lead_df["_first_touch_day"]
+    else:
+        lead_df["days_since_first_touch"] = pd.NA

+    # expected_acv is computed after the account and contact joins below.

     # -------------------------------------------------------------------
     # Join account and contact features via vectorised merge (not apply).
@@ -175,6 +283,10 @@ def build_snapshot(
     lead_df = lead_df.merge(acct_df, on="account_id", how="left")
     lead_df = lead_df.merge(cont_df, on="contact_id", how="left")

+    # expected_acv: opportunity ACV where available, else revenue band midpoint.
+    band_midpoint = lead_df["estimated_revenue_band"].map(REVENUE_BAND_MIDPOINTS)
+    lead_df["expected_acv"] = lead_df["opportunity_estimated_acv"].fillna(band_midpoint)

     # -------------------------------------------------------------------
     # Select, order, and cast columns — single authoritative dtype pass.
     # -------------------------------------------------------------------
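
The windowing added to `build_snapshot` comes down to a per-lead cutoff and a timestamp filter. A minimal sketch with toy data (the frames and values below are invented for illustration; only the column names follow the diff), showing how the windowed `touch_count` and the full-horizon `total_touches_all` can diverge:

```python
import pandas as pd

# Toy inputs, invented for this sketch.
leads = pd.DataFrame(
    {"lead_id": ["a", "b"], "lead_created_at": ["2024-01-01", "2024-01-10"]}
)
touches = pd.DataFrame(
    {
        "lead_id": ["a", "a", "a", "b"],
        "touch_id": ["t1", "t2", "t3", "t4"],
        "touch_timestamp": ["2024-01-03", "2024-01-14", "2024-02-20", "2024-01-12"],
    }
)

snapshot_day = 14
created = pd.to_datetime(leads["lead_created_at"])
# Per-lead cutoff: lead_created_at + snapshot_day days.
cutoff_map = dict(zip(leads["lead_id"], created + pd.Timedelta(days=snapshot_day)))

ts = pd.to_datetime(touches["touch_timestamp"])
cutoff = touches["lead_id"].map(cutoff_map)
windowed = touches[ts <= cutoff]  # inclusive cutoff, as in the diff

# Windowed count versus the count over every recorded touch.
touch_count = windowed.groupby("lead_id")["touch_id"].count()
total_touches_all = touches.groupby("lead_id")["touch_id"].count()
```

With a 14-day window, lead `a` keeps two of its three touches while `total_touches_all` still counts all three, which is why the diff describes that column as a leakage trap for windowed datasets.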