Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions apps/webapp/app/components/runs/v3/CancelRunDialog.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,18 @@ import { SpinnerWhite } from "~/components/primitives/Spinner";
type CancelRunDialogProps = {
runFriendlyId: string;
redirectPath: string;
// Fired on submit so the parent can close the Radix Dialog without
// wrapping the submit button in `DialogClose` — that wrapper races
// submit (close fires first, unmounts the form, and the cancel POST
// never lands). Optional so existing call sites still type-check.
onCancelSubmitted?: () => void;
};

export function CancelRunDialog({ runFriendlyId, redirectPath }: CancelRunDialogProps) {
export function CancelRunDialog({
runFriendlyId,
redirectPath,
onCancelSubmitted,
}: CancelRunDialogProps) {
const navigation = useNavigation();

const formAction = `/resources/taskruns/${runFriendlyId}/cancel`;
Expand All @@ -27,7 +36,11 @@ export function CancelRunDialog({ runFriendlyId, redirectPath }: CancelRunDialog
</Paragraph>
<FormButtons
confirmButton={
<Form action={`/resources/taskruns/${runFriendlyId}/cancel`} method="post">
<Form
action={`/resources/taskruns/${runFriendlyId}/cancel`}
method="post"
onSubmit={() => onCancelSubmitted?.()}
>
<Button
type="submit"
name="redirectUrl"
Expand Down
26 changes: 25 additions & 1 deletion apps/webapp/app/presenters/v3/RunPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,20 @@ export class RunEnvironmentMismatchError extends Error {
}
}

// Thrown by `call()` when the run isn't in PG. The route loader catches
// this and falls back to the mollifier buffer via `tryMollifiedRunFallback`.
// Using a typed error (rather than Prisma's `findFirstOrThrow` exception)
// keeps the buffered case off the PrismaClient error path — that path
// emits a `PrismaClient error` log every time it fires, which on the
// run-detail page polls becomes per-tick log spam and Sentry noise for
// any run that legitimately lives in the buffer.
export class RunNotInPgError extends Error {
constructor(public readonly runFriendlyId: string) {
super(`Run ${runFriendlyId} not in PG`);
this.name = "RunNotInPgError";
}
}

export class RunPresenter {
#prismaClient: PrismaClient;

Expand All @@ -42,7 +56,13 @@ export class RunPresenter {
showDeletedLogs: boolean;
showDebug: boolean;
}) {
const run = await this.#prismaClient.taskRun.findFirstOrThrow({
// `findFirst` + explicit null check (not `findFirstOrThrow`) because
// a missing PG row is the *expected* path for buffered runs — the
// route catches `RunNotInPgError` and falls back to the synthesised
// buffer view. `findFirstOrThrow` would log a `PrismaClient error`
// every tick of the page poll, masking real DB issues with synthetic
// not-found noise.
const run = await this.#prismaClient.taskRun.findFirst({
select: {
id: true,
createdAt: true,
Expand Down Expand Up @@ -106,6 +126,10 @@ export class RunPresenter {
},
});

if (!run) {
throw new RunNotInPgError(runFriendlyId);
}

if (environmentSlug !== run.runtimeEnvironment.slug) {
throw new RunEnvironmentMismatchError(
`Run ${runFriendlyId} is not in environment ${environmentSlug}`
Expand Down
45 changes: 39 additions & 6 deletions apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { ABORT_REASON_SEND_ERROR, createSSELoader, SendFunction } from "~/utils/sse";
import { throttle } from "~/utils/throttle";
import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
import { deserialiseMollifierSnapshot } from "~/v3/mollifier/mollifierSnapshot.server";
import { tracePubSub } from "~/v3/services/tracePubSub.server";

const PING_INTERVAL = 5_000;
Expand Down Expand Up @@ -37,17 +39,48 @@ export class RunStreamPresenter {
},
});

if (!run) {
// Fall back to the mollifier buffer when the run isn't in PG yet.
// The buffered run has no execution events to stream, but we still
// attach a trace-pubsub subscription using the snapshot's traceId
// so that the moment the drainer materialises the row and execution
// begins, those events flow to this open SSE connection. Closing
// with 404 would force the dashboard to keep retrying.
let traceId: string | null = run?.traceId ?? null;
if (!traceId) {
const buffer = getMollifierBuffer();
if (buffer) {
try {
const entry = await buffer.getEntry(runFriendlyId);
if (entry) {
// Go through the webapp wrapper so this read-side module
// shares a single deserialisation path with readFallback —
// see the contract comment in syntheticRedirectInfo.server.ts.
const snapshot = deserialiseMollifierSnapshot(entry.payload);
if (typeof snapshot.traceId === "string") {
traceId = snapshot.traceId;
}
}
} catch (err) {
logger.warn("RunStreamPresenter buffer fallback failed", {
runFriendlyId,
err: err instanceof Error ? err.message : String(err),
});
}
}
}

if (!traceId) {
throw new Response("Not found", { status: 404 });
}
const resolvedRun = { traceId };

logger.info("RunStreamPresenter.start", {
runFriendlyId,
traceId: run.traceId,
traceId: resolvedRun.traceId,
});

// Subscribe to trace updates
const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(run.traceId);
const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(resolvedRun.traceId);

// Only send max every 1 second
const throttledSend = throttle(
Expand Down Expand Up @@ -105,7 +138,7 @@ export class RunStreamPresenter {
cleanup: () => {
logger.info("RunStreamPresenter.cleanup", {
runFriendlyId,
traceId: run.traceId,
traceId: resolvedRun.traceId,
});

// Remove message listener
Expand All @@ -119,13 +152,13 @@ export class RunStreamPresenter {
.then(() => {
logger.info("RunStreamPresenter.cleanup.unsubscribe succeeded", {
runFriendlyId,
traceId: run.traceId,
traceId: resolvedRun.traceId,
});
})
.catch((error) => {
logger.error("RunStreamPresenter.cleanup.unsubscribe failed", {
runFriendlyId,
traceId: run.traceId,
traceId: resolvedRun.traceId,
error: {
name: error.name,
message: error.message,
Expand Down
50 changes: 48 additions & 2 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import {
extractAIEmbedData,
} from "~/components/runs/v3/ai";
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
import { buildSyntheticSpanRun } from "~/v3/mollifier/syntheticSpanRun.server";

export type PromptSpanData = {
slug: string;
Expand Down Expand Up @@ -72,9 +74,21 @@ function extractPromptSpanData(properties: Record<string, unknown>): PromptSpanD
};
}

// SpanRun is grounded in the PG-path `getRun` method rather than
// inferred from `call`'s return type. The buffered branch of `call`
// routes through `buildSyntheticSpanRun`, and that helper is annotated
// `Promise<SpanRun>` — if SpanRun were derived from `call` it would
// close a loop TS no longer tolerates ("Type alias 'Result' circularly
// references itself"). `getRun` is the canonical source for the shape
// (the synthetic helper just rebuilds the same shape from a buffer
// snapshot), and it doesn't recurse, so grounding here breaks the
// cycle while keeping Span available off `call` (Span's path through
// `#getSpan` has no synthetic indirection).
export type SpanRun = NonNullable<
Awaited<ReturnType<InstanceType<typeof SpanPresenter>["getRun"]>>
>;
type Result = Awaited<ReturnType<SpanPresenter["call"]>>;
export type Span = NonNullable<NonNullable<Result>["span"]>;
export type SpanRun = NonNullable<NonNullable<Result>["run"]>;
type FindRunResult = NonNullable<
Awaited<ReturnType<InstanceType<typeof SpanPresenter>["findRun"]>>
>;
Expand All @@ -84,12 +98,18 @@ export class SpanPresenter extends BasePresenter {
public async call({
userId,
projectSlug,
envSlug,
spanId,
runFriendlyId,
linkedRunId,
}: {
userId: string;
projectSlug: string;
// Optional for backwards compatibility, required for the mollifier
// buffer fallback when the parent run isn't yet in PG — we need to
// resolve the env id to satisfy `findRunByIdWithMollifierFallback`'s
// auth check.
envSlug?: string;
spanId: string;
runFriendlyId: string;
linkedRunId?: string;
Expand Down Expand Up @@ -127,7 +147,32 @@ export class SpanPresenter extends BasePresenter {
});

if (!parentRun) {
return;
// PG miss → fall back to the mollifier buffer. Without this the
// right-side span detail panel on the run-detail page never
// resolves for buffered runs: `call()` returns undefined, the
// resource route redirects with an "Event not found" toast, the
// run-detail page reloads, the toast fires again — a perpetual
// spin until the drainer materialises the row. Synthesise a
// SpanRun straight from the buffer snapshot, reusing
// `buildSyntheticSpanRun` (the same helper the run-detail
// loader's header fallback already uses).
if (!envSlug) return;
const envRow = await this._replica.runtimeEnvironment.findFirst({
where: { project: { id: project.id }, slug: envSlug },
select: { id: true, slug: true, type: true, organizationId: true },
});
if (!envRow) return;
const buffered = await findRunByIdWithMollifierFallback({
runId: runFriendlyId,
environmentId: envRow.id,
organizationId: envRow.organizationId,
});
if (!buffered) return;
const synth = await buildSyntheticSpanRun({
run: buffered,
environment: { id: envRow.id, slug: envRow.slug, type: envRow.type },
});
return { type: "run" as const, run: synth };
}

const { traceId } = parentRun;
Expand Down Expand Up @@ -373,6 +418,7 @@ export class SpanPresenter extends BasePresenter {
traceId: run.traceId,
spanId: run.spanId,
isCached: !!linkedRunId,
isBuffered: false,
machinePreset: machine?.name,
taskEventStore: run.taskEventStore,
externalTraceId,
Expand Down
37 changes: 34 additions & 3 deletions apps/webapp/app/routes/@.runs.$runParam.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import { z } from "zod";
import { prisma } from "~/db.server";
import { redirectWithErrorMessage } from "~/models/message.server";
import { requireUser } from "~/services/session.server";
import { impersonate, rootPath, v3RunPath } from "~/utils/pathBuilder";
import { impersonate, rootPath, v3RunPath, v3RunSpanPath } from "~/utils/pathBuilder";
import { findBufferedRunRedirectInfo } from "~/v3/mollifier/syntheticRedirectInfo.server";

const ParamsSchema = z.object({
runParam: z.string(),
Expand Down Expand Up @@ -32,6 +33,7 @@ export async function loader({ params, request }: LoaderFunctionArgs) {
friendlyId: runParam,
},
select: {
spanId: true,
runtimeEnvironment: {
select: {
slug: true,
Expand All @@ -51,16 +53,45 @@ export async function loader({ params, request }: LoaderFunctionArgs) {
});

if (!run) {
// Admin impersonation route — bypass org membership so admins can
// open any buffered run by friendlyId, mirroring the existing PG
// behaviour above (no membership filter on the find).
const buffered = await findBufferedRunRedirectInfo({
runFriendlyId: runParam,
userId: user.id,
skipOrgMembershipCheck: true,
});
if (buffered) {
// Preselect the root span so the run-detail trace tree opens with
// the buffered run's span highlighted, matching the sibling
// redirect routes (runs.$runParam.ts, projects.v3.$projectRef…).
const path = buffered.spanId
? v3RunSpanPath(
{ slug: buffered.organizationSlug },
{ slug: buffered.projectSlug },
{ slug: buffered.environmentSlug },
{ friendlyId: runParam },
{ spanId: buffered.spanId }
)
: v3RunPath(
{ slug: buffered.organizationSlug },
{ slug: buffered.projectSlug },
{ slug: buffered.environmentSlug },
{ friendlyId: runParam }
);
return redirect(impersonate(path));
}
return redirectWithErrorMessage(rootPath(), request, "Run doesn't exist", {
ephemeral: false,
});
}

const path = v3RunPath(
const path = v3RunSpanPath(
{ slug: run.project.organization.slug },
{ slug: run.project.slug },
{ slug: run.runtimeEnvironment.slug },
{ friendlyId: runParam }
{ friendlyId: runParam },
{ spanId: run.spanId }
);

return redirect(impersonate(path));
Expand Down
Loading