16 commits
cbd9f84
feat(webapp,run-engine): mollifier drainer replay + stale sweep + can…
d-cs May 26, 2026
36396ef
fix(webapp,run-engine): replay-layer code-review follow-ups
d-cs May 26, 2026
20c3767
test(webapp): isolate mollifierDrainerWorker test from runEngine sing…
d-cs May 26, 2026
a839810
test(webapp): bump mollifierStaleSweep redisTest timeouts to 20s
d-cs May 26, 2026
9496ac8
fix(webapp,run-engine): replay-layer Devin follow-ups
d-cs May 26, 2026
129980e
docs(server-changes): scope mollifier drainer-replay note to this PR
d-cs May 27, 2026
773f30a
fix(webapp): honour cancel-wins conflict + requeue on transient PG ou…
d-cs May 27, 2026
8a25fe7
docs(mollifier): strip internal planning labels from comments
d-cs May 27, 2026
6fbaf13
docs(mollifier): fix stale ZSET comments and strip residual plan-doc …
d-cs May 28, 2026
2ffa4d2
perf(webapp): shard the mollifier stale sweep to bound per-tick work
d-cs May 28, 2026
62ae1c2
test(webapp): fill mollifier stale-sweep coverage gaps
d-cs May 28, 2026
5dc1692
fix(webapp): GC drained-env orphans in stale sweep + propagate batch …
d-cs May 28, 2026
c1a5ba5
fix(webapp): stale-sweep stop() awaits in-flight tick before closing …
d-cs May 28, 2026
e9e0a8e
fix(webapp,run-engine): three follow-ups on Devin review (PR #3754)
d-cs May 28, 2026
ccabb6f
fix(redis-worker,webapp): drainer writes SYSTEM_FAILURE on max-attemp…
d-cs May 29, 2026
c8244cb
fix(webapp): suppress runFailed emit for mollifier-buffered terminal …
d-cs May 29, 2026
5 changes: 5 additions & 0 deletions .changeset/mollifier-drainer-terminal-failure-callback.md
@@ -0,0 +1,5 @@
---
"@trigger.dev/redis-worker": minor
---

Add `onTerminalFailure` callback to `MollifierDrainerOptions` so the customer's run lands a SYSTEM_FAILURE PG row even when the drainer exhausts `maxAttempts` on a retryable PG error. Previously, retryable-error exhaustion called `buffer.fail()` directly, which atomically marks FAILED + DELs the entry hash with no PG write — silent data loss when PG was unreachable across the full retry budget. The callback fires before `buffer.fail()` on any terminal path (`cause: "non-retryable"` or `"max-attempts-exhausted"`); throwing a retryable error from the callback causes the drainer to requeue rather than fail.
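
The ordering described in this changeset note can be sketched as below. This is a minimal model under stated assumptions, not the `@trigger.dev/redis-worker` implementation: `SketchBuffer`, `SketchOptions`, `processEntry`, the 1-based `attempts` counter, and the callback signature are all hypothetical names chosen for illustration. Only the two `cause` strings, the "callback fires before `buffer.fail()`" ordering, and the "retryable throw from the callback requeues" rule come from the note itself. What happens when the callback throws a non-retryable error is not stated in the note; the sketch assumes the entry is still failed.

```typescript
// Hypothetical, simplified model of the drainer's terminal path.
type TerminalCause = "non-retryable" | "max-attempts-exhausted";
type Outcome = "acked" | "requeued" | "failed";

interface SketchBuffer {
  ack(runId: string): Promise<void>;
  requeue(runId: string): Promise<void>;
  fail(runId: string): Promise<void>;
}

interface SketchOptions {
  buffer: SketchBuffer;
  maxAttempts: number;
  handler: (runId: string) => Promise<void>;
  isRetryable: (error: unknown) => boolean;
  onTerminalFailure?: (
    runId: string,
    cause: TerminalCause,
    error: unknown
  ) => Promise<void>;
}

// `attempts` is assumed to be the 1-based number of the attempt being made.
async function processEntry(
  opts: SketchOptions,
  runId: string,
  attempts: number
): Promise<Outcome> {
  try {
    await opts.handler(runId);
    await opts.buffer.ack(runId);
    return "acked";
  } catch (error) {
    const retryable = opts.isRetryable(error);
    if (retryable && attempts < opts.maxAttempts) {
      // Retry budget remains: put the entry back for another attempt.
      await opts.buffer.requeue(runId);
      return "requeued";
    }

    const cause: TerminalCause = retryable
      ? "max-attempts-exhausted"
      : "non-retryable";

    if (opts.onTerminalFailure) {
      try {
        // Runs BEFORE buffer.fail() so a durable failure record can be written.
        await opts.onTerminalFailure(runId, cause, error);
      } catch (callbackError) {
        if (opts.isRetryable(callbackError)) {
          // The record could not be written yet: requeue rather than drop it.
          await opts.buffer.requeue(runId);
          return "requeued";
        }
        // Assumption: a non-retryable callback error still fails the entry.
      }
    }

    await opts.buffer.fail(runId);
    return "failed";
  }
}
```

The point of the ordering is that `buffer.fail()` discards the entry, so anything that must outlive it has to be written first.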
6 changes: 6 additions & 0 deletions .server-changes/mollifier-drainer-replay.md
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Mollifier drainer replay: replay buffered entries into `engine.trigger`, stale-entry sweep, a drainer-health gauge, and run-engine cancelled/failed run APIs.
2 changes: 2 additions & 0 deletions apps/webapp/app/entry.server.tsx
@@ -9,6 +9,7 @@ import { renderToPipeableStream } from "react-dom/server";
import { PassThrough } from "stream";
import * as Worker from "~/services/worker.server";
import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server";
+import { initMollifierStaleSweepWorker } from "~/v3/mollifierStaleSweepWorker.server";
import { bootstrap } from "./bootstrap";
import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
import {
@@ -228,6 +229,7 @@ Worker.init().catch((error) => {
});

initMollifierDrainerWorker();
+initMollifierStaleSweepWorker();

bootstrap().catch((error) => {
logError(error);
37 changes: 30 additions & 7 deletions apps/webapp/app/env.server.ts
@@ -1063,13 +1063,16 @@ const EnvironmentSchema = z
// Separate switch for the drainer (consumer side) so it can be split
// off onto a dedicated worker service. Unset → inherits
// TRIGGER_MOLLIFIER_ENABLED, so single-container self-hosters don't have to
-// flip two switches. In multi-replica deployments, set this to "0"
-// explicitly on every replica except the one dedicated drainer
-// service — otherwise every replica's polling loop races for the
-// same buffer entries. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill
-// switch; setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
-// no-op because the gate-side singleton refuses to construct a
-// buffer when the system is off.
+// flip two switches. Multi-replica drainers are correct — `popAndMarkDraining`
+// is an atomic ZPOPMIN + status flip in one Lua call, so only one replica
+// can win any given entry — but inefficient: polling load (SMEMBERS +
+// per-env scans) multiplies by N, and `TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY`
+// is per-process so engine load also multiplies. Splitting the drainer
+// onto a dedicated worker keeps that traffic off the request-serving
+// replicas. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill switch;
+// setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
+// no-op because the gate-side singleton refuses to construct a buffer
+// when the system is off.
TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"),
TRIGGER_MOLLIFIER_REDIS_HOST: z
@@ -1098,6 +1101,26 @@ const EnvironmentSchema = z
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),
+// Periodic sweep that scans buffer queue LISTs for entries whose
+// dwell exceeds the stale threshold. Independent of the drainer —
+// its job is exactly to make a stuck/offline drainer visible to
+// ops. Defaults: enabled when the mollifier is enabled, run every
+// 5 minutes, alert on anything that's been dwelling for 5+ minutes
+// (matches the sweep interval — "anything still here when we
+// check" is the simplest threshold that converges).
+TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED: z
+.string()
+.default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
+TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS: z.coerce
+.number()
+.int()
+.positive()
+.default(5 * 60_000),
+TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS: z.coerce
+.number()
+.int()
+.positive()
+.default(5 * 60_000),

BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce
.number()
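
The defaulting rules in the `env.server.ts` comments can be modelled without zod as a small pure function. This is a sketch under assumptions: `resolveMollifierFlags`, `MollifierFlags`, `positiveInt`, and `isStale` are hypothetical names, not part of the PR. The diff confirms that the drainer and stale-sweep switches default to the value of `TRIGGER_MOLLIFIER_ENABLED`, and that enabling the drainer while the master switch is off is a no-op. Whether the master switch also gates the stale sweep is not shown, so the sketch reports the sweep flag as resolved and leaves gating to the caller. The `>=` boundary in `isStale` follows the "5+ minutes" wording and is likewise an assumption.

```typescript
// Hypothetical model of the flag defaults; the real schema uses zod.
type RawEnv = Record<string, string | undefined>;

interface MollifierFlags {
  enabled: boolean;
  drainerEnabled: boolean;
  staleSweepEnabled: boolean;
  staleSweepIntervalMs: number;
  staleSweepThresholdMs: number;
}

const FIVE_MINUTES_MS = 5 * 60_000;

// Mirrors the intent of `z.coerce.number().int().positive().default(...)`.
function positiveInt(value: string | undefined, fallback: number): number {
  if (value === undefined) return fallback;
  const parsed = Number(value);
  if (!Number.isInteger(parsed) || parsed <= 0) {
    throw new Error(`expected a positive integer, got "${value}"`);
  }
  return parsed;
}

function resolveMollifierFlags(raw: RawEnv): MollifierFlags {
  const master = raw["TRIGGER_MOLLIFIER_ENABLED"] ?? "0";
  // Unset switches inherit the master value, so a single-container
  // deployment only has to flip one variable.
  const drainer = raw["TRIGGER_MOLLIFIER_DRAINER_ENABLED"] ?? master;
  const sweep = raw["TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED"] ?? master;
  const enabled = master === "1";

  return {
    enabled,
    // The master switch wins: drainer "1" with master "0" is a no-op.
    drainerEnabled: enabled && drainer === "1",
    // Assumption: reported as resolved; master gating is not shown in the diff.
    staleSweepEnabled: sweep === "1",
    staleSweepIntervalMs: positiveInt(
      raw["TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS"],
      FIVE_MINUTES_MS
    ),
    staleSweepThresholdMs: positiveInt(
      raw["TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS"],
      FIVE_MINUTES_MS
    ),
  };
}

// An entry is stale once it has dwelt in the buffer for the threshold or longer.
function isStale(enqueuedAtMs: number, nowMs: number, thresholdMs: number): boolean {
  return nowMs - enqueuedAtMs >= thresholdMs;
}
```

In a split deployment, the request-serving replicas would set `TRIGGER_MOLLIFIER_DRAINER_ENABLED` to `"0"` and only the dedicated worker would leave it unset or set it to `"1"`.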
53 changes: 52 additions & 1 deletion apps/webapp/app/runEngine/services/triggerFailedTask.server.ts
@@ -6,6 +6,7 @@ import type { PrismaClientOrTransaction } from "@trigger.dev/database";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { getEventRepository } from "~/v3/eventRepository/index.server";
+import { PerformTaskRunAlertsService } from "~/v3/services/alerts/performTaskRunAlerts.server";
import { DefaultQueueManager } from "../concerns/queues.server";
import type { TriggerTaskRequest } from "../types";

@@ -176,6 +177,14 @@ export class TriggerFailedTaskService {
event.setAttribute("runId", failedRunFriendlyId);
event.failWithError(taskRunError);

+// `emitRunFailedEvent: false` because this call site owns the
+// trace-event lifecycle via the outer `traceEvent({
+// incomplete: false, isError: true })`. Letting the engine
+// emit `runFailed` here would race the
+// `completeFailedRunEvent` listener against the outer trace
+// event's own completion write for the same (traceId, spanId).
+// We re-trigger the alerts side directly after the trace
+// event closes, below.
return await this.engine.createFailedTaskRun({
friendlyId: failedRunFriendlyId,
environment: {
@@ -200,12 +209,30 @@ export class TriggerFailedTaskService {
spanId: event.spanId,
traceContext: traceContext as Record<string, unknown>,
taskEventStore: store,
+emitRunFailedEvent: false,
...(queueName !== undefined && { queue: queueName }),
...(lockedQueueId !== undefined && { lockedQueueId }),
});
}
);

+// Alerts side of `runFailed` — the engine emit was suppressed
+// above so the trace-event completion isn't double-written; we
+// still need the alert pipeline to fire so customers' ERROR
+// channels see the failure. Best-effort: a failed enqueue logs
+// but doesn't block returning the friendlyId, mirroring the
+// engine handler's behaviour at runEngineHandlers.server.ts:81.
+try {
+await PerformTaskRunAlertsService.enqueue(failedRun.id);
+} catch (alertsError) {
+logger.warn("TriggerFailedTaskService: alert enqueue failed", {
+taskId: request.taskId,
+friendlyId: failedRun.friendlyId,
+error:
+alertsError instanceof Error ? alertsError.message : String(alertsError),
+});
+}

return failedRun.friendlyId;
} catch (createError) {
const createErrorMsg =
@@ -264,7 +291,7 @@ export class TriggerFailedTaskService {
}
}

-await this.engine.createFailedTaskRun({
+const failedRun = await this.engine.createFailedTaskRun({
friendlyId: failedRunFriendlyId,
environment: {
id: opts.environmentId,
@@ -286,8 +313,32 @@ export class TriggerFailedTaskService {
depth,
resumeParentOnCompletion: opts.resumeParentOnCompletion,
batch: opts.batch,
+// Suppress the engine's `runFailed` bus emit — the listener
+// (`runEngineHandlers.server.ts` `runFailed`) calls
+// `completeFailedRunEvent`, which writes a ClickHouse trace event
+// row keyed on (traceId, spanId). This caller has no trace
+// context (the method name is literally `callWithoutTraceEvents`)
+// so the emit would write a row with empty traceId/spanId —
+// orphan event in the store. We still want alert coverage,
+// though, so enqueue directly below.
+emitRunFailedEvent: false,
});

+// Alerts side of `runFailed` — the engine emit was suppressed
+// above so we don't create an orphan trace event; enqueue the
+// alert directly so customers' ERROR channels still see the
+// failure. Best-effort, mirroring the `call()` path.
+try {
+await PerformTaskRunAlertsService.enqueue(failedRun.id);
+} catch (alertsError) {
+logger.warn("TriggerFailedTaskService.callWithoutTraceEvents: alert enqueue failed", {
+taskId: opts.taskId,
+friendlyId: failedRun.friendlyId,
+error:
+alertsError instanceof Error ? alertsError.message : String(alertsError),
+});
+}

return failedRunFriendlyId;
} catch (createError) {
logger.error("TriggerFailedTaskService: failed to create pre-failed TaskRun (no trace)", {
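
Both hunks in `triggerFailedTask.server.ts` follow the same shape: create the failed run with the engine's `runFailed` emit suppressed, then enqueue alerts on a best-effort basis. A minimal sketch of that shape follows. `SketchDeps` and `createFailedRunWithAlerts` are hypothetical stand-ins for the engine, `PerformTaskRunAlertsService`, and the logger. Only the `emitRunFailedEvent: false` option and the log-and-continue behaviour are taken from the diff.

```typescript
// Hypothetical stand-ins for the engine, alert service, and logger.
interface FailedRun {
  id: string;
  friendlyId: string;
}

interface SketchDeps {
  createFailedTaskRun(args: {
    friendlyId: string;
    emitRunFailedEvent: boolean;
  }): Promise<FailedRun>;
  enqueueAlerts(runId: string): Promise<void>;
  warn(message: string, fields: Record<string, unknown>): void;
}

async function createFailedRunWithAlerts(
  deps: SketchDeps,
  friendlyId: string
): Promise<string> {
  // Suppress the engine-side emit so the trace event is not written twice
  // (or written with an empty trace context).
  const failedRun = await deps.createFailedTaskRun({
    friendlyId,
    emitRunFailedEvent: false,
  });

  // Best-effort: a failed enqueue is logged but must not prevent the caller
  // from receiving the friendlyId of the run that was already created.
  try {
    await deps.enqueueAlerts(failedRun.id);
  } catch (alertsError) {
    deps.warn("alert enqueue failed", {
      friendlyId: failedRun.friendlyId,
      error:
        alertsError instanceof Error ? alertsError.message : String(alertsError),
    });
  }

  return failedRun.friendlyId;
}
```

The enqueue sits outside the critical path deliberately: by that point the failed run has already been created, so surfacing an alert-queue error to the caller would misreport a run that exists.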
50 changes: 15 additions & 35 deletions apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts
@@ -1,10 +1,16 @@
-import { createHash } from "node:crypto";
-import { MollifierDrainer, serialiseSnapshot } from "@trigger.dev/redis-worker";
+import { MollifierDrainer } from "@trigger.dev/redis-worker";
+import { prisma } from "~/db.server";
import { env } from "~/env.server";
+import { engine as runEngine } from "~/v3/runEngine.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { getMollifierBuffer } from "./mollifierBuffer.server";
-import type { BufferedTriggerPayload } from "./bufferedTriggerPayload.server";
+import {
+createDrainerHandler,
+createDrainerTerminalFailureHandler,
+isRetryablePgError,
+} from "./mollifierDrainerHandler.server";
+import type { MollifierSnapshot } from "./mollifierSnapshot.server";

// Distinct error class for the deterministic "fail loud at boot" throws
// below. The bootstrap in `mollifierDrainerWorker.server.ts` catches
@@ -25,7 +31,7 @@ export class MollifierConfigurationError extends Error {
}
}

-function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload> {
+function initializeMollifierDrainer(): MollifierDrainer<MollifierSnapshot> {
const buffer = getMollifierBuffer();
if (!buffer) {
// Unreachable in normal config: getMollifierDrainer() gates on the
@@ -68,40 +74,14 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
});

-// No-op ack handler: the trigger has ALREADY been written to Postgres
-// via engine.trigger (dual-write at the call site). Popping + acking
-// here proves the dequeue mechanism works end-to-end without duplicating
-// the work. A later change replaces this with an engine.trigger replay
-// that performs the actual Postgres write.
-const drainer = new MollifierDrainer<BufferedTriggerPayload>({
+const drainer = new MollifierDrainer<MollifierSnapshot>({
buffer,
-handler: async (input) => {
-// Hash the (re-serialised, canonical) payload on the drain side rather
-// than on the trigger hot path. Burst-time CPU stays with engine.trigger;
-// the drainer is the natural place for the audit-equivalence checksum.
-// Re-serialisation is identity for the BufferedTriggerPayload shape
-// (only strings/numbers/plain objects), so this hash matches what the
-// call site wrote into Redis.
-const reserialised = serialiseSnapshot(input.payload);
-const payloadHash = createHash("sha256").update(reserialised).digest("hex");
-logger.info("mollifier.drained", {
-runId: input.runId,
-envId: input.envId,
-orgId: input.orgId,
-taskId: input.payload.taskId,
-attempts: input.attempts,
-ageMs: Date.now() - input.createdAt.getTime(),
-payloadBytes: reserialised.length,
-payloadHash,
-});
-},
+handler: createDrainerHandler({ engine: runEngine, prisma }),
+onTerminalFailure: createDrainerTerminalFailureHandler({ engine: runEngine, prisma }),
concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY,
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
maxOrgsPerTick: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK,
-// A no-op handler shouldn't throw, but if something does (e.g. an
-// unexpected deserialise failure), don't loop — let it FAIL terminally
-// so the entry is observable in metrics.
-isRetryable: () => false,
+isRetryable: isRetryablePgError,
});

return drainer;
@@ -114,7 +94,7 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
// handler registration, leaving a narrow window where a SIGTERM landing
// between `start()` and `process.once("SIGTERM", ...)` would skip the
// graceful stop. The split is intentional.
-export function getMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload> | null {
+export function getMollifierDrainer(): MollifierDrainer<MollifierSnapshot> | null {
if (env.TRIGGER_MOLLIFIER_ENABLED !== "1") return null;
return singleton("mollifierDrainer", initializeMollifierDrainer);
}
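
`getMollifierDrainer` checks the feature flag first and only then memoizes construction, so a disabled deployment never builds a drainer. The sketch below illustrates that gate-then-memoize shape. `singletonSketch` is a hypothetical stand-in for the `singleton` helper imported from `~/utils/singleton`, whose implementation is not part of this diff, and `DrainerSketch` replaces the real `MollifierDrainer<MollifierSnapshot>`.

```typescript
// Hypothetical registry-backed memoizer; the real `singleton` helper may differ.
const registry = new Map<string, unknown>();

function singletonSketch<T>(name: string, factory: () => T): T {
  if (!registry.has(name)) {
    registry.set(name, factory());
  }
  return registry.get(name) as T;
}

interface DrainerSketch {
  id: number;
}

let constructed = 0;

// Exposed so callers can observe how many drainers were actually built.
function constructedCount(): number {
  return constructed;
}

function initializeDrainerSketch(): DrainerSketch {
  constructed += 1;
  return { id: constructed };
}

function getDrainerSketch(enabledFlag: string): DrainerSketch | null {
  // Gate first: when the system is off, the factory must never run.
  if (enabledFlag !== "1") return null;
  return singletonSketch("mollifierDrainer", initializeDrainerSketch);
}
```

Returning `null` instead of throwing lets the bootstrap treat "disabled" as an ordinary state, while the factory remains free to throw on genuinely invalid configuration.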