diff --git a/.env.example b/.env.example index dcb75a17..7d49f5b9 100644 --- a/.env.example +++ b/.env.example @@ -26,6 +26,8 @@ FORECAST_DEFAULT_HORIZON=14 FORECAST_MAX_HORIZON=90 FORECAST_MODEL_ARTIFACTS_DIR=./artifacts/models FORECAST_ENABLE_LIGHTGBM=false +# FORECAST_ENABLE_XGBOOST defaults to false (opt-in; install ml-xgboost extra) +# FORECAST_ENABLE_RANDOM_FOREST=false # PRP-36 optional model — pure sklearn, no extra needed # RAG Configuration # Embedding Provider: "openai" or "ollama" diff --git a/app/core/config.py b/app/core/config.py index 27614159..09a30cfc 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -100,6 +100,7 @@ class Settings(BaseSettings): forecast_model_artifacts_dir: str = "./artifacts/models" forecast_enable_lightgbm: bool = False forecast_enable_xgboost: bool = False + forecast_enable_random_forest: bool = False # Backtesting backtest_max_splits: int = 20 diff --git a/app/features/backtesting/metrics.py b/app/features/backtesting/metrics.py index 7bb90c0d..bd3244f0 100644 --- a/app/features/backtesting/metrics.py +++ b/app/features/backtesting/metrics.py @@ -191,6 +191,45 @@ def wape( name="wape", value=wape_value, n_samples=len(actuals), warnings=warnings ) + @staticmethod + def rmse( + actuals: np.ndarray[Any, np.dtype[np.floating[Any]]], + predictions: np.ndarray[Any, np.dtype[np.floating[Any]]], + ) -> MetricResult: + """Root Mean Squared Error. + + Formula: ``sqrt(mean((A - F) ** 2))`` + + Penalises large errors more than MAE — useful when a forecast that + misses a single point badly is operationally worse than one that + misses many points by a little. + + Args: + actuals: Ground truth values. + predictions: Predicted values. + + Returns: + MetricResult with RMSE value (NaN for empty arrays). + + Raises: + ValueError: If arrays have different lengths. 
+ """ + warnings: list[str] = [] + + if len(actuals) == 0: + return MetricResult(name="rmse", value=np.nan, n_samples=0, warnings=["Empty array"]) + + if len(actuals) != len(predictions): + raise ValueError( + f"Length mismatch: actuals={len(actuals)}, predictions={len(predictions)}" + ) + + rmse_value = float(np.sqrt(np.mean((actuals - predictions) ** 2))) + + return MetricResult( + name="rmse", value=rmse_value, n_samples=len(actuals), warnings=warnings + ) + @staticmethod def bias( actuals: np.ndarray[Any, np.dtype[np.floating[Any]]], @@ -307,6 +346,7 @@ def calculate_all( """ return { "mae": self.mae(actuals, predictions).value, + "rmse": self.rmse(actuals, predictions).value, "smape": self.smape(actuals, predictions).value, "wape": self.wape(actuals, predictions).value, "bias": self.bias(actuals, predictions).value, @@ -342,3 +382,109 @@ def aggregate_fold_metrics( stability[f"{name}_stability"] = np.nan return aggregated, stability + + def aggregate_bucket_metrics( + self, + fold_bucket_metrics: list[dict[str, dict[str, float]]], + ) -> dict[str, dict[str, float]]: + """Aggregate per-horizon-bucket metrics across folds (PRP-36). + + For each bucket id present in any fold, compute the per-metric mean + across the folds that emitted that bucket. Folds that did NOT emit + a bucket (because no test point fell inside its horizon range — e.g. + ``h_29_plus`` on a 14-day forecast) are silently skipped: their + absence reduces the sample count, not the aggregated value. + + Args: + fold_bucket_metrics: List of per-fold bucket dicts (the structure + returned by :func:`compute_bucket_metrics`). + + Returns: + Per-bucket aggregated mean dict; empty when every fold reported + an empty bucket dict (degenerate "horizon shorter than the + shortest bucket" case — shouldn't happen given bucket starts + at 1). + """ + if not fold_bucket_metrics: + return {} + + # Collect every (bucket_id, metric) pair that appeared in any fold. 
+ bucket_metric_values: dict[str, dict[str, list[float]]] = {} + for fold in fold_bucket_metrics: + for bucket_id, metric_dict in fold.items(): + bucket = bucket_metric_values.setdefault(bucket_id, {}) + for metric_name, metric_value in metric_dict.items(): + if not np.isnan(metric_value): + bucket.setdefault(metric_name, []).append(metric_value) + + # Compute mean across folds per (bucket, metric). + aggregated: dict[str, dict[str, float]] = {} + for bucket_id, metrics in bucket_metric_values.items(): + bucket_means: dict[str, float] = {} + for metric_name, values in metrics.items(): + if values: + bucket_means[metric_name] = float(np.mean(values)) + if bucket_means: + aggregated[bucket_id] = bucket_means + return aggregated + + +HORIZON_BUCKETS: tuple[tuple[str, int, int | None], ...] = ( + ("h_1_7", 1, 7), + ("h_8_14", 8, 14), + ("h_15_28", 15, 28), + ("h_29_plus", 29, None), +) +"""Per-horizon-bucket boundaries (1-based, inclusive ends; ``None`` = unbounded). + +Bucket ids are stable JSON-key-safe strings — keep them in sync with +``app/features/backtesting/schemas.py`` and the Slice C frontend reader. +""" + + +def compute_bucket_metrics( + actuals: np.ndarray[Any, np.dtype[np.floating[Any]]], + predictions: np.ndarray[Any, np.dtype[np.floating[Any]]], + horizon_offsets: list[int], +) -> dict[str, dict[str, float]]: + """Compute per-horizon-bucket metrics for a single fold (PRP-36). + + Slices the (actuals, predictions) pair by ``horizon_offsets`` lying in + each bucket's ``[start, end]`` range, then calls + :meth:`MetricsCalculator.calculate_all` on the slice. Empty buckets are + dropped from the output (a 14-day horizon's ``h_29_plus`` bucket simply + does not appear) — Slice C never has to interpret a NaN slot. + + Args: + actuals: Ground-truth array, length ``H``. + predictions: Predicted array, length ``H``. + horizon_offsets: Per-row horizon position, 1-based. Length ``H``. 
+ + Returns: + ``dict[bucket_id, dict[metric_name, value]]`` keyed by the bucket + ids from :data:`HORIZON_BUCKETS`. Empty buckets are omitted. + + Raises: + ValueError: If the three arrays have different lengths. + """ + if not (len(actuals) == len(predictions) == len(horizon_offsets)): + raise ValueError( + f"array length mismatch: actuals={len(actuals)}, " + f"predictions={len(predictions)}, horizon_offsets={len(horizon_offsets)}" + ) + if len(actuals) == 0: + return {} + + calc = MetricsCalculator() + out: dict[str, dict[str, float]] = {} + h = np.asarray(horizon_offsets, dtype=np.int64) + max_h = int(h.max()) + for bucket_id, start, end in HORIZON_BUCKETS: + upper = end if end is not None else max_h + mask = (h >= start) & (h <= upper) + if not mask.any(): + continue + bucket_actuals = actuals[mask] + bucket_predictions = predictions[mask] + out[bucket_id] = calc.calculate_all(bucket_actuals, bucket_predictions) + return out diff --git a/app/features/backtesting/schemas.py b/app/features/backtesting/schemas.py index 747d25c2..571f5166 100644 --- a/app/features/backtesting/schemas.py +++ b/app/features/backtesting/schemas.py @@ -154,6 +154,12 @@ class FoldResult(BaseModel): actuals: Actual values for the test period. predictions: Predicted values for the test period. metrics: Dictionary of metric names to values. + horizon_bucket_metrics: PRP-36 — per-horizon-bucket metric block. + Keys are stable bucket ids from + :data:`app.features.backtesting.metrics.HORIZON_BUCKETS` + (``"h_1_7"``, ``"h_8_14"``, ``"h_15_28"``, ``"h_29_plus"``). + Empty buckets are dropped, so a 14-day horizon's payload omits + ``h_29_plus`` rather than emitting NaN. 
""" fold_index: int @@ -162,6 +168,13 @@ class FoldResult(BaseModel): actuals: list[float] predictions: list[float] metrics: dict[str, float] + horizon_bucket_metrics: dict[str, dict[str, float]] = Field( + default_factory=dict, + description=( + "PRP-36 — per-horizon-bucket metrics keyed by bucket id " + "('h_1_7', 'h_8_14', 'h_15_28', 'h_29_plus'). Empty buckets are dropped." + ), + ) class ModelBacktestResult(BaseModel): @@ -173,6 +186,10 @@ class ModelBacktestResult(BaseModel): fold_results: Results for each fold. aggregated_metrics: Mean metrics across folds. metric_std: Standard deviation of metrics across folds. + bucketed_aggregated_metrics: PRP-36 — per-horizon-bucket aggregated + means across folds. ``None`` when no fold emitted a non-empty + bucket dict; otherwise keyed by the same bucket ids as + :attr:`FoldResult.horizon_bucket_metrics`. feature_aware: True when the model consumed a per-fold feature matrix (``requires_features``); False for target-only baseline models. exogenous_policy: How the test-window exogenous columns were sourced. @@ -186,6 +203,13 @@ class ModelBacktestResult(BaseModel): fold_results: list[FoldResult] aggregated_metrics: dict[str, float] metric_std: dict[str, float] + bucketed_aggregated_metrics: dict[str, dict[str, float]] | None = Field( + default=None, + description=( + "PRP-36 — per-horizon-bucket aggregated metrics across folds. " + "None when no fold emitted bucket metrics." 
+ ), + ) feature_aware: bool = False exogenous_policy: Literal["observed"] | None = None diff --git a/app/features/backtesting/service.py b/app/features/backtesting/service.py index 209e9081..6876035c 100644 --- a/app/features/backtesting/service.py +++ b/app/features/backtesting/service.py @@ -27,7 +27,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import get_settings -from app.features.backtesting.metrics import MetricsCalculator +from app.features.backtesting.metrics import MetricsCalculator, compute_bucket_metrics from app.features.backtesting.schemas import ( BacktestConfig, BacktestResponse, @@ -377,6 +377,7 @@ def _run_model_backtest( """ fold_results: list[FoldResult] = [] fold_metrics: list[dict[str, float]] = [] + fold_bucket_metrics: list[dict[str, dict[str, float]]] = [] # Probe the capability flag, then build the historical matrix once for # the whole run (feature-aware path only) — sliced, never rebuilt, for @@ -415,6 +416,17 @@ def _run_model_backtest( ) fold_metrics.append(metrics) + # PRP-36 — per-horizon-bucket metrics. ``test_dates[0]`` anchors + # horizon day 1 so ``(d - test_dates[0]).days + 1`` lands in + # bucket ``h_1_7`` for the first 7 days and walks outward. 
+ horizon_offsets = [(d - split.test_dates[0]).days + 1 for d in split.test_dates] + bucket_metrics = compute_bucket_metrics( + actuals=y_test, + predictions=predictions, + horizon_offsets=horizon_offsets, + ) + fold_bucket_metrics.append(bucket_metrics) + # Create fold result split_boundary = SplitBoundary( fold_index=split.fold_index, @@ -434,6 +446,7 @@ def _run_model_backtest( actuals=[float(v) for v in y_test], predictions=[float(v) for v in predictions], metrics=metrics, + horizon_bucket_metrics=bucket_metrics, ) else: # Store minimal fold result without detailed arrays @@ -444,14 +457,23 @@ def _run_model_backtest( actuals=[], predictions=[], metrics=metrics, + horizon_bucket_metrics=bucket_metrics, ) fold_results.append(fold_result) + logger.debug( + "backtest.fold_complete", + fold_index=split.fold_index, + bucket_count=len(bucket_metrics), + model_type=model_config.model_type, + ) + # Aggregate metrics aggregated_metrics, metric_std = self.metrics_calculator.aggregate_fold_metrics( fold_metrics ) + bucketed_aggregated = self.metrics_calculator.aggregate_bucket_metrics(fold_bucket_metrics) return ModelBacktestResult( model_type=model_config.model_type, @@ -459,6 +481,7 @@ def _run_model_backtest( fold_results=fold_results, aggregated_metrics=aggregated_metrics, metric_std=metric_std, + bucketed_aggregated_metrics=bucketed_aggregated if bucketed_aggregated else None, feature_aware=feature_aware, exogenous_policy="observed" if feature_aware else None, ) diff --git a/app/features/backtesting/tests/test_metrics.py b/app/features/backtesting/tests/test_metrics.py index 80d85b87..0885d947 100644 --- a/app/features/backtesting/tests/test_metrics.py +++ b/app/features/backtesting/tests/test_metrics.py @@ -5,7 +5,11 @@ import numpy as np import pytest -from app.features.backtesting.metrics import MetricsCalculator +from app.features.backtesting.metrics import ( + HORIZON_BUCKETS, + MetricsCalculator, + compute_bucket_metrics, +) class TestMAE: @@ -209,6 +213,7 @@ 
def test_calculate_all_returns_all_metrics(self) -> None: result = calc.calculate_all(actuals, predictions) assert "mae" in result + assert "rmse" in result # PRP-36 — RMSE added alongside MAE/sMAPE/WAPE/bias. assert "smape" in result assert "wape" in result assert "bias" in result @@ -222,6 +227,7 @@ def test_calculate_all_values_consistent(self) -> None: all_metrics = calc.calculate_all(actuals, predictions) assert all_metrics["mae"] == calc.mae(actuals, predictions).value + assert all_metrics["rmse"] == calc.rmse(actuals, predictions).value assert all_metrics["smape"] == calc.smape(actuals, predictions).value assert all_metrics["wape"] == calc.wape(actuals, predictions).value assert all_metrics["bias"] == calc.bias(actuals, predictions).value @@ -376,3 +382,143 @@ def test_mixed_positive_negative_actuals(self) -> None: # MAE should still work mae_result = calc.mae(actuals, predictions) assert mae_result.value == pytest.approx(2.0) # mean of |2|, |2|, |2| + + +class TestRMSE: + """Tests for Root Mean Squared Error (PRP-36).""" + + def test_rmse_perfect_predictions(self) -> None: + """Perfect predictions yield RMSE == 0.""" + calc = MetricsCalculator() + actuals = np.array([10.0, 20.0, 30.0]) + predictions = np.array([10.0, 20.0, 30.0]) + assert calc.rmse(actuals, predictions).value == 0.0 + + def test_rmse_known_values(self) -> None: + """RMSE matches the closed-form formula.""" + calc = MetricsCalculator() + actuals = np.array([10.0, 20.0, 30.0]) + predictions = np.array([12.0, 18.0, 33.0]) + # errors: [-2, 2, -3] → sq: [4, 4, 9] → mean=17/3 → sqrt≈2.380 + assert calc.rmse(actuals, predictions).value == pytest.approx(np.sqrt(17.0 / 3.0)) + + def test_rmse_penalises_large_errors_more_than_mae(self) -> None: + """A single big miss surfaces in RMSE more strongly than in MAE.""" + calc = MetricsCalculator() + actuals = np.array([10.0, 10.0, 10.0]) + even = np.array([8.0, 8.0, 8.0]) # MAE=2, RMSE=2 (uniform error) + spiky = np.array([10.0, 10.0, 4.0]) # MAE=2, 
RMSE=sqrt(12)≈3.46 + assert calc.rmse(actuals, even).value == pytest.approx(calc.mae(actuals, even).value) + assert calc.rmse(actuals, spiky).value > calc.mae(actuals, spiky).value + + def test_rmse_empty_array_returns_nan(self) -> None: + """Empty inputs return NaN with a warning.""" + calc = MetricsCalculator() + result = calc.rmse(np.array([]), np.array([])) + assert math.isnan(result.value) + assert result.n_samples == 0 + assert "Empty array" in result.warnings + + def test_rmse_length_mismatch_raises(self) -> None: + """Different-length arrays surface as ValueError.""" + calc = MetricsCalculator() + with pytest.raises(ValueError, match="Length mismatch"): + calc.rmse(np.array([1.0, 2.0]), np.array([1.0])) + + +class TestComputeBucketMetrics: + """Tests for the per-horizon-bucket helper (PRP-36).""" + + def test_horizon_buckets_constant_shape(self) -> None: + """HORIZON_BUCKETS exposes four buckets with the documented boundaries.""" + ids = [b[0] for b in HORIZON_BUCKETS] + assert ids == ["h_1_7", "h_8_14", "h_15_28", "h_29_plus"] + assert HORIZON_BUCKETS[-1][2] is None # h_29_plus is unbounded. + + def test_compute_buckets_full_horizon_emits_all_present(self) -> None: + """A 30-day horizon spans all four buckets — they should all appear.""" + actuals = np.arange(1.0, 31.0) # 30 days + predictions = actuals + 1.0 # uniform +1 error + horizon_offsets = list(range(1, 31)) + result = compute_bucket_metrics(actuals, predictions, horizon_offsets) + assert set(result.keys()) == {"h_1_7", "h_8_14", "h_15_28", "h_29_plus"} + # Each bucket carries the same metric names as calculate_all. 
+ for bucket in result.values(): + assert {"mae", "rmse", "smape", "wape", "bias"} <= set(bucket.keys()) + assert bucket["mae"] == pytest.approx(1.0) + + def test_compute_buckets_drops_empty_h29_for_14day_horizon(self) -> None: + """A 14-day horizon must NOT emit ``h_29_plus`` — empty buckets drop.""" + actuals = np.arange(1.0, 15.0) + predictions = actuals.copy() + horizon_offsets = list(range(1, 15)) + result = compute_bucket_metrics(actuals, predictions, horizon_offsets) + assert "h_1_7" in result + assert "h_8_14" in result + assert "h_15_28" not in result + assert "h_29_plus" not in result + + def test_compute_buckets_handles_unaligned_offsets(self) -> None: + """A non-contiguous offset list still slices into the right buckets.""" + actuals = np.array([1.0, 2.0, 3.0, 4.0]) + predictions = np.array([1.5, 2.5, 3.5, 4.5]) + horizon_offsets = [1, 14, 15, 50] # h_1_7, h_8_14, h_15_28, h_29_plus + result = compute_bucket_metrics(actuals, predictions, horizon_offsets) + assert set(result.keys()) == {"h_1_7", "h_8_14", "h_15_28", "h_29_plus"} + + def test_compute_buckets_length_mismatch_raises(self) -> None: + """Mismatched array lengths surface as ValueError.""" + with pytest.raises(ValueError, match="length mismatch"): + compute_bucket_metrics( + np.array([1.0, 2.0]), + np.array([1.0, 2.0, 3.0]), + [1, 2, 3], + ) + + def test_compute_buckets_empty_arrays_returns_empty_dict(self) -> None: + """Empty inputs return an empty dict (no buckets to emit).""" + result = compute_bucket_metrics(np.array([]), np.array([]), []) + assert result == {} + + +class TestAggregateBucketMetrics: + """Tests for cross-fold aggregation of per-horizon-bucket metrics.""" + + def test_aggregate_means_per_bucket(self) -> None: + """aggregate_bucket_metrics returns per-bucket means across folds.""" + calc = MetricsCalculator() + fold_buckets = [ + {"h_1_7": {"mae": 2.0, "rmse": 3.0}, "h_8_14": {"mae": 4.0}}, + {"h_1_7": {"mae": 6.0, "rmse": 7.0}, "h_8_14": {"mae": 8.0}}, + ] + aggregated = 
calc.aggregate_bucket_metrics(fold_buckets) + assert aggregated["h_1_7"]["mae"] == pytest.approx(4.0) + assert aggregated["h_1_7"]["rmse"] == pytest.approx(5.0) + assert aggregated["h_8_14"]["mae"] == pytest.approx(6.0) + + def test_aggregate_skips_buckets_absent_in_some_folds(self) -> None: + """A bucket present in only some folds aggregates over the present folds only.""" + calc = MetricsCalculator() + fold_buckets = [ + {"h_1_7": {"mae": 10.0}}, + {"h_1_7": {"mae": 20.0}, "h_29_plus": {"mae": 5.0}}, + ] + aggregated = calc.aggregate_bucket_metrics(fold_buckets) + assert aggregated["h_1_7"]["mae"] == pytest.approx(15.0) + assert aggregated["h_29_plus"]["mae"] == pytest.approx(5.0) + + def test_aggregate_empty_input_returns_empty_dict(self) -> None: + """No folds → no buckets.""" + calc = MetricsCalculator() + assert calc.aggregate_bucket_metrics([]) == {} + + def test_aggregate_skips_nan_values(self) -> None: + """NaN per-fold values do not contribute to the mean.""" + calc = MetricsCalculator() + fold_buckets = [ + {"h_1_7": {"mae": 2.0}}, + {"h_1_7": {"mae": float("nan")}}, + {"h_1_7": {"mae": 4.0}}, + ] + aggregated = calc.aggregate_bucket_metrics(fold_buckets) + assert aggregated["h_1_7"]["mae"] == pytest.approx(3.0) diff --git a/app/features/backtesting/tests/test_service.py b/app/features/backtesting/tests/test_service.py index 992ccb80..81608bd4 100644 --- a/app/features/backtesting/tests/test_service.py +++ b/app/features/backtesting/tests/test_service.py @@ -81,6 +81,49 @@ def test_run_model_backtest_naive( assert len(result.fold_results) == sample_split_config_expanding.n_splits assert "mae" in result.aggregated_metrics assert "smape" in result.aggregated_metrics + # PRP-36 — RMSE is now part of the aggregate. 
+ assert "rmse" in result.aggregated_metrics + + def test_run_model_backtest_emits_horizon_bucket_metrics( + self, + sample_dates_120: list[date], + sample_values_120: np.ndarray, + sample_split_config_expanding: SplitConfig, + ) -> None: + """PRP-36 — every fold carries a horizon_bucket_metrics dict; agg block populated.""" + service = BacktestingService() + series_data = SeriesData( + dates=sample_dates_120, + values=sample_values_120, + store_id=1, + product_id=1, + ) + from app.features.backtesting.splitter import TimeSeriesSplitter + + splitter = TimeSeriesSplitter(sample_split_config_expanding) + result = service._run_model_backtest( + series_data=series_data, + splitter=splitter, + model_config=NaiveModelConfig(), + store_fold_details=True, + ) + + # The expanding split config defaults to horizon=14, so every fold + # spans buckets ``h_1_7`` + ``h_8_14``; ``h_15_28`` and ``h_29_plus`` + # are absent (empty buckets are dropped). + for fold in result.fold_results: + assert "h_1_7" in fold.horizon_bucket_metrics + assert "h_8_14" in fold.horizon_bucket_metrics + assert "h_15_28" not in fold.horizon_bucket_metrics + assert "h_29_plus" not in fold.horizon_bucket_metrics + # Each bucket carries the same metric names as calculate_all. + for bucket in fold.horizon_bucket_metrics.values(): + assert {"mae", "rmse", "smape", "wape", "bias"} <= set(bucket.keys()) + + # Aggregated bucket dict is populated and mirrors the fold shape. 
+ assert result.bucketed_aggregated_metrics is not None + assert "h_1_7" in result.bucketed_aggregated_metrics + assert "h_8_14" in result.bucketed_aggregated_metrics def test_run_model_backtest_without_fold_details( self, diff --git a/app/features/explainability/explainers.py b/app/features/explainability/explainers.py index 8c4b9d3e..7d255210 100644 --- a/app/features/explainability/explainers.py +++ b/app/features/explainability/explainers.py @@ -22,6 +22,11 @@ Direction, DriverContribution, ) +from app.features.forecasting.models import ( + build_trend_baseline_design_row, + compute_seasonal_average_for_offset, + compute_weighted_average_weights, +) # A 1-D float series, matching the forecasters' target-array type. FloatArray = np.ndarray[Any, np.dtype[np.floating[Any]]] @@ -239,24 +244,284 @@ def confidence(self, y: FloatArray) -> ConfidenceLevel: return ConfidenceLevel.MEDIUM +class WeightedMovingAverageExplainer(BaseExplainer): + """Explainer for the weighted-moving-average baseline (PRP-36). + + Mirrors :class:`MovingAverageExplainer` but reports the weight strategy + in the driver description. The h=1 forecast is + ``np.average(y[-window_size:], weights=...)`` exactly. + """ + + def __init__( + self, + window_size: int = 7, + weight_strategy: str = "linear", + decay: float = 0.7, + ) -> None: + if window_size < 2: + raise ValueError(f"window_size must be >= 2, got {window_size}") + if weight_strategy not in ("linear", "exponential"): + raise ValueError( + f"weight_strategy must be 'linear' or 'exponential', got {weight_strategy!r}" + ) + if not 0.0 < decay < 1.0: + raise ValueError(f"decay must lie in (0.0, 1.0), got {decay}") + self.window_size = window_size + self.weight_strategy = weight_strategy + self.decay = decay + + def _weights(self) -> FloatArray: + # Reuses the forecaster's weight-construction helper so the + # explainer and the forecaster never drift. 
+ return compute_weighted_average_weights( + window_size=self.window_size, + weight_strategy=self.weight_strategy, # type: ignore[arg-type] + decay=self.decay, + ) + + def explain(self, y: FloatArray) -> tuple[float, list[DriverContribution]]: + if len(y) < self.window_size: + raise ValueError(f"Need at least {self.window_size} observations") + window = y[-self.window_size :] + weights = self._weights() + forecast = float(np.average(window, weights=weights)) + dispersion = float(np.std(window)) + drivers = [ + DriverContribution( + name="weighted_window_mean", + feature_value=forecast, + contribution=forecast, + direction="positive", + description=( + f"The forecast is the {self.weight_strategy}-weighted mean of the " + f"last {self.window_size} observed values" + + (f" (decay={self.decay})." if self.weight_strategy == "exponential" else ".") + ), + ), + DriverContribution( + name="window_dispersion", + feature_value=dispersion, + contribution=0.0, + direction="neutral", + description=( + "Context only — standard deviation within the averaging " + "window; higher values mean a noisier, less reliable mean." + ), + ), + ] + return forecast, drivers + + def confidence(self, y: FloatArray) -> ConfidenceLevel: + if len(y) < self.window_size: + return ConfidenceLevel.LOW + window = y[-self.window_size :] + mean = float(np.mean(window)) + std = float(np.std(window)) + cv = std / mean if mean > 0 else 0.0 + if cv < 0.5: + return ConfidenceLevel.HIGH + return ConfidenceLevel.MEDIUM + + +class SeasonalAverageExplainer(BaseExplainer): + """Explainer for the seasonal-average baseline (PRP-36). + + Mirrors :class:`SeasonalAverageForecaster.predict` for ``horizon=1``: + horizon day 1 maps to offsets ``{1*S - 1, 2*S - 1, ...}`` from the end + of the stored history. The h=1 forecast is the mean (or trimmed mean + when ``trim_outliers=True`` AND ≥4 samples are present) of those + sampled values. 
+ """ + + def __init__( + self, + season_length: int = 7, + lookback_cycles: int = 4, + trim_outliers: bool = False, + ) -> None: + if season_length < 2: + raise ValueError(f"season_length must be >= 2, got {season_length}") + if lookback_cycles < 2: + raise ValueError(f"lookback_cycles must be >= 2, got {lookback_cycles}") + self.season_length = season_length + self.lookback_cycles = lookback_cycles + self.trim_outliers = trim_outliers + + def explain(self, y: FloatArray) -> tuple[float, list[DriverContribution]]: + min_required = self.season_length * 2 + if len(y) < min_required: + raise ValueError(f"Need at least {min_required} observations") + # PRP-36 — single source of truth shared with the forecaster. + forecast, samples_used, samples_after_trim = compute_seasonal_average_for_offset( + history=y, + season_length=self.season_length, + lookback_cycles=self.lookback_cycles, + target_offset=1, # h=1 — the only horizon the explainer reports. + trim_outliers=self.trim_outliers, + ) + used_trim = self.trim_outliers and len(samples_used) >= 4 + trim_note = " after trimming the min + max samples" if used_trim else "" + # Dispersion is reported on the SAME array the forecast was + # averaged from — trimmed when trimming applied, raw otherwise — + # so the value matches the "what we averaged" semantic. + drivers = [ + DriverContribution( + name="seasonal_window_mean", + feature_value=float(forecast), + contribution=float(forecast), + direction="positive", + description=( + f"The forecast averages the values from the last {len(samples_used)} " + f"matching seasonal positions (every {self.season_length} days){trim_note}." + ), + ), + DriverContribution( + name="sample_dispersion", + feature_value=float(np.std(samples_after_trim)), + contribution=0.0, + direction="neutral", + description=( + "Context only — standard deviation across the sampled " + "seasonal positions actually averaged (post-trim when " + "trim_outliers is on)." 
+ ), + ), + ] + return forecast, drivers + + def confidence(self, y: FloatArray) -> ConfidenceLevel: + if len(y) < self.season_length * self.lookback_cycles: + return ConfidenceLevel.LOW + return ConfidenceLevel.MEDIUM + + +class TrendRegressionBaselineExplainer(BaseExplainer): + """Explainer for the Ridge trend baseline (PRP-36). + + Surfaces the Ridge coefficients learned on the synthetic elapsed-day + + optional dow/month design. Unlike the target-only baselines, this + explainer requires a fitted Ridge — the service passes ``coef_`` + + ``intercept_`` in via :class:`_FittedRidgeBundle` rather than re-fitting + inside ``explain`` (re-fitting would re-engineer the design matrix, + losing the ``include_dow`` / ``include_month`` toggles). + """ + + def __init__( + self, + intercept: float, + coefficients: list[float], + include_dow: bool = True, + include_month: bool = True, + ) -> None: + self.intercept = intercept + self.coefficients = list(coefficients) + self.include_dow = include_dow + self.include_month = include_month + + def explain(self, y: FloatArray) -> tuple[float, list[DriverContribution]]: + if len(y) < 2: + raise ValueError("Need at least 2 observations") + elapsed_day = len(y) + # h=1 elapsed-day continuation: the next index after training. The + # design row is built via the SAME helper the forecaster's + # ``_design_row`` wraps — single source of truth for the encoding. 
+ cols_arr = build_trend_baseline_design_row( + elapsed_day=elapsed_day, + include_dow=self.include_dow, + include_month=self.include_month, + ) + cols: list[float] = [float(v) for v in cols_arr] + if len(cols) != len(self.coefficients): + raise ValueError( + f"design row width ({len(cols)}) != coefficient count ({len(self.coefficients)})" + ) + contributions = [c * coef for c, coef in zip(cols, self.coefficients, strict=True)] + forecast = float(self.intercept + sum(contributions)) + drivers: list[DriverContribution] = [ + DriverContribution( + name="trend_intercept", + feature_value=1.0, + contribution=float(self.intercept), + direction=_direction(self.intercept), + description="Ridge intercept (baseline level before any covariates).", + ), + DriverContribution( + name="elapsed_day", + feature_value=float(elapsed_day), + contribution=float(contributions[0]), + direction=_direction(contributions[0]), + description=( + "Linear trend term — the slope Ridge fitted on the " + "elapsed-day index times the next-day value." 
+ ), + ), + ] + if self.include_dow: + dow_contribution = sum(contributions[1:8]) + drivers.append( + DriverContribution( + name="day_of_week", + feature_value=float(elapsed_day % 7), + contribution=float(dow_contribution), + direction=_direction(dow_contribution), + description=("Calendar-cycle DOW one-hot effect for the forecasted day."), + ) + ) + if self.include_month: + offset = 8 if self.include_dow else 1 + month_contribution = sum(contributions[offset : offset + 12]) + drivers.append( + DriverContribution( + name="month_of_year", + feature_value=float((elapsed_day // 30) % 12), + contribution=float(month_contribution), + direction=_direction(month_contribution), + description=("Calendar-cycle month one-hot effect for the forecasted day."), + ) + ) + return forecast, drivers + + def confidence(self, y: FloatArray) -> ConfidenceLevel: + if len(y) < 30: + return ConfidenceLevel.LOW + if len(y) < 90: + return ConfidenceLevel.MEDIUM + return ConfidenceLevel.HIGH + + def explainer_factory( model_type: str, season_length: int | None = None, window_size: int | None = None, + weight_strategy: str | None = None, + decay: float | None = None, + lookback_cycles: int | None = None, + trim_outliers: bool | None = None, + trend_baseline_bundle: tuple[float, list[float], bool, bool] | None = None, ) -> BaseExplainer: """Build the rule-based explainer for a baseline model type. Args: - model_type: One of ``naive``, ``seasonal_naive``, ``moving_average``. - season_length: Seasonal period for ``seasonal_naive`` (defaults to 7). - window_size: Averaging window for ``moving_average`` (defaults to 7). + model_type: One of ``naive``, ``seasonal_naive``, ``moving_average``, + ``weighted_moving_average``, ``seasonal_average``, or + ``trend_regression_baseline``. + season_length: Seasonal period for ``seasonal_naive`` / ``seasonal_average``. + window_size: Window for ``moving_average`` / ``weighted_moving_average``. + weight_strategy: ``'linear'`` or ``'exponential'`` (weighted MA). 
+ decay: Geometric decay for exponential WMA. + lookback_cycles: Cycles to average over (seasonal_average). + trim_outliers: Drop min + max per bucket (seasonal_average). + trend_baseline_bundle: ``(intercept, coefficients, include_dow, + include_month)`` for ``trend_regression_baseline`` — caller + supplies the fitted Ridge state. Returns: The matching explainer instance. Raises: - ValueError: For ``lightgbm``/``regression`` (MVP scope guard) or an - unknown model type. + ValueError: For ``lightgbm``/``regression``/``xgboost``/``random_forest`` + /``prophet_like`` (MVP scope guard — feature-aware models route + through a different code path) or an unknown model type. """ if model_type == "naive": return NaiveExplainer() @@ -264,7 +529,32 @@ def explainer_factory( return SeasonalNaiveExplainer(season_length=season_length or 7) if model_type == "moving_average": return MovingAverageExplainer(window_size=window_size or 7) - if model_type in ("lightgbm", "regression"): + if model_type == "weighted_moving_average": + return WeightedMovingAverageExplainer( + window_size=window_size or 7, + weight_strategy=weight_strategy or "linear", + decay=decay if decay is not None else 0.7, + ) + if model_type == "seasonal_average": + return SeasonalAverageExplainer( + season_length=season_length or 7, + lookback_cycles=lookback_cycles or 4, + trim_outliers=bool(trim_outliers) if trim_outliers is not None else False, + ) + if model_type == "trend_regression_baseline": + if trend_baseline_bundle is None: + raise ValueError( + "trend_regression_baseline explainer requires trend_baseline_bundle " + "(intercept, coefficients, include_dow, include_month) from the fitted Ridge." 
+ ) + intercept, coefficients, include_dow, include_month = trend_baseline_bundle + return TrendRegressionBaselineExplainer( + intercept=intercept, + coefficients=coefficients, + include_dow=include_dow, + include_month=include_month, + ) + if model_type in ("lightgbm", "regression", "xgboost", "random_forest", "prophet_like"): raise ValueError( f"Explanations are available for baseline models only; " f"'{model_type}' is not supported (rule-based MVP)." diff --git a/app/features/explainability/schemas.py b/app/features/explainability/schemas.py index 9c767a09..8bd2455e 100644 --- a/app/features/explainability/schemas.py +++ b/app/features/explainability/schemas.py @@ -33,7 +33,14 @@ # Baseline model types this slice can explain. ``lightgbm``/``regression`` are # rejected with a clean 400 (MVP scope guard). -ExplainableModelType = Literal["naive", "seasonal_naive", "moving_average"] +ExplainableModelType = Literal[ + "naive", + "seasonal_naive", + "moving_average", + # PRP-36 — new target-only baselines (always-on). + "weighted_moving_average", + "seasonal_average", +] class ConfidenceLevel(str, Enum): @@ -140,8 +147,31 @@ class ExplainForecastRequest(BaseModel): description="Series cutoff date (the explainer reads only <= this date)", ) season_length: int | None = Field( - None, ge=1, le=365, description="Seasonal period for seasonal_naive (default 7)" + None, ge=1, le=365, description="Seasonal period for seasonal_naive / seasonal_average" ) window_size: int | None = Field( - None, ge=1, le=90, description="Averaging window for moving_average (default 7)" + None, + ge=1, + le=90, + description="Averaging window for moving_average / weighted_moving_average", + ) + # PRP-36 — weighted_moving_average + seasonal_average extras. 
+ weight_strategy: Literal["linear", "exponential"] | None = Field( + None, description="Weighting scheme for weighted_moving_average (default 'linear')" + ) + decay: float | None = Field( + None, + gt=0.0, + lt=1.0, + description="Geometric decay for weighted_moving_average exponential (default 0.7)", + ) + lookback_cycles: int | None = Field( + None, + ge=2, + le=12, + description="Cycles to draw from for seasonal_average (default 4)", + ) + trim_outliers: bool | None = Field( + None, + description="Drop min + max samples before averaging (seasonal_average only)", ) diff --git a/app/features/explainability/service.py b/app/features/explainability/service.py index 2f10fe6e..77784e7d 100644 --- a/app/features/explainability/service.py +++ b/app/features/explainability/service.py @@ -98,6 +98,10 @@ async def explain_forecast( as_of_date=request.as_of_date, season_length=request.season_length, window_size=request.window_size, + weight_strategy=request.weight_strategy, + decay=request.decay, + lookback_cycles=request.lookback_cycles, + trim_outliers=request.trim_outliers, ) async def explain_run(self, db: AsyncSession, run_id: str) -> ForecastExplanation | None: @@ -127,6 +131,10 @@ async def explain_run(self, db: AsyncSession, run_id: str) -> ForecastExplanatio as_of_date=run.data_window_end, season_length=config.get("season_length"), window_size=config.get("window_size"), + weight_strategy=config.get("weight_strategy"), + decay=config.get("decay"), + lookback_cycles=config.get("lookback_cycles"), + trim_outliers=config.get("trim_outliers"), run_id=run_id, ) @@ -181,6 +189,10 @@ async def explain_job(self, db: AsyncSession, job_id: str) -> ForecastExplanatio # the explainer falls back to the forecaster defaults (7). 
season_length=None, window_size=None, + weight_strategy=None, + decay=None, + lookback_cycles=None, + trim_outliers=None, job_id=job_id, ) @@ -198,11 +210,23 @@ async def _explain( as_of_date: date_type, season_length: int | None, window_size: int | None, + weight_strategy: str | None = None, + decay: float | None = None, + lookback_cycles: int | None = None, + trim_outliers: bool | None = None, run_id: str | None = None, job_id: str | None = None, ) -> ForecastExplanation: """Build, persist, and return one rule-based explanation.""" - explainer = explainer_factory(model_type, season_length, window_size) + explainer = explainer_factory( + model_type, + season_length=season_length, + window_size=window_size, + weight_strategy=weight_strategy, + decay=decay, + lookback_cycles=lookback_cycles, + trim_outliers=trim_outliers, + ) y, _dates = await self._load_series(db, store_id, product_id, as_of_date) forecast_value, drivers = explainer.explain(y) confidence = explainer.confidence(y) @@ -358,6 +382,10 @@ def _min_required_history( return 2 * (season_length or 7) if model_type == "moving_average": return 2 * (window_size or 7) + if model_type == "weighted_moving_average": + return 2 * (window_size or 7) + if model_type == "seasonal_average": + return 2 * (season_length or 7) return 14 @staticmethod diff --git a/app/features/explainability/tests/test_explainers.py b/app/features/explainability/tests/test_explainers.py index 05045145..9a2b5ce1 100644 --- a/app/features/explainability/tests/test_explainers.py +++ b/app/features/explainability/tests/test_explainers.py @@ -159,3 +159,110 @@ def test_rejects_unknown_model(self) -> None: """An unknown model type raises ValueError.""" with pytest.raises(ValueError, match="Unknown model type"): explainer_factory("transformer") + + # PRP-36 — new baseline explainers + def test_builds_weighted_moving_average(self) -> None: + """The factory dispatches the weighted MA explainer with correct params.""" + from 
app.features.explainability.explainers import WeightedMovingAverageExplainer + + explainer = explainer_factory( + "weighted_moving_average", + window_size=14, + weight_strategy="exponential", + decay=0.5, + ) + assert isinstance(explainer, WeightedMovingAverageExplainer) + assert explainer.window_size == 14 + assert explainer.weight_strategy == "exponential" + assert explainer.decay == 0.5 + + def test_builds_seasonal_average(self) -> None: + """The factory dispatches the seasonal-average explainer with correct params.""" + from app.features.explainability.explainers import SeasonalAverageExplainer + + explainer = explainer_factory( + "seasonal_average", + season_length=7, + lookback_cycles=3, + trim_outliers=True, + ) + assert isinstance(explainer, SeasonalAverageExplainer) + assert explainer.season_length == 7 + assert explainer.lookback_cycles == 3 + assert explainer.trim_outliers is True + + def test_builds_trend_regression_baseline_with_bundle(self) -> None: + """The trend baseline factory requires a fitted Ridge bundle.""" + from app.features.explainability.explainers import ( + TrendRegressionBaselineExplainer, + ) + + explainer = explainer_factory( + "trend_regression_baseline", + trend_baseline_bundle=(0.5, [1.0] * 20, True, True), + ) + assert isinstance(explainer, TrendRegressionBaselineExplainer) + assert explainer.intercept == 0.5 + + def test_trend_regression_baseline_without_bundle_raises(self) -> None: + """Trend baseline without a fitted Ridge bundle fails with a clear message.""" + with pytest.raises(ValueError, match="trend_baseline_bundle"): + explainer_factory("trend_regression_baseline") + + @pytest.mark.parametrize( + "model_type", + ["lightgbm", "regression", "xgboost", "random_forest", "prophet_like"], + ) + def test_feature_aware_models_routed_to_scope_guard(self, model_type: str) -> None: + """PRP-36 — every feature-aware model surfaces the scope guard 422 message.""" + with pytest.raises(ValueError, match="baseline models only"): + 
explainer_factory(model_type) + + +class TestWeightedMovingAverageExplainer: + """PRP-36 — h=1 forecast equals the matching forecaster's prediction.""" + + def test_explanation_matches_forecaster_linear(self) -> None: + from app.features.explainability.explainers import ( + WeightedMovingAverageExplainer, + ) + from app.features.forecasting.models import WeightedMovingAverageForecaster + + y = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]) + forecaster = WeightedMovingAverageForecaster(window_size=7).fit(y) + explainer = WeightedMovingAverageExplainer(window_size=7) + forecast, drivers = explainer.explain(y) + assert forecast == pytest.approx(float(forecaster.predict(1)[0])) + names = [d.name for d in drivers] + assert "weighted_window_mean" in names + + def test_explanation_matches_forecaster_exponential(self) -> None: + from app.features.explainability.explainers import ( + WeightedMovingAverageExplainer, + ) + from app.features.forecasting.models import WeightedMovingAverageForecaster + + y = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) + forecaster = WeightedMovingAverageForecaster( + window_size=5, weight_strategy="exponential", decay=0.5 + ).fit(y) + explainer = WeightedMovingAverageExplainer( + window_size=5, weight_strategy="exponential", decay=0.5 + ) + forecast, _ = explainer.explain(y) + assert forecast == pytest.approx(float(forecaster.predict(1)[0])) + + +class TestSeasonalAverageExplainer: + """PRP-36 — h=1 forecast equals the matching forecaster's prediction.""" + + def test_explanation_matches_forecaster_on_weekly_cycle(self) -> None: + from app.features.explainability.explainers import SeasonalAverageExplainer + from app.features.forecasting.models import SeasonalAverageForecaster + + weekly = np.array([10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0]) + y = np.tile(weekly, 4) + forecaster = SeasonalAverageForecaster(season_length=7, lookback_cycles=4).fit(y) + explainer = SeasonalAverageExplainer(season_length=7, lookback_cycles=4) + forecast, _ = 
explainer.explain(y) + assert forecast == pytest.approx(float(forecaster.predict(1)[0])) diff --git a/app/features/forecasting/feature_metadata.py b/app/features/forecasting/feature_metadata.py index 479c098b..cbfb942b 100644 --- a/app/features/forecasting/feature_metadata.py +++ b/app/features/forecasting/feature_metadata.py @@ -43,6 +43,10 @@ "naive": ModelFamily.BASELINE, "seasonal_naive": ModelFamily.BASELINE, "moving_average": ModelFamily.BASELINE, + "weighted_moving_average": ModelFamily.BASELINE, + "seasonal_average": ModelFamily.BASELINE, + "trend_regression_baseline": ModelFamily.ADDITIVE, + "random_forest": ModelFamily.TREE, "regression": ModelFamily.TREE, "lightgbm": ModelFamily.TREE, "xgboost": ModelFamily.TREE, diff --git a/app/features/forecasting/models.py b/app/features/forecasting/models.py index 07be30a5..1cb3ba2a 100644 --- a/app/features/forecasting/models.py +++ b/app/features/forecasting/models.py @@ -89,6 +89,87 @@ class FitResult: metrics: dict[str, float] = field(default_factory=lambda: {}) +# --------------------------------------------------------------------------- +# Shared PRP-36 helpers (reused by the forecasters AND the explainers). +# +# Centralising these here means the explainer's h=1 math always matches the +# forecaster's predict() math byte-for-byte — no two-place drift when a +# default changes. These are pure functions: no I/O, no state. +# --------------------------------------------------------------------------- + + +def compute_weighted_average_weights( + window_size: int, + weight_strategy: Literal["linear", "exponential"], + decay: float, +) -> np.ndarray[Any, np.dtype[np.floating[Any]]]: + """Build the weight vector :class:`WeightedMovingAverageForecaster` applies. + + ``'linear'`` → ``np.arange(1, window_size+1)`` (newest = ``window_size``). + ``'exponential'`` → ``decay ** np.arange(window_size-1, -1, -1)`` (newest = 1.0). 
+ """ + if weight_strategy == "linear": + return np.arange(1, window_size + 1, dtype=np.float64) + return np.power(decay, np.arange(window_size - 1, -1, -1, dtype=np.float64)) + + +def compute_seasonal_average_for_offset( + history: np.ndarray[Any, np.dtype[np.floating[Any]]], + season_length: int, + lookback_cycles: int, + target_offset: int, + trim_outliers: bool, +) -> tuple[float, list[float], np.ndarray[Any, np.dtype[np.floating[Any]]]]: + """Compute the seasonal-average forecast for one ``target_offset``. + + Mirrors :meth:`SeasonalAverageForecaster.predict` exactly for a single + horizon step. Returns ``(forecast, samples_used, samples_after_trim)`` + so callers can report whichever array they need: + + - ``forecast`` — the mean reported by the forecaster. + - ``samples_used`` — the raw samples drawn from ``history``. + - ``samples_after_trim`` — the array the mean was actually computed + from (equal to ``samples_used`` when ``trim_outliers`` is off or + ``len(samples) < 4``). + """ + samples: list[float] = [] + for k in range(1, lookback_cycles + 1): + idx_from_end = k * season_length - target_offset + if 0 <= idx_from_end < history.size: + samples.append(float(history[history.size - 1 - idx_from_end])) + if not samples: + fallback = float(history[-1]) + fallback_arr = np.asarray([fallback], dtype=np.float64) + return fallback, [fallback], fallback_arr + arr = np.asarray(samples, dtype=np.float64) + if trim_outliers and arr.size >= 4: + arr = np.sort(arr)[1:-1] + return float(arr.mean()), samples, arr + + +def build_trend_baseline_design_row( + elapsed_day: int, + include_dow: bool, + include_month: bool, +) -> np.ndarray[Any, np.dtype[np.floating[Any]]]: + """Build one design row matching :class:`TrendRegressionBaselineForecaster`. + + Layout: ``[elapsed_day, (dow_one_hot x7)?, (month_one_hot x12)?]``. + + Synthetic encodings: ``elapsed_day % 7`` for dow, ``(elapsed_day // 30) % 12`` + for month. 
Calendar-agnostic and deterministic — see the forecaster's + docstring for the rationale. + """ + cols: list[float] = [float(elapsed_day)] + if include_dow: + dow = elapsed_day % 7 + cols.extend(1.0 if i == dow else 0.0 for i in range(7)) + if include_month: + month = (elapsed_day // 30) % 12 + cols.extend(1.0 if i == month else 0.0 for i in range(12)) + return np.asarray(cols, dtype=np.float64) + + class BaseForecaster(ABC): """Abstract base class for all forecasting models. @@ -480,6 +561,462 @@ def set_params(self, **params: Any) -> MovingAverageForecaster: # noqa: ANN401 return self +class WeightedMovingAverageForecaster(BaseForecaster): + """Target-only baseline: weighted average of the last ``window_size`` observations. + + Formula (constant for every horizon step): + ``y_hat[t+h] = np.average(y[-W:], weights=W_strategy)`` for all h. + + Two weight strategies are exposed via ``weight_strategy``: + + - ``'linear'`` → ``weights = np.arange(1, W+1)`` — newest observation + weighted highest (= ``W``), oldest weighted lowest (= ``1``). + - ``'exponential'`` → ``weights = decay ** np.arange(W-1, -1, -1)`` — + geometric decay; newest observation weighted ``decay**0 = 1.0``. + + CRITICAL: like :class:`MovingAverageForecaster`, this baseline does NOT + update recursively — every horizon step gets the same weighted mean. + """ + + requires_features: ClassVar[bool] = False + + def __init__( + self, + *, + window_size: int = 7, + weight_strategy: Literal["linear", "exponential"] = "linear", + decay: float = 0.7, + random_state: int = 42, + ) -> None: + """Initialize the weighted moving average forecaster. + + Args: + window_size: Number of trailing observations to average (>=2). + weight_strategy: Either ``'linear'`` or ``'exponential'``. + decay: Geometric decay factor for ``'exponential'``; must lie in + ``(0.0, 1.0)``. Ignored for ``'linear'``. + random_state: Random seed for reproducibility (unused but kept + for interface consistency). 
+ + Raises: + ValueError: If ``window_size < 2``, if ``weight_strategy`` is + unknown, or if ``decay`` is outside ``(0.0, 1.0)``. + """ + super().__init__(random_state) + if window_size < 2: + raise ValueError( + f"window_size must be >= 2, got {window_size}. " + "A weighted moving average needs at least two observations." + ) + if weight_strategy not in ("linear", "exponential"): + raise ValueError( + f"weight_strategy must be 'linear' or 'exponential', got {weight_strategy!r}." + ) + if not 0.0 < decay < 1.0: + raise ValueError(f"decay must lie in (0.0, 1.0), got {decay}.") + self.window_size = window_size + self.weight_strategy: Literal["linear", "exponential"] = weight_strategy + self.decay = decay + self._weights: np.ndarray[Any, np.dtype[np.floating[Any]]] | None = None + self._forecast_value: float = 0.0 + + def fit( + self, + y: np.ndarray[Any, np.dtype[np.floating[Any]]], + X: np.ndarray[Any, np.dtype[np.floating[Any]]] | None = None, # noqa: ARG002 + ) -> WeightedMovingAverageForecaster: + """Fit by computing the weighted mean of the last ``window_size`` values. + + Args: + y: Target values (1D array). + X: Ignored for the weighted moving average baseline. + + Returns: + self (for method chaining). + + Raises: + ValueError: If ``len(y) < window_size``. + """ + y_arr = np.asarray(y, dtype=np.float64) + if y_arr.size < self.window_size: + raise ValueError(f"Need at least {self.window_size} observations, got {y_arr.size}") + tail = y_arr[-self.window_size :] + # PRP-36 — weight vector built via the shared helper so the + # explainer reuses the identical formula. 
+ self._weights = compute_weighted_average_weights( + window_size=self.window_size, + weight_strategy=self.weight_strategy, + decay=self.decay, + ) + self._last_values = tail + self._forecast_value = float(np.average(tail, weights=self._weights)) + self._is_fitted = True + return self + + def predict( + self, + horizon: int, + X: np.ndarray[Any, np.dtype[np.floating[Any]]] | None = None, # noqa: ARG002 + ) -> np.ndarray[Any, np.dtype[np.floating[Any]]]: + """Predict the constant weighted mean for every horizon step.""" + if not self._is_fitted: + raise RuntimeError("Model must be fitted before predict") + return np.full(horizon, self._forecast_value, dtype=np.float64) + + def get_params(self) -> dict[str, Any]: + """Return constructor parameters (sklearn convention).""" + return { + "window_size": self.window_size, + "weight_strategy": self.weight_strategy, + "decay": self.decay, + "random_state": self.random_state, + } + + def set_params(self, **params: Any) -> WeightedMovingAverageForecaster: # noqa: ANN401 + """Set constructor parameters (sklearn convention).""" + for key, value in params.items(): + setattr(self, key, value) + return self + + +class SeasonalAverageForecaster(BaseForecaster): + """Target-only baseline: average of prior matching seasonal positions. + + For horizon day ``j`` (1-based) with season length ``S``, the forecaster + averages the historical values at offsets ``{j - k*S}`` for ``k`` in + ``[1..lookback_cycles]`` that fall inside the stored history. With + ``trim_outliers=True`` and ≥4 samples, the per-bucket sample drops its + min and max before averaging. + + Compared to :class:`SeasonalNaiveForecaster` (which copies the value + from a single prior cycle position), this baseline averages across + multiple prior cycles — more robust on noisy series. 
+ """ + + requires_features: ClassVar[bool] = False + + def __init__( + self, + *, + season_length: int = 7, + lookback_cycles: int = 4, + trim_outliers: bool = False, + random_state: int = 42, + ) -> None: + """Initialize the seasonal-average forecaster. + + Args: + season_length: Seasonality period in days (must be >= 2). + lookback_cycles: Number of trailing cycles to draw samples from + (must be >= 2). + trim_outliers: If True, drop the min + max sample per bucket + before averaging. Requires ≥4 samples to apply. + random_state: Random seed (unused, kept for interface parity). + + Raises: + ValueError: If ``season_length < 2`` or ``lookback_cycles < 2``. + """ + super().__init__(random_state) + if season_length < 2: + raise ValueError(f"season_length must be >= 2, got {season_length}.") + if lookback_cycles < 2: + raise ValueError(f"lookback_cycles must be >= 2, got {lookback_cycles}.") + self.season_length = season_length + self.lookback_cycles = lookback_cycles + self.trim_outliers = trim_outliers + self._history: np.ndarray[Any, np.dtype[np.floating[Any]]] | None = None + + def fit( + self, + y: np.ndarray[Any, np.dtype[np.floating[Any]]], + X: np.ndarray[Any, np.dtype[np.floating[Any]]] | None = None, # noqa: ARG002 + ) -> SeasonalAverageForecaster: + """Store the last ``season_length * lookback_cycles`` observations.""" + y_arr = np.asarray(y, dtype=np.float64) + min_required = self.season_length * 2 + if y_arr.size < min_required: + raise ValueError( + f"Need at least {min_required} observations " + f"(season_length={self.season_length} * 2), got {y_arr.size}" + ) + window = self.season_length * self.lookback_cycles + # Keep only the trailing cycles relevant for sampling; if fewer + # observations exist, retain what's available so predict() still + # produces a sensible mean. 
+ self._history = y_arr[-window:] if y_arr.size > window else y_arr.copy() + self._is_fitted = True + return self + + def predict( + self, + horizon: int, + X: np.ndarray[Any, np.dtype[np.floating[Any]]] | None = None, # noqa: ARG002 + ) -> np.ndarray[Any, np.dtype[np.floating[Any]]]: + """Average matching seasonal positions for every horizon step.""" + if not self._is_fitted or self._history is None: + raise RuntimeError("Model must be fitted before predict") + out = np.zeros(horizon, dtype=np.float64) + for j in range(horizon): + # PRP-36 — single source of truth for the h=j+1 math. The + # explainer reuses ``compute_seasonal_average_for_offset`` so + # the two paths never drift. + forecast_value, _samples_used, _samples_after_trim = ( + compute_seasonal_average_for_offset( + history=self._history, + season_length=self.season_length, + lookback_cycles=self.lookback_cycles, + target_offset=j + 1, # 1-based horizon day index + trim_outliers=self.trim_outliers, + ) + ) + out[j] = forecast_value + return out + + def get_params(self) -> dict[str, Any]: + """Return constructor parameters (sklearn convention).""" + return { + "season_length": self.season_length, + "lookback_cycles": self.lookback_cycles, + "trim_outliers": self.trim_outliers, + "random_state": self.random_state, + } + + def set_params(self, **params: Any) -> SeasonalAverageForecaster: # noqa: ANN401 + """Set constructor parameters (sklearn convention).""" + for key, value in params.items(): + setattr(self, key, value) + return self + + +class TrendRegressionBaselineForecaster(BaseForecaster): + """Target-only Ridge baseline: elapsed-day index + optional calendar one-hots. + + Builds its own design matrix from a synthetic elapsed-day index (and, + optionally, day-of-week / month one-hot columns). Unlike + :class:`RegressionForecaster`, this forecaster does NOT consume the V1 + or V2 feature frame — its features are purely calendar-derived inside + ``fit``/``predict``. 
``requires_features`` stays ``False``. + + Ridge is deterministic by construction (closed-form solver); a fixed + ``random_state`` is kept for interface parity but never sampled. + """ + + requires_features: ClassVar[bool] = False + + def __init__( + self, + *, + alpha: float = 1.0, + include_dow: bool = True, + include_month: bool = True, + random_state: int = 42, + ) -> None: + """Initialize the trend regression baseline.""" + super().__init__(random_state) + if alpha < 0.0: + raise ValueError(f"alpha must be >= 0, got {alpha}.") + self.alpha = alpha + self.include_dow = include_dow + self.include_month = include_month + self._ridge: Ridge | None = None + self._n_train: int = 0 + + # ---------------------------------------------------------------- design + + def _design_row(self, elapsed_day: int) -> np.ndarray[Any, np.dtype[np.floating[Any]]]: + """Build a single design row. + + Thin wrapper over :func:`build_trend_baseline_design_row` — the + explainer calls the module-level helper directly so the training + and explanation paths share one source of truth for the encoding. + """ + return build_trend_baseline_design_row( + elapsed_day=elapsed_day, + include_dow=self.include_dow, + include_month=self.include_month, + ) + + def _design_matrix( + self, + start_day: int, + n_rows: int, + ) -> np.ndarray[Any, np.dtype[np.floating[Any]]]: + rows = [self._design_row(start_day + i) for i in range(n_rows)] + return np.vstack(rows) + + # --------------------------------------------------------------- fit/pred + + def fit( + self, + y: np.ndarray[Any, np.dtype[np.floating[Any]]], + X: np.ndarray[Any, np.dtype[np.floating[Any]]] | None = None, # noqa: ARG002 + ) -> TrendRegressionBaselineForecaster: + """Fit Ridge on a synthetic elapsed-day design matrix.""" + y_arr = np.asarray(y, dtype=np.float64) + if y_arr.size < 2: + raise ValueError(f"Need at least 2 observations to fit a trend, got {y_arr.size}.") + # Synthetic elapsed-day index aligned to the historical positions. 
+ X_train = self._design_matrix(start_day=0, n_rows=y_arr.size) + self._ridge = Ridge(alpha=self.alpha, random_state=self.random_state) + self._ridge.fit(X_train, y_arr) + self._n_train = int(y_arr.size) + self._is_fitted = True + return self + + def predict( + self, + horizon: int, + X: np.ndarray[Any, np.dtype[np.floating[Any]]] | None = None, # noqa: ARG002 + ) -> np.ndarray[Any, np.dtype[np.floating[Any]]]: + """Predict horizon steps using the elapsed-day continuation.""" + if not self._is_fitted or self._ridge is None: + raise RuntimeError("Model must be fitted before predict") + X_future = self._design_matrix(start_day=self._n_train, n_rows=horizon) + result = self._ridge.predict(X_future) + return np.asarray(result, dtype=np.float64) + + def get_params(self) -> dict[str, Any]: + """Return constructor parameters (sklearn convention).""" + return { + "alpha": self.alpha, + "include_dow": self.include_dow, + "include_month": self.include_month, + "random_state": self.random_state, + } + + def set_params(self, **params: Any) -> TrendRegressionBaselineForecaster: # noqa: ANN401 + """Set constructor parameters (sklearn convention).""" + for key, value in params.items(): + setattr(self, key, value) + return self + + +class RandomForestForecaster(BaseForecaster): + """Feature-aware forecaster wrapping ``sklearn.ensemble.RandomForestRegressor``. + + Optional, gated by ``forecast_enable_random_forest`` in settings (the + factory enforces the gate). Unlike :class:`RegressionForecaster`, the + wrapped estimator DOES expose ``feature_importances_`` — verified at + PRP-create time (sklearn 1.8.0) — so the + :func:`extract_feature_importance` tree branch handles it without a + new special case. + + Determinism recipe (verified): ``random_state`` is fixed AND ``n_jobs=1``. + Never set ``n_jobs > 1``; thread-parallel tree fitting introduces + nondeterminism. 
``predict`` accepts the future feature matrix the + forecasting service builds via the V1 (or, once #299 lands, V2) row + builders — identical contract to :class:`RegressionForecaster`. + """ + + requires_features: ClassVar[bool] = True + + def __init__( + self, + *, + n_estimators: int = 100, + max_depth: int | None = 10, + min_samples_leaf: int = 2, + random_state: int = 42, + ) -> None: + """Initialize the RandomForest forecaster. + + Args: + n_estimators: Number of trees in the forest. + max_depth: Maximum depth per tree (``None`` = unlimited). + min_samples_leaf: Minimum samples required at a leaf. + random_state: Random seed (REQUIRED for determinism; combined + with ``n_jobs=1`` it gives byte-identical fits). + """ + super().__init__(random_state) + if n_estimators < 1: + raise ValueError(f"n_estimators must be >= 1, got {n_estimators}.") + if max_depth is not None and max_depth < 1: + raise ValueError(f"max_depth must be >= 1 or None, got {max_depth}.") + if min_samples_leaf < 1: + raise ValueError(f"min_samples_leaf must be >= 1, got {min_samples_leaf}.") + self.n_estimators = n_estimators + self.max_depth = max_depth + self.min_samples_leaf = min_samples_leaf + # Lazy import — RandomForestRegressor is a top-level sklearn class but + # we still mirror the existing pattern of constructing the estimator + # at ``fit`` time so unit tests can patch the import surface cleanly. + self._estimator: Any = None + self._feature_columns: list[str] | None = None + self._n_features_in: int = 0 + + def fit( + self, + y: np.ndarray[Any, np.dtype[np.floating[Any]]], + X: np.ndarray[Any, np.dtype[np.floating[Any]]] | None = None, + ) -> RandomForestForecaster: + """Fit on a feature matrix ``X`` and target vector ``y``.""" + if X is None: + raise ValueError( + "RandomForestForecaster requires a non-None X feature matrix; " + "this is a feature-aware model." 
+ ) + y_arr = np.asarray(y, dtype=np.float64) + X_arr = np.asarray(X, dtype=np.float64) + if X_arr.ndim != 2: + raise ValueError(f"X must be a 2-D feature matrix, got shape {X_arr.shape}.") + if X_arr.shape[0] != y_arr.size: + raise ValueError( + f"X / y row count mismatch: X has {X_arr.shape[0]}, y has {y_arr.size}." + ) + # Lazy import keeps the module-load surface stable. + from sklearn.ensemble import ( # pyright: ignore[reportMissingTypeStubs] + RandomForestRegressor, + ) + + self._estimator = RandomForestRegressor( + n_estimators=self.n_estimators, + max_depth=self.max_depth, + min_samples_leaf=self.min_samples_leaf, + random_state=self.random_state, + n_jobs=1, # REQUIRED for determinism; never widen this. + ) + self._estimator.fit(X_arr, y_arr) + self._n_features_in = int(X_arr.shape[1]) + self._is_fitted = True + return self + + def predict( + self, + horizon: int, # noqa: ARG002 — horizon is implied by X.shape[0] + X: np.ndarray[Any, np.dtype[np.floating[Any]]] | None = None, + ) -> np.ndarray[Any, np.dtype[np.floating[Any]]]: + """Predict using the supplied future feature matrix.""" + if not self._is_fitted or self._estimator is None: + raise RuntimeError("Model must be fitted before predict") + if X is None: + raise ValueError("RandomForestForecaster.predict requires a non-None X feature matrix.") + X_arr = np.asarray(X, dtype=np.float64) + if X_arr.ndim != 2: + raise ValueError(f"X must be a 2-D feature matrix, got shape {X_arr.shape}.") + if X_arr.shape[1] != self._n_features_in: + raise ValueError( + f"X column count mismatch: trained on {self._n_features_in} " + f"columns, predict received {X_arr.shape[1]}." 
+ ) + result = self._estimator.predict(X_arr) + return np.asarray(result, dtype=np.float64) + + def get_params(self) -> dict[str, Any]: + """Return constructor parameters (sklearn convention).""" + return { + "n_estimators": self.n_estimators, + "max_depth": self.max_depth, + "min_samples_leaf": self.min_samples_leaf, + "random_state": self.random_state, + } + + def set_params(self, **params: Any) -> RandomForestForecaster: # noqa: ANN401 + """Set constructor parameters (sklearn convention).""" + for key, value in params.items(): + setattr(self, key, value) + return self + + class RegressionForecaster(BaseForecaster): """Feature-driven forecaster wrapping ``HistGradientBoostingRegressor``. @@ -1129,9 +1666,22 @@ def set_params(self, **params: Any) -> ProphetLikeForecaster: # noqa: ANN401 return self -# Type alias for model type literals +# Type alias for model type literals — keep in sync with ``_MODEL_FAMILY_MAP`` +# and the ``ModelConfig`` discriminated union. The +# ``test_model_family_map_covers_every_known_model_type`` test walks +# ``get_args(ModelType)`` to catch drift. 
ModelType = Literal[ - "naive", "seasonal_naive", "moving_average", "xgboost", "lightgbm", "regression", "prophet_like" + "naive", + "seasonal_naive", + "moving_average", + "weighted_moving_average", # PRP-36 + "seasonal_average", # PRP-36 + "trend_regression_baseline", # PRP-36 + "random_forest", # PRP-36 (optional) + "xgboost", + "lightgbm", + "regression", + "prophet_like", ] @@ -1174,6 +1724,55 @@ def model_factory(config: ModelConfig, random_state: int = 42) -> BaseForecaster random_state=random_state, ) raise ValueError("Invalid config type for moving_average") + elif model_type == "weighted_moving_average": + from app.features.forecasting.schemas import WeightedMovingAverageModelConfig + + if isinstance(config, WeightedMovingAverageModelConfig): + return WeightedMovingAverageForecaster( + window_size=config.window_size, + weight_strategy=config.weight_strategy, + decay=config.decay, + random_state=random_state, + ) + raise ValueError("Invalid config type for weighted_moving_average") + elif model_type == "seasonal_average": + from app.features.forecasting.schemas import SeasonalAverageModelConfig + + if isinstance(config, SeasonalAverageModelConfig): + return SeasonalAverageForecaster( + season_length=config.season_length, + lookback_cycles=config.lookback_cycles, + trim_outliers=config.trim_outliers, + random_state=random_state, + ) + raise ValueError("Invalid config type for seasonal_average") + elif model_type == "trend_regression_baseline": + from app.features.forecasting.schemas import TrendRegressionBaselineModelConfig + + if isinstance(config, TrendRegressionBaselineModelConfig): + return TrendRegressionBaselineForecaster( + alpha=config.alpha, + include_dow=config.include_dow, + include_month=config.include_month, + random_state=random_state, + ) + raise ValueError("Invalid config type for trend_regression_baseline") + elif model_type == "random_forest": + if not settings.forecast_enable_random_forest: + raise ValueError( + "random_forest is not 
enabled. Set forecast_enable_random_forest=True " + "in settings (PRP-36 — optional feature-aware model)." + ) + from app.features.forecasting.schemas import RandomForestModelConfig + + if isinstance(config, RandomForestModelConfig): + return RandomForestForecaster( + n_estimators=config.n_estimators, + max_depth=config.max_depth, + min_samples_leaf=config.min_samples_leaf, + random_state=random_state, + ) + raise ValueError("Invalid config type for random_forest") elif model_type == "lightgbm": if not settings.forecast_enable_lightgbm: raise ValueError( diff --git a/app/features/forecasting/schemas.py b/app/features/forecasting/schemas.py index 1223f8b9..8dd06c98 100644 --- a/app/features/forecasting/schemas.py +++ b/app/features/forecasting/schemas.py @@ -107,6 +107,153 @@ class MovingAverageModelConfig(ModelConfigBase): ) +class WeightedMovingAverageModelConfig(ModelConfigBase): + """Configuration for the weighted moving average baseline (PRP-36). + + Always-on target-only baseline. The fitted forecaster computes a + weighted mean of the last ``window_size`` observations and emits it + for every horizon step (no recursive update). + + Two weight strategies are supported: + + - ``'linear'`` → ``weights = np.arange(1, window_size+1)`` — most recent + observation weighted highest, oldest weighted lowest. + - ``'exponential'`` → ``weights = np.power(decay, np.arange(window_size-1, -1, -1))`` + — geometric decay from the most recent observation. + + Attributes: + window_size: Number of trailing observations included in the average. + weight_strategy: Either ``'linear'`` or ``'exponential'``. + decay: Geometric decay factor for the ``'exponential'`` strategy + (ignored when ``weight_strategy='linear'``). 
+ """ + + model_type: Literal["weighted_moving_average"] = "weighted_moving_average" + window_size: int = Field( + default=7, + ge=2, + le=90, + description="Number of trailing observations to average", + ) + weight_strategy: Literal["linear", "exponential"] = Field( + default="linear", + description="Weighting scheme: 'linear' or 'exponential'", + ) + decay: float = Field( + default=0.7, + gt=0.0, + lt=1.0, + description="Geometric decay factor (used only for weight_strategy='exponential')", + ) + + +class SeasonalAverageModelConfig(ModelConfigBase): + """Configuration for the seasonal-average baseline (PRP-36). + + Always-on target-only baseline. For horizon day ``j`` with season + length ``S``, the fitted forecaster averages the values at offsets + ``{j - k*S}`` for ``k`` in ``[1..lookback_cycles]`` that fall inside + the stored history. With ``trim_outliers=True`` the per-bucket sample + drops its min and max before averaging (requires ≥4 samples to apply). + + Attributes: + season_length: Seasonality period in days (default 7 = weekly). + lookback_cycles: Number of trailing cycles to draw samples from. + trim_outliers: If True, drop the min + max sample before averaging. + """ + + model_type: Literal["seasonal_average"] = "seasonal_average" + season_length: int = Field( + default=7, + ge=2, + le=365, + description="Seasonality period in days", + ) + lookback_cycles: int = Field( + default=4, + ge=2, + le=12, + description="Number of trailing cycles to draw samples from", + ) + trim_outliers: bool = Field( + default=False, + description="If True, drop the min + max sample before averaging (requires ≥4 samples)", + ) + + +class TrendRegressionBaselineModelConfig(ModelConfigBase): + """Configuration for the Ridge trend baseline (PRP-36). + + Target-only Ridge regressor over an elapsed-day index plus optional + calendar one-hots (day-of-week, month). Does NOT consume the V1 or + V2 feature frame — its features are purely calendar-derived inside + the forecaster. 
``requires_features`` stays ``False``. + + Attributes: + alpha: Ridge L2 regularization strength. + include_dow: If True, include a 7-column day-of-week one-hot. + include_month: If True, include a 12-column month-of-year one-hot. + """ + + model_type: Literal["trend_regression_baseline"] = "trend_regression_baseline" + alpha: float = Field( + default=1.0, + ge=0.0, + le=1000.0, + description="Ridge L2 regularization strength", + ) + include_dow: bool = Field( + default=True, + description="If True, include a day-of-week one-hot in the design matrix", + ) + include_month: bool = Field( + default=True, + description="If True, include a month-of-year one-hot in the design matrix", + ) + + +class RandomForestModelConfig(ModelConfigBase): + """Configuration for the sklearn RandomForest feature-aware forecaster (PRP-36). + + Optional, gated by ``forecast_enable_random_forest`` in settings. Wraps + ``sklearn.ensemble.RandomForestRegressor`` with ``n_jobs=1`` (required + for determinism) and a fixed ``random_state``. Unlike + ``HistGradientBoostingRegressor``, ``RandomForestRegressor`` DOES expose + ``feature_importances_`` — so ``extract_feature_importance`` returns a + 1-D importance vector matching ``feature_columns``. + + Attributes: + n_estimators: Number of trees in the forest. + max_depth: Maximum depth per tree (``None`` = unlimited). + min_samples_leaf: Minimum samples required to be at a leaf node. + feature_config_hash: Optional hash of the feature contract used. 
+ """ + + model_type: Literal["random_forest"] = "random_forest" + n_estimators: int = Field( + default=100, + ge=10, + le=500, + description="Number of trees in the forest", + ) + max_depth: int | None = Field( + default=10, + ge=2, + le=64, + description="Maximum depth per tree (None = unlimited)", + ) + min_samples_leaf: int = Field( + default=2, + ge=1, + le=50, + description="Minimum samples required to be at a leaf node", + ) + feature_config_hash: str | None = Field( + default=None, + description="Hash of the feature contract used for training", + ) + + class LightGBMModelConfig(ModelConfigBase): """Configuration for LightGBM regressor (feature-flagged). @@ -271,6 +418,10 @@ class ProphetLikeModelConfig(ModelConfigBase): NaiveModelConfig | SeasonalNaiveModelConfig | MovingAverageModelConfig + | WeightedMovingAverageModelConfig + | SeasonalAverageModelConfig + | TrendRegressionBaselineModelConfig + | RandomForestModelConfig | LightGBMModelConfig | XGBoostModelConfig | RegressionModelConfig diff --git a/app/features/forecasting/tests/test_feature_metadata.py b/app/features/forecasting/tests/test_feature_metadata.py index 618bcc91..721a0efb 100644 --- a/app/features/forecasting/tests/test_feature_metadata.py +++ b/app/features/forecasting/tests/test_feature_metadata.py @@ -77,12 +77,20 @@ def _feature_columns() -> list[str]: def test_model_family_for_maps_baseline_types_to_baseline() -> None: - for mt in ("naive", "seasonal_naive", "moving_average"): + # PRP-36 — weighted_moving_average + seasonal_average join the baselines. + for mt in ( + "naive", + "seasonal_naive", + "moving_average", + "weighted_moving_average", + "seasonal_average", + ): assert model_family_for(mt) == ModelFamily.BASELINE def test_model_family_for_maps_tree_types_to_tree() -> None: - for mt in ("regression", "lightgbm", "xgboost"): + # PRP-36 — random_forest joins the tree family. 
+ for mt in ("regression", "lightgbm", "xgboost", "random_forest"): assert model_family_for(mt) == ModelFamily.TREE @@ -90,6 +98,11 @@ def test_model_family_for_maps_prophet_like_to_additive() -> None: assert model_family_for("prophet_like") == ModelFamily.ADDITIVE +def test_model_family_for_maps_trend_regression_baseline_to_additive() -> None: + """PRP-36 — Ridge trend baseline is ADDITIVE (matches prophet_like lineage).""" + assert model_family_for("trend_regression_baseline") == ModelFamily.ADDITIVE + + def test_model_family_for_unknown_returns_baseline() -> None: """An unknown model_type logs a warning and degrades to BASELINE.""" assert model_family_for("future_arima_v9") == ModelFamily.BASELINE diff --git a/app/features/forecasting/tests/test_random_forest_forecaster.py b/app/features/forecasting/tests/test_random_forest_forecaster.py new file mode 100644 index 00000000..1f1fc508 --- /dev/null +++ b/app/features/forecasting/tests/test_random_forest_forecaster.py @@ -0,0 +1,143 @@ +"""Tests for :class:`RandomForestForecaster` (PRP-36 — optional feature-aware model).""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import numpy as np +import pytest + +from app.features.forecasting.models import ( + RandomForestForecaster, + model_factory, +) +from app.features.forecasting.schemas import RandomForestModelConfig + + +def _enabled_settings() -> MagicMock: + """Return a settings mock with the random_forest flag flipped on.""" + s = MagicMock() + s.forecast_enable_random_forest = True + s.forecast_enable_lightgbm = False + s.forecast_enable_xgboost = False + return s + + +@pytest.fixture +def small_feature_matrix() -> tuple[np.ndarray, np.ndarray]: + """Build a deterministic 30-row by 3-column feature matrix + target.""" + rng = np.random.default_rng(seed=42) + X = rng.standard_normal(size=(30, 3)) + # y is a near-linear function of the features plus a small noise term so + # the forest has something to fit. 
+ y = X[:, 0] * 2.0 + X[:, 1] * 0.5 - X[:, 2] + rng.standard_normal(size=30) * 0.1 + return X.astype(np.float64), y.astype(np.float64) + + +class TestRandomForestForecaster: + """Behavioural tests for the sklearn-RandomForest feature-aware model.""" + + def test_requires_features_true(self) -> None: + """RandomForestForecaster is the second feature-aware baseline.""" + assert RandomForestForecaster.requires_features is True + + def test_fit_requires_non_none_X(self) -> None: + """fit() raises when X is None (matches RegressionForecaster contract).""" + model = RandomForestForecaster(n_estimators=10) + with pytest.raises(ValueError, match="requires a non-None X"): + model.fit(np.array([1.0, 2.0, 3.0]), X=None) + + def test_fit_raises_on_row_mismatch( + self, small_feature_matrix: tuple[np.ndarray, np.ndarray] + ) -> None: + """fit() validates X.shape[0] == y.size.""" + X, y = small_feature_matrix + model = RandomForestForecaster(n_estimators=10) + with pytest.raises(ValueError, match="row count mismatch"): + model.fit(y[:-1], X=X) + + def test_predict_requires_non_none_X( + self, small_feature_matrix: tuple[np.ndarray, np.ndarray] + ) -> None: + """predict() raises when X is None (no recursive fallback).""" + X, y = small_feature_matrix + model = RandomForestForecaster(n_estimators=10).fit(y, X=X) + with pytest.raises(ValueError, match="requires a non-None X"): + model.predict(horizon=5, X=None) + + def test_predict_validates_column_count( + self, small_feature_matrix: tuple[np.ndarray, np.ndarray] + ) -> None: + """predict() validates X.shape[1] against the trained column count.""" + X, y = small_feature_matrix + model = RandomForestForecaster(n_estimators=10).fit(y, X=X) + with pytest.raises(ValueError, match="column count mismatch"): + model.predict(horizon=2, X=X[:2, :-1]) + + def test_predict_before_fit_raises(self) -> None: + """predict() before fit() raises RuntimeError.""" + with pytest.raises(RuntimeError, match="must be fitted"): + 
RandomForestForecaster().predict(horizon=5, X=np.array([[1.0, 2.0]])) + + def test_predict_shape(self, small_feature_matrix: tuple[np.ndarray, np.ndarray]) -> None: + """predict() returns one forecast per row of the future X.""" + X, y = small_feature_matrix + model = RandomForestForecaster(n_estimators=10).fit(y, X=X) + future_X = X[:5] + forecasts = model.predict(horizon=5, X=future_X) + assert forecasts.shape == (5,) + + def test_deterministic_with_seed( + self, small_feature_matrix: tuple[np.ndarray, np.ndarray] + ) -> None: + """random_state + n_jobs=1 give byte-identical predictions.""" + X, y = small_feature_matrix + a = RandomForestForecaster(n_estimators=10, random_state=42).fit(y, X=X) + b = RandomForestForecaster(n_estimators=10, random_state=42).fit(y, X=X) + np.testing.assert_array_equal(a.predict(horizon=5, X=X[:5]), b.predict(horizon=5, X=X[:5])) + + def test_feature_importances_shape_matches_feature_columns( + self, small_feature_matrix: tuple[np.ndarray, np.ndarray] + ) -> None: + """The wrapped estimator exposes a 1-D importance vector of width n_features.""" + X, y = small_feature_matrix + model = RandomForestForecaster(n_estimators=10).fit(y, X=X) + importances = model._estimator.feature_importances_ + assert importances.ndim == 1 + assert importances.shape == (X.shape[1],) + + def test_factory_gate_blocks_when_flag_off(self) -> None: + """model_factory refuses to dispatch random_forest when the flag is off.""" + disabled = MagicMock() + disabled.forecast_enable_random_forest = False + with patch("app.core.config.get_settings", return_value=disabled): + with pytest.raises(ValueError, match="random_forest is not enabled"): + model_factory(RandomForestModelConfig(n_estimators=10), random_state=42) + + def test_factory_creates_random_forest_when_enabled(self) -> None: + """model_factory dispatches the forecaster when the flag is on.""" + with patch("app.core.config.get_settings", return_value=_enabled_settings()): + model = model_factory( + 
RandomForestModelConfig(n_estimators=50, max_depth=8, min_samples_leaf=3), + random_state=42, + ) + assert isinstance(model, RandomForestForecaster) + assert model.n_estimators == 50 + assert model.max_depth == 8 + assert model.min_samples_leaf == 3 + assert model.random_state == 42 + + def test_invalid_n_estimators_raises(self) -> None: + """n_estimators < 1 surfaces a clear error.""" + with pytest.raises(ValueError, match="n_estimators"): + RandomForestForecaster(n_estimators=0) + + def test_invalid_max_depth_raises(self) -> None: + """max_depth below the minimum surfaces a clear error.""" + with pytest.raises(ValueError, match="max_depth"): + RandomForestForecaster(max_depth=0) + + def test_invalid_min_samples_leaf_raises(self) -> None: + """min_samples_leaf < 1 surfaces a clear error (rounds out the validation branches).""" + with pytest.raises(ValueError, match="min_samples_leaf"): + RandomForestForecaster(min_samples_leaf=0) diff --git a/app/features/forecasting/tests/test_seasonal_average_forecaster.py b/app/features/forecasting/tests/test_seasonal_average_forecaster.py new file mode 100644 index 00000000..42c5bc96 --- /dev/null +++ b/app/features/forecasting/tests/test_seasonal_average_forecaster.py @@ -0,0 +1,116 @@ +"""Tests for :class:`SeasonalAverageForecaster` (PRP-36).""" + +from __future__ import annotations + +import numpy as np +import pytest + +from app.features.forecasting.models import ( + SeasonalAverageForecaster, + model_factory, +) +from app.features.forecasting.schemas import SeasonalAverageModelConfig + + +def _weekly_pattern(n_weeks: int) -> np.ndarray: + """Build ``n_weeks`` weeks of a 7-day pattern [10, 20, ..., 70].""" + pattern = np.array([10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0]) + return np.tile(pattern, n_weeks) + + +class TestSeasonalAverageForecaster: + """Behavioural tests for the seasonal-average baseline.""" + + def test_requires_features_false(self) -> None: + """The seasonal-average baseline is target-only.""" + assert 
SeasonalAverageForecaster.requires_features is False + + def test_invalid_season_length_raises(self) -> None: + """season_length < 2 surfaces a clear error.""" + with pytest.raises(ValueError, match="season_length must be >= 2"): + SeasonalAverageForecaster(season_length=1) + + def test_invalid_lookback_cycles_raises(self) -> None: + """lookback_cycles < 2 surfaces a clear error.""" + with pytest.raises(ValueError, match="lookback_cycles must be >= 2"): + SeasonalAverageForecaster(lookback_cycles=1) + + def test_fit_raises_on_too_few_observations(self) -> None: + """fit() requires at least 2 * season_length observations.""" + model = SeasonalAverageForecaster(season_length=7) + with pytest.raises(ValueError, match="at least 14"): + model.fit(np.array([1.0] * 10)) + + def test_predict_picks_matching_dow_positions(self) -> None: + """A perfectly-cyclical series forecasts the matching DOW pattern exactly.""" + y = _weekly_pattern(n_weeks=4) + model = SeasonalAverageForecaster(season_length=7, lookback_cycles=4).fit(y) + # Horizon day 1 corresponds to the same DOW as positions + # {y[-7], y[-14], y[-21], y[-28]} — all equal to 10.0 in this pattern. + forecasts = model.predict(horizon=7) + np.testing.assert_array_almost_equal( + forecasts, + [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0], + ) + + def test_predict_shape(self) -> None: + """predict() returns the configured horizon length.""" + y = _weekly_pattern(n_weeks=4) + model = SeasonalAverageForecaster(season_length=7, lookback_cycles=4).fit(y) + assert model.predict(horizon=10).shape == (10,) + + def test_lookback_cycles_smaller_than_history_works(self) -> None: + """The forecaster trims history to ``lookback_cycles * season_length``.""" + y = _weekly_pattern(n_weeks=6) # 42 days + model = SeasonalAverageForecaster(season_length=7, lookback_cycles=2).fit(y) + # Only the last 14 days are sampled, but the cyclical pattern is + # identical so the forecast still matches the canonical week. 
+ forecasts = model.predict(horizon=7) + np.testing.assert_array_almost_equal( + forecasts, + [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0], + ) + + def test_trim_outliers_drops_min_and_max(self) -> None: + """With ``trim_outliers=True``, each per-bucket sample drops min + max.""" + # Build a series where day-1 of each week takes values 5, 10, 100, 50 + # (4 lookback samples → after trim drops 5 and 100, leaves [10, 50] + # → mean = 30.0). Other days repeat a fixed value. + weeks = [] + for w_value in (5.0, 10.0, 100.0, 50.0): + weeks.append([w_value, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]) + y = np.array(weeks, dtype=np.float64).flatten() + + trimmed = SeasonalAverageForecaster( + season_length=7, lookback_cycles=4, trim_outliers=True + ).fit(y) + plain = SeasonalAverageForecaster( + season_length=7, lookback_cycles=4, trim_outliers=False + ).fit(y) + + # Trimmed mean over day-1 samples: drop {5.0, 100.0}, keep {10.0, 50.0} → 30.0 + assert trimmed.predict(horizon=1)[0] == pytest.approx(30.0) + # Plain mean: (5 + 10 + 100 + 50) / 4 = 41.25 + assert plain.predict(horizon=1)[0] == pytest.approx(41.25) + + def test_deterministic_with_seed(self) -> None: + """Two identically-configured fits emit byte-identical forecasts.""" + y = _weekly_pattern(n_weeks=4) + a = SeasonalAverageForecaster(random_state=42).fit(y) + b = SeasonalAverageForecaster(random_state=42).fit(y) + np.testing.assert_array_equal(a.predict(horizon=14), b.predict(horizon=14)) + + def test_predict_before_fit_raises(self) -> None: + """predict() before fit() raises RuntimeError.""" + with pytest.raises(RuntimeError, match="must be fitted"): + SeasonalAverageForecaster().predict(horizon=5) + + def test_factory_creates_seasonal_average(self) -> None: + """model_factory dispatches SeasonalAverageModelConfig.""" + cfg = SeasonalAverageModelConfig(season_length=7, lookback_cycles=3, trim_outliers=True) + model = model_factory(cfg, random_state=99) + assert isinstance(model, SeasonalAverageForecaster) + assert 
model.season_length == 7 + assert model.lookback_cycles == 3 + assert model.trim_outliers is True + assert model.random_state == 99 diff --git a/app/features/forecasting/tests/test_trend_regression_baseline_forecaster.py b/app/features/forecasting/tests/test_trend_regression_baseline_forecaster.py new file mode 100644 index 00000000..cb885482 --- /dev/null +++ b/app/features/forecasting/tests/test_trend_regression_baseline_forecaster.py @@ -0,0 +1,84 @@ +"""Tests for :class:`TrendRegressionBaselineForecaster` (PRP-36).""" + +from __future__ import annotations + +import numpy as np +import pytest + +from app.features.forecasting.models import ( + TrendRegressionBaselineForecaster, + model_factory, +) +from app.features.forecasting.schemas import TrendRegressionBaselineModelConfig + + +class TestTrendRegressionBaselineForecaster: + """Behavioural tests for the Ridge trend baseline.""" + + def test_requires_features_false(self) -> None: + """The trend baseline is target-only (calendar features built internally).""" + assert TrendRegressionBaselineForecaster.requires_features is False + + def test_fit_raises_on_too_few_observations(self) -> None: + """fit() needs at least two observations to estimate a trend.""" + model = TrendRegressionBaselineForecaster() + with pytest.raises(ValueError, match="at least 2 observations"): + model.fit(np.array([10.0])) + + def test_predict_before_fit_raises(self) -> None: + """predict() before fit() raises RuntimeError.""" + with pytest.raises(RuntimeError, match="must be fitted"): + TrendRegressionBaselineForecaster().predict(horizon=5) + + def test_predict_shape(self) -> None: + """predict() returns the configured horizon length.""" + y = np.linspace(0.0, 30.0, 60) + model = TrendRegressionBaselineForecaster().fit(y) + assert model.predict(horizon=14).shape == (14,) + + def test_perfect_linear_series_extrapolated_within_tolerance(self) -> None: + """A noise-free linear series extrapolates near-perfectly under Ridge.""" + # y = 1 * 
elapsed_day on a 60-day window. Disable calendar one-hots so + # the design reduces to a single elapsed-day column and Ridge regresses + # the slope cleanly. + n = 60 + y = np.arange(n, dtype=np.float64) + model = TrendRegressionBaselineForecaster( + alpha=0.0, include_dow=False, include_month=False + ).fit(y) + forecasts = model.predict(horizon=10) + expected = np.arange(n, n + 10, dtype=np.float64) + np.testing.assert_allclose(forecasts, expected, atol=1e-6) + + def test_deterministic_with_seed(self) -> None: + """Ridge is closed-form; two identical fits give identical forecasts.""" + y = np.sin(np.linspace(0.0, 10.0, 90)) + np.linspace(0.0, 5.0, 90) + a = TrendRegressionBaselineForecaster(random_state=42).fit(y) + b = TrendRegressionBaselineForecaster(random_state=42).fit(y) + np.testing.assert_array_equal(a.predict(horizon=14), b.predict(horizon=14)) + + def test_dow_toggle_changes_design_matrix_width(self) -> None: + """include_dow expands the design matrix by 7 one-hot columns.""" + y = np.arange(40, dtype=np.float64) + with_dow = TrendRegressionBaselineForecaster(include_dow=True, include_month=False) + without_dow = TrendRegressionBaselineForecaster(include_dow=False, include_month=False) + # Design matrix is built internally — compare the first row width. + row_with = with_dow._design_row(elapsed_day=0) + row_without = without_dow._design_row(elapsed_day=0) + assert row_with.shape[0] - row_without.shape[0] == 7 + + # Both should still fit + predict against the same series. 
+ with_dow.fit(y) + without_dow.fit(y) + assert with_dow.predict(horizon=3).shape == (3,) + assert without_dow.predict(horizon=3).shape == (3,) + + def test_factory_creates_trend_regression_baseline(self) -> None: + """model_factory dispatches TrendRegressionBaselineModelConfig.""" + cfg = TrendRegressionBaselineModelConfig(alpha=2.0, include_dow=False, include_month=True) + model = model_factory(cfg, random_state=7) + assert isinstance(model, TrendRegressionBaselineForecaster) + assert model.alpha == 2.0 + assert model.include_dow is False + assert model.include_month is True + assert model.random_state == 7 diff --git a/app/features/forecasting/tests/test_weighted_moving_average_forecaster.py b/app/features/forecasting/tests/test_weighted_moving_average_forecaster.py new file mode 100644 index 00000000..0af6e7a6 --- /dev/null +++ b/app/features/forecasting/tests/test_weighted_moving_average_forecaster.py @@ -0,0 +1,119 @@ +"""Tests for :class:`WeightedMovingAverageForecaster` (PRP-36).""" + +from __future__ import annotations + +import numpy as np +import pytest + +from app.features.forecasting.models import ( + WeightedMovingAverageForecaster, + model_factory, +) +from app.features.forecasting.schemas import WeightedMovingAverageModelConfig + + +class TestWeightedMovingAverageForecaster: + """Behavioural tests for the weighted-moving-average baseline.""" + + def test_requires_features_false(self) -> None: + """The weighted moving average is a target-only baseline.""" + assert WeightedMovingAverageForecaster.requires_features is False + + def test_fit_raises_on_too_few_observations(self) -> None: + """fit() must reject a series shorter than window_size.""" + model = WeightedMovingAverageForecaster(window_size=7) + with pytest.raises(ValueError, match="at least 7"): + model.fit(np.array([1.0, 2.0, 3.0])) + + def test_invalid_window_size_raises(self) -> None: + """window_size below the minimum surfaces a clear error.""" + with pytest.raises(ValueError, 
match="window_size must be >= 2"): + WeightedMovingAverageForecaster(window_size=1) + + def test_invalid_weight_strategy_raises(self) -> None: + """Unknown weight strategy surfaces a clear error.""" + with pytest.raises(ValueError, match="weight_strategy must be"): + WeightedMovingAverageForecaster(weight_strategy="quadratic") # type: ignore[arg-type] + + @pytest.mark.parametrize("decay", [-0.1, 0.0, 1.0, 1.5]) + def test_invalid_decay_raises(self, decay: float) -> None: + """decay outside the open (0, 1) interval surfaces a clear error.""" + with pytest.raises(ValueError, match="decay must lie in"): + WeightedMovingAverageForecaster(decay=decay) + + def test_fit_then_predict_shape(self) -> None: + """predict() returns the configured horizon length.""" + y = np.array([10.0, 12.0, 14.0, 16.0, 18.0, 20.0, 22.0]) + model = WeightedMovingAverageForecaster(window_size=7).fit(y) + forecasts = model.predict(horizon=5) + assert forecasts.shape == (5,) + assert np.all(forecasts == forecasts[0]) # constant forecast + + def test_linear_weights_match_np_average(self) -> None: + """Linear-strategy mean matches np.average(weights=1..W) exactly.""" + y = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]) + model = WeightedMovingAverageForecaster(window_size=7, weight_strategy="linear").fit(y) + expected = float(np.average(y, weights=np.arange(1, 8))) + assert model.predict(horizon=1)[0] == pytest.approx(expected) + + def test_exponential_weights_match_np_average(self) -> None: + """Exponential-strategy mean matches np.average(weights=decay**...).""" + y = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) + decay = 0.5 + model = WeightedMovingAverageForecaster( + window_size=5, weight_strategy="exponential", decay=decay + ).fit(y) + weights = np.power(decay, np.arange(4, -1, -1)) + expected = float(np.average(y, weights=weights)) + assert model.predict(horizon=1)[0] == pytest.approx(expected) + + def test_deterministic_with_seed(self) -> None: + """Two identically-configured fits emit byte-identical 
forecasts.""" + y = np.linspace(1.0, 20.0, 20) + a = WeightedMovingAverageForecaster(window_size=7, random_state=42).fit(y) + b = WeightedMovingAverageForecaster(window_size=7, random_state=42).fit(y) + np.testing.assert_array_equal(a.predict(horizon=10), b.predict(horizon=10)) + + def test_predict_before_fit_raises(self) -> None: + """predict() before fit() raises RuntimeError.""" + with pytest.raises(RuntimeError, match="must be fitted"): + WeightedMovingAverageForecaster().predict(horizon=5) + + def test_recent_observations_weighted_higher_than_old_under_linear(self) -> None: + """A trend series biases the linear-weighted mean toward recent values.""" + # Series rising 1..10 — linear weighting should produce a forecast + # closer to the recent end than to the simple mean (5.5). + y = np.arange(1.0, 11.0) + wma = WeightedMovingAverageForecaster(window_size=10, weight_strategy="linear").fit(y) + simple_mean = float(y.mean()) + wma_value = float(wma.predict(horizon=1)[0]) + assert wma_value > simple_mean, ( + f"linear WMA should overweight recent values, got {wma_value} <= {simple_mean}" + ) + + def test_get_set_params_round_trip(self) -> None: + """get_params()/set_params() round-trip the constructor surface.""" + model = WeightedMovingAverageForecaster( + window_size=14, weight_strategy="exponential", decay=0.9 + ) + params = model.get_params() + assert params == { + "window_size": 14, + "weight_strategy": "exponential", + "decay": 0.9, + "random_state": 42, + } + model.set_params(window_size=7) + assert model.window_size == 7 + + def test_factory_creates_weighted_moving_average(self) -> None: + """model_factory dispatches WeightedMovingAverageModelConfig.""" + cfg = WeightedMovingAverageModelConfig( + window_size=10, weight_strategy="exponential", decay=0.6 + ) + model = model_factory(cfg, random_state=123) + assert isinstance(model, WeightedMovingAverageForecaster) + assert model.window_size == 10 + assert model.weight_strategy == "exponential" + assert 
model.decay == 0.6 + assert model.random_state == 123 diff --git a/app/features/ops/schemas.py b/app/features/ops/schemas.py index 02a8405c..136f6d4a 100644 --- a/app/features/ops/schemas.py +++ b/app/features/ops/schemas.py @@ -7,10 +7,27 @@ """ from datetime import date, datetime +from enum import StrEnum from typing import Literal from pydantic import BaseModel, ConfigDict, Field + +class StaleReason(StrEnum): + """Canonical reasons surfaced on a stale deployment alias (PRP-36). + + Values are stable JSON strings — the wire payload uses the ``str`` + form, the service uses the enum for branching. ``stale_reason`` on + :class:`AliasHealth` keeps the ``str`` shape for back-compat; new + callers should compare to these enum values. + """ + + NEWER_SUCCESS_RUN = "newer_success_run" + ARTIFACT_NOT_VERIFIED = "artifact_not_verified" + RUN_NOT_SUCCESS = "run_not_success" + FEATURE_FRAME_VERSION_MISMATCH = "feature_frame_version_mismatch" + + # ============================================================================= # System & freshness # ============================================================================= @@ -131,12 +148,30 @@ class AliasHealth(BaseModel): ) stale_reason: str | None = Field( None, - description="Human-readable explanation when is_stale is true; null otherwise.", + description=( + "Human-readable explanation when is_stale is true; null otherwise. " + "Values match :class:`StaleReason` (PRP-36): newer_success_run, " + "artifact_not_verified, run_not_success, feature_frame_version_mismatch." + ), ) wape: float | None = Field( None, description="WAPE of the aliased run, when present in its metrics; null otherwise.", ) + alias_feature_frame_version: int | None = Field( + None, + description=( + "PRP-36 — feature_frame_version of the aliased run; null when the run pre-dates PRP-35." 
+ ), + ) + comparable_run_feature_frame_version: int | None = Field( + None, + description=( + "PRP-36 — feature_frame_version of the newer comparable run that " + "triggered a feature_frame_version_mismatch stale-reason. Null " + "when stale_reason != feature_frame_version_mismatch." + ), + ) # ============================================================================= @@ -317,6 +352,21 @@ class ModelHealthEntry(BaseModel): ..., description="Chronological WAPE observations; may carry null gaps.", ) + alias_feature_frame_version: int | None = Field( + None, + description=( + "PRP-36 — feature_frame_version of the most recent successful run " + "for this grain; null when pre-PRP-35." + ), + ) + comparable_run_feature_frame_version: int | None = Field( + None, + description=( + "PRP-36 — feature_frame_version of the run that would replace the " + "current alias under a feature_frame_version_mismatch verdict. " + "Null when no V-mismatch comparator exists." + ), + ) class ModelHealthResponse(BaseModel): diff --git a/app/features/ops/service.py b/app/features/ops/service.py index e88e2054..43c59318 100644 --- a/app/features/ops/service.py +++ b/app/features/ops/service.py @@ -29,6 +29,7 @@ RetrainingCandidate, RetrainingCandidatesResponse, RunHealth, + StaleReason, StatusCount, SystemHealth, WapePoint, @@ -134,29 +135,83 @@ def classify_drift( return "stable", delta +def _run_feature_frame_version(run: ModelRun) -> int: + """Read ``feature_frame_version`` from ``run.runtime_info`` JSONB (PRP-36). + + Returns the int when the key is present, else **V=1**. This is the + same load-bearing back-compat seam as + :meth:`RegistryService._feature_frame_version_filter` — a legacy run + that pre-dates PRP-35 is treated as V=1 everywhere in the ops layer + so that ``_alias_staleness`` doesn't fabricate a + ``feature_frame_version_mismatch`` between a legacy alias and an + explicit-V=1 comparable run. 
+ + Notes: + The schema-side :attr:`RunResponse.feature_frame_version` deliberately + keeps ``None`` for "key absent" — UIs that need to distinguish + "no V info" from "V=1" can do so off the response, while internal + comparison logic uses this normalized helper. + """ + info = run.runtime_info or {} + value = info.get("feature_frame_version") + if isinstance(value, int) and value in (1, 2): + return value + return 1 + + def _alias_staleness( run: ModelRun, latest_success_by_grain: dict[tuple[int, int], ModelRun], -) -> tuple[bool, str | None]: - """Decide whether an aliased run is stale, and why. +) -> tuple[bool, str | None, int, int | None]: + """Decide whether an aliased run is stale, and why (PRP-36). - An alias is stale when its run is no longer a successful run, or when a - newer successful run exists for the same ``(store, product)`` grain — the - industry-standard alias-staleness check (cf. MLflow alias governance). + An alias is stale when: + 1. its run is no longer a successful run, OR + 2. a newer successful run exists for the same ``(store, product)`` + grain — the industry-standard alias-staleness check, OR + 3. a comparable run exists at a DIFFERENT ``feature_frame_version`` + from the alias's run (PRP-36 — V1 vs V2 mismatch). + + The V-mismatch branch fires whenever an alias's run V_a differs from + the latest comparable run V_b — even when timestamps match — because + Slice C surfaces it distinctly from "a newer run exists". Args: run: The model run the alias points at. latest_success_by_grain: Latest successful run keyed by (store, product). Returns: - A ``(is_stale, reason)`` tuple; ``reason`` is None when not stale. + ``(is_stale, reason, alias_v, comparable_v)``. ``reason`` is None + when not stale. ``alias_v`` is always an int (legacy runs without + the JSONB key are normalized to V=1 — see + :func:`_run_feature_frame_version`). ``comparable_v`` is non-None + only when the mismatch branch fires. 
""" + alias_v = _run_feature_frame_version(run) if run.status != RunStatus.SUCCESS.value: - return True, f"aliased run status is '{run.status}', not 'success'" + return True, StaleReason.RUN_NOT_SUCCESS.value, alias_v, None latest = latest_success_by_grain.get((run.store_id, run.product_id)) - if latest is not None and latest.id != run.id and latest.created_at > run.created_at: - return True, "a newer successful run exists for this store/product" - return False, None + if latest is None or latest.id == run.id: + return False, None, alias_v, None + + latest_v = _run_feature_frame_version(latest) + # PRP-36 — V-mismatch wins over NEWER_SUCCESS_RUN. A V1 alias with a + # newer V2 comparable run is classified as a mismatch so Slice C can + # surface "this alias's V is now stale" distinctly from "a newer run + # exists at the same V". Both sides are normalized via the helper so + # a legacy missing-key run never spuriously mismatches an explicit + # V=1 comparable run. + if alias_v != latest_v: + return ( + True, + StaleReason.FEATURE_FRAME_VERSION_MISMATCH.value, + alias_v, + latest_v, + ) + + if latest.created_at > run.created_at: + return True, StaleReason.NEWER_SUCCESS_RUN.value, alias_v, None + return False, None, alias_v, None # ============================================================================= @@ -285,7 +340,9 @@ async def get_summary(self, db: AsyncSession) -> OpsSummaryResponse: run = runs_by_id.get(alias.run_id) if run is None: # orphan FK — defensive; the FK constraint forbids it continue - is_stale, stale_reason = _alias_staleness(run, latest_success_by_grain) + is_stale, stale_reason, alias_v, comparable_v = _alias_staleness( + run, latest_success_by_grain + ) aliases.append( AliasHealth( alias_name=alias.alias_name, @@ -297,6 +354,8 @@ async def get_summary(self, db: AsyncSession) -> OpsSummaryResponse: is_stale=is_stale, stale_reason=stale_reason, wape=extract_wape(run.metrics), + alias_feature_frame_version=alias_v, + 
comparable_run_feature_frame_version=comparable_v, ) ) if is_stale: @@ -513,6 +572,16 @@ async def get_model_health(self, db: AsyncSession, limit: int) -> ModelHealthRes direction, delta = classify_drift([point.wape for point in history]) numeric = [point.wape for point in history if point.wape is not None] latest_run = grain_runs[-1] + # PRP-36 — surface the alias (latest-run) V and, when an earlier + # run in the chain carries a DIFFERENT V, the comparable V so + # Slice C can flag the mismatch on the model-health view too. + latest_run_v = _run_feature_frame_version(latest_run) + mismatch_v: int | None = None + for prior_run in grain_runs[:-1]: + prior_v = _run_feature_frame_version(prior_run) + if prior_v != latest_run_v: + mismatch_v = prior_v + break entries.append( ModelHealthEntry( store_id=store_id, @@ -527,6 +596,8 @@ async def get_model_health(self, db: AsyncSession, limit: int) -> ModelHealthRes last_trained_at=latest_run.created_at, staleness_days=max((today - latest_run.data_window_end).days, 0), wape_history=history, + alias_feature_frame_version=latest_run_v, + comparable_run_feature_frame_version=mismatch_v, ) ) diff --git a/app/features/ops/tests/test_service.py b/app/features/ops/tests/test_service.py index 3c228660..fcfb61a7 100644 --- a/app/features/ops/tests/test_service.py +++ b/app/features/ops/tests/test_service.py @@ -4,7 +4,19 @@ functions with no I/O. 
""" -from app.features.ops.service import classify_drift, extract_wape, score_retraining_candidate +from datetime import UTC, datetime +from types import SimpleNamespace +from typing import cast + +from app.features.ops.schemas import StaleReason +from app.features.ops.service import ( + _alias_staleness, + _run_feature_frame_version, + classify_drift, + extract_wape, + score_retraining_candidate, +) +from app.features.registry.models import ModelRun # ============================================================================= # score_retraining_candidate @@ -133,3 +145,132 @@ def test_classify_drift_zero_baseline_guard() -> None: def test_classify_drift_never_raises_on_sparse_history() -> None: """Sparse / all-None history degrades gracefully to 'unknown'.""" assert classify_drift([None, None, None]) == ("unknown", None) + + +# ============================================================================= +# PRP-36 — _alias_staleness V-mismatch path +# ============================================================================= + + +def _make_run( + *, + run_id: str, + store_id: int = 1, + product_id: int = 1, + status: str = "success", + created_at: datetime | None = None, + feature_frame_version: int | None = None, +) -> ModelRun: + """Minimal duck-typed ModelRun the helpers consume. + + The helpers only read ``.status / .store_id / .product_id / .created_at + / .id / .runtime_info`` so a SimpleNamespace is sufficient at runtime; + we ``cast`` to ``ModelRun`` so static checking is happy. 
+ """ + runtime_info: dict[str, object] = {} + if feature_frame_version is not None: + runtime_info["feature_frame_version"] = feature_frame_version + fake = SimpleNamespace( + run_id=run_id, + id=hash(run_id) & 0xFFFFFFFF, + store_id=store_id, + product_id=product_id, + status=status, + created_at=created_at or datetime(2026, 1, 1, tzinfo=UTC), + runtime_info=runtime_info if runtime_info else None, + ) + return cast(ModelRun, fake) + + +def test_run_feature_frame_version_reads_runtime_info() -> None: + """V is read from runtime_info JSONB; missing key resolves to V=1 (filter-aligned).""" + assert _run_feature_frame_version(_make_run(run_id="a", feature_frame_version=2)) == 2 + assert _run_feature_frame_version(_make_run(run_id="b")) == 1 + + +def test_run_feature_frame_version_rejects_unsupported_value() -> None: + """Unknown int (e.g. 3) or non-int values fall back to V=1 (defensive).""" + legacy_explicit_v3 = _make_run(run_id="bad-int") + legacy_explicit_v3.runtime_info = {"feature_frame_version": 3} + legacy_str = _make_run(run_id="bad-str") + legacy_str.runtime_info = {"feature_frame_version": "2"} + assert _run_feature_frame_version(legacy_explicit_v3) == 1 + assert _run_feature_frame_version(legacy_str) == 1 + + +def test_alias_staleness_legacy_run_treated_as_v1_no_spurious_mismatch() -> None: + """A legacy alias (no V key) compared to an explicit-V=1 comparable is NOT stale.""" + older = datetime(2026, 1, 1, tzinfo=UTC) + legacy = _make_run(run_id="legacy", created_at=older) # no V key + explicit_v1 = _make_run( + run_id="explicit-v1", + created_at=older, # same created_at → no NEWER_SUCCESS_RUN either + feature_frame_version=1, + ) + is_stale, reason, alias_v, comparable_v = _alias_staleness(legacy, {(1, 1): explicit_v1}) + # Both normalize to V=1 — no mismatch, no newer (same created_at), so not stale. 
+ assert is_stale is False + assert reason is None + assert alias_v == 1 + assert comparable_v is None + + +def test_alias_staleness_status_branch_wins() -> None: + """A non-SUCCESS aliased run is stale with RUN_NOT_SUCCESS regardless of V.""" + run = _make_run(run_id="r1", status="failed", feature_frame_version=1) + latest_map: dict[tuple[int, int], ModelRun] = { + (1, 1): _make_run(run_id="r2", feature_frame_version=2) + } + is_stale, reason, alias_v, comparable_v = _alias_staleness(run, latest_map) + assert is_stale is True + assert reason == StaleReason.RUN_NOT_SUCCESS.value + assert alias_v == 1 + assert comparable_v is None + + +def test_alias_staleness_v_mismatch_wins_over_newer_run() -> None: + """A V1 alias with a newer V2 comparable run reports MISMATCH, not NEWER.""" + older = datetime(2026, 1, 1, tzinfo=UTC) + newer = datetime(2026, 5, 1, tzinfo=UTC) + run = _make_run(run_id="v1", created_at=older, feature_frame_version=1) + latest = _make_run(run_id="v2", created_at=newer, feature_frame_version=2) + is_stale, reason, alias_v, comparable_v = _alias_staleness(run, {(1, 1): latest}) + assert is_stale is True + assert reason == StaleReason.FEATURE_FRAME_VERSION_MISMATCH.value + assert alias_v == 1 + assert comparable_v == 2 + + +def test_alias_staleness_same_v_newer_run_uses_newer_reason() -> None: + """V matches but the comparable is newer → NEWER_SUCCESS_RUN reason.""" + older = datetime(2026, 1, 1, tzinfo=UTC) + newer = datetime(2026, 5, 1, tzinfo=UTC) + run = _make_run(run_id="v2-old", created_at=older, feature_frame_version=2) + latest = _make_run(run_id="v2-new", created_at=newer, feature_frame_version=2) + is_stale, reason, alias_v, comparable_v = _alias_staleness(run, {(1, 1): latest}) + assert is_stale is True + assert reason == StaleReason.NEWER_SUCCESS_RUN.value + assert alias_v == 2 + assert comparable_v is None + + +def test_alias_staleness_v1_alias_v1_latest_legacy_back_compat() -> None: + """A V1 alias whose latest comparable is also legacy 
V1 (no key) → not stale.""" + older = datetime(2026, 1, 1, tzinfo=UTC) + run = _make_run(run_id="legacy", created_at=older) # no V key + # Same run is latest_by_grain — no newer comparable. + is_stale, reason, alias_v, comparable_v = _alias_staleness(run, {(1, 1): run}) + assert is_stale is False + assert reason is None + # PRP-36 — legacy missing-key normalizes to V=1 inside the ops layer + # so it matches the registry's _feature_frame_version_filter contract. + assert alias_v == 1 + assert comparable_v is None + + +def test_alias_staleness_legacy_v1_vs_explicit_v1_no_mismatch_when_same_run() -> None: + """An explicit-V=1 run compared to itself is not stale (same id).""" + run = _make_run(run_id="self", feature_frame_version=1) + is_stale, reason, _, _ = _alias_staleness(run, {(1, 1): run}) + assert is_stale is False + assert reason is None diff --git a/app/features/registry/schemas.py b/app/features/registry/schemas.py index 61a16c19..9ef5417b 100644 --- a/app/features/registry/schemas.py +++ b/app/features/registry/schemas.py @@ -82,6 +82,17 @@ class RunCreate(BaseModel): product_id: int = Field(..., ge=1) agent_context: AgentContext | None = None git_sha: str | None = Field(None, max_length=40) + runtime_info_extras: dict[str, Any] | None = Field( + default=None, + description=( + "PRP-36 — optional caller-supplied extras merged INTO the runtime " + "info captured by the service (Python/sklearn versions). The " + "intended payload is the V2 metadata the forecasting service " + "wrote to the model bundle: feature_frame_version, " + "feature_groups, feature_safety_classes, feature_pinned_constants. " + "Caller-supplied keys win over service-captured keys." 
+ ), + ) @field_validator("data_window_end") @classmethod @@ -165,6 +176,36 @@ def model_family(self) -> ModelFamily: return model_family_for(self.model_type) + @computed_field # type: ignore[prop-decorator] + @property + def feature_frame_version(self) -> int | None: + """PRP-36 — V1 (1) or V2 (2), read from ``runtime_info`` JSONB. + + ``None`` for runs that pre-date PRP-35 / PRP-36 and never wrote + the key. Plain Python ``int`` type — no cross-slice import. + """ + if not self.runtime_info: + return None + value = self.runtime_info.get("feature_frame_version") + if isinstance(value, int): + return value + return None + + @computed_field # type: ignore[prop-decorator] + @property + def feature_groups(self) -> dict[str, list[str]] | None: + """PRP-36 — V2 per-group canonical column manifest, read from ``runtime_info``. + + ``None`` for V1 runs (the key is only populated when training + with feature_frame_version=2) and for runs that pre-date PRP-35. + """ + if not self.runtime_info: + return None + value = self.runtime_info.get("feature_groups") + if isinstance(value, dict): + return value + return None + class RunListResponse(BaseModel): """Paginated list of runs.""" diff --git a/app/features/registry/service.py b/app/features/registry/service.py index 1170d3af..503c45d2 100644 --- a/app/features/registry/service.py +++ b/app/features/registry/service.py @@ -19,7 +19,7 @@ from typing import Any import structlog -from sqlalchemy import func, select +from sqlalchemy import Integer, cast, func, or_, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import InstrumentedAttribute @@ -200,6 +200,11 @@ async def create_run( run_id = uuid.uuid4().hex config_hash = self._compute_config_hash(run_data.model_config_data) + # PRP-36 — fish feature_frame_version out of the caller-supplied + # runtime_info_extras so the duplicate predicate distinguishes V1 vs V2. + # Default 1 when absent (V1 back-compat: every legacy run is V1). 
+ request_v = self._extract_feature_frame_version(run_data.runtime_info_extras) + # Check for duplicates based on policy if self.settings.registry_duplicate_policy in ("deny", "detect"): existing = await self._find_duplicate( @@ -209,6 +214,7 @@ async def create_run( product_id=run_data.product_id, data_window_start=run_data.data_window_start, data_window_end=run_data.data_window_end, + feature_frame_version=request_v, ) if existing: if self.settings.registry_duplicate_policy == "deny": @@ -220,8 +226,13 @@ async def create_run( config_hash=config_hash, ) - # Capture runtime info + # Capture runtime info and merge caller-supplied extras (PRP-36). + # Caller-supplied keys win over service-captured keys so the + # forecasting layer can pin feature_frame_version, feature_groups, + # feature_safety_classes, feature_pinned_constants on the run. runtime_info = self._capture_runtime_info() + if run_data.runtime_info_extras: + runtime_info.update(run_data.runtime_info_extras) # Convert agent context to dict if present agent_context_dict = None @@ -626,6 +637,22 @@ async def compare_runs( metrics_diff=metrics_diff, ) + @staticmethod + def _extract_feature_frame_version( + runtime_info_extras: dict[str, Any] | None, + ) -> int: + """Pull ``feature_frame_version`` from caller-supplied extras (PRP-36). + + Missing key OR malformed value → V=1 (legacy back-compat: every + run that pre-dates PRP-35 / PRP-36 is V1 by definition). + """ + if not runtime_info_extras: + return 1 + value = runtime_info_extras.get("feature_frame_version") + if isinstance(value, int) and value in (1, 2): + return value + return 1 + async def _find_duplicate( self, db: AsyncSession, @@ -634,8 +661,13 @@ async def _find_duplicate( product_id: int, data_window_start: date, data_window_end: date, + feature_frame_version: int = 1, ) -> ModelRun | None: - """Find existing run with same config and data window. + """Find existing run with same config, data window, AND feature_frame_version. 
+ + PRP-36 — the match key now includes feature_frame_version. A V1 run and + a V2 run with otherwise-identical fields are NOT duplicates; the + comparable-runs / champion-alias logic depends on this distinction. Args: db: Database session. @@ -644,6 +676,8 @@ async def _find_duplicate( product_id: Product ID. data_window_start: Data window start date. data_window_end: Data window end date. + feature_frame_version: V1 (1) or V2 (2). Rows with a missing JSONB + key are treated as V=1 (legacy back-compat). Returns: The most recent matching run, or None. @@ -664,6 +698,7 @@ async def _find_duplicate( & (ModelRun.data_window_start == data_window_start) & (ModelRun.data_window_end == data_window_end) & (ModelRun.status != RunStatusORM.ARCHIVED.value) + & self._feature_frame_version_filter(feature_frame_version) ) .order_by(ModelRun.created_at.desc()) .limit(1) @@ -671,6 +706,77 @@ async def _find_duplicate( result = await db.execute(stmt) return result.scalars().first() + @staticmethod + def _feature_frame_version_filter(feature_frame_version: int) -> Any: # noqa: ANN401 + """SQLAlchemy WHERE clause selecting runs whose V matches (PRP-36). + + Missing JSONB key resolves to V=1 — that is the load-bearing + back-compat seam (legacy V1 runs never wrote the key). + """ + v_column = cast(ModelRun.runtime_info["feature_frame_version"].astext, Integer) + if feature_frame_version == 1: + # Legacy rows without the key are V1; match BOTH "key absent" AND + # "key explicitly set to 1". + return or_( + v_column == 1, + ModelRun.runtime_info["feature_frame_version"].astext.is_(None), + ) + return v_column == feature_frame_version + + async def find_comparable_runs( + self, + db: AsyncSession, + *, + store_id: int, + product_id: int, + feature_frame_version: int, + data_window_start: date, + data_window_end: date, + model_type: str | None = None, + limit: int = 20, + ) -> list[ModelRun]: + """Return runs comparable to the (grain, window, V) tuple given (PRP-36). 
+ + Comparable predicate: + - same ``(store_id, product_id)`` grain; + - data windows OVERLAP + (``run.data_window_end >= data_window_start`` AND + ``run.data_window_start <= data_window_end``); + - same ``feature_frame_version`` (legacy rows without the JSONB + key are treated as V=1); + - ``status == SUCCESS`` (champion-eligible). + + Args: + db: Database session. + store_id: Store ID grain anchor. + product_id: Product ID grain anchor. + feature_frame_version: V1 or V2 — cross-V runs are NOT comparable. + data_window_start: Caller's data window start. + data_window_end: Caller's data window end. + model_type: Optional further filter; ``None`` returns all model types. + limit: Maximum rows returned, ordered by ``created_at desc``. + + Returns: + List of comparable :class:`ModelRun` rows, newest first. + """ + stmt = ( + select(ModelRun) + .where( + (ModelRun.store_id == store_id) + & (ModelRun.product_id == product_id) + & (ModelRun.status == RunStatusORM.SUCCESS.value) + & (ModelRun.data_window_end >= data_window_start) + & (ModelRun.data_window_start <= data_window_end) + & self._feature_frame_version_filter(feature_frame_version) + ) + .order_by(ModelRun.created_at.desc()) + .limit(limit) + ) + if model_type is not None: + stmt = stmt.where(ModelRun.model_type == model_type) + result = await db.execute(stmt) + return list(result.scalars().all()) + def _model_to_response(self, model_run: ModelRun) -> RunResponse: """Convert ORM model to response schema. 
diff --git a/app/features/registry/tests/test_schemas.py b/app/features/registry/tests/test_schemas.py index a1a714e4..616cdc3b 100644 --- a/app/features/registry/tests/test_schemas.py +++ b/app/features/registry/tests/test_schemas.py @@ -457,3 +457,74 @@ def test_model_family_propagates_to_serialized_json(self) -> None: response = self._make_response("prophet_like") json_str = response.model_dump_json(by_alias=True) assert '"model_family":"additive"' in json_str + + +class TestRunResponseFeatureFrameVersion: + """PRP-36 — feature_frame_version + feature_groups computed fields on RunResponse. + + Both fields are computed from ``runtime_info`` JSONB at serialization + time — no DB column, no migration. Mirrors the model_family precedent. + """ + + _BASE_FIELDS: ClassVar[dict[str, object]] = { + "run_id": "abc123", + "status": RunStatus.SUCCESS, + "model_type": "regression", + "model_config_data": {"model_type": "regression"}, + "config_hash": "deadbeefdeadbeef", + "data_window_start": date(2024, 1, 1), + "data_window_end": date(2024, 1, 31), + "store_id": 1, + "product_id": 1, + "created_at": datetime(2024, 1, 1, 0, 0, 0, tzinfo=UTC), + "updated_at": datetime(2024, 1, 2, 0, 0, 0, tzinfo=UTC), + } + + def _make_response(self, runtime_info: dict[str, object] | None) -> RunResponse: + fields = {**self._BASE_FIELDS, "runtime_info": runtime_info} + return RunResponse.model_validate(fields) + + def test_feature_frame_version_none_when_runtime_info_missing(self) -> None: + """A V1-era run with no runtime_info column resolves to None.""" + response = self._make_response(None) + assert response.feature_frame_version is None + assert response.feature_groups is None + + def test_feature_frame_version_none_when_key_absent(self) -> None: + """An existing runtime_info dict without the V key resolves to None.""" + response = self._make_response({"python_version": "3.12"}) + assert response.feature_frame_version is None + assert response.feature_groups is None + + def 
test_feature_frame_version_v2_extracted(self) -> None: + """A V2 run surfaces feature_frame_version=2 and feature_groups dict.""" + response = self._make_response( + { + "feature_frame_version": 2, + "feature_groups": { + "target_history": ["lag_1", "lag_7"], + "calendar": ["dow_sin", "dow_cos"], + }, + } + ) + assert response.feature_frame_version == 2 + assert response.feature_groups == { + "target_history": ["lag_1", "lag_7"], + "calendar": ["dow_sin", "dow_cos"], + } + + def test_feature_frame_version_v1_extracted(self) -> None: + """A V1 run with explicit feature_frame_version=1 round-trips; feature_groups None.""" + response = self._make_response({"feature_frame_version": 1}) + assert response.feature_frame_version == 1 + assert response.feature_groups is None + + def test_feature_frame_version_invalid_value_resolves_to_none(self) -> None: + """A non-int feature_frame_version value resolves to None (defensive).""" + response = self._make_response({"feature_frame_version": "two"}) + assert response.feature_frame_version is None + + def test_feature_groups_invalid_type_resolves_to_none(self) -> None: + """A non-dict feature_groups value resolves to None (defensive).""" + response = self._make_response({"feature_frame_version": 2, "feature_groups": ["lag_1"]}) + assert response.feature_groups is None diff --git a/app/features/registry/tests/test_service.py b/app/features/registry/tests/test_service.py index 684ec0b4..abd2a2ce 100644 --- a/app/features/registry/tests/test_service.py +++ b/app/features/registry/tests/test_service.py @@ -146,6 +146,31 @@ def test_compute_config_hash_order_independent(self) -> None: assert run1.compute_config_hash() == run2.compute_config_hash() +class TestRegistryServiceFeatureFrameVersion: + """PRP-36 — V1 / V2 distinction helpers for duplicate + comparable runs.""" + + def test_extract_feature_frame_version_default_v1(self) -> None: + """A None extras dict resolves to V1 (legacy back-compat).""" + assert 
RegistryService._extract_feature_frame_version(None) == 1 + + def test_extract_feature_frame_version_empty_dict_defaults_v1(self) -> None: + """An extras dict without the key resolves to V1.""" + assert RegistryService._extract_feature_frame_version({}) == 1 + + def test_extract_feature_frame_version_explicit_v1(self) -> None: + """Explicit feature_frame_version=1 round-trips.""" + assert RegistryService._extract_feature_frame_version({"feature_frame_version": 1}) == 1 + + def test_extract_feature_frame_version_explicit_v2(self) -> None: + """Explicit feature_frame_version=2 round-trips.""" + assert RegistryService._extract_feature_frame_version({"feature_frame_version": 2}) == 2 + + def test_extract_feature_frame_version_rejects_unsupported_value(self) -> None: + """Unknown int (e.g. 3) and non-int (e.g. '2') fall back to V1.""" + assert RegistryService._extract_feature_frame_version({"feature_frame_version": 3}) == 1 + assert RegistryService._extract_feature_frame_version({"feature_frame_version": "2"}) == 1 + + class TestRegistryServiceConfigDiff: """Tests for configuration diffing.""" diff --git a/docs/_base/API_CONTRACTS.md b/docs/_base/API_CONTRACTS.md index d57debcc..85ae57e9 100644 --- a/docs/_base/API_CONTRACTS.md +++ b/docs/_base/API_CONTRACTS.md @@ -19,9 +19,9 @@ All endpoints serve JSON; error responses use `application/problem+json` (RFC 78 | analytics | GET | `/analytics/inventory-status` | Latest `inventory_snapshot_daily` row per `(store, product)` grain (Postgres `DISTINCT ON`); optional `store_id`/`product_id` filters; `200` + empty list on an empty table (never `404`) | | featuresets | POST | `/featuresets/compute` | Compute time-safe features (lag/rolling/calendar, leakage-prevented) | | featuresets | POST | `/featuresets/preview` | Preview features with sample rows | -| forecasting | POST | `/forecasting/train` | Train a model (naive / seasonal_naive / moving_average / lightgbm / regression). 
`regression` wraps `HistGradientBoostingRegressor` on lag + calendar + exogenous features — the baseline a `model_exogenous` scenario re-forecasts through | +| forecasting | POST | `/forecasting/train` | Train a model. PRP-36 expands the model set: target-only `naive`/`seasonal_naive`/`moving_average`/`weighted_moving_average`/`seasonal_average`/`trend_regression_baseline`; feature-aware `regression`/`prophet_like` (always-on); opt-in `lightgbm`/`xgboost`/`random_forest` behind the matching `forecast_enable_*` flag. `regression` wraps `HistGradientBoostingRegressor` on lag + calendar + exogenous features — the baseline a `model_exogenous` scenario re-forecasts through | | forecasting | POST | `/forecasting/predict` | Generate horizon predictions from a trained model | -| backtesting | POST | `/backtesting/run` | Time-series CV (rolling/expanding splits, MAE/sMAPE/WAPE/bias/stability) | +| backtesting | POST | `/backtesting/run` | Time-series CV. PRP-36 — `aggregated_metrics` now carries `rmse` alongside MAE/sMAPE/WAPE/bias; every `fold_results[i].horizon_bucket_metrics` is a per-bucket metric dict keyed by `h_1_7`/`h_8_14`/`h_15_28`/`h_29_plus` (empty buckets dropped); `main_model_results.bucketed_aggregated_metrics` (and same on each `baseline_results[i]`) carries per-bucket means across folds, or `null` when no fold emitted a bucket dict | | explainability | POST | `/explain/forecast` | Rule-based explanation of the h=1 forecast a named baseline model (`naive`/`seasonal_naive`/`moving_average`) produces on the series ending at `as_of_date`; returns a `ForecastExplanation` — driver contributions, advisory retail reason codes (correlation, not causation), confidence band, caveats, agent summary. Time-safe (`<= as_of_date`); a non-baseline `model_type` or a too-short series → RFC 7807 400 | | explainability | GET | `/explain/runs/{run_id}` | Explain a registry `model_run` — config reconstructed from `model_run.model_config`, cutoff `data_window_end`. 
Missing run → 404; a non-baseline (`lightgbm`/`regression`) run → 400 | | explainability | GET | `/explain/jobs/{job_id}` | Explain a completed `predict` job — store/product/model read from `job.result`, cutoff = day before the first forecast date. Missing job → 404; a job that is not a completed predict job → 400 | @@ -33,7 +33,7 @@ All endpoints serve JSON; error responses use `application/problem+json` (RFC 78 | scenarios | DELETE | `/scenarios/{scenario_id}` | Delete a saved plan; `404` when missing | | registry | POST | `/registry/runs` | Create model run (pending) | | registry | GET | `/registry/runs` | List with filters + pagination + optional allow-listed `sort_by`/`sort_order` (created_at/model_type/status/store_id/product_id; unknown → default `created_at desc`) | -| registry | GET | `/registry/runs/{run_id}` | Run details + JSONB metrics + runtime_info | +| registry | GET | `/registry/runs/{run_id}` | Run details + JSONB metrics + runtime_info. PRP-36 — response gains Optional computed fields `feature_frame_version: int \| null` and `feature_groups: dict[str, list[str]] \| null` (both read from `runtime_info`; `null` for V1 / pre-PRP-35 runs) | | registry | PATCH | `/registry/runs/{run_id}` | Update status / metrics / artifact_uri | | registry | GET | `/registry/runs/{run_id}/verify` | SHA-256 artifact integrity check | | registry | POST | `/registry/aliases` | Create/update alias (only on `success` runs) | diff --git a/docs/_base/DOMAIN_MODEL.md b/docs/_base/DOMAIN_MODEL.md index c2e6e8bc..25e2927b 100644 --- a/docs/_base/DOMAIN_MODEL.md +++ b/docs/_base/DOMAIN_MODEL.md @@ -22,11 +22,13 @@ ### `model_run` (Registry) - **Root:** `ModelRun(run_id: UUID, status: RunStatus)` - **Status state machine:** `pending` → `running` → `success` | `failed` → `archived` -- **JSONB fields:** `model_config`, `metrics`, `runtime_info` (Python/numpy/pandas versions captured at training) +- **JSONB fields:** `model_config`, `metrics`, `runtime_info` (Python/numpy/pandas 
versions captured at training; PRP-35/PRP-36 additionally pin `feature_frame_version`, `feature_columns`, `feature_groups`, `feature_safety_classes`, `feature_pinned_constants` when the caller supplies them via `RunCreate.runtime_info_extras`) - **Invariants:** - An alias may point only to a `success` run. - Artifact_uri SHA-256 hash must verify before any consumer trusts it (`GET /registry/runs/{id}/verify`). - `runtime_info` is immutable after `success`. + - **Comparable-run rule (PRP-36).** A run is comparable to another only when ALL three hold: same `(store_id, product_id)` grain, OVERLAPPING `data_window_start`/`data_window_end`, AND same `feature_frame_version`. The third clause is load-bearing — `RegistryService._find_duplicate`, `RegistryService.find_comparable_runs`, and `OpsService` staleness all enforce it. A V1 run and a V2 run with otherwise identical fields are NOT duplicates and NOT comparable; legacy rows without the JSONB key are treated as V=1. + - **Stale-alias V mismatch (PRP-36).** When an alias's run has `feature_frame_version=V_a` and a newer comparable SUCCESS run has `feature_frame_version=V_b != V_a`, the alias is marked `is_stale=true` with `stale_reason="feature_frame_version_mismatch"` — a distinct enum value from `newer_success_run` so the UI surfaces "your V is now stale" separately from "a newer run exists". 
### `agent_session` (Agents) - **Root:** `AgentSession(session_id: UUID, status: SessionStatus)` diff --git a/docs/optional-features/05-advanced-ml-model-zoo.md b/docs/optional-features/05-advanced-ml-model-zoo.md index 5c47c11f..121e0300 100644 --- a/docs/optional-features/05-advanced-ml-model-zoo.md +++ b/docs/optional-features/05-advanced-ml-model-zoo.md @@ -4,11 +4,58 @@ Add serious forecasting models beyond current baselines: -- LightGBM -- XGBoost +- LightGBM (opt-in extra) +- XGBoost (opt-in extra) +- Random Forest (pure scikit-learn, opt-in flag — PRP-36) - Prophet-like models with trend, seasonality, holiday, and regressor components -The goal is not just to add dependencies, but to upgrade ForecastLabAI from baseline forecasting to a credible model comparison platform. +PRP-36 also adds richer **always-on baselines** so a feature-aware +model's "extra complexity is justified" statement actually means +something: + +- `weighted_moving_average` — linear or exponential weight strategy +- `seasonal_average` — average of last N seasonal cycles (with optional + outlier-trim) +- `trend_regression_baseline` — Ridge over an elapsed-day index + dow/month + one-hots + +The goal is not just to add dependencies, but to upgrade ForecastLabAI +from baseline forecasting to a credible model comparison platform. + +### PRP-36 — backtest comparison contract + +`POST /backtesting/run` now returns, in addition to the existing +aggregate metrics: + +- `aggregated_metrics.rmse` — root-mean-squared error alongside + MAE / sMAPE / WAPE / bias. +- `fold_results[*].horizon_bucket_metrics` — per-fold, per-bucket + metric dict, keyed by stable bucket ids: `h_1_7`, `h_8_14`, + `h_15_28`, `h_29_plus`. **Empty buckets are dropped** (a 14-day + horizon's payload never carries `h_29_plus`). +- `main_model_results.bucketed_aggregated_metrics` and + `baseline_results[*].bucketed_aggregated_metrics` — per-bucket means + across folds. `None` when every fold reported an empty bucket dict. 
+ +This is additive — older clients keep working unchanged. + +### PRP-36 — diagnostic script + +`examples/forecasting/model_zoo_compare.py` exercises every available +model (always-on baselines + opt-in feature-aware models) for one +`(store_id, product_id)` grain. It prints an aggregate metrics + per-bucket +WAPE table without writing anything outside the existing +`/forecasting/train` + `/backtesting/run` flow: + +```bash +uv run python examples/forecasting/model_zoo_compare.py \ + --store-id 1 --product-id 1 \ + --start-date 2025-01-01 --end-date 2025-12-31 +``` + +Optional models behind a flag (LightGBM / XGBoost / Random Forest) are +SKIPPED with a printed note when their flag is off — the script never +fails the run because an opt-in model is missing. ## Why It Fits ForecastLabAI diff --git a/docs/optional-features/09-model-champion-challenger-governance.md b/docs/optional-features/09-model-champion-challenger-governance.md index 3b66b27f..61ac07ae 100644 --- a/docs/optional-features/09-model-champion-challenger-governance.md +++ b/docs/optional-features/09-model-champion-challenger-governance.md @@ -2,11 +2,36 @@ ## Summary -Add formal promotion gates for model aliases: compare champion vs challenger, validate metrics, verify artifacts, check data freshness, require approval, and record the decision. +Add formal promotion gates for model aliases: compare champion vs +challenger, validate metrics, verify artifacts, check data freshness, +require approval, and record the decision. ## Why It Fits ForecastLabAI -The registry already stores runs, metrics, artifacts, aliases, hashes, and statuses. Agents already require approval for sensitive actions. This feature makes promotion decisions explicit and auditable. +The registry already stores runs, metrics, artifacts, aliases, hashes, and +statuses. Agents already require approval for sensitive actions. This +feature makes promotion decisions explicit and auditable. 
+ +## Comparable-run rule (PRP-36) + +A run is comparable to another only when **all three** hold: + +1. Same `(store_id, product_id)` grain. +2. **Overlapping** `data_window_start` / `data_window_end`. +3. **Same `feature_frame_version`** — read from `runtime_info.feature_frame_version` + on the registry row; legacy rows without the key are treated as V1. + +The third clause is load-bearing — a V1 run and a V2 run with otherwise +identical fields are **not** duplicates and **not** comparable. Promoting +a V1 alias over a V2 challenger (or vice versa) would silently change +the feature contract the alias points at. + +`RegistryService.find_comparable_runs(...)` is the canonical query and +`OpsService.get_summary` uses the same predicate to classify staleness. +When an alias's run has `V_a` and a newer comparable SUCCESS run has +`V_b != V_a`, the alias is marked `is_stale=true` with +`stale_reason="feature_frame_version_mismatch"` (a distinct value from +`newer_success_run`) so Slice C can render the mismatch separately. ## User Value diff --git a/examples/forecasting/model_zoo_compare.py b/examples/forecasting/model_zoo_compare.py new file mode 100644 index 00000000..35b35c34 --- /dev/null +++ b/examples/forecasting/model_zoo_compare.py @@ -0,0 +1,262 @@ +"""PRP-36 — Model-zoo comparison diagnostic. + +Read-only script: trains + backtests every available forecasting model +against the local seeded database for a single ``(store_id, product_id)`` +grain, then prints a metrics + per-bucket WAPE table. Uses the public +HTTP API at ``http://localhost:8123`` — never writes outside the existing +``/forecasting/train`` + ``/backtesting/run`` flow. + +Usage:: + + # 1. Run the stack: + docker compose up -d + uv run alembic upgrade head + uv run uvicorn app.main:app --reload --port 8123 + + # 2. Seed the local database (any time): + uv run python scripts/seed_random.py --full-new --seed 42 --confirm + + # 3. 
Compare every model on a single grain:
    uv run python examples/forecasting/model_zoo_compare.py \\
        --store-id 1 --product-id 1 \\
        --start-date 2025-01-01 --end-date 2025-12-31

Models compared (always-on):

- ``naive``, ``seasonal_naive``, ``moving_average``
- ``weighted_moving_average``, ``seasonal_average`` (PRP-36 baselines)
- ``trend_regression_baseline`` (PRP-36 Ridge baseline)
- ``regression`` (HGBR feature-aware)
- ``prophet_like`` (Ridge additive)

Optional feature-aware models — always attempted, but they only yield a
result when the server accepts them (the matching ``forecast_enable_*``
flag is on and, for LightGBM / XGBoost, the extra is installed; Random
Forest is pure scikit-learn):

- ``lightgbm``, ``xgboost``, ``random_forest``

The script does not probe model availability up front. Every spec is
posted to ``POST /backtesting/run``; a non-200 response for an optional
model is reported as SKIPPED with a printed note (the script never fails
the run because an opt-in model is off).
"""

from __future__ import annotations

import argparse
import json
import sys
from dataclasses import dataclass
from datetime import date as date_type
from typing import Any

import httpx

# Default base URL of the API; overridable with ``--api-base``.
DEFAULT_API_BASE = "http://localhost:8123"


@dataclass(frozen=True)
class ModelSpec:
    """One row in the model-zoo comparison table."""

    # Label used in the progress lines and in the first column of the table.
    model_type: str
    # Sent verbatim as ``config.model_config_main`` in the /backtesting/run body.
    config: dict[str, Any]
    # When True, a non-200 response is printed as a skip instead of an error.
    optional: bool = False


# Specs that are posted and expected to succeed; a non-200 response for any of
# these is printed as an error.
ALWAYS_ON_MODELS: tuple[ModelSpec, ...] = (
    ModelSpec("naive", {"model_type": "naive"}),
    ModelSpec("seasonal_naive", {"model_type": "seasonal_naive", "season_length": 7}),
    ModelSpec("moving_average", {"model_type": "moving_average", "window_size": 7}),
    ModelSpec(
        "weighted_moving_average",
        {
            "model_type": "weighted_moving_average",
            "window_size": 7,
            "weight_strategy": "linear",
            # NOTE(review): presumably only the exponential strategy reads
            # ``decay`` — confirm against the model config schema.
            "decay": 0.7,
        },
    ),
    ModelSpec(
        "seasonal_average",
        {
            "model_type": "seasonal_average",
            "season_length": 7,
            "lookback_cycles": 4,
            "trim_outliers": False,
        },
    ),
    ModelSpec(
        "trend_regression_baseline",
        {
            "model_type": "trend_regression_baseline",
            "alpha": 1.0,
            "include_dow": True,
            "include_month": True,
        },
    ),
    ModelSpec(
        "regression",
        {"model_type": "regression", "max_iter": 200, "learning_rate": 0.05, "max_depth": 6},
    ),
    ModelSpec(
        "prophet_like",
        {"model_type": "prophet_like", "alpha": 1.0},
    ),
)

# Opt-in specs (``optional=True``): still posted, but a non-200 response is
# printed as a skip rather than an error.
OPTIONAL_MODELS: tuple[ModelSpec, ...] = (
    ModelSpec(
        "lightgbm",
        {"model_type": "lightgbm", "n_estimators": 100, "max_depth": 6, "learning_rate": 0.1},
        optional=True,
    ),
    ModelSpec(
        "xgboost",
        {"model_type": "xgboost", "n_estimators": 100, "max_depth": 6, "learning_rate": 0.1},
        optional=True,
    ),
    ModelSpec(
        "random_forest",
        {
            "model_type": "random_forest",
            "n_estimators": 100,
            "max_depth": 10,
            "min_samples_leaf": 2,
        },
        optional=True,
    ),
)


def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the comparison script.

    ``--start-date`` / ``--end-date`` are kept as raw strings here; the
    caller is responsible for parsing them as ISO dates.

    Returns:
        Namespace with ``api_base``, ``store_id``, ``product_id``,
        ``start_date``, ``end_date``, ``n_splits`` and ``horizon``.
    """
    parser = argparse.ArgumentParser(description="PRP-36 model-zoo comparison")
    parser.add_argument("--api-base", default=DEFAULT_API_BASE)
    parser.add_argument("--store-id", type=int, required=True)
    parser.add_argument("--product-id", type=int, required=True)
    parser.add_argument("--start-date", required=True, help="YYYY-MM-DD")
    parser.add_argument("--end-date", required=True, help="YYYY-MM-DD")
    parser.add_argument("--n-splits", type=int, default=4)
    parser.add_argument("--horizon", type=int, default=14)
    return parser.parse_args()
# Horizon buckets that are always shown in the summary table.
_DEFAULT_BUCKETS: tuple[str, ...] = ("h_1_7", "h_8_14")

# Longer-horizon buckets paired with the first horizon day they cover (the
# start day is read off the stable bucket id). A bucket is only present in a
# payload when the horizon reaches it, so its column is added on demand.
_LONG_BUCKETS: tuple[tuple[str, int], ...] = (("h_15_28", 15), ("h_29_plus", 29))

# Width of every numeric cell; the "-" placeholder uses the same width so the
# columns stay aligned when a metric is missing.
_CELL_WIDTH = 6


def _error_detail(response: Any) -> Any:
    """Extract a printable error detail from a non-200 response.

    Args:
        response: Response object exposing ``json()`` and ``text``.

    Returns:
        The ``detail`` field when the body is a JSON object, the decoded
        payload itself when it is any other JSON value, or the first 200
        characters of the raw text when the body is not valid JSON.
    """
    try:
        payload = response.json()
    except (json.JSONDecodeError, UnicodeDecodeError):
        return response.text[:200]
    if isinstance(payload, dict):
        return payload.get("detail", "")
    # A JSON array/scalar has no ``.get``; show it as-is instead of crashing.
    return payload


def _backtest_one_model(
    client: httpx.Client,
    *,
    api_base: str,
    store_id: int,
    product_id: int,
    start_date: str,
    end_date: str,
    spec: ModelSpec,
    n_splits: int,
    horizon: int,
) -> dict[str, Any] | None:
    """Run one backtest through ``POST {api_base}/backtesting/run``.

    Args:
        client: HTTP client used for the request.
        api_base: Base URL of the API (no trailing slash).
        store_id: Store part of the grain.
        product_id: Product part of the grain.
        start_date: ISO start date of the data window.
        end_date: ISO end date of the data window.
        spec: Model to backtest; ``spec.config`` is sent as the main model
            config and ``spec.optional`` selects skip vs. error reporting.
        n_splits: Number of backtest folds.
        horizon: Forecast horizon sent in the split config.

    Returns:
        The decoded response object on HTTP 200, or ``None`` when the request
        failed, the server answered with a non-200 status, or the 200 body
        was not a JSON object. Every ``None`` path prints a one-line note.
    """
    body = {
        "store_id": store_id,
        "product_id": product_id,
        "start_date": start_date,
        "end_date": end_date,
        "config": {
            "split_config": {
                "n_splits": n_splits,
                "horizon": horizon,
                "gap": 0,
                "strategy": "expanding",
                "min_train_size": 30,
            },
            "model_config_main": spec.config,
            "include_baselines": False,  # we compare them explicitly here
            "store_fold_details": False,
        },
    }
    try:
        response = client.post(f"{api_base}/backtesting/run", json=body, timeout=120.0)
    except httpx.HTTPError as exc:
        print(f" ⚠️ {spec.model_type}: HTTP error — {exc!r}")
        return None

    if response.status_code != 200:
        # Optional models behind a flag yield a clear ValueError → 400/422.
        detail = _error_detail(response)
        if spec.optional:
            print(f" ⏭️ {spec.model_type}: skipped — {detail}")
        else:
            print(f" ❌ {spec.model_type}: HTTP {response.status_code} — {detail}")
        return None

    try:
        payload = response.json()
    except (json.JSONDecodeError, UnicodeDecodeError):
        payload = None
    if not isinstance(payload, dict):
        # A 200 with an unusable body must not abort the whole comparison.
        print(f" ❌ {spec.model_type}: HTTP 200 but the body is not a JSON object")
        return None
    return dict(payload)


def _fmt_metric(value: Any) -> str:
    """Format one metric cell, right-aligned to ``_CELL_WIDTH`` characters."""
    try:
        return f"{float(value):>{_CELL_WIDTH}.2f}"
    except (TypeError, ValueError):
        # ``None`` (metric absent) or a non-numeric payload value.
        return f"{'-':>{_CELL_WIDTH}}"


def _buckets_for_horizon(horizon: int) -> tuple[str, ...]:
    """Return the bucket columns to show for a given forecast horizon.

    The two default buckets are always shown; a longer bucket is appended
    only when ``horizon`` reaches its first day.
    """
    reachable = tuple(bucket_id for bucket_id, start in _LONG_BUCKETS if start <= horizon)
    return _DEFAULT_BUCKETS + reachable


def _format_row(
    spec: ModelSpec,
    result: dict[str, Any] | None,
    *,
    buckets: tuple[str, ...] = _DEFAULT_BUCKETS,
) -> str:
    """Render one line of the summary table.

    Args:
        spec: Model the row belongs to.
        result: Response object from ``_backtest_one_model`` or ``None``.
        buckets: Horizon-bucket ids to print a WAPE column for.

    Returns:
        ``"<model> skipped"`` / ``"<model> failed"`` when there is no result
        (optional vs. always-on model), otherwise the MAE / RMSE / WAPE cells
        followed by one WAPE cell per requested bucket. Missing or ``null``
        sections and metrics are rendered as ``-``.
    """
    if result is None:
        status = "skipped" if spec.optional else "failed"
        return f"{spec.model_type:<28} {status}"

    # ``or {}`` also covers keys that are present but explicitly ``null``
    # (e.g. bucketed metrics when every fold reported an empty bucket dict).
    model_results = result.get("main_model_results") or {}
    aggregated = model_results.get("aggregated_metrics") or {}
    bucketed = model_results.get("bucketed_aggregated_metrics") or {}

    cells = [
        f"{spec.model_type:<28}",
        f" MAE {_fmt_metric(aggregated.get('mae'))}",
        f" RMSE {_fmt_metric(aggregated.get('rmse'))}",
        f" WAPE {_fmt_metric(aggregated.get('wape'))}",
    ]
    cells.extend(
        f" {bucket_id} {_fmt_metric((bucketed.get(bucket_id) or {}).get('wape'))}"
        for bucket_id in buckets
    )
    return "".join(cells)


def main() -> int:
    """Backtest every model spec for one grain and print the summary table.

    Returns:
        ``0`` when every always-on model produced a result, ``1`` when at
        least one always-on model failed (optional models never affect the
        exit status), ``2`` when the dates are invalid or the API health
        check fails.
    """
    args = parse_args()
    try:
        start_date = date_type.fromisoformat(args.start_date)
        end_date = date_type.fromisoformat(args.end_date)
    except ValueError as exc:
        print(f"❌ Invalid date (expected YYYY-MM-DD): {exc}")
        return 2
    if start_date > end_date:
        print(f"❌ --start-date {start_date} is after --end-date {end_date}")
        return 2
    start_date_iso = start_date.isoformat()
    end_date_iso = end_date.isoformat()

    print(f"━ Model zoo comparison ─ store {args.store_id}, product {args.product_id}")
    print(f" window: {start_date_iso} → {end_date_iso}")
    print(f" folds: {args.n_splits}, horizon: {args.horizon}")
    print()

    buckets = _buckets_for_horizon(args.horizon)
    rows: list[str] = []
    failed_required: list[str] = []
    with httpx.Client() as client:
        # Probe / health gate.
        try:
            health = client.get(f"{args.api_base}/health", timeout=5.0)
        except httpx.HTTPError as exc:
            print(f"❌ API unreachable at {args.api_base}: {exc!r}")
            return 2
        if health.status_code != 200:
            print(f"❌ /health returned {health.status_code}; aborting.")
            return 2

        all_specs: tuple[ModelSpec, ...] = ALWAYS_ON_MODELS + OPTIONAL_MODELS
        for spec in all_specs:
            print(f"🔄 Backtesting {spec.model_type} …")
            result = _backtest_one_model(
                client,
                api_base=args.api_base,
                store_id=args.store_id,
                product_id=args.product_id,
                start_date=start_date_iso,
                end_date=end_date_iso,
                spec=spec,
                n_splits=args.n_splits,
                horizon=args.horizon,
            )
            if result is None and not spec.optional:
                failed_required.append(spec.model_type)
            rows.append(_format_row(spec, result, buckets=buckets))

    # Keep the rule at least as wide as the widest row (extra bucket columns).
    rule = "━" * max([100, *map(len, rows)])
    print()
    print(rule)
    for row in rows:
        print(row)
    print(rule)

    if failed_required:
        print(f"❌ Always-on models failed: {', '.join(failed_required)}")
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())