From ec3abbc398bc750d327010439e6326305807cde2 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 19 May 2026 13:50:29 +0100 Subject: [PATCH 1/9] feat(supervisor): wide-event module + kill switch --- apps/supervisor/src/env.ts | 5 + apps/supervisor/src/wideEvents/context.ts | 14 ++ apps/supervisor/src/wideEvents/emit.test.ts | 115 ++++++++++ apps/supervisor/src/wideEvents/emit.ts | 72 ++++++ apps/supervisor/src/wideEvents/index.ts | 28 +++ .../src/wideEvents/middleware.test.ts | 207 ++++++++++++++++++ apps/supervisor/src/wideEvents/middleware.ts | 122 +++++++++++ apps/supervisor/src/wideEvents/new.test.ts | 81 +++++++ apps/supervisor/src/wideEvents/new.ts | 76 +++++++ apps/supervisor/src/wideEvents/record.test.ts | 112 ++++++++++ apps/supervisor/src/wideEvents/record.ts | 82 +++++++ apps/supervisor/src/wideEvents/state.ts | 62 ++++++ .../src/wideEvents/traceparent.test.ts | 43 ++++ apps/supervisor/src/wideEvents/traceparent.ts | 39 ++++ 14 files changed, 1058 insertions(+) create mode 100644 apps/supervisor/src/wideEvents/context.ts create mode 100644 apps/supervisor/src/wideEvents/emit.test.ts create mode 100644 apps/supervisor/src/wideEvents/emit.ts create mode 100644 apps/supervisor/src/wideEvents/index.ts create mode 100644 apps/supervisor/src/wideEvents/middleware.test.ts create mode 100644 apps/supervisor/src/wideEvents/middleware.ts create mode 100644 apps/supervisor/src/wideEvents/new.test.ts create mode 100644 apps/supervisor/src/wideEvents/new.ts create mode 100644 apps/supervisor/src/wideEvents/record.test.ts create mode 100644 apps/supervisor/src/wideEvents/record.ts create mode 100644 apps/supervisor/src/wideEvents/state.ts create mode 100644 apps/supervisor/src/wideEvents/traceparent.test.ts create mode 100644 apps/supervisor/src/wideEvents/traceparent.ts diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index f2d54741eee..0e52578daf7 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -256,6 +256,11 @@ const Env = z // Debug DEBUG: BoolEnv.default(false), SEND_RUN_DEBUG_LOGS: BoolEnv.default(false), + + // Wide-event observability - off by default. Emits one flat-keyed JSON + // line per natural unit of work (dequeue iteration, HTTP request, socket + // lifecycle). High-QPS hotpath, so the kill switch must be honoured. + TRIGGER_WIDE_EVENTS_ENABLED: BoolEnv.default(false), }) .superRefine((data, ctx) => { if (data.COMPUTE_SNAPSHOTS_ENABLED && !data.TRIGGER_METADATA_URL) { diff --git a/apps/supervisor/src/wideEvents/context.ts b/apps/supervisor/src/wideEvents/context.ts new file mode 100644 index 00000000000..a89859c2707 --- /dev/null +++ b/apps/supervisor/src/wideEvents/context.ts @@ -0,0 +1,14 @@ +import { AsyncLocalStorage } from "node:async_hooks"; +import type { State } from "./state.js"; + +/** + * AsyncLocalStorage threading per-operation `State` through the call stack. + * Wrappers enter a state via `wideEventStorage.run(state, () => fn())` and + * any code in the async call tree retrieves it via `fromContext()`. + */ +export const wideEventStorage = new AsyncLocalStorage(); + +/** Returns the State attached to the current async context, or null. */ +export function fromContext(): State | null { + return wideEventStorage.getStore() ?? null; +} diff --git a/apps/supervisor/src/wideEvents/emit.test.ts b/apps/supervisor/src/wideEvents/emit.test.ts new file mode 100644 index 00000000000..e9df12a3d4e --- /dev/null +++ b/apps/supervisor/src/wideEvents/emit.test.ts @@ -0,0 +1,115 @@ +import { describe, it, expect } from "vitest"; +import { emit, EmitMessage } from "./emit.js"; +import { newState } from "./new.js"; + +function captureEmit(state: Parameters[0]): Record { + const captured: string[] = []; + const origWrite = process.stdout.write; + process.stdout.write = ((chunk: unknown) => { + captured.push(String(chunk)); + return true; + }) as typeof process.stdout.write; + try { + emit(state); + } finally { + process.stdout.write = origWrite; + } + expect(captured).toHaveLength(1); + const line = captured[0]; + if (!line) throw new Error("no captured line"); + return JSON.parse(line) as Record; +} + +describe("emit", () => { + it("emits a single line with the stable message + request_id", () => { + const s = newState({ service: "supervisor", env: {} }); + s.statusCode = 200; + s.ok = true; + s.durationMs = 5; + const out = captureEmit(s); + expect(out.msg).toBe(EmitMessage); + expect(out.request_id).toBe(s.requestId); + expect(out.service).toBe("supervisor"); + expect(out.ok).toBe(true); + expect(out.status).toBe(200); + expect(out.duration_ms).toBe(5); + }); + + it("omits empty optional fields", () => { + const s = newState({ service: "supervisor", env: {} }); + s.statusCode = 200; + s.ok = true; + const out = captureEmit(s); + expect(out).not.toHaveProperty("trace_id"); + expect(out).not.toHaveProperty("version"); + expect(out).not.toHaveProperty("commit_sha"); + expect(out).not.toHaveProperty("instance_id"); + expect(out).not.toHaveProperty("error.code"); + }); + + it("flattens meta keys as meta.", () => { + const s = newState({ service: "supervisor", env: {} }); + s.statusCode = 200; + s.ok = true; + s.meta.run_id = "run_abc"; + s.meta.deployment_id = "dep_xyz"; + const out = captureEmit(s); + expect(out["meta.run_id"]).toBe("run_abc"); + expect(out["meta.deployment_id"]).toBe("dep_xyz"); + expect(out).not.toHaveProperty("meta"); + }); + + it("flattens phases as phase..", () => { + const s = newState({ service: "supervisor", env: {} }); + s.statusCode = 200; + s.ok = true; + s.phases.push({ name: "warm_start", durationMs: 12, ok: true, attempts: 1 }); + s.phases.push({ + name: "workload_create", + durationMs: 3, + ok: false, + attempts: 2, + errorCode: "Error", + errorMsg: "boom", + sub: { create_ms: 1 }, + }); + const out = captureEmit(s); + expect(out["phase.warm_start.duration_ms"]).toBe(12); + expect(out["phase.warm_start.ok"]).toBe(true); + expect(out["phase.warm_start.attempts"]).toBe(1); + expect(out["phase.workload_create.duration_ms"]).toBe(3); + expect(out["phase.workload_create.ok"]).toBe(false); + expect(out["phase.workload_create.attempts"]).toBe(2); + expect(out["phase.workload_create.error_code"]).toBe("Error"); + expect(out["phase.workload_create.error_message"]).toBe("boom"); + expect(out["phase.workload_create.create_ms"]).toBe(1); + }); + + it("includes error.code/message/kind when state.error is set", () => { + const s = newState({ service: "supervisor", env: {} }); + s.statusCode = 500; + s.error = { code: "InternalError", message: "kaboom", kind: "internal" }; + const out = captureEmit(s); + expect(out["error.code"]).toBe("InternalError"); + expect(out["error.message"]).toBe("kaboom"); + expect(out["error.kind"]).toBe("internal"); + }); + + it("truncates very long error messages", () => { + const s = newState({ service: "supervisor", env: {} }); + s.error = { code: "Big", message: "x".repeat(2000), kind: "internal" }; + const out = captureEmit(s); + expect((out["error.message"] as string).length).toBe(512); + }); + + it("flattens extras at the top level", () => { + const s = newState({ service: "supervisor", env: {} }); + s.statusCode = 200; + s.ok = true; + s.extras.route = "/health"; + s.extras["dispatch.result"] = "hit"; + const out = captureEmit(s); + expect(out.route).toBe("/health"); + expect(out["dispatch.result"]).toBe("hit"); + }); +}); diff --git a/apps/supervisor/src/wideEvents/emit.ts b/apps/supervisor/src/wideEvents/emit.ts new file mode 100644 index 00000000000..85a7a082ca3 --- /dev/null +++ b/apps/supervisor/src/wideEvents/emit.ts @@ -0,0 +1,72 @@ +import type { State } from "./state.js"; + +/** + * Stable slog message string for every wide event. Downstream filters (jq, + * Axiom queries, Vector pipelines) pin to this constant. The `service` field + * disambiguates which service emitted it. + */ +export const EmitMessage = "wide_event"; + +const MAX_ERROR_MSG_BYTES = 512; + +/** + * Serializes a State as a single flat-keyed JSON line on stdout. Keys are + * flat (no nested objects) to keep jq filtering and Axiom indexing cheap. + * Empty optional fields are omitted. + */ +export function emit(state: State): void { + const out: Record = { + msg: EmitMessage, + request_id: state.requestId, + }; + + if (state.traceId) out.trace_id = state.traceId; + appendIfSet(out, "service", state.service); + appendIfSet(out, "version", state.version); + appendIfSet(out, "commit_sha", state.commitSha); + appendIfSet(out, "region", state.region); + appendIfSet(out, "node_id", state.nodeId); + appendIfSet(out, "instance_id", state.instanceId); + + out.ok = state.ok; + if (state.statusCode !== 0) out.status = state.statusCode; + out.duration_ms = state.durationMs; + + if (state.error) { + appendIfSet(out, "error.code", state.error.code); + const msg = + state.error.message.length > MAX_ERROR_MSG_BYTES + ? state.error.message.slice(0, MAX_ERROR_MSG_BYTES) + : state.error.message; + appendIfSet(out, "error.message", msg); + appendIfSet(out, "error.kind", state.error.kind); + } + + for (const [k, v] of Object.entries(state.meta)) { + out["meta." + k] = v; + } + + for (const p of state.phases) { + const prefix = "phase." + p.name + "."; + out[prefix + "duration_ms"] = p.durationMs; + out[prefix + "ok"] = p.ok; + out[prefix + "attempts"] = p.attempts; + if (p.errorCode) out[prefix + "error_code"] = p.errorCode; + if (p.errorMsg) out[prefix + "error_message"] = p.errorMsg; + if (p.sub) { + for (const [sk, sv] of Object.entries(p.sub)) { + out[prefix + sk] = sv; + } + } + } + + for (const [k, v] of Object.entries(state.extras)) { + out[k] = v; + } + + process.stdout.write(JSON.stringify(out) + "\n"); +} + +function appendIfSet(out: Record, key: string, value: string | undefined): void { + if (value) out[key] = value; +} diff --git a/apps/supervisor/src/wideEvents/index.ts b/apps/supervisor/src/wideEvents/index.ts new file mode 100644 index 00000000000..742d5c018e8 --- /dev/null +++ b/apps/supervisor/src/wideEvents/index.ts @@ -0,0 +1,28 @@ +/** + * Wide-event observability surface for the supervisor. One flat-keyed JSON + * line per natural unit of work (HTTP request, dequeue iteration, socket + * lifecycle event). Events join across services via `trace_id` (parsed from + * the inbound W3C `traceparent`) and `meta.run_id`. + * + * Off by default behind a kill switch - the dispatch hotpath runs at high + * QPS, so logging pressure must be cleanly removable. + */ +export { type Env, isValidRequestId, newState, type NewStateOptions } from "./new.js"; +export { emit, EmitMessage } from "./emit.js"; +export { parseTraceId } from "./traceparent.js"; +export { fromContext, wideEventStorage } from "./context.js"; +export { + type PhaseOpt, + recordPhase, + recordPhaseSince, + timePhase, +} from "./record.js"; +export { + emitOneShot, + runWideEvent, + setExtra, + setMeta, + type WideEventLifecycleOptions, + type WideEventOptions, +} from "./middleware.js"; +export type { ErrorInfo, PhaseRecord, State } from "./state.js"; diff --git a/apps/supervisor/src/wideEvents/middleware.test.ts b/apps/supervisor/src/wideEvents/middleware.test.ts new file mode 100644 index 00000000000..18a6469d7b8 --- /dev/null +++ b/apps/supervisor/src/wideEvents/middleware.test.ts @@ -0,0 +1,207 @@ +import { describe, it, expect } from "vitest"; +import { fromContext } from "./context.js"; +import { emitOneShot, runWideEvent, setMeta } from "./middleware.js"; + +function captureStdout(fn: () => Promise | unknown): Promise { + const captured: string[] = []; + const orig = process.stdout.write; + process.stdout.write = ((chunk: unknown) => { + captured.push(String(chunk)); + return true; + }) as typeof process.stdout.write; + return Promise.resolve(fn()) + .finally(() => { + process.stdout.write = orig; + }) + .then(() => captured); +} + +describe("runWideEvent", () => { + it("emits one event with ok=true when no statusCode is set", async () => { + const lines = await captureStdout(async () => { + await runWideEvent( + { service: "supervisor", env: {}, enabled: true, route: "/x", method: "POST" }, + async () => undefined + ); + }); + expect(lines).toHaveLength(1); + const line = lines[0]; + if (!line) throw new Error("no line"); + const ev = JSON.parse(line) as Record; + expect(ev.ok).toBe(true); + expect(ev.service).toBe("supervisor"); + expect(ev.route).toBe("/x"); + expect(ev.method).toBe("POST"); + expect(typeof ev.duration_ms).toBe("number"); + expect(typeof ev.request_id).toBe("string"); + }); + + it("derives ok from statusCode set via finalize", async () => { + const lines = await captureStdout(async () => { + await runWideEvent( + { service: "supervisor", env: {}, enabled: true }, + async () => undefined, + (state) => { + state.statusCode = 200; + } + ); + }); + const line = lines[0]; + if (!line) throw new Error("no line"); + const ev = JSON.parse(line) as Record; + expect(ev.ok).toBe(true); + expect(ev.status).toBe(200); + }); + + it("treats 4xx as ok=false", async () => { + const lines = await captureStdout(async () => { + await runWideEvent( + { service: "supervisor", env: {}, enabled: true }, + async () => undefined, + (state) => { + state.statusCode = 400; + } + ); + }); + const line = lines[0]; + if (!line) throw new Error("no line"); + const ev = JSON.parse(line) as Record; + expect(ev.ok).toBe(false); + expect(ev.status).toBe(400); + }); + + it("emits ok=false with error.kind=internal on throw", async () => { + const lines = await captureStdout(async () => { + await runWideEvent( + { service: "supervisor", env: {}, enabled: true }, + async () => { + throw new Error("boom"); + } + ).catch(() => undefined); + }); + const line = lines[0]; + if (!line) throw new Error("no line"); + const ev = JSON.parse(line) as Record; + expect(ev.ok).toBe(false); + expect(ev.status).toBe(500); + expect(ev["error.kind"]).toBe("internal"); + expect(ev["error.message"]).toBe("boom"); + }); + + it("threads state through AsyncLocalStorage", async () => { + const lines = await captureStdout(async () => { + await runWideEvent( + { service: "supervisor", env: {}, enabled: true }, + async () => { + setMeta(fromContext(), "run_id", "run_abc"); + } + ); + }); + const line = lines[0]; + if (!line) throw new Error("no line"); + const ev = JSON.parse(line) as Record; + expect(ev["meta.run_id"]).toBe("run_abc"); + expect(ev.ok).toBe(true); + }); + + it("picks up inbound traceparent for trace_id", async () => { + const tp = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"; + const lines = await captureStdout(async () => { + await runWideEvent( + { service: "supervisor", env: {}, enabled: true, traceparent: tp }, + async () => undefined + ); + }); + const line = lines[0]; + if (!line) throw new Error("no line"); + const ev = JSON.parse(line) as Record; + expect(ev.trace_id).toBe("4bf92f3577b34da6a3ce929d0e0e4736"); + }); + + it("honours setup() to attach meta and extras before fn runs", async () => { + const lines = await captureStdout(async () => { + await runWideEvent( + { + service: "supervisor", + env: {}, + enabled: true, + setup: (state) => { + state.meta.run_id = "run_abc"; + state.extras.iteration = "dequeue"; + }, + }, + async () => undefined + ); + }); + const line = lines[0]; + if (!line) throw new Error("no line"); + const ev = JSON.parse(line) as Record; + expect(ev["meta.run_id"]).toBe("run_abc"); + expect(ev.iteration).toBe("dequeue"); + }); + + it("short-circuits to pass-through when enabled=false", async () => { + let seenState: ReturnType = null; + const lines = await captureStdout(async () => { + await runWideEvent( + { service: "supervisor", env: {}, enabled: false }, + async () => { + seenState = fromContext(); + } + ); + }); + expect(lines).toHaveLength(0); + expect(seenState).toBe(null); + }); + + it("isolates state across concurrent invocations", async () => { + const lines = await captureStdout(async () => { + await Promise.all( + ["a", "b", "c"].map((tag) => + runWideEvent( + { service: "supervisor", env: {}, enabled: true }, + async () => { + const s = fromContext(); + if (!s) throw new Error("no state"); + s.meta.tag = tag; + await new Promise((r) => setTimeout(r, 5)); + expect(s.meta.tag).toBe(tag); + } + ) + ) + ); + }); + const tags = lines.map((l) => (JSON.parse(l) as Record)["meta.tag"]); + expect(tags.sort()).toEqual(["a", "b", "c"]); + }); +}); + +describe("emitOneShot", () => { + it("emits a single event with populated meta when enabled", async () => { + const lines = await captureStdout(() => { + emitOneShot({ + service: "supervisor", + env: {}, + enabled: true, + populate: (s) => { + s.meta.run_id = "run_abc"; + s.extras.event = "run:start"; + }, + }); + }); + expect(lines).toHaveLength(1); + const line = lines[0]; + if (!line) throw new Error("no line"); + const ev = JSON.parse(line) as Record; + expect(ev.ok).toBe(true); + expect(ev["meta.run_id"]).toBe("run_abc"); + expect(ev.event).toBe("run:start"); + }); + + it("emits nothing when disabled", async () => { + const lines = await captureStdout(() => { + emitOneShot({ service: "supervisor", env: {}, enabled: false }); + }); + expect(lines).toHaveLength(0); + }); +}); diff --git a/apps/supervisor/src/wideEvents/middleware.ts b/apps/supervisor/src/wideEvents/middleware.ts new file mode 100644 index 00000000000..5dafdfa7d13 --- /dev/null +++ b/apps/supervisor/src/wideEvents/middleware.ts @@ -0,0 +1,122 @@ +import { emit } from "./emit.js"; +import { newState, type Env } from "./new.js"; +import { wideEventStorage } from "./context.js"; +import type { State } from "./state.js"; + +/** Options common to every wide-event lifecycle. */ +export type WideEventOptions = { + service: string; + env: Env; + /** + * Kill switch. When false, lifecycles degenerate into transparent + * pass-through - no State allocation, no AsyncLocalStorage run, no emit. + * Important for the dispatch hotpath where logging pressure must be + * cleanly removable. + */ + enabled: boolean; +}; + +/** Per-invocation options layered on top of `WideEventOptions`. */ +export type WideEventLifecycleOptions = WideEventOptions & { + /** Route template (HTTP only) captured into `extras.route`. */ + route?: string; + /** HTTP method captured into `extras.method`. */ + method?: string; + /** Inbound W3C traceparent (HTTP header, queue message field). */ + traceparent?: string; + /** Inbound request id (e.g. `x-request-id` header). */ + inboundRequestId?: string; + /** Runs after the state is built, before the wrapped fn. Use to attach meta. */ + setup?: (state: State) => void; +}; + +/** + * Runs `fn` inside an AsyncLocalStorage state and emits one wide event on + * completion or error. `finalize` runs after `fn` returns but before emit - + * use it to read out-of-band outcome info (e.g. `res.statusCode` for an HTTP + * route) and assign to `state.statusCode`. The wrapper computes `ok` from + * `statusCode` if it's set; otherwise it defaults to true on success. + * + * Returns the original `fn` result. When `enabled=false`, `fn` runs unchanged + * with no event emitted. + */ +export async function runWideEvent( + opts: WideEventLifecycleOptions, + fn: () => Promise | T, + finalize?: (state: State) => void +): Promise { + if (!opts.enabled) { + return fn(); + } + + const state = newState({ + service: opts.service, + env: opts.env, + inboundRequestId: opts.inboundRequestId, + traceparent: opts.traceparent, + }); + if (opts.route) state.extras.route = opts.route; + if (opts.method) state.extras.method = opts.method; + if (opts.setup) opts.setup(state); + + const start = performance.now(); + try { + const result = await wideEventStorage.run(state, () => Promise.resolve(fn())); + state.durationMs = Math.round(performance.now() - start); + if (finalize) finalize(state); + if (state.statusCode !== 0) { + state.ok = state.statusCode >= 200 && state.statusCode < 300; + } else { + state.ok = true; + } + emit(state); + return result; + } catch (err) { + state.durationMs = Math.round(performance.now() - start); + const e = err instanceof Error ? err : new Error(String(err)); + if (state.statusCode === 0) state.statusCode = 500; + state.ok = false; + state.error = { + code: e.name || "Error", + message: e.message, + kind: "internal", + }; + emit(state); + throw err; + } +} + +/** + * One-shot wide event with no wrapped operation. Use for socket lifecycle + * events (`run:start`, `run:stop`) where there is no surrounding async unit + * of work to time. `populate` runs synchronously to attach meta/extras + * before emit. + */ +export function emitOneShot( + opts: WideEventOptions & { + traceparent?: string; + populate?: (state: State) => void; + } +): void { + if (!opts.enabled) return; + const state = newState({ + service: opts.service, + env: opts.env, + traceparent: opts.traceparent, + }); + if (opts.populate) opts.populate(state); + state.ok = true; + emit(state); +} + +/** Convenience accessor for in-handler meta mutation. */ +export function setMeta(state: State | null, key: string, value: string): void { + if (!state) return; + state.meta[key] = value; +} + +/** Convenience for free-form fields (did_warm_start, dispatch.result, ...). */ +export function setExtra(state: State | null, key: string, value: unknown): void { + if (!state) return; + state.extras[key] = value; +} diff --git a/apps/supervisor/src/wideEvents/new.test.ts b/apps/supervisor/src/wideEvents/new.test.ts new file mode 100644 index 00000000000..476c49c3d0e --- /dev/null +++ b/apps/supervisor/src/wideEvents/new.test.ts @@ -0,0 +1,81 @@ +import { describe, it, expect } from "vitest"; +import { isValidRequestId, newState } from "./new.js"; + +describe("isValidRequestId", () => { + it("accepts visible ASCII", () => { + expect(isValidRequestId("req-abc-123_456.7")).toBe(true); + expect(isValidRequestId("a")).toBe(true); + }); + + it("rejects empty string", () => { + expect(isValidRequestId("")).toBe(false); + }); + + it("rejects overlong strings (>128 bytes)", () => { + expect(isValidRequestId("a".repeat(128))).toBe(true); + expect(isValidRequestId("a".repeat(129))).toBe(false); + }); + + it("rejects whitespace, newlines, control chars", () => { + expect(isValidRequestId("has space")).toBe(false); + expect(isValidRequestId("has\ttab")).toBe(false); + expect(isValidRequestId("has\nnewline")).toBe(false); + expect(isValidRequestId("\x00null")).toBe(false); + }); + + it("rejects high-bit / non-ASCII", () => { + expect(isValidRequestId("café")).toBe(false); + expect(isValidRequestId("a\x7f")).toBe(false); + }); +}); + +describe("newState", () => { + const env = { version: "1.0.0", commitSha: "abc123", region: "us-east-1", nodeId: "node-1" }; + + it("populates service identity from env", () => { + const s = newState({ service: "supervisor", env }); + expect(s.service).toBe("supervisor"); + expect(s.version).toBe("1.0.0"); + expect(s.commitSha).toBe("abc123"); + expect(s.region).toBe("us-east-1"); + expect(s.nodeId).toBe("node-1"); + }); + + it("mints a fresh request id when none provided", () => { + const s = newState({ service: "test", env: {} }); + expect(s.requestId).toMatch(/^req-[0-9a-f]{32}$/); + }); + + it("honours a valid inbound request id", () => { + const s = newState({ service: "test", env: {}, inboundRequestId: "trace-abc-123" }); + expect(s.requestId).toBe("trace-abc-123"); + }); + + it("rejects unsafe inbound request id and mints a fresh one", () => { + const s = newState({ service: "test", env: {}, inboundRequestId: "has space" }); + expect(s.requestId).toMatch(/^req-[0-9a-f]{32}$/); + }); + + it("parses traceparent into traceId and preserves the raw header", () => { + const tp = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"; + const s = newState({ service: "test", env: {}, traceparent: tp }); + expect(s.traceId).toBe("4bf92f3577b34da6a3ce929d0e0e4736"); + expect(s.traceparent).toBe(tp); + }); + + it("leaves traceId empty when no traceparent provided", () => { + const s = newState({ service: "test", env: {} }); + expect(s.traceId).toBe(""); + expect(s.traceparent).toBe(""); + }); + + it("initialises empty meta/extras/phases", () => { + const s = newState({ service: "test", env: {} }); + expect(s.meta).toEqual({}); + expect(s.extras).toEqual({}); + expect(s.phases).toEqual([]); + expect(s.ok).toBe(false); + expect(s.statusCode).toBe(0); + expect(s.durationMs).toBe(0); + }); +}); diff --git a/apps/supervisor/src/wideEvents/new.ts b/apps/supervisor/src/wideEvents/new.ts new file mode 100644 index 00000000000..2b7c88b7cc9 --- /dev/null +++ b/apps/supervisor/src/wideEvents/new.ts @@ -0,0 +1,76 @@ +import { randomBytes } from "node:crypto"; +import { parseTraceId } from "./traceparent.js"; +import type { State } from "./state.js"; + +const MAX_REQUEST_ID_LEN = 128; + +/** + * Validates an inbound request id. Non-empty, no longer than 128 bytes, + * composed entirely of visible ASCII (0x21..0x7E). Rejects newlines, control + * characters, whitespace, DEL, high-bit bytes - any of which could poison the + * log pipeline if echoed back verbatim. + */ +export function isValidRequestId(s: string): boolean { + if (s.length === 0 || s.length > MAX_REQUEST_ID_LEN) return false; + for (let i = 0; i < s.length; i++) { + const c = s.charCodeAt(i); + if (c < 0x21 || c > 0x7e) return false; + } + return true; +} + +/** + * Service-level identity that's constant for the lifetime of the process. + * Populated once at startup, copied into every State. + */ +export type Env = { + version?: string; + commitSha?: string; + region?: string; + nodeId?: string; +}; + +export type NewStateOptions = { + service: string; + env: Env; + /** Optional inbound request id (e.g. from `x-request-id`). If unsafe or absent, a fresh `req-` is minted. */ + inboundRequestId?: string; + /** Optional inbound W3C traceparent (HTTP header, queue message field). */ + traceparent?: string; +}; + +/** + * Builds a State for a wide-event lifecycle. + * + * - requestId: honours `inboundRequestId` if present and safe; otherwise + * mints a fresh `req-` id. + * - traceId: parsed from the provided traceparent (graceful empty if + * absent or malformed). + * - traceparent: preserved verbatim for downstream propagation. + */ +export function newState(opts: NewStateOptions): State { + const traceparent = opts.traceparent ?? ""; + const inbound = opts.inboundRequestId ?? ""; + const requestId = isValidRequestId(inbound) ? inbound : newRequestId(); + + return { + requestId, + traceId: parseTraceId(traceparent), + traceparent, + service: opts.service, + version: opts.env.version, + commitSha: opts.env.commitSha, + region: opts.env.region, + nodeId: opts.env.nodeId, + meta: {}, + phases: [], + ok: false, + statusCode: 0, + durationMs: 0, + extras: {}, + }; +} + +function newRequestId(): string { + return "req-" + randomBytes(16).toString("hex"); +} diff --git a/apps/supervisor/src/wideEvents/record.test.ts b/apps/supervisor/src/wideEvents/record.test.ts new file mode 100644 index 00000000000..beeb0fff221 --- /dev/null +++ b/apps/supervisor/src/wideEvents/record.test.ts @@ -0,0 +1,112 @@ +import { describe, it, expect } from "vitest"; +import { fromContext, wideEventStorage } from "./context.js"; +import { recordPhase, recordPhaseSince, timePhase } from "./record.js"; +import { newState } from "./new.js"; +import type { State } from "./state.js"; + +function makeState(): State { + return newState({ service: "test", env: {} }); +} + +describe("recordPhase", () => { + it("appends a successful phase", () => { + const s = makeState(); + recordPhase(s, "lookup", performance.now() - 50, undefined); + expect(s.phases).toHaveLength(1); + const phase = s.phases[0]; + if (!phase) throw new Error("missing phase"); + expect(phase.name).toBe("lookup"); + expect(phase.ok).toBe(true); + expect(phase.attempts).toBe(1); + expect(phase.durationMs).toBeGreaterThanOrEqual(45); + }); + + it("appends a failed phase with error code/message", () => { + const s = makeState(); + recordPhase(s, "dispatch", performance.now(), new Error("nope")); + const phase = s.phases[0]; + if (!phase) throw new Error("missing phase"); + expect(phase.ok).toBe(false); + expect(phase.errorCode).toBe("Error"); + expect(phase.errorMsg).toBe("nope"); + }); + + it("truncates very long error messages", () => { + const s = makeState(); + recordPhase(s, "x", performance.now(), new Error("y".repeat(2000))); + const phase = s.phases[0]; + if (!phase) throw new Error("missing phase"); + expect(phase.errorMsg?.length).toBe(512); + }); + + it("honours opts.attempts", () => { + const s = makeState(); + recordPhase(s, "retry", performance.now(), undefined, { attempts: 3 }); + expect(s.phases[0]?.attempts).toBe(3); + }); + + it("attaches sub-timings", () => { + const s = makeState(); + recordPhase(s, "complex", performance.now(), undefined, { sub: { setup_ms: 10, work_ms: 5 } }); + expect(s.phases[0]?.sub).toEqual({ setup_ms: 10, work_ms: 5 }); + }); + + it("is a no-op when state is null", () => { + expect(() => recordPhase(null, "x", performance.now(), undefined)).not.toThrow(); + }); +}); + +describe("timePhase + AsyncLocalStorage threading", () => { + it("records via fromContext on success", async () => { + const s = makeState(); + const value = await wideEventStorage.run(s, () => timePhase("work", async () => 42)); + expect(value).toBe(42); + expect(s.phases).toHaveLength(1); + expect(s.phases[0]?.ok).toBe(true); + }); + + it("records via fromContext on error and rethrows", async () => { + const s = makeState(); + await expect( + wideEventStorage.run(s, () => + timePhase("work", async () => { + throw new Error("boom"); + }) + ) + ).rejects.toThrow("boom"); + expect(s.phases).toHaveLength(1); + expect(s.phases[0]?.ok).toBe(false); + expect(s.phases[0]?.errorMsg).toBe("boom"); + }); + + it("runs fn unchanged when no state on context", async () => { + const value = await timePhase("work", async () => "ok"); + expect(value).toBe("ok"); + }); +}); + +describe("recordPhaseSince", () => { + it("records using a caller-captured start time", async () => { + const s = makeState(); + await wideEventStorage.run(s, async () => { + const start = performance.now(); + await new Promise((r) => setTimeout(r, 10)); + recordPhaseSince("spanning", start, undefined); + }); + expect(s.phases).toHaveLength(1); + expect(s.phases[0]?.durationMs).toBeGreaterThanOrEqual(8); + }); +}); + +describe("fromContext", () => { + it("returns null when no state attached", () => { + expect(fromContext()).toBe(null); + }); + + it("returns the state when inside wideEventStorage.run", () => { + const s = makeState(); + wideEventStorage.run(s, () => { + expect(fromContext()).toBe(s); + }); + }); +}); diff --git a/apps/supervisor/src/wideEvents/record.ts b/apps/supervisor/src/wideEvents/record.ts new file mode 100644 index 00000000000..b37c7246a00 --- /dev/null +++ b/apps/supervisor/src/wideEvents/record.ts @@ -0,0 +1,82 @@ +import { fromContext } from "./context.js"; +import type { PhaseRecord, State } from "./state.js"; + +const MAX_ERROR_MSG_BYTES = 512; + +/** Optional knobs for a phase record. */ +export type PhaseOpt = { + /** Attempt count for the phase (default 1). */ + attempts?: number; + /** Sub-timings to fold into `phase..`. */ + sub?: Record; +}; + +/** + * Appends a phase outcome to `state.phases`. Safe to call on success + * (`err === undefined`) and error paths. `errorMsg` is truncated to 512 bytes + * to keep the wide event compact. No-op if state is null. + */ +export function recordPhase( + state: State | null, + name: string, + startMs: number, + err: Error | undefined, + opts: PhaseOpt = {} +): void { + if (!state) return; + const p: PhaseRecord = { + name, + durationMs: Math.round(performance.now() - startMs), + ok: err === undefined, + attempts: opts.attempts ?? 1, + }; + if (err) { + p.errorCode = err.name || "Error"; + const msg = err.message; + p.errorMsg = msg.length > MAX_ERROR_MSG_BYTES ? msg.slice(0, MAX_ERROR_MSG_BYTES) : msg; + } + if (opts.sub) p.sub = opts.sub; + state.phases.push(p); +} + +/** + * Runs `fn` and appends a phase outcome to the State attached to the current + * async context. If no state is on context (test paths, background work), + * `fn` runs unchanged. The phase is recorded on both success and error paths + * so failed phases still appear in the wide event with duration_ms + + * error_code. + */ +export async function timePhase( + name: string, + fn: () => Promise | T, + opts: PhaseOpt = {} +): Promise { + const start = performance.now(); + try { + const result = await fn(); + recordPhase(fromContext(), name, start, undefined, opts); + return result; + } catch (err) { + recordPhase(fromContext(), name, start, asError(err), opts); + throw err; + } +} + +/** + * Appends a phase outcome to the State attached to the current async context + * using a `startMs` captured by the caller. Use when the phase boundary spans + * multiple calls with intermediate error handling that can't fit inside a + * single `timePhase` closure. Nil-state safe. + */ +export function recordPhaseSince( + name: string, + startMs: number, + err: Error | undefined, + opts: PhaseOpt = {} +): void { + recordPhase(fromContext(), name, startMs, err, opts); +} + +function asError(e: unknown): Error { + return e instanceof Error ? e : new Error(String(e)); +} diff --git a/apps/supervisor/src/wideEvents/state.ts b/apps/supervisor/src/wideEvents/state.ts new file mode 100644 index 00000000000..5fb53ecb936 --- /dev/null +++ b/apps/supervisor/src/wideEvents/state.ts @@ -0,0 +1,62 @@ +/** + * Per-event accumulator backing a single wide event. The supervisor emits one + * flat-keyed JSON line per natural unit of work (dequeue iteration, HTTP + * request, socket lifecycle event). Optional fields are omitted on emit so + * events stay compact. + */ +export type State = { + // Cross-stack correlation. + requestId: string; + traceId: string; + /** + * Raw inbound W3C `traceparent`, preserved verbatim so outbound calls can + * propagate the same trace context without losing the parent span-id. + * Empty when no inbound traceparent was set. + */ + traceparent: string; + + // Service identity (set by `newState` from Env). + service: string; + version?: string; + commitSha?: string; + region?: string; + nodeId?: string; + instanceId?: string; + + // Caller-attached opaque metadata, flattened to `meta.` on emit. + meta: Record; + + // Per-phase outcomes, in completion order. + phases: PhaseRecord[]; + + // Top-level outcome (set after the wrapped operation returns). + ok: boolean; + statusCode: number; + durationMs: number; + error?: ErrorInfo; + + // Free-form ad-hoc additions (route, method, did_warm_start, ...). + extras: Record; +}; + +/** + * Single named phase outcome. Retries collapse into `attempts > 1` with the + * last error reflected in errorCode/errorMsg. + */ +export type PhaseRecord = { + name: string; + durationMs: number; + ok: boolean; + attempts: number; + errorCode?: string; + errorMsg?: string; + sub?: Record; +}; + +/** Top-level error summary for a failed operation. */ +export type ErrorInfo = { + code: string; + message: string; + /** Coarse classification - "client" | "upstream" | "internal" | "timeout". */ + kind: string; +}; diff --git a/apps/supervisor/src/wideEvents/traceparent.test.ts b/apps/supervisor/src/wideEvents/traceparent.test.ts new file mode 100644 index 00000000000..85ed31c3b6f --- /dev/null +++ b/apps/supervisor/src/wideEvents/traceparent.test.ts @@ -0,0 +1,43 @@ +import { describe, it, expect } from "vitest"; +import { parseTraceId } from "./traceparent.js"; + +describe("parseTraceId", () => { + const validTraceId = "4bf92f3577b34da6a3ce929d0e0e4736"; + const validHeader = `00-${validTraceId}-00f067aa0ba902b7-01`; + + it("extracts the trace-id from a valid W3C traceparent", () => { + expect(parseTraceId(validHeader)).toBe(validTraceId); + }); + + it("returns empty string for empty/null/undefined input", () => { + expect(parseTraceId("")).toBe(""); + expect(parseTraceId(null)).toBe(""); + expect(parseTraceId(undefined)).toBe(""); + }); + + it("returns empty for wrong segment count", () => { + expect(parseTraceId("00-abc-def")).toBe(""); + expect(parseTraceId("00-abc-def-01-extra")).toBe(""); + }); + + it("returns empty for non-zero version byte", () => { + expect(parseTraceId(`01-${validTraceId}-00f067aa0ba902b7-01`)).toBe(""); + }); + + it("returns empty for wrong-length trace-id", () => { + expect(parseTraceId("00-abc-00f067aa0ba902b7-01")).toBe(""); + }); + + it("returns empty for non-hex trace-id", () => { + expect(parseTraceId("00-zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz-00f067aa0ba902b7-01")).toBe(""); + }); + + it("returns empty for all-zero trace-id", () => { + expect(parseTraceId("00-00000000000000000000000000000000-00f067aa0ba902b7-01")).toBe(""); + }); + + it("accepts uppercase hex", () => { + const tid = "4BF92F3577B34DA6A3CE929D0E0E4736"; + expect(parseTraceId(`00-${tid}-00f067aa0ba902b7-01`)).toBe(tid); + }); +}); diff --git a/apps/supervisor/src/wideEvents/traceparent.ts b/apps/supervisor/src/wideEvents/traceparent.ts new file mode 100644 index 00000000000..9e84294f067 --- /dev/null +++ b/apps/supervisor/src/wideEvents/traceparent.ts @@ -0,0 +1,39 @@ +/** + * Extracts the trace-id from a W3C `traceparent` header. Returns "" when the + * header is absent, malformed, or carries an all-zero trace-id. + * + * Format: `---` + * version : 2 hex chars, must be "00" + * trace-id: 32 hex chars, non-zero + * span-id : 16 hex chars (not validated - we only need trace-id) + * flags : 2 hex chars (not validated) + */ +export function parseTraceId(header: string | null | undefined): string { + if (!header) return ""; + const parts = header.split("-"); + if (parts.length !== 4) return ""; + if (parts[0] !== "00") return ""; + const tid = parts[1]; + if (!tid || tid.length !== 32) return ""; + if (!isHex(tid)) return ""; + if (isAllZero(tid)) return ""; + return tid; +} + +function isHex(s: string): boolean { + for (let i = 0; i < s.length; i++) { + const c = s.charCodeAt(i); + const isDigit = c >= 0x30 && c <= 0x39; + const isLower = c >= 0x61 && c <= 0x66; + const isUpper = c >= 0x41 && c <= 0x46; + if (!isDigit && !isLower && !isUpper) return false; + } + return true; +} + +function isAllZero(s: string): boolean { + for (let i = 0; i < s.length; i++) { + if (s.charCodeAt(i) !== 0x30) return false; + } + return true; +} From 3591aa9d32087a43dfab92073b953b5cff06cdfc Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 19 May 2026 13:50:39 +0100 Subject: [PATCH 2/9] feat(supervisor): wide events on dequeue + warm-start trace --- apps/supervisor/src/index.ts | 334 ++++++++++++++++++++++------------- 1 file changed, 207 insertions(+), 127 deletions(-) diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 6f5913c47ca..c4768658b1b 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -28,6 +28,14 @@ import { FailedPodHandler } from "./services/failedPodHandler.js"; import { getWorkerToken } from "./workerToken.js"; import { OtlpTraceService } from "./services/otlpTraceService.js"; import { extractTraceparent, getRestoreRunnerId } from "./util.js"; +import { + fromContext, + recordPhaseSince, + runWideEvent, + setExtra, + setMeta, + type WideEventOptions, +} from "./wideEvents/index.js"; if (env.METRICS_COLLECT_DEFAULTS) { collectDefaultMetrics({ register }); @@ -50,6 +58,12 @@ class ManagedSupervisor { private readonly isKubernetes = isKubernetesEnvironment(env.KUBERNETES_FORCE_ENABLED); private readonly warmStartUrl = env.TRIGGER_WARM_START_URL; + private readonly wideEventOpts: WideEventOptions = { + service: "supervisor", + env: { nodeId: env.TRIGGER_WORKER_INSTANCE_NAME }, + enabled: env.TRIGGER_WIDE_EVENTS_ENABLED, + }; + constructor() { const { TRIGGER_WORKER_TOKEN, @@ -239,149 +253,202 @@ class ManagedSupervisor { async ({ time, message, dequeueResponseMs, pollingIntervalMs }) => { this.logger.verbose(`Received message with timestamp ${time.toLocaleString()}`, message); - if (message.completedWaitpoints.length > 0) { - this.logger.debug("Run has completed waitpoints", { - runId: message.run.id, - completedWaitpoints: message.completedWaitpoints.length, - }); - } - - if (!message.image) { - this.logger.error("Run has no image", { runId: message.run.id }); - return; - } + const traceparent = extractTraceparent(message.run.traceContext); + + await runWideEvent( + { + ...this.wideEventOpts, + traceparent, + setup: (state) => { + setMeta(state, "run_id", message.run.id); + setMeta(state, "env_id", message.environment.id); + setMeta(state, "org_id", message.organization.id); + setMeta(state, "project_id", message.project.id); + if (message.deployment.friendlyId) { + setMeta(state, "deployment_id", message.deployment.friendlyId); + } + setMeta(state, "machine_preset", message.run.machine.name); + state.extras.iteration = "dequeue"; + state.extras.dequeue_response_ms = dequeueResponseMs; + state.extras.polling_interval_ms = pollingIntervalMs; + state.extras.completed_waitpoints = message.completedWaitpoints.length; + }, + }, + async () => { + if (message.completedWaitpoints.length > 0) { + this.logger.debug("Run has completed waitpoints", { + runId: message.run.id, + completedWaitpoints: message.completedWaitpoints.length, + }); + } - const { checkpoint, ...rest } = message; - - // Register trace context early so snapshot spans work for all paths - // (cold create, restore, warm start). Re-registration on restore is safe - // since dequeue always provides fresh context. - if (this.computeManager?.traceSpansEnabled) { - const traceparent = extractTraceparent(message.run.traceContext); - - if (traceparent) { - this.workloadServer.registerRunTraceContext(message.run.friendlyId, { - traceparent, - envId: message.environment.id, - orgId: message.organization.id, - projectId: message.project.id, - }); - } - } + if (!message.image) { + setExtra(fromContext(), "path_taken", "skipped_no_image"); + this.logger.error("Run has no image", { runId: message.run.id }); + return; + } - if (checkpoint) { - this.logger.debug("Restoring run", { runId: message.run.id }); + const { checkpoint, ...rest } = message; - if (this.computeManager) { - try { - const runnerId = getRestoreRunnerId(message.run.friendlyId, checkpoint.id); - - const didRestore = await this.computeManager.restore({ - snapshotId: checkpoint.location, - runnerId, - runFriendlyId: message.run.friendlyId, - snapshotFriendlyId: message.snapshot.friendlyId, - machine: message.run.machine, - traceContext: message.run.traceContext, + // Register trace context early so snapshot spans work for all paths + // (cold create, restore, warm start). Re-registration on restore is safe + // since dequeue always provides fresh context. + if (this.computeManager?.traceSpansEnabled && traceparent) { + this.workloadServer.registerRunTraceContext(message.run.friendlyId, { + traceparent, envId: message.environment.id, orgId: message.organization.id, projectId: message.project.id, - dequeuedAt: message.dequeuedAt, }); + } + + if (checkpoint) { + setExtra(fromContext(), "path_taken", "restore"); + this.logger.debug("Restoring run", { runId: message.run.id }); + + if (this.computeManager) { + const restoreStart = performance.now(); + try { + const runnerId = getRestoreRunnerId(message.run.friendlyId, checkpoint.id); + + const didRestore = await this.computeManager.restore({ + snapshotId: checkpoint.location, + runnerId, + runFriendlyId: message.run.friendlyId, + snapshotFriendlyId: message.snapshot.friendlyId, + machine: message.run.machine, + traceContext: message.run.traceContext, + envId: message.environment.id, + orgId: message.organization.id, + projectId: message.project.id, + dequeuedAt: message.dequeuedAt, + }); + recordPhaseSince("restore", restoreStart, undefined); + setExtra(fromContext(), "did_restore", didRestore); + + if (didRestore) { + this.logger.debug("Compute restore successful", { + runId: message.run.id, + runnerId, + }); + } else { + this.logger.error("Compute restore failed", { + runId: message.run.id, + runnerId, + }); + } + } catch (error) { + recordPhaseSince( + "restore", + restoreStart, + error instanceof Error ? error : new Error(String(error)) + ); + this.logger.error("Failed to restore run (compute)", { error }); + } + + return; + } - if (didRestore) { - this.logger.debug("Compute restore successful", { - runId: message.run.id, - runnerId, + if (!this.checkpointClient) { + this.logger.error("No checkpoint client", { runId: message.run.id }); + return; + } + + const restoreStart = performance.now(); + try { + const didRestore = await this.checkpointClient.restoreRun({ + runFriendlyId: message.run.friendlyId, + snapshotFriendlyId: message.snapshot.friendlyId, + body: { + ...rest, + checkpoint, + }, }); - } else { - this.logger.error("Compute restore failed", { runId: message.run.id, runnerId }); + recordPhaseSince("restore", restoreStart, undefined); + setExtra(fromContext(), "did_restore", didRestore); + + if (didRestore) { + this.logger.debug("Restore successful", { runId: message.run.id }); + } else { + this.logger.error("Restore failed", { runId: message.run.id }); + } + } catch (error) { + recordPhaseSince( + "restore", + restoreStart, + error instanceof Error ? error : new Error(String(error)) + ); + this.logger.error("Failed to restore run", { error }); } - } catch (error) { - this.logger.error("Failed to restore run (compute)", { error }); + + return; } - return; - } + this.logger.debug("Scheduling run", { runId: message.run.id }); - if (!this.checkpointClient) { - this.logger.error("No checkpoint client", { runId: message.run.id }); - return; - } + const warmStartStart = performance.now(); + const didWarmStart = await this.tryWarmStart(message, traceparent); + const warmStartCheckMs = Math.round(performance.now() - warmStartStart); + recordPhaseSince("warm_start", warmStartStart, undefined); + setExtra(fromContext(), "did_warm_start", didWarmStart); - try { - const didRestore = await this.checkpointClient.restoreRun({ - runFriendlyId: message.run.friendlyId, - snapshotFriendlyId: message.snapshot.friendlyId, - body: { - ...rest, - checkpoint, - }, - }); - - if (didRestore) { - this.logger.debug("Restore successful", { runId: message.run.id }); - } else { - this.logger.error("Restore failed", { runId: message.run.id }); + if (didWarmStart) { + setExtra(fromContext(), "path_taken", "warm_start"); + this.logger.debug("Warm start successful", { runId: message.run.id }); + return; } - } catch (error) { - this.logger.error("Failed to restore run", { error }); - } - return; - } - - this.logger.debug("Scheduling run", { runId: message.run.id }); + setExtra(fromContext(), "path_taken", "cold_create"); - const warmStartStart = performance.now(); - const didWarmStart = await this.tryWarmStart(message); - const warmStartCheckMs = Math.round(performance.now() - warmStartStart); + const createStart = performance.now(); + try { + if (!message.deployment.friendlyId) { + // mostly a type guard, deployments always exists for deployed environments + // a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments. + throw new Error("Deployment is missing"); + } - if (didWarmStart) { - this.logger.debug("Warm start successful", { runId: message.run.id }); - return; - } + await this.workloadManager.create({ + dequeuedAt: message.dequeuedAt, + dequeueResponseMs, + pollingIntervalMs, + warmStartCheckMs, + envId: message.environment.id, + envType: message.environment.type, + image: message.image, + machine: message.run.machine, + orgId: message.organization.id, + projectId: message.project.id, + deploymentFriendlyId: message.deployment.friendlyId, + deploymentVersion: message.backgroundWorker.version, + runId: message.run.id, + runFriendlyId: message.run.friendlyId, + version: message.version, + nextAttemptNumber: message.run.attemptNumber, + snapshotId: message.snapshot.id, + snapshotFriendlyId: message.snapshot.friendlyId, + placementTags: message.placementTags, + traceContext: message.run.traceContext, + annotations: message.run.annotations, + hasPrivateLink: message.organization.hasPrivateLink, + }); + recordPhaseSince("workload_create", createStart, undefined); - try { - if (!message.deployment.friendlyId) { - // mostly a type guard, deployments always exists for deployed environments - // a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments. - throw new Error("Deployment is missing"); + // Disabled for now + // this.resourceMonitor.blockResources({ + // cpu: message.run.machine.cpu, + // memory: message.run.machine.memory, + // }); + } catch (error) { + recordPhaseSince( + "workload_create", + createStart, + error instanceof Error ? error : new Error(String(error)) + ); + this.logger.error("Failed to create workload", { error }); + } } - - await this.workloadManager.create({ - dequeuedAt: message.dequeuedAt, - dequeueResponseMs, - pollingIntervalMs, - warmStartCheckMs, - envId: message.environment.id, - envType: message.environment.type, - image: message.image, - machine: message.run.machine, - orgId: message.organization.id, - projectId: message.project.id, - deploymentFriendlyId: message.deployment.friendlyId, - deploymentVersion: message.backgroundWorker.version, - runId: message.run.id, - runFriendlyId: message.run.friendlyId, - version: message.version, - nextAttemptNumber: message.run.attemptNumber, - snapshotId: message.snapshot.id, - snapshotFriendlyId: message.snapshot.friendlyId, - placementTags: message.placementTags, - traceContext: message.run.traceContext, - annotations: message.run.annotations, - hasPrivateLink: message.organization.hasPrivateLink, - }); - - // Disabled for now - // this.resourceMonitor.blockResources({ - // cpu: message.run.machine.cpu, - // memory: message.run.machine.memory, - // }); - } catch (error) { - this.logger.error("Failed to create workload", { error }); - } + ); } ); @@ -404,6 +471,7 @@ class ManagedSupervisor { checkpointClient: this.checkpointClient, computeManager: this.computeManager, tracing: this.tracing, + wideEventOpts: this.wideEventOpts, }); this.workloadServer.on("runConnected", this.onRunConnected.bind(this)); @@ -420,19 +488,31 @@ class ManagedSupervisor { this.workerSession.unsubscribeFromRunNotifications([run.friendlyId]); } - private async tryWarmStart(dequeuedMessage: DequeuedMessage): Promise { + private async tryWarmStart( + dequeuedMessage: DequeuedMessage, + traceparent: string | undefined + ): Promise { if (!this.warmStartUrl) { return false; } const warmStartUrlWithPath = new URL("/warm-start", this.warmStartUrl); + const headers: Record = { + "Content-Type": "application/json", + }; + // Propagate the inbound W3C traceparent so the upstream warm-start + // receiver continues the same trace instead of minting a new one. Gated + // by the same kill switch as the wide-event emission so the whole PR is + // a no-op on the wire when disabled. + if (this.wideEventOpts.enabled && traceparent) { + headers.traceparent = traceparent; + } + try { const res = await fetch(warmStartUrlWithPath.href, { method: "POST", - headers: { - "Content-Type": "application/json", - }, + headers, body: JSON.stringify({ dequeuedMessage }), }); From dc66572b804240e5ff8bbfe988ef96a173d924de Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 19 May 2026 13:50:46 +0100 Subject: [PATCH 3/9] feat(supervisor): wide events on workload server + sockets --- .server-changes/supervisor-wide-events.md | 6 + apps/supervisor/src/workloadServer/index.ts | 594 ++++++++++++-------- 2 files changed, 375 insertions(+), 225 deletions(-) create mode 100644 .server-changes/supervisor-wide-events.md diff --git a/.server-changes/supervisor-wide-events.md b/.server-changes/supervisor-wide-events.md new file mode 100644 index 00000000000..49b73668ddb --- /dev/null +++ b/.server-changes/supervisor-wide-events.md @@ -0,0 +1,6 @@ +--- +area: supervisor +type: feature +--- + +Wide-event observability for the dequeue loop, workload-server routes, and run socket lifecycle. Off by default behind `TRIGGER_WIDE_EVENTS_ENABLED`. diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index bd38cc8700f..0ba0e00a420 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -31,6 +31,14 @@ import { } from "../services/computeSnapshotService.js"; import type { ComputeWorkloadManager } from "../workloadManager/compute.js"; import type { OtlpTraceService } from "../services/otlpTraceService.js"; +import type { ServerResponse } from "node:http"; +import { + emitOneShot, + runWideEvent, + setMeta, + type State, + type WideEventOptions, +} from "../wideEvents/index.js"; // Use the official export when upgrading to socket.io@4.8.0 interface DefaultEventsMap { @@ -67,6 +75,7 @@ type WorkloadServerOptions = { checkpointClient?: CheckpointClient; computeManager?: ComputeWorkloadManager; tracing?: OtlpTraceService; + wideEventOpts: WideEventOptions; }; export class WorkloadServer extends EventEmitter { @@ -74,6 +83,7 @@ export class WorkloadServer extends EventEmitter { private readonly snapshotService?: ComputeSnapshotService; private readonly logger = new SimpleStructuredLogger("workload-server"); + private readonly wideEventOpts: WideEventOptions; private readonly httpServer: HttpServer; private readonly websocketServer: Namespace< @@ -103,6 +113,7 @@ export class WorkloadServer extends EventEmitter { this.workerClient = opts.workerClient; this.checkpointClient = opts.checkpointClient; + this.wideEventOpts = opts.wideEventOpts; if (opts.computeManager?.snapshotsEnabled) { this.snapshotService = new ComputeSnapshotService({ @@ -142,6 +153,47 @@ export class WorkloadServer extends EventEmitter { return this.headerValueFromRequest(req, WORKLOAD_HEADERS.PROJECT_REF); } + /** + * Sets common route meta on the wide-event state from URL params. + */ + private attachRouteMeta(state: State, params: unknown): void { + if (!params || typeof params !== "object") return; + const p = params as Record; + if (typeof p.runFriendlyId === "string") setMeta(state, "run_id", p.runFriendlyId); + if (typeof p.snapshotFriendlyId === "string") { + setMeta(state, "snapshot_id", p.snapshotFriendlyId); + } + if (typeof p.deploymentId === "string") setMeta(state, "deployment_id", p.deploymentId); + } + + /** + * Wraps an HTTP route handler body with the wide-event lifecycle. Reads + * `traceparent` and `x-request-id` from `req.headers`, attaches `run_id` / + * `snapshot_id` / `deployment_id` meta from `params` when present, and + * captures the response status from `res.statusCode` after `fn` returns. + */ + private wideRoute( + ctx: { req: IncomingMessage; res: ServerResponse; params?: unknown }, + route: string, + method: string, + fn: () => Promise | T + ): Promise { + return runWideEvent( + { + ...this.wideEventOpts, + route, + method, + traceparent: this.headerValueFromRequest(ctx.req, "traceparent"), + inboundRequestId: this.headerValueFromRequest(ctx.req, "x-request-id"), + setup: (state) => this.attachRouteMeta(state, ctx.params), + }, + fn, + (state) => { + state.statusCode = ctx.res.statusCode; + } + ); + } + private createHttpServer({ host, port }: { host: string; port: number }) { const httpServer = new HttpServer({ port, @@ -162,26 +214,33 @@ export class WorkloadServer extends EventEmitter { { paramsSchema: WorkloadActionParams, bodySchema: WorkloadRunAttemptStartRequestBody, - handler: async ({ req, reply, params, body }) => { - const startResponse = await this.workerClient.startRunAttempt( - params.runFriendlyId, - params.snapshotFriendlyId, - body, - this.runnerIdFromRequest(req) - ); - - if (!startResponse.success) { - this.logger.error("Failed to start run", { - params, - error: startResponse.error, - }); - reply.empty(500); - return; - } - - reply.json(startResponse.data satisfies WorkloadRunAttemptStartResponseBody); - return; - }, + handler: async (ctx) => + this.wideRoute( + ctx, + "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/:snapshotFriendlyId/attempts/start", + "POST", + async () => { + const { req, reply, params, body } = ctx; + const startResponse = await this.workerClient.startRunAttempt( + params.runFriendlyId, + params.snapshotFriendlyId, + body, + this.runnerIdFromRequest(req) + ); + + if (!startResponse.success) { + this.logger.error("Failed to start run", { + params, + error: startResponse.error, + }); + reply.empty(500); + return; + } + + reply.json(startResponse.data satisfies WorkloadRunAttemptStartResponseBody); + return; + } + ), } ) .route( @@ -190,26 +249,35 @@ export class WorkloadServer extends EventEmitter { { paramsSchema: WorkloadActionParams, bodySchema: WorkloadRunAttemptCompleteRequestBody, - handler: async ({ req, reply, params, body }) => { - const completeResponse = await this.workerClient.completeRunAttempt( - params.runFriendlyId, - params.snapshotFriendlyId, - body, - this.runnerIdFromRequest(req) - ); - - if (!completeResponse.success) { - this.logger.error("Failed to complete run", { - params, - error: completeResponse.error, - }); - reply.empty(500); - return; - } - - reply.json(completeResponse.data satisfies WorkloadRunAttemptCompleteResponseBody); - return; - }, + handler: async (ctx) => + this.wideRoute( + ctx, + "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/:snapshotFriendlyId/attempts/complete", + "POST", + async () => { + const { req, reply, params, body } = ctx; + const completeResponse = await this.workerClient.completeRunAttempt( + params.runFriendlyId, + params.snapshotFriendlyId, + body, + this.runnerIdFromRequest(req) + ); + + if (!completeResponse.success) { + this.logger.error("Failed to complete run", { + params, + error: completeResponse.error, + }); + reply.empty(500); + return; + } + + reply.json( + completeResponse.data satisfies WorkloadRunAttemptCompleteResponseBody + ); + return; + } + ), } ) .route( @@ -218,27 +286,34 @@ export class WorkloadServer extends EventEmitter { { paramsSchema: WorkloadActionParams, bodySchema: WorkloadHeartbeatRequestBody, - handler: async ({ req, reply, params, body }) => { - const heartbeatResponse = await this.workerClient.heartbeatRun( - params.runFriendlyId, - params.snapshotFriendlyId, - body, - this.runnerIdFromRequest(req) - ); - - if (!heartbeatResponse.success) { - this.logger.error("Failed to heartbeat run", { - params, - error: heartbeatResponse.error, - }); - reply.empty(500); - return; - } - - reply.json({ - ok: true, - } satisfies WorkloadHeartbeatResponseBody); - }, + handler: async (ctx) => + this.wideRoute( + ctx, + "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/:snapshotFriendlyId/heartbeat", + "POST", + async () => { + const { req, reply, params, body } = ctx; + const heartbeatResponse = await this.workerClient.heartbeatRun( + params.runFriendlyId, + params.snapshotFriendlyId, + body, + this.runnerIdFromRequest(req) + ); + + if (!heartbeatResponse.success) { + this.logger.error("Failed to heartbeat run", { + params, + error: heartbeatResponse.error, + }); + reply.empty(500); + return; + } + + reply.json({ + ok: true, + } satisfies WorkloadHeartbeatResponseBody); + } + ), } ) .route( @@ -246,87 +321,94 @@ export class WorkloadServer extends EventEmitter { "GET", { paramsSchema: WorkloadActionParams, - handler: async ({ reply, params, req }) => { - const runnerId = this.runnerIdFromRequest(req); - const deploymentVersion = this.deploymentVersionFromRequest(req); - const projectRef = this.projectRefFromRequest(req); - - this.logger.debug("Suspend request", { - params, - runnerId, - deploymentVersion, - projectRef, - }); - - if (!runnerId || !deploymentVersion || !projectRef) { - this.logger.error("Invalid headers for suspend request", { - ...params, - runnerId, - deploymentVersion, - projectRef, - }); - reply.json( - { - ok: false, - error: "Invalid headers", - } satisfies WorkloadSuspendRunResponseBody, - false, - 400 - ); - return; - } - - if (this.snapshotService) { - // Compute mode: delay snapshot to avoid wasted work on short-lived waitpoints. - // If the run continues before the delay expires, the snapshot is cancelled. - reply.json({ ok: true } satisfies WorkloadSuspendRunResponseBody, false, 202); - - this.snapshotService.schedule(params.runFriendlyId, { - runnerId, - runFriendlyId: params.runFriendlyId, - snapshotFriendlyId: params.snapshotFriendlyId, - }); - - return; - } - - if (!this.checkpointClient) { - reply.json( - { - ok: false, - error: "Checkpoints disabled", - } satisfies WorkloadSuspendRunResponseBody, - false, - 400 - ); - return; - } - - reply.json( - { - ok: true, - } satisfies WorkloadSuspendRunResponseBody, - false, - 202 - ); - - const suspendResult = await this.checkpointClient.suspendRun({ - runFriendlyId: params.runFriendlyId, - snapshotFriendlyId: params.snapshotFriendlyId, - body: { - runnerId, - runId: params.runFriendlyId, - snapshotId: params.snapshotFriendlyId, - projectRef, - deploymentVersion, - }, - }); - - if (!suspendResult) { - this.logger.error("Failed to suspend run", { params }); - return; - } - }, + handler: async (ctx) => + this.wideRoute( + ctx, + "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/:snapshotFriendlyId/suspend", + "GET", + async () => { + const { reply, params, req } = ctx; + const runnerId = this.runnerIdFromRequest(req); + const deploymentVersion = this.deploymentVersionFromRequest(req); + const projectRef = this.projectRefFromRequest(req); + + this.logger.debug("Suspend request", { + params, + runnerId, + deploymentVersion, + projectRef, + }); + + if (!runnerId || !deploymentVersion || !projectRef) { + this.logger.error("Invalid headers for suspend request", { + ...params, + runnerId, + deploymentVersion, + projectRef, + }); + reply.json( + { + ok: false, + error: "Invalid headers", + } satisfies WorkloadSuspendRunResponseBody, + false, + 400 + ); + return; + } + + if (this.snapshotService) { + // Compute mode: delay snapshot to avoid wasted work on short-lived waitpoints. + // If the run continues before the delay expires, the snapshot is cancelled. + reply.json({ ok: true } satisfies WorkloadSuspendRunResponseBody, false, 202); + + this.snapshotService.schedule(params.runFriendlyId, { + runnerId, + runFriendlyId: params.runFriendlyId, + snapshotFriendlyId: params.snapshotFriendlyId, + }); + + return; + } + + if (!this.checkpointClient) { + reply.json( + { + ok: false, + error: "Checkpoints disabled", + } satisfies WorkloadSuspendRunResponseBody, + false, + 400 + ); + return; + } + + reply.json( + { + ok: true, + } satisfies WorkloadSuspendRunResponseBody, + false, + 202 + ); + + const suspendResult = await this.checkpointClient.suspendRun({ + runFriendlyId: params.runFriendlyId, + snapshotFriendlyId: params.snapshotFriendlyId, + body: { + runnerId, + runId: params.runFriendlyId, + snapshotId: params.snapshotFriendlyId, + projectRef, + deploymentVersion, + }, + }); + + if (!suspendResult) { + this.logger.error("Failed to suspend run", { params }); + return; + } + } + ), } ) .route( @@ -334,33 +416,40 @@ export class WorkloadServer extends EventEmitter { "GET", { paramsSchema: WorkloadActionParams, - handler: async ({ req, reply, params }) => { - this.logger.debug("Run continuation request", { params }); - - // Cancel any pending delayed snapshot for this run - this.snapshotService?.cancel(params.runFriendlyId); - - const continuationResult = await this.workerClient.continueRunExecution( - params.runFriendlyId, - params.snapshotFriendlyId, - this.runnerIdFromRequest(req) - ); - - if (!continuationResult.success) { - this.logger.error("Failed to continue run execution", { params }); - reply.json( - { - ok: false, - error: "Failed to continue run execution", - }, - false, - 400 - ); - return; - } - - reply.json(continuationResult.data as WorkloadContinueRunExecutionResponseBody); - }, + handler: async (ctx) => + this.wideRoute( + ctx, + "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/:snapshotFriendlyId/continue", + "GET", + async () => { + const { req, reply, params } = ctx; + this.logger.debug("Run continuation request", { params }); + + // Cancel any pending delayed snapshot for this run + this.snapshotService?.cancel(params.runFriendlyId); + + const continuationResult = await this.workerClient.continueRunExecution( + params.runFriendlyId, + params.snapshotFriendlyId, + this.runnerIdFromRequest(req) + ); + + if (!continuationResult.success) { + this.logger.error("Failed to continue run execution", { params }); + reply.json( + { + ok: false, + error: "Failed to continue run execution", + }, + false, + 400 + ); + return; + } + + reply.json(continuationResult.data as WorkloadContinueRunExecutionResponseBody); + } + ), } ) .route( @@ -368,24 +457,33 @@ export class WorkloadServer extends EventEmitter { "GET", { paramsSchema: WorkloadActionParams, - handler: async ({ req, reply, params }) => { - const sinceSnapshotResponse = await this.workerClient.getSnapshotsSince( - params.runFriendlyId, - params.snapshotFriendlyId, - this.runnerIdFromRequest(req) - ); - - if (!sinceSnapshotResponse.success) { - this.logger.error("Failed to get snapshots since", { - runId: params.runFriendlyId, - error: sinceSnapshotResponse.error, - }); - reply.empty(500); - return; - } - - reply.json(sinceSnapshotResponse.data satisfies WorkloadRunSnapshotsSinceResponseBody); - }, + handler: async (ctx) => + this.wideRoute( + ctx, + "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/since/:snapshotFriendlyId", + "GET", + async () => { + const { req, reply, params } = ctx; + const sinceSnapshotResponse = await this.workerClient.getSnapshotsSince( + params.runFriendlyId, + params.snapshotFriendlyId, + this.runnerIdFromRequest(req) + ); + + if (!sinceSnapshotResponse.success) { + this.logger.error("Failed to get snapshots since", { + runId: params.runFriendlyId, + error: sinceSnapshotResponse.error, + }); + reply.empty(500); + return; + } + + reply.json( + sinceSnapshotResponse.data satisfies WorkloadRunSnapshotsSinceResponseBody + ); + } + ), } ) .route("/api/v1/workload-actions/deployments/:deploymentId/dequeue", "GET", { @@ -393,61 +491,83 @@ export class WorkloadServer extends EventEmitter { deploymentId: z.string(), }), - handler: async ({ req, reply, params }) => { - const dequeueResponse = await this.workerClient.dequeueFromVersion( - params.deploymentId, - 1, - this.runnerIdFromRequest(req) - ); - - if (!dequeueResponse.success) { - this.logger.error("Failed to get latest snapshot", { - deploymentId: params.deploymentId, - error: dequeueResponse.error, - }); - reply.empty(500); - return; - } + handler: async (ctx) => + this.wideRoute( + ctx, + "/api/v1/workload-actions/deployments/:deploymentId/dequeue", + "GET", + async () => { + const { req, reply, params } = ctx; + const dequeueResponse = await this.workerClient.dequeueFromVersion( + params.deploymentId, + 1, + this.runnerIdFromRequest(req) + ); - reply.json(dequeueResponse.data satisfies WorkloadDequeueFromVersionResponseBody); - }, + if (!dequeueResponse.success) { + this.logger.error("Failed to get latest snapshot", { + deploymentId: params.deploymentId, + error: dequeueResponse.error, + }); + reply.empty(500); + return; + } + + reply.json(dequeueResponse.data satisfies WorkloadDequeueFromVersionResponseBody); + } + ), }); if (env.SEND_RUN_DEBUG_LOGS) { httpServer.route("/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", { paramsSchema: WorkloadActionParams.pick({ runFriendlyId: true }), bodySchema: WorkloadDebugLogRequestBody, - handler: async ({ req, reply, params, body }) => { - reply.empty(204); - - await this.workerClient.sendDebugLog( - params.runFriendlyId, - body, - this.runnerIdFromRequest(req) - ); - }, + handler: async (ctx) => + this.wideRoute( + ctx, + "/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", + "POST", + async () => { + const { req, reply, params, body } = ctx; + reply.empty(204); + + await this.workerClient.sendDebugLog( + params.runFriendlyId, + body, + this.runnerIdFromRequest(req) + ); + } + ), }); } else { // Lightweight mock route without schemas httpServer.route("/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", { - handler: async ({ reply }) => { - reply.empty(204); - }, + handler: async (ctx) => + this.wideRoute( + ctx, + "/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", + "POST", + async () => { + ctx.reply.empty(204); + } + ), }); } - // Compute snapshot callback endpoint + // Snapshot callback endpoint (inbound from compute path) httpServer.route("/api/v1/compute/snapshot-complete", "POST", { bodySchema: SnapshotCallbackPayloadSchema, - handler: async ({ reply, body }) => { - if (!this.snapshotService) { - reply.empty(404); - return; - } + handler: async (ctx) => + this.wideRoute(ctx, "/api/v1/compute/snapshot-complete", "POST", async () => { + const { reply, body } = ctx; + if (!this.snapshotService) { + reply.empty(404); + return; + } - const result = await this.snapshotService.handleCallback(body); - reply.empty(result.status); - }, + const result = await this.snapshotService.handleCallback(body); + reply.empty(result.status); + }), }); return httpServer; @@ -591,6 +711,18 @@ export class WorkloadServer extends EventEmitter { try { runConnected(message.run.friendlyId); + emitOneShot({ + ...this.wideEventOpts, + populate: (state) => { + state.extras.event = "run:start"; + setMeta(state, "run_id", message.run.friendlyId); + if (socket.data.deploymentId) { + setMeta(state, "deployment_id", socket.data.deploymentId); + } + if (socket.data.runnerId) setMeta(state, "runner_id", socket.data.runnerId); + state.extras.socket_id = socket.id; + }, + }); } catch (error) { log.error("run:start error", { error }); } @@ -610,6 +742,18 @@ export class WorkloadServer extends EventEmitter { // Don't delete trace context here - run:stop fires after each snapshot/shutdown // but the run may be restored on a new VM and snapshot again. Trace context is // re-populated on dequeue, and entries are small (4 strings per run). + emitOneShot({ + ...this.wideEventOpts, + populate: (state) => { + state.extras.event = "run:stop"; + setMeta(state, "run_id", message.run.friendlyId); + if (socket.data.deploymentId) { + setMeta(state, "deployment_id", socket.data.deploymentId); + } + if (socket.data.runnerId) setMeta(state, "runner_id", socket.data.runnerId); + state.extras.socket_id = socket.id; + }, + }); } catch (error) { log.error("run:stop error", { error }); } From 4a53cabc70b08605d6f64ee580079880644b5174 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 19 May 2026 14:33:04 +0100 Subject: [PATCH 4/9] fix(supervisor): wide-event review fixes + noisy-routes flag + socket lifecycle --- apps/supervisor/src/env.ts | 4 + apps/supervisor/src/index.ts | 4 +- apps/supervisor/src/wideEvents/emit.test.ts | 1 - apps/supervisor/src/wideEvents/emit.ts | 1 - apps/supervisor/src/wideEvents/middleware.ts | 2 +- apps/supervisor/src/wideEvents/state.ts | 1 - apps/supervisor/src/workloadServer/index.ts | 81 ++++++++++++-------- 7 files changed, 56 insertions(+), 38 deletions(-) diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 0e52578daf7..071de4cc814 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -261,6 +261,10 @@ const Env = z // line per natural unit of work (dequeue iteration, HTTP request, socket // lifecycle). High-QPS hotpath, so the kill switch must be honoured. TRIGGER_WIDE_EVENTS_ENABLED: BoolEnv.default(false), + // When true, also emit wide events for high-frequency HTTP routes + // (heartbeat, snapshots-since, logs/debug). Off in prod to keep event + // volume manageable; on in test environments for full-fidelity debugging. + TRIGGER_WIDE_EVENTS_NOISY_ROUTES: BoolEnv.default(false), }) .superRefine((data, ctx) => { if (data.COMPUTE_SNAPSHOTS_ENABLED && !data.TRIGGER_METADATA_URL) { diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index c4768658b1b..e2c16c4b1df 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -63,6 +63,7 @@ class ManagedSupervisor { env: { nodeId: env.TRIGGER_WORKER_INSTANCE_NAME }, enabled: env.TRIGGER_WIDE_EVENTS_ENABLED, }; + private readonly wideEventsNoisyRoutes = env.TRIGGER_WIDE_EVENTS_NOISY_ROUTES; constructor() { const { @@ -260,7 +261,7 @@ class ManagedSupervisor { ...this.wideEventOpts, traceparent, setup: (state) => { - setMeta(state, "run_id", message.run.id); + setMeta(state, "run_id", message.run.friendlyId); setMeta(state, "env_id", message.environment.id); setMeta(state, "org_id", message.organization.id); setMeta(state, "project_id", message.project.id); @@ -472,6 +473,7 @@ class ManagedSupervisor { computeManager: this.computeManager, tracing: this.tracing, wideEventOpts: this.wideEventOpts, + wideEventsNoisyRoutes: this.wideEventsNoisyRoutes, }); this.workloadServer.on("runConnected", this.onRunConnected.bind(this)); diff --git a/apps/supervisor/src/wideEvents/emit.test.ts b/apps/supervisor/src/wideEvents/emit.test.ts index e9df12a3d4e..4e778d3e992 100644 --- a/apps/supervisor/src/wideEvents/emit.test.ts +++ b/apps/supervisor/src/wideEvents/emit.test.ts @@ -43,7 +43,6 @@ describe("emit", () => { expect(out).not.toHaveProperty("trace_id"); expect(out).not.toHaveProperty("version"); expect(out).not.toHaveProperty("commit_sha"); - expect(out).not.toHaveProperty("instance_id"); expect(out).not.toHaveProperty("error.code"); }); diff --git a/apps/supervisor/src/wideEvents/emit.ts b/apps/supervisor/src/wideEvents/emit.ts index 85a7a082ca3..a1644237d5d 100644 --- a/apps/supervisor/src/wideEvents/emit.ts +++ b/apps/supervisor/src/wideEvents/emit.ts @@ -26,7 +26,6 @@ export function emit(state: State): void { appendIfSet(out, "commit_sha", state.commitSha); appendIfSet(out, "region", state.region); appendIfSet(out, "node_id", state.nodeId); - appendIfSet(out, "instance_id", state.instanceId); out.ok = state.ok; if (state.statusCode !== 0) out.status = state.statusCode; diff --git a/apps/supervisor/src/wideEvents/middleware.ts b/apps/supervisor/src/wideEvents/middleware.ts index 5dafdfa7d13..a87a1885d56 100644 --- a/apps/supervisor/src/wideEvents/middleware.ts +++ b/apps/supervisor/src/wideEvents/middleware.ts @@ -57,10 +57,10 @@ export async function runWideEvent( }); if (opts.route) state.extras.route = opts.route; if (opts.method) state.extras.method = opts.method; - if (opts.setup) opts.setup(state); const start = performance.now(); try { + if (opts.setup) opts.setup(state); const result = await wideEventStorage.run(state, () => Promise.resolve(fn())); state.durationMs = Math.round(performance.now() - start); if (finalize) finalize(state); diff --git a/apps/supervisor/src/wideEvents/state.ts b/apps/supervisor/src/wideEvents/state.ts index 5fb53ecb936..1929e5e9806 100644 --- a/apps/supervisor/src/wideEvents/state.ts +++ b/apps/supervisor/src/wideEvents/state.ts @@ -21,7 +21,6 @@ export type State = { commitSha?: string; region?: string; nodeId?: string; - instanceId?: string; // Caller-attached opaque metadata, flattened to `meta.` on emit. meta: Record; diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index 0ba0e00a420..ea7728e6376 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -76,6 +76,8 @@ type WorkloadServerOptions = { computeManager?: ComputeWorkloadManager; tracing?: OtlpTraceService; wideEventOpts: WideEventOptions; + /** When true, high-frequency HTTP routes also emit wide events. */ + wideEventsNoisyRoutes: boolean; }; export class WorkloadServer extends EventEmitter { @@ -84,6 +86,7 @@ export class WorkloadServer extends EventEmitter { private readonly logger = new SimpleStructuredLogger("workload-server"); private readonly wideEventOpts: WideEventOptions; + private readonly wideEventsNoisyRoutes: boolean; private readonly httpServer: HttpServer; private readonly websocketServer: Namespace< @@ -114,6 +117,7 @@ export class WorkloadServer extends EventEmitter { this.workerClient = opts.workerClient; this.checkpointClient = opts.checkpointClient; this.wideEventOpts = opts.wideEventOpts; + this.wideEventsNoisyRoutes = opts.wideEventsNoisyRoutes; if (opts.computeManager?.snapshotsEnabled) { this.snapshotService = new ComputeSnapshotService({ @@ -171,16 +175,25 @@ export class WorkloadServer extends EventEmitter { * `traceparent` and `x-request-id` from `req.headers`, attaches `run_id` / * `snapshot_id` / `deployment_id` meta from `params` when present, and * captures the response status from `res.statusCode` after `fn` returns. + * + * Pass `highFrequency: true` for noisy routes (heartbeat, polling). Those + * still go through the wrapper but only emit when + * `TRIGGER_WIDE_EVENTS_NOISY_ROUTES` is on, so prod can keep them dark + * while test envs capture full-fidelity traffic for debugging. */ private wideRoute( ctx: { req: IncomingMessage; res: ServerResponse; params?: unknown }, route: string, method: string, - fn: () => Promise | T + fn: () => Promise | T, + routeOpts: { highFrequency?: boolean } = {} ): Promise { + const enabled = + this.wideEventOpts.enabled && (!routeOpts.highFrequency || this.wideEventsNoisyRoutes); return runWideEvent( { ...this.wideEventOpts, + enabled, route, method, traceparent: this.headerValueFromRequest(ctx.req, "traceparent"), @@ -312,7 +325,8 @@ export class WorkloadServer extends EventEmitter { reply.json({ ok: true, } satisfies WorkloadHeartbeatResponseBody); - } + }, + { highFrequency: true } ), } ) @@ -482,7 +496,8 @@ export class WorkloadServer extends EventEmitter { reply.json( sinceSnapshotResponse.data satisfies WorkloadRunSnapshotsSinceResponseBody ); - } + }, + { highFrequency: true } ), } ) @@ -536,7 +551,8 @@ export class WorkloadServer extends EventEmitter { body, this.runnerIdFromRequest(req) ); - } + }, + { highFrequency: true } ), }); } else { @@ -549,7 +565,8 @@ export class WorkloadServer extends EventEmitter { "POST", async () => { ctx.reply.empty(204); - } + }, + { highFrequency: true } ), }); } @@ -640,6 +657,26 @@ export class WorkloadServer extends EventEmitter { }; }; + const emitSocketLifecycle = ( + event: "run_connected" | "run_disconnected", + friendlyId: string, + disconnectReason?: string + ) => { + emitOneShot({ + ...this.wideEventOpts, + populate: (state) => { + state.extras.event = event; + setMeta(state, "run_id", friendlyId); + if (socket.data.deploymentId) { + setMeta(state, "deployment_id", socket.data.deploymentId); + } + if (socket.data.runnerId) setMeta(state, "runner_id", socket.data.runnerId); + state.extras.socket_id = socket.id; + if (disconnectReason) state.extras.disconnect_reason = disconnectReason; + }, + }); + }; + const runConnected = (friendlyId: string) => { socketLogger.debug("runConnected", { ...getSocketMetadata() }); @@ -650,20 +687,22 @@ export class WorkloadServer extends EventEmitter { newRunId: friendlyId, oldRunId: socket.data.runFriendlyId, }); - runDisconnected(socket.data.runFriendlyId); + runDisconnected(socket.data.runFriendlyId, "socket_run_replaced"); } this.runSockets.set(friendlyId, socket); this.emit("runConnected", { run: { friendlyId } }); socket.data.runFriendlyId = friendlyId; + emitSocketLifecycle("run_connected", friendlyId); }; - const runDisconnected = (friendlyId: string) => { + const runDisconnected = (friendlyId: string, reason: string) => { socketLogger.debug("runDisconnected", { ...getSocketMetadata() }); this.runSockets.delete(friendlyId); this.emit("runDisconnected", { run: { friendlyId } }); socket.data.runFriendlyId = undefined; + emitSocketLifecycle("run_disconnected", friendlyId, reason); }; socketLogger.debug("wsServer socket connected", { ...getSocketMetadata() }); @@ -681,7 +720,7 @@ export class WorkloadServer extends EventEmitter { }); if (socket.data.runFriendlyId) { - runDisconnected(socket.data.runFriendlyId); + runDisconnected(socket.data.runFriendlyId, `socket_disconnecting:${reason}`); } }); @@ -711,18 +750,6 @@ export class WorkloadServer extends EventEmitter { try { runConnected(message.run.friendlyId); - emitOneShot({ - ...this.wideEventOpts, - populate: (state) => { - state.extras.event = "run:start"; - setMeta(state, "run_id", message.run.friendlyId); - if (socket.data.deploymentId) { - setMeta(state, "deployment_id", socket.data.deploymentId); - } - if (socket.data.runnerId) setMeta(state, "runner_id", socket.data.runnerId); - state.extras.socket_id = socket.id; - }, - }); } catch (error) { log.error("run:start error", { error }); } @@ -738,22 +765,10 @@ export class WorkloadServer extends EventEmitter { log.debug("Handling run:stop"); try { - runDisconnected(message.run.friendlyId); + runDisconnected(message.run.friendlyId, "run_stop_message"); // Don't delete trace context here - run:stop fires after each snapshot/shutdown // but the run may be restored on a new VM and snapshot again. Trace context is // re-populated on dequeue, and entries are small (4 strings per run). - emitOneShot({ - ...this.wideEventOpts, - populate: (state) => { - state.extras.event = "run:stop"; - setMeta(state, "run_id", message.run.friendlyId); - if (socket.data.deploymentId) { - setMeta(state, "deployment_id", socket.data.deploymentId); - } - if (socket.data.runnerId) setMeta(state, "runner_id", socket.data.runnerId); - state.extras.socket_id = socket.id; - }, - }); } catch (error) { log.error("run:stop error", { error }); } From 791e359b09f940028a317a88b2a66a06d22180f2 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Wed, 20 May 2026 19:00:33 +0100 Subject: [PATCH 5/9] feat(supervisor): forward traceparent + request_id to compute --- .server-changes/README.md | 8 ++++++++ .../supervisor-compute-traceparent-forwarding.md | 6 ++++++ apps/supervisor/src/workloadManager/compute.ts | 15 +++++++++++++++ internal-packages/compute/src/client.ts | 15 +++++++++++++++ 4 files changed, 44 insertions(+) create mode 100644 .server-changes/supervisor-compute-traceparent-forwarding.md diff --git a/.server-changes/README.md b/.server-changes/README.md index 82716de981c..2b0eeade36b 100644 --- a/.server-changes/README.md +++ b/.server-changes/README.md @@ -38,6 +38,14 @@ Speed up batch queue processing by removing stalls and fixing retry race The body text (below the frontmatter) is a one-line description of the change. Keep it concise — it will appear in release notes. +### Writing guidance + +These entries are public-facing - they ship verbatim in user-visible release notes. A few rules to keep them clean: + +- **One sentence is usually enough.** The body is the bullet in the changelog. If you need a paragraph, you're probably describing the implementation rather than the change. +- **Describe behavior, not implementation.** Skip internal scopes, middleware names, library specifics, framework internals. Users care about what's different for them, not how it's wired. +- **Never name internal tools or infra.** Observability stacks, internal services, infra components, monitoring backends, CI surfaces, AWS specifics - none of these belong in user-facing notes. + ## Lifecycle 1. Engineer adds a `.server-changes/` file in their PR diff --git a/.server-changes/supervisor-compute-traceparent-forwarding.md b/.server-changes/supervisor-compute-traceparent-forwarding.md new file mode 100644 index 00000000000..5d20c086935 --- /dev/null +++ b/.server-changes/supervisor-compute-traceparent-forwarding.md @@ -0,0 +1,6 @@ +--- +area: supervisor +type: improvement +--- + +Forward `traceparent` headers on outbound calls to the compute provider so distributed traces stay continuous across services. diff --git a/apps/supervisor/src/workloadManager/compute.ts b/apps/supervisor/src/workloadManager/compute.ts index 1c00f33aad3..2c51b15f2d4 100644 --- a/apps/supervisor/src/workloadManager/compute.ts +++ b/apps/supervisor/src/workloadManager/compute.ts @@ -10,6 +10,7 @@ import { ComputeClient, stripImageDigest } from "@internal/compute"; import { extractTraceparent, getRunnerId } from "../util.js"; import type { OtlpTraceService } from "../services/otlpTraceService.js"; import { tryCatch } from "@trigger.dev/core"; +import { fromContext } from "../wideEvents/index.js"; type ComputeWorkloadManagerOptions = WorkloadManagerOptions & { gateway: { @@ -46,6 +47,20 @@ export class ComputeWorkloadManager implements WorkloadManager { gatewayUrl: opts.gateway.url, authToken: opts.gateway.authToken, timeoutMs: opts.gateway.timeoutMs, + // Forward the current wide-event scope's traceparent + request_id so the + // downstream service continues the same trace and joins its own wide + // events to ours. When called outside a wide-event scope (or when wide + // events are disabled), `fromContext` returns undefined and propagation + // is skipped. + getPropagationHeaders: () => { + const state = fromContext(); + if (!state) return {}; + const headers: Record = { "x-request-id": state.requestId }; + if (state.traceparent) { + headers.traceparent = state.traceparent; + } + return headers; + }, }); } diff --git a/internal-packages/compute/src/client.ts b/internal-packages/compute/src/client.ts index 97585345eae..d6215678b53 100644 --- a/internal-packages/compute/src/client.ts +++ b/internal-packages/compute/src/client.ts @@ -11,6 +11,13 @@ export type ComputeClientOptions = { gatewayUrl: string; authToken?: string; timeoutMs: number; + /** + * Called once per outbound request to collect cross-service correlation + * headers (e.g. `traceparent`, `x-request-id`) from the caller's current + * scope. The returned record is merged onto the outbound headers. Return + * `{}` (or omit the option) to skip propagation. + */ + getPropagationHeaders?: () => Record; }; export class ComputeClient { @@ -40,6 +47,14 @@ class HttpTransport { if (this.opts.authToken) { h["Authorization"] = `Bearer ${this.opts.authToken}`; } + const propagation = this.opts.getPropagationHeaders?.(); + if (propagation) { + for (const [key, value] of Object.entries(propagation)) { + if (value) { + h[key] = value; + } + } + } return h; } From 5ffe5f0cbc0e7cd3ad4a6395705540b836a30be0 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Wed, 20 May 2026 19:56:57 +0100 Subject: [PATCH 6/9] feat(supervisor): wide events on snapshot lifecycle --- .../supervisor-snapshot-lifecycle-events.md | 6 ++ .../src/services/computeSnapshotService.ts | 98 ++++++++++++++++--- apps/supervisor/src/workloadServer/index.ts | 1 + 3 files changed, 90 insertions(+), 15 deletions(-) create mode 100644 .server-changes/supervisor-snapshot-lifecycle-events.md diff --git a/.server-changes/supervisor-snapshot-lifecycle-events.md b/.server-changes/supervisor-snapshot-lifecycle-events.md new file mode 100644 index 00000000000..1970a54e615 --- /dev/null +++ b/.server-changes/supervisor-snapshot-lifecycle-events.md @@ -0,0 +1,6 @@ +--- +area: supervisor +type: improvement +--- + +Add observability events at the schedule, dispatch, and callback phases of the snapshot lifecycle. diff --git a/apps/supervisor/src/services/computeSnapshotService.ts b/apps/supervisor/src/services/computeSnapshotService.ts index 041e2902c75..3b88aa98522 100644 --- a/apps/supervisor/src/services/computeSnapshotService.ts +++ b/apps/supervisor/src/services/computeSnapshotService.ts @@ -6,6 +6,15 @@ import { type SnapshotCallbackPayload } from "@internal/compute"; import type { ComputeWorkloadManager } from "../workloadManager/compute.js"; import { TimerWheel } from "./timerWheel.js"; import type { OtlpTraceService } from "./otlpTraceService.js"; +import { + emitOneShot, + fromContext, + recordPhaseSince, + runWideEvent, + setExtra, + setMeta, + type WideEventOptions, +} from "../wideEvents/index.js"; type DelayedSnapshot = { runnerId: string; @@ -24,6 +33,7 @@ export type ComputeSnapshotServiceOptions = { computeManager: ComputeWorkloadManager; workerClient: SupervisorHttpClient; tracing?: OtlpTraceService; + wideEventOpts: WideEventOptions; }; export class ComputeSnapshotService { @@ -37,11 +47,13 @@ export class ComputeSnapshotService { private readonly computeManager: ComputeWorkloadManager; private readonly workerClient: SupervisorHttpClient; private readonly tracing?: OtlpTraceService; + private readonly wideEventOpts: WideEventOptions; constructor(opts: ComputeSnapshotServiceOptions) { this.computeManager = opts.computeManager; this.workerClient = opts.workerClient; this.tracing = opts.tracing; + this.wideEventOpts = opts.wideEventOpts; this.dispatchLimit = pLimit(this.computeManager.snapshotDispatchLimit); this.timerWheel = new TimerWheel({ @@ -62,6 +74,16 @@ export class ComputeSnapshotService { /** Schedule a delayed snapshot for a run. Replaces any pending snapshot for the same run. */ schedule(runFriendlyId: string, data: DelayedSnapshot) { this.timerWheel.submit(runFriendlyId, data); + emitOneShot({ + ...this.wideEventOpts, + populate: (state) => { + state.extras.op = "snapshot.schedule"; + state.meta.run_id = runFriendlyId; + state.meta.snapshot_id = data.snapshotFriendlyId; + state.extras.runner_id = data.runnerId; + state.extras.delay_ms = this.computeManager.snapshotDelayMs; + }, + }); this.logger.debug("Snapshot scheduled", { runFriendlyId, snapshotFriendlyId: data.snapshotFriendlyId, @@ -73,6 +95,13 @@ export class ComputeSnapshotService { cancel(runFriendlyId: string): boolean { const cancelled = this.timerWheel.cancel(runFriendlyId); if (cancelled) { + emitOneShot({ + ...this.wideEventOpts, + populate: (state) => { + state.extras.op = "snapshot.canceled"; + state.meta.run_id = runFriendlyId; + }, + }); this.logger.debug("Snapshot cancelled", { runFriendlyId }); } return cancelled; @@ -81,6 +110,24 @@ export class ComputeSnapshotService { /** Handle the callback from the gateway after a snapshot completes or fails. */ async handleCallback(body: SnapshotCallbackPayload) { const snapshotId = body.status === "completed" ? body.snapshot_id : undefined; + const runId = body.metadata?.runId; + const snapshotFriendlyId = body.metadata?.snapshotFriendlyId; + + // Enrich the wrapping route's wide event with snapshot metadata. The + // `/api/v1/compute/snapshot-complete` route is registered with `wideRoute`, + // so `fromContext()` returns the State of that route and these calls + // become extras/meta on the same wide event - no nested emission. + const state = fromContext(); + if (state) { + state.extras.op = "snapshot.callback"; + state.extras["snapshot.status"] = body.status; + if (body.instance_id) state.extras["snapshot.instance_id"] = body.instance_id; + if (body.duration_ms !== undefined) state.extras["snapshot.duration_ms"] = body.duration_ms; + if (snapshotId) state.extras["snapshot.id"] = snapshotId; + if (body.status === "failed" && body.error) state.extras["snapshot.error"] = body.error; + } + if (runId) setMeta(state, "run_id", runId); + if (snapshotFriendlyId) setMeta(state, "snapshot_id", snapshotFriendlyId); this.logger.debug("Snapshot callback", { snapshotId, @@ -91,9 +138,6 @@ export class ComputeSnapshotService { durationMs: body.duration_ms, }); - const runId = body.metadata?.runId; - const snapshotFriendlyId = body.metadata?.snapshotFriendlyId; - if (!runId || !snapshotFriendlyId) { this.logger.error("Snapshot callback missing metadata", { body }); return { ok: false as const, status: 400 }; @@ -102,6 +146,7 @@ export class ComputeSnapshotService { this.#emitSnapshotSpan(runId, body.duration_ms, snapshotId); if (body.status === "completed") { + const submitStart = performance.now(); const result = await this.workerClient.submitSuspendCompletion({ runId, snapshotId: snapshotFriendlyId, @@ -113,6 +158,11 @@ export class ComputeSnapshotService { }, }, }); + recordPhaseSince( + "submit_completion", + submitStart, + result.success ? undefined : new Error(String(result.error)) + ); if (result.success) { this.logger.debug("Suspend completion submitted", { @@ -121,6 +171,7 @@ export class ComputeSnapshotService { snapshotId: body.snapshot_id, }); } else { + setExtra(state, "submit_completion.error", String(result.error)); this.logger.error("Failed to submit suspend completion", { runId, snapshotFriendlyId, @@ -128,6 +179,7 @@ export class ComputeSnapshotService { }); } } else { + const submitStart = performance.now(); const result = await this.workerClient.submitSuspendCompletion({ runId, snapshotId: snapshotFriendlyId, @@ -136,8 +188,14 @@ export class ComputeSnapshotService { error: body.error ?? "Snapshot failed", }, }); + recordPhaseSince( + "submit_completion", + submitStart, + result.success ? undefined : new Error(String(result.error)) + ); if (!result.success) { + setExtra(state, "submit_completion.error", String(result.error)); this.logger.error("Failed to submit suspend failure", { runId, snapshotFriendlyId, @@ -184,20 +242,30 @@ export class ComputeSnapshotService { /** Dispatch a snapshot request to the gateway. */ private async dispatch(snapshot: DelayedSnapshot): Promise { - const result = await this.computeManager.snapshot({ - runnerId: snapshot.runnerId, - metadata: { - runId: snapshot.runFriendlyId, - snapshotFriendlyId: snapshot.snapshotFriendlyId, + await runWideEvent( + { + ...this.wideEventOpts, + setup: (state) => { + state.extras.op = "snapshot.dispatch"; + state.meta.run_id = snapshot.runFriendlyId; + state.meta.snapshot_id = snapshot.snapshotFriendlyId; + state.extras.runner_id = snapshot.runnerId; + }, }, - }); + async () => { + const result = await this.computeManager.snapshot({ + runnerId: snapshot.runnerId, + metadata: { + runId: snapshot.runFriendlyId, + snapshotFriendlyId: snapshot.snapshotFriendlyId, + }, + }); - if (!result) { - this.logger.error("Failed to request snapshot", { - runId: snapshot.runFriendlyId, - runnerId: snapshot.runnerId, - }); - } + if (!result) { + throw new Error("Snapshot dispatch returned no result"); + } + } + ); } #emitSnapshotSpan(runFriendlyId: string, durationMs?: number, snapshotId?: string) { diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index ea7728e6376..da6099b4a33 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -124,6 +124,7 @@ export class WorkloadServer extends EventEmitter { computeManager: opts.computeManager, workerClient: opts.workerClient, tracing: opts.tracing, + wideEventOpts: this.wideEventOpts, }); } From d0409fe6f401c86a559d93062f27b7d12243f7fb Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 21 May 2026 09:53:38 +0100 Subject: [PATCH 7/9] feat(supervisor): set op + kind on all wide events --- .../supervisor-op-field-coverage.md | 6 ++++++ apps/supervisor/src/index.ts | 2 ++ .../src/services/computeSnapshotService.ts | 3 +++ apps/supervisor/src/workloadServer/index.ts | 20 +++++++++++++++++-- 4 files changed, 29 insertions(+), 2 deletions(-) create mode 100644 .server-changes/supervisor-op-field-coverage.md diff --git a/.server-changes/supervisor-op-field-coverage.md b/.server-changes/supervisor-op-field-coverage.md new file mode 100644 index 00000000000..49889aa1cb7 --- /dev/null +++ b/.server-changes/supervisor-op-field-coverage.md @@ -0,0 +1,6 @@ +--- +area: supervisor +type: improvement +--- + +Tag every supervisor structured event with `op` and `kind` fields for consistent filtering and aggregation. diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index e2c16c4b1df..c0779eb7278 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -269,6 +269,8 @@ class ManagedSupervisor { setMeta(state, "deployment_id", message.deployment.friendlyId); } setMeta(state, "machine_preset", message.run.machine.name); + state.extras.op = "dequeue"; + state.extras.kind = "inbound"; state.extras.iteration = "dequeue"; state.extras.dequeue_response_ms = dequeueResponseMs; state.extras.polling_interval_ms = pollingIntervalMs; diff --git a/apps/supervisor/src/services/computeSnapshotService.ts b/apps/supervisor/src/services/computeSnapshotService.ts index 3b88aa98522..a4684a1049a 100644 --- a/apps/supervisor/src/services/computeSnapshotService.ts +++ b/apps/supervisor/src/services/computeSnapshotService.ts @@ -78,6 +78,7 @@ export class ComputeSnapshotService { ...this.wideEventOpts, populate: (state) => { state.extras.op = "snapshot.schedule"; + state.extras.kind = "event"; state.meta.run_id = runFriendlyId; state.meta.snapshot_id = data.snapshotFriendlyId; state.extras.runner_id = data.runnerId; @@ -99,6 +100,7 @@ export class ComputeSnapshotService { ...this.wideEventOpts, populate: (state) => { state.extras.op = "snapshot.canceled"; + state.extras.kind = "event"; state.meta.run_id = runFriendlyId; }, }); @@ -247,6 +249,7 @@ export class ComputeSnapshotService { ...this.wideEventOpts, setup: (state) => { state.extras.op = "snapshot.dispatch"; + state.extras.kind = "scheduled"; state.meta.run_id = snapshot.runFriendlyId; state.meta.snapshot_id = snapshot.snapshotFriendlyId; state.extras.runner_id = snapshot.runnerId; diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index da6099b4a33..e4bb4625a06 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -184,6 +184,7 @@ export class WorkloadServer extends EventEmitter { */ private wideRoute( ctx: { req: IncomingMessage; res: ServerResponse; params?: unknown }, + op: string, route: string, method: string, fn: () => Promise | T, @@ -199,7 +200,11 @@ export class WorkloadServer extends EventEmitter { method, traceparent: this.headerValueFromRequest(ctx.req, "traceparent"), inboundRequestId: this.headerValueFromRequest(ctx.req, "x-request-id"), - setup: (state) => this.attachRouteMeta(state, ctx.params), + setup: (state) => { + state.extras.op = op; + state.extras.kind = "inbound"; + this.attachRouteMeta(state, ctx.params); + }, }, fn, (state) => { @@ -231,6 +236,7 @@ export class WorkloadServer extends EventEmitter { handler: async (ctx) => this.wideRoute( ctx, + "attempt.start", "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/:snapshotFriendlyId/attempts/start", "POST", async () => { @@ -266,6 +272,7 @@ export class WorkloadServer extends EventEmitter { handler: async (ctx) => this.wideRoute( ctx, + "attempt.complete", "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/:snapshotFriendlyId/attempts/complete", "POST", async () => { @@ -303,6 +310,7 @@ export class WorkloadServer extends EventEmitter { handler: async (ctx) => this.wideRoute( ctx, + "heartbeat", "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/:snapshotFriendlyId/heartbeat", "POST", async () => { @@ -339,6 +347,7 @@ export class WorkloadServer extends EventEmitter { handler: async (ctx) => this.wideRoute( ctx, + "suspend", "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/:snapshotFriendlyId/suspend", "GET", async () => { @@ -434,6 +443,7 @@ export class WorkloadServer extends EventEmitter { handler: async (ctx) => this.wideRoute( ctx, + "continue", "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/:snapshotFriendlyId/continue", "GET", async () => { @@ -475,6 +485,7 @@ export class WorkloadServer extends EventEmitter { handler: async (ctx) => this.wideRoute( ctx, + "snapshots.since", "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/since/:snapshotFriendlyId", "GET", async () => { @@ -510,6 +521,7 @@ export class WorkloadServer extends EventEmitter { handler: async (ctx) => this.wideRoute( ctx, + "deployment.dequeue", "/api/v1/workload-actions/deployments/:deploymentId/dequeue", "GET", async () => { @@ -541,6 +553,7 @@ export class WorkloadServer extends EventEmitter { handler: async (ctx) => this.wideRoute( ctx, + "logs.debug", "/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", async () => { @@ -562,6 +575,7 @@ export class WorkloadServer extends EventEmitter { handler: async (ctx) => this.wideRoute( ctx, + "logs.debug", "/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", async () => { @@ -576,7 +590,7 @@ export class WorkloadServer extends EventEmitter { httpServer.route("/api/v1/compute/snapshot-complete", "POST", { bodySchema: SnapshotCallbackPayloadSchema, handler: async (ctx) => - this.wideRoute(ctx, "/api/v1/compute/snapshot-complete", "POST", async () => { + this.wideRoute(ctx, "snapshot.callback", "/api/v1/compute/snapshot-complete", "POST", async () => { const { reply, body } = ctx; if (!this.snapshotService) { reply.empty(404); @@ -666,6 +680,8 @@ export class WorkloadServer extends EventEmitter { emitOneShot({ ...this.wideEventOpts, populate: (state) => { + state.extras.op = event === "run_connected" ? "socket.run.connected" : "socket.run.disconnected"; + state.extras.kind = "event"; state.extras.event = event; setMeta(state, "run_id", friendlyId); if (socket.data.deploymentId) { From 06e39d408230e8aed6358712ddbad6a871932aa9 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 21 May 2026 10:06:05 +0100 Subject: [PATCH 8/9] refactor(supervisor): op + kind first-class on State --- apps/supervisor/src/index.ts | 4 ++-- .../src/services/computeSnapshotService.ts | 13 +++++------ apps/supervisor/src/wideEvents/emit.ts | 3 +++ .../src/wideEvents/middleware.test.ts | 22 +++++++++---------- apps/supervisor/src/wideEvents/middleware.ts | 10 +++++++++ apps/supervisor/src/wideEvents/new.ts | 6 +++++ apps/supervisor/src/wideEvents/state.ts | 16 ++++++++++++++ apps/supervisor/src/workloadServer/index.ts | 12 +++++----- 8 files changed, 59 insertions(+), 27 deletions(-) diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index c0779eb7278..3a1e6165fc9 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -259,6 +259,8 @@ class ManagedSupervisor { await runWideEvent( { ...this.wideEventOpts, + op: "dequeue", + kind: "inbound", traceparent, setup: (state) => { setMeta(state, "run_id", message.run.friendlyId); @@ -269,8 +271,6 @@ class ManagedSupervisor { setMeta(state, "deployment_id", message.deployment.friendlyId); } setMeta(state, "machine_preset", message.run.machine.name); - state.extras.op = "dequeue"; - state.extras.kind = "inbound"; state.extras.iteration = "dequeue"; state.extras.dequeue_response_ms = dequeueResponseMs; state.extras.polling_interval_ms = pollingIntervalMs; diff --git a/apps/supervisor/src/services/computeSnapshotService.ts b/apps/supervisor/src/services/computeSnapshotService.ts index a4684a1049a..35ac6acecab 100644 --- a/apps/supervisor/src/services/computeSnapshotService.ts +++ b/apps/supervisor/src/services/computeSnapshotService.ts @@ -76,9 +76,9 @@ export class ComputeSnapshotService { this.timerWheel.submit(runFriendlyId, data); emitOneShot({ ...this.wideEventOpts, + op: "snapshot.schedule", + kind: "event", populate: (state) => { - state.extras.op = "snapshot.schedule"; - state.extras.kind = "event"; state.meta.run_id = runFriendlyId; state.meta.snapshot_id = data.snapshotFriendlyId; state.extras.runner_id = data.runnerId; @@ -98,9 +98,9 @@ export class ComputeSnapshotService { if (cancelled) { emitOneShot({ ...this.wideEventOpts, + op: "snapshot.canceled", + kind: "event", populate: (state) => { - state.extras.op = "snapshot.canceled"; - state.extras.kind = "event"; state.meta.run_id = runFriendlyId; }, }); @@ -121,7 +121,6 @@ export class ComputeSnapshotService { // become extras/meta on the same wide event - no nested emission. const state = fromContext(); if (state) { - state.extras.op = "snapshot.callback"; state.extras["snapshot.status"] = body.status; if (body.instance_id) state.extras["snapshot.instance_id"] = body.instance_id; if (body.duration_ms !== undefined) state.extras["snapshot.duration_ms"] = body.duration_ms; @@ -247,9 +246,9 @@ export class ComputeSnapshotService { await runWideEvent( { ...this.wideEventOpts, + op: "snapshot.dispatch", + kind: "scheduled", setup: (state) => { - state.extras.op = "snapshot.dispatch"; - state.extras.kind = "scheduled"; state.meta.run_id = snapshot.runFriendlyId; state.meta.snapshot_id = snapshot.snapshotFriendlyId; state.extras.runner_id = snapshot.runnerId; diff --git a/apps/supervisor/src/wideEvents/emit.ts b/apps/supervisor/src/wideEvents/emit.ts index a1644237d5d..f3f765cdd17 100644 --- a/apps/supervisor/src/wideEvents/emit.ts +++ b/apps/supervisor/src/wideEvents/emit.ts @@ -27,6 +27,9 @@ export function emit(state: State): void { appendIfSet(out, "region", state.region); appendIfSet(out, "node_id", state.nodeId); + appendIfSet(out, "op", state.op); + appendIfSet(out, "kind", state.kind); + out.ok = state.ok; if (state.statusCode !== 0) out.status = state.statusCode; out.duration_ms = state.durationMs; diff --git a/apps/supervisor/src/wideEvents/middleware.test.ts b/apps/supervisor/src/wideEvents/middleware.test.ts index 18a6469d7b8..afb59f43d6e 100644 --- a/apps/supervisor/src/wideEvents/middleware.test.ts +++ b/apps/supervisor/src/wideEvents/middleware.test.ts @@ -20,7 +20,7 @@ describe("runWideEvent", () => { it("emits one event with ok=true when no statusCode is set", async () => { const lines = await captureStdout(async () => { await runWideEvent( - { service: "supervisor", env: {}, enabled: true, route: "/x", method: "POST" }, + { service: "supervisor", env: {}, enabled: true, op: "test", route: "/x", method: "POST" }, async () => undefined ); }); @@ -39,7 +39,7 @@ describe("runWideEvent", () => { it("derives ok from statusCode set via finalize", async () => { const lines = await captureStdout(async () => { await runWideEvent( - { service: "supervisor", env: {}, enabled: true }, + { service: "supervisor", env: {}, enabled: true, op: "test" }, async () => undefined, (state) => { state.statusCode = 200; @@ -56,7 +56,7 @@ describe("runWideEvent", () => { it("treats 4xx as ok=false", async () => { const lines = await captureStdout(async () => { await runWideEvent( - { service: "supervisor", env: {}, enabled: true }, + { service: "supervisor", env: {}, enabled: true, op: "test" }, async () => undefined, (state) => { state.statusCode = 400; @@ -73,7 +73,7 @@ describe("runWideEvent", () => { it("emits ok=false with error.kind=internal on throw", async () => { const lines = await captureStdout(async () => { await runWideEvent( - { service: "supervisor", env: {}, enabled: true }, + { service: "supervisor", env: {}, enabled: true, op: "test" }, async () => { throw new Error("boom"); } @@ -91,7 +91,7 @@ describe("runWideEvent", () => { it("threads state through AsyncLocalStorage", async () => { const lines = await captureStdout(async () => { await runWideEvent( - { service: "supervisor", env: {}, enabled: true }, + { service: "supervisor", env: {}, enabled: true, op: "test" }, async () => { setMeta(fromContext(), "run_id", "run_abc"); } @@ -108,7 +108,7 @@ describe("runWideEvent", () => { const tp = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"; const lines = await captureStdout(async () => { await runWideEvent( - { service: "supervisor", env: {}, enabled: true, traceparent: tp }, + { service: "supervisor", env: {}, enabled: true, op: "test", traceparent: tp }, async () => undefined ); }); @@ -124,7 +124,7 @@ describe("runWideEvent", () => { { service: "supervisor", env: {}, - enabled: true, + enabled: true, op: "test", setup: (state) => { state.meta.run_id = "run_abc"; state.extras.iteration = "dequeue"; @@ -144,7 +144,7 @@ describe("runWideEvent", () => { let seenState: ReturnType = null; const lines = await captureStdout(async () => { await runWideEvent( - { service: "supervisor", env: {}, enabled: false }, + { service: "supervisor", env: {}, enabled: false, op: "test" }, async () => { seenState = fromContext(); } @@ -159,7 +159,7 @@ describe("runWideEvent", () => { await Promise.all( ["a", "b", "c"].map((tag) => runWideEvent( - { service: "supervisor", env: {}, enabled: true }, + { service: "supervisor", env: {}, enabled: true, op: "test" }, async () => { const s = fromContext(); if (!s) throw new Error("no state"); @@ -182,7 +182,7 @@ describe("emitOneShot", () => { emitOneShot({ service: "supervisor", env: {}, - enabled: true, + enabled: true, op: "test", populate: (s) => { s.meta.run_id = "run_abc"; s.extras.event = "run:start"; @@ -200,7 +200,7 @@ describe("emitOneShot", () => { it("emits nothing when disabled", async () => { const lines = await captureStdout(() => { - emitOneShot({ service: "supervisor", env: {}, enabled: false }); + emitOneShot({ service: "supervisor", env: {}, enabled: false, op: "test" }); }); expect(lines).toHaveLength(0); }); diff --git a/apps/supervisor/src/wideEvents/middleware.ts b/apps/supervisor/src/wideEvents/middleware.ts index a87a1885d56..034c136414f 100644 --- a/apps/supervisor/src/wideEvents/middleware.ts +++ b/apps/supervisor/src/wideEvents/middleware.ts @@ -18,6 +18,10 @@ export type WideEventOptions = { /** Per-invocation options layered on top of `WideEventOptions`. */ export type WideEventLifecycleOptions = WideEventOptions & { + /** Operation discriminator (`instance.create`, `dequeue`, ...). Required. */ + op: string; + /** Event shape: `inbound` | `outbound` | `event` | `scheduled`. Optional. */ + kind?: string; /** Route template (HTTP only) captured into `extras.route`. */ route?: string; /** HTTP method captured into `extras.method`. */ @@ -54,6 +58,8 @@ export async function runWideEvent( env: opts.env, inboundRequestId: opts.inboundRequestId, traceparent: opts.traceparent, + op: opts.op, + kind: opts.kind, }); if (opts.route) state.extras.route = opts.route; if (opts.method) state.extras.method = opts.method; @@ -94,6 +100,8 @@ export async function runWideEvent( */ export function emitOneShot( opts: WideEventOptions & { + op: string; + kind?: string; traceparent?: string; populate?: (state: State) => void; } @@ -103,6 +111,8 @@ export function emitOneShot( service: opts.service, env: opts.env, traceparent: opts.traceparent, + op: opts.op, + kind: opts.kind, }); if (opts.populate) opts.populate(state); state.ok = true; diff --git a/apps/supervisor/src/wideEvents/new.ts b/apps/supervisor/src/wideEvents/new.ts index 2b7c88b7cc9..79d3e15939d 100644 --- a/apps/supervisor/src/wideEvents/new.ts +++ b/apps/supervisor/src/wideEvents/new.ts @@ -37,6 +37,10 @@ export type NewStateOptions = { inboundRequestId?: string; /** Optional inbound W3C traceparent (HTTP header, queue message field). */ traceparent?: string; + /** Operation discriminator. Dotted `noun.verb`. Defaults to empty (set later). */ + op?: string; + /** Event shape: `inbound` | `outbound` | `event` | `scheduled`. Defaults to empty. */ + kind?: string; }; /** @@ -62,6 +66,8 @@ export function newState(opts: NewStateOptions): State { commitSha: opts.env.commitSha, region: opts.env.region, nodeId: opts.env.nodeId, + op: opts.op ?? "", + kind: opts.kind ?? "", meta: {}, phases: [], ok: false, diff --git a/apps/supervisor/src/wideEvents/state.ts b/apps/supervisor/src/wideEvents/state.ts index 1929e5e9806..a0e2997b1d2 100644 --- a/apps/supervisor/src/wideEvents/state.ts +++ b/apps/supervisor/src/wideEvents/state.ts @@ -22,6 +22,22 @@ export type State = { region?: string; nodeId?: string; + /** + * Operation discriminator. Dotted `noun.verb` (e.g. `instance.create`, + * `snapshot.dispatch`). Low cardinality - bounded set per service, not + * unbounded. Empty allowed during construction but expected to be set + * before emit. + */ + op: string; + + /** + * Event shape. `inbound` for received requests, `outbound` for outgoing + * calls, `event` for ambient occurrences with no meaningful duration, + * `scheduled` for timer-driven work. Empty allowed; omitted from emit + * when empty. + */ + kind: string; + // Caller-attached opaque metadata, flattened to `meta.` on emit. meta: Record; diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index e4bb4625a06..f5c5ded14e3 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -196,15 +196,13 @@ export class WorkloadServer extends EventEmitter { { ...this.wideEventOpts, enabled, + op, + kind: "inbound", route, method, traceparent: this.headerValueFromRequest(ctx.req, "traceparent"), inboundRequestId: this.headerValueFromRequest(ctx.req, "x-request-id"), - setup: (state) => { - state.extras.op = op; - state.extras.kind = "inbound"; - this.attachRouteMeta(state, ctx.params); - }, + setup: (state) => this.attachRouteMeta(state, ctx.params), }, fn, (state) => { @@ -679,9 +677,9 @@ export class WorkloadServer extends EventEmitter { ) => { emitOneShot({ ...this.wideEventOpts, + op: event === "run_connected" ? "socket.run.connected" : "socket.run.disconnected", + kind: "event", populate: (state) => { - state.extras.op = event === "run_connected" ? "socket.run.connected" : "socket.run.disconnected"; - state.extras.kind = "event"; state.extras.event = event; setMeta(state, "run_id", friendlyId); if (socket.data.deploymentId) { From eba714103212a6873d2a38ea13606784af91966a Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 21 May 2026 14:39:19 +0100 Subject: [PATCH 9/9] feat(supervisor): forward Baggage outbound on compute calls --- .../supervisor/src/wideEvents/baggage.test.ts | 37 ++++++++++++++++ apps/supervisor/src/wideEvents/baggage.ts | 43 +++++++++++++++++++ apps/supervisor/src/wideEvents/index.ts | 1 + .../supervisor/src/workloadManager/compute.ts | 14 ++++-- 4 files changed, 91 insertions(+), 4 deletions(-) create mode 100644 apps/supervisor/src/wideEvents/baggage.test.ts create mode 100644 apps/supervisor/src/wideEvents/baggage.ts diff --git a/apps/supervisor/src/wideEvents/baggage.test.ts b/apps/supervisor/src/wideEvents/baggage.test.ts new file mode 100644 index 00000000000..36b4fa9b99c --- /dev/null +++ b/apps/supervisor/src/wideEvents/baggage.test.ts @@ -0,0 +1,37 @@ +import { describe, it, expect } from "vitest"; +import { encodeBaggage } from "./baggage.js"; + +describe("encodeBaggage", () => { + it("returns empty string for an empty map", () => { + expect(encodeBaggage({})).toBe(""); + }); + + it("encodes a single entry as k=v", () => { + expect(encodeBaggage({ run_id: "run-1" })).toBe("run_id=run-1"); + }); + + it("sorts keys for stable output across hops", () => { + expect(encodeBaggage({ b: "2", a: "1", c: "3" })).toBe("a=1,b=2,c=3"); + }); + + it("skips empty keys and empty values", () => { + expect(encodeBaggage({ "": "v", k: "", real: "x" })).toBe("real=x"); + }); + + it("truncates values longer than the cap", () => { + const long = "x".repeat(1024); + const got = encodeBaggage({ k: long }); + const value = got.slice("k=".length); + expect(value.length).toBe(256); + }); + + it("caps the number of entries", () => { + const meta: Record = {}; + for (let i = 0; i < 50; i++) { + // Sortable two-digit keys so we know which 32 survive. + meta[`k${String(i).padStart(2, "0")}`] = "v"; + } + const got = encodeBaggage(meta); + expect(got.split(",").length).toBe(32); + }); +}); diff --git a/apps/supervisor/src/wideEvents/baggage.ts b/apps/supervisor/src/wideEvents/baggage.ts new file mode 100644 index 00000000000..98dcdb8dc6c --- /dev/null +++ b/apps/supervisor/src/wideEvents/baggage.ts @@ -0,0 +1,43 @@ +/** + * W3C Baggage (https://www.w3.org/TR/baggage/) encoding for outbound peer + * calls. Serialises a State's `meta` map into a `Baggage` header value so + * the downstream service auto-stamps the same labels onto its own wide + * events - even on early-error paths that bail before parsing the request + * body. + * + * Outbound discipline: only call this on peer-to-peer hops within the trust + * boundary. External-endpoint calls (image registries, cloud-provider + * APIs, third-party webhooks) must not include the Baggage header. + */ + +/** + * Cap the number of entries serialised onto the header. A misbehaving + * caller's `meta` map shouldn't blow up downstream event width. + */ +const MAX_BAGGAGE_ENTRIES = 32; + +/** + * Cap each value's length. Defense against an upstream that stuffs + * unbounded payloads into a meta value. + */ +const MAX_BAGGAGE_VALUE_BYTES = 256; + +/** + * Encode a `meta` map as a Baggage header value (`k1=v1,k2=v2`). Keys are + * sorted for stable output across hops; an empty input yields the empty + * string so the caller can skip emitting the header entirely. + */ +export function encodeBaggage(meta: Record): string { + const entries = Object.entries(meta).filter(([k, v]) => k && v); + if (entries.length === 0) return ""; + + entries.sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0)); + + const out: string[] = []; + for (const [k, raw] of entries) { + if (out.length >= MAX_BAGGAGE_ENTRIES) break; + const v = raw.length > MAX_BAGGAGE_VALUE_BYTES ? raw.slice(0, MAX_BAGGAGE_VALUE_BYTES) : raw; + out.push(`${k}=${v}`); + } + return out.join(","); +} diff --git a/apps/supervisor/src/wideEvents/index.ts b/apps/supervisor/src/wideEvents/index.ts index 742d5c018e8..4eda429a50a 100644 --- a/apps/supervisor/src/wideEvents/index.ts +++ b/apps/supervisor/src/wideEvents/index.ts @@ -26,3 +26,4 @@ export { type WideEventOptions, } from "./middleware.js"; export type { ErrorInfo, PhaseRecord, State } from "./state.js"; +export { encodeBaggage } from "./baggage.js"; diff --git a/apps/supervisor/src/workloadManager/compute.ts b/apps/supervisor/src/workloadManager/compute.ts index 2c51b15f2d4..3c40ac9e2e6 100644 --- a/apps/supervisor/src/workloadManager/compute.ts +++ b/apps/supervisor/src/workloadManager/compute.ts @@ -10,7 +10,7 @@ import { ComputeClient, stripImageDigest } from "@internal/compute"; import { extractTraceparent, getRunnerId } from "../util.js"; import type { OtlpTraceService } from "../services/otlpTraceService.js"; import { tryCatch } from "@trigger.dev/core"; -import { fromContext } from "../wideEvents/index.js"; +import { encodeBaggage, fromContext } from "../wideEvents/index.js"; type ComputeWorkloadManagerOptions = WorkloadManagerOptions & { gateway: { @@ -49,9 +49,11 @@ export class ComputeWorkloadManager implements WorkloadManager { timeoutMs: opts.gateway.timeoutMs, // Forward the current wide-event scope's traceparent + request_id so the // downstream service continues the same trace and joins its own wide - // events to ours. When called outside a wide-event scope (or when wide - // events are disabled), `fromContext` returns undefined and propagation - // is skipped. + // events to ours. Additionally serialize caller-supplied meta labels + // into the W3C Baggage header so the downstream service auto-stamps + // them even on early-error paths that bail before parsing the body. + // When called outside a wide-event scope (or when wide events are + // disabled), `fromContext` returns undefined and propagation is skipped. getPropagationHeaders: () => { const state = fromContext(); if (!state) return {}; @@ -59,6 +61,10 @@ export class ComputeWorkloadManager implements WorkloadManager { if (state.traceparent) { headers.traceparent = state.traceparent; } + const baggage = encodeBaggage(state.meta); + if (baggage) { + headers.baggage = baggage; + } return headers; }, });