feat(webapp,run-engine,redis-worker): mollifier — burst-buffer to absorb trigger storms#3709
d-cs wants to merge 160 commits into
Redis-backed burst-smoothing layer behind MOLLIFIER_ENABLED=0 (default).
With the kill switch off, the gate short-circuits on its first env check
and production behaviour is identical to main.
@trigger.dev/redis-worker:
- MollifierBuffer: atomic Lua-backed FIFO with accept / pop / ack /
requeue / fail + TTL. Per-env queues with HSET entry storage,
atomic RPOP + status transition, FIFO retry ordering.
- MollifierDrainer: generic round-robin worker with concurrency cap,
retry semantics, and a stop deadline to avoid livelock on a hung
handler. Phase 3 will wire the handler to engine.trigger().
- Full testcontainers-backed test suite (21 tests).
apps/webapp:
- evaluateGate cascade-check (kill switch -> org feature flag ->
shadow mode -> trip evaluator -> mollify / shadow_log / pass_through).
Dependencies injected for testability; the trip evaluator stub
returns { divert: false } in phase 1.
- Inserted into RunEngineTriggerTaskService.call() before
traceEventConcern.traceRun. The mollify branch throws (unreachable
in phase 1).
- Lazy MollifierBuffer + MollifierDrainer singletons; no Redis
connection unless MOLLIFIER_ENABLED=1.
- 12 MOLLIFIER_* env vars (all safe defaults) and a mollifierEnabled
feature flag in the global catalog.
- Drainer booted from worker.server.ts on first import.
- Read-fallback stub for phase 3.
- Gate cascade tests + .env loader so env.server validates in vitest
workers.
Phase 2 will land the real trip evaluator; phase 3 will activate the
buffer-write + drain path.
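A minimal sketch of the gate cascade described above, assuming the checks run in the listed order and that the org flag decides between mollify and shadow_log once the evaluator says divert. The names `GateDeps`, `enabled`, `orgFlagOn`, `shadowModeOn` and `evaluateGateSketch` are hypothetical; the real evaluateGate signature is not shown here.

```typescript
// Hypothetical shapes for illustration; not the real evaluateGate types.
type GateOutcome = "pass_through" | "shadow_log" | "mollify";

interface GateDeps {
  enabled: () => boolean; // stands in for MOLLIFIER_ENABLED=1
  orgFlagOn: () => boolean; // stands in for the mollifierEnabled flag
  shadowModeOn: () => boolean; // stands in for the shadow-mode setting
  evaluator: () => { divert: boolean }; // trip evaluator (stubbed in phase 1)
}

function evaluateGateSketch(deps: GateDeps): GateOutcome {
  // 1. Kill switch: return before consulting anything else.
  if (!deps.enabled()) return "pass_through";

  // 2 + 3. With neither the flag nor shadow mode on there is nothing to do.
  const flagOn = deps.orgFlagOn();
  if (!flagOn && !deps.shadowModeOn()) return "pass_through";

  // 4. Only a tripped evaluator can leave the pass_through path.
  if (!deps.evaluator().divert) return "pass_through";

  // 5. Flag on: divert for real. Flag off: only log what would have happened.
  return flagOn ? "mollify" : "shadow_log";
}
```

Because every dependency is injected, a test can count calls to confirm that the flag lookup never runs while the kill switch is off.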
…dual-write monitoring + drainer ack loop)

Phase 1 of the trigger-burst smoothing initiative. Adds the A-side trip
evaluator (atomic Lua sliding-window per env) and wires it into the trigger
hot path. When the per-org mollifierEnabled feature flag is on AND the
evaluator says divert, the canonical replay payload is buffered to Redis
(via buffer.accept) AND the trigger continues through engine.trigger, i.e.
dual-write. The drainer pops + acks (no-op handler) to prove the dequeue
mechanism works end-to-end. Operators audit by joining mollifier.buffered
(write) and mollifier.drained (consume) logs by runId.

Buffer primitives hardened:
- accept is idempotent on duplicate runId (Lua EXISTS guard)
- pop skips orphan queue references (entry HASH TTL'd while runId queued)
- fail no-ops on missing entry (no partial FAILED hash leak)
- mollifier:envs set pruned on draining pop, restored on requeue
- 16-row truth-table test enumerates the gate cascade
- BufferedTriggerPayload defines the canonical replay shape Phase 2 will
  use to invoke engine.trigger
- payload hash for audit-equivalence computed off the hot path (in the
  drainer) to avoid CPU during a spike

Regression tests in apps/webapp/test/engine/triggerTask.test.ts pin the
mollifier integration:
- validation throws BEFORE the gate runs (no orphan buffer write on
  rejected triggers)
- mollify dual-write happy path (Postgres + Redis both reflect the run)
- pass_through path does NOT call buffer.accept
- engine.trigger throwing AFTER buffer.accept leaves an orphan (documented
  behaviour: drainer auto-cleans; audit-trail surfaces it)
- idempotency-key match short-circuits BEFORE the gate is consulted
- debounce match produces an orphan (documented behaviour: Phase 2 must
  lift handleDebounce upfront before buffer.accept)

Behaviour with MOLLIFIER_ENABLED=0 (default) is byte-identical to main.
With MOLLIFIER_ENABLED=1 and the flag off, only mollifier.would_mollify
logs fire (no buffer state). With the flag on, dual-write activates.

Includes two opt-in *.fuzz.test.ts suites (gated on FUZZ=1) that randomise
operation sequences against evaluateTrip and the drainer to find timing
edges. They are clearly marked TEMPORARY in their headers.
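The two hardened primitives "accept is idempotent on duplicate runId" and "pop skips orphan queue references" can be illustrated with an in-memory model. This is a sketch only: the real buffer does this atomically in Lua against Redis, and `BufferModel`, `Entry` and `expire` are hypothetical names introduced for the example.

```typescript
// In-memory stand-in for the Redis structures; illustrative names only.
type Entry = { runId: string; payload: string; status: "QUEUED" | "DRAINING" };

class BufferModel {
  private entries = new Map<string, Entry>(); // stands in for per-run hashes
  private queues = new Map<string, string[]>(); // stands in for per-env queues

  // Returns false for a duplicate runId, mirroring the EXISTS guard.
  accept(envId: string, runId: string, payload: string): boolean {
    if (this.entries.has(runId)) return false;
    this.entries.set(runId, { runId, payload, status: "QUEUED" });
    const queue = this.queues.get(envId) ?? [];
    queue.unshift(runId); // push at the head; pop() reads the tail, so FIFO
    this.queues.set(envId, queue);
    return true;
  }

  // Simulates the entry hash expiring while its runId is still queued.
  expire(runId: string): void {
    this.entries.delete(runId);
  }

  // Pops the oldest runId that still has an entry, skipping orphans.
  pop(envId: string): Entry | null {
    const queue = this.queues.get(envId) ?? [];
    for (;;) {
      const runId = queue.pop();
      if (runId === undefined) return null; // queue drained
      const entry = this.entries.get(runId);
      if (!entry) continue; // orphan reference: entry already expired
      entry.status = "DRAINING";
      return entry;
    }
  }
}
```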
- changeset: drop "deferred" wording; phase-1 actively dual-writes + runs
  the drainer ack loop.
- worker.server.ts: wrap mollifier drainer init in try/catch + register
  SIGTERM/SIGINT handlers so the polling loop stops cleanly on shutdown.
- bufferedTriggerPayload: only serialise idempotencyKeyExpiresAt when an
  idempotencyKey is present (avoid impossible orphan-expiry payloads).
- mollifierTelemetry: narrow recordDecision reason to DecisionReason union
  to keep OTEL attribute cardinality bounded.
- mollifierGate: rename resolveOrgFlag → resolveFlag. The underlying
  FeatureFlag table is global by key, so the "org" prefix was misleading;
  per-org gating is out of scope for phase-1.
- tests: drop vi.fn mocks. mollifierGate now uses plain closure spies;
  mollifierTripEvaluator runs against a real MollifierBuffer backed by a
  redisTest container (closed client exercises the fail-open path).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…stacking Worker.init() is called per request from entry.server.tsx, so the process.once SIGTERM/SIGINT pair added in 98c1520 would stack a fresh listener every request under dev hot-reload (process.once only removes after firing). Gate registration on a process-global flag, matching the existing __worker__ pattern. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
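A sketch of gating handler registration on a process-global flag so repeated init calls cannot stack listeners. The key name `__mollifierShutdownRegistered__`, the `SignalTarget` interface and `registerShutdownOnce` are assumptions made for the example; the real flag name is not given in the commit message.

```typescript
// Minimal structural type so the sketch does not depend on Node typings.
interface SignalTarget {
  once(event: string, handler: () => void): unknown;
}

// Hypothetical key; the real process-global flag name is not shown.
const REGISTERED_KEY = "__mollifierShutdownRegistered__";

// Registers SIGTERM/SIGINT handlers at most once per globals bag.
// Returns true when handlers were attached, false when skipped.
function registerShutdownOnce(
  target: SignalTarget,
  globals: Record<string, unknown>,
  onShutdown: () => void
): boolean {
  if (globals[REGISTERED_KEY]) return false; // an earlier init already registered
  globals[REGISTERED_KEY] = true;
  target.once("SIGTERM", onShutdown);
  target.once("SIGINT", onShutdown);
  return true;
}
```

In a Node process the call site would presumably pass `process` as the target and `globalThis` as the bag, so the flag survives module re-evaluation under hot reload.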
…tion.featureFlags

The mollifier gate's resolveOrgFlag was a global feature-flag lookup named
as if org-scoped. Phase-1 plan and design doc both intended per-org gating;
the implementation regressed because the global flag() helper has no orgId
parameter.

Adopt the existing per-org feature-flag pattern (used by canAccessAi,
canAccessPrivateConnections, compute beta gating): pass
`Organization.featureFlags` through as `flag()` overrides. Per-org opt-in
now works admin-toggleable via the existing Organization.featureFlags JSON
column, so no schema migration is needed.

- mollifierGate: revert resolveFlag/flagEnabled back to
  resolveOrgFlag/orgFlagEnabled (the name now matches reality). GateInputs
  gains `orgFeatureFlags`; the default resolver passes them as overrides to
  `flag()`.
- triggerTask.server.ts: thread `environment.organization.featureFlags`
  into the gate call.
- tests: three new postgresTest cases exercise the real DB-backed
  resolveOrgFlag end-to-end, proving (a) per-org opt-in isolation,
  (b) unrelated beta flags don't bleed across, (c) per-org overrides take
  precedence over the global FeatureFlag row.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…nect The unit cascade tests in mollifierGate.test.ts import the gate module, which transitively pulls in ~/db.server. That module constructs the prisma singleton at import time and eagerly calls $connect(), which fails against localhost:5432 in the unit-test shard and surfaces as an unhandled rejection that fails the whole vitest run. Mocking the module keeps the cascade tests pure and leaves the postgresTest cases on the testcontainer-fixture prisma untouched.
- Gate drainer init on WORKER_ENABLED so only worker replicas run the
  polling loop.
- Update the enqueueSystem TTL comment now that delayed/pending-version are
  first enqueues.
- Correct the mollifier gate docstring to describe the fixed-window counter
  and tripped-key rearm.
- Swap findUnique for findFirst in the trigger task test to match the
  webapp Prisma rule.
…eFlags The gate's `GateInputs` now requires `orgFeatureFlags`, but the surface type used by the trigger service was still the pre-org-scope shape, so the default evaluator wasn't assignable and the call site couldn't pass the flag overrides.
…est startup The per-org isolation suite uses `postgresTest`, which spins up a fresh Postgres testcontainer per case. On CI the 5s vitest default regularly times out on container start before the test body runs. Match the 30s `vi.setConfig` used by other postgresTest suites in this app.
…rrors resolveOrgFlag now checks the per-org Organization.featureFlags override in-memory before falling back to the global flag() helper, so the common per-org enablement path resolves without a Prisma round-trip on every trigger call. evaluateGate also wraps the flag resolution in try/catch and fails open to false on error, mirroring the trip evaluator.
…exit Pass a configurable timeout to drainer.stop() so SIGTERM/SIGINT can't hang forever if an in-flight handler is wedged. Matches the precedent set by BATCH_TRIGGER_WORKER_SHUTDOWN_TIMEOUT_MS (default 30s).
processOneFromEnv now catches buffer.pop() failures so one env's hiccup doesn't reject Promise.all and bubble up to the loop's outer catch. The polling loop itself wraps each runOnce in try/catch and backs off with capped exponential delay (up to 5s) instead of exiting permanently on the first listEnvs/pop error. Stop semantics are unchanged: only the stopping flag breaks the loop. Adds two regression tests using a stub buffer (no Redis container) so fault injection is deterministic.
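The capped exponential delay can be sketched as a pure function. Only the 5s cap comes from the commit message; the 100ms base, the doubling factor and the names `backoffMs` and `delaysFor` are assumptions for illustration.

```typescript
// Delay before the next tick after `consecutiveErrors` failed ticks in a row.
// Base and growth factor are assumed; only the 5s cap is from the source.
function backoffMs(consecutiveErrors: number, baseMs = 100, capMs = 5_000): number {
  if (consecutiveErrors <= 0) return 0; // healthy loop: no extra delay
  return Math.min(capMs, baseMs * 2 ** (consecutiveErrors - 1));
}

// Replays a sequence of tick outcomes (true = success) and returns the delay
// chosen after each tick. The counter resets on the first success.
function delaysFor(outcomes: boolean[]): number[] {
  let consecutiveErrors = 0;
  return outcomes.map((ok) => {
    consecutiveErrors = ok ? 0 : consecutiveErrors + 1;
    return backoffMs(consecutiveErrors);
  });
}
```

For example, `delaysFor([false, false, true, false])` yields `[100, 200, 0, 100]` under these assumed defaults, and a long failure streak plateaus at 5000.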
The phase-1 scaffolding referenced MollifierBuffer, getMollifierBuffer, and deserialiseMollifierSnapshot without importing them — CI typecheck fails with TS2304. The runtime path is gated behind MOLLIFIER_ENABLED=0 so this never produced a runtime symptom, but the types must resolve.
…luator fail-open
The TripDecision header comment claimed each webapp instance maintained
its own rate counter, which is wrong. evaluateTrip writes to mollifier:rate:${envId}
with no per-instance prefix, so all replicas pointing at the same Redis
share the key. The threshold is the fleet-wide ceiling.
Also wrap d.evaluator() in evaluateGate in try/catch so a throwing
evaluator falls back to no-divert. The default createRealTripEvaluator
catches its own errors, but the contract should be symmetric with the
already-wrapped resolveOrgFlag call so a future evaluator can't break
the trigger hot path's fail-open contract.
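To make the shared-key point concrete, here is an in-memory sketch of a fixed-window counter keyed only by env, plus a fail-open wrapper. The real evaluateTrip is an atomic Lua script against Redis; `FixedWindowCounter`, `hit` and `safeEvaluate` are hypothetical names, and the window and threshold values in any usage are illustrative.

```typescript
type TripDecision = { divert: boolean };

// One instance plays the role of the shared Redis: every "replica" that
// calls hit() for the same env increments the same key.
class FixedWindowCounter {
  private counts = new Map<string, { windowStart: number; count: number }>();
  private windowMs: number;
  private threshold: number;

  constructor(windowMs: number, threshold: number) {
    this.windowMs = windowMs;
    this.threshold = threshold;
  }

  hit(envId: string, nowMs: number): TripDecision {
    const key = `mollifier:rate:${envId}`; // no per-instance prefix
    const windowStart = Math.floor(nowMs / this.windowMs) * this.windowMs;
    const current = this.counts.get(key);
    const count =
      current !== undefined && current.windowStart === windowStart
        ? current.count + 1
        : 1; // a new window restarts the count
    this.counts.set(key, { windowStart, count });
    return { divert: count > this.threshold };
  }
}

// Fail-open wrapper: a throwing evaluator degrades to "do not divert".
function safeEvaluate(evaluator: () => TripDecision): TripDecision {
  try {
    return evaluator();
  } catch {
    return { divert: false };
  }
}
```

Since the key carries no replica identifier, the threshold bounds the combined rate of all replicas rather than each replica's own rate.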
The two notes describe the same PR's behaviour from two angles; merging them into one entry gives a cleaner changelog line and matches how the PR is presented to reviewers.
drainer.fuzz.test.ts and evaluateTrip.fuzz.test.ts are valuable as ongoing property checks but aren't load-bearing for the phase-1 review. Moving them to a follow-up keeps this PR smaller without losing coverage of the production paths (buffer.test.ts and drainer.test.ts together cover the contract surface).
The enqueueSystem.ts comment touch-up was an unrelated drive-by during phase-1 review and doesn't belong in this PR. Will land separately.
External changelog readers don't have context on internal phase numbering; describe the feature itself (opt-in burst protection, default-off env vars, shadow mode, dual-write activation) instead of "phase 1".
…iversion The previous wording implied the buffer/drainer was active protection; in this release they're audit-only. Spell out that no trigger calls are diverted or rate-limited yet, and that active smoothing follows later.
MollifierEvaluateGate and MollifierGetBuffer were defined in the consumer (triggerTask.server.ts) but described the surface of the gate and the buffer accessor respectively. Move each to the module that owns the underlying implementation so the type lives with the producer, not the caller. No behavioural change.
…-redis fallback

Two operational guards for misconfigured rollouts:

1. Drop the MOLLIFIER_REDIS_* fallback to the main REDIS_* cluster. The
   mollifier writes to a dedicated Redis to keep burst traffic off the
   engine's primary queue; silently colocating with the main Redis when
   MOLLIFIER_REDIS_HOST is unset defeats the design.
2. Degrade gracefully instead of crashing the pod. If MOLLIFIER_ENABLED was
   flipped on without setting MOLLIFIER_REDIS_HOST, the buffer returns null
   (with a one-shot warn log per process) and the drainer no-ops. No crash
   loops, no failed deploys, no traffic impact: operators see the warn line
   and fix the misconfig in a follow-up deploy.

The drainer's previously-unreachable "env vars inconsistent" throw becomes
reachable in this degraded mode; replace it with a null return so
worker.server.ts's existing null check short-circuits cleanly.
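A sketch of the degraded-mode accessor: null plus a single warning when the mollifier is enabled without a dedicated Redis host. `makeBufferAccessor`, `BufferHandle` and the warning text are hypothetical; only the two env var names come from the commit message.

```typescript
interface MollifierEnv {
  MOLLIFIER_ENABLED: string;
  MOLLIFIER_REDIS_HOST?: string;
}

// Placeholder for the real buffer object; only the host is modelled here.
type BufferHandle = { host: string };

function makeBufferAccessor(
  env: MollifierEnv,
  warn: (message: string) => void
): () => BufferHandle | null {
  let warned = false;
  let cached: BufferHandle | null = null;

  return function getBuffer(): BufferHandle | null {
    if (env.MOLLIFIER_ENABLED !== "1") return null; // feature off: stay lazy
    if (!env.MOLLIFIER_REDIS_HOST) {
      if (!warned) {
        warned = true; // one-shot: later calls stay silent
        warn("MOLLIFIER_ENABLED=1 but MOLLIFIER_REDIS_HOST is unset; mollifier disabled");
      }
      return null; // no fallback to the main Redis
    }
    if (cached === null) cached = { host: env.MOLLIFIER_REDIS_HOST };
    return cached; // lazy singleton
  };
}
```

Callers treat null as "mollifier unavailable", which keeps the misconfiguration from turning into a crash loop.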
mollifier:envs is a Redis SET that grows with the count of envs that currently have buffered entries. Under normal operation that's small, but an extended drainer outage can leave entries piled up across thousands of envs — at which point runOnce would queue one processOneFromEnv per env through pLimit, ballooning per-tick latency and event-loop queue depth. Cap per-tick fan-out at MOLLIFIER_DRAIN_MAX_ENVS_PER_TICK (default 500). When the set fits within the cap, behaviour is unchanged (take all, rotate cursor by 1 for fairness). When the set exceeds the cap, take a rotating slice and advance the cursor by the slice size so successive ticks sweep through the full set. Tests use a stub buffer to drive listEnvs() deterministically with thousands of envs without provisioning a real Redis.
…tchQueue The MollifierDrainer's stop() was polling `isRunning` every 20ms until the loop exited, which differs from the codebase's convention for similar polling loops (FairQueue, BatchQueue both hold the loop promise as a field and await it directly on stop). Switch to the same pattern: store the loop promise on start(), then in stop() race it against the timeout via Promise.race. With no timeout we just await the loop directly. With a timeout the warn-and-return behaviour is unchanged. No polling, no separate `isRunning` poll loop. Behaviour is identical to the previous implementation, including the hung-handler timeout path (covered by the existing "stop returns after timeoutMs even if a handler is hung" test).
The previous chunking advanced the cursor by sliceSize each tick, producing fixed disjoint slices like [0..3], [4..7], [0..3], ... With that pattern env_0 was always at slice position 0 (first into pLimit) and env_3 always at position 3 (last), which reinstated the head-of-line bias that rotation was meant to prevent. Advance the cursor by 1 instead. Slices now overlap across consecutive ticks (e.g. [0..3], [1..4], [2..5], ...) so every env reaches every slot position 0..sliceSize-1 across one envs.length-tick cycle. Drainage rate per env is unchanged: each env still appears in exactly sliceSize of every envs.length ticks. New regression test pins the fairness property by asserting each env touches every slot at least once per cycle.
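The overlapping-slice rotation can be sketched as a pure function, with the caller advancing the cursor by 1 per tick. `takeSlice` is a hypothetical name; the drainer's own implementation is not shown in this message.

```typescript
// Returns up to sliceSize items starting at cursor, wrapping around the end.
function takeSlice<T>(items: readonly T[], cursor: number, sliceSize: number): T[] {
  const n = items.length;
  if (n === 0) return [];
  const size = Math.min(sliceSize, n); // small sets: take everything
  const out: T[] = [];
  for (let i = 0; i < size; i++) {
    out.push(items[(cursor + i) % n] as T);
  }
  return out;
}

// With 6 envs and sliceSize 4, successive ticks (cursor 0, 1, 2, ...) give
// [e0,e1,e2,e3], [e1,e2,e3,e4], [e2,e3,e4,e5], [e3,e4,e5,e0], ...
```

At tick t, slot s holds env (t + s) mod n, so over n ticks every env occupies each slot exactly once and appears in sliceSize of those n ticks.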
…y envs Adds a regression test that proves a light env (single buffered entry) is drained within (envs.length - sliceSize + 1) ticks regardless of how many entries the heavy envs have queued. The test uses a stub buffer whose listEnvs/pop pair mirrors the production atomic-Lua semantic: an env disappears from listEnvs the moment its queue empties, so the light env exits the rotation as soon as it's popped — while the heavy envs stay in the rotation until their thousands of entries are drained. Together with the head-of-line fairness test this pins both fairness properties: (1) every env touches every slice slot per cycle (no within-slice bias), and (2) no env's drainage latency depends on the queue depth of other envs (no across-slice starvation).
…eckedIndexedAccess The fairness test compared popsPerTick[0][0] vs popsPerTick[1][0] directly. Under the redis-worker package's strict tsconfig (noUncheckedIndexedAccess implied), array index access returns T | undefined, which trips TS2532. Destructure into named locals and use optional chaining: same assertion, no `!` non-null soup.
1. start() resets envCursor to 0 (new behaviour). A stop+start cycle now
   begins rotation cleanly from envs[0] rather than inheriting
   between-restart cursor drift.
2. Malformed payload → non-retryable handler error path. Pins that the
   deserialise failure goes terminal without invoking the handler.
3. Ack failure after handler success: documents the current behavioural
   gap. ack() lives inside processEntry's try, so a Redis blip on ack
   routes a successfully-handled entry through the retry/terminal path.
   Phase 2's engine-replay handler will need idempotency to absorb the
   re-execution, OR ack should be lifted out of the try block.
4. start() idempotency: second call is a no-op (no doubled loop).
5. stop() idempotency: safe to call when never started or twice.
6. Loop-level backoff actually grows on consecutive runOnce failures and
   resets on first success. Distinct from per-entry retry attempts already
   covered elsewhere; this is the consecutiveErrors counter that drives
   backoffMs between ticks.

Also adds org-level fairness analogue of the existing env starvation test:
a light org (1 env, 1 entry) is not starved behind a heavy org with many
envs and many entries. The buffer doesn't track orgs as a separate axis, so
org fairness is an emergent property of env rotation; the test pins that
property explicitly.
…el fairness

Previously the drainer rotated per-env: an org with N busy envs got N
scheduling slots per tick. A noisy tenant with many envs would drain
proportionally faster than a quiet tenant with one env.

Switch to hierarchical rotation: pick orgs round-robin (capped by
maxOrgsPerTick), then pick one env per picked org (also rotating).

Implementation is drainer-side only, with no buffer or Lua changes. The
drainer caches envId→orgId from popped entries; envs not yet cached are
treated as their own pseudo-org for one tick, so cold start matches the old
per-env behaviour and converges to hierarchical once cache is hot (usually
within one tick). Cache and cursors reset on start() alongside the existing
cursor reset.

API change: maxEnvsPerTick → maxOrgsPerTick on MollifierDrainerOptions,
MOLLIFIER_DRAIN_MAX_ENVS_PER_TICK → MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK on
the webapp env. Same default (500). Operators tune for "typical orgs with
pending entries" rather than env count.

Trade-off: total per-tick pops drop from O(envs) to O(orgs). For an org
with N envs, each env's individual drainage rate is 1/N of what it was, but
the tenant overall is bounded the same way as a single-env tenant, which is
the fairness contract.

Tests:
- Renamed maxEnvsPerTick references throughout existing tests; old
  behaviour still holds at cold cache (each env = pseudo-org).
- New "heavy org with many envs does not dominate vs light org" pins the
  post-warm-up ~1:1 drainage ratio between a 6-env org and a 1-env org over
  a sustained 20-tick run.
- New "within an org, envs are rotated round-robin across ticks" pins the
  inner env cursor's behaviour for a single multi-env org.
- Cursor-reset test renamed and now asserts cache+cursors all reset.

Also removed an outdated test-count comment in
apps/webapp/test/engine/triggerTask.test.ts that listed "four tests" when
reality has moved on.
…uage)
The changeset accreted across the PR's evolution and ended up reading as
three deltas ("now survives", "is now two-level", "no longer scales").
On merge this is the introduction of the feature — there's no prior
state to contrast against. Rewrite as one cohesive description of what
ships.
…rness
Previously the drainer cached envId→orgId from popped entries and used a
sentinel pseudo-org for envs it hadn't seen yet. The sentinel polluted
the bucket map with fake org IDs and was a foreseeable source of bugs.
This commit moves org membership into the buffer's atomic Lua scripts.
New Redis keys, both maintained transactionally alongside per-env queues:
- mollifier:orgs — orgs with at least one queued env
- mollifier:org-envs:${orgId} — envs of that org with queued entries
acceptMollifierEntry SADDs into all three sets (envs + orgs + org-envs).
popAndMarkDraining cleans up envs+orgs+org-envs together when the queue
empties in the success branch (we know orgId from the popped entry). The
no-runId branch can't read orgId so it only cleans envs — stale org-envs
entries are bounded by env count and recovered on the next accept.
requeueMollifierEntry re-SADDs all three since the env may have just been
pruned.
The drainer now walks listOrgs() → listEnvsForOrg(org) → pop(env) with
two cursors: orgCursor across all active orgs and a per-org envCursor
for round-robin within each org. No client-side cache, no sentinel,
deterministic from the first tick.
Tests updated:
- multi-org-round-robin (was multi-env-round-robin): two orgs with one
and two envs respectively, asserts org_B drains its only env each
tick while org_A rotates through its two.
- concurrency-cap test spreads 12 envs across 12 orgs (otherwise one
org → one pop per tick).
- "heavy org doesn't dominate vs light org" gets explicit listOrgs /
listEnvsForOrg from the test's env→org map; assertion tightened to
0.7–1.5 ratio over 20 ticks.
- "within an org envs rotated round-robin" gets explicit listEnvsForOrg.
- "envCursor resets" → "rotation cursors reset"; cache is gone, only
orgCursor and perOrgEnvCursors reset on start().
- makeStubBuffer auto-derives listOrgs/listEnvsForOrg from listEnvs
(each env as its own org) so tests that don't care about org grouping
don't need to provide them explicitly.
24/24 drainer tests pass, 35/35 buffer tests pass (some redis-container
flakes under full-suite load; all green in isolation). Webapp typecheck
clean.
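The listOrgs() → listEnvsForOrg(org) → pop(env) walk with two cursors might look roughly like the sketch below. It assumes the org cursor advances by 1 per tick and each picked org's env cursor advances by 1 per pick; `RotationSketch`, `OrgIndex` and `pickForTick` are hypothetical names, and pop itself is left out.

```typescript
interface OrgIndex {
  listOrgs(): string[];
  listEnvsForOrg(orgId: string): string[];
}

type Pick = { orgId: string; envId: string };

class RotationSketch {
  private orgCursor = 0;
  private perOrgEnvCursors = new Map<string, number>();

  // Chooses at most one env per org, for up to maxOrgsPerTick orgs.
  pickForTick(index: OrgIndex, maxOrgsPerTick: number): Pick[] {
    const orgs = index.listOrgs();
    const picks: Pick[] = [];
    const count = Math.min(maxOrgsPerTick, orgs.length);
    for (let i = 0; i < count; i++) {
      const orgId = orgs[(this.orgCursor + i) % orgs.length] as string;
      const envs = index.listEnvsForOrg(orgId);
      if (envs.length === 0) continue; // org emptied since listOrgs()
      const envCursor = this.perOrgEnvCursors.get(orgId) ?? 0;
      picks.push({ orgId, envId: envs[envCursor % envs.length] as string });
      this.perOrgEnvCursors.set(orgId, envCursor + 1); // rotate within the org
    }
    if (orgs.length > 0) this.orgCursor = (this.orgCursor + 1) % orgs.length;
    return picks;
  }

  // Mirrors "rotation cursors reset" on start().
  reset(): void {
    this.orgCursor = 0;
    this.perOrgEnvCursors.clear();
  }
}
```

With org_A owning two envs and org_B owning one, each tick picks org_B's only env while org_A alternates between its two, which is the shape the multi-org-round-robin test describes.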
Synthesise the SpanRun shape from buffer snapshots so the run-detail page's inspector panel renders identically to a PG-resident run. SSE log stream, realtime stream resources, logs-download and debug resource fall back to the buffer instead of 404-ing. Short-URL redirects resolve buffered runs to the canonical dashboard URL. Bulk-cancel scans the buffer alongside the ClickHouse selection so runs queued mid-burst are included. Trigger response now carries the snapshot's spanId so the dashboard's Run Test redirect opens the details panel without an extra click. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… in synthetic SpanRun The cancel dialog stayed open after a successful submit because it was uncontrolled Radix state and the action redirects to the same URL — revalidation didn't trigger a re-mount. Wrap the submit button in DialogClose so the click closes the dialog at the same time the form posts. The SyntheticRun synthesised for the run-detail page hardcoded status PENDING regardless of whether the buffer snapshot had cancelledAt set. Customers cancelling a buffered run saw their run still labelled Queued until the drainer materialised it. Surface cancelledAt + cancelReason on SyntheticRun, switch the synthesised SpanRun status to CANCELED, and mirror the cancelled flag onto the single-span trace so the timeline matches PG behaviour. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Buffered runs are prepended to the runs table on the runs list page so customers see freshly-triggered work even while the gate is diverting. The merge uses a compound base64 cursor that wraps the PG presenter's own cursor — page 1 can be entirely buffered (top of the list), page 2 takes the buffered overflow and transitions into the PG content, and later pages drop the buffer scan entirely once it's been exhausted. Filter predicates (tasks, statuses, tags, period, from/to, isTest, runId) are evaluated against the buffer snapshot so the list reflects the same filter scope as the PG-side query. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The replay form loader hit `taskRun.findFirst` and threw 404 when the run was buffered, which dumps the user back to the task list. Wire a buffer fallback that synthesises the same loader return shape from the snapshot, including a project-and-environments lookup scoped by the buffer entry's orgId so the env selector renders identically. The replay action itself already supports buffered runs via the ReplayTaskRunService synthetic-run cast — only the form's preflight load was broken. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… Radix DialogClose Previous attempt wrapped the form's submit button in <DialogClose asChild> so the dialog closed on click. That raced with Remix's <Form>: Radix's Slot-attached onClick triggered onOpenChange(false), the Dialog and its child Form unmounted mid-cycle, and the button's name=value pair (carrying `redirectUrl`) was dropped from the submitted FormData. The action then read `submission.value.redirectUrl` as undefined and the resulting redirect landed on `/env/dev` instead of the run-detail page. Switch to a ControlledCancelRunDialog at the call site that owns the Radix `open` state. The inner CancelRunDialog watches the navigation state transitions and signals the parent to close the dialog once the submission has captured its submitter cleanly. Submit-button name=value is preserved; redirect resolves to the run-detail page; modal still dismisses after submit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two SDK schemas were drifting from what the mollifier paths emitted:
1. ListRunResponseItem declares `idempotencyKey: z.string().optional()`
(omit-or-string). The listing-merge synthesiser was emitting
`idempotencyKey: null` for buffered runs, which old SDK versions
reject with a validation error before surfacing the row.
2. RetrieveRunTraceResponseBody declares a non-nullable `rootSpan`
matching the recursive RetrieveRunTraceSpan shape. The buffered
branch of /api/v1/runs/{id}/trace returned `rootSpan: null` plus an
`events: []` field that isn't in the schema. Synthesise a real
partial span (task identifier as message, no children, isPartial:
true) from the buffer snapshot so the response satisfies the schema
the SDK validates against.
Verified end-to-end by calling the MCP server's list_runs and
get_run_details against a buffered run; both now succeed.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
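The omit-or-string rule from item 1 can be honoured by leaving the key out entirely instead of emitting null. A small sketch, with `ListItem` and `toListItem` as hypothetical names and only the two relevant fields modelled:

```typescript
// Only the fields relevant to the example; the real item has many more.
type ListItem = { id: string; idempotencyKey?: string };

function toListItem(snapshot: { runId: string; idempotencyKey?: string | null }): ListItem {
  return {
    id: snapshot.runId,
    // Spread an empty object when there is no key, so the property is
    // absent from the JSON rather than present with a null value.
    ...(snapshot.idempotencyKey != null
      ? { idempotencyKey: snapshot.idempotencyKey }
      : {}),
  };
}
```

Serialising `toListItem({ runId: "run_1", idempotencyKey: null })` produces `{"id":"run_1"}`, which an optional string schema accepts.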
A broader audit of every public API route's buffered branch found a
handful of schema-drift bugs the SDK would reject on existing clients:
- /api/v1/runs/{id}/spans/{spanId} returned `parentId: undefined`
(omitted in JSON). Schema declares `parentId: z.string().nullable()`
— present-but-null is required. Send `null` explicitly. Also reflect
the snapshot's cancelled state in `isPartial` / `isCancelled`.
- /api/v1/runs/{id}/reschedule's buffered branch returned a stripped
`{ id, delayUntil }`. The SDK's `rescheduleRun` validates against the
full `RetrieveRunResponse` shape. Route the buffered response through
the same ApiRetrieveRunPresenter the PG branch uses (which falls back
to the buffer for synthetic runs). Allows `synthesisedResponse` in
`mutateWithFallback` to be async.
- ApiRetrieveRunPresenter.synthesiseFoundRunFromBuffer ignored the
snapshot's `cancelledAt` and `delayUntil`. Status was hardcoded to
`PENDING` regardless of cancellation; `completedAt` and `delayUntil`
were always `null`. SDK callers (and the MCP cancel_run helper)
reported status as Queued after a successful cancel. Map the synthetic
status through a small switch so CANCELED, SYSTEM_FAILURE and PENDING
all surface correctly.
- Add `delayUntil` to SyntheticRun so set_delay reschedule patches
survive the next retrieve. Mirror it onto the dashboard SpanRun
synthesiser too.
Verified end-to-end by replaying every public-API method against a
buffered run.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
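A sketch of the status mapping described for synthesiseFoundRunFromBuffer. The snapshot states, the precedence of cancellation over failure, and the choice to use cancelledAt as completedAt are all assumptions made for the example; `SnapshotView` and `synthesiseStatus` are hypothetical names.

```typescript
// Assumed snapshot shape; the real buffer snapshot has more fields.
type SnapshotView = {
  status: "QUEUED" | "DRAINING" | "FAILED";
  cancelledAt?: Date;
  delayUntil?: Date;
};

type SyntheticStatus = "PENDING" | "CANCELED" | "SYSTEM_FAILURE";

type SyntheticFields = {
  status: SyntheticStatus;
  completedAt: Date | null;
  delayUntil: Date | null;
};

function synthesiseStatus(snapshot: SnapshotView): SyntheticFields {
  const delayUntil = snapshot.delayUntil ?? null;
  // Assumption: a cancelled snapshot reports CANCELED whatever its buffer state.
  if (snapshot.cancelledAt) {
    return { status: "CANCELED", completedAt: snapshot.cancelledAt, delayUntil };
  }
  switch (snapshot.status) {
    case "FAILED":
      return { status: "SYSTEM_FAILURE", completedAt: null, delayUntil };
    default:
      // QUEUED and DRAINING both read as "not started yet".
      return { status: "PENDING", completedAt: null, delayUntil };
  }
}
```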
…ryable error

Previously, a non-retryable engine.trigger failure during drain left the
buffer entry as `status: "FAILED"` in Redis with no PG row. The customer
saw the run in their SDK / dashboard listing for ~10 min (buffer TTL) then
it vanished entirely, with no audit trail of the failure. Billing was
unaffected (no attempts ever ran) but observability was zero.

Reuse the engine's existing `createFailedTaskRun` helper (the same one
batch-trigger calls when an item fails to start), which writes a terminal
SYSTEM_FAILURE TaskRun row with the engine.trigger error stored on `error`,
no attempts, P2002-idempotent on the unique constraint.

Drainer handler classifies the failure:
- Retryable PG error → rethrow so MollifierDrainer.drainOne requeues
- Non-retryable → createFailedTaskRun, swallow original error so the buffer
  entry is ack'd (PG now has the audit row)
- createFailedTaskRun also fails (PG truly unreachable) → rethrow original
  so drainer falls through to its existing buffer.fail terminal-marker path
- Snapshot too malformed to construct the environment block → rethrow
  (defensive; drainer falls through to buffer.fail)

Tests cover each path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
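The four classification paths can be condensed into a small decision function. This sketch is synchronous and returns an outcome label instead of rethrowing, which simplifies the real async handler; `classifyDrainFailure`, `DrainOutcome` and the argument names are hypothetical.

```typescript
type DrainOutcome = "requeue" | "ack" | "fail";

// Decides what happens to a buffer entry after engine.trigger failed.
// `writeFailedRun` stands in for the createFailedTaskRun call.
function classifyDrainFailure(args: {
  snapshotUsable: boolean; // false when the environment block cannot be built
  retryable: boolean; // true for a retryable PG error
  writeFailedRun: () => void;
}): DrainOutcome {
  if (!args.snapshotUsable) return "fail"; // rethrow → buffer.fail
  if (args.retryable) return "requeue"; // rethrow → drainer requeues
  try {
    args.writeFailedRun(); // PG now holds the SYSTEM_FAILURE audit row
    return "ack"; // swallow the original error
  } catch {
    return "fail"; // PG unreachable → rethrow original → buffer.fail
  }
}
```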
The runs list (API and dashboard) is eventually consistent. Buffered runs were creating a sandwich problem where the head of the list could include buffered rows while in-transit rows between PG replication and ClickHouse went missing. Drop the merge so the list returns PG/ClickHouse rows only; buffered visibility will return via a separate global status indicator. Reverts the merge wiring in api.v1.runs, api.v1.projects.$projectRef.runs, and the dashboard runs index, and deletes listingMerge.server and dashboardListingMerge.server. The MCP list_runs tool rides through the API and inherits the same behaviour. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Today the run span (the SERVER trace event keyed by runId) is created inside `traceEventConcern.traceRun`, which sits *after* the mollifier gate. When the gate diverts a trigger into the Redis buffer, the run span is therefore not written to the event store until the drainer replays the snapshot — buffered runs are invisible in the trace view, parents' trace trees miss the child until drain, and alerting pipelines can't reference the run. Hoist the gate evaluation and mollify branch inside `traceRun` so both paths open the run span. The mollify branch records mollifier attributes on the same event, captures `event.traceId`/`event.spanId` into the buffer snapshot (replacing the separately-allocated `mollifier.queued` OTel span), and returns the synthesised result. `traceRun` flushes the PARTIAL event to the store on callback return. Extend the existing call-site test to assert (a) traceRun fires before buffer.accept and (b) the snapshot's traceId/spanId match the run span's IDs. The MockTraceEventConcern now mirrors the production ClickhouseEventRepository shape so the `traceContext.traceparent` assertion exercises the seeding path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The bulk-action confirmation count is sourced from ClickHouse, so PG rows not yet replicated to ClickHouse are silently excluded from both the count and the processing pass. Phase 4's first-batch mollifier-buffer scan broke that symmetry: buffered runs were processed without being counted, so a customer confirming "Replay ~0 runs" could see N buffered runs replayed without seeing them anywhere in the UI. Restore the eventually-consistent contract: bulk actions only target runs visible to ClickHouse. Buffered runs are picked up by subsequent bulk actions once they drain into PG → ClickHouse, mirroring how PG-not-yet-CH runs already work today. Removes `bulkActionBuffer.server.ts` (helper) and its container-backed test. Will reimplement once the buffered-runs UX (global status indicator) gives the customer a way to see and confirm against the buffered set. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Customers subscribing to a freshly-triggered run via useRealtimeRun silently hung when the gate diverted the run into the mollifier buffer. The route's findResource looked up the PG TaskRun by friendlyId, found nothing, and returned 404. Electric SQL's ShapeStream treats the initial 404 as terminal — no retry, no error surfaced to the hook, and crucially no recovery after the drainer eventually INSERTed the PG row. The customer's component shows the empty state indefinitely even though the run is alive and progressing. When the PG lookup misses but the buffer has the run, return a synthetic resource whose `id` is derived from the friendlyId — the same value engine.trigger will write when the drainer materialises this run. The route then opens the Electric subscription against `WHERE id='<id>'`, Electric streams an empty initial snapshot, and the SDK long-polls until the drainer's INSERT propagates through. Empirically validated end-to-end: trigger a buffered run, open the subscription, simulate the drainer's PG INSERT + UPDATE, and the SDK iterator yields the QUEUED and EXECUTING events in real time. Adds a `mollifier.realtime_subscriptions.buffered` counter and a structured log line. The observability gate fires once per cold subscription (Electric's `handle` query param is the dedup signal), not on every ~20s long-poll reconnect; that gate is unit-tested. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous commit's regression coverage was thin: only the log-dedup gate was unit-tested. The load-bearing logic (synthesise a resource when PG misses but the buffer has the run, with an `id` matching what the drainer will eventually write) had no regression test, so a future change that removed the buffered fallback would put the silent hang back into prod without anything failing in CI. Extract the resource-resolution rules from the route's findResource into `resolveRealtimeRunResource`, a pure function. Cover the branching with unit tests (PG hit, PG hit during drain race, PG miss + buffer hit, missing taskIdentifier default, both miss) and pin the full chain with a container-backed test that uses a real MollifierBuffer + the real readFallback helper and asserts the synthesised `id` matches `RunId.fromFriendlyId(friendlyId)`. That identity is what Electric's `WHERE id='<id>'` clause depends on when the drainer eventually INSERTs the row. 12 tests total across the three Phase-5.2 suites; one empirical probe run after the refactor confirmed end-to-end behaviour unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
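The branching listed in this commit can be sketched as a pure function. This is an illustration under assumptions, not the contents of `realtimeRunResource.server.ts`: the type names, the `source` field, the `"unknown"` default for a missing `taskIdentifier`, and the injected `toInternalId` parameter (standing in for `RunId.fromFriendlyId`) are all hypothetical.

```typescript
type PgRun = { id: string; friendlyId: string; taskIdentifier: string };
type BufferedRun = { friendlyId: string; taskIdentifier?: string };
type RealtimeResource = PgRun & { source: "pg" | "buffer" };

function resolveRealtimeRunResourceSketch(
  friendlyId: string,
  pgRun: PgRun | null,
  buffered: BufferedRun | null,
  toInternalId: (friendlyId: string) => string // stand-in for RunId.fromFriendlyId
): RealtimeResource | null {
  // PG wins whenever the row exists, including the drain race where the
  // buffer entry has not been torn down yet.
  if (pgRun) {
    return { ...pgRun, source: "pg" };
  }
  // PG miss + buffer hit: synthesise a resource whose id is the value the
  // drainer is expected to write later.
  if (buffered) {
    return {
      id: toInternalId(friendlyId),
      friendlyId,
      taskIdentifier: buffered.taskIdentifier ?? "unknown", // assumed default
      source: "buffer",
    };
  }
  // Both miss: the caller returns 404.
  return null;
}
```

Keeping the function free of I/O is what lets each branch be unit-tested without Postgres or Redis.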
Without an external signal that the drainer is falling behind, a stuck or offline drainer drives the buffer toward the entry-hash TTL line and runs vanish silently — no PG row, no log, no dashboard indication. Add a periodic read-only sweep over the buffer's queue ZSETs that emits a `mollifier.stale_entries` OTel counter and a structured `mollifier.stale_entry` warning log for each entry whose dwell exceeds the configured threshold. Independent of the drainer (its own gate + `TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED`) so an entirely offline drainer is exactly when the sweep is most useful. Defaults: interval 5min, threshold half of `entryTtlSeconds`, hard cap of 1000 entries per env per pass. Sweep is strictly read-only — does not remove or salvage entries. The retention-policy question (drop the entry TTL entirely vs raise it vs pre-TTL salvage) is intentionally deferred to a separate change; this commit gets the signal in place first. Tested with a real `MollifierBuffer` (testcontainers): stale entries flagged, fresh entries left alone, multi-org scan walks every queue. Manually verified end-to-end: with a 10s interval + 2s threshold, each tick logs the buffered run with growing dwellMs as expected. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
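A read-only sweep of the kind this commit describes might be shaped like the sketch below. It works on an in-memory view of the queues rather than Redis, and it assumes the per-env cap limits how many entries are inspected per pass; `findStaleEntries` and the entry shape are hypothetical.

```typescript
type BufferedEntry = { runId: string; createdAtMs: number };
type StaleEntry = { envId: string; runId: string; dwellMs: number };

// Read-only: returns what should be reported and never mutates the queues.
function findStaleEntries(
  queues: Record<string, BufferedEntry[]>, // envId -> queued entries, oldest first
  nowMs: number,
  thresholdMs: number,
  maxPerEnv: number = 1000
): StaleEntry[] {
  const stale: StaleEntry[] = [];
  for (const envId of Object.keys(queues)) {
    // Assumed meaning of the hard cap: inspect at most maxPerEnv entries.
    for (const entry of queues[envId].slice(0, maxPerEnv)) {
      const dwellMs = nowMs - entry.createdAtMs;
      if (dwellMs > thresholdMs) {
        stale.push({ envId, runId: entry.runId, dwellMs });
      }
    }
  }
  return stale;
}
```

Each returned item would map to one `mollifier.stale_entry` warning and one counter increment.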
The mollifier.stale_entries counter from the previous commit reflects
sweep-tick events, not stable state. A single stuck entry observed
across N ticks contributes N events, so a rate() query is
proportional to (stuck-entry-count × scan-frequency), not "how many
entries are stale right now". Useful for historical views but the
wrong shape for ops alerts.
Add a companion observable gauge `mollifier.stale_entries.current`
with `{envId}` attribute. The sweep emits a per-env snapshot on each
pass (including zero counts for envs whose stale entries cleared),
and an OTel batch-observable callback exposes the latest snapshot to
the metric exporter on every scrape. Recommended alert:
mollifier_stale_entries_current{envId=...} > 0 for 5m
The snapshot replaces (not merges) so an env that paged on a
previous sweep clears when the drainer catches up, instead of
staying latched at the last stale count. Test seam captures the
snapshot to verify per-env counts and the clear-on-drain behaviour.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
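The replace-not-merge behaviour of the gauge snapshot can be illustrated without the OTel SDK. In this sketch `StaleSnapshotHolder` is a hypothetical class, and the `report` callback stands in for whatever the batch-observable callback does with each value; the actual wiring in `mollifierTelemetry.server.ts` may differ.

```typescript
class StaleSnapshotHolder {
  private latest: Record<string, number> = {};

  // Called once per sweep pass. The new snapshot replaces the old one.
  // Envs that were stale on the previous pass but are clean now are
  // reported as 0 once, so their series clears instead of staying latched.
  publish(counts: Record<string, number>): void {
    const next: Record<string, number> = {};
    for (const envId of Object.keys(this.latest)) {
      if (this.latest[envId] > 0) next[envId] = 0;
    }
    for (const envId of Object.keys(counts)) {
      next[envId] = counts[envId];
    }
    this.latest = next;
  }

  // Called from the metric exporter's observable callback on each scrape.
  observe(report: (value: number, attributes: { envId: string }) => void): void {
    for (const envId of Object.keys(this.latest)) {
      report(this.latest[envId], { envId });
    }
  }
}
```

A merge would keep the last non-zero value for an env forever, which is the latched state the commit message warns about.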
Captures what the metrics mean, which signal is alertable (the `stale_entries.current` gauge, not the counter), each named failure mode and its recovery flow, and Redis debug commands for poking at the buffer by hand. Mirrors the `batch-queue-metrics.md` internal-doc style. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The docs/ directory is the public Mintlify customer docs site; internal operational runbooks belong elsewhere. Move the mollifier ops manual to _ops/ alongside the _plans/ working-doc convention. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…recovery mechanism
Buffer entries used to EXPIRE after entryTtlSeconds (600s dev / 1h prod). Once that window elapsed without the drainer ack'ing, the entry just vanished: no PG row, no log, no customer signal. The stale-entry sweep was added in the previous commit so ops gets paged on dwell-too-long; with that signal in place, the TTL itself is now the cause of the failure mode it was meant to mitigate. Remove it.
Buffer entries persist until the drainer ACKs (with the existing 30s post-materialise grace TTL) or FAILs them. Idempotency lookup keys also lose their TTL; keeping them paired to the entry hash prevents the dedup-drift bug where a TTL'd lookup would let the same idempotency key spawn a second buffered run while the first still existed. `failMollifierEntry` now DELs the entry hash + lookup because the SYSTEM_FAILURE PG row written by the drainer is the canonical record; the buffer entry is no longer load-bearing.
Knock-on changes:
- `MollifierBufferOptions`: `entryTtlSeconds` removed (no consumers outside this repo).
- `TRIGGER_MOLLIFIER_ENTRY_TTL_S`: removed from env.server.ts and the example .env. The stale-sweep threshold now has its own explicit default (5min) instead of "half of TTL".
- `MollifierBuffer.getEntryTtlSeconds`: retained. It returns the Redis-side TTL, which is now -1 in steady state and ~30s after ack. Used by the ack-grace-TTL test.
- Existing tests updated: TTL-related cases inverted to assert no TTL; FAILED-state cases inverted to assert teardown; runId-reuse-after-fail now succeeds (slot is reclaimable).
Operational alert: Redis memory pressure if the drainer is offline. That's the same failure mode as Redis OOM in any other context, with existing infra-level alerts. The mollifier.stale_entries.current gauge fires first; ops should be on it long before memory becomes a problem. See _ops/mollifier-ops.md.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The mollifier drainer's terminal-failure path (Phase 4G) and the batch-trigger's "queue size limit exceeded" path both call createFailedTaskRun to write a SYSTEM_FAILURE PG row for runs that never actually executed. Neither path emitted runFailed afterwards, so the runEngineHandlers' `runFailed` listener never fired — which means PerformTaskRunAlertsService never enqueued an alert delivery job, and customers' configured TASK_RUN alert channels missed the failure entirely. The row was visible in the dashboard list but silent for alerting purposes. Emit runFailed from createFailedTaskRun with `attemptNumber: 0` as the marker that the run never executed (distinguishes synthesised terminal failures from runs that exhausted their retries). PerformTaskRunAlertsService doesn't filter on attemptNumber or status, so the existing pipeline picks the event up without further changes. DeliverAlertService dispatches via the channel type (email/webhook/etc) the same way it does for any other terminal failure. Test: a containerTest subscribes to runFailed before calling createFailedTaskRun, asserts exactly one event fires with the expected payload shape. The existing batchTrigger tests still pass (they didn't assert the negative). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
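The shape of this fix, write the terminal row and then emit `runFailed` with `attemptNumber: 0`, can be sketched with a tiny in-memory event bus. Everything here is a stand-in: `TinyEventBus`, `createFailedTaskRunSketch`, and the event shape are hypothetical and much smaller than the run engine's real types.

```typescript
type RunFailedEvent = { runId: string; status: "SYSTEM_FAILURE"; attemptNumber: number };
type RunFailedListener = (event: RunFailedEvent) => void;

class TinyEventBus {
  private listeners: RunFailedListener[] = [];
  on(listener: RunFailedListener): void {
    this.listeners.push(listener);
  }
  emit(event: RunFailedEvent): void {
    for (const listener of this.listeners) listener(event);
  }
}

// Stand-in for createFailedTaskRun: persist the failure, then notify.
function createFailedTaskRunSketch(
  bus: TinyEventBus,
  rows: RunFailedEvent[], // stands in for the PG table
  runId: string
): void {
  // attemptNumber 0 marks a run that never executed, as opposed to one
  // that exhausted its retries.
  const row: RunFailedEvent = { runId, status: "SYSTEM_FAILURE", attemptNumber: 0 };
  rows.push(row);
  bus.emit(row);
}
```

The test described in the commit follows the same order: subscribe first, call the function, then assert that exactly one event arrived.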
Phase 4's audit found two Zod drifts reactively (idempotencyKey: null and parentId: undefined). This script proactively sweeps every public SDK method with a buffered branch by calling them through the real @trigger.dev/core apiClient — zodfetch's schemas execute against each response, so any drift now fails the audit. The existing mollifier-challenge shell scripts only do jq structural checks, which miss schema-level drift like null-vs-undefined or optional-vs-nullable mismatches. Covers nine methods against a fresh buffered run each (separate runs for destructive ones so they don't interfere): retrieveRun, retrieveRunTrace, retrieveSpan, listRunEvents, addTags, updateRunMetadata, replayRun, rescheduleRun, cancelRun. Manually verified against the live local webapp — all nine pass with no drift surfaced. The audit is reusable as a smoke-check before each prod rollout. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
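The difference between a structural check and schema-level validation can be shown with a hand-rolled example. The helpers below are illustrative only and do not use zod or the audit script's code; `isOptionalString` mimics a field declared as optional (string or undefined) but not nullable.

```typescript
// Mimics an optional-but-not-nullable string field.
function isOptionalString(value: unknown): boolean {
  return value === undefined || typeof value === "string";
}

// Roughly what a key-presence structural check verifies.
function hasKey(obj: Record<string, unknown>, key: string): boolean {
  return key in obj;
}

// A response where the server sent null for a field the schema treats as optional.
const driftedResponse: Record<string, unknown> = { id: "run_123", idempotencyKey: null };

const structuralOk = hasKey(driftedResponse, "idempotencyKey");
const schemaOk = isOptionalString(driftedResponse["idempotencyKey"]);
```

Here `structuralOk` is true while `schemaOk` is false, which is the kind of null-vs-undefined drift that only surfaces when the real schemas run against the response.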
# Conflicts:
# apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx
# apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts
# apps/webapp/app/routes/api.v1.runs.$runId.trace.ts
# apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts
…lans Single changeset and server-changes entry for the mollifier feature instead of one per commit. _plans/ files come out of the tree (they stay on disk as untracked working notes, per convention). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…cripts, ops doc Single redis-worker changeset under its original filename. references/stress-tasks, scripts/mollifier-challenge, _ops/ come out of the tree — kept locally as working artefacts. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Restore .changeset/mollifier-redis-worker-primitives.md to its original content (it was already on main from the Phase 1 scaffolding PR — should never have been touched during consolidation). Add .changeset/mollifier-buffer-extensions.md covering the post-scaffolding additions: idempotency lookup, snapshot mutation API, metadata CAS, watermark listing, claim primitives, ZSET-backed queue, ack grace TTL, drop-entry-TTL, and the @trigger.dev/core notice field. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…storical changelogs) I should not have touched this file. Restoring exactly from origin/main — zero diff against main. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Local working script. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Revert docs/realtime/react-hooks/subscribe.mdx to match main — the realtime-burst Note belongs in a separate docs pass, not in the mollifier feature PR. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
🦋 Changeset detected
Latest commit: 3a9bca2
The changes in this PR will be included in the next version bump. This PR includes changesets to release 32 packages
Walkthrough
Adds a Redis-backed “mollifier” buffer with idempotency claims, ZSET queueing, CAS metadata, and snapshot mutation. Wires a drainer handler, telemetry, and stale sweep worker. Extends env schema and server startup. Introduces PG-first with buffer fallbacks across presenters, routes, realtime, and actions (cancel, reschedule, tags, metadata, spans, trace). Adds redirect helpers and synthetic trace/span builders. Updates run engine to write cancelled/failed runs and emit events. Expands core schema (optional notice). Provides extensive tests.
Estimated code review effort
🎯 5 (Critical) | ⏱️ ~120 minutes
Actionable comments posted: 14
🧹 Nitpick comments (3)
apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx (1)
336-380: 💤 Low value
Empty string fallback for `traceId` may cause downstream issues.
When `buffered.traceId` is `null` or `undefined`, the fallback to `""` produces a run object with an empty `traceId`. This could cause issues if downstream code (e.g., trace pub/sub subscriptions, admin debug tooltips) attempts to use this value without checking for emptiness.
Consider returning `null` for the trace if `traceId` is missing, or handle the empty case explicitly in the component.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx` around lines 336 - 380, The run object currently sets traceId and spanId to empty strings which can break downstream consumers; in tryMollifiedRunFallback change run.traceId and run.spanId to use null when missing (e.g., run.traceId = buffered.traceId ?? null, run.spanId = buffered.spanId ?? null) and only call/return buildSyntheticTraceForBufferedRun(buffered) if buffered.traceId exists (otherwise return trace: null) so consumers receive null instead of an empty string.
apps/webapp/app/routes/api.v1.runs.$runId.tags.ts (1)
47-95: 💤 Low value
Success message may overcount tags on buffer path.
The `synthesisedResponse` at line 85 reports `Successfully set ${nonEmptyTags.length} new tags` but the Lua buffer deduplication may have filtered some tags that already existed. The PG path correctly reports `newTags.length` after deduplication.
This is a minor UX inconsistency rather than a functional issue: the buffer correctly deduplicates, but the message doesn't reflect actual new tags added.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/webapp/app/routes/api.v1.runs.$runId.tags.ts` around lines 47 - 95, The synthesisedResponse in mutateWithFallback currently reports "Successfully set ${nonEmptyTags.length} new tags" but nonEmptyTags may be deduplicated by the buffer (mutateSnapshot Lua), so change the message in the synthesisedResponse to avoid overstating actual additions: either compute a deduplicated count if you can access the snapshot in this path or change the text to "Queued ${nonEmptyTags.length} tag(s) for addition" or "Attempted to set ${nonEmptyTags.length} tag(s)" so it matches the PG path's semantics (see synthesisedResponse, nonEmptyTags, newTags, mutateWithFallback).
apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts (1)
14-36: 💤 Low value
Loader returns success without verifying run exists.
The new `loader` returns `{ attempts: [] }` with 200 status without checking if the run actually exists (in PG or buffer). This differs from other routes that return 404 for non-existent runs.
If this is intentional for API parity (as the comment suggests), consider documenting that this endpoint always returns 200 regardless of run existence, or add a run existence check for consistency with other endpoints.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts` around lines 14 - 36, The loader currently authenticates via authenticateApiRequest and validates params with ParamsSchema but always returns { attempts: [] }. Fix by checking run existence before returning 200: after ParamsSchema parsing, look up the run (reuse the project's existing run lookup function used by other routes, e.g. the same helper used in v3 retrieve endpoints or a getRunById/getRun helper) and if not found return json({ error: "Run not found" }, { status: 404 }); only return json({ attempts: [] }, { status: 200 }) when the run exists; keep authentication and validation logic in loader unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts`:
- Around line 78-86: The code uses an unsafe cast to coerce { id: env.id,
organizationId: env.organizationId } into the third parameter of
updateMetadataService.call (inside the tryCatch call), which bypasses type
checking; replace the cast by either passing the actual environment object (e.g.
auth.environment) if available, or construct and pass an object that fully
implements the expected environment/type for updateMetadataService.call (include
any required properties beyond id and organizationId), or if the correct shape
is unknown add a clear TODO and a typed interface matching the service
signature; update the call site referencing updateMetadataService.call,
targetRunId, env.id and env.organizationId to use the correct typed value
instead of as unknown as Parameters<...>.
In `@apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts`:
- Around line 50-62: The buffered fallback result is unsafely cast to TaskRun
and passed into ReplayTaskRunService; add a Zod validation step to assert the
subset of fields ReplayTaskRunService actually consumes (validate the `buffered`
object returned by findRunByIdWithMollifierFallback), parse it with a schema in
apps/webapp or packages/core, and only assign to taskRun after zod.parse
succeeds (avoid using `as unknown as TaskRun`); reference the buffered variable,
the findRunByIdWithMollifierFallback call, TaskRun type, and the
ReplayTaskRunService usage so the validated shape matches what the service
expects.
In `@apps/webapp/app/routes/realtime.v1.runs.$runId.ts`:
- Around line 82-86: The metric call is recording a high-cardinality env id;
remove authentication.environment.id from metric attribution by stopping the
call pattern recordRealtimeBufferedSubscription(authentication.environment.id)
and removing envId from the logger.info dimensions for
"mollifier.realtime.buffered_subscription"; instead pass only bounded attributes
(e.g., outcome/source flags or booleans) into recordRealtimeBufferedSubscription
(or make the parameter optional) and keep authentication.environment.id and any
other sensitive IDs in plain logs only (e.g., a separate logger.debug entry),
while continuing to emit bufferedDwellMs and other low-cardinality fields.
In `@apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts`:
- Around line 33-66: The buffered-run branch returns a placeholder without
verifying org membership; after calling getMollifierBuffer() and
buffer.getEntry(parsedParams.runParam) you must verify the requesting user is
allowed to view that run using the same org-membership check used in the PG path
(reuse the existing membership/authorization helper or compare the entry's
orgId/envId to the caller's orgs) and return 404/403 if not allowed before
creating the placeholder stream; ensure checks reference getMollifierBuffer,
buffer.getEntry, and parsedParams.runParam so the authorization is applied to
the buffered entry path.
In `@apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts`:
- Around line 47-87: The buffered-run branch returns payloads from
getMollifierBuffer()/buffer.getEntry(runParam) without performing the same
organization membership check done for the PG path; fix by, after you get entry
and before returning typedjson, load the run's organization and verify the
requesting user belongs to that org (use prisma with entry.runId to fetch the
run/org and reuse the same org-membership check used in the PG path) — if
membership fails, throw a 404/unauthorized the same way the PG path does; ensure
you import prisma and call the identical membership helper used elsewhere so
authorization semantics match.
In `@apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts`:
- Around line 10-19: Update isRetryablePgError to explicitly detect Prisma P1001
by checking both err.code and err.errorCode for "P1001" in addition to existing
message checks; inside the function (isRetryablePgError) treat either (err as {
code?: string }).code === "P1001" or (err as { errorCode?: string }).errorCode
=== "P1001" as retryable so PrismaClientKnownRequestError and
PrismaClientInitializationError variants are both covered.
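A minimal sketch of the P1001 check suggested above, assuming errors are inspected structurally rather than with `instanceof`. It covers only the code check; the existing message-based checks are not reproduced, and `hasPrismaCode` and `isRetryablePgErrorSketch` are hypothetical names.

```typescript
// Returns true when the error carries the given Prisma code on either
// field: known-request errors expose `code`, initialization errors
// expose `errorCode`.
function hasPrismaCode(err: unknown, code: string): boolean {
  if (typeof err !== "object" || err === null) return false;
  const candidate = err as { code?: unknown; errorCode?: unknown };
  return candidate.code === code || candidate.errorCode === code;
}

// P1001 is Prisma's "can't reach database server" error, which is
// normally transient and therefore worth retrying.
function isRetryablePgErrorSketch(err: unknown): boolean {
  return hasPrismaCode(err, "P1001");
}
```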
In `@apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts`:
- Around line 32-34: Remove the high-cardinality envId label from OTEL metric
calls: update recordRealtimeBufferedSubscription (which currently calls
realtimeBufferedSubscriptionsCounter.add(1, { envId })), recordStaleEntry
(similar add call), and the observable callback used to create staleEntriesGauge
so none pass { envId } as an attribute; instead emit metrics without envId and
then update any related alerts/dashboards (e.g.,
mollifier_stale_entries_current) that depended on the envId label to use a
supported grouping or aggregate instead.
In `@apps/webapp/app/v3/mollifier/readFallback.server.ts`:
- Around line 137-143: The code currently parses snapshot.cancelledAt into
cancelledAt via asString(...) then new Date(...) and treats any truthy Date
object as a cancel marker, which will mis-classify "Invalid Date" as canceled;
update the logic in the block around cancelledAtRaw/cancelledAt (symbols:
cancelledAtRaw, cancelledAt, asString, snapshot.cancelledAt, status,
SyntheticRun) to validate the Date after construction (e.g., check
isNaN(cancelledAt.getTime()) or an isValidDate helper) and set cancelledAt to
undefined when invalid, then only set status = "CANCELED" and include
cancelledAt in outputs when the date is valid; apply the same validation pattern
for other parsed dates mentioned (lines ~146-147).
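The date validation suggested above comes down to checking `getTime()` after construction. A small sketch, with `parseValidDate` and `readCancellation` as hypothetical helper names:

```typescript
// Returns a Date only when the input is a non-empty string that parses
// to a real timestamp.
function parseValidDate(raw: unknown): Date | undefined {
  if (typeof raw !== "string" || raw.length === 0) return undefined;
  const parsed = new Date(raw);
  // An unparseable string yields an "Invalid Date" object, which is still
  // truthy; its getTime() is NaN.
  return isNaN(parsed.getTime()) ? undefined : parsed;
}

// Only a valid cancelledAt counts as a cancel marker.
function readCancellation(snapshot: { cancelledAt?: unknown }): { cancelled: boolean; cancelledAt?: Date } {
  const cancelledAt = parseValidDate(snapshot.cancelledAt);
  return cancelledAt ? { cancelled: true, cancelledAt } : { cancelled: false };
}
```

The same helper can be reused for the other parsed dates the comment mentions.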
In `@apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts`:
- Around line 63-84: Replace the ad-hoc casting/typeof checks in
findBufferedRunRedirectInfo with a Zod-backed validation: define a Zod schema
for the snapshot shape (including environment object with project and
organization objects each having slug: string, and an optional spanId), call
schema.safeParse(deserialiseSnapshot(entry.payload)), and if parsing fails log
the same warning via logger.warn (include runFriendlyId and the parse error) and
return null; on success read environment, project, organization, slug fields and
optional spanId from the parsed data instead of using as Record<string, unknown>
and manual typeof checks.
In `@apps/webapp/app/v3/services/resetIdempotencyKey.server.ts`:
- Around line 36-54: The current logic treats a buffer.resetIdempotency()
failure as a harmless miss and can return a 404 when pgCount === 0, hiding a
partial outage; change the flow in resetIdempotencyKey.server (the
buffer.resetIdempotency() catch/assignment that sets bufferResult and the later
totalCount/ServiceValidationError throw) so that if buffer.resetIdempotency()
failed (detectable via the catch branch that logs the error and returns {
clearedRunId: null } or by adding an explicit error flag on bufferResult), you
do not convert that into a Not Found; instead surface an outage error (e.g.
throw a ServiceUnavailableError or rethrow the original error) when pgCount ===
0 and the buffer operation failed, and keep the existing 404 only when both
pgCount === 0 and the buffer reset explicitly succeeded with no clearedRunId.
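The distinction requested above, a genuine miss versus a buffer outage, can be sketched as a small decision function. The `failed` flag on the buffer result and the numeric status codes are assumptions for illustration; the real service throws `ServiceValidationError` and may surface the outage differently.

```typescript
type BufferResetResult = { failed: boolean; clearedRunId: string | null };
type ResetOutcome = { status: 200; cleared: number } | { status: 404 } | { status: 503 };

function summariseReset(pgCount: number, buffer: BufferResetResult): ResetOutcome {
  const bufferCount = !buffer.failed && buffer.clearedRunId !== null ? 1 : 0;
  const total = pgCount + bufferCount;
  if (total > 0) return { status: 200, cleared: total };
  // Nothing cleared in PG and the buffer could not be consulted: the key
  // may still live in the buffer, so this is an outage, not a miss.
  if (buffer.failed) return { status: 503 };
  // Both stores answered and neither held the key.
  return { status: 404 };
}
```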
In `@apps/webapp/test/mollifierStaleSweep.test.ts`:
- Around line 70-77: The test comment and name for the MollifierBuffer
stale-sweep case are incorrect: the setup (use of futureNow) actually makes all
three entries stale, but the description says one entry is fresh and skipped;
update the test name/comment to match the actual behavior (three stale entries
flagged) or change the setup to keep one truly fresh. Locate the test function
and surrounding text referencing "flags entries whose dwell exceeds the stale
threshold and skips fresh ones" and the local variable futureNow, then either
rename the test to indicate "three stale entries" and adjust comments near the
assertions (around the assertions at lines referenced by assertions on expected
stale counts) or modify the timestamp setup so one entry remains fresh; ensure
references to MollifierBuffer and the sweep assertions (the checks around lines
asserting two vs three stale entries) are consistent.
In `@internal-packages/run-engine/src/engine/index.ts`:
- Around line 593-603: The idempotency path in createCancelledRun currently
returns any existing row found by prisma.taskRun.findFirst without verifying its
state; change the logic after retrieving existing to verify the run's status is
actually canceled (e.g., existing.status === 'CANCELED' or matches
TaskRunStatus.CANCELED) and only return the existing row if that check passes;
otherwise log a mismatch (include snapshot.friendlyId) and surface an error or
conflict instead of silently treating the operation as successful so
cancellation semantics are enforced.
In `@packages/redis-worker/src/mollifier/buffer.ts`:
- Around line 57-61: The code uses a shared constant IDEMPOTENCY_CLAIM_PENDING
and performs unconditional DEL which allows a stale claimant to delete a new
owner’s claim; change the claim model to generate a unique per-claim ownership
token (e.g., random UUID) when creating the claim (use makeIdempotencyClaimKey
for the key and store the token as the value), return/store that token with the
claim owner, and replace unconditional DEL with a conditional release that only
deletes if the stored value matches the owner’s token (implement via a Redis
EVAL/Lua script or Redis GET+WATCH/MULTI to compare-and-delete). Update all
claim acquire/release call sites to set the token on claim creation and pass it
to the conditional release so only the rightful owner can remove its claim.
- Around line 46-48: The Redis key builders (e.g., makeIdempotencyLookupKey and
other key-construction sites in this file that concatenate segments with ':')
are vulnerable to collisions when segments contain ':'; update these functions
to encode each variable segment (taskIdentifier, idempotencyKey, etc.) before
concatenation (use a safe reversible encoding such as encodeURIComponent or
base64url) so the colon separators remain unambiguous, and decode where needed
when reading values back; ensure you apply the same encoding consistently across
all key-builder functions in this file.
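The key-collision finding above can be demonstrated in a few lines. This sketch uses `encodeURIComponent`, one of the two encodings the comment suggests; `makeRawKey`, `makeEncodedKey`, and the `mollifier:idempotency` prefix are hypothetical and do not reflect the actual key layout in `buffer.ts`.

```typescript
// Naive builder: segments are joined as-is, so a ':' inside a segment is
// indistinguishable from a separator.
function makeRawKey(prefix: string, ...segments: string[]): string {
  return [prefix, ...segments].join(":");
}

// Encoded builder: each variable segment is percent-encoded first, so the
// only literal ':' characters left are the separators.
function makeEncodedKey(prefix: string, ...segments: string[]): string {
  return [prefix, ...segments.map((segment) => encodeURIComponent(segment))].join(":");
}

const rawA = makeRawKey("mollifier:idempotency", "a:b", "c");
const rawB = makeRawKey("mollifier:idempotency", "a", "b:c");
const encodedA = makeEncodedKey("mollifier:idempotency", "a:b", "c");
const encodedB = makeEncodedKey("mollifier:idempotency", "a", "b:c");
```

With the raw builder both inputs produce the same key; with the encoded builder they differ, and `decodeURIComponent` recovers the original segment when a key is read back.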
---
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 0350dbbf-9d81-4f1a-a3fb-242aeb0b7848
⛔ Files ignored due to path filters (1)
`pnpm-lock.yaml` is excluded by `!**/pnpm-lock.yaml`
📒 Files selected for processing (81)
- .changeset/mollifier-buffer-extensions.md
- .gitignore
- .server-changes/mollifier.md
- apps/webapp/app/components/runs/v3/CancelRunDialog.tsx
- apps/webapp/app/entry.server.tsx
- apps/webapp/app/env.server.ts
- apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts
- apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts
- apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
- apps/webapp/app/routes/@.runs.$runParam.ts
- apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx
- apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts
- apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts
- apps/webapp/app/routes/api.v1.runs.$runId.tags.ts
- apps/webapp/app/routes/api.v1.runs.$runId.trace.ts
- apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts
- apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts
- apps/webapp/app/routes/api.v1.runs.$runParam.reschedule.ts
- apps/webapp/app/routes/api.v1.runs.ts
- apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
- apps/webapp/app/routes/api.v2.runs.$runParam.cancel.ts
- apps/webapp/app/routes/projects.v3.$projectRef.runs.$runParam.ts
- apps/webapp/app/routes/realtime.v1.runs.$runId.ts
- apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx
- apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.sessions.$sessionId.$io.ts
- apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.$streamId.ts
- apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.input.$streamId.ts
- apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx
- apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx
- apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts
- apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts
- apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts
- apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts
- apps/webapp/app/routes/runs.$runParam.ts
- apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts
- apps/webapp/app/runEngine/services/triggerTask.server.ts
- apps/webapp/app/v3/mollifier/applyMetadataMutation.server.ts
- apps/webapp/app/v3/mollifier/idempotencyClaim.server.ts
- apps/webapp/app/v3/mollifier/mollifierBuffer.server.ts
- apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts
- apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts
- apps/webapp/app/v3/mollifier/mollifierGate.server.ts
- apps/webapp/app/v3/mollifier/mollifierMollify.server.ts
- apps/webapp/app/v3/mollifier/mollifierSnapshot.server.ts
- apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts
- apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts
- apps/webapp/app/v3/mollifier/mutateWithFallback.server.ts
- apps/webapp/app/v3/mollifier/readFallback.server.ts
- apps/webapp/app/v3/mollifier/realtimeRunResource.server.ts
- apps/webapp/app/v3/mollifier/resolveRunForMutation.server.ts
- apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts
- apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts
- apps/webapp/app/v3/mollifier/syntheticTrace.server.ts
- apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts
- apps/webapp/app/v3/services/resetIdempotencyKey.server.ts
- apps/webapp/seed.mts
- apps/webapp/test/engine/triggerTask.test.ts
- apps/webapp/test/mollifierApplyMetadataMutation.test.ts
- apps/webapp/test/mollifierDrainerHandler.test.ts
- apps/webapp/test/mollifierGate.test.ts
- apps/webapp/test/mollifierIdempotencyClaim.test.ts
- apps/webapp/test/mollifierMollify.test.ts
- apps/webapp/test/mollifierMutateWithFallback.test.ts
- apps/webapp/test/mollifierReadFallback.test.ts
- apps/webapp/test/mollifierRealtimeRunResource.test.ts
- apps/webapp/test/mollifierRealtimeRunResourceBuffer.test.ts
- apps/webapp/test/mollifierRealtimeSubscription.test.ts
- apps/webapp/test/mollifierResolveRunForMutation.test.ts
- apps/webapp/test/mollifierStaleSweep.test.ts
- apps/webapp/test/mollifierSyntheticRedirectInfo.test.ts
- apps/webapp/test/mollifierSyntheticSpanRun.test.ts
- apps/webapp/test/mollifierTripEvaluator.test.ts
- internal-packages/run-engine/src/engine/index.ts
- internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts
- internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts
- packages/core/src/v3/schemas/api.ts
- packages/redis-worker/src/mollifier/buffer.test.ts
- packages/redis-worker/src/mollifier/buffer.ts
- packages/redis-worker/src/mollifier/drainer.test.ts
- packages/redis-worker/src/mollifier/index.ts
- packages/redis-worker/src/mollifier/schemas.ts
💤 Files with no reviewable changes (2)
- apps/webapp/app/v3/mollifier/mollifierBuffer.server.ts
- apps/webapp/app/routes/api.v1.runs.ts
const [error] = await tryCatch(
  updateMetadataService.call(
    targetRunId,
    { operations },
    { id: env.id, organizationId: env.organizationId } as unknown as Parameters<
      typeof updateMetadataService.call
    >[2]
  )
);
Unsafe type assertion bypasses type checking.
The as unknown as Parameters<...> cast at lines 82-85 silences the type checker entirely. If updateMetadataService.call expects additional properties on the environment object, this will fail silently at runtime.
Consider either:
- Passing the full `auth.environment` if available in scope
- Constructing an object that actually satisfies the expected type
- Adding a TODO to properly type this when the service interface is clarified
Suggested improvement
- const [error] = await tryCatch(
- updateMetadataService.call(
- targetRunId,
- { operations },
- { id: env.id, organizationId: env.organizationId } as unknown as Parameters<
- typeof updateMetadataService.call
- >[2]
- )
- );
+ // TODO: Ensure env shape matches service expectation or refactor service to accept minimal shape
+ const [error] = await tryCatch(
+ updateMetadataService.call(targetRunId, { operations }, env as Parameters<
+ typeof updateMetadataService.call
+ >[2])
+ );
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts` around lines 78 - 86,
The code uses an unsafe cast to coerce { id: env.id, organizationId:
env.organizationId } into the third parameter of updateMetadataService.call
(inside the tryCatch call), which bypasses type checking; replace the cast by
either passing the actual environment object (e.g. auth.environment) if
available, or construct and pass an object that fully implements the expected
environment/type for updateMetadataService.call (include any required properties
beyond id and organizationId), or if the correct shape is unknown add a clear
TODO and a typed interface matching the service signature; update the call site
referencing updateMetadataService.call, targetRunId, env.id and
env.organizationId to use the correct typed value instead of as unknown as
Parameters<...>.
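One way to drop the double cast, sketched under the assumption that the callee only reads `id` and `organizationId` (the review does not confirm this), is to declare the narrow shape the callee needs and let structural typing do the rest. `FullEnvironment`, `MinimalEnv`, and `updateMetadata` below are hypothetical stand-ins, not the real service types.

```typescript
// Hypothetical stand-in for the full environment type.
type FullEnvironment = {
  id: string;
  organizationId: string;
  slug: string;
};

// Narrow shape: only the fields the callee is assumed to read.
type MinimalEnv = Pick<FullEnvironment, "id" | "organizationId">;

// Hypothetical callee that accepts the narrow shape instead of the full type.
function updateMetadata(runId: string, env: MinimalEnv): string {
  return `${env.organizationId}/${env.id}/${runId}`;
}

const fullEnv: FullEnvironment = { id: "env_1", organizationId: "org_1", slug: "prod" };

// A full environment satisfies the narrow type structurally, so no cast is needed.
const fromFull = updateMetadata("run_1", fullEnv);

// A hand-built object also type-checks, and the compiler rejects it if a field is missing.
const fromMinimal = updateMetadata("run_1", {
  id: fullEnv.id,
  organizationId: fullEnv.organizationId,
});
```

If the service turns out to need more fields, widening `MinimalEnv` surfaces every call site that does not supply them at compile time, which is the check the `as unknown as` cast suppresses.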
// Buffered fallback (Q2). The SyntheticRun shape was extended in
// Phase B4 to carry every field ReplayTaskRunService reads from a
// TaskRun. Cast through unknown: the synthesised object has the
// same field surface as a real PG row from the service's
// perspective.
const buffered = await findRunByIdWithMollifierFallback({
  runId: runParam,
  environmentId: env.id,
  organizationId: env.organizationId,
});
if (buffered) {
  taskRun = buffered as unknown as TaskRun;
}
Validate buffered fallback payload before treating it as TaskRun.
This branch trusts a synthetic/buffered object via as unknown as TaskRun and sends it directly into ReplayTaskRunService. Add a zod parse at this boundary (for the fields replay actually needs) before assignment to avoid runtime shape mismatches from buffer snapshots.
As per coding guidelines: "Use zod for validation in packages/core and apps/webapp".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts` around lines 50 - 62,
The buffered fallback result is unsafely cast to TaskRun and passed into
ReplayTaskRunService; add a Zod validation step to assert the subset of fields
ReplayTaskRunService actually consumes (validate the `buffered` object returned
by findRunByIdWithMollifierFallback), parse it with a schema in apps/webapp or
packages/core, and only assign to taskRun after zod.parse succeeds (avoid using
`as unknown as TaskRun`); reference the buffered variable, the
findRunByIdWithMollifierFallback call, TaskRun type, and the
ReplayTaskRunService usage so the validated shape matches what the service
expects.
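The boundary check requested above can be sketched as follows. To keep the example dependency-free it uses a hand-written type guard rather than zod; a zod schema with `safeParse` would play the same role. `ReplayableRun` and its three fields are assumptions for illustration, not the real set of fields the replay service reads.

```typescript
// Hypothetical subset of fields a replay might read; not the real TaskRun type.
type ReplayableRun = {
  friendlyId: string;
  taskIdentifier: string;
  payload: string;
};

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null;
}

// Returns the narrowed run, or null when the buffered object lacks a required field.
function parseReplayableRun(value: unknown): ReplayableRun | null {
  if (!isRecord(value)) return null;
  const { friendlyId, taskIdentifier, payload } = value;
  if (
    typeof friendlyId !== "string" ||
    typeof taskIdentifier !== "string" ||
    typeof payload !== "string"
  ) {
    return null;
  }
  return { friendlyId, taskIdentifier, payload };
}
```

A caller would assign to `taskRun` only when the result is non-null, so a stale or partial buffer snapshot is rejected at the route boundary instead of failing inside the service.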
recordRealtimeBufferedSubscription(authentication.environment.id);
logger.info("mollifier.realtime.buffered_subscription", {
  runId: run.friendlyId,
  envId: authentication.environment.id,
  bufferDwellMs: bufferedDwellMs,
Remove envId from metric dimensions for buffered subscription telemetry.
recordRealtimeBufferedSubscription(authentication.environment.id) encodes a high-cardinality identifier in metric attribution. Replace with bounded attributes (e.g., outcome/source flags) and keep env-specific details in logs only.
As per coding guidelines: "When creating or editing OTEL metrics ... Do not use high-cardinality attributes ... such as UUIDs/IDs (envId, userId, runId, ...)".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@apps/webapp/app/routes/realtime.v1.runs.$runId.ts` around lines 82 - 86, The
metric call is recording a high-cardinality env id; remove
authentication.environment.id from metric attribution by stopping the call
pattern recordRealtimeBufferedSubscription(authentication.environment.id) and
removing envId from the logger.info dimensions for
"mollifier.realtime.buffered_subscription"; instead pass only bounded attributes
(e.g., outcome/source flags or booleans) into recordRealtimeBufferedSubscription
(or make the parameter optional) and keep authentication.environment.id and any
other sensitive IDs in plain logs only (e.g., a separate logger.debug entry),
while continuing to emit bufferedDwellMs and other low-cardinality fields.
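The split between bounded metric attributes and per-environment log detail can be illustrated with a small in-memory sketch. The counter and log sink here are stand-ins, not the OTel meter or the project's logger, and `SubscriptionSource` is a hypothetical attribute chosen for the example.

```typescript
// Bounded attribute: a closed set of values, safe to use as a metric dimension.
type SubscriptionSource = "buffer" | "postgres";

// In-memory stand-in for a metrics counter, keyed only by the bounded attribute.
const subscriptionCounts = new Map<SubscriptionSource, number>();

function recordSubscription(source: SubscriptionSource): void {
  subscriptionCounts.set(source, (subscriptionCounts.get(source) ?? 0) + 1);
}

// Log lines are collected in memory so the sketch has no I/O dependency.
const logLines: string[] = [];

function onSubscription(envId: string, runId: string, source: SubscriptionSource): void {
  // The metric only sees the bounded attribute.
  recordSubscription(source);
  // High-cardinality identifiers go to the log line instead.
  logLines.push(JSON.stringify({ event: "subscription", envId, runId, source }));
}

onSubscription("env_1", "run_1", "buffer");
onSubscription("env_2", "run_2", "buffer");
```

Two different environments produce a single counter series with value 2, while the environment and run IDs remain recoverable from the two log lines.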
if (!run || !run.organizationId) {
  // Buffered run has no events to package yet. Return a small gzipped
  // placeholder file so the dashboard's "Download logs" button doesn't
  // 404 mid-burst. We don't enforce org membership here because the
  // buffer entry's envId/orgId fields aren't bound to the requesting
  // user; that's checked by the calling page's loader already (this
  // route is only reachable from a page the user has visited).
  const buffer = getMollifierBuffer();
  if (buffer) {
    try {
      const entry = await buffer.getEntry(parsedParams.runParam);
      if (entry) {
        const placeholder = new Readable({
          read() {
            this.push(
              "# This run has not started yet. Logs will be available once it begins executing.\n"
            );
            this.push(null);
          },
        });
        const compressed = placeholder.pipe(createGzip());
        return new Response(compressed as any, {
          status: 200,
          headers: {
            "Content-Type": "application/octet-stream",
            "Content-Disposition": `attachment; filename="${parsedParams.runParam}.log"`,
            "Content-Encoding": "gzip",
          },
        });
      }
    } catch {
      // fall through to 404 on buffer error
    }
  }
Missing authorization check for buffered run path.
The PG path (lines 18-31) verifies org membership, but the buffer fallback path (lines 40-62) only checks if a buffer entry exists. Any authenticated user who knows a runParam could hit this endpoint for buffered runs belonging to other organizations. The comment acknowledges skipping auth but the reasoning ("checked by calling page's loader") doesn't hold for direct URL access.
Consider adding org membership verification before returning the placeholder:
🔒 Proposed fix
const buffer = getMollifierBuffer();
if (buffer) {
try {
const entry = await buffer.getEntry(parsedParams.runParam);
if (entry) {
+ // Verify user has access to this org
+ const member = await prisma.orgMember.findFirst({
+ where: { userId: user.id, organizationId: entry.orgId },
+ select: { id: true },
+ });
+ if (!member) {
+ return new Response("Not found", { status: 404 });
+ }
  const placeholder = new Readable({
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts` around
lines 33 - 66, The buffered-run branch returns a placeholder without verifying
org membership; after calling getMollifierBuffer() and
buffer.getEntry(parsedParams.runParam) you must verify the requesting user is
allowed to view that run using the same org-membership check used in the PG path
(reuse the existing membership/authorization helper or compare the entry's
orgId/envId to the caller's orgs) and return 404/403 if not allowed before
creating the placeholder stream; ensure checks reference getMollifierBuffer,
buffer.getEntry, and parsedParams.runParam so the authorization is applied to
the buffered entry path.
if (!run) {
  // Buffered run isn't on a queue yet (it sits in the mollifier buffer
  // until the drainer materialises it), so the queue-concurrency fields
  // don't apply. Return a minimal "buffered" debug payload from the
  // snapshot so the Debug panel can show *something* instead of 404'ing.
  const buffer = getMollifierBuffer();
  if (buffer) {
    try {
      const entry = await buffer.getEntry(runParam);
      if (entry) {
        const snapshot = deserialiseSnapshot<{
          taskIdentifier?: string;
          queue?: string;
          concurrencyKey?: string;
        }>(entry.payload);
        return typedjson({
          engine: "V2" as const,
          buffered: true,
          run: {
            id: entry.runId,
            engine: "V2" as const,
            friendlyId: entry.runId,
            queue: snapshot.queue ?? null,
            concurrencyKey: snapshot.concurrencyKey ?? null,
            queueTimestamp: entry.createdAt,
            runtimeEnvironment: null,
          },
          queueConcurrencyLimit: undefined,
          envConcurrencyLimit: undefined,
          queueCurrentConcurrency: undefined,
          envCurrentConcurrency: undefined,
          queueReserveConcurrency: undefined,
          envReserveConcurrency: undefined,
          keys: [],
        });
      }
    } catch {
      // fall through to 404 on buffer error
    }
  }
  throw new Response("Not Found", { status: 404 });
Missing authorization check for buffered run path.
Similar to the logs download route, the PG path (line 20) verifies org membership, but the buffer fallback (lines 52-82) only checks if a buffer entry exists without verifying that the requesting user belongs to the buffered run's organization.
🔒 Proposed fix
const buffer = getMollifierBuffer();
if (buffer) {
try {
const entry = await buffer.getEntry(runParam);
if (entry) {
+ // Verify user has access to this org
+ const member = await prisma.orgMember.findFirst({
+ where: { userId, organizationId: entry.orgId },
+ select: { id: true },
+ });
+ if (!member) {
+ throw new Response("Not Found", { status: 404 });
+ }
  const snapshot = deserialiseSnapshot<{
Note: You'll need to add prisma import: import { prisma } from "~/db.server";
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts` around lines 47
- 87, The buffered-run branch returns payloads from
getMollifierBuffer()/buffer.getEntry(runParam) without performing the same
organization membership check done for the PG path; fix by, after you get entry
and before returning typedjson, load the run's organization and verify the
requesting user belongs to that org (use prisma with entry.runId to fetch the
run/org and reuse the same org-membership check used in the PG path) — if
membership fails, throw a 404/unauthorized the same way the PG path does; ensure
you import prisma and call the identical membership helper used elsewhere so
authorization semantics match.
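Both buffered-path findings (logs download and debug) reduce to the same guard: resolve the entry, then refuse it unless the caller belongs to the entry's organization. A minimal sketch, assuming the buffer entry carries an `orgId` (as the proposed fixes above do) and that the caller's memberships are already loaded into a set; `BufferEntry` and `authorizeBufferedEntry` are hypothetical names.

```typescript
// Hypothetical shape; the proposed fixes above read `entry.orgId`.
type BufferEntry = { runId: string; orgId: string };

// Returns the entry only when the caller is a member of its organization.
// A missing entry and an unauthorized entry both yield null, so the route can
// answer 404 in either case without revealing that the run exists.
function authorizeBufferedEntry(
  userOrgIds: ReadonlySet<string>,
  entry: BufferEntry | null
): BufferEntry | null {
  if (entry === null) return null;
  return userOrgIds.has(entry.orgId) ? entry : null;
}
```

Routing both fallbacks through one helper keeps the buffered path's authorization semantics aligned with the PG path instead of repeating the membership query in each route.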
    .catch((err) => {
      // Buffer outage shouldn't 500 the reset endpoint if PG
      // already cleared something. Log and treat as a miss.
      logger.error("ResetIdempotencyKeyService: buffer reset failed", {
        idempotencyKey,
        taskIdentifier,
        err: err instanceof Error ? err.message : String(err),
      });
      return { clearedRunId: null };
    })
  : { clearedRunId: null };

const totalCount = pgCount + (bufferResult.clearedRunId ? 1 : 0);

if (totalCount === 0) {
  throw new ServiceValidationError(
    `No runs found with idempotency key: ${idempotencyKey} and task: ${taskIdentifier}`,
    404
  );
Avoid returning 404 when the buffer reset path is unavailable.
If PG clears nothing and buffer.resetIdempotency() fails, the current fallback turns this into a false “not found” (Line 50). That hides a partial outage and can leave a buffered key effectively uncleared while the caller gets 404.
Suggested fix
+ let bufferResetFailed = false;
const bufferResult = buffer
? await buffer
.resetIdempotency({
envId: authenticatedEnv.id,
taskIdentifier,
idempotencyKey,
})
.catch((err) => {
+ bufferResetFailed = true;
logger.error("ResetIdempotencyKeyService: buffer reset failed", {
idempotencyKey,
taskIdentifier,
err: err instanceof Error ? err.message : String(err),
});
return { clearedRunId: null };
})
: { clearedRunId: null };
const totalCount = pgCount + (bufferResult.clearedRunId ? 1 : 0);
+ if (pgCount === 0 && bufferResetFailed) {
+ throw new ServiceValidationError(
+ "Unable to verify buffered idempotency state right now; please retry",
+ 503
+ );
+ }
+
  if (totalCount === 0) {
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@apps/webapp/app/v3/services/resetIdempotencyKey.server.ts` around lines 36 -
54, The current logic treats a buffer.resetIdempotency() failure as a harmless
miss and can return a 404 when pgCount === 0, hiding a partial outage; change
the flow in resetIdempotencyKey.server (the buffer.resetIdempotency()
catch/assignment that sets bufferResult and the later
totalCount/ServiceValidationError throw) so that if buffer.resetIdempotency()
failed (detectable via the catch branch that logs the error and returns {
clearedRunId: null } or by adding an explicit error flag on bufferResult), you
do not convert that into a Not Found; instead surface an outage error (e.g.
throw a ServiceUnavailableError or rethrow the original error) when pgCount ===
0 and the buffer operation failed, and keep the existing 404 only when both
pgCount === 0 and the buffer reset explicitly succeeded with no clearedRunId.
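The decision table behind this finding is small enough to write out. The sketch below models the buffer result as a tagged union so that "failed" and "succeeded but cleared nothing" cannot be confused; `BufferReset`, `ResetOutcome`, and `resolveResetOutcome` are hypothetical names, and the 503 follows the suggested fix rather than confirmed service behaviour.

```typescript
// Hypothetical result of the buffer reset: either it ran (and maybe cleared a run)
// or it failed outright.
type BufferReset =
  | { ok: true; clearedRunId: string | null }
  | { ok: false };

type ResetOutcome =
  | { status: 200; cleared: number }
  | { status: 404 }
  | { status: 503 };

function resolveResetOutcome(pgCount: number, buffer: BufferReset): ResetOutcome {
  const bufferCleared = buffer.ok && buffer.clearedRunId ? 1 : 0;
  const total = pgCount + bufferCleared;
  // Anything cleared in PG or the buffer is a success, even if the buffer call failed.
  if (total > 0) return { status: 200, cleared: total };
  // Nothing cleared: only claim "not found" when the buffer was actually consulted.
  return buffer.ok ? { status: 404 } : { status: 503 };
}
```

The existing behaviour is preserved when PG cleared something; only the "PG found nothing and the buffer was unreachable" case changes from 404 to a retryable error.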
"flags entries whose dwell exceeds the stale threshold and skips fresh ones",
async ({ redisOptions }) => {
  const buffer = new MollifierBuffer({ redisOptions });
  try {
    // Two stale entries (one in each env) + one fresh entry. Sweep
    // should flag the two stale, leave the fresh one alone, record
    // the counter once per stale entry, and emit a warning per
    // stale entry with the dwell + threshold.
Test name/comment mismatch with actual stale-count scenario.
Line 70 and Line 74-77 say one entry is fresh and skipped, but the setup plus futureNow makes all three entries stale (as asserted on Line 113 and Line 116-119). Please rename/update comments so the test intent matches behavior.
Suggested edit
- "flags entries whose dwell exceeds the stale threshold and skips fresh ones",
+ "flags entries whose dwell exceeds the stale threshold",
@@
- // Two stale entries (one in each env) + one fresh entry. Sweep
- // should flag the two stale, leave the fresh one alone, record
- // the counter once per stale entry, and emit a warning per
- // stale entry with the dwell + threshold.
+ // Three entries are accepted, then `now` is advanced by 5 minutes.
+ // With a 1-minute threshold, all three should be flagged stale and
+ // each should emit one counter tick + one warning.
@@
- // All three entries have dwell ~5min, all exceed the 1-min
- // threshold; each emits one counter tick + one warning.
+ // All three entries exceed the threshold; each emits one counter
+ // tick + one warning.
Also applies to: 114-116
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@apps/webapp/test/mollifierStaleSweep.test.ts` around lines 70 - 77, The test
comment and name for the MollifierBuffer stale-sweep case are incorrect: the
setup (use of futureNow) actually makes all three entries stale, but the
description says one entry is fresh and skipped; update the test name/comment to
match the actual behavior (three stale entries flagged) or change the setup to
keep one truly fresh. Locate the test function and surrounding text referencing
"flags entries whose dwell exceeds the stale threshold and skips fresh ones" and
the local variable futureNow, then either rename the test to indicate "three
stale entries" and adjust comments near the assertions (around the assertions at
lines referenced by assertions on expected stale counts) or modify the timestamp
setup so one entry remains fresh; ensure references to MollifierBuffer and the
sweep assertions (the checks around lines asserting two vs three stale entries)
are consistent.
if (
  err instanceof Prisma.PrismaClientKnownRequestError &&
  err.code === "P2002"
) {
  this.logger.info(
    "createCancelledRun: row already exists, returning existing (idempotent)",
    { friendlyId: snapshot.friendlyId },
  );
  const existing = await prisma.taskRun.findFirst({ where: { id } });
  if (existing) return existing;
}
P2002 idempotency path should verify existing row is actually canceled.
Line 601-603 returns any existing row for the same id. If that row is not CANCELED, createCancelledRun() can report success without enforcing cancellation semantics.
Suggested guard
if (
err instanceof Prisma.PrismaClientKnownRequestError &&
err.code === "P2002"
) {
@@
const existing = await prisma.taskRun.findFirst({ where: { id } });
- if (existing) return existing;
+ if (existing) {
+ if (existing.status === "CANCELED") {
+ return existing;
+ }
+
+ throw new Error(
+ `createCancelledRun conflict: existing run ${snapshot.friendlyId} has status ${existing.status}`
+ );
+ }
  }
📝 Committable suggestion
if (
  err instanceof Prisma.PrismaClientKnownRequestError &&
  err.code === "P2002"
) {
  this.logger.info(
    "createCancelledRun: row already exists, returning existing (idempotent)",
    { friendlyId: snapshot.friendlyId },
  );
  const existing = await prisma.taskRun.findFirst({ where: { id } });
  if (existing) {
    if (existing.status === "CANCELED") {
      return existing;
    }
    throw new Error(
      `createCancelledRun conflict: existing run ${snapshot.friendlyId} has status ${existing.status}`
    );
  }
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal-packages/run-engine/src/engine/index.ts` around lines 593 - 603, The
idempotency path in createCancelledRun currently returns any existing row found
by prisma.taskRun.findFirst without verifying its state; change the logic after
retrieving existing to verify the run's status is actually canceled (e.g.,
existing.status === 'CANCELED' or matches TaskRunStatus.CANCELED) and only
return the existing row if that check passes; otherwise log a mismatch (include
snapshot.friendlyId) and surface an error or conflict instead of silently
treating the operation as successful so cancellation semantics are enforced.
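The guarded idempotency path can be exercised without a database. In the sketch below a `Map` stands in for the `taskRun` table and `insertRun` stands in for an INSERT that can hit a unique constraint (which Prisma reports as `P2002`); both are assumptions for illustration, and the function is a simplified analogue of the engine method rather than its implementation.

```typescript
type RunStatus = "CANCELED" | "PENDING" | "EXECUTING";
type Run = { id: string; status: RunStatus };

// Stand-in for an INSERT that can fail on a unique constraint.
function insertRun(store: Map<string, Run>, run: Run): "inserted" | "unique_violation" {
  if (store.has(run.id)) return "unique_violation";
  store.set(run.id, run);
  return "inserted";
}

function createCancelledRun(store: Map<string, Run>, id: string): Run {
  const run: Run = { id, status: "CANCELED" };
  if (insertRun(store, run) === "inserted") return run;

  const existing = store.get(id);
  // Idempotent replay: the earlier attempt already wrote the cancelled row.
  if (existing && existing.status === "CANCELED") return existing;

  // The row exists in another state, so reporting success would be misleading.
  throw new Error(
    `createCancelledRun conflict: run ${id} has status ${existing?.status ?? "unknown"}`
  );
}
```

A second call for the same id returns the stored row, while a pre-existing `EXECUTING` row raises a conflict instead of being returned as if it had been cancelled.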
function makeIdempotencyLookupKey(input: IdempotencyLookupInput): string {
  return `mollifier:idempotency:${input.envId}:${input.taskIdentifier}:${input.idempotencyKey}`;
}
Encode composite key segments to prevent idempotency key alias collisions.
Line 47 and Line 60 build Redis keys by raw : concatenation. If taskIdentifier and/or idempotencyKey contain :, different tuples can map to the same Redis key and dedupe the wrong run.
💡 Suggested fix
+function encodeKeyPart(value: string): string {
+ return Buffer.from(value, "utf8").toString("base64url");
+}
+
function makeIdempotencyLookupKey(input: IdempotencyLookupInput): string {
- return `mollifier:idempotency:${input.envId}:${input.taskIdentifier}:${input.idempotencyKey}`;
+ return `mollifier:idempotency:${encodeKeyPart(input.envId)}:${encodeKeyPart(input.taskIdentifier)}:${encodeKeyPart(input.idempotencyKey)}`;
}
@@
function makeIdempotencyClaimKey(input: IdempotencyLookupInput): string {
- return `mollifier:claim:${input.envId}:${input.taskIdentifier}:${input.idempotencyKey}`;
+ return `mollifier:claim:${encodeKeyPart(input.envId)}:${encodeKeyPart(input.taskIdentifier)}:${encodeKeyPart(input.idempotencyKey)}`;
 }
Also applies to: 59-61
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/redis-worker/src/mollifier/buffer.ts` around lines 46 - 48, The
Redis key builders (e.g., makeIdempotencyLookupKey and other key-construction
sites in this file that concatenate segments with ':') are vulnerable to
collisions when segments contain ':'; update these functions to encode each
variable segment (taskIdentifier, idempotencyKey, etc.) before concatenation
(use a safe reversible encoding such as encodeURIComponent or base64url) so the
colon separators remain unambiguous, and decode where needed when reading values
back; ensure you apply the same encoding consistently across all key-builder
functions in this file.
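A concrete collision makes the aliasing risk easier to see. The sketch below builds the key both ways for two different tuples; it uses `encodeURIComponent` (one of the encodings named in the prompt above) rather than base64url, and `KeyParts`, `rawKey`, and `encodedKey` are hypothetical names for illustration.

```typescript
type KeyParts = { envId: string; taskIdentifier: string; idempotencyKey: string };

// Raw concatenation, as in the reviewed code.
function rawKey(p: KeyParts): string {
  return `mollifier:idempotency:${p.envId}:${p.taskIdentifier}:${p.idempotencyKey}`;
}

// Each variable segment is percent-encoded, so a ":" inside a segment becomes "%3A"
// and can no longer be mistaken for a separator.
function encodedKey(p: KeyParts): string {
  const parts = [p.envId, p.taskIdentifier, p.idempotencyKey].map((part) =>
    encodeURIComponent(part)
  );
  return `mollifier:idempotency:${parts.join(":")}`;
}

// Two different (taskIdentifier, idempotencyKey) tuples.
const tupleA: KeyParts = { envId: "env1", taskIdentifier: "task:a", idempotencyKey: "b" };
const tupleB: KeyParts = { envId: "env1", taskIdentifier: "task", idempotencyKey: "a:b" };

const rawCollides = rawKey(tupleA) === rawKey(tupleB);
const encodedCollides = encodedKey(tupleA) === encodedKey(tupleB);
```

With raw concatenation both tuples produce `mollifier:idempotency:env1:task:a:b`; with encoding they produce `...env1:task%3Aa:b` and `...env1:task:a%3Ab`. Changing the key format also changes which existing keys are matched, so a rollout would need to account for keys written in the old format.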
export const IDEMPOTENCY_CLAIM_PENDING = "pending";

function makeIdempotencyClaimKey(input: IdempotencyLookupInput): string {
  return `mollifier:claim:${input.envId}:${input.taskIdentifier}:${input.idempotencyKey}`;
}
Claim ownership is not protected; late release can erase another claimant’s state.
Line 57 uses a shared "pending" marker and Line 403 performs unconditional DEL. If ownership changes after TTL expiry, an old caller can delete a new resolved claim, reopening duplicate execution paths.
Also applies to: 359-387, 399-404, 719-737
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/redis-worker/src/mollifier/buffer.ts` around lines 57 - 61, The code
uses a shared constant IDEMPOTENCY_CLAIM_PENDING and performs unconditional DEL
which allows a stale claimant to delete a new owner’s claim; change the claim
model to generate a unique per-claim ownership token (e.g., random UUID) when
creating the claim (use makeIdempotencyClaimKey for the key and store the token
as the value), return/store that token with the claim owner, and replace
unconditional DEL with a conditional release that only deletes if the stored
value matches the owner’s token (implement via a Redis EVAL/Lua script or Redis
GET+WATCH/MULTI to compare-and-delete). Update all claim acquire/release call
sites to set the token on claim creation and pass it to the conditional release
so only the rightful owner can remove its claim.
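The ownership-token pattern described above can be sketched with an in-memory stand-in for Redis so it runs without a server. `ClaimStore` and its methods are hypothetical; in Redis the acquire step would correspond to `SET key token NX` with an expiry, and the release step to a compare-and-delete script (shown in the comment), which is the common pattern for this and not necessarily what the PR adopts.

```typescript
// Compare-and-delete in Redis is typically a small Lua script, for example:
//   if redis.call("GET", KEYS[1]) == ARGV[1] then
//     return redis.call("DEL", KEYS[1])
//   end
//   return 0
// The class below mirrors that logic in memory.
class ClaimStore {
  private claims = new Map<string, string>();

  // Mirrors SET key token NX: succeeds only when no claim exists.
  acquire(key: string, token: string): boolean {
    if (this.claims.has(key)) return false;
    this.claims.set(key, token);
    return true;
  }

  // Simulates the claim key expiring via its TTL.
  expire(key: string): void {
    this.claims.delete(key);
  }

  // Deletes the claim only when the caller's token still matches the stored one.
  release(key: string, token: string): boolean {
    if (this.claims.get(key) !== token) return false;
    this.claims.delete(key);
    return true;
  }

  owner(key: string): string | undefined {
    return this.claims.get(key);
  }
}
```

In the failure scenario from the finding, caller A acquires, the TTL lapses, caller B acquires, and A's late release is refused because the stored token is now B's. Callers would generate a unique token per claim (for example a random UUID) and pass it to both calls.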
Summary
The mollifier sits in front of `engine.trigger` and diverts trigger storms into a Redis buffer when the per-env trigger rate exceeds a configurable threshold. A background drainer materialises buffered entries back into Postgres at a controlled rate. The customer's `trigger()` call still returns a valid `runId` synchronously; the run materialises in PG within a sub-second window in healthy operation.
The gate is a Lua sliding-window rate check (per-env). When tripped, it holds for `TRIGGER_MOLLIFIER_HOLD_MS` and every trigger during the hold goes to the buffer. The drainer is a fair round-robin over orgs → envs with configurable concurrency. Every read and mutate API that touches a run (retrieve, cancel, replay, reschedule, addTags, updateMetadata, realtime stream, etc.) gains a buffered-source branch so the buffered window is invisible to the customer: same response shape, same Zod schema, same dashboard rendering.
Default: `TRIGGER_MOLLIFIER_ENABLED=0`. Off completely unless explicitly opted in.
Design
The gate (`apps/webapp/app/v3/mollifier/mollifierGate.server.ts`): per-env sliding window, Lua-atomic INCR + tripped-flag SETEX. Configured by `TRIGGER_MOLLIFIER_TRIP_WINDOW_MS`, `TRIGGER_MOLLIFIER_TRIP_THRESHOLD`, `TRIGGER_MOLLIFIER_HOLD_MS`. Globally short-circuited by `TRIGGER_MOLLIFIER_ENABLED` before the `evaluateGate` call so a non-mollifier deployment pays nothing per trigger.
The buffer (`packages/redis-worker/src/mollifier/buffer.ts`): Redis ZSET per env (`mollifier:queue:<envId>`) keyed by `createdAtMicros`, plus a per-run hash (`mollifier:entries:<runId>`) carrying the snapshot. Idempotency-lookup keys provide trigger-time dedup symmetric with PG's unique constraint. Entries persist until the drainer ACKs (with a 30s post-materialise grace) or FAILs them; there is no accept-time TTL, since silent eviction would lose runs without a customer-visible signal.
The drainer (`apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts` + `@trigger.dev/redis-worker`'s `MollifierDrainer`): polls the buffer, replays each entry through `engine.trigger`. Handles the cancel-before-PG bifurcation via `engine.createCancelledRun` (no `engine.trigger` needed when a buffered cancel landed first). On non-retryable engine errors, writes a `SYSTEM_FAILURE` PG row via `engine.createFailedTaskRun` so customers see the failure in their dashboard, and `createFailedTaskRun` now emits `runFailed` so the alert pipeline picks the row up.
Read-side fallbacks: every public API route that retrieves a run gains a buffered-source branch. The realtime stream specifically holds the Electric SQL subscription open across the buffered window so `useRealtimeRun` doesn't see a 404 and bail; when the drainer materialises the PG row, Electric streams the INSERT to the client.
The run span lands in the event store at trigger time, before the gate divert decision. Buffered runs are visible in the trace view immediately rather than only after drain, which is important for `useRealtimeRun` and for trigger-and-wait flows where a parent's trace expects the child's span to exist.
Observability: `mollifier.decisions{outcome}` counter for gate decisions. `mollifier.stale_entries.current{envId}` gauge for entries sitting longer than `TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS` (default 5min), the alertable signal for an offline / falling-behind drainer. `mollifier.realtime_subscriptions.buffered{envId}` counter for visibility into customers hitting the buffered window with the realtime hook. All emit via the standard OTel meter pipeline.
Customer-visible surface: the API response from a mollified trigger carries a `mollifier.queued` notice. Otherwise the buffered window is invisible: same response shapes, same dashboard, same SDK behaviour. The list APIs (and dashboard runs list) are eventually consistent and do not surface buffered rows; that visibility is parked for a future global status-bar UX piece.
Test plan
- `apps/webapp/test/mollifier*.test.ts`, `internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts`, `packages/redis-worker/src/mollifier/buffer.test.ts`).
- `apiClient` methods so zodfetch schemas validate against the buffered responses end-to-end. No drift surfaced.
- `TRIGGER_MOLLIFIER_ENABLED=1`, trigger a burst that exceeds the configured threshold, confirm the dashboard list + run-detail flow matches the non-buffered experience.
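The trip-and-hold behaviour of the gate described under Design can be sketched in memory. This is a single-process, fixed-window approximation for illustration only: the PR's gate runs as a Lua script in Redis, and `TripGate`, `GateConfig`, and `shouldDivert` are hypothetical names whose three config fields loosely correspond to the `TRIGGER_MOLLIFIER_TRIP_WINDOW_MS`, `TRIGGER_MOLLIFIER_TRIP_THRESHOLD`, and `TRIGGER_MOLLIFIER_HOLD_MS` settings.

```typescript
type GateConfig = { windowMs: number; threshold: number; holdMs: number };
type EnvState = { windowStart: number; count: number; trippedUntil: number };

class TripGate {
  private readonly config: GateConfig;
  private readonly envs = new Map<string, EnvState>();

  constructor(config: GateConfig) {
    this.config = config;
  }

  // Returns true when a trigger for this env should be diverted to the buffer.
  shouldDivert(envId: string, now: number): boolean {
    const state = this.envs.get(envId) ?? { windowStart: now, count: 0, trippedUntil: 0 };

    // Start a fresh counting window once the current one has elapsed.
    if (now - state.windowStart >= this.config.windowMs) {
      state.windowStart = now;
      state.count = 0;
    }

    state.count += 1;

    // Exceeding the threshold trips the gate and (re)arms the hold period.
    if (state.count > this.config.threshold) {
      state.trippedUntil = now + this.config.holdMs;
    }

    this.envs.set(envId, state);
    // While the hold is active, every trigger diverts, even below the threshold.
    return now < state.trippedUntil;
  }
}
```

With `windowMs: 1000`, `threshold: 2`, and `holdMs: 2000`, the third trigger inside one window trips the gate, a trigger one second later still diverts because the hold has not lapsed, and a different env is unaffected throughout.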