Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/mollifier-reads.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Mollifier API read-fallback: serve buffered runs from synthetic run/trace/span data on the retrieve, trace, spans, and attempts endpoints.
211 changes: 208 additions & 3 deletions apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts
Comment thread
d-cs marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ import {
logger,
} from "@trigger.dev/core/v3";
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
import { BatchId } from "@trigger.dev/core/v3/isomorphic";
import { getUserProvidedIdempotencyKey } from "@trigger.dev/core/v3/serverOnly";
import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database";
import assertNever from "assert-never";
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
import { $replica, prisma } from "~/db.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import {
findRunByIdWithMollifierFallback,
type SyntheticRun,
} from "~/v3/mollifier/readFallback.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";
import { tracer } from "~/v3/tracer.server";
import { startSpanWithEnv } from "~/v3/tracing.server";
Expand Down Expand Up @@ -64,13 +69,46 @@ type CommonRelatedRun = Prisma.Result<
"findFirstOrThrow"
>;

type FoundRun = NonNullable<Awaited<ReturnType<typeof ApiRetrieveRunPresenter.findRun>>>;
// Full shape returned by findRun() — the commonRunSelect fields plus the
// extras the route handler reads. Declared explicitly (not inferred via
// ReturnType<typeof findRun>) so findRun can return a synthesised buffered
// run without the type becoming self-referential.
// Exported so the buffer-synthesis helper below can be unit-tested
// against a stable shape without re-deriving it (FoundRun's exact field
// list is what the buffered run must match for `call()` not to surprise).
export type FoundRun = CommonRelatedRun & {
traceId: string;
payload: string;
payloadType: string;
output: string | null;
outputType: string;
error: Prisma.JsonValue;
attempts: { id: string }[];
attemptNumber: number | null;
engine: "V1" | "V2";
taskEventStore: string;
parentTaskRun: CommonRelatedRun | null;
rootTaskRun: CommonRelatedRun | null;
childRuns: CommonRelatedRun[];
// True when this run was synthesised from the mollifier buffer rather
// than read from Postgres. Callers that would otherwise query backing
// stores keyed on PG identifiers (e.g. ClickHouse event lookups by
// traceId) can short-circuit to an empty response — buffered runs
// haven't executed and have no events to fetch. Devin's analysis on
// PR #3755 (events endpoint) flagged the pre-fix code as making a
// wasted ClickHouse round-trip when this is set; gate on this flag
// instead.
isBuffered: boolean;
};

