-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat(webapp): mollifier API GET read-fallback — synthetic primitives + route wiring #3755
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: mollifier-phase-3-replay
Are you sure you want to change the base?
Changes from all commits
6e3d886
aa77c28
ed17a8d
9897ba5
2dd15bb
a3ed3a0
a31732b
af90408
94e5b65
6ae0a34
d38d8dc
0afa415
bd4d4a1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| --- | ||
| area: webapp | ||
| type: feature | ||
| --- | ||
|
|
||
| Mollifier API read-fallback: serve buffered runs from synthetic run/trace/span data on the retrieve, trace, spans, and attempts endpoints. |
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -9,12 +9,17 @@ import { | |||||||||
| logger, | ||||||||||
| } from "@trigger.dev/core/v3"; | ||||||||||
| import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization"; | ||||||||||
| import { BatchId } from "@trigger.dev/core/v3/isomorphic"; | ||||||||||
| import { getUserProvidedIdempotencyKey } from "@trigger.dev/core/v3/serverOnly"; | ||||||||||
| import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database"; | ||||||||||
| import assertNever from "assert-never"; | ||||||||||
| import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions"; | ||||||||||
| import { $replica, prisma } from "~/db.server"; | ||||||||||
| import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; | ||||||||||
| import { | ||||||||||
| findRunByIdWithMollifierFallback, | ||||||||||
| type SyntheticRun, | ||||||||||
| } from "~/v3/mollifier/readFallback.server"; | ||||||||||
| import { generatePresignedUrl } from "~/v3/objectStore.server"; | ||||||||||
| import { tracer } from "~/v3/tracer.server"; | ||||||||||
| import { startSpanWithEnv } from "~/v3/tracing.server"; | ||||||||||
|
|
@@ -64,13 +69,46 @@ type CommonRelatedRun = Prisma.Result< | |||||||||
| "findFirstOrThrow" | ||||||||||
| >; | ||||||||||
|
|
||||||||||
| type FoundRun = NonNullable<Awaited<ReturnType<typeof ApiRetrieveRunPresenter.findRun>>>; | ||||||||||
| // Full shape returned by findRun() — the commonRunSelect fields plus the | ||||||||||
| // extras the route handler reads. Declared explicitly (not inferred via | ||||||||||
| // ReturnType<typeof findRun>) so findRun can return a synthesised buffered | ||||||||||
| // run without the type becoming self-referential. | ||||||||||
| // Exported so the buffer-synthesis helper below can be unit-tested | ||||||||||
| // against a stable shape without re-deriving it (FoundRun's exact field | ||||||||||
| // list is what the buffered run must match for `call()` not to surprise). | ||||||||||
| export type FoundRun = CommonRelatedRun & { | ||||||||||
| traceId: string; | ||||||||||
| payload: string; | ||||||||||
| payloadType: string; | ||||||||||
| output: string | null; | ||||||||||
| outputType: string; | ||||||||||
| error: Prisma.JsonValue; | ||||||||||
| attempts: { id: string }[]; | ||||||||||
| attemptNumber: number | null; | ||||||||||
| engine: "V1" | "V2"; | ||||||||||
| taskEventStore: string; | ||||||||||
| parentTaskRun: CommonRelatedRun | null; | ||||||||||
| rootTaskRun: CommonRelatedRun | null; | ||||||||||
| childRuns: CommonRelatedRun[]; | ||||||||||
| // True when this run was synthesised from the mollifier buffer rather | ||||||||||
| // than read from Postgres. Callers that would otherwise query backing | ||||||||||
| // stores keyed on PG identifiers (e.g. ClickHouse event lookups by | ||||||||||
| // traceId) can short-circuit to an empty response — buffered runs | ||||||||||
| // haven't executed and have no events to fetch. Devin's analysis on | ||||||||||
| // PR #3755 (events endpoint) flagged the pre-fix code as making a | ||||||||||
| // wasted ClickHouse round-trip when this is set; gate on this flag | ||||||||||
| // instead. | ||||||||||
| isBuffered: boolean; | ||||||||||
| }; | ||||||||||
|
|
||||||||||
| export class ApiRetrieveRunPresenter { | ||||||||||
| constructor(private readonly apiVersion: API_VERSIONS) {} | ||||||||||
|
|
||||||||||
| public static async findRun(friendlyId: string, env: AuthenticatedEnvironment) { | ||||||||||
| return $replica.taskRun.findFirst({ | ||||||||||
| public static async findRun( | ||||||||||
| friendlyId: string, | ||||||||||
| env: AuthenticatedEnvironment, | ||||||||||
| ): Promise<FoundRun | null> { | ||||||||||
| const pgRow = await $replica.taskRun.findFirst({ | ||||||||||
| where: { | ||||||||||
| friendlyId, | ||||||||||
| runtimeEnvironmentId: env.id, | ||||||||||
|
|
@@ -102,6 +140,23 @@ export class ApiRetrieveRunPresenter { | |||||||||
| }, | ||||||||||
| }, | ||||||||||
| }); | ||||||||||
|
|
||||||||||
| if (pgRow) return { ...pgRow, isBuffered: false }; | ||||||||||
|
|
||||||||||
| // Postgres miss → fall back to the mollifier buffer. When the gate | ||||||||||
| // diverted a trigger, the run lives in Redis until the drainer replays | ||||||||||
| // it through engine.trigger. Synthesise the FoundRun shape so call() | ||||||||||
| // returns a `QUEUED` (or `FAILED`) response with empty output, no | ||||||||||
| // attempts, no relations. | ||||||||||
| const buffered = await findRunByIdWithMollifierFallback({ | ||||||||||
| runId: friendlyId, | ||||||||||
| environmentId: env.id, | ||||||||||
| organizationId: env.organizationId, | ||||||||||
| }); | ||||||||||
|
|
||||||||||
| if (!buffered) return null; | ||||||||||
|
|
||||||||||
| return synthesiseFoundRunFromBuffer(buffered); | ||||||||||
|
d-cs marked this conversation as resolved.
|
||||||||||
| } | ||||||||||
|
|
||||||||||
| public async call(taskRun: FoundRun, env: AuthenticatedEnvironment) { | ||||||||||
|
|
@@ -475,3 +530,153 @@ function resolveTriggerFunction(run: CommonRelatedRun): TriggerFunction { | |||||||||
| return run.resumeParentOnCompletion ? "triggerAndWait" : "trigger"; | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Build a FoundRun-shaped object from a buffered (mollified) run. The run | ||||||||||
| // is in the Redis buffer; engine.trigger hasn't created the Postgres row | ||||||||||
| // yet, so every field that comes from execution state (output, attempts, | ||||||||||
| // completedAt, cost, relations) takes a default. The presenter's call() | ||||||||||
| // handles QUEUED-state runs without surprise. | ||||||||||
| function bufferedStatusToTaskRunStatus(status: SyntheticRun["status"]): TaskRunStatus { | ||||||||||
| switch (status) { | ||||||||||
| case "FAILED": | ||||||||||
| return "SYSTEM_FAILURE"; | ||||||||||
| case "CANCELED": | ||||||||||
| return "CANCELED"; | ||||||||||
| default: | ||||||||||
| return "PENDING"; | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // The PG path stores `TaskRun.payload` as `String?`, so in production | ||||||||||
| // the buffered snapshot's `payload` is always a string. We defensively | ||||||||||
| // coerce other types instead of silently dropping them: an object gets | ||||||||||
| // JSON-stringified (matches how the trigger path would serialise it), | ||||||||||
| // anything truly unrenderable falls back to an empty string. The log | ||||||||||
| // line surfaces format drift to ops without crashing the read path. | ||||||||||
| function synthesisePayload(buffered: SyntheticRun): string { | ||||||||||
| const payload = buffered.payload; | ||||||||||
| if (typeof payload === "string") return payload; | ||||||||||
| if (payload === undefined || payload === null) return ""; | ||||||||||
| try { | ||||||||||
| const serialised = JSON.stringify(payload); | ||||||||||
| logger.warn("ApiRetrieveRunPresenter: buffered snapshot.payload non-string coerced", { | ||||||||||
| runFriendlyId: buffered.friendlyId, | ||||||||||
| payloadType: typeof payload, | ||||||||||
| }); | ||||||||||
| return typeof serialised === "string" ? serialised : ""; | ||||||||||
| } catch { | ||||||||||
| logger.error("ApiRetrieveRunPresenter: buffered snapshot.payload unserialisable", { | ||||||||||
| runFriendlyId: buffered.friendlyId, | ||||||||||
| payloadType: typeof payload, | ||||||||||
| }); | ||||||||||
| return ""; | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Mirror synthesisePayload for metadata. The PG path stores | ||||||||||
| // `TaskRun.metadata` as `String?`, and the snapshot writes it from | ||||||||||
| // `metadataPacket.data` (also a string), so in production it is always a | ||||||||||
| // string or absent. We coerce defensively — an object gets JSON-stringified | ||||||||||
| // (matching how the trigger path serialises it) rather than silently | ||||||||||
| // dropped to null, and the log line surfaces format drift to ops. | ||||||||||
| function synthesiseMetadata(buffered: SyntheticRun): string | null { | ||||||||||
| const metadata = buffered.metadata; | ||||||||||
| if (typeof metadata === "string") return metadata; | ||||||||||
| if (metadata === undefined || metadata === null) return null; | ||||||||||
| try { | ||||||||||
| const serialised = JSON.stringify(metadata); | ||||||||||
| logger.warn("ApiRetrieveRunPresenter: buffered snapshot.metadata non-string coerced", { | ||||||||||
| runFriendlyId: buffered.friendlyId, | ||||||||||
| metadataType: typeof metadata, | ||||||||||
| }); | ||||||||||
| return typeof serialised === "string" ? serialised : null; | ||||||||||
| } catch { | ||||||||||
| logger.error("ApiRetrieveRunPresenter: buffered snapshot.metadata unserialisable", { | ||||||||||
| runFriendlyId: buffered.friendlyId, | ||||||||||
| metadataType: typeof metadata, | ||||||||||
| }); | ||||||||||
| return null; | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Exported for unit testing. Used by `findRun()` above when the | ||||||||||
| // Postgres lookup misses and the buffer carries the run — keep the shape | ||||||||||
| // in lockstep with `FoundRun`'s field list so `call()` treats a synthesised | ||||||||||
| // buffered run identically to a freshly-triggered PG row. | ||||||||||
| export function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun { | ||||||||||
| const status: TaskRunStatus = bufferedStatusToTaskRunStatus(buffered.status); | ||||||||||
|
|
||||||||||
| const errorJson: Prisma.JsonValue = buffered.error | ||||||||||
| ? { | ||||||||||
| type: "STRING_ERROR", | ||||||||||
| raw: `${buffered.error.code}: ${buffered.error.message}`, | ||||||||||
| } | ||||||||||
| : null; | ||||||||||
|
|
||||||||||
| const metadata: string | null = synthesiseMetadata(buffered); | ||||||||||
|
|
||||||||||
| return { | ||||||||||
| // `id` is the internal cuid (Prisma TaskRun.id column), `friendlyId` | ||||||||||
| // is the user-facing `run_xxx` token. Downstream logging keyed off | ||||||||||
| // `taskRun.id` correlates with other systems via the cuid — using | ||||||||||
| // the friendlyId here breaks log correlation. `SyntheticRun` carries | ||||||||||
| // the cuid alongside the friendlyId for exactly this reason | ||||||||||
| // (RunId.fromFriendlyId in readFallback.server.ts). | ||||||||||
| id: buffered.id, | ||||||||||
| friendlyId: buffered.friendlyId, | ||||||||||
| status, | ||||||||||
| taskIdentifier: buffered.taskIdentifier ?? "", | ||||||||||
| createdAt: buffered.createdAt, | ||||||||||
| startedAt: null, | ||||||||||
| updatedAt: buffered.cancelledAt ?? buffered.createdAt, | ||||||||||
| completedAt: buffered.cancelledAt ?? null, | ||||||||||
|
Comment on lines
+631
to
+632
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 FAILED buffered runs report When a buffered run has The CANCELED path correctly sets
Suggested change
Was this helpful? React with 👍 or 👎 to provide feedback. |
||||||||||
| expiredAt: null, | ||||||||||
| delayUntil: buffered.delayUntil ?? null, | ||||||||||
| metadata, | ||||||||||
| metadataType: buffered.metadataType ?? "application/json", | ||||||||||
| ttl: buffered.ttl ?? null, | ||||||||||
| costInCents: 0, | ||||||||||
| baseCostInCents: 0, | ||||||||||
| usageDurationMs: 0, | ||||||||||
| idempotencyKey: buffered.idempotencyKey ?? null, | ||||||||||
| idempotencyKeyOptions: buffered.idempotencyKeyOptions ?? null, | ||||||||||
| isTest: buffered.isTest, | ||||||||||
| depth: buffered.depth, | ||||||||||
| // Scheduled triggers go through the same TriggerTaskService path as | ||||||||||
| // API triggers and aren't bypassed by the mollifier gate, so a | ||||||||||
| // scheduled run can land in the buffer with its scheduleId set on the | ||||||||||
| // snapshot. Forward it so resolveSchedule() can hydrate the `schedule` | ||||||||||
| // field in the API response instead of silently dropping it until the | ||||||||||
| // drainer materialises. | ||||||||||
| scheduleId: buffered.scheduleId ?? null, | ||||||||||
| lockedToVersion: buffered.lockedToVersion ? { version: buffered.lockedToVersion } : null, | ||||||||||
| resumeParentOnCompletion: buffered.resumeParentOnCompletion, | ||||||||||
| // Reconstruct the batch from the snapshot's internal id so a buffered | ||||||||||
| // run reports the same `batchId` / triggerFunction as it will once | ||||||||||
| // materialised, and so batch-scoped JWTs authorise against it (the | ||||||||||
| // route authorization callbacks read `run.batch?.friendlyId`). | ||||||||||
| batch: buffered.batchId | ||||||||||
| ? { id: buffered.batchId, friendlyId: BatchId.toFriendlyId(buffered.batchId) } | ||||||||||
| : null, | ||||||||||
| runTags: buffered.tags, | ||||||||||
| traceId: buffered.traceId ?? "", | ||||||||||
| payload: synthesisePayload(buffered), | ||||||||||
| payloadType: buffered.payloadType ?? "application/json", | ||||||||||
| output: null, | ||||||||||
| outputType: "application/json", | ||||||||||
| error: errorJson, | ||||||||||
| attempts: [], | ||||||||||
| attemptNumber: null, | ||||||||||
| engine: "V2", | ||||||||||
| taskEventStore: "taskEvent", | ||||||||||
| // Empty string when absent (matches syntheticSpanRun.server.ts and lets | ||||||||||
| // `createCommonRunStructure`'s `run.workerQueue || undefined` coerce the | ||||||||||
| // API response's `region` to undefined instead of advertising a | ||||||||||
| // misleading "main" region for a not-yet-assigned buffered run). | ||||||||||
| workerQueue: buffered.workerQueue ?? "", | ||||||||||
| parentTaskRun: null, | ||||||||||
| rootTaskRun: null, | ||||||||||
| childRuns: [], | ||||||||||
| isBuffered: true, | ||||||||||
| }; | ||||||||||
| } | ||||||||||
Uh oh!
There was an error while loading. Please reload this page.