diff --git a/.agent-plan.md b/.agent-plan.md index 8b325b2..2db5a90 100644 --- a/.agent-plan.md +++ b/.agent-plan.md @@ -41,10 +41,11 @@ early against the known-good lead-scoring path + physical reorg into Status: `LTV-M0` landed (#102, #103, #106). `LTV-M1`: `LTV-Pb` merged (#104); `LTV-Pc` (pLTV feature/task specs) still outstanding. `LTV-M2`: `LTV-Pd` -(`GenerationScheme` protocol + registry) opened as **#107** (awaiting review, -verified byte-identical). Next in M2: `LTV-Pe` (physically move lead-scoring -pipeline into `schemes/lead_scoring/`), then `LTV-Pf` (scaffold -`schemes/lifecycle/`). +(scheme protocol + registry) merged (#107); `LTV-Pe` (scheme owns bundle +rendering — second half of the seam) opened as **#108** (awaiting review, +verified byte-identical). M2 reordered so the render seam precedes the physical +move. Next in M2: `LTV-Pf` (physically move lead-scoring pipeline into +`schemes/lead_scoring/`), then `LTV-Pg` (scaffold `schemes/lifecycle/`). --- diff --git a/docs/ltv/roadmap.md b/docs/ltv/roadmap.md index 5955698..f97ffa3 100644 --- a/docs/ltv/roadmap.md +++ b/docs/ltv/roadmap.md @@ -42,15 +42,15 @@ protocol + registry, with the package physically reorganized into |-----------|------------|-----|------------| | `LTV-M0` | Planning + design lock | `LTV-Pa` | #102, #103 (+ scheme reframe) | | `LTV-M1` | Lifecycle schema foundation | `LTV-Pb`, `LTV-Pc` | #104 (Pb) | -| `LTV-M2` | Generation-scheme architecture + physical reorg | `LTV-Pd`, `LTV-Pe`, `LTV-Pf` | | -| `LTV-M3` | Customer population + lifecycle world | `LTV-Pg`, `LTV-Ph` | | -| `LTV-M4` | Lifecycle simulation engine | `LTV-Pi`, `LTV-Pj` | | -| `LTV-M5` | Customer snapshots + pLTV targets (both regimes) | `LTV-Pk`, `LTV-Pl` | | -| `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pm`, `LTV-Pn` | | -| `LTV-M7` | Validation + regression-metric calibration | `LTV-Po` | | -| `LTV-M8` | CLI, notebooks, publish | `LTV-Pp`, `LTV-Pq`, `LTV-Pr` | | +| `LTV-M2` | 
Generation-scheme architecture + physical reorg | `LTV-Pd`, `LTV-Pe`, `LTV-Pf`, `LTV-Pg` | #107 (Pd), #108 (Pe) | +| `LTV-M3` | Customer population + lifecycle world | `LTV-Ph`, `LTV-Pi` | | +| `LTV-M4` | Lifecycle simulation engine | `LTV-Pj`, `LTV-Pk` | | +| `LTV-M5` | Customer snapshots + pLTV targets (both regimes) | `LTV-Pl`, `LTV-Pm` | | +| `LTV-M6` | Register LifecycleScheme + recipe + manifest/version | `LTV-Pn`, `LTV-Po` | | +| `LTV-M7` | Validation + regression-metric calibration | `LTV-Pp` | | +| `LTV-M8` | CLI, notebooks, publish | `LTV-Pq`, `LTV-Pr`, `LTV-Ps` | | -Total: ~18 PRs across 9 milestones. +Total: ~19 PRs across 9 milestones. --- @@ -96,16 +96,33 @@ Total: ~18 PRs across 9 milestones. byte-identical (all 14 files of a pinned-timestamp bundle hash identically, main vs branch). - Labels: `type: refactor`, `layer: api`, `layer: core` -- [ ] **`LTV-Pe`** — `refactor: move lead-scoring pipeline to schemes/lead_scoring/`. - Physically relocate the lead-scoring population/engine/state/mechanisms/ - structure/snapshot/relational/task modules + its entity/feature/task specs - under `schemes/lead_scoring/`; leave shared primitives in `schema/`, - `render/` envelope, etc. Add back-compat import shims where `scripts/` or the - sibling datasets repo reference internal paths. +- [x] **`LTV-Pe`** — `refactor(render): scheme owns bundle rendering` (**PR #108**). Complete + the **second half** of the seam against the known-good lead-scoring path: + add `write_bundle` to the `GenerationScheme` protocol; move the + `api/bundle.py` orchestration body into `LeadScoringScheme.write_bundle` + (reusing the already-modular shared helpers — `build_manifest`, + `apply_exposure`, `get_filter`); `api/bundle.py::write_bundle` becomes a thin + dispatcher on `bundle.spec.scheme`, so `WorldBundle.save()` delegates to the + producing scheme. Also harden scheme registration so resolution no longer + depends on import order (the side-effect-registration footgun). 
Verified + byte-identical. **Sequenced before the physical move** so the file move + relocates a *complete* (both-halves) scheme and `bundle.py`'s call sites + change only once. (Reorder rationale: the render path is where schemes + diverge most; design it against lead-scoring with byte-identity as the oracle + before building lifecycle.) + - Tests: render dispatch, determinism through `save()`, unknown-scheme on + `save`, base-direct resolution (footgun guard), full suite green. + - Labels: `type: refactor`, `layer: render`, `layer: api` +- [ ] **`LTV-Pf`** — `refactor: move lead-scoring pipeline to schemes/lead_scoring/`. + Physically relocate the (now fully scheme-owned) lead-scoring population/ + engine/state/mechanisms/structure/snapshot/relational/task modules + its + entity/feature/task specs under `schemes/lead_scoring/`; leave shared + primitives in `schema/`, `render/` envelope, etc. Add back-compat import + shims where `scripts/` or the sibling datasets repo reference internal paths. - Tests: full suite + hash-determinism green; public API imports unchanged; shim coverage. - Labels: `type: refactor`, `layer: schema`, `layer: simulation`, `layer: render` -- [ ] **`LTV-Pf`** — `refactor: scaffold schemes/lifecycle/ + relocate LTV-Pb/Pc specs`. +- [ ] **`LTV-Pg`** — `refactor: scaffold schemes/lifecycle/ + relocate LTV-Pb/Pc specs`. Create `schemes/lifecycle/`; move the lifecycle entity rows (from #104) and the `LTV-Pc` feature/task specs into it; register a stub `LifecycleScheme` (pipeline methods raise `NotImplementedError` until M3–M6). Split any @@ -119,13 +136,13 @@ Total: ~18 PRs across 9 milestones. > Built directly under `schemes/lifecycle/`. -- [ ] **`LTV-Pg`** — `feat(lifecycle): customer population builder`. Customer +- [ ] **`LTV-Ph`** — `feat(lifecycle): customer population builder`. Customer entities, 5 new latent traits, **staggered start dates** ending at the absolute `observation_date` (D4); seam for future chained generation (D3). 
- Tests: determinism, latent distributions, staggered-start spread, FK integrity, acquisition-window boundary. - Labels: `type: feature`, `layer: simulation` -- [ ] **`LTV-Ph`** — `feat(lifecycle): motif families + mechanism policies`. 5 +- [ ] **`LTV-Pi`** — `feat(lifecycle): motif families + mechanism policies`. 5 retention motif families; `assign_lifecycle_mechanisms()` mapping motif → churn/expansion/payment params. - Tests: per-motif param tables, dispatch, determinism. @@ -135,13 +152,13 @@ Total: ~18 PRs across 9 milestones. ## `LTV-M4` — Lifecycle simulation engine -- [ ] **`LTV-Pi`** — `feat(lifecycle): churn / expansion / payment hazards`. +- [ ] **`LTV-Pj`** — `feat(lifecycle): churn / expansion / payment hazards`. Weibull churn hazard with renewal-date spike, expansion propensity (the heavy-tail generator for pLTV), payment failure + dunning. - Tests: hazard shape over tenure, renewal spike, dunning escalation, expansion MRR-delta bounds. - Labels: `type: feature`, `layer: mechanisms` -- [ ] **`LTV-Pj`** — `feat(lifecycle): weekly simulation engine`. +- [ ] **`LTV-Pk`** — `feat(lifecycle): weekly simulation engine`. `simulate_lifecycle()`: weekly loop per customer through `observation_date + 730d (+ early-regime buffer)` so all three windows are fully simulated (D6); emits `subscription_events`, `health_signals`, `invoices`; updates terminal @@ -154,7 +171,7 @@ Total: ~18 PRs across 9 milestones. ## `LTV-M5` — Customer snapshots + pLTV targets (both regimes) -- [ ] **`LTV-Pk`** — `feat(lifecycle): calendar-anchored customer snapshot`. +- [ ] **`LTV-Pl`** — `feat(lifecycle): calendar-anchored customer snapshot`. `build_customer_snapshot(cutoff=observation_date)`: last-12-week health aggregates; `mrr_change_at_snapshot` (valid) + `mrr_change_full_period` (trap); the three `ltv_revenue_{90,365,730}d` gross-revenue targets + @@ -162,7 +179,7 @@ Total: ~18 PRs across 9 milestones. 
- Tests: no post-cutoff data in windowed columns; ZILN target shape; trap invariant; target derivation; trap exempt from distortion. - Labels: `type: feature`, `layer: render` -- [ ] **`LTV-Pl`** — `feat(lifecycle): early-pLTV (tenure-anchored) task family`. +- [ ] **`LTV-Pm`** — `feat(lifecycle): early-pLTV (tenure-anchored) task family`. Reuse the snapshot builder with a per-customer relative cutoff (`customer_start + early_tenure_weeks`) to emit the cold-start snapshot + recomputed targets (D8); separate task directory. @@ -174,7 +191,7 @@ Total: ~18 PRs across 9 milestones. ## `LTV-M6` — Register LifecycleScheme + recipe + manifest/version -- [ ] **`LTV-Pm`** — `feat(lifecycle): complete LifecycleScheme + manifest/version`. +- [ ] **`LTV-Pn`** — `feat(lifecycle): complete LifecycleScheme + manifest/version`. Fill in the `LifecycleScheme` pipeline methods (population→sim→render→tasks); add `n_customers` + lifecycle config (windows, early-tenure, observation anchor) to `GenerationConfig`; record `generation_scheme` + `observation_date` @@ -184,7 +201,7 @@ Total: ~18 PRs across 9 milestones. - Tests: dispatch, lead-scoring path unaffected, manifest fields, regression split writer, exposure filtering for new tables. - Labels: `type: feature`, `layer: api`, `layer: render` -- [ ] **`LTV-Pn`** — `feat(recipes): b2b_saas_ltv_v1 recipe assets`. The three +- [ ] **`LTV-Po`** — `feat(recipes): b2b_saas_ltv_v1 recipe assets`. The three recipe YAMLs (`scheme: lifecycle`); register in the recipe registry; end-to-end `Generator.from_recipe("b2b_saas_ltv_v1").generate()` smoke test. - Tests: recipe loads, full round-trip, determinism, all task splits (3 @@ -195,7 +212,7 @@ Total: ~18 PRs across 9 milestones. ## `LTV-M7` — Validation + regression-metric calibration -- [ ] **`LTV-Po`** — `feat(validation): lifecycle leakage probes + pLTV metric bands`. +- [ ] **`LTV-Pp`** — `feat(validation): lifecycle leakage probes + pLTV metric bands`. 
Scheme-aware leakage probes (cutoff window check; banned terminal columns/tables; banned forward-window target columns); regression evaluation (Spearman, normalized Gini, decile calibration, total-pred-vs-actual, value @@ -208,15 +225,15 @@ Total: ~18 PRs across 9 milestones. ## `LTV-M8` — CLI, notebooks, publish -- [ ] **`LTV-Pp`** — `feat(cli): lifecycle generate flags + scheme-aware inspect`. +- [ ] **`LTV-Pq`** — `feat(cli): lifecycle generate flags + scheme-aware inspect`. `--n-customers`, observation/early-tenure flags; `inspect` dispatches on the bundle's `generation_scheme`. - Labels: `type: feature`, `layer: cli` -- [ ] **`LTV-Pq`** — `docs(notebooks): pLTV teaching sequence`. ZILN-vs-MSE +- [ ] **`LTV-Pr`** — `docs(notebooks): pLTV teaching sequence`. ZILN-vs-MSE baseline; discrimination/calibration metrics; the `mrr_change_full_period` leakage demo; early/cold-start pLTV; value-aware ranking; right-censoring note. - Labels: `type: docs`, `layer: render` -- [ ] **`LTV-Pr`** — `feat(release): package + publish b2b_saas_ltv_v1`. Kaggle +- [ ] **`LTV-Ps`** — `feat(release): package + publish b2b_saas_ltv_v1`. Kaggle + HF packaging (reuse Phase-5 packagers, scheme-aware), LLM critique, dataset card, release notes, tag. Publishes under the live `leadforge` Kaggle org. - Labels: `type: feature`, `layer: validation` diff --git a/leadforge/api/bundle.py b/leadforge/api/bundle.py index ec1fb36..00cff6b 100644 --- a/leadforge/api/bundle.py +++ b/leadforge/api/bundle.py @@ -1,34 +1,26 @@ -"""Bundle writer — assembles and serialises the full output bundle. - -:func:`write_bundle` is called by :meth:`WorldBundle.save` and orchestrates -all rendering steps: - -1. Project the relational dict (snapshot-safe for ``student_public``, - full-horizon for ``research_instructor``) and write ``tables/``. -2. Build the lead snapshot and write task splits (``tasks/``). -3. Write ``dataset_card.md`` and ``feature_dictionary.csv``. -4. 
Apply exposure filtering — write ``metadata/`` for ``research_instructor`` - mode; skip it for ``student_public``. -5. Build and write ``manifest.json``. +"""Bundle writer — dispatches serialisation to the producing generation scheme. + +:func:`write_bundle` is called by :meth:`WorldBundle.save`. It resolves the +bundle's generation scheme (``bundle.spec.scheme``) and delegates to that +scheme's :meth:`~leadforge.schemes.base.GenerationScheme.write_bundle`, which +owns the bundle's on-disk shape end to end (relational tables, task splits, +dataset card, feature dictionary, exposure metadata, manifest). + +Scope note: each scheme currently orchestrates its *own* write sequence; only +the scheme-agnostic relational-table write is shared today +(:func:`leadforge.render.relational.write_relational_tables`). A shared bundle +orchestrator with scheme render hooks is deferred to ``LTV-M6`` — it depends on +generalising ``build_manifest`` and ``apply_exposure``, which are still +lead-scoring-coupled (see ``docs/ltv/roadmap.md``). + +This thin module preserves the ``write_bundle(bundle, path)`` entry point. 
""" from __future__ import annotations -from pathlib import Path from typing import TYPE_CHECKING -from leadforge.exposure.filters import get_filter -from leadforge.exposure.modes import apply_exposure -from leadforge.narrative.dataset_card import render_dataset_card -from leadforge.render.manifests import build_manifest, write_manifest -from leadforge.render.relational import to_dataframes -from leadforge.render.relational_snapshot_safe import to_dataframes_snapshot_safe -from leadforge.render.snapshots import build_snapshot -from leadforge.render.tasks import write_task_splits -from leadforge.schema.dictionaries import write_feature_dictionary -from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES, redacted_columns_for -from leadforge.schema.tables import write_parquet -from leadforge.schema.tasks import task_manifest_for_config +from leadforge.schemes import get_scheme if TYPE_CHECKING: from leadforge.core.models import WorldBundle @@ -39,7 +31,7 @@ def write_bundle( path: str, generation_timestamp: str | None = None, ) -> None: - """Write *bundle* to disk at *path*. + """Write *bundle* to disk at *path* via its generation scheme. Args: bundle: Fully populated :class:`~leadforge.core.models.WorldBundle`. @@ -48,119 +40,9 @@ def write_bundle( Pass a fixed value to produce byte-identical manifests. Raises: - RuntimeError: if any of ``bundle.simulation_result``, - ``bundle.population``, or ``bundle.world_graph`` are ``None``. + UnknownSchemeError: if ``bundle.spec.scheme`` is not registered. + RuntimeError: if the bundle is not fully populated (raised by the + scheme's ``write_bundle``). """ - if bundle.simulation_result is None or bundle.population is None or bundle.world_graph is None: - raise RuntimeError("WorldBundle is not fully populated. 
Call Generator.generate() first.") - - root = Path(path) - root.mkdir(parents=True, exist_ok=True) - - config = bundle.spec.config - result = bundle.simulation_result - population = bundle.population - world_graph = bundle.world_graph - - # The redaction set comes from the canonical feature spec — the same - # source of truth the validator uses. It is applied uniformly to - # every published parquet file (relational tables AND task splits) so - # users doing feature engineering off the raw tables (per the - # README's "Option 3") cannot trivially reintroduce a redacted - # column by joining ``tables/leads.parquet`` to their feature set. - redacted = redacted_columns_for(config.exposure_mode) - bundle_filter = get_filter(config.exposure_mode) - - # ------------------------------------------------------------------ - # 1. Relational tables → tables/ - # - # For ``student_public`` (``relational_snapshot_safe = True``) we - # project the full-horizon dict onto the snapshot-safe shape: - # ``BANNED_LEAD_COLUMNS`` / ``BANNED_OPP_COLUMNS`` are dropped, event - # tables are filtered per-lead to ``lead_created_at + snapshot_day``, - # and ``BANNED_TABLES`` (``customers`` / ``subscriptions``) are - # omitted entirely. The feature-level redaction below still applies - # on top — the two policies operate on disjoint columns - # (snapshot-safe owns the structural reconstruction surface; - # ``redacted_columns_for`` owns near-deterministic snapshot - # features), so they neither double-emit nor overlap. 
- # ------------------------------------------------------------------ - tables_dir = root / "tables" - tables_dir.mkdir(exist_ok=True) - - dfs = to_dataframes(result, population) - if bundle_filter.relational_snapshot_safe: - if config.snapshot_day is None: - raise ValueError( - f"exposure_mode={config.exposure_mode.value!r} requires " - "config.snapshot_day to be set (the snapshot-safe relational " - "export filters event tables to lead_created_at + snapshot_day); " - "got snapshot_day=None. Pin a snapshot_day on the recipe or " - "pass it explicitly." - ) - dfs = to_dataframes_snapshot_safe(dfs, snapshot_day=config.snapshot_day) - table_row_counts: dict[str, int] = {} - for table_name, df in dfs.items(): - if redacted: - cols_to_drop = [c for c in redacted if c in df.columns] - if cols_to_drop: - df = df.drop(columns=cols_to_drop) - write_parquet(df, tables_dir / f"{table_name}.parquet") - table_row_counts[table_name] = len(df) - - # ------------------------------------------------------------------ - # 2. Snapshot + task splits → tasks/ - # - # Same redaction rule applied to the snapshot DataFrame before the - # task splits are written, so manifest SHA-256 hashes reflect the - # published column set without a post-write rewrite step. 
- # ------------------------------------------------------------------ - snapshot = build_snapshot( - result, - population, - horizon_days=config.horizon_days, - snapshot_day=config.snapshot_day, - difficulty_params=config.difficulty_params, - seed=config.seed, - ) - if redacted: - drop_cols = [c for c in redacted if c in snapshot.columns] - if drop_cols: - snapshot = snapshot.drop(columns=drop_cols) - visible_features = tuple(f for f in LEAD_SNAPSHOT_FEATURES if f.name not in redacted) - - task = task_manifest_for_config(config.primary_task, config.label_window_days) - task_row_counts = write_task_splits(snapshot, root / "tasks", seed=config.seed, task=task) - - # ------------------------------------------------------------------ - # 3. Dataset card and feature dictionary - # ------------------------------------------------------------------ - (root / "dataset_card.md").write_text( - render_dataset_card( - bundle.spec, - task_manifest=task, - table_counts=table_row_counts, - features=visible_features, - ) - ) - write_feature_dictionary(root / "feature_dictionary.csv", features=visible_features) - - # ------------------------------------------------------------------ - # 4. Exposure metadata (research_instructor only) - # ------------------------------------------------------------------ - apply_exposure(bundle, root, config.exposure_mode) - - # ------------------------------------------------------------------ - # 5. 
Manifest - # ------------------------------------------------------------------ - manifest = build_manifest( - config=config, - world_graph=world_graph, - table_row_counts=table_row_counts, - task_row_counts={task.task_id: task_row_counts}, - bundle_root=root, - generation_timestamp=generation_timestamp, - redacted_columns=sorted(redacted), - relational_snapshot_safe=bundle_filter.relational_snapshot_safe, - ) - write_manifest(manifest, root) + scheme = get_scheme(bundle.spec.scheme) + scheme.write_bundle(bundle, path, generation_timestamp=generation_timestamp) diff --git a/leadforge/render/relational.py b/leadforge/render/relational.py index fb21578..08319a4 100644 --- a/leadforge/render/relational.py +++ b/leadforge/render/relational.py @@ -27,6 +27,9 @@ ) if TYPE_CHECKING: + from collections.abc import Collection + from pathlib import Path + from leadforge.simulation.engine import SimulationResult from leadforge.simulation.population import PopulationResult @@ -82,3 +85,40 @@ def to_dataframes( df = src.cls.empty_dataframe() dfs[table_name] = df return dfs + + +def write_relational_tables( + dfs: dict[str, pd.DataFrame], + tables_dir: Path, + *, + redacted: Collection[str] = frozenset(), +) -> dict[str, int]: + """Write a ``{table_name: DataFrame}`` dict to *tables_dir* as Parquet. + + A shared, scheme-agnostic envelope step used by each scheme's + ``write_bundle``: it drops any *redacted* columns present in a table, + writes one ``.parquet`` per entry, and returns ``{table_name: + row_count}``. The relational *shape* (which tables, snapshot-safe + projection) is the scheme's concern and is decided before calling this. + + Args: + dfs: Mapping of table name → DataFrame, already projected to the + published shape (e.g. snapshot-safe for ``student_public``). + tables_dir: Destination directory (created if absent). + redacted: Column names to strip from any table that contains them. + + Returns: + Row count per written table, in *dfs* iteration order. 
+ """ + from leadforge.schema.tables import write_parquet + + tables_dir.mkdir(parents=True, exist_ok=True) + row_counts: dict[str, int] = {} + for table_name, df in dfs.items(): + if redacted: + cols_to_drop = [c for c in redacted if c in df.columns] + if cols_to_drop: + df = df.drop(columns=cols_to_drop) + write_parquet(df, tables_dir / f"{table_name}.parquet") + row_counts[table_name] = len(df) + return row_counts diff --git a/leadforge/schemes/base.py b/leadforge/schemes/base.py index b0dd9da..fd62a19 100644 --- a/leadforge/schemes/base.py +++ b/leadforge/schemes/base.py @@ -74,11 +74,51 @@ def build_world( """ ... + def write_bundle( + self, + bundle: WorldBundle, + path: str, + generation_timestamp: str | None = None, + ) -> None: + """Serialise *bundle* to *path* (the render half of the pipeline). + + Implementations own the bundle's full on-disk shape: which relational + tables are written, the task snapshot/splits, dataset card, feature + dictionary, exposure filtering, and manifest. Today each scheme + orchestrates this itself; the only shared step is + :func:`leadforge.render.relational.write_relational_tables`. A shared + orchestrator with scheme render hooks lands in ``LTV-M6`` once + ``build_manifest`` / ``apply_exposure`` are scheme-agnostic. + """ + ... -# Name → scheme instance. Populated by importing ``leadforge.schemes`` (its -# package ``__init__`` imports each built-in scheme module, which self-register). + +# Name → scheme instance. Populated by importing the built-in scheme modules +# (each self-registers on import). ``_ensure_builtins`` triggers this lazily so +# resolution works even if a caller reaches for ``base`` directly without first +# importing the ``leadforge.schemes`` package. SCHEME_REGISTRY: dict[str, GenerationScheme] = {} +_builtins_loaded = False + + +def _ensure_builtins() -> None: + """Import built-in scheme modules once, so the registry is populated. 
+ + Importing the ``leadforge.schemes`` package runs its ``__init__``, which + imports each shipped scheme for its registration side effect. Guarded by a + module flag and idempotent (``import_module`` returns the cached module), so + this is safe to call on every resolution and re-entrant during package + import. + """ + global _builtins_loaded + if _builtins_loaded: + return + _builtins_loaded = True + import importlib + + importlib.import_module("leadforge.schemes") + def register_scheme(scheme: GenerationScheme) -> None: """Register *scheme* under its ``name``. @@ -99,6 +139,7 @@ def get_scheme(name: str) -> GenerationScheme: Raises: UnknownSchemeError: if no scheme is registered under *name*. """ + _ensure_builtins() try: return SCHEME_REGISTRY[name] except KeyError: @@ -109,4 +150,5 @@ def get_scheme(name: str) -> GenerationScheme: def available_schemes() -> tuple[str, ...]: """Return the names of all registered schemes, sorted.""" + _ensure_builtins() return tuple(sorted(SCHEME_REGISTRY)) diff --git a/leadforge/schemes/lead_scoring/__init__.py b/leadforge/schemes/lead_scoring/__init__.py index 7f3cc65..7437236 100644 --- a/leadforge/schemes/lead_scoring/__init__.py +++ b/leadforge/schemes/lead_scoring/__init__.py @@ -131,6 +131,144 @@ def _resolve_difficulty( category_latent_correlations ) + def write_bundle( + self, + bundle: WorldBundle, + path: str, + generation_timestamp: str | None = None, + ) -> None: + """Serialise a lead-scoring *bundle* to *path*. + + This method currently owns the *entire* lead-scoring on-disk shape — + relational export (snapshot-safe for ``student_public``), lead snapshot + + task splits, dataset card, feature dictionary, exposure metadata, and + manifest. Only the genuinely scheme-agnostic relational-table write is + factored out (``write_relational_tables``); the rest is intentionally + *not* yet shared. 
+ + The deeper envelope/scheme decomposition (a shared bundle orchestrator + with scheme render hooks) is deferred to ``LTV-M6``: it requires + generalising ``build_manifest`` (today it takes the lead-scoring + ``world_graph``) and ``apply_exposure`` (today it writes the + lead-scoring hidden graph + latent registry), and is best designed with + a second scheme in hand. Until then ``LifecycleScheme.write_bundle`` + will reuse ``write_relational_tables`` + the leaf helpers + (``build_manifest`` / ``apply_exposure`` / ``get_filter``) but + orchestrate them itself. + """ + from pathlib import Path + + from leadforge.exposure.filters import get_filter + from leadforge.exposure.modes import apply_exposure + from leadforge.narrative.dataset_card import render_dataset_card + from leadforge.render.manifests import build_manifest, write_manifest + from leadforge.render.relational import to_dataframes, write_relational_tables + from leadforge.render.relational_snapshot_safe import to_dataframes_snapshot_safe + from leadforge.render.snapshots import build_snapshot + from leadforge.render.tasks import write_task_splits + from leadforge.schema.dictionaries import write_feature_dictionary + from leadforge.schema.features import LEAD_SNAPSHOT_FEATURES, redacted_columns_for + from leadforge.schema.tasks import task_manifest_for_config + + if ( + bundle.simulation_result is None + or bundle.population is None + or bundle.world_graph is None + ): + raise RuntimeError( + "WorldBundle is not fully populated. Call Generator.generate() first." + ) + + root = Path(path) + root.mkdir(parents=True, exist_ok=True) + + config = bundle.spec.config + result = bundle.simulation_result + population = bundle.population + world_graph = bundle.world_graph + + # The redaction set comes from the canonical feature spec — the same + # source of truth the validator uses. 
It is applied uniformly to + # every published parquet file (relational tables AND task splits) so + # users doing feature engineering off the raw tables (per the + # README's "Option 3") cannot trivially reintroduce a redacted + # column by joining ``tables/leads.parquet`` to their feature set. + redacted = redacted_columns_for(config.exposure_mode) + bundle_filter = get_filter(config.exposure_mode) + + # ------------------------------------------------------------------ + # 1. Relational tables → tables/ + # + # The lead-scoring *shape* (9 tables; snapshot-safe projection for + # student_public) is decided here; the redaction-drop + parquet-write + + # row-count loop is the shared, scheme-agnostic envelope step. + # ------------------------------------------------------------------ + dfs = to_dataframes(result, population) + if bundle_filter.relational_snapshot_safe: + if config.snapshot_day is None: + raise ValueError( + f"exposure_mode={config.exposure_mode.value!r} requires " + "config.snapshot_day to be set (the snapshot-safe relational " + "export filters event tables to lead_created_at + snapshot_day); " + "got snapshot_day=None. Pin a snapshot_day on the recipe or " + "pass it explicitly." + ) + dfs = to_dataframes_snapshot_safe(dfs, snapshot_day=config.snapshot_day) + table_row_counts = write_relational_tables(dfs, root / "tables", redacted=redacted) + + # ------------------------------------------------------------------ + # 2. 
Snapshot + task splits → tasks/ + # ------------------------------------------------------------------ + snapshot = build_snapshot( + result, + population, + horizon_days=config.horizon_days, + snapshot_day=config.snapshot_day, + difficulty_params=config.difficulty_params, + seed=config.seed, + ) + if redacted: + drop_cols = [c for c in redacted if c in snapshot.columns] + if drop_cols: + snapshot = snapshot.drop(columns=drop_cols) + visible_features = tuple(f for f in LEAD_SNAPSHOT_FEATURES if f.name not in redacted) + + task = task_manifest_for_config(config.primary_task, config.label_window_days) + task_row_counts = write_task_splits(snapshot, root / "tasks", seed=config.seed, task=task) + + # ------------------------------------------------------------------ + # 3. Dataset card and feature dictionary + # ------------------------------------------------------------------ + (root / "dataset_card.md").write_text( + render_dataset_card( + bundle.spec, + task_manifest=task, + table_counts=table_row_counts, + features=visible_features, + ) + ) + write_feature_dictionary(root / "feature_dictionary.csv", features=visible_features) + + # ------------------------------------------------------------------ + # 4. Exposure metadata (research_instructor only) + # ------------------------------------------------------------------ + apply_exposure(bundle, root, config.exposure_mode) + + # ------------------------------------------------------------------ + # 5. 
Manifest + # ------------------------------------------------------------------ + manifest = build_manifest( + config=config, + world_graph=world_graph, + table_row_counts=table_row_counts, + task_row_counts={task.task_id: task_row_counts}, + bundle_root=root, + generation_timestamp=generation_timestamp, + redacted_columns=sorted(redacted), + relational_snapshot_safe=bundle_filter.relational_snapshot_safe, + ) + write_manifest(manifest, root) + LEAD_SCORING_SCHEME = LeadScoringScheme() register_scheme(LEAD_SCORING_SCHEME) diff --git a/tests/render/test_write_relational_tables.py b/tests/render/test_write_relational_tables.py new file mode 100644 index 0000000..3e48a93 --- /dev/null +++ b/tests/render/test_write_relational_tables.py @@ -0,0 +1,47 @@ +"""Tests for the shared ``write_relational_tables`` envelope helper (LTV-Pe).""" + +from pathlib import Path + +import pandas as pd + +from leadforge.render.relational import write_relational_tables +from leadforge.schema.tables import read_parquet + + +def _dfs() -> dict[str, pd.DataFrame]: + return { + "accounts": pd.DataFrame({"account_id": ["a1", "a2"], "secret": [1, 2]}), + "leads": pd.DataFrame({"lead_id": ["l1"]}), + } + + +def test_writes_one_parquet_per_table_and_counts_rows(tmp_path: Path) -> None: + counts = write_relational_tables(_dfs(), tmp_path / "tables") + assert counts == {"accounts": 2, "leads": 1} + assert (tmp_path / "tables" / "accounts.parquet").exists() + assert (tmp_path / "tables" / "leads.parquet").exists() + + +def test_creates_nested_dir(tmp_path: Path) -> None: + target = tmp_path / "deep" / "tables" + assert not target.exists() + write_relational_tables(_dfs(), target) + assert target.is_dir() + + +def test_drops_redacted_columns_present(tmp_path: Path) -> None: + write_relational_tables(_dfs(), tmp_path / "tables", redacted={"secret", "absent"}) + accounts = read_parquet(tmp_path / "tables" / "accounts.parquet") + assert "secret" not in accounts.columns + assert "account_id" in 
accounts.columns + + +def test_no_redaction_keeps_all_columns(tmp_path: Path) -> None: + write_relational_tables(_dfs(), tmp_path / "tables") + accounts = read_parquet(tmp_path / "tables" / "accounts.parquet") + assert list(accounts.columns) == ["account_id", "secret"] + + +def test_preserves_iteration_order_in_counts(tmp_path: Path) -> None: + counts = write_relational_tables(_dfs(), tmp_path / "tables") + assert list(counts.keys()) == ["accounts", "leads"] diff --git a/tests/schemes/test_render_dispatch.py b/tests/schemes/test_render_dispatch.py new file mode 100644 index 0000000..7cd6357 --- /dev/null +++ b/tests/schemes/test_render_dispatch.py @@ -0,0 +1,92 @@ +"""Tests for the render half of the scheme seam (LTV-Pe). + +Covers ``WorldBundle.save`` / ``api.bundle.write_bundle`` dispatching to the +producing scheme, the registration footgun fix (``base``-direct resolution), +and render-path determinism. +""" + +import hashlib +import subprocess +import sys +from pathlib import Path + +import pytest + +from leadforge.api.bundle import write_bundle +from leadforge.api.generator import Generator +from leadforge.core.models import WorldBundle +from leadforge.schemes import UnknownSchemeError + +_SMALL = {"n_accounts": 20, "n_contacts": 40, "n_leads": 60, "difficulty": "intro"} +_TS = "2026-01-01T00:00:00Z" + + +def _hash_tree(root: Path) -> dict[str, str]: + return { + str(p.relative_to(root)): hashlib.sha256(p.read_bytes()).hexdigest() + for p in sorted(root.rglob("*")) + if p.is_file() + } + + +def _gen(): + return Generator.from_recipe("b2b_saas_procurement_v1", seed=42).generate(**_SMALL) + + +# --------------------------------------------------------------------------- +# Dispatch +# --------------------------------------------------------------------------- + + +def test_save_dispatches_to_scheme_and_writes_bundle(tmp_path: Path) -> None: + _gen().save(str(tmp_path), generation_timestamp=_TS) + assert (tmp_path / "manifest.json").exists() + assert (tmp_path / 
"tables").is_dir() + assert (tmp_path / "tasks").is_dir() + assert (tmp_path / "feature_dictionary.csv").exists() + assert (tmp_path / "dataset_card.md").exists() + + +def test_write_bundle_unknown_scheme_raises(tmp_path: Path) -> None: + bundle = _gen() + bundle.spec.scheme = "nope" + with pytest.raises(UnknownSchemeError): + write_bundle(bundle, str(tmp_path)) + + +def test_write_bundle_unpopulated_raises(tmp_path: Path) -> None: + # Default bundle has spec.scheme == "lead_scoring" → dispatches, then the + # lead-scoring write_bundle rejects the unpopulated bundle. + with pytest.raises(RuntimeError, match="not fully populated"): + write_bundle(WorldBundle(), str(tmp_path)) + + +# --------------------------------------------------------------------------- +# Render-path determinism +# --------------------------------------------------------------------------- + + +def test_save_is_byte_deterministic(tmp_path: Path) -> None: + a, b = tmp_path / "a", tmp_path / "b" + _gen().save(str(a), generation_timestamp=_TS) + _gen().save(str(b), generation_timestamp=_TS) + assert _hash_tree(a) == _hash_tree(b) + + +# --------------------------------------------------------------------------- +# Registration footgun fix — resolution must not depend on import order +# --------------------------------------------------------------------------- + + +def test_resolution_works_via_base_without_package_import() -> None: + # A fresh interpreter that imports ONLY leadforge.schemes.base (never the + # leadforge.schemes package) must still resolve built-in schemes, because + # get_scheme lazily triggers builtin registration. + code = "from leadforge.schemes.base import get_scheme; print(get_scheme('lead_scoring').name)" + out = subprocess.run( # noqa: S603 — controlled args, sys.executable + literal code + [sys.executable, "-c", code], + capture_output=True, + text=True, + check=True, + ) + assert out.stdout.strip() == "lead_scoring"
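Reviewer note: the two mechanisms this diff introduces (lazy built-in registration in `schemes/base.py` and the thin `write_bundle` dispatcher in `api/bundle.py`) can be sketched in isolation as below. This is a minimal illustration, not the leadforge code: `Scheme`, `REGISTRY`, and `_load_builtins` are hypothetical stand-ins, the bundle is a plain dict, and the built-in import is simulated by a function call rather than `importlib.import_module`.

```python
"""Hypothetical sketch: lazily populated scheme registry plus a thin dispatcher."""

from __future__ import annotations

from dataclasses import dataclass, field


class UnknownSchemeError(KeyError):
    """Raised when no scheme is registered under the requested name."""


@dataclass
class Scheme:
    # Stand-in for a GenerationScheme: a name plus a log of requested writes.
    name: str
    written: list[str] = field(default_factory=list)

    def write_bundle(self, bundle: dict, path: str) -> None:
        # A real scheme would serialise tables, tasks, manifest, etc.
        self.written.append(path)


REGISTRY: dict[str, Scheme] = {}
_builtins_loaded = False


def register_scheme(scheme: Scheme) -> None:
    REGISTRY[scheme.name] = scheme


def _load_builtins() -> None:
    # Assumption: simulates importing a package whose __init__ registers
    # each built-in scheme as an import side effect.
    register_scheme(Scheme("lead_scoring"))


def _ensure_builtins() -> None:
    global _builtins_loaded
    if _builtins_loaded:
        return
    # Flip the flag before loading so a re-entrant call returns early.
    _builtins_loaded = True
    _load_builtins()


def get_scheme(name: str) -> Scheme:
    # Resolution populates the registry on first use, so it does not
    # depend on the caller having imported anything beforehand.
    _ensure_builtins()
    try:
        return REGISTRY[name]
    except KeyError:
        raise UnknownSchemeError(name) from None


def write_bundle(bundle: dict, path: str) -> None:
    # Thin dispatcher: look up the producing scheme and delegate.
    get_scheme(bundle["scheme"]).write_bundle(bundle, path)
```

Setting the flag before loading mirrors the ordering in the diff's `_ensure_builtins`. One trade-off worth keeping in mind during review: if loading raises, the flag stays set and later calls will not retry.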