diff --git a/.server-changes/session-route-hardening.md b/.server-changes/session-route-hardening.md new file mode 100644 index 00000000000..2734b35a784 --- /dev/null +++ b/.server-changes/session-route-hardening.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Hardening fixes for realtime sessions: stricter authorization on snapshot URLs and out-channel appends, environment-scoped message delivery for waiting runs, and idempotent appends via the X-Part-Id header. Session creation now rejects expired sessions, externalId can no longer be changed after creation, and the sessions list returns friendly run ids. diff --git a/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts b/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts index 4fbdb454d92..39c30894416 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts @@ -106,12 +106,14 @@ const { action, loader } = createActionApiRoute( }); // Step 2: Register the waitpoint on the session channel so the next - // append fires it. Keyed by (addressingKey, io) — the canonical - // string for the row. The append handler drains by the same - // canonical key, so writers and readers converge regardless of - // which URL form the agent vs. the appending caller used. + // append fires it. Keyed by (environmentId, addressingKey, io) — the + // canonical string for the row, scoped to the environment because + // externalIds are only unique per environment. The append handler + // drains by the same key, so writers and readers converge regardless + // of which URL form the agent vs. the appending caller used. const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined; await addSessionStreamWaitpoint( + authentication.environment.id, addressingKey, body.io, result.waitpoint.id, @@ -152,6 +154,7 @@ const { action, loader } = createActionApiRoute( }); await removeSessionStreamWaitpoint( + authentication.environment.id, addressingKey, body.io, result.waitpoint.id diff --git a/apps/webapp/app/routes/api.v1.sessions.$session.ts b/apps/webapp/app/routes/api.v1.sessions.$session.ts index 9b6fb339989..58208de35e3 100644 --- a/apps/webapp/app/routes/api.v1.sessions.$session.ts +++ b/apps/webapp/app/routes/api.v1.sessions.$session.ts @@ -74,6 +74,24 @@ const { action } = createActionApiRoute( return json({ error: "Session not found" }, { status: 404 }); } + // The externalId is the canonical addressing key once set: the S2 + // stream names, the waitpoint cache key, and the minted session PAT + // scope all derive from it. Re-keying a session would orphan its + // streams (the chat goes silent) and invalidate the PAT's scope, so + // reject any change. Same-value PATCHes stay idempotent. + if ( + body.externalId !== undefined && + body.externalId !== existing.externalId + ) { + return json( + { + error: + "externalId cannot be changed after creation; close this session and create a new one with the desired externalId", + }, + { status: 422 } + ); + } + try { const updated = await prisma.session.update({ where: { id: existing.id }, diff --git a/apps/webapp/app/routes/api.v1.sessions.$sessionId.snapshot-url.ts b/apps/webapp/app/routes/api.v1.sessions.$sessionId.snapshot-url.ts index fc0335847bb..80140f05d20 100644 --- a/apps/webapp/app/routes/api.v1.sessions.$sessionId.snapshot-url.ts +++ b/apps/webapp/app/routes/api.v1.sessions.$sessionId.snapshot-url.ts @@ -4,6 +4,7 @@ import { $replica } from "~/db.server"; import { chatSnapshotStorageKey } from "~/services/realtime/chatSnapshot.server"; import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server"; import { + anyResource, createActionApiRoute, createLoaderApiRoute, } from "~/services/routeBuilders/apiBuilder.server"; @@ -21,8 +22,31 @@ const routeConfig = { resolveSessionByIdOrExternalId($replica, auth.environment.id, params.sessionId), }; +// Authorize against the union of the URL form, friendlyId, and externalId — +// same shape as the sibling session routes. Without an authorization block +// the route builder skips scope checks entirely, so any session-scoped JWT +// in the environment could presign URLs for any other session's snapshot. +function sessionResource( + paramId: string, + session: { friendlyId: string; externalId: string | null } | null | undefined +) { + const ids = new Set([paramId]); + if (session) { + ids.add(session.friendlyId); + if (session.externalId) ids.add(session.externalId); + } + return anyResource([...ids].map((id) => ({ type: "sessions" as const, id }))); +} + export const { action } = createActionApiRoute( - { ...routeConfig, method: "PUT" }, + { + ...routeConfig, + method: "PUT", + authorization: { + action: "write", + resource: (params, _, __, ___, session) => sessionResource(params.sessionId, session), + }, + }, async ({ authentication, resource: session }) => { if (!session) { return json({ error: "Session not found" }, { status: 404 }); @@ -42,7 +66,15 @@ export const { action } = createActionApiRoute( } ); -export const loader = createLoaderApiRoute(routeConfig, async ({ authentication, resource: session }) => { +export const loader = createLoaderApiRoute( + { + ...routeConfig, + authorization: { + action: "read", + resource: (session, params) => sessionResource(params.sessionId, session), + }, + }, + async ({ authentication, resource: session }) => { if (!session) { return json({ error: "Session not found" }, { status: 404 }); } diff --git a/apps/webapp/app/routes/api.v1.sessions.ts b/apps/webapp/app/routes/api.v1.sessions.ts index 308901b0874..44f8c7ef69f 100644 --- a/apps/webapp/app/routes/api.v1.sessions.ts +++ b/apps/webapp/app/routes/api.v1.sessions.ts @@ -18,7 +18,10 @@ import { type SessionTriggerConfig, } from "~/services/realtime/sessionRunManager.server"; import { chatSnapshotStoragePathForSession } from "~/services/realtime/chatSnapshot.server"; -import { serializeSession } from "~/services/realtime/sessions.server"; +import { + serializeSession, + serializeSessionsWithFriendlyRunIds, +} from "~/services/realtime/sessions.server"; import { SessionsRepository } from "~/services/sessionsRepository/sessionsRepository.server"; import { anyResource, @@ -91,17 +94,29 @@ export const loader = createLoaderApiRoute( }, }); - return json({ - data: rows.map((row) => - serializeSession({ - ...row, - // Columns the list query doesn't select — filled so `serializeSession` - // can operate on a narrowed payload without type errors. - projectId: authentication.environment.projectId, - environmentType: authentication.environment.type, - organizationId: authentication.environment.organizationId, - } as Session) + // Batched friendlyId translation: `currentRunId` on the wire is the + // public `run_*` form, matching the single-session routes. One `IN` + // lookup per page. + const data = await serializeSessionsWithFriendlyRunIds( + rows.map( + (row) => + ({ + ...row, + // Columns the list query doesn't select — filled so the + // serializer can operate on a narrowed payload without type errors. + projectId: authentication.environment.projectId, + environmentType: authentication.environment.type, + organizationId: authentication.environment.organizationId, + }) as Session ), + { + projectId: authentication.environment.projectId, + runtimeEnvironmentId: authentication.environment.id, + } + ); + + return json({ + data, pagination: { ...(pagination.nextCursor ? { next: pagination.nextCursor } : {}), ...(pagination.previousCursor ? { previous: pagination.previousCursor } : {}), @@ -225,6 +240,17 @@ const { action } = createActionApiRoute( ); } + // Same guard as the append / end-and-continue handlers: an expired + // row must not spawn a run, because every subsequent `.in/append` + // would 400 on the expiry check — a run boots but the chat can + // never receive input. + if (session.expiresAt && session.expiresAt.getTime() < Date.now()) { + return json( + { error: "Session is expired; use a different externalId to create a new session" }, + { status: 409 } + ); + } + // Session is task-bound — every session has a live run by // construction. `ensureRunForSession` is idempotent: on the // cached path it sees `currentRunId` is alive and returns it diff --git a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts index dbdb9f47d52..e5fc0d5c344 100644 --- a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts +++ b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts @@ -11,7 +11,11 @@ import { resolveSessionByIdOrExternalId, } from "~/services/realtime/sessions.server"; import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; -import { drainSessionStreamWaitpoints } from "~/services/sessionStreamWaitpointCache.server"; +import { + claimSessionStreamPart, + drainSessionStreamWaitpoints, + releaseSessionStreamPart, +} from "~/services/sessionStreamWaitpointCache.server"; import { anyResource, createActionApiRoute, @@ -91,6 +95,17 @@ const { action, loader } = createActionApiRoute( ); } + // `.out` is the agent→client channel. Only PRIVATE (secret key) auth — + // i.e. the agent run itself — may write to it. Session-scoped JWTs carry + // `write:sessions:` for `.in`; without this gate they could forge + // assistant chunks and complete `.out` waitpoints on their own session. + if (params.io === "out" && authentication.type !== "PRIVATE") { + return json( + { ok: false, error: "Appending to the out channel requires secret key authentication" }, + { status: 403 } + ); + } + const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", { session, }); @@ -132,25 +147,54 @@ const { action, loader } = createActionApiRoute( const addressingKey = canonicalSessionAddressingKey(session, params.session); const part = await request.text(); - const partId = request.headers.get("X-Part-Id") ?? nanoid(7); + const clientPartId = request.headers.get("X-Part-Id"); + const partId = clientPartId ?? nanoid(7); - const [appendError] = await tryCatch( - realtimeStream.appendPartToSessionStream(part, partId, addressingKey, params.io) - ); + // Idempotency on client-supplied part ids: atomically claim the id before + // appending. A concurrent or retried POST that loses the claim skips the + // append (no duplicate record) but still falls through to the drain below, + // so a retry whose first attempt died before waking the waitpoint can still + // recover it. The claim is released on append failure so a genuine retry + // can re-claim and proceed. + const wonClaim = clientPartId + ? await claimSessionStreamPart( + authentication.environment.id, + addressingKey, + params.io, + clientPartId + ) + : true; + + if (wonClaim) { + const [appendError] = await tryCatch( + realtimeStream.appendPartToSessionStream(part, partId, addressingKey, params.io) + ); - if (appendError) { - if (appendError instanceof ServiceValidationError) { + if (appendError) { + if (clientPartId) { + await releaseSessionStreamPart( + authentication.environment.id, + addressingKey, + params.io, + clientPartId + ); + } + if (appendError instanceof ServiceValidationError) { + return json( + { ok: false, error: appendError.message }, + { status: appendError.status ?? 422 } + ); + } + logger.error("Failed to append to session stream", { + sessionId: session.id, + io: params.io, + error: appendError, + }); return json( - { ok: false, error: appendError.message }, - { status: appendError.status ?? 422 } + { ok: false, error: "Something went wrong, please try again." }, + { status: 500 } ); } - logger.error("Failed to append to session stream", { - sessionId: session.id, - io: params.io, - error: appendError, - }); - return json({ ok: false, error: "Something went wrong, please try again." }, { status: 500 }); } // Fire any run-scoped waitpoints registered against this channel. Best @@ -160,7 +204,7 @@ const { action, loader } = createActionApiRoute( // `sessions.open(...).in.wait()`, so writers and readers converge // regardless of which URL form they used. const [drainError, waitpointIds] = await tryCatch( - drainSessionStreamWaitpoints(addressingKey, params.io) + drainSessionStreamWaitpoints(authentication.environment.id, addressingKey, params.io) ); if (drainError) { logger.error("Failed to drain session stream waitpoints", { diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts index 77f03f4ce5c..038e053a8b0 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts @@ -128,7 +128,7 @@ export async function action({ request, params }: ActionFunctionArgs) { // Drain any waitpoints registered for this channel — same as the // public append. Best-effort; failure doesn't fail the append. const [drainError, waitpointIds] = await tryCatch( - drainSessionStreamWaitpoints(addressingKey, io) + drainSessionStreamWaitpoints(environment.id, addressingKey, io) ); if (drainError) { logger.error("Failed to drain session stream waitpoints (playground)", { diff --git a/apps/webapp/app/services/realtime/sessions.server.ts b/apps/webapp/app/services/realtime/sessions.server.ts index 594d417292c..226ec443d4c 100644 --- a/apps/webapp/app/services/realtime/sessions.server.ts +++ b/apps/webapp/app/services/realtime/sessions.server.ts @@ -75,10 +75,10 @@ export function canonicalSessionAddressingKey( * * Note: `currentRunId` is left as-is — Prisma stores the internal run id * (cuid), but `SessionItem.currentRunId` is the *friendly* form. Routes - * that emit a single `SessionItem` should use - * {@link serializeSessionWithFriendlyRunId} instead, which resolves the - * friendlyId via a TaskRun lookup. List endpoints stay on this raw form - * to avoid N+1 lookups when paginating. + * that emit `SessionItem`s must translate: single-row endpoints via + * {@link serializeSessionWithFriendlyRunId}, list endpoints via the + * batched {@link serializeSessionsWithFriendlyRunIds}. Never put this + * raw form on the wire directly. */ export function serializeSession(session: Session): SessionItem { return { @@ -125,3 +125,38 @@ export async function serializeSessionWithFriendlyRunId( currentRunId: run?.friendlyId ?? null, }; } + +/** + * Batched form of {@link serializeSessionWithFriendlyRunId} for list + * endpoints: one `IN` lookup per page instead of N+1. `currentRunId` on + * the wire is always the public `run_*` friendlyId — the raw + * {@link serializeSession} form leaks the internal cuid, which customers + * can't use with `runs.retrieve(...)`. + */ +export async function serializeSessionsWithFriendlyRunIds( + sessions: Session[], + scope: { projectId: string; runtimeEnvironmentId: string } +): Promise { + const runIds = [...new Set(sessions.map((s) => s.currentRunId).filter((id): id is string => !!id))]; + + // `currentRunId` is a plain string pointer (no FK), so scope the lookup to + // the caller's tenant — a stale value must not resolve a run in another env. + const runs = runIds.length + ? await $replica.taskRun.findMany({ + where: { + id: { in: runIds }, + projectId: scope.projectId, + runtimeEnvironmentId: scope.runtimeEnvironmentId, + }, + select: { id: true, friendlyId: true }, + }) + : []; + const friendlyIdByRunId = new Map(runs.map((run) => [run.id, run.friendlyId])); + + return sessions.map((session) => ({ + ...serializeSession(session), + currentRunId: session.currentRunId + ? friendlyIdByRunId.get(session.currentRunId) ?? null + : null, + })); +} diff --git a/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts b/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts index 93f4b397481..31121678282 100644 --- a/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts +++ b/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts @@ -5,14 +5,24 @@ import { singleton } from "~/utils/singleton"; import { logger } from "./logger.server"; // "ssw" — session-stream-waitpoint. Parallel to the input-stream variant -// (`isw:{runFriendlyId}:{streamId}`). Keyed purely on `{sessionId, io}` so -// a send() lands on the channel regardless of which run is waiting, and +// (`isw:{runFriendlyId}:{streamId}`). Keyed on `{environmentId, addressingKey, io}` +// so a send() lands on the channel regardless of which run is waiting, and // multiple concurrent waiters (e.g. two agents on one chat) all wake. +// The environmentId prefix is load-bearing: the addressing key is the +// user-supplied externalId (unique only per environment), and this Redis +// is shared — without it, two environments using the same externalId +// would drain each other's waitpoints. const KEY_PREFIX = "ssw:"; const DEFAULT_TTL_MS = 7 * 24 * 60 * 60 * 1000; // 7 days -function buildKey(sessionFriendlyId: string, io: "out" | "in"): string { - return `${KEY_PREFIX}${sessionFriendlyId}:${io}`; +function buildKey(environmentId: string, addressingKey: string, io: "out" | "in"): string { + return `${KEY_PREFIX}${environmentId}:${addressingKey}:${io}`; +} + +// Pre-env-scoping key format, drained for one release so waitpoints from the +// previous deploy still wake. Removable once this has been live > turn timeout. +function buildLegacyKey(addressingKey: string, io: "out" | "in"): string { + return `${KEY_PREFIX}${addressingKey}:${io}`; } function initializeRedis(): Redis | undefined { @@ -67,7 +77,8 @@ const ADD_WAITPOINT_SCRIPT = ` * channel are allowed (stored as a Redis set). */ export async function addSessionStreamWaitpoint( - sessionFriendlyId: string, + environmentId: string, + addressingKey: string, io: "out" | "in", waitpointId: string, ttlMs?: number @@ -75,7 +86,7 @@ export async function addSessionStreamWaitpoint( if (!redis) return; try { - const key = buildKey(sessionFriendlyId, io); + const key = buildKey(environmentId, addressingKey, io); await redis.eval( ADD_WAITPOINT_SCRIPT, 1, @@ -85,7 +96,8 @@ export async function addSessionStreamWaitpoint( ); } catch (error) { logger.error("Failed to set session stream waitpoint cache", { - sessionFriendlyId, + environmentId, + addressingKey, io, error, }); @@ -98,26 +110,36 @@ export async function addSessionStreamWaitpoint( * empty set even if two appends race. */ export async function drainSessionStreamWaitpoints( - sessionFriendlyId: string, + environmentId: string, + addressingKey: string, io: "out" | "in" ): Promise { if (!redis) return []; try { - const key = buildKey(sessionFriendlyId, io); + const key = buildKey(environmentId, addressingKey, io); + const legacyKey = buildLegacyKey(addressingKey, io); const pipeline = redis.multi(); pipeline.smembers(key); pipeline.del(key); + pipeline.smembers(legacyKey); + pipeline.del(legacyKey); const results = await pipeline.exec(); if (!results) return []; - const [smembersResult] = results; - if (!smembersResult) return []; - const [err, members] = smembersResult; - if (err) return []; - return Array.isArray(members) ? (members as string[]) : []; + // Union members from the env-scoped key and the legacy key (dual-read). + const ids = new Set(); + for (const idx of [0, 2]) { + const entry = results[idx]; + if (!entry) continue; + const [err, members] = entry; + if (err || !Array.isArray(members)) continue; + for (const m of members as string[]) ids.add(m); + } + return [...ids]; } catch (error) { logger.error("Failed to drain session stream waitpoint cache", { - sessionFriendlyId, + environmentId, + addressingKey, io, error, }); @@ -129,19 +151,106 @@ export async function drainSessionStreamWaitpoints( * Remove a single waitpoint from the pending set. Called after a race * where `.wait()` completed the waitpoint from pre-arrived data. */ +// "ssa" — session-stream-append. Idempotency claim for the append route: +// when a caller supplies an `X-Part-Id`, the first request atomically claims +// the key (SET NX) before appending; a concurrent or retried POST with the +// same id fails the claim and skips the append, so it never produces a +// duplicate record (or double-fires the waitpoint drain). The claim is +// released if the append fails, so a retry of a genuinely failed append +// still goes through. 5-minute window — covers retry storms, not a +// permanent idempotency store. +const APPEND_DEDUPE_PREFIX = "ssa:"; +const APPEND_DEDUPE_TTL_SECONDS = 5 * 60; + +function buildAppendDedupeKey( + environmentId: string, + addressingKey: string, + io: "out" | "in", + partId: string +): string { + // Encode the free-form segments — `addressingKey` (externalId) and `partId` + // (X-Part-Id) are user-supplied and may contain `:`, which would otherwise + // let different tuples collide and falsely suppress an append. + return `${APPEND_DEDUPE_PREFIX}${encodeURIComponent(environmentId)}:${encodeURIComponent( + addressingKey + )}:${io}:${encodeURIComponent(partId)}`; +} + +/** + * Atomically claim a part id before appending. Returns true if this caller + * won the claim (first to see this id) and should perform the append, false + * if the id was already claimed (a concurrent or retried POST) and the append + * should be skipped. Fails open (returns true) when Redis is unavailable — + * appends degrade to at-least-once, never to dropped. + */ +export async function claimSessionStreamPart( + environmentId: string, + addressingKey: string, + io: "out" | "in", + partId: string +): Promise { + if (!redis) return true; + + try { + // SET NX is the atomic claim: "OK" when set (we won), null when the key + // already exists (someone else owns this id). + const result = await redis.set( + buildAppendDedupeKey(environmentId, addressingKey, io, partId), + "1", + "EX", + APPEND_DEDUPE_TTL_SECONDS, + "NX" + ); + return result === "OK"; + } catch (error) { + logger.error("Failed to claim session stream append part", { + environmentId, + addressingKey, + io, + partId, + error, + }); + return true; + } +} + +/** Release a claim so a retry can proceed — called when the append itself failed. */ +export async function releaseSessionStreamPart( + environmentId: string, + addressingKey: string, + io: "out" | "in", + partId: string +): Promise { + if (!redis) return; + + try { + await redis.del(buildAppendDedupeKey(environmentId, addressingKey, io, partId)); + } catch (error) { + logger.error("Failed to release session stream append part", { + environmentId, + addressingKey, + io, + partId, + error, + }); + } +} + export async function removeSessionStreamWaitpoint( - sessionFriendlyId: string, + environmentId: string, + addressingKey: string, io: "out" | "in", waitpointId: string ): Promise { if (!redis) return; try { - const key = buildKey(sessionFriendlyId, io); + const key = buildKey(environmentId, addressingKey, io); await redis.srem(key, waitpointId); } catch (error) { logger.error("Failed to remove session stream waitpoint cache entry", { - sessionFriendlyId, + environmentId, + addressingKey, io, error, });