Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions .agent-plan.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,42 @@

## Current System State

**v0.4.0 in progress — Milestone 7 complete (PR open).** Full simulation engine implemented:
per-lead mutable state, 90-day daily-step loop, touch/session/sales-activity emission,
HazardTransition stage advancement, ConversionHazard final-close, and post-conversion
opportunity/customer/subscription creation. 490 tests passing.
**v0.4.0 in progress — Milestones 7–8 complete (PRs open).** Full simulation engine + render/bundle
layer implemented. 521 tests passing.

---

## Active Task Breakdown — Milestone 8: Observation Model (v0.4.0)
## Next Up — Milestone 9: Exposure Filtering (v0.4.0)

Goal: Transform the hidden simulated world into realistic CRM-like observations.
Goal: Apply `student_public` / `research_instructor` exposure-mode filtering during bundle write.

- [ ] **1. Snapshot builder** (`render/snapshots.py`) — lead-anchored flat feature snapshot
- [ ] **2. Relational export** (`render/relational.py`) — DataFrame per table from SimulationResult
- [ ] **3. Task export** (`render/tasks.py`) — train/valid/test Parquet split for `converted_within_90_days`
- [ ] **4. Manifest builder** (`render/manifests.py`) — bundle manifest.json
- [ ] **5. Bundle writer** (`api/bundle.py`) — `WorldBundle.save(path)`
- [ ] `exposure/modes.py` — `ExposureMode`-aware filter dispatch
- [ ] `exposure/filters.py` — column/table redaction rules per mode
- [ ] `exposure/redaction.py` — latent-column scrubbing for `student_public`
- [ ] Wire into `api/bundle.py` write pipeline

---

## Context Pointers

- Milestone 7 scope: `docs/leadforge_implementation_plan.md` §10 "Milestone 7"
- Simulation spec: `docs/leadforge_architecture_spec.md` §11 "Simulation engine"
- Mechanism layer: `leadforge/mechanisms/` (all M6 files)
- Milestone 8 scope: `docs/leadforge_implementation_plan.md` §10 "Milestone 8"
- Render layer: `leadforge/render/` (snapshots, relational, tasks, manifests)
- Bundle writer: `leadforge/api/bundle.py`

---

## Completed Phases

### Milestone 8 — Render / Bundle Layer ✓ (v0.4.0 in PR)
- `render/relational.py`: `to_dataframes()` — 9-table dict of typed DataFrames from SimulationResult + PopulationResult
- `render/snapshots.py`: `build_snapshot()` — 30-column leakage-free lead snapshot with touch/session/activity aggregates, account/contact field joins
- `render/tasks.py`: `write_task_splits()` — deterministic 70/15/15 train/valid/test Parquet split + `task_manifest.json`
- `render/manifests.py`: `build_manifest()` / `write_manifest()` — manifest.json with provenance, row counts, SHA-256 hashes
- `api/bundle.py`: `write_bundle()` — orchestrates all render steps; writes full bundle to disk
- `core/models.py`: `WorldBundle.save(path)` — delegates to `write_bundle()` via lazy import
- `api/generator.py`: `Generator.generate()` — fully implemented end-to-end flow
- 31 new render tests; total 521 passing

### Milestone 7 — Simulation Engine ✓ (v0.4.0 in PR)
- `simulation/state.py`: `LeadSimState` — per-lead mutable state (stage, dwell, converted, churned, sql_day)
- `simulation/engine.py`: `simulate_world()` — 90-day daily-step loop; `SimulationResult` output type
Expand Down
86 changes: 86 additions & 0 deletions leadforge/api/bundle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""Bundle writer — assembles and serialises the full output bundle.

:func:`write_bundle` is called by :meth:`WorldBundle.save` and orchestrates
all rendering steps:

1. Write relational Parquet tables (``tables/``).
2. Build the lead snapshot and write task splits (``tasks/``).
3. Write ``dataset_card.md`` and ``feature_dictionary.csv``.
4. Build and write ``manifest.json``.
"""

from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING

from leadforge.narrative.dataset_card import render_dataset_card
from leadforge.render.manifests import build_manifest, write_manifest
from leadforge.render.relational import to_dataframes
from leadforge.render.snapshots import build_snapshot
from leadforge.render.tasks import write_task_splits
from leadforge.schema.dictionaries import write_feature_dictionary
from leadforge.schema.tables import write_parquet
from leadforge.schema.tasks import CONVERTED_WITHIN_90_DAYS

if TYPE_CHECKING:
from leadforge.core.models import WorldBundle


def write_bundle(bundle: WorldBundle, path: str) -> None:
"""Write *bundle* to disk at *path*.

Args:
bundle: Fully populated :class:`~leadforge.core.models.WorldBundle`.
path: Destination directory (created if absent).

Raises:
RuntimeError: if any of ``bundle.simulation_result``,
``bundle.population``, or ``bundle.world_graph`` are ``None``.
"""
if bundle.simulation_result is None or bundle.population is None or bundle.world_graph is None:
raise RuntimeError("WorldBundle is not fully populated. Call Generator.generate() first.")

root = Path(path)
root.mkdir(parents=True, exist_ok=True)

config = bundle.spec.config
result = bundle.simulation_result
population = bundle.population
world_graph = bundle.world_graph

# ------------------------------------------------------------------
# 1. Relational tables → tables/
# ------------------------------------------------------------------
tables_dir = root / "tables"
tables_dir.mkdir(exist_ok=True)

dfs = to_dataframes(result, population)
table_row_counts: dict[str, int] = {}
for table_name, df in dfs.items():
write_parquet(df, tables_dir / f"{table_name}.parquet")
table_row_counts[table_name] = len(df)

# ------------------------------------------------------------------
# 2. Snapshot + task splits → tasks/
# ------------------------------------------------------------------
snapshot = build_snapshot(result, population, horizon_days=config.horizon_days)
task_row_counts = write_task_splits(snapshot, root / "tasks", seed=config.seed)

# ------------------------------------------------------------------
# 3. Dataset card and feature dictionary
# ------------------------------------------------------------------
(root / "dataset_card.md").write_text(render_dataset_card(bundle.spec))
write_feature_dictionary(root / "feature_dictionary.csv")

# ------------------------------------------------------------------
# 4. Manifest
# ------------------------------------------------------------------
manifest = build_manifest(
config=config,
world_graph=world_graph,
table_row_counts=table_row_counts,
task_row_counts={CONVERTED_WITHIN_90_DAYS.task_id: task_row_counts},
bundle_root=root,
)
write_manifest(manifest, root)
64 changes: 60 additions & 4 deletions leadforge/api/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,67 @@ def generate(
n_accounts: int | None = None,
n_contacts: int | None = None,
n_leads: int | None = None,
difficulty: str | DifficultyProfile = DifficultyProfile.intermediate,
difficulty: str | DifficultyProfile = _MISSING, # type: ignore[assignment]
**kwargs: Any,
) -> WorldBundle:
"""Run the world simulation and return a bundle.
"""Run the full world simulation and return an in-memory bundle.

Overrides in *n_accounts*, *n_contacts*, *n_leads*, and *difficulty*
take effect for this call only — they do not mutate the Generator.
When *difficulty* is omitted the Generator's configured difficulty is used.

Args:
n_accounts: Override account count.
n_contacts: Override contact count.
n_leads: Override lead count.
difficulty: Difficulty profile name or enum value. Defaults to
the difficulty set on the Generator (i.e. from the recipe).
**kwargs: Reserved for future use.

Not yet implemented — available in v0.3.0+.
Returns:
A fully populated :class:`~leadforge.core.models.WorldBundle`.
Call :meth:`~leadforge.core.models.WorldBundle.save` to write it
to disk.
"""
raise NotImplementedError("Generator.generate() is not yet implemented. Coming in v0.3.0.")
import dataclasses

from leadforge.simulation.engine import simulate_world
from leadforge.simulation.population import build_population
from leadforge.structure.sampler import sample_hidden_graph

config = self._world_spec.config

# Apply per-call overrides without mutating the shared config.
overrides: dict[str, Any] = {}
if n_accounts is not None:
overrides["n_accounts"] = n_accounts
if n_contacts is not None:
overrides["n_contacts"] = n_contacts
if n_leads is not None:
overrides["n_leads"] = n_leads
if difficulty is not _MISSING:
if not isinstance(difficulty, DifficultyProfile):
difficulty = DifficultyProfile(difficulty) # type: ignore[arg-type]
if difficulty != config.difficulty:
overrides["difficulty"] = difficulty
if overrides:
config = dataclasses.replace(config, **overrides)

narrative = self._world_spec.narrative
if narrative is None:
raise RuntimeError(
"No narrative loaded. Initialise the Generator via "
"Generator.from_recipe() to resolve the narrative."
)

world_graph = sample_hidden_graph(config.seed)
population = build_population(config, narrative, world_graph)
result = simulate_world(config, population, world_graph)

spec = WorldSpec(config=config, narrative=narrative)
return WorldBundle(
spec=spec,
population=population,
simulation_result=result,
world_graph=world_graph,
)
42 changes: 41 additions & 1 deletion leadforge/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

if TYPE_CHECKING:
from leadforge.narrative.spec import NarrativeSpec
from leadforge.simulation.engine import SimulationResult
from leadforge.simulation.population import PopulationResult
from leadforge.structure.graph import WorldGraph


def _require_positive_int(value: Any, name: str) -> None:
Expand Down Expand Up @@ -90,7 +93,44 @@ class WorldSpec:
class WorldBundle:
"""In-memory result of one complete generation run.

Populated in Milestone 7+ (simulation and rendering).
Holds all generated artefacts and provides :meth:`save` to write the
full output bundle to disk.

Attributes:
spec: Fully resolved world specification (config + narrative).
population: Generated accounts, contacts, leads, and latent state.
simulation_result: Simulated event tables and final lead outcomes.
world_graph: Sampled hidden world graph used during simulation.
"""

spec: WorldSpec = field(default_factory=WorldSpec)
population: PopulationResult | None = None
simulation_result: SimulationResult | None = None
world_graph: WorldGraph | None = None

def save(self, path: str) -> None:
"""Write the full output bundle to *path*.

Creates the directory if it does not exist. The bundle layout
matches the canonical structure defined in ``CLAUDE.md``::

path/
manifest.json
dataset_card.md
feature_dictionary.csv
tables/ # one .parquet per relational table
tasks/converted_within_90_days/{train,valid,test}.parquet
tasks/converted_within_90_days/task_manifest.json

Args:
path: Destination directory (created if absent).

Raises:
RuntimeError: if :attr:`simulation_result`, :attr:`population`,
or :attr:`world_graph` have not been populated (i.e. if
:meth:`~leadforge.api.generator.Generator.generate` was not
called).
"""
from leadforge.api.bundle import write_bundle

write_bundle(self, path)
104 changes: 104 additions & 0 deletions leadforge/render/manifests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""Bundle manifest builder.

:func:`build_manifest` constructs the ``manifest.json`` dict that is written
at the root of every output bundle. The manifest records provenance (recipe,
seed, version, generation timestamp) and integrity metadata (row counts and
SHA-256 hashes) for the Parquet data files: relational tables and task splits.
"""

from __future__ import annotations

import hashlib
import json
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from leadforge.core.models import GenerationConfig
from leadforge.structure.graph import WorldGraph

# Bump this whenever the bundle layout or manifest schema changes.
BUNDLE_SCHEMA_VERSION = "1"


def build_manifest(
config: GenerationConfig,
world_graph: WorldGraph,
table_row_counts: dict[str, int],
task_row_counts: dict[str, dict[str, int]],
bundle_root: Path,
generation_timestamp: str | None = None,
) -> dict[str, Any]:
"""Build the bundle manifest dict.

SHA-256 hashes are computed by reading the written Parquet files from
*bundle_root*, so all table and task files must already exist on disk
before calling this function.

Args:
config: The resolved generation configuration.
world_graph: The sampled hidden world graph (provides motif_family).
table_row_counts: Mapping of table name → row count.
task_row_counts: Mapping of task_id → {split_name → row count}.
bundle_root: Root directory of the written bundle.
generation_timestamp: ISO-8601 UTC timestamp string. Defaults to now.

Returns:
A JSON-serialisable dict ready to be written as ``manifest.json``.
"""
if generation_timestamp is None:
generation_timestamp = datetime.now(UTC).isoformat(timespec="seconds")

# Build table entries with row counts and file hashes.
tables: dict[str, Any] = {}
for table_name, row_count in table_row_counts.items():
rel_path = f"tables/{table_name}.parquet"
abs_path = bundle_root / rel_path
sha = _sha256(abs_path)
tables[table_name] = {"row_count": row_count, "file": rel_path, "sha256": sha}
Comment thread
shaypal5 marked this conversation as resolved.

# Build task entries.
tasks: dict[str, Any] = {}
for task_id, split_counts in task_row_counts.items():
entry: dict[str, Any] = {}
for split_name, row_count in split_counts.items():
rel_path = f"tasks/{task_id}/{split_name}.parquet"
abs_path = bundle_root / rel_path
sha = _sha256(abs_path)
entry[f"{split_name}_rows"] = row_count
entry[f"{split_name}_sha256"] = sha
tasks[task_id] = entry

return {
"bundle_schema_version": BUNDLE_SCHEMA_VERSION,
"package_version": config.package_version,
"recipe_id": config.recipe_id,
"seed": config.seed,
"generation_timestamp": generation_timestamp,
"exposure_mode": config.exposure_mode.value,
"difficulty": config.difficulty.value,
"n_accounts": config.n_accounts,
"n_contacts": config.n_contacts,
"n_leads": config.n_leads,
"horizon_days": config.horizon_days,
"motif_family": world_graph.motif_family,
"tables": tables,
"tasks": tasks,
}


def write_manifest(manifest: dict[str, Any], bundle_root: Path) -> Path:
"""Serialise *manifest* to ``bundle_root/manifest.json`` and return the path."""
path = bundle_root / "manifest.json"
path.write_text(json.dumps(manifest, indent=2))
return path


def _sha256(path: Path) -> str:
"""Return the hex-encoded SHA-256 digest of *path*."""
h = hashlib.sha256()
with path.open("rb") as fh:
for chunk in iter(lambda: fh.read(65536), b""):
h.update(chunk)
return h.hexdigest()
Loading
Loading