export class ApiRetrieveRunPresenter {
constructor(private readonly apiVersion: API_VERSIONS) {}

public static async findRun(friendlyId: string, env: AuthenticatedEnvironment) {
return $replica.taskRun.findFirst({
public static async findRun(
friendlyId: string,
env: AuthenticatedEnvironment,
): Promise<FoundRun | null> {
const pgRow = await $replica.taskRun.findFirst({
where: {
friendlyId,
runtimeEnvironmentId: env.id,
Expand Down Expand Up @@ -102,6 +140,23 @@ export class ApiRetrieveRunPresenter {
},
},
});

if (pgRow) return { ...pgRow, isBuffered: false };

// Postgres miss → fall back to the mollifier buffer. When the gate
// diverted a trigger, the run lives in Redis until the drainer replays
// it through engine.trigger. Synthesise the FoundRun shape so call()
// returns a `QUEUED` (or `FAILED`) response with empty output, no
// attempts, no relations.
const buffered = await findRunByIdWithMollifierFallback({
runId: friendlyId,
environmentId: env.id,
organizationId: env.organizationId,
});

if (!buffered) return null;

return synthesiseFoundRunFromBuffer(buffered);
Comment thread
d-cs marked this conversation as resolved.
}

public async call(taskRun: FoundRun, env: AuthenticatedEnvironment) {
Expand Down Expand Up @@ -475,3 +530,153 @@ function resolveTriggerFunction(run: CommonRelatedRun): TriggerFunction {
return run.resumeParentOnCompletion ? "triggerAndWait" : "trigger";
}
}

// Build a FoundRun-shaped object from a buffered (mollified) run. The run
// is in the Redis buffer; engine.trigger hasn't created the Postgres row
// yet, so every field that comes from execution state (output, attempts,
// completedAt, cost, relations) takes a default. The presenter's call()
// handles QUEUED-state runs without surprise.
function bufferedStatusToTaskRunStatus(status: SyntheticRun["status"]): TaskRunStatus {
switch (status) {
case "FAILED":
return "SYSTEM_FAILURE";
case "CANCELED":
return "CANCELED";
default:
return "PENDING";
}
}

// The PG path stores `TaskRun.payload` as `String?`, so in production
// the buffered snapshot's `payload` is always a string. We defensively
// coerce other types instead of silently dropping them: an object gets
// JSON-stringified (matches how the trigger path would serialise it),
// anything truly unrenderable falls back to an empty string. The log
// line surfaces format drift to ops without crashing the read path.
function synthesisePayload(buffered: SyntheticRun): string {
const payload = buffered.payload;
if (typeof payload === "string") return payload;
if (payload === undefined || payload === null) return "";
try {
const serialised = JSON.stringify(payload);
logger.warn("ApiRetrieveRunPresenter: buffered snapshot.payload non-string coerced", {
runFriendlyId: buffered.friendlyId,
payloadType: typeof payload,
});
return typeof serialised === "string" ? serialised : "";
} catch {
logger.error("ApiRetrieveRunPresenter: buffered snapshot.payload unserialisable", {
runFriendlyId: buffered.friendlyId,
payloadType: typeof payload,
});
return "";
}
}

// Mirror synthesisePayload for metadata. The PG path stores
// `TaskRun.metadata` as `String?`, and the snapshot writes it from
// `metadataPacket.data` (also a string), so in production it is always a
// string or absent. We coerce defensively — an object gets JSON-stringified
// (matching how the trigger path serialises it) rather than silently
// dropped to null, and the log line surfaces format drift to ops.
function synthesiseMetadata(buffered: SyntheticRun): string | null {
const metadata = buffered.metadata;
if (typeof metadata === "string") return metadata;
if (metadata === undefined || metadata === null) return null;
try {
const serialised = JSON.stringify(metadata);
logger.warn("ApiRetrieveRunPresenter: buffered snapshot.metadata non-string coerced", {
runFriendlyId: buffered.friendlyId,
metadataType: typeof metadata,
});
return typeof serialised === "string" ? serialised : null;
} catch {
logger.error("ApiRetrieveRunPresenter: buffered snapshot.metadata unserialisable", {
runFriendlyId: buffered.friendlyId,
metadataType: typeof metadata,
});
return null;
}
}

// Exported for unit testing. Used by `findRun()` above when the
// Postgres lookup misses and the buffer carries the run — keep the shape
// in lockstep with `FoundRun`'s field list so `call()` treats a synthesised
// buffered run identically to a freshly-triggered PG row.
export function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun {
const status: TaskRunStatus = bufferedStatusToTaskRunStatus(buffered.status);

const errorJson: Prisma.JsonValue = buffered.error
? {
type: "STRING_ERROR",
raw: `${buffered.error.code}: ${buffered.error.message}`,
}
: null;

const metadata: string | null = synthesiseMetadata(buffered);

return {
// `id` is the internal cuid (Prisma TaskRun.id column), `friendlyId`
// is the user-facing `run_xxx` token. Downstream logging keyed off
// `taskRun.id` correlates with other systems via the cuid — using
// the friendlyId here breaks log correlation. `SyntheticRun` carries
// the cuid alongside the friendlyId for exactly this reason
// (RunId.fromFriendlyId in readFallback.server.ts).
id: buffered.id,
friendlyId: buffered.friendlyId,
status,
taskIdentifier: buffered.taskIdentifier ?? "",
createdAt: buffered.createdAt,
startedAt: null,
updatedAt: buffered.cancelledAt ?? buffered.createdAt,
completedAt: buffered.cancelledAt ?? null,
Comment on lines +631 to +632
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 FAILED buffered runs report isCompleted: true but finishedAt: undefined — inconsistent API response

When a buffered run has status: "FAILED", synthesiseFoundRunFromBuffer maps it to SYSTEM_FAILURE (line 541-542) but sets completedAt: buffered.cancelledAt ?? null (line 632). Since cancelledAt is undefined for FAILED (non-canceled) runs, completedAt is null. Downstream in createCommonRunStructure (ApiRetrieveRunPresenter.server.ts:508), this becomes finishedAt: undefined. However, apiBooleanHelpersFromRunStatus returns isCompleted: true for SYSTEM_FAILURE. This creates an API response where isCompleted is true but finishedAt is absent — an inconsistency that doesn't occur for PG-resident SYSTEM_FAILURE runs (where the engine always populates completedAt). SDK consumers or pollers that stop polling when isCompleted is true and then read finishedAt will see undefined.

The CANCELED path correctly sets completedAt: buffered.cancelledAt (a real Date), so only the FAILED path is affected. A reasonable fix is to fall back to buffered.createdAt for FAILED runs so there's always a finishedAt when the run is terminal.

Suggested change
updatedAt: buffered.cancelledAt ?? buffered.createdAt,
completedAt: buffered.cancelledAt ?? null,
updatedAt: buffered.cancelledAt ?? buffered.createdAt,
completedAt: buffered.cancelledAt ?? (status === "SYSTEM_FAILURE" ? buffered.createdAt : null),
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

expiredAt: null,
delayUntil: buffered.delayUntil ?? null,
metadata,
metadataType: buffered.metadataType ?? "application/json",
ttl: buffered.ttl ?? null,
costInCents: 0,
baseCostInCents: 0,
usageDurationMs: 0,
idempotencyKey: buffered.idempotencyKey ?? null,
idempotencyKeyOptions: buffered.idempotencyKeyOptions ?? null,
isTest: buffered.isTest,
depth: buffered.depth,
// Scheduled triggers go through the same TriggerTaskService path as
// API triggers and aren't bypassed by the mollifier gate, so a
// scheduled run can land in the buffer with its scheduleId set on the
// snapshot. Forward it so resolveSchedule() can hydrate the `schedule`
// field in the API response instead of silently dropping it until the
// drainer materialises.
scheduleId: buffered.scheduleId ?? null,
lockedToVersion: buffered.lockedToVersion ? { version: buffered.lockedToVersion } : null,
resumeParentOnCompletion: buffered.resumeParentOnCompletion,
// Reconstruct the batch from the snapshot's internal id so a buffered
// run reports the same `batchId` / triggerFunction as it will once
// materialised, and so batch-scoped JWTs authorise against it (the
// route authorization callbacks read `run.batch?.friendlyId`).
batch: buffered.batchId
? { id: buffered.batchId, friendlyId: BatchId.toFriendlyId(buffered.batchId) }
: null,
runTags: buffered.tags,
traceId: buffered.traceId ?? "",
payload: synthesisePayload(buffered),
payloadType: buffered.payloadType ?? "application/json",
output: null,
outputType: "application/json",
error: errorJson,
attempts: [],
attemptNumber: null,
engine: "V2",
taskEventStore: "taskEvent",
// Empty string when absent (matches syntheticSpanRun.server.ts and lets
// `createCommonRunStructure`'s `run.workerQueue || undefined` coerce the
// API response's `region` to undefined instead of advertising a
// misleading "main" region for a not-yet-assigned buffered run).
workerQueue: buffered.workerQueue ?? "",
parentTaskRun: null,
rootTaskRun: null,
childRuns: [],
isBuffered: true,
};
}
10 changes: 10 additions & 0 deletions apps/webapp/app/routes/api.v1.runs.$runId.events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ export const loader = createLoaderApiRoute(
},
},
async ({ resource: run, authentication }) => {
// Short-circuit for mollifier-buffered runs. The drainer hasn't
// materialised execution events yet (the gate intercepts before
// any trace event is written), so a ClickHouse round-trip is
// guaranteed to come back empty. `findRun` now sets `isBuffered`
// explicitly on its return value — gate on that rather than
// probing surrogate fields like `traceId === ""`.
if (run.isBuffered) {
return json({ events: [] }, { status: 200 });
}

const eventRepository = await getEventRepositoryForStore(
run.taskEventStore,
authentication.environment.organization.id
Expand Down
69 changes: 59 additions & 10 deletions apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,69 @@ import {
} from "~/services/routeBuilders/apiBuilder.server";
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
import { buildSyntheticSpanDetailBody } from "~/v3/mollifier/syntheticApiResponses.server";

const ParamsSchema = z.object({
runId: z.string(),
spanId: z.string(),
});

// Resolve the run from either Postgres or the mollifier buffer.
// Buffered runs only have one valid spanId (the queued span recorded at
// gate time and reused as the run's root spanId when the drainer
// materialises). Any other spanId returns a deterministic 404; the queued
// span returns a minimal synthesised shape so the customer's SDK sees the
// same 200 contract they'd get for a freshly-triggered run.
type ResolvedRun =
| { source: "pg"; run: Awaited<ReturnType<typeof findPgRun>> & {} }
| { source: "buffer"; run: NonNullable<Awaited<ReturnType<typeof findRunByIdWithMollifierFallback>>> };

async function findPgRun(runId: string, environmentId: string) {
return $replica.taskRun.findFirst({
where: { friendlyId: runId, runtimeEnvironmentId: environmentId },
});
}

export const loader = createLoaderApiRoute(
{
params: ParamsSchema,
allowJWT: true,
corsStrategy: "all",
findResource: (params, auth) => {
return $replica.taskRun.findFirst({
where: {
friendlyId: params.runId,
runtimeEnvironmentId: auth.environment.id,
},
findResource: async (params, auth): Promise<ResolvedRun | null> => {
const pgRun = await findPgRun(params.runId, auth.environment.id);
if (pgRun) return { source: "pg", run: pgRun };

const buffered = await findRunByIdWithMollifierFallback({
runId: params.runId,
environmentId: auth.environment.id,
organizationId: auth.environment.organizationId,
});
if (buffered) return { source: "buffer", run: buffered };

return null;
},
shouldRetryNotFound: true,
authorization: {
action: "read",
resource: (run) => {
resource: (resolved) => {
if (resolved.source === "pg") {
const run = resolved.run;
const resources = [
{ type: "runs", id: run.friendlyId },
{ type: "tasks", id: run.taskIdentifier },
...run.runTags.map((tag) => ({ type: "tags", id: tag })),
];
if (run.batchId) {
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
}
return anyResource(resources);
}
const run = resolved.run;
const resources = [
{ type: "runs", id: run.friendlyId },
{ type: "tasks", id: run.taskIdentifier },
...run.runTags.map((tag) => ({ type: "tags", id: tag })),
...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []),
...run.tags.map((tag) => ({ type: "tags", id: tag })),
];
if (run.batchId) {
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
Expand All @@ -44,7 +80,20 @@ export const loader = createLoaderApiRoute(
},
},
},
async ({ params, resource: run, authentication }) => {
async ({ params, resource: resolved, authentication }) => {
if (resolved.source === "buffer") {
// Buffered runs have exactly one valid spanId — the queued span the
// mollifier gate recorded at trigger time, which becomes the run's
// root spanId once the drainer materialises. Any other spanId is a
// deterministic 404. The matching spanId returns a minimal shape
// representing "span exists, no execution data yet."
if (resolved.run.spanId !== params.spanId) {
return json({ error: "Span not found" }, { status: 404 });
}
return json(buildSyntheticSpanDetailBody(resolved.run), { status: 200 });
}

const run = resolved.run;
const eventRepository = await getEventRepositoryForStore(
run.taskEventStore,
authentication.environment.organization.id
Expand Down
Loading