feat(ai): activity-agnostic observability hook for media activities (#720) #760
season179 wants to merge 3 commits into
…g details)

otelMiddleware only emitted gen_ai.usage.input_tokens/output_tokens even though TokenUsage already carries provider-reported cost, total tokens, cache/reasoning breakdowns, and duration-based billing. Backends like PostHog had to re-derive cost from their own price tables, losing cache discounts and gateway markup (OpenRouter), and duration-billed activities had no cost signal at all.

A shared usageAttributes() helper now builds the full guarded attribute set at all three emission sites (RUN_FINISHED chunk, onUsage, onFinish rollup):

- gen_ai.usage.total_tokens / gen_ai.usage.cost (de-facto extensions consumed directly by PostHog and LiteLLM-style backends)
- gen_ai.usage.cache_read.input_tokens, cache_creation.input_tokens, reasoning.output_tokens (official GenAI semconv names)
- tanstack.ai.usage.duration_seconds and the upstream cost split (no semconv equivalent exists)

E2E: new /api/otel-usage route drives the existing openai-usage-details and openrouter-cost aimock mounts through otelMiddleware with a local capture tracer; middleware.spec.ts asserts the attributes land on iteration and root spans.

Fixes TanStack#721
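To make the "guarded attribute set" idea concrete, here is a minimal sketch of what such a helper could look like. The attribute keys come from the commit message above; the `UsageSketch` field names and the `usageAttributesSketch` function are assumptions made for illustration and are not the library's actual `TokenUsage` type or `usageAttributes()` implementation. The upstream cost split is omitted because its attribute names are not given here.

```typescript
// Hypothetical usage shape: these field names are assumptions for illustration,
// not the library's real TokenUsage type.
interface UsageSketch {
  inputTokens?: number
  outputTokens?: number
  totalTokens?: number
  cost?: number
  cacheReadTokens?: number
  cacheCreationTokens?: number
  reasoningTokens?: number
  durationSeconds?: number
}

type AttributeMap = Record<string, number>

// The guard: only finite numbers are emitted, so undefined or NaN never reach a span.
function isFiniteNumber(value: unknown): value is number {
  return typeof value === 'number' && Number.isFinite(value)
}

// Hypothetical helper name; builds one attribute map reusable at every emission site.
function usageAttributesSketch(usage: UsageSketch | undefined): AttributeMap {
  const attrs: AttributeMap = {}
  if (!usage) return attrs

  const candidates: Array<[string, number | undefined]> = [
    ['gen_ai.usage.input_tokens', usage.inputTokens],
    ['gen_ai.usage.output_tokens', usage.outputTokens],
    ['gen_ai.usage.total_tokens', usage.totalTokens],
    ['gen_ai.usage.cost', usage.cost],
    ['gen_ai.usage.cache_read.input_tokens', usage.cacheReadTokens],
    ['gen_ai.usage.cache_creation.input_tokens', usage.cacheCreationTokens],
    ['gen_ai.usage.reasoning.output_tokens', usage.reasoningTokens],
    ['tanstack.ai.usage.duration_seconds', usage.durationSeconds],
  ]

  for (const [key, value] of candidates) {
    if (isFiniteNumber(value)) attrs[key] = value
  }
  return attrs
}
```

Building the map in one place means the three emission sites cannot drift apart in which attributes they set, which is presumably the motivation for sharing the helper.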
…anStack#720)

Add an `ActivityObserver` contract (onStart/onFinish/onError, payload discriminated by `activity`) registerable on any activity via a new `observers` option. Observers are awaited in order and strictly non-fatal: a throwing observer is logged and skipped, never breaking the activity.

Wire it into generateImage, generateVideo, generateAudio, generateSpeech, and generateTranscription. Chat stays on otelMiddleware; the ActivityKind type includes `chat` for a future unified contract.

Ship `otelObserver()` on a new `@tanstack/ai/observability` subpath: one gen_ai.* span per call tagged with the correct gen_ai.operation.name, reusing the shared usage-attribute set (now including tanstack.ai.usage.units_billed) and the gen_ai.client.operation.duration histogram when a Meter is supplied. The observer types are exported from the package root while the otelObserver value lives only on the subpath, so importing @tanstack/ai never requires the optional @opentelemetry/api peer.

Streaming generateVideo covers the full create/poll/complete lifecycle and ends its span even if the consumer abandons the stream mid-poll; non-streaming generateVideo records a submit span.
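The "awaited in order and strictly non-fatal" semantics can be illustrated with a small sketch. All type and function names below (`ObserverSketch`, `notifyStartSketch`, `LoggerSketch`) are hypothetical stand-ins; the real contract lives in the PR's `observability/types.ts` and `observability/notify.ts` and may differ.

```typescript
// Hypothetical shapes for illustration only.
type ActivityKindSketch = 'image' | 'video' | 'audio' | 'speech' | 'transcription'

interface StartEventSketch {
  activity: ActivityKindSketch
  requestId: string
}

interface ObserverSketch {
  onStart?: (event: StartEventSketch) => void | Promise<void>
}

interface LoggerSketch {
  warn: (message: string, error: unknown) => void
}

async function notifyStartSketch(
  observers: ReadonlyArray<ObserverSketch> | undefined,
  event: StartEventSketch,
  logger: LoggerSketch,
): Promise<void> {
  for (const observer of observers ?? []) {
    try {
      // Awaiting inside the loop preserves registration order.
      await observer.onStart?.(event)
    } catch (error) {
      // Non-fatal: log the failure and continue with the next observer.
      logger.warn('observer onStart threw; skipping', error)
    }
  }
}
```

A sequential loop is slower than `Promise.all` when several observers do I/O, but it gives deterministic ordering, which matters when one observer depends on state another has set up.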
📝 Walkthrough
Adds activity-agnostic observability to
Changes
Activity Observability and Full OTel Usage
Sequence Diagram(s)
sequenceDiagram
participant Caller
participant MediaActivity as Media Activity<br/>(generateImage/etc.)
participant notifyObserver
participant ActivityObserver as ActivityObserver<br/>(e.g. otelObserver)
participant Adapter
Caller->>MediaActivity: generateImage({ observers: [otelObserver] })
MediaActivity->>notifyObserver: notifyObserverStart(observers, startEvent)
notifyObserver->>ActivityObserver: onStart(ActivityStartEvent)
ActivityObserver->>ActivityObserver: startSpan(gen_ai.operation.name=image_generation)
MediaActivity->>Adapter: adapter.generateImage(request)
Adapter-->>MediaActivity: result + usage
MediaActivity->>notifyObserver: notifyObserverFinish(observers, finishEvent + durationMs + usage)
notifyObserver->>ActivityObserver: onFinish(ActivityFinishEvent)
ActivityObserver->>ActivityObserver: span.setAttributes(usageAttributes)<br/>span.end()<br/>recordDuration()
MediaActivity-->>Caller: result
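The flow in the diagram (start notification, adapter call, finish notification with duration and usage) can be sketched as a small wrapper. This is an illustration under assumptions: `runObservedSketch` and the event shapes are hypothetical, and the PR's actual activities call `notifyObserverStart` / `notifyObserverFinish` inline rather than through a wrapper like this.

```typescript
// Hypothetical observer shape, loosely following the diagram above.
interface LifecycleObserverSketch {
  onStart?: (e: { activity: string; requestId: string }) => void | Promise<void>
  onFinish?: (e: {
    activity: string
    requestId: string
    durationMs: number
    usage?: unknown
  }) => void | Promise<void>
  onError?: (e: {
    activity: string
    requestId: string
    durationMs: number
    error: unknown
  }) => void | Promise<void>
}

async function runObservedSketch<T extends { usage?: unknown }>(
  activity: string,
  requestId: string,
  observers: ReadonlyArray<LifecycleObserverSketch>,
  call: () => Promise<T>,
): Promise<T> {
  const startedAt = Date.now()
  // Observer failures are swallowed so telemetry can never break the activity.
  const safely = async (fn: () => void | Promise<void> | undefined): Promise<void> => {
    try {
      await fn()
    } catch {
      // A real implementation would log here.
    }
  }

  for (const o of observers) await safely(() => o.onStart?.({ activity, requestId }))
  try {
    const result = await call()
    const durationMs = Date.now() - startedAt
    for (const o of observers) {
      await safely(() => o.onFinish?.({ activity, requestId, durationMs, usage: result.usage }))
    }
    return result
  } catch (error) {
    const durationMs = Date.now() - startedAt
    for (const o of observers) {
      await safely(() => o.onError?.({ activity, requestId, durationMs, error }))
    }
    // The adapter's own error still propagates to the caller unchanged.
    throw error
  }
}
```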
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/advanced/otel.md`:
- Line 194: The openaiImage adapter example in the documentation is using an
outdated model identifier. In the openaiImage function call, replace the model
parameter from 'gpt-image-1' to 'gpt-image-2' to reflect the latest OpenAI image
generation model as defined in the adapter's model-meta.ts file.
In `@packages/ai/src/activities/generateVideo/index.ts`:
- Around line 375-385: The observer lifecycle is mismanaged: notifyObserverStart
at lines 375 and around 395-397 occurs after yields, allowing early breaks to
emit onError without onStart, and notifyObserverFinish at lines 459-472 occurs
after yielding generation:result, causing the finally block (lines 518-539) to
see settled === false and emit a synthetic cancellation error. Move both
notifyObserverStart calls to execute before any yields in the function, and move
notifyObserverFinish to execute before the generation:result yield (or set
settled === true before yielding), so that the finally block's settled flag
accurately reflects whether the observer lifecycle was properly started and
completed.
In `@testing/e2e/src/routes/api.otel-media.ts`:
- Around line 117-127: The payload validation is missing before destructuring
properties from the data object. Currently, if body.forwardedProps, body.data,
or body is null or a primitive value, or if required fields (prompt, provider)
are missing, the destructuring assignment will throw an unhandled error
resulting in a 500 response instead of the proper error response. Add validation
logic before the destructuring of prompt, provider, testId, and aimockPort from
the data object to check that data is a valid object with the required
properties, and return { ok: false, error } with an appropriate error message if
validation fails. Only proceed to createImageAdapter and
createLocalCaptureTracer after successful validation.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3e1da7aa-1e1b-402b-98f8-1b412af95984
📒 Files selected for processing (27)
- .changeset/activity-observers.md
- .changeset/otel-full-usage-emission.md
- docs/advanced/otel.md
- docs/config.json
- packages/ai/package.json
- packages/ai/src/activities/generateAudio/index.ts
- packages/ai/src/activities/generateImage/index.ts
- packages/ai/src/activities/generateSpeech/index.ts
- packages/ai/src/activities/generateTranscription/index.ts
- packages/ai/src/activities/generateVideo/index.ts
- packages/ai/src/index.ts
- packages/ai/src/middlewares/otel.ts
- packages/ai/src/observability/index.ts
- packages/ai/src/observability/notify.ts
- packages/ai/src/observability/otel.ts
- packages/ai/src/observability/types.ts
- packages/ai/src/observability/usage-attributes.ts
- packages/ai/src/utilities/errors.ts
- packages/ai/src/utilities/numbers.ts
- packages/ai/tests/middlewares/otel.test.ts
- packages/ai/tests/observability/activity-observers.test.ts
- packages/ai/tests/observability/otel-observer.test.ts
- packages/ai/vite.config.ts
- testing/e2e/src/routeTree.gen.ts
- testing/e2e/src/routes/api.otel-media.ts
- testing/e2e/src/routes/api.otel-usage.ts
- testing/e2e/tests/middleware.spec.ts
await notifyObserverStart(
  observers,
  {
    activity: 'video',
    requestId,
    provider: adapter.name,
    model,
    modelOptions,
  },
  logger,
)
Observer terminal state can be misreported as cancelled on successful completion.
At Line 459, notifyObserverFinish runs after yielding generation:result; if the consumer breaks at that yield, finally (Line 519) still sees settled === false and emits a synthetic cancellation error. Also, notifyObserverStart is currently after the first yield (Line 375), so an early break can produce onError without onStart.
Suggested fix
- yield {
- type: 'RUN_STARTED',
- runId,
- threadId,
- timestamp: Date.now(),
- } as StreamChunk
-
await notifyObserverStart(
observers,
{
activity: 'video',
requestId,
provider: adapter.name,
model,
modelOptions,
},
logger,
)
+
+ yield {
+ type: 'RUN_STARTED',
+ runId,
+ threadId,
+ timestamp: Date.now(),
+ } as StreamChunk
...
- yield {
- type: 'CUSTOM',
- name: 'generation:result',
- value: {
- jobId: jobResult.jobId,
- status: 'completed',
- url: urlResult.url,
- expiresAt: urlResult.expiresAt,
- ...(urlResult.usage ? { usage: urlResult.usage } : {}),
- },
- timestamp: Date.now(),
- } as StreamChunk
-
+ settled = true
await notifyObserverFinish(
observers,
{
activity: 'video',
requestId,
provider: adapter.name,
model,
durationMs: Date.now() - obsStartTime,
usage: urlResult.usage,
},
logger,
)
- settled = true
+
+ yield {
+ type: 'CUSTOM',
+ name: 'generation:result',
+ value: {
+ jobId: jobResult.jobId,
+ status: 'completed',
+ url: urlResult.url,
+ expiresAt: urlResult.expiresAt,
+ ...(urlResult.usage ? { usage: urlResult.usage } : {}),
+ },
+ timestamp: Date.now(),
+ } as StreamChunk
Also applies to: 395-397, 459-472, 518-539
const body = await request.json()
const data = body.forwardedProps ?? body.data ?? body
const { prompt, provider, testId, aimockPort } = data as {
  prompt: string
  provider: Provider
  testId?: string
  aimockPort?: number
}

const adapter = createImageAdapter(provider, aimockPort, testId)
const { tracer, spans } = createLocalCaptureTracer()
Guard payload shape before property access/destructuring.
body.forwardedProps ?? body.data ?? body and the typed destructure run before the handler’s error response path. A null/primitive payload (or missing required fields) can throw and surface as an unhandled 500 instead of { ok: false, error }.
Suggested fix
- const body = await request.json()
- const data = body.forwardedProps ?? body.data ?? body
- const { prompt, provider, testId, aimockPort } = data as {
- prompt: string
- provider: Provider
- testId?: string
- aimockPort?: number
- }
-
- const adapter = createImageAdapter(provider, aimockPort, testId)
+ let prompt: string
+ let provider: Provider
+ let testId: string | undefined
+ let aimockPort: number | undefined
+ let adapter: ReturnType<typeof createImageAdapter>
+
+ try {
+ const body: unknown = await request.json()
+ const container =
+ body && typeof body === 'object'
+ ? ((body as Record<string, unknown>).forwardedProps ??
+ (body as Record<string, unknown>).data ??
+ body)
+ : null
+ if (!container || typeof container !== 'object') {
+ throw new Error('Invalid request body')
+ }
+ const raw = container as Record<string, unknown>
+ if (typeof raw.prompt !== 'string' || typeof raw.provider !== 'string') {
+ throw new Error('Missing required fields: prompt/provider')
+ }
+ prompt = raw.prompt
+ provider = raw.provider as Provider
+ testId = typeof raw.testId === 'string' ? raw.testId : undefined
+ aimockPort = typeof raw.aimockPort === 'number' ? raw.aimockPort : undefined
+ adapter = createImageAdapter(provider, aimockPort, testId)
+ } catch (error) {
+ return new Response(
+ JSON.stringify({
+ ok: false,
+ error: error instanceof Error ? error.message : String(error),
+ }),
+ { status: 200, headers: { 'Content-Type': 'application/json' } },
+ )
+ }
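One way to keep the handler readable while applying the guard described in this comment is to move the checks into a small parser that returns a discriminated result instead of throwing. The sketch below is an assumption-laden illustration: `parseMediaPayload`, `MediaPayloadSketch`, and `ProviderSketch` are hypothetical names, and `ProviderSketch` is a plain `string` stand-in for the route's real `Provider` type.

```typescript
// Hypothetical stand-in for the route's Provider union.
type ProviderSketch = string

interface MediaPayloadSketch {
  prompt: string
  provider: ProviderSketch
  testId?: string
  aimockPort?: number
}

type ParseResult =
  | { ok: true; value: MediaPayloadSketch }
  | { ok: false; error: string }

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null
}

function parseMediaPayload(body: unknown): ParseResult {
  // Reject null and primitives before any property access.
  if (!isRecord(body)) return { ok: false, error: 'Invalid request body' }

  // Same unwrapping order as the original handler.
  const container = body.forwardedProps ?? body.data ?? body
  if (!isRecord(container)) return { ok: false, error: 'Invalid request body' }

  if (typeof container.prompt !== 'string' || typeof container.provider !== 'string') {
    return { ok: false, error: 'Missing required fields: prompt/provider' }
  }

  return {
    ok: true,
    value: {
      prompt: container.prompt,
      provider: container.provider,
      // Optional fields are kept only when they have the expected type.
      ...(typeof container.testId === 'string' ? { testId: container.testId } : {}),
      ...(typeof container.aimockPort === 'number' ? { aimockPort: container.aimockPort } : {}),
    },
  }
}
```

The handler can then branch once on `result.ok` and serialize `{ ok: false, error }` for the failure case, without a `try`/`catch` around the destructure.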
Move notifyObserverFinish + settled=true ahead of the generation:result yield in streaming generateVideo. Previously they ran after the yield, so a consumer that stopped reading once it had the result (without pulling RUN_FINISHED) tripped the finally cleanup with settled still false, reporting a spurious cancellation onError and never firing onFinish. Add a regression test for that abandonment case.

Also use the latest gpt-image-2 model id in the otel docs and otelObserver JSDoc examples, and clear lint (import order, redundant casts) in the otel-media e2e route.
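The ordering issue is a general property of async generators: when a consumer breaks out of `for await`, the generator is resumed with a return at the suspended `yield`, so only the `finally` block runs and any code after that `yield` is skipped. The sketch below reproduces both orderings with a simplified, hypothetical stream (`videoStreamSketch` and its chunk types are not the PR's code; observer calls are replaced by pushing strings into an event log).

```typescript
type ChunkSketch =
  | { type: 'RUN_STARTED' }
  | { type: 'RESULT'; url: string }
  | { type: 'RUN_FINISHED' }

async function* videoStreamSketch(
  events: string[],
  finishBeforeYield: boolean,
): AsyncGenerator<ChunkSketch> {
  let settled = false
  events.push('onStart')
  try {
    yield { type: 'RUN_STARTED' }

    if (finishBeforeYield) {
      // Fixed order: settle and notify before handing the result to the consumer.
      settled = true
      events.push('onFinish')
    }

    yield { type: 'RESULT', url: 'https://example.invalid/video.mp4' }

    if (!finishBeforeYield) {
      // Old order: never reached if the consumer stops at the RESULT chunk.
      events.push('onFinish')
      settled = true
    }

    yield { type: 'RUN_FINISHED' }
  } finally {
    // Runs on normal completion and when the consumer breaks out early.
    if (!settled) events.push('onError(cancelled)')
  }
}

// Mimics a consumer that stops reading as soon as it has the result.
async function consumeUntilResult(finishBeforeYield: boolean): Promise<string[]> {
  const events: string[] = []
  for await (const chunk of videoStreamSketch(events, finishBeforeYield)) {
    if (chunk.type === 'RESULT') break
  }
  return events
}
```

With `finishBeforeYield` set to `false` the log ends in `onError(cancelled)` with no `onFinish`, which matches the spurious cancellation described above; with `true` the log is `onStart`, `onFinish`.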
I like the idea of adding this, but not as an observer prop. Chat has ChatMiddleware, so this should just be a GenerationMiddleware passed in via the middleware prop, same as chat. The chat one would be a superset that observes more hooks than the generation one. That way the API is uniform across both.
Summary
Closes #720.
Adds an activity-agnostic observability hook so non-chat activities (image, video, audio, speech, transcription) get the same OpenTelemetry coverage chat already has through `otelMiddleware`.

An `ActivityObserver` is a small contract with three optional callbacks (`onStart`, `onFinish`, `onError`) whose payloads are discriminated by an `activity` field. Observers are registered per call via a new `observers` option, awaited in registration order, and strictly non-fatal: a throwing observer is logged and skipped, never breaking the activity.

The first observer shipped is `otelObserver()`, exported from a new `@tanstack/ai/observability` subpath. It opens one `gen_ai.*` span per call, tagged with the right `gen_ai.operation.name` for the activity (`image_generation`, `video_generation`, `audio_generation`, `text_to_speech`, `transcription`), and reuses the shared usage-attribute set so cost, totals, cache/reasoning detail, duration billing, and media unit counts land identically across activities. When a `Meter` is supplied it also records the `gen_ai.client.operation.duration` histogram, the same metric the chat middleware emits.

Stacked on #747

This is stacked on #747 (`feat/otel-full-usage-emission`), which introduces the shared `usageAttributes()` helper this PR consumes. Merge #747 first.

Until #747 merges, the diff below includes its single commit (`c7df2a33`, "emit full usage on otel spans") in addition to the #720 work; review that commit in #747, not here. Once #747 lands in `main`, this PR's diff narrows automatically to just the activity-observer changes.

Test plan

- `packages/ai/tests/observability/`: observer notification semantics, `otelObserver` span/metric behavior, and a regression test for the streaming-video abandonment case (20 tests, green).
- `testing/e2e` route + spec exercising `otelObserver` on `generateImage` through the mock transport.
- `pnpm test:pr` green locally (sherif, knip, docs, eslint, lib, types, build) plus the E2E suite.

Summary by CodeRabbit

- `otelObserver()` to automatically emit OpenTelemetry `gen_ai.*` spans per activity, including correct operation names and optional duration metrics.
- `@tanstack/ai/observability` subpath.
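As a rough picture of how an observer could turn the three callbacks into one span per call, here is a self-contained sketch. It is not the PR's `otelObserver()`: `spanObserverSketch`, `TracerLike`, and `SpanLike` are hypothetical, minimal stand-ins so the example runs without `@opentelemetry/api`. The operation names are the ones listed in the summary; the `activity` keys other than `'video'`, the `"{operation} {model}"` span name, and keying open spans by `requestId` are assumptions.

```typescript
// Assumed discriminator values; only 'video' appears verbatim in the reviewed code.
type MediaActivitySketch = 'image' | 'video' | 'audio' | 'speech' | 'transcription'

// Operation names as listed in the PR summary.
const OPERATION_NAME: Record<MediaActivitySketch, string> = {
  image: 'image_generation',
  video: 'video_generation',
  audio: 'audio_generation',
  speech: 'text_to_speech',
  transcription: 'transcription',
}

// Minimal structural stand-ins for a tracer and span.
interface SpanLike {
  setAttribute(key: string, value: string | number): void
  end(): void
}
interface TracerLike {
  startSpan(name: string): SpanLike
}

function spanObserverSketch(tracer: TracerLike) {
  // One open span per in-flight call, keyed by request id.
  const open = new Map<string, SpanLike>()

  const close = (requestId: string, errorType?: string): void => {
    const span = open.get(requestId)
    if (!span) return
    if (errorType) span.setAttribute('error.type', errorType)
    span.end()
    open.delete(requestId)
  }

  return {
    onStart(e: { activity: MediaActivitySketch; requestId: string; model: string }): void {
      const operation = OPERATION_NAME[e.activity]
      const span = tracer.startSpan(`${operation} ${e.model}`)
      span.setAttribute('gen_ai.operation.name', operation)
      span.setAttribute('gen_ai.request.model', e.model)
      open.set(e.requestId, span)
    },
    onFinish(e: { requestId: string }): void {
      close(e.requestId)
    },
    onError(e: { requestId: string; error: unknown }): void {
      close(e.requestId, e.error instanceof Error ? e.error.name : 'unknown')
    },
  }
}
```

Deleting the map entry when a span closes keeps a second `onFinish` or a late `onError` for the same request from ending the span twice.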