From 729aed7e44ed5dd2a9be14aa52f6f7717c230f8d Mon Sep 17 00:00:00 2001 From: Brian Love Date: Thu, 18 Jun 2026 13:34:18 -0700 Subject: [PATCH 1/9] docs(spec): multi-message subagent cards (AG-UI full transcript) Co-Authored-By: Claude Fable 5 --- ...-ui-multi-message-subagent-cards-design.md | 147 ++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 docs/superpowers/specs/2026-06-18-ag-ui-multi-message-subagent-cards-design.md diff --git a/docs/superpowers/specs/2026-06-18-ag-ui-multi-message-subagent-cards-design.md b/docs/superpowers/specs/2026-06-18-ag-ui-multi-message-subagent-cards-design.md new file mode 100644 index 00000000..8d532738 --- /dev/null +++ b/docs/superpowers/specs/2026-06-18-ag-ui-multi-message-subagent-cards-design.md @@ -0,0 +1,147 @@ +# Multi-message subagent cards (AG-UI) — Design + +**Status:** Approved (brainstorm 2026-06-18) + +**Goal:** Extend the AG-UI subagent cards so each subagent accumulates and renders a **full transcript** — multiple assistant text turns, reasoning, and tool calls (with results) — streamed live, instead of a single accumulating text blob. + +## Background + +F5 shipped subagent cards over AG-UI via native ACTIVITY events: a graph emits a `subagent_activity` CUSTOM event → an owned `ActivityEmittingAgent._dispatch_event` maps it to `ACTIVITY_SNAPSHOT`/`ACTIVITY_DELTA` → the L1 reducer (`libs/ag-ui`) accumulates activities generically → `toAgent()` projects `activityType==='subagent'` to the neutral `Subagent` contract → `chat-subagents`/`chat-subagent-card` render. + +Today the activity `content` carries a single accumulating `text` field; the projection synthesizes **one** `Message` from it (`to-agent.ts` `subagentFor`), and the card shows that latest text. This design carries the subagent's full message transcript instead. 
+ +The neutral types already support a transcript: `Message` has `role: 'tool'`, `toolCallId`, `toolCallIds`, and `reasoning`; `ToolCall` has `{id, name, args, status, result?, error?}`. The only neutral-contract addition is exposing the subagent's tool calls. + +## Architecture (Approach 1 — fat activity content) + +One ACTIVITY per subagent (unchanged identity model), whose `content` carries the transcript as arrays; streamed via JSON-Patch DELTAs. The L1 reducer is **untouched** (it already applies arbitrary ACTIVITY patches). + +### A. Neutral contract (libs/chat) + +Add `toolCalls` to `Subagent` (`libs/chat/src/lib/agent/subagent.ts`): + +```ts +export interface Subagent { + toolCallId: string; + name?: string; + status: Signal; + messages: Signal; + toolCalls: Signal; // NEW — the subagent's tool calls (name/args/result) + state: Signal>; +} +``` + +`Message` and `ToolCall` are unchanged. + +### B. Wire shape (L3 ↔ L1 activity `content`) + +``` +{ toolCallId, name, status, + messages: [{ id, role, content, toolCallIds?, reasoning? }, …], + toolCalls: [{ id, name, args, status, result?, error? }, …] } +``` + +DELTAs (RFC-6902 JSON-Patch), live: +- `ACTIVITY_SNAPSHOT` on subagent start → `{toolCallId, name, status:'running', messages:[], toolCalls:[]}`. +- New message: `{op:'add', path:'/messages/-', value:{id, role:'assistant', content:''}}`. +- Token stream into the in-progress message: `{op:'replace', path:'/messages//content', value:}` (text_so_far pattern — replace, since JSON-Patch has no string append). +- Tool call: `{op:'add', path:'/toolCalls/-', value:{id, name, args, status:'running'}}` plus `{op:'replace', path:'/messages//toolCallIds', value:[…]}`. +- Tool result: `{op:'replace', path:'/toolCalls//status', value:'complete'}` + `{op:'replace', path:'/toolCalls//result', value:}`. +- Finish: `{op:'replace', path:'/status', value:'complete'}`. + +The reducer's existing `applyPatch` handles all of these — **no reducer change**. + +### C. 
Projection (libs/ag-ui `to-agent.ts`) + +`subagentFor` maps the new arrays, with explicit back-compat: + +```ts +messages: computed(() => { + const c = entry.content(); + if (Array.isArray(c['messages'])) { + return (c['messages'] as RawMsg[]).map((m) => ({ + id: m.id, role: m.role, content: m.content ?? '', + ...(m.toolCallIds ? { toolCallIds: m.toolCallIds } : {}), + ...(m.reasoning ? { reasoning: m.reasoning } : {}), + })); + } + // Back-compat: single accumulating text (current shipped emitter). + return [{ id, role: 'assistant', content: String(c['text'] ?? '') }]; +}), +toolCalls: computed(() => { + const c = entry.content(); + return Array.isArray(c['toolCalls']) ? (c['toolCalls'] as ToolCall[]) : []; +}), +``` + +The stable per-subagent wrapper (keyed by messageId) and the prune loop are unchanged. + +### D. Card rendering (libs/chat `chat-subagent-card`, hybrid) + +Replace the single "latest message" block with an ordered transcript: + +```html +@for (m of subagent().messages(); track m.id) { +
+ @if (m.reasoning) {
{{ m.reasoning }}
} + @if (textOf(m); as t) { } + @for (tc of toolCallsFor(m); track tc.id) { + + } +
+} +``` + +- `toolCallsFor(m)` resolves `m.toolCallIds` against `subagent().toolCalls()`. +- Reuse the existing **`ChatToolCallCardComponent`** (per-call card; result included) — the "hard part" of tool rendering — without the `Agent`-coupled `chat-tool-calls` wrapper. +- The in-progress (last) message's `content` updates in place; `track m.id` keeps the DOM stable (no `@for` re-creation). +- Compact, nested styling distinct from the main thread. + +### E. L3 emission (examples/ag-ui graph) + +Extend `SubagentStreamHandler` + `activity_transform.py`: +- on assistant token → `add` a message (first token) then `replace` its content (text_so_far); +- on tool call → `add` to `toolCalls` + `replace` the assistant message's `toolCallIds`; +- on tool result → `replace` that tool call's `result`/`status`. + +Transport unchanged from F5 (`adispatch_custom_event` → `subagent_activity` CUSTOM → `ActivityEmittingAgent._dispatch_event` → ACTIVITY_DELTA). `activity_transform` gains small pure patch-builders for messages/toolcalls. The research subagraph is already LLM-driven (reasoning + a search tool + a summary), so it produces a genuine multi-message transcript for the demo. + +The cockpit `ag-ui/subagents` capability stays on the single-text path (exercises the back-compat branch) — not migrated here. + +## Data flow + +``` +research subgraph stream + → SubagentStreamHandler (delineates messages/tool calls) + → adispatch_custom_event('subagent_activity', {phase, ...}) + → ActivityEmittingAgent._dispatch_event → activity_transform → ACTIVITY_SNAPSHOT/DELTA + → L1 reducer applyPatch (generic, unchanged) → activities store + → toAgent subagentFor → Subagent{ messages[], toolCalls[] } + → chat-subagents → chat-subagent-card → ordered transcript + reused tool-call cards +``` + +## Error handling + +- Malformed/partial wire entries: the projection defensively defaults (`content ?? ''`, non-array → `[]`); a message missing an `id` falls back to its index for `track`. 
+- A tool result arriving before its call (out-of-order): `toolCallsFor` simply finds nothing yet; the card renders the call once it appears (id lookup, never positional). +- Back-compat: emitters that still send `text` render as a single assistant message; emitters sending `messages` render the full transcript. Both paths are unit-tested. + +## Testing + +- **Unit (libs/ag-ui):** projection maps `content.messages`→`Message[]` and `content.toolCalls`→`ToolCall[]`; the `text` back-compat fallback; stable wrapper identity + content liveness across DELTAs. +- **Unit (libs/chat):** `chat-subagent-card` renders an ordered transcript (≥2 messages), reasoning, and a reused tool-call card; updates live when the last message's content changes; `track m.id` stability. +- **Unit (python):** `activity_transform` message/toolcall patch-builders produce the expected JSON-Patch ops for each phase. +- **e2e (examples/ag-ui):** during a research run, a subagent card surfaces ≥2 messages and a tool-call card — durable-signal assertions (F5 e2e precedent; robust under aimock replay). +- **Gates:** `ag-ui` + `chat` lint/test; examples/ag-ui e2e; cockpit `ag-ui/subagents` e2e still green (back-compat); Railway deploy regen if any `cockpit/ag-ui/*` source changes; api-docs regen (the `Subagent.toolCalls` addition is public API). + +## Scope guardrails (YAGNI) + +- No neutral-`Message` change; the L1 reducer is untouched. +- `role:'tool'` result messages are not separately rendered (the tool-call card already shows the result). +- The cockpit `ag-ui/subagents` capability is not migrated (it validates the back-compat branch). +- No new chat-tool-calls decoupling refactor — reuse the per-call card directly. + +## Public API delta + +- `Subagent.toolCalls: Signal` (additive). +- No other public surface changes. 
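The DELTA sequence from section B of the spec can be traced end to end with a small sketch. The `applyOps` helper below is hypothetical and is not the L1 reducer's `applyPatch`; it handles only the `add` and `replace` ops the wire shape uses, skips JSON Pointer escaping (`~0`/`~1`), and is deliberately lenient: a `replace` on a key that does not exist yet (such as `/toolCalls/0/result`) simply sets it, whereas RFC 6902 requires the target of `replace` to exist. The new message is seeded with `toolCallIds: []`, following the plan's `message_start` patch.

```typescript
// Only the two JSON-Patch ops the spec's wire shape relies on.
type PatchOp =
  | { op: 'add'; path: string; value: unknown }
  | { op: 'replace'; path: string; value: unknown };

// Hypothetical helper: returns a patched deep copy of the activity content.
function applyOps(doc: Record<string, unknown>, ops: PatchOp[]): Record<string, unknown> {
  const next = JSON.parse(JSON.stringify(doc)) as Record<string, unknown>;
  for (const { op, path, value } of ops) {
    // '/messages/0/content' becomes ['messages', '0', 'content']
    const keys = path.split('/').slice(1);
    const last = keys.pop() as string;
    let parent: any = next;
    for (const k of keys) parent = parent[k];
    if (Array.isArray(parent)) {
      if (op === 'add') {
        // '-' appends; a numeric index inserts before that position.
        if (last === '-') parent.push(value);
        else parent.splice(Number(last), 0, value);
      } else {
        parent[Number(last)] = value;
      }
    } else {
      // Lenient: 'replace' on a missing member sets it (strict RFC 6902 would reject).
      parent[last] = value;
    }
  }
  return next;
}

// ACTIVITY_SNAPSHOT content on subagent start.
let content: Record<string, unknown> = {
  toolCallId: 'sub-1', name: 'research', status: 'running', messages: [], toolCalls: [],
};

// New message, then a token stream using the text_so_far pattern.
content = applyOps(content, [
  { op: 'add', path: '/messages/-', value: { id: 'm1', role: 'assistant', content: '', toolCallIds: [] } },
]);
content = applyOps(content, [{ op: 'replace', path: '/messages/0/content', value: 'Searching' }]);

// Tool call: append to toolCalls and link it from the message.
content = applyOps(content, [
  { op: 'add', path: '/toolCalls/-', value: { id: 't1', name: 'search', args: { q: 'x' }, status: 'running' } },
  { op: 'replace', path: '/messages/0/toolCallIds', value: ['t1'] },
]);

// Tool result, then finish.
content = applyOps(content, [
  { op: 'replace', path: '/toolCalls/0/status', value: 'complete' },
  { op: 'replace', path: '/toolCalls/0/result', value: { n: 1 } },
]);
content = applyOps(content, [{ op: 'replace', path: '/status', value: 'complete' }]);
```

Because every op addresses messages and tool calls by array index, the emitter has to track those indices itself, which is why the plan keeps the stateful bookkeeping in the stream handler and the transform pure.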
From 6cf16900123370c1fab4a2013ac7e7b6f14a836f Mon Sep 17 00:00:00 2001 From: Brian Love Date: Thu, 18 Jun 2026 13:42:15 -0700 Subject: [PATCH 2/9] docs(plan): multi-message subagent cards implementation plan Co-Authored-By: Claude Fable 5 --- ...6-18-ag-ui-multi-message-subagent-cards.md | 438 ++++++++++++++++++ 1 file changed, 438 insertions(+) create mode 100644 docs/superpowers/plans/2026-06-18-ag-ui-multi-message-subagent-cards.md diff --git a/docs/superpowers/plans/2026-06-18-ag-ui-multi-message-subagent-cards.md b/docs/superpowers/plans/2026-06-18-ag-ui-multi-message-subagent-cards.md new file mode 100644 index 00000000..c4040393 --- /dev/null +++ b/docs/superpowers/plans/2026-06-18-ag-ui-multi-message-subagent-cards.md @@ -0,0 +1,438 @@ +# Multi-message Subagent Cards (AG-UI) Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Render each AG-UI subagent's full live transcript (multiple assistant text turns, reasoning, and tool calls with results) in `chat-subagent-card`, instead of a single accumulating text blob. + +**Architecture:** One ACTIVITY per subagent carries the transcript as `messages[]` + `toolCalls[]` in its `content`, streamed via JSON-Patch DELTAs (the L1 reducer is untouched — it already applies arbitrary ACTIVITY patches). `to-agent.ts` projects those arrays onto the neutral `Subagent` contract; the card renders an ordered transcript, delegating tool-call rendering to the existing `ChatToolCallCardComponent`. The example graph's `SubagentStreamHandler` emits the per-message/per-tool DELTAs. + +**Tech Stack:** Angular signals, Vitest, AG-UI ACTIVITY events (`@ag-ui/core`), Python (LangChain `AsyncCallbackHandler` + `adispatch_custom_event`), Nx, Playwright. 
+ +**Branch:** `feat/ag-ui-multi-message-subagent-cards` (spec already committed there). + +**Spec:** `docs/superpowers/specs/2026-06-18-ag-ui-multi-message-subagent-cards-design.md` + +--- + +## File Structure + +- **Modify** `libs/chat/src/lib/agent/subagent.ts` — add optional `toolCalls?: Signal` to `Subagent`. +- **Modify** `libs/ag-ui/src/lib/to-agent.ts` — project `content.messages`→`Message[]` and `content.toolCalls`→`ToolCall[]`; keep `text` back-compat. +- **Modify** `libs/ag-ui/src/lib/to-agent.spec.ts` (or the nearest existing ag-ui projection spec) — projection tests. +- **Modify** `libs/chat/src/lib/compositions/chat-subagent-card/chat-subagent-card.component.ts` (+ its `.spec.ts`) — hybrid transcript renderer. +- **Modify** `examples/ag-ui/python/src/streaming/subagent_stream_handler.py` — stateful multi-message/tool emission. +- **Modify** `examples/ag-ui/python/src/streaming/activity_transform.py` (+ its test) — JSON-Patch builders for the new phases. +- **Modify** `examples/ag-ui/angular/e2e/` — add a multi-message subagent e2e spec. + +`Message` and `ToolCall` (libs/chat) are unchanged. The L1 reducer (`libs/ag-ui/src/lib/reducer.ts`) is unchanged. + +--- + +### Task 1: `Subagent.toolCalls` contract (additive, non-breaking) + +**Files:** +- Modify: `libs/chat/src/lib/agent/subagent.ts` + +- [ ] **Step 1: Add the optional field** + +In `libs/chat/src/lib/agent/subagent.ts`, add `toolCalls` after `messages`. Import `ToolCall`: + +```ts +// SPDX-License-Identifier: MIT +import type { Signal } from '@angular/core'; +import type { Message } from './message'; +import type { ToolCall } from './tool-call'; + +export type SubagentStatus = 'pending' | 'running' | 'complete' | 'error'; + +export interface Subagent { + /** Tool call ID that spawned this subagent. */ + toolCallId: string; + /** Optional human-readable name. 
*/ + name?: string; + status: Signal; + messages: Signal; + /** + * The subagent's own tool calls (name/args/result), referenced by + * `Message.toolCallIds` in `messages`. Optional: adapters that don't surface + * subagent tool calls omit it; consumers default to `[]`. + */ + toolCalls?: Signal; + state: Signal>; +} +``` + +- [ ] **Step 2: Verify the libs still build (optional ⇒ no ripple)** + +Run: `npx nx run-many -t lint test --projects=chat,ag-ui,langgraph --skip-nx-cache` +Expected: PASS. Because `toolCalls` is optional, no existing `Subagent` constructor (only `to-agent.ts` + test fakes) needs changes yet. (Ignore harmless "pyenv: cannot rehash" stderr.) + +- [ ] **Step 3: Commit** + +```bash +git add libs/chat/src/lib/agent/subagent.ts +git commit -m "feat(chat): add optional Subagent.toolCalls to the neutral contract" +``` + +--- + +### Task 2: Projection — `to-agent.ts` maps the transcript arrays + +**Files:** +- Modify: `libs/ag-ui/src/lib/to-agent.ts` +- Test: `libs/ag-ui/src/lib/to-agent.spec.ts` + +Context: `subagentFor(id, entry)` (currently ~lines 220-235) builds a stable `Subagent` wrapper from `entry.content()` (a `WritableSignal>`). Today `messages` is synthesized from `content.text`. We map `content.messages`/`content.toolCalls` with a `text` fallback. + +- [ ] **Step 1: Write the failing tests** + +In `libs/ag-ui/src/lib/to-agent.spec.ts`, find how existing subagent-projection tests drive activities (they emit `ACTIVITY_SNAPSHOT`/`ACTIVITY_DELTA` through the reducer, then read `agent.subagents()`). Add a describe block. If the file lacks a helper to push activity events, mirror the existing subagent test's setup. 
The behavioral assertions: + +```ts +it('projects content.messages into Subagent.messages', () => { + // Arrange: drive an ACTIVITY_SNAPSHOT for activityType 'subagent' whose + // content.messages = [{id:'m1',role:'assistant',content:'hi',toolCallIds:['t1'],reasoning:'think'}] + // (use the same mechanism the existing 'subagent' projection test uses) + const sa = agent.subagents!().get('sub-1')!; + expect(sa.messages().map(m => ({ id: m.id, role: m.role, content: m.content, toolCallIds: m.toolCallIds, reasoning: m.reasoning }))) + .toEqual([{ id: 'm1', role: 'assistant', content: 'hi', toolCallIds: ['t1'], reasoning: 'think' }]); +}); + +it('projects content.toolCalls into Subagent.toolCalls', () => { + // content.toolCalls = [{id:'t1',name:'search',args:{q:'x'},status:'complete',result:{n:1}}] + const sa = agent.subagents!().get('sub-1')!; + expect(sa.toolCalls!()).toEqual([{ id: 't1', name: 'search', args: { q: 'x' }, status: 'complete', result: { n: 1 } }]); +}); + +it('falls back to a single message when only content.text is present (back-compat)', () => { + // content = { toolCallId:'sub-1', status:'running', text:'partial answer' } (no messages/toolCalls) + const sa = agent.subagents!().get('sub-1')!; + expect(sa.messages()).toEqual([{ id: 'sub-1', role: 'assistant', content: 'partial answer' }]); + expect(sa.toolCalls!()).toEqual([]); +}); +``` + +- [ ] **Step 2: Run to verify failure** + +Run: `npx nx test ag-ui --skip-nx-cache -- to-agent` +Expected: FAIL — `messages` still synthesized from `text`; `toolCalls` undefined. 
+ +- [ ] **Step 3: Implement the projection** + +In `libs/ag-ui/src/lib/to-agent.ts`, add a `ToolCall` import (from `@threadplane/chat` — match how `Subagent`/`Message` are imported at the top of the file) and replace the `subagentFor` wrapper body's `messages` computed + add a `toolCalls` computed: + +```ts +function subagentFor(id: string, entry: ActivityEntry): Subagent { + let w = subagentWrappers.get(id); + if (!w) { + w = { + toolCallId: (entry.content()['toolCallId'] as string) ?? id, + name: entry.content()['name'] as string | undefined, + status: computed(() => (entry.content()['status'] as SubagentStatus) ?? 'running'), + messages: computed(() => { + const c = entry.content(); + const raw = c['messages']; + if (Array.isArray(raw)) { + return (raw as Array>).map((m, i) => ({ + id: (m['id'] as string) ?? `${id}-${i}`, + role: (m['role'] as Message['role']) ?? 'assistant', + content: typeof m['content'] === 'string' ? (m['content'] as string) : (m['content'] as Message['content']) ?? '', + ...(Array.isArray(m['toolCallIds']) ? { toolCallIds: m['toolCallIds'] as string[] } : {}), + ...(typeof m['reasoning'] === 'string' ? { reasoning: m['reasoning'] as string } : {}), + })); + } + // Back-compat: single accumulating text blob (current shipped emitter). + return [{ id, role: 'assistant', content: String(c['text'] ?? '') }]; + }), + toolCalls: computed(() => { + const raw = entry.content()['toolCalls']; + return Array.isArray(raw) ? (raw as ToolCall[]) : []; + }), + state: computed(() => (entry.content()['state'] as Record) ?? {}), + }; + subagentWrappers.set(id, w); + } + return w; +} +``` + +- [ ] **Step 4: Run to verify pass** + +Run: `npx nx test ag-ui --skip-nx-cache -- to-agent` +Expected: PASS (new + existing ag-ui projection tests). 
+ +- [ ] **Step 5: Commit** + +```bash +git add libs/ag-ui/src/lib/to-agent.ts libs/ag-ui/src/lib/to-agent.spec.ts +git commit -m "feat(ag-ui): project subagent transcript (messages[] + toolCalls[]) with text back-compat" +``` + +--- + +### Task 3: Card — hybrid transcript renderer + +**Files:** +- Modify: `libs/chat/src/lib/compositions/chat-subagent-card/chat-subagent-card.component.ts` +- Test: `libs/chat/src/lib/compositions/chat-subagent-card/chat-subagent-card.component.spec.ts` + +Context: `ChatToolCallCardComponent` takes `[toolCall]: ToolCallInfo` where `ToolCallInfo = {id, name, args, result?, status?}`. The card maps each `ToolCall` → `ToolCallInfo` and renders one card per call. + +- [ ] **Step 1: Write the failing test** + +In the spec, render `ChatSubagentCardComponent` with a fake `Subagent` whose `messages()` has two messages (one with `toolCallIds:['t1']` + `reasoning`) and `toolCalls()` has `t1`. Assert: + +```ts +it('renders an ordered transcript with reasoning and a delegated tool-call card', () => { + const sa: Subagent = { + toolCallId: 'sub-1', + status: signal('running'), + messages: signal([ + { id: 'm1', role: 'assistant', content: 'searching', reasoning: 'plan', toolCallIds: ['t1'] }, + { id: 'm2', role: 'assistant', content: 'done' }, + ]), + toolCalls: signal([{ id: 't1', name: 'search', args: { q: 'x' }, status: 'complete', result: { n: 1 } }]), + state: signal({}), + }; + // mount with [subagent]="sa" + const host = fixture.nativeElement as HTMLElement; + expect(host.querySelectorAll('.sac__msg').length).toBe(2); + expect(host.textContent).toContain('searching'); + expect(host.textContent).toContain('done'); + expect(host.textContent).toContain('plan'); // reasoning + expect(host.querySelectorAll('chat-tool-call-card').length).toBe(1); // delegated render +}); +``` + +(Match the existing spec's TestBed setup — it already imports `ChatSubagentCardComponent`. 
Add `ChatToolCallCardComponent` to the test module imports if the harness needs it.) + +- [ ] **Step 2: Run to verify failure** + +Run: `npx nx test chat --skip-nx-cache -- chat-subagent-card` +Expected: FAIL — card still renders only the "Latest message" `
`; no `.sac__msg` / `chat-tool-call-card`.
+
+- [ ] **Step 3: Implement the hybrid renderer**
+
+Replace the template body (the `sac__count` + "Latest message" block) and the class internals:
+
+```ts
+import { Component, ChangeDetectionStrategy, input, computed } from '@angular/core';
+import { ChatToolCallCardComponent, type ToolCallInfo } from '../chat-tool-call-card/chat-tool-call-card.component';
+import { MarkdownComponent } from '../../markdown/markdown.component'; // match the existing markdown renderer import used elsewhere in chat
+import type { Subagent, SubagentStatus } from '../../agent/subagent';
+import type { Message, ToolCall } from '../../agent';
+// ...existing ChatTrace import + statusColor/statusToTraceState helpers stay...
+
+@Component({
+  selector: 'chat-subagent-card',
+  standalone: true,
+  changeDetection: ChangeDetectionStrategy.OnPush,
+  imports: [/* ChatTrace, */ ChatToolCallCardComponent, MarkdownComponent],
+  styles: [/* keep existing styles; add .sac__msg / .sac__reasoning compact styles */],
+  template: `
+    
+      
+        {{ subagent().name ?? 'Subagent' }}
+        {{ subagent().toolCallId }}
+        {{ subagent().status() }}
+      
+      
{{ subagent().messages().length }} message(s)
+ @for (m of subagent().messages(); track m.id) { +
+ @if (m.reasoning) {
{{ m.reasoning }}
} + @if (textOf(m); as t) { } + @for (tc of toolCallsFor(m); track tc.id) { + + } +
+ } +
+ `, +}) +export class ChatSubagentCardComponent { + readonly subagent = input.required(); + readonly state = computed(() => statusToTraceState(this.subagent().status())); + + protected textOf(m: Message): string { + const c = m.content; + return typeof c === 'string' ? c : ''; + } + protected toolCallsFor(m: Message): ToolCall[] { + const ids = m.toolCallIds ?? []; + if (ids.length === 0) return []; + const all = this.subagent().toolCalls?.() ?? []; + return ids.map((id) => all.find((tc) => tc.id === id)).filter((tc): tc is ToolCall => !!tc); + } + protected toToolCallInfo(tc: ToolCall): ToolCallInfo { + return { id: tc.id, name: tc.name, args: tc.args, result: tc.result, status: tc.status }; + } +} +``` + +Notes for the implementer: verify the exact markdown component selector/inputs the chat lib uses elsewhere (e.g. `grep -rn "chat-markdown\|MarkdownComponent" libs/chat/src/lib | head`) and match it; if the lib renders markdown via a different selector/input, use that. Keep the existing `statusColor`/`statusToTraceState`/`ChatTrace` imports. + +- [ ] **Step 4: Run to verify pass** + +Run: `npx nx test chat --skip-nx-cache -- chat-subagent-card` +Expected: PASS. + +- [ ] **Step 5: Build the chat lib (catch template/import errors)** + +Run: `npx nx build chat --skip-nx-cache` +Expected: success. + +- [ ] **Step 6: Commit** + +```bash +git add libs/chat/src/lib/compositions/chat-subagent-card/chat-subagent-card.component.ts libs/chat/src/lib/compositions/chat-subagent-card/chat-subagent-card.component.spec.ts +git commit -m "feat(chat): chat-subagent-card renders full transcript (hybrid, reuses tool-call card)" +``` + +--- + +### Task 4: Spike — de-risk the Python callback emission + +**Files:** (temporary instrumentation only — reverted at task end) +- Modify (temp): `examples/ag-ui/python/src/streaming/subagent_stream_handler.py` + +The `SubagentStreamHandler` (an `AsyncCallbackHandler`) currently uses only `on_llm_new_token`. 
Multi-message emission needs to delineate (a) distinct assistant turns, (b) tool calls (id/name/args), (c) tool results. This task determines, against the REAL research subgraph, which callbacks fire and what payloads they carry — before committing to the emission code in Task 5. + +- [ ] **Step 1: Instrument and log** + +Temporarily add logging to the handler for `on_llm_start`, `on_llm_new_token`, `on_llm_end`, `on_tool_start`, `on_tool_end` — printing the run_id and the relevant payload fields. For `on_llm_end`, inspect `response.generations[0][0].message.tool_calls` (the AIMessage tool calls: `{id, name, args}`). For `on_tool_end`, inspect `output`/`outputs`. + +- [ ] **Step 2: Run a real research turn and capture** + +Use the existing local serve path (the examples/ag-ui dev harness with OPENAI_API_KEY, or the aimock fixture harness) to drive a prompt that triggers the research subagent, and capture the handler log. The existing F5 e2e / live-smoke notes in memory describe how to serve examples/ag-ui locally. + +- [ ] **Step 3: Document the validated mechanism** + +Write the findings into the plan-adjacent spike note (append to the spec's "L3 emission" section or a short comment block in `subagent_stream_handler.py`): which callback opens a new message, where tool-call id/name/args are reliably available, and where the tool result is available. Confirm whether `on_llm_start`/`on_llm_end` fire once per assistant turn for this subgraph. + +- [ ] **Step 4: Revert the temporary logging** + +Remove the print instrumentation. No commit for this task (it's a spike) unless the documented note lands in the spec — in which case commit only the doc note. + +**If the spike shows the callbacks do NOT cleanly delineate messages/tool calls** (e.g. tool calls aren't available in `on_llm_end` for this subgraph): STOP and report — Task 5's emission strategy must be revised (e.g. tap the subgraph's message stream instead of callbacks). Do not force the callback approach. 
+ +--- + +### Task 5: Python emission — multi-message handler + transform builders + +**Files:** +- Modify: `examples/ag-ui/python/src/streaming/activity_transform.py` (+ its test, e.g. `examples/ag-ui/python/tests/test_activity_transform.py` — match the existing test location) +- Modify: `examples/ag-ui/python/src/streaming/subagent_stream_handler.py` + +Implement using the mechanism validated in Task 4. The transform stays a pure, stateless 1:1 mapper; the handler is the stateful accumulator (it already buffers text). + +- [ ] **Step 1: Write the failing transform tests** + +In the activity_transform test file, add cases for the new phases (the existing tests cover `started`/`message`/`finished`). The transform maps a `subagent_activity` CUSTOM value `{subagent_id, phase, ...}` → an `ActivitySnapshotEvent`/`ActivityDeltaEvent`. New phases and expected patches: + +```python +def test_message_start_adds_message(): + ev = subagent_custom_to_activity(custom('subagent_activity', { + 'subagent_id': 's1', 'phase': 'message_start', 'message_id': 'm1', 'index': 0})) + assert ev.type == EventType.ACTIVITY_DELTA + assert {'op': 'add', 'path': '/messages/-', 'value': {'id': 'm1', 'role': 'assistant', 'content': '', 'toolCallIds': []}} in ev.patch + +def test_message_streams_content_at_index(): + ev = subagent_custom_to_activity(custom('subagent_activity', { + 'subagent_id': 's1', 'phase': 'message', 'index': 0, 'text': 'hello'})) + assert ev.patch == [{'op': 'replace', 'path': '/messages/0/content', 'value': 'hello'}] + +def test_tool_call_appends_and_links(): + ev = subagent_custom_to_activity(custom('subagent_activity', { + 'subagent_id': 's1', 'phase': 'tool_call', 'index': 0, + 'tool_call_id': 't1', 'name': 'search', 'args': {'q': 'x'}, 'tool_call_ids': ['t1']})) + assert {'op': 'add', 'path': '/toolCalls/-', 'value': {'id': 't1', 'name': 'search', 'args': {'q': 'x'}, 'status': 'running'}} in ev.patch + assert {'op': 'replace', 'path': '/messages/0/toolCallIds', 'value': 
['t1']} in ev.patch + +def test_tool_result_updates_call(): + ev = subagent_custom_to_activity(custom('subagent_activity', { + 'subagent_id': 's1', 'phase': 'tool_result', 'tool_index': 0, 'result': {'n': 1}, 'status': 'complete'})) + assert {'op': 'replace', 'path': '/toolCalls/0/status', 'value': 'complete'} in ev.patch + assert {'op': 'replace', 'path': '/toolCalls/0/result', 'value': {'n': 1}} in ev.patch +``` + +Also update the `started` snapshot test to expect the new empty arrays: `content={'toolCallId': sid, 'name': ..., 'status': 'running', 'messages': [], 'toolCalls': []}`. + +- [ ] **Step 2: Run to verify failure** + +Run: `cd examples/ag-ui/python && uv run pytest tests/test_activity_transform.py -q` +Expected: FAIL on the new phase cases. + +- [ ] **Step 3: Implement the transform builders** + +In `activity_transform.py`, change the `started` snapshot to seed `messages: [], toolCalls: []` (drop the old `text: ''`), and add `phase` handlers returning the patches asserted above (`message_start`, `message`, `tool_call`, `tool_result`; keep `finished` → replace `/status`). Keep the function pure (build patches purely from the event fields — `index`, `tool_index`, ids — so no transform-side state). + +- [ ] **Step 4: Run to verify pass** + +Run: `cd examples/ag-ui/python && uv run pytest tests/test_activity_transform.py -q` +Expected: PASS. + +- [ ] **Step 5: Implement the stateful handler** + +Rewrite `subagent_stream_handler.py` per the Task-4-validated mechanism: track a current message index + per-message buffer + tool-call ids; emit `message_start` on a new assistant turn, `message` (text_so_far for the current index) on each token, `tool_call` when a tool call is observed, `tool_result` when its result arrives. Each emit is `adispatch_custom_event('subagent_activity', {subagent_id, phase, ...})`, best-effort (the existing try/except for missing run context stays). 
+ +- [ ] **Step 6: Verify the python package imports + unit tests pass** + +Run: `cd examples/ag-ui/python && uv run pytest -q` +Expected: PASS. + +- [ ] **Step 7: Commit** + +```bash +git add examples/ag-ui/python/src/streaming/activity_transform.py examples/ag-ui/python/src/streaming/subagent_stream_handler.py examples/ag-ui/python/tests/test_activity_transform.py +git commit -m "feat(examples/ag-ui): emit multi-message subagent transcript (handler + transform)" +``` + +--- + +### Task 6: e2e + full gates + +**Files:** +- Create/Modify: an `examples/ag-ui/angular/e2e/*.spec.ts` (extend the existing subagent e2e) + +- [ ] **Step 1: Add the multi-message e2e** + +Extend the existing examples/ag-ui subagent e2e (the F5 spec) so that during a research run it asserts the subagent card surfaces **≥2** transcript messages AND at least one `chat-tool-call-card`. Use durable-signal assertions (poll/`expect`-based, robust under aimock replay) exactly as the F5 e2e does. Reuse the F5 e2e's harness/fixtures; if a new aimock fixture is needed for a multi-step research transcript, add it alongside the existing ones. + +- [ ] **Step 2: Run the e2e** + +Run: `npx nx e2e --skip-nx-cache` (find the exact project name in `examples/ag-ui/angular/project.json`). First free any stale ports if the harness uses fixed ones. +Expected: the multi-message + tool-call assertions pass. + +- [ ] **Step 3: Full gates** + +```bash +npx nx run-many -t lint test --projects=chat,ag-ui --skip-nx-cache +npx nx build chat --skip-nx-cache +cd examples/ag-ui/python && uv run pytest -q && cd - +npm run generate-api-docs # Subagent.toolCalls is public API +``` +- Confirm `git diff apps/website/content/docs/*/api/api-docs.json` reflects the `toolCalls` addition; `git diff | grep -i copilotkit` MUST be empty. +- Confirm the cockpit `ag-ui/subagents` capability is unaffected (it uses the single-text/back-compat path) — run its e2e if changed: `npx nx e2e cockpit-ag-ui-subagents-angular --skip-nx-cache`. 
+- If any `cockpit/ag-ui/*` python/graph source changed (it should NOT in this plan), regenerate the Railway deploy bundle: `npx tsx scripts/generate-ag-ui-deployment-config.ts` and commit `deployments/ag-ui-dev/`. (This plan does not touch cockpit, so expect no regen.) + +- [ ] **Step 4: Commit + push + PR** + +```bash +git add examples/ag-ui/angular/e2e apps/website/content/docs +git commit -m "test(examples/ag-ui): e2e for multi-message subagent card; regen api-docs" +git push -u origin feat/ag-ui-multi-message-subagent-cards +gh pr create --title "feat(ag-ui): multi-message subagent cards (full transcript)" --body "..." +``` + +PR body: summarize the transcript wire shape, the additive `Subagent.toolCalls`, the back-compat projection, and the hybrid card; link the spec. No external-repo references. + +--- + +## Notes for the implementer + +- The L1 reducer (`libs/ag-ui/src/lib/reducer.ts`) is **not** modified — it already applies arbitrary ACTIVITY JSON-Patches; the new `/messages/*` and `/toolCalls/*` paths flow through unchanged. +- `Subagent.toolCalls` is **optional** — every consumer must default (`subagent().toolCalls?.() ?? []`). This keeps the change non-breaking (only `to-agent.ts` and test fakes construct a neutral `Subagent`; the langgraph adapter uses its own `SubagentStreamRef`). +- Keep `role:'tool'` result messages out of the card render path — the tool-call card already shows the result. +- Do NOT migrate the cockpit `ag-ui/subagents` capability — it deliberately exercises the back-compat (`text`) branch. +- Any new `*.spec.ts` placed under an app's `src/` must `import { describe, it, expect, ... } from 'vitest'` (tsconfig type-checks specs). +- Persist NO external-repo names anywhere (spec/docs/code/comments/PR). 
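The two defaulting rules in these notes (the `text` back-compat branch and the optional `toolCalls`) can be sketched without Angular. This is a simplified illustration, assuming plain arrays in place of signals; `projectMessages` and the standalone `toolCallsFor` are hypothetical helpers, and the `Message`/`ToolCall` interfaces are trimmed stand-ins for the neutral types in `libs/chat`.

```typescript
// Trimmed stand-ins for the neutral types (signals omitted for the sketch).
interface ToolCall {
  id: string;
  name: string;
  args: unknown;
  status: string;
  result?: unknown;
}
interface Message {
  id: string;
  role: string;
  content: string;
  toolCallIds?: string[];
  reasoning?: string;
}

// Prefer content.messages[]; otherwise wrap the legacy accumulating `text`.
function projectMessages(id: string, content: Record<string, unknown>): Message[] {
  const raw = content['messages'];
  if (Array.isArray(raw)) {
    return raw.map((m: Record<string, unknown>, i: number): Message => {
      const mid = m['id'];
      const role = m['role'];
      const text = m['content'];
      const ids = m['toolCallIds'];
      const reasoning = m['reasoning'];
      return {
        id: typeof mid === 'string' ? mid : `${id}-${i}`, // index fallback for a missing id
        role: typeof role === 'string' ? role : 'assistant',
        content: typeof text === 'string' ? text : '',
        ...(Array.isArray(ids) ? { toolCallIds: ids as string[] } : {}),
        ...(typeof reasoning === 'string' ? { reasoning } : {}),
      };
    });
  }
  return [{ id, role: 'assistant', content: String(content['text'] ?? '') }];
}

// Resolve a message's tool calls by id, never by position; unknown ids are skipped.
function toolCallsFor(m: Message, all: ToolCall[] | undefined): ToolCall[] {
  const byId = new Map((all ?? []).map((tc) => [tc.id, tc] as const));
  return (m.toolCallIds ?? [])
    .map((tcId) => byId.get(tcId))
    .filter((tc): tc is ToolCall => tc !== undefined);
}

// Legacy emitter: only `text` is present.
const legacy = projectMessages('sub-1', { text: 'partial answer' });

// Transcript emitter: 't9' is referenced before its call has arrived.
const modern = projectMessages('sub-1', {
  messages: [{ id: 'm1', role: 'assistant', content: 'hi', toolCallIds: ['t1', 't9'] }],
});
const calls: ToolCall[] = [
  { id: 't1', name: 'search', args: { q: 'x' }, status: 'complete', result: { n: 1 } },
];
const resolved = toolCallsFor(modern[0], calls);
const none = toolCallsFor(modern[0], undefined); // adapter that omits toolCalls
```

Looking calls up by id is what lets a not-yet-seen call render nothing now and appear later once its entry is patched into `toolCalls`.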
From 1e19e39a54cd705af0d6aaac74085e34a135e23c Mon Sep 17 00:00:00 2001 From: Brian Love Date: Thu, 18 Jun 2026 13:59:26 -0700 Subject: [PATCH 3/9] feat(chat): add optional Subagent.toolCalls to the neutral contract --- libs/chat/src/lib/agent/subagent.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/libs/chat/src/lib/agent/subagent.ts b/libs/chat/src/lib/agent/subagent.ts index 49852fc7..2ebccb66 100644 --- a/libs/chat/src/lib/agent/subagent.ts +++ b/libs/chat/src/lib/agent/subagent.ts @@ -1,6 +1,7 @@ // SPDX-License-Identifier: MIT import type { Signal } from '@angular/core'; import type { Message } from './message'; +import type { ToolCall } from './tool-call'; export type SubagentStatus = 'pending' | 'running' | 'complete' | 'error'; @@ -11,5 +12,11 @@ export interface Subagent { name?: string; status: Signal; messages: Signal; + /** + * The subagent's own tool calls (name/args/result), referenced by + * `Message.toolCallIds` in `messages`. Optional: adapters that don't surface + * subagent tool calls omit it; consumers default to `[]`. 
+ */ + toolCalls?: Signal; state: Signal>; } From 93a40fac60d307328ad9a801b60eaa1b01e44337 Mon Sep 17 00:00:00 2001 From: Brian Love Date: Thu, 18 Jun 2026 14:03:18 -0700 Subject: [PATCH 4/9] feat(ag-ui): project subagent transcript (messages[] + toolCalls[]) with text back-compat Co-Authored-By: Claude Sonnet 4.6 --- libs/ag-ui/src/lib/to-agent.spec.ts | 54 +++++++++++++++++++++++++++++ libs/ag-ui/src/lib/to-agent.ts | 21 +++++++++-- 2 files changed, 72 insertions(+), 3 deletions(-) diff --git a/libs/ag-ui/src/lib/to-agent.spec.ts b/libs/ag-ui/src/lib/to-agent.spec.ts index ef2bd3c7..85c4c07c 100644 --- a/libs/ag-ui/src/lib/to-agent.spec.ts +++ b/libs/ag-ui/src/lib/to-agent.spec.ts @@ -551,3 +551,57 @@ describe('subagents projection (F5)', () => { expect(agent.subagents!().get('tc-1')?.messages()[0].content).toBe(''); }); }); + +describe('subagents transcript projection (F5-transcript)', () => { + function snapshotWithContent(id: string, content: Record) { + return { + type: 'ACTIVITY_SNAPSHOT', + messageId: id, + activityType: 'subagent', + content: { toolCallId: id, ...content }, + replace: true, + }; + } + + it('projects content.messages[] to subagent.messages()', () => { + const source = new StubAgent(); + const agent = toAgent(source as never); + source.emit(snapshotWithContent('tc-1', { + status: 'running', + messages: [ + { id: 'm1', role: 'assistant', content: 'hi', toolCallIds: ['t1'], reasoning: 'think' }, + ], + }) as never); + const sa = agent.subagents!().get('tc-1'); + expect(sa?.messages()).toEqual([ + { id: 'm1', role: 'assistant', content: 'hi', toolCallIds: ['t1'], reasoning: 'think' }, + ]); + }); + + it('projects content.toolCalls[] to subagent.toolCalls!()', () => { + const source = new StubAgent(); + const agent = toAgent(source as never); + source.emit(snapshotWithContent('tc-1', { + status: 'running', + toolCalls: [ + { id: 't1', name: 'search', args: { q: 'x' }, status: 'complete', result: { n: 1 } }, + ], + }) as never); + const sa = 
agent.subagents!().get('tc-1'); + expect(sa?.toolCalls!()).toEqual([ + { id: 't1', name: 'search', args: { q: 'x' }, status: 'complete', result: { n: 1 } }, + ]); + }); + + it('falls back to text when content has no messages/toolCalls (back-compat)', () => { + const source = new StubAgent(); + const agent = toAgent(source as never); + source.emit(snapshotWithContent('sub-1', { + status: 'running', + text: 'partial', + }) as never); + const sa = agent.subagents!().get('sub-1'); + expect(sa?.messages()).toEqual([{ id: 'sub-1', role: 'assistant', content: 'partial' }]); + expect(sa?.toolCalls!()).toEqual([]); + }); +}); diff --git a/libs/ag-ui/src/lib/to-agent.ts b/libs/ag-ui/src/lib/to-agent.ts index 6d03fe56..488fa356 100644 --- a/libs/ag-ui/src/lib/to-agent.ts +++ b/libs/ag-ui/src/lib/to-agent.ts @@ -224,9 +224,24 @@ export function toAgent(source: AbstractAgent, options: ToAgentOptions = {}): Ag toolCallId: (entry.content()['toolCallId'] as string) ?? id, name: entry.content()['name'] as string | undefined, status: computed(() => (entry.content()['status'] as SubagentStatus) ?? 'running'), - messages: computed(() => [ - { id, role: 'assistant', content: String(entry.content()['text'] ?? '') }, - ]), + messages: computed(() => { + const c = entry.content(); + const raw = c['messages']; + if (Array.isArray(raw)) { + return (raw as Array>).map((m, i) => ({ + id: (m['id'] as string) ?? `${id}-${i}`, + role: (m['role'] as Message['role']) ?? 'assistant', + content: typeof m['content'] === 'string' ? (m['content'] as string) : (m['content'] as Message['content']) ?? '', + ...(Array.isArray(m['toolCallIds']) ? { toolCallIds: m['toolCallIds'] as string[] } : {}), + ...(typeof m['reasoning'] === 'string' ? { reasoning: m['reasoning'] as string } : {}), + })); + } + return [{ id, role: 'assistant', content: String(c['text'] ?? '') }]; + }), + toolCalls: computed(() => { + const raw = entry.content()['toolCalls']; + return Array.isArray(raw) ? 
(raw as ToolCall[]) : []; + }), state: computed(() => (entry.content()['state'] as Record) ?? {}), }; subagentWrappers.set(id, w); From c16314d0ce0e0c53f874378ac77c2a1c44320558 Mon Sep 17 00:00:00 2001 From: Brian Love Date: Thu, 18 Jun 2026 14:11:22 -0700 Subject: [PATCH 5/9] feat(chat): chat-subagent-card renders full transcript (hybrid, reuses tool-call card) Co-Authored-By: Claude Sonnet 4.6 --- .../chat-subagent-card.component.spec.ts | 53 +++++++++++++++++ .../chat-subagent-card.component.ts | 57 ++++++++++++------- 2 files changed, 91 insertions(+), 19 deletions(-) diff --git a/libs/chat/src/lib/compositions/chat-subagent-card/chat-subagent-card.component.spec.ts b/libs/chat/src/lib/compositions/chat-subagent-card/chat-subagent-card.component.spec.ts index f18058b9..288cf90f 100644 --- a/libs/chat/src/lib/compositions/chat-subagent-card/chat-subagent-card.component.spec.ts +++ b/libs/chat/src/lib/compositions/chat-subagent-card/chat-subagent-card.component.spec.ts @@ -1,12 +1,65 @@ // SPDX-License-Identifier: MIT import { describe, it, expect } from 'vitest'; +import { TestBed } from '@angular/core/testing'; +import { signal } from '@angular/core'; +import { By } from '@angular/platform-browser'; import { ChatSubagentCardComponent, statusColor } from './chat-subagent-card.component'; +import type { Subagent } from '../../agent/subagent'; describe('ChatSubagentCardComponent', () => { it('is defined', () => { expect(ChatSubagentCardComponent).toBeDefined(); expect(typeof ChatSubagentCardComponent).toBe('function'); }); + + it('renders full transcript with reasoning, text, and tool-call cards', async () => { + const fakeSubagent: Subagent = { + toolCallId: 'tc-root', + name: 'Research', + // Use 'running' so chat-trace auto-expands and the transcript DOM is rendered. 
+ status: signal('running'), + messages: signal([ + { + id: 'm1', + role: 'assistant', + content: 'searching', + reasoning: 'plan', + toolCallIds: ['t1'], + }, + { + id: 'm2', + role: 'assistant', + content: 'done', + }, + ]), + toolCalls: signal([ + { id: 't1', name: 'search', args: { q: 'x' }, status: 'complete', result: { n: 1 } }, + ]), + state: signal({}), + }; + + TestBed.configureTestingModule({ + imports: [ChatSubagentCardComponent], + }); + + const fixture = TestBed.createComponent(ChatSubagentCardComponent); + fixture.componentRef.setInput('subagent', fakeSubagent); + fixture.detectChanges(); + await fixture.whenStable(); + fixture.detectChanges(); + + const host = fixture.nativeElement as HTMLElement; + const msgEls = fixture.debugElement.queryAll(By.css('.sac__msg')); + expect(msgEls.length).toBe(2); + + const text = host.textContent ?? ''; + expect(text).toContain('searching'); + expect(text).toContain('done'); + expect(text).toContain('plan'); + + const toolCallEls = fixture.debugElement.queryAll(By.css('chat-tool-call-card')); + expect(toolCallEls.length).toBe(1); + }); }); describe('statusColor', () => { diff --git a/libs/chat/src/lib/compositions/chat-subagent-card/chat-subagent-card.component.ts b/libs/chat/src/lib/compositions/chat-subagent-card/chat-subagent-card.component.ts index 8bc41887..6ccf3d92 100644 --- a/libs/chat/src/lib/compositions/chat-subagent-card/chat-subagent-card.component.ts +++ b/libs/chat/src/lib/compositions/chat-subagent-card/chat-subagent-card.component.ts @@ -2,8 +2,11 @@ // SPDX-License-Identifier: MIT import { Component, ChangeDetectionStrategy, input, computed } from '@angular/core'; import { ChatTraceComponent, type TraceState } from '../../primitives/chat-trace/chat-trace.component'; +import { ChatToolCallCardComponent, type ToolCallInfo } from '../chat-tool-call-card/chat-tool-call-card.component'; +import { ChatStreamingMdComponent } from '../../streaming/streaming-markdown.component'; import { CHAT_HOST_TOKENS } 
from '../../styles/chat-tokens'; import type { Subagent, SubagentStatus } from '../../agent/subagent'; +import type { Message, ToolCall } from '../../agent'; /** * Returns a CSS style string for a subagent's status badge. @@ -32,7 +35,7 @@ function statusToTraceState(s: SubagentStatus): TraceState { @Component({ selector: 'chat-subagent-card', standalone: true, - imports: [ChatTraceComponent], + imports: [ChatTraceComponent, ChatToolCallCardComponent, ChatStreamingMdComponent], changeDetection: ChangeDetectionStrategy.OnPush, styles: [CHAT_HOST_TOKENS, ` :host { display: block; } @@ -50,27 +53,35 @@ function statusToTraceState(s: SubagentStatus): TraceState { .sac__pill[data-status="complete"] { color: var(--ngaf-chat-success); } .sac__pill[data-status="error"] { background: var(--ngaf-chat-error-bg); color: var(--ngaf-chat-error-text); } .sac__count { font-size: var(--ngaf-chat-font-size-xs); color: var(--ngaf-chat-text-muted); } - .sac__latest-label { font-size: 11px; font-weight: 600; text-transform: uppercase; letter-spacing: 0.05em; color: var(--ngaf-chat-text-muted); margin: 8px 0 4px; } - .sac__latest { - font-family: var(--ngaf-chat-font-mono); + .sac__msg { padding: 6px 0; } + .sac__msg + .sac__msg { border-top: 1px solid var(--ngaf-chat-separator); } + .sac__reasoning { font-size: var(--ngaf-chat-font-size-xs); - color: var(--ngaf-chat-text); - white-space: pre-wrap; - overflow-x: auto; - margin: 0; + color: var(--ngaf-chat-text-muted); + font-style: italic; + margin-bottom: 4px; } `], template: ` - Subagent + {{ subagent().name ?? 'Subagent' }} {{ subagent().toolCallId }} {{ subagent().status() }}
{{ subagent().messages().length }} message(s)
- @if (subagent().messages().length > 0) { -

Latest message

-
{{ latestMessageContent() }}
+ @for (m of subagent().messages(); track m.id) { +
+ @if (m.reasoning) { +
{{ m.reasoning }}
+ } + @if (textOf(m); as t) { + + } + @for (tc of toolCallsFor(m); track tc.id) { + + } +
}
`, @@ -79,11 +90,19 @@ export class ChatSubagentCardComponent { readonly subagent = input.required(); readonly state = computed(() => statusToTraceState(this.subagent().status())); - readonly latestMessageContent = computed(() => { - const messages = this.subagent().messages(); - if (messages.length === 0) return ''; - const last = messages[messages.length - 1]; - const c = last.content; - return typeof c === 'string' ? c : JSON.stringify(c); - }); + protected textOf(m: Message): string { + const c = m.content; + return typeof c === 'string' ? c : ''; + } + + protected toolCallsFor(m: Message): ToolCall[] { + const ids = m.toolCallIds ?? []; + if (ids.length === 0) return []; + const all = this.subagent().toolCalls?.() ?? []; + return ids.map((id) => all.find((tc) => tc.id === id)).filter((tc): tc is ToolCall => !!tc); + } + + protected toToolCallInfo(tc: ToolCall): ToolCallInfo { + return { id: tc.id, name: tc.name, args: tc.args, result: tc.result, status: tc.status }; + } } From 191350aa77b4f7bffe0d1b4b3ff7b620c99f461a Mon Sep 17 00:00:00 2001 From: Brian Love Date: Thu, 18 Jun 2026 14:22:52 -0700 Subject: [PATCH 6/9] feat(examples/ag-ui): transform builders for multi-message subagent transcript phases Co-Authored-By: Claude Sonnet 4.6 --- .../src/streaming/activity_transform.py | 95 ++++++++++++- .../python/tests/test_activity_transform.py | 131 +++++++++++++++++- 2 files changed, 217 insertions(+), 9 deletions(-) diff --git a/examples/ag-ui/python/src/streaming/activity_transform.py b/examples/ag-ui/python/src/streaming/activity_transform.py index dd09c01e..e992f8eb 100644 --- a/examples/ag-ui/python/src/streaming/activity_transform.py +++ b/examples/ag-ui/python/src/streaming/activity_transform.py @@ -1,9 +1,12 @@ """Maps a `subagent_activity` CUSTOM event (emitted by the research tool / SubagentStreamHandler via adispatch_custom_event) to a native AG-UI ACTIVITY event. 
-Pure and stateless (1:1): the handler sends accumulated `text_so_far`, so each -DELTA carries the full text via JSON-patch `replace` (JSON-patch has no string -append). Anything that is not a `subagent_activity` CUSTOM event returns None. +Pure and stateless (1:1): build patches purely from the event fields — never +track state in the transform. Anything that is not a `subagent_activity` CUSTOM +event returns None. + +Supported phases: started, message_start, message, tool_call, tool_result, +finished. Unknown phases return None. """ import json from typing import Optional @@ -38,16 +41,97 @@ def subagent_custom_to_activity(event: BaseEvent) -> Optional[BaseEvent]: type=EventType.ACTIVITY_SNAPSHOT, message_id=sid, activity_type=ACTIVITY_TYPE, - content={"toolCallId": sid, "name": value.get("name"), "status": "running", "text": ""}, + content={ + "toolCallId": sid, + "name": value.get("name"), + "status": "running", + "messages": [], + "toolCalls": [], + }, replace=True, ) + + if phase == "message_start": + message_index = value.get("message_index") + return ActivityDeltaEvent( + type=EventType.ACTIVITY_DELTA, + message_id=sid, + activity_type=ACTIVITY_TYPE, + patch=[ + { + "op": "add", + "path": "/messages/-", + "value": { + "id": f"{sid}-{message_index}", + "role": "assistant", + "content": "", + "toolCallIds": [], + }, + } + ], + ) + if phase == "message": + message_index = value.get("message_index") + return ActivityDeltaEvent( + type=EventType.ACTIVITY_DELTA, + message_id=sid, + activity_type=ACTIVITY_TYPE, + patch=[ + { + "op": "replace", + "path": f"/messages/{message_index}/content", + "value": value.get("text", ""), + } + ], + ) + + if phase == "tool_call": + message_index = value.get("message_index") + tool_call_id = value.get("tool_call_id") return ActivityDeltaEvent( type=EventType.ACTIVITY_DELTA, message_id=sid, activity_type=ACTIVITY_TYPE, - patch=[{"op": "replace", "path": "/text", "value": value.get("text", "")}], + patch=[ + { + "op": "add", + 
"path": "/toolCalls/-", + "value": { + "id": tool_call_id, + "name": value.get("name"), + "args": value.get("args"), + "status": "running", + }, + }, + { + "op": "add", + "path": f"/messages/{message_index}/toolCallIds/-", + "value": tool_call_id, + }, + ], ) + + if phase == "tool_result": + tool_index = value.get("tool_index") + return ActivityDeltaEvent( + type=EventType.ACTIVITY_DELTA, + message_id=sid, + activity_type=ACTIVITY_TYPE, + patch=[ + { + "op": "replace", + "path": f"/toolCalls/{tool_index}/status", + "value": value.get("status", "complete"), + }, + { + "op": "replace", + "path": f"/toolCalls/{tool_index}/result", + "value": value.get("result"), + }, + ], + ) + if phase == "finished": return ActivityDeltaEvent( type=EventType.ACTIVITY_DELTA, @@ -55,4 +139,5 @@ def subagent_custom_to_activity(event: BaseEvent) -> Optional[BaseEvent]: activity_type=ACTIVITY_TYPE, patch=[{"op": "replace", "path": "/status", "value": value.get("status", "complete")}], ) + return None diff --git a/examples/ag-ui/python/tests/test_activity_transform.py b/examples/ag-ui/python/tests/test_activity_transform.py index ea30b93f..24c4baf5 100644 --- a/examples/ag-ui/python/tests/test_activity_transform.py +++ b/examples/ag-ui/python/tests/test_activity_transform.py @@ -14,16 +14,122 @@ def test_started_maps_to_activity_snapshot(): assert ev.type == EventType.ACTIVITY_SNAPSHOT assert ev.message_id == "tc-1" assert ev.activity_type == "subagent" - assert ev.content == {"toolCallId": "tc-1", "name": "research", "status": "running", "text": ""} + assert ev.content == { + "toolCallId": "tc-1", + "name": "research", + "status": "running", + "messages": [], + "toolCalls": [], + } assert ev.replace is True -def test_message_maps_to_activity_delta_replace_text(): +def test_message_start_maps_to_activity_delta_add_message(): ev = subagent_custom_to_activity(_custom( - {"subagent_id": "tc-1", "phase": "message", "text": "Paris is"})) + {"subagent_id": "tc-1", "phase": "message_start", 
"message_index": 0})) assert ev.type == EventType.ACTIVITY_DELTA assert ev.message_id == "tc-1" - assert ev.patch == [{"op": "replace", "path": "/text", "value": "Paris is"}] + assert ev.patch == [ + { + "op": "add", + "path": "/messages/-", + "value": {"id": "tc-1-0", "role": "assistant", "content": "", "toolCallIds": []}, + } + ] + + +def test_message_start_uses_message_index_in_id(): + ev = subagent_custom_to_activity(_custom( + {"subagent_id": "tc-1", "phase": "message_start", "message_index": 2})) + assert ev.patch[0]["value"]["id"] == "tc-1-2" + + +def test_message_maps_to_activity_delta_replace_content(): + ev = subagent_custom_to_activity(_custom( + {"subagent_id": "tc-1", "phase": "message", "message_index": 0, "text": "Paris is"})) + assert ev.type == EventType.ACTIVITY_DELTA + assert ev.message_id == "tc-1" + assert ev.patch == [{"op": "replace", "path": "/messages/0/content", "value": "Paris is"}] + + +def test_message_uses_correct_index_in_path(): + ev = subagent_custom_to_activity(_custom( + {"subagent_id": "tc-1", "phase": "message", "message_index": 3, "text": "hello"})) + assert ev.patch == [{"op": "replace", "path": "/messages/3/content", "value": "hello"}] + + +def test_tool_call_maps_to_two_op_patch(): + ev = subagent_custom_to_activity(_custom({ + "subagent_id": "tc-1", + "phase": "tool_call", + "message_index": 0, + "tool_call_id": "call-abc", + "name": "search", + "args": {"query": "Paris"}, + })) + assert ev.type == EventType.ACTIVITY_DELTA + assert ev.message_id == "tc-1" + assert ev.patch == [ + { + "op": "add", + "path": "/toolCalls/-", + "value": {"id": "call-abc", "name": "search", "args": {"query": "Paris"}, "status": "running"}, + }, + { + "op": "add", + "path": "/messages/0/toolCallIds/-", + "value": "call-abc", + }, + ] + + +def test_tool_call_uses_message_index_in_path(): + ev = subagent_custom_to_activity(_custom({ + "subagent_id": "tc-1", + "phase": "tool_call", + "message_index": 2, + "tool_call_id": "call-xyz", + "name": 
"lookup", + "args": {}, + })) + assert ev.patch[1]["path"] == "/messages/2/toolCallIds/-" + + +def test_tool_result_maps_to_two_op_patch(): + ev = subagent_custom_to_activity(_custom({ + "subagent_id": "tc-1", + "phase": "tool_result", + "tool_index": 0, + "result": "Paris, France", + "status": "complete", + })) + assert ev.type == EventType.ACTIVITY_DELTA + assert ev.message_id == "tc-1" + assert ev.patch == [ + {"op": "replace", "path": "/toolCalls/0/status", "value": "complete"}, + {"op": "replace", "path": "/toolCalls/0/result", "value": "Paris, France"}, + ] + + +def test_tool_result_defaults_status_to_complete(): + ev = subagent_custom_to_activity(_custom({ + "subagent_id": "tc-1", + "phase": "tool_result", + "tool_index": 1, + "result": "some result", + })) + assert ev.patch[0] == {"op": "replace", "path": "/toolCalls/1/status", "value": "complete"} + + +def test_tool_result_uses_tool_index_in_path(): + ev = subagent_custom_to_activity(_custom({ + "subagent_id": "tc-1", + "phase": "tool_result", + "tool_index": 3, + "result": "done", + })) + assert ev.patch[0]["path"] == "/toolCalls/3/status" + assert ev.patch[1]["path"] == "/toolCalls/3/result" def test_finished_maps_to_activity_delta_replace_status(): @@ -33,6 +139,23 @@ def test_finished_maps_to_activity_delta_replace_status(): assert ev.patch == [{"op": "replace", "path": "/status", "value": "complete"}] +def test_finished_defaults_status_to_complete(): + ev = subagent_custom_to_activity(_custom( + {"subagent_id": "tc-1", "phase": "finished"})) + assert ev.patch == [{"op": "replace", "path": "/status", "value": "complete"}] + + +def test_finished_custom_status(): + ev = subagent_custom_to_activity(_custom( + {"subagent_id": "tc-1", "phase": "finished", "status": "error"})) + assert ev.patch == [{"op": "replace", "path": "/status", "value": "error"}] + + +def test_unknown_phase_returns_none(): + assert subagent_custom_to_activity(_custom( + {"subagent_id": "tc-1", "phase": "unknown_phase"})) is None + + 
def test_non_subagent_event_returns_none(): assert subagent_custom_to_activity( CustomEvent(type=EventType.CUSTOM, name="state_update", value={})) is None From 222f73aae268ceabbd0c809a878b65fb19c8f323 Mon Sep 17 00:00:00 2001 From: Brian Love Date: Thu, 18 Jun 2026 14:27:37 -0700 Subject: [PATCH 7/9] feat(examples/ag-ui): research subagent reason->tool->answer loop + structured transcript emission Co-Authored-By: Claude Fable 5 --- examples/ag-ui/python/src/graph.py | 192 +++++++++++++++--- .../src/streaming/subagent_stream_handler.py | 42 +++- .../python/tests/test_subagent_emission.py | 122 +++++++++++ .../tests/test_subagent_stream_handler.py | 28 ++- 4 files changed, 351 insertions(+), 33 deletions(-) create mode 100644 examples/ag-ui/python/tests/test_subagent_emission.py diff --git a/examples/ag-ui/python/src/graph.py b/examples/ag-ui/python/src/graph.py index 90717416..befc422a 100644 --- a/examples/ag-ui/python/src/graph.py +++ b/examples/ag-ui/python/src/graph.py @@ -46,7 +46,10 @@ from threadplane.middleware.langgraph import bind_client_tools, client_tool_names from src.streaming.a2ui_partial_handler import A2uiPartialHandler -from src.streaming.subagent_stream_handler import SubagentStreamHandler +from src.streaming.subagent_stream_handler import ( + SubagentStreamHandler, + SubagentRunState, +) from src.streaming.envelope_tool import render_a2ui_surface from src.streaming.envelope_normalizer import normalize_envelope_args from src.schemas.a2ui_v1 import A2UI_V1_SCHEMA_PROMPT @@ -267,34 +270,168 @@ def request_approval(reason: str) -> str: # logic) is what causes LangGraph to emit stream events under namespace # prefix `tools:` for the child run, which is what the @threadplane/langgraph # SubagentTracker keys on to populate `agent.subagents()`. 
+# +# The child runs a genuine reason → tool → answer loop: an `agent` node +# (LLM bound to a single offline `lookup` tool) → conditional edge → a +# `tools` node when the AIMessage carries tool_calls, else END; the tools +# node loops back to `agent`. A per-run `iterations` counter caps the loop +# so it always terminates (the agent is told to answer after one lookup). +# Each node emits the structured transcript (`message_start` / `tool_call` / +# `tool_result`) as `subagent_activity` CUSTOM events the L2 transform turns +# into AG-UI ACTIVITY DELTAs; live token text is streamed separately by the +# SubagentStreamHandler (which tags each `message` with the same +# `message_index` the subgraph opened the turn with). class ResearchState(TypedDict): messages: Annotated[list, add_messages] topic: Optional[str] + iterations: int + + +# Max reason→tool→answer round trips the subagent makes before it is forced +# to answer. One lookup then an answer is enough for a deterministic demo. +_RESEARCH_MAX_ITERATIONS = 1 + +_RESEARCH_FACTS = { + "signals": ( + "Angular signals are a fine-grained reactivity primitive: a signal " + "holds a value, computed() derives from signals, and effect() reacts " + "to changes — all without manual subscriptions or zone.js." + ), + "control flow": ( + "Angular's built-in control flow (@if, @for, @switch) replaces the " + "*ngIf / *ngFor structural directives with native template syntax that " + "is faster to compile and ships less runtime." + ), + "zoneless": ( + "Zoneless change detection lets Angular run without zone.js by " + "tracking signal reads and async state directly via " + "provideExperimentalZonelessChangeDetection." + ), +} +_RESEARCH_DEFAULT_FACT = ( + "Angular is a TypeScript-first web framework built around components, " + "dependency injection, and (increasingly) signal-based reactivity." +) -async def research_node(state: ResearchState) -> dict: - """Single-node child graph: focus on the topic, return a short brief. 
+@tool +def lookup(query: str) -> str: + """Look up a short, authoritative fact about the given topic. - Uses gpt-5-mini directly (the parent's model selection does not - propagate into the subagent — the subagent is a focused contractor). + Deterministic and fully offline — returns a canned fact keyed by the + query so the demo (and its replay fixtures) stay reproducible. + """ + q = (query or "").lower() + for key, fact in _RESEARCH_FACTS.items(): + if key in q: + return fact + return _RESEARCH_DEFAULT_FACT + + +def _make_research_llm(force_answer: bool): + """Build the subagent chat model for one turn. On the gathering turn it is + bound to the `lookup` tool; on the forced-answer turn it is plain so the + model can't keep calling tools. Isolated as a seam so tests can inject a + fake tool-calling model.""" + base = ChatOpenAI(model="gpt-5-mini", streaming=True) + return base if force_answer else base.bind_tools([lookup]) + + +def _build_research_subgraph(emit, run_state, llm_factory=_make_research_llm): + """Compile a research subgraph whose nodes emit the structured transcript + through the captured `emit` closure and bump the shared `run_state` + counters. Built per research-tool invocation so each run gets its own + emission wiring + message/tool indices. + + `emit(payload)` dispatches a `subagent_activity` CUSTOM event already + keyed by the parent tool_call_id. `run_state` is the SubagentRunState + the SubagentStreamHandler reads `message_index` from for live tokens. + `llm_factory(force_answer)` returns the model for a turn — overridable + in tests with a fake tool-calling chat model. """ - topic = state.get("topic") or "" - llm = ChatOpenAI(model="gpt-5-mini", streaming=True) - system = SystemMessage(content=( - "You are a focused research subagent. Given a topic, return a " - "concise factual summary (3-6 bullets). Do not ask the user " - "questions; the parent agent already gathered the topic." 
- )) - user = HumanMessage(content=f"Topic: {topic}") - response = await llm.ainvoke([system, user]) - return {"messages": [response]} - -_research_builder = StateGraph(ResearchState) -_research_builder.add_node("research_node", research_node) -_research_builder.set_entry_point("research_node") -_research_builder.add_edge("research_node", END) -research_subgraph = _research_builder.compile() + async def agent_node(state: ResearchState) -> dict: + topic = state.get("topic") or "" + iterations = state.get("iterations") or 0 + # Open a new assistant turn. The handler reads this index for the + # live `message` tokens it streams during this LLM call. + run_state.message_index = iterations + await emit({"phase": "message_start", "message_index": iterations}) + + force_answer = iterations >= _RESEARCH_MAX_ITERATIONS + system = SystemMessage(content=( + "You are a focused research subagent. You have a `lookup` tool " + "for retrieving authoritative facts. " + ( + "You have already gathered facts — now write the final " + "answer: a concise factual summary (3-6 bullets). Do NOT call " + "any more tools." + if force_answer else + "Call `lookup` exactly once to gather a fact about the topic, " + "then on your next turn write the final summary. Do not ask " + "the user questions; the parent agent already gathered the " + "topic." + ) + )) + messages = [system, HumanMessage(content=f"Topic: {topic}")] + # Carry prior assistant/tool messages so the model sees the lookup result. + messages += list(state.get("messages") or []) + + call_llm = llm_factory(force_answer) + response = await call_llm.ainvoke(messages) + + # Emit one tool_call event per call on the returned AIMessage. 
+ tool_calls = getattr(response, "tool_calls", None) or [] + for tc in tool_calls: + await emit({ + "phase": "tool_call", + "message_index": iterations, + "tool_call_id": tc.get("id"), + "name": tc.get("name"), + "args": tc.get("args"), + }) + + return {"messages": [response], "iterations": iterations + 1} + + async def tools_node(state: ResearchState) -> dict: + last = state["messages"][-1] + tool_calls = getattr(last, "tool_calls", None) or [] + out = [] + for tc in tool_calls: + name = tc.get("name") + args = tc.get("args") or {} + if name == "lookup": + result = lookup.invoke(args) + else: + result = f"(unknown tool: {name})" + out.append(ToolMessage(content=result, tool_call_id=tc.get("id"))) + # tool_index is the 0-based position of this call in the run's + # toolCalls[] (same order tool_call events were emitted). + await emit({ + "phase": "tool_result", + "tool_index": run_state.tool_index, + "result": result, + "status": "complete", + }) + run_state.tool_index += 1 + return {"messages": out} + + def should_continue(state: ResearchState) -> Literal["tools", "__end__"]: + last = state["messages"][-1] + if isinstance(last, AIMessage) and last.tool_calls: + return "tools" + return "__end__" + + builder = StateGraph(ResearchState) + builder.add_node("agent", agent_node) + builder.add_node("tools", tools_node) + builder.set_entry_point("agent") + builder.add_conditional_edges( + "agent", + should_continue, + {"tools": "tools", "__end__": END}, + ) + builder.add_edge("tools", "agent") + return builder.compile() @tool @@ -313,8 +450,8 @@ async def research( Always pass a stable identifier like "research". The subagent run is also surfaced to the UI as a native AG-UI ACTIVITY - (activityType "subagent"): started → message-per-token → finished, keyed - by this tool's own call id. + (activityType "subagent"): started → reason/tool/answer transcript → + finished, keyed by this tool's own call id. 
""" async def _emit(payload: dict) -> None: @@ -325,10 +462,13 @@ async def _emit(payload: dict) -> None: except Exception: pass + run_state = SubagentRunState() + subgraph = _build_research_subgraph(_emit, run_state) + await _emit({"phase": "started", "name": subagent_type}) - result = await research_subgraph.ainvoke( - {"topic": topic, "messages": []}, - config={"callbacks": [SubagentStreamHandler(tool_call_id)]}, + result = await subgraph.ainvoke( + {"topic": topic, "messages": [], "iterations": 0}, + config={"callbacks": [SubagentStreamHandler(tool_call_id, run_state)]}, ) await _emit({"phase": "finished", "status": "complete"}) diff --git a/examples/ag-ui/python/src/streaming/subagent_stream_handler.py b/examples/ag-ui/python/src/streaming/subagent_stream_handler.py index 09a1623d..1a00a1cc 100644 --- a/examples/ag-ui/python/src/streaming/subagent_stream_handler.py +++ b/examples/ag-ui/python/src/streaming/subagent_stream_handler.py @@ -2,26 +2,60 @@ `message` events, keyed by the parent tool_call_id. Accumulates `text_so_far` so the L2 transform stays stateless. `started`/`finished` are emitted by the research tool body. Uses adispatch_custom_event (the bridge reads on_custom_event -from astream_events; get_stream_writer would surface only as a RAW event).""" -from typing import Any +from astream_events; get_stream_writer would surface only as a RAW event). + +Each `message` event also carries the current `message_index` — the 0-based +ordinal of the assistant turn the tokens belong to. The subgraph owns the +counter (it opens each turn with a `message_start`); the handler reads it +through a shared mutable ref (`SubagentRunState`) so the index it tags stays +in lock-step with the transcript the subgraph emits. 
The handler resets its +text buffer whenever the subgraph advances to a new turn so each message's +`text_so_far` starts fresh.""" +from typing import Any, Optional from uuid import UUID from langchain_core.callbacks import AsyncCallbackHandler, adispatch_custom_event +class SubagentRunState: + """Per-research-run shared state. The subgraph nodes own `message_index` + (bumping it as each assistant turn opens) and `tool_index` (the running + position in the run's toolCalls[]); the SubagentStreamHandler reads + `message_index` so its streamed `message` events tag the right turn.""" + + def __init__(self) -> None: + self.message_index: int = 0 + self.tool_index: int = 0 + + class SubagentStreamHandler(AsyncCallbackHandler): - def __init__(self, subagent_id: str) -> None: + def __init__(self, subagent_id: str, run_state: Optional[SubagentRunState] = None) -> None: self._id = subagent_id self._buffer = "" + self._run_state = run_state if run_state is not None else SubagentRunState() + # Track which turn the current buffer belongs to so we reset the + # accumulated text when the subgraph advances to a new assistant turn. + self._buffer_index = self._run_state.message_index async def on_llm_new_token(self, token: str, *, run_id: UUID | None = None, **kwargs: Any) -> None: if not token: return + index = self._run_state.message_index + if index != self._buffer_index: + # New assistant turn opened since the last token — start fresh so + # `text_so_far` is scoped to this message, not the whole run. 
+ self._buffer = "" + self._buffer_index = index self._buffer += token try: await adispatch_custom_event( "subagent_activity", - {"subagent_id": self._id, "phase": "message", "text": self._buffer}, + { + "subagent_id": self._id, + "phase": "message", + "message_index": index, + "text": self._buffer, + }, ) except Exception: return # no ambient run context (some unit-test paths) — best-effort diff --git a/examples/ag-ui/python/tests/test_subagent_emission.py b/examples/ag-ui/python/tests/test_subagent_emission.py new file mode 100644 index 00000000..051953cf --- /dev/null +++ b/examples/ag-ui/python/tests/test_subagent_emission.py @@ -0,0 +1,122 @@ +"""In-process verification of the research subagent's reason → tool → answer +loop and its structured `subagent_activity` transcript emission. + +Runs the enriched subgraph with a FAKE tool-calling chat model (no network): +turn 0 returns an AIMessage carrying a `lookup` tool_call; turn 1 returns the +final answer. We intercept every `adispatch_custom_event` the subgraph nodes +fire and assert the ORDER + payloads of the structured phases: + + message_start(0) → tool_call(0, …) → tool_result(0, …) + → message_start(1) → finished-by-tool? no → final answer + +(Live `message` token events come from SubagentStreamHandler, which the fake +model doesn't drive — so this test asserts the node-emitted phases only, which +is exactly the transcript skeleton the L2 transform consumes.) +""" +from typing import Any + +import pytest +from langchain_core.messages import AIMessage +from langchain_core.runnables import Runnable + +import src.graph as graph_mod +from src.graph import _build_research_subgraph +from src.streaming.subagent_stream_handler import SubagentRunState + + +class _FakeToolCallingModel(Runnable): + """A tiny Runnable standing in for ChatOpenAI. First invocation returns an + AIMessage with a `lookup` tool_call; subsequent invocations return a final + text answer. 
`bind_tools` is a no-op (returns self) so the subgraph's + gathering-turn `.bind_tools([lookup])` works unchanged.""" + + def __init__(self) -> None: + self.calls = 0 + + def bind_tools(self, tools: Any, **kwargs: Any) -> "_FakeToolCallingModel": + return self + + def invoke(self, input: Any, config: Any = None, **kwargs: Any) -> AIMessage: + self.calls += 1 + if self.calls == 1: + return AIMessage( + content="", + tool_calls=[ + { + "id": "call_lookup_1", + "name": "lookup", + "args": {"query": "angular signals"}, + } + ], + ) + return AIMessage(content="- Signals are reactive.\n- No zone.js needed.") + + async def ainvoke(self, input: Any, config: Any = None, **kwargs: Any) -> AIMessage: + return self.invoke(input, config, **kwargs) + + +@pytest.mark.asyncio +async def test_subgraph_emits_reason_tool_answer_transcript(): + events: list[dict] = [] + + async def fake_emit(payload: dict) -> None: + events.append({"subagent_id": "tc-research", **payload}) + + fake_model = _FakeToolCallingModel() + run_state = SubagentRunState() + subgraph = _build_research_subgraph( + fake_emit, run_state, llm_factory=lambda force_answer: fake_model + ) + + result = await subgraph.ainvoke( + {"topic": "Angular signals", "messages": [], "iterations": 0} + ) + + phases = [(e["phase"], e) for e in events] + phase_names = [p for p, _ in phases] + + # Full structured phase sequence the node loop emits. + assert phase_names == [ + "message_start", # turn 0 opens + "tool_call", # turn 0 calls lookup + "tool_result", # tool node runs lookup + "message_start", # turn 1 opens (forced-answer turn) + ], phase_names + + by_phase = {p: e for p, e in phases} + + # message_start indices: 0 then 1. + starts = [e["message_index"] for p, e in phases if p == "message_start"] + assert starts == [0, 1], starts + + # tool_call carries id/name/args + the originating message_index (0). 
+ tc = by_phase["tool_call"] + assert tc["message_index"] == 0 + assert tc["tool_call_id"] == "call_lookup_1" + assert tc["name"] == "lookup" + assert tc["args"] == {"query": "angular signals"} + + # tool_result carries the matching tool_index (0), the lookup result text, + # and a complete status. + tr = by_phase["tool_result"] + assert tr["tool_index"] == 0 + assert tr["status"] == "complete" + assert isinstance(tr["result"], str) and "signal" in tr["result"].lower() + + # Loop terminates: the forced-answer turn returns a plain answer (no tool + # calls), so the run ends with a final AIMessage and exactly two turns. + assert fake_model.calls == 2 + last = result["messages"][-1] + assert isinstance(last, AIMessage) + assert not last.tool_calls + assert "Signals" in last.content + + +@pytest.mark.asyncio +async def test_lookup_tool_is_deterministic_and_offline(): + # The canned fact lookup must be reproducible for the aimock fixture. + assert graph_mod.lookup.invoke({"query": "angular signals"}) == \ + graph_mod.lookup.invoke({"query": "tell me about SIGNALS please"}) + # Unknown topic falls back to the default fact, never raises / hits network. 
+ assert graph_mod.lookup.invoke({"query": "quantum widgets"}) == \ + graph_mod._RESEARCH_DEFAULT_FACT diff --git a/examples/ag-ui/python/tests/test_subagent_stream_handler.py b/examples/ag-ui/python/tests/test_subagent_stream_handler.py index ef2aec68..8d5100ef 100644 --- a/examples/ag-ui/python/tests/test_subagent_stream_handler.py +++ b/examples/ag-ui/python/tests/test_subagent_stream_handler.py @@ -5,7 +5,10 @@ import pytest -from src.streaming.subagent_stream_handler import SubagentStreamHandler +from src.streaming.subagent_stream_handler import ( + SubagentStreamHandler, + SubagentRunState, +) class TestSubagentStreamHandler: @@ -17,9 +20,11 @@ async def test_emits_accumulated_text_so_far(self): await handler.on_llm_new_token("Paris ", run_id=uuid4()) await handler.on_llm_new_token("is", run_id=uuid4()) assert dispatch.call_args_list[0].args == ( - "subagent_activity", {"subagent_id": "tc-1", "phase": "message", "text": "Paris "}) + "subagent_activity", + {"subagent_id": "tc-1", "phase": "message", "message_index": 0, "text": "Paris "}) assert dispatch.call_args_list[1].args == ( - "subagent_activity", {"subagent_id": "tc-1", "phase": "message", "text": "Paris is"}) + "subagent_activity", + {"subagent_id": "tc-1", "phase": "message", "message_index": 0, "text": "Paris is"}) @pytest.mark.asyncio async def test_buffers_isolated_across_instances(self): @@ -31,6 +36,23 @@ async def test_buffers_isolated_across_instances(self): assert dispatch.call_args_list[0].args[1]["text"] == "x" assert dispatch.call_args_list[1].args[1]["text"] == "y" + @pytest.mark.asyncio + async def test_tags_message_index_from_run_state(self): + run_state = SubagentRunState() + handler = SubagentStreamHandler(subagent_id="tc-1", run_state=run_state) + with patch("src.streaming.subagent_stream_handler.adispatch_custom_event", + new_callable=AsyncMock) as dispatch: + await handler.on_llm_new_token("first", run_id=uuid4()) + # Subgraph advances to the next assistant turn. 
+ run_state.message_index = 1 + await handler.on_llm_new_token("second", run_id=uuid4()) + first, second = dispatch.call_args_list + assert first.args[1]["message_index"] == 0 + assert first.args[1]["text"] == "first" + # Buffer resets per turn so text_so_far is scoped to the new turn. + assert second.args[1]["message_index"] == 1 + assert second.args[1]["text"] == "second" + @pytest.mark.asyncio async def test_dispatch_failure_is_silent(self): handler = SubagentStreamHandler(subagent_id="tc-1") From 3b718e0e8cb677078fdf3de36c8a1ccef34eec68 Mon Sep 17 00:00:00 2001 From: Brian Love Date: Thu, 18 Jun 2026 14:34:34 -0700 Subject: [PATCH 8/9] test(examples/ag-ui): aimock fixture + e2e for multi-message subagent transcript Co-Authored-By: Claude Fable 5 --- .../ag-ui/angular/e2e/fixtures/subagent.json | 31 ++++- .../ag-ui/angular/e2e/subagent-card.spec.ts | 111 ++++++++++++------ 2 files changed, 101 insertions(+), 41 deletions(-) diff --git a/examples/ag-ui/angular/e2e/fixtures/subagent.json b/examples/ag-ui/angular/e2e/fixtures/subagent.json index df8ddb6f..ef2f869c 100644 --- a/examples/ag-ui/angular/e2e/fixtures/subagent.json +++ b/examples/ag-ui/angular/e2e/fixtures/subagent.json @@ -2,23 +2,23 @@ "fixtures": [ { "match": { - "userMessage": "Research the Louvre and summarize", + "userMessage": "Research Angular signals and summarize", "hasToolResult": true }, "response": { - "content": "Here's what I found about the Louvre: it's a major Paris museum and one of the most visited in the world, opened in the late 18th century and home to tens of thousands of works across many collections. Let me know if you'd like detail on a particular wing or era." + "content": "Here's what the research subagent found about Angular signals: they are a fine-grained reactivity primitive — a signal holds a value, computed() derives from other signals, and effect() reacts to changes, all without manual subscriptions. Let me know if you'd like a code example." 
} }, { "match": { - "userMessage": "Research the Louvre and summarize" + "userMessage": "Research Angular signals and summarize" }, "response": { "toolCalls": [ { "name": "research", "arguments": { - "topic": "the Louvre", + "topic": "Angular signals", "subagent_type": "research" } } @@ -27,10 +27,29 @@ }, { "match": { - "userMessage": "Topic: the Louvre" + "userMessage": "Topic: Angular signals", + "systemMessage": "Do NOT call any more tools", + "hasToolResult": true + }, + "response": { + "content": "Angular signals are a fine-grained reactivity primitive. A signal holds a value, computed() derives from other signals, and effect() reacts to changes — all without manual subscriptions or zone.js. This makes change detection more precise and is the foundation for zoneless Angular." + } + }, + { + "match": { + "userMessage": "Topic: Angular signals", + "systemMessage": "Call `lookup` exactly once", + "hasToolResult": false }, "response": { - "content": "The Louvre opened in 1793 and holds about 35,000 works." + "toolCalls": [ + { + "name": "lookup", + "arguments": { + "query": "Angular signals" + } + } + ] } } ] diff --git a/examples/ag-ui/angular/e2e/subagent-card.spec.ts b/examples/ag-ui/angular/e2e/subagent-card.spec.ts index 1d9f5467..959f7897 100644 --- a/examples/ag-ui/angular/e2e/subagent-card.spec.ts +++ b/examples/ag-ui/angular/e2e/subagent-card.spec.ts @@ -2,28 +2,37 @@ import { test, expect, type Page } from '@playwright/test'; import { openDemo, waitForFinalAssistant } from './test-helpers'; -// Distinctive child-research sentence the subagent streams. It must be unique -// enough that finding it in the parent's assistant bubble is a real leak, not a -// coincidental substring of the orchestrator's final answer (which deliberately -// paraphrases instead of quoting this verbatim). -const CHILD_SENTENCE = 'The Louvre opened in 1793 and holds about 35,000 works.'; +// Distinctive child-research sentence the subagent's final answer streams. 
It +// must be unique enough that finding it in the parent's assistant bubble is a +// real leak, not a coincidental substring of the orchestrator's final answer +// (which deliberately paraphrases instead of quoting this verbatim). +const CHILD_SENTENCE = 'foundation for zoneless Angular'; interface SubagentProbe { size: number; - entries: { name?: string; status?: string; text?: string }[]; + entries: { + name?: string; + status?: string; + messageCount: number; + messageTexts: string[]; + toolCalls: { name?: string; result?: string }[]; + }[]; } // Reads the live `agent.subagents()` projection off the shell component via // Angular's dev-mode global. The chat-subagents primitive renders a -// <chat-subagent-card> for each subagent whose status is pending/running, so -// the map IS the data the card binds to. Under the aimock harness the run -// settles near-instantly (started → finished within one SSE flush), so the -// card transits the RUNNING state below a render frame and is filtered out of -// the DOM by the time the assistant turn finalizes — exactly the reason the -// cockpit subagents spec asserts on durable signals rather than the card -// element. We read the projected map directly: it proves the ACTIVITY -// snapshot/delta pipeline populated the subagent (name + streamed child text) -// and that it settled to `complete`. +// <chat-subagent-card> for each subagent whose status is pending/running, and +// the card binds the ORDERED transcript: `messages()` (the assistant turn(s) +// the child streamed, each carrying `toolCallIds`/reasoning) and `toolCalls()` +// (the child's own `lookup` calls, rendered as tool-call cards). Under +// the aimock harness the run settles near-instantly (started → finished within +// one SSE flush), so the card transits the RUNNING state below a render frame +// and is filtered out of the DOM by the time the assistant turn finalizes — +// exactly the reason the cockpit subagents spec (and the original card spec) +// asserts on durable signals rather than the card element.
We read the +// projected map directly: it IS the data the card renders, and it proves the +// ACTIVITY snapshot/delta pipeline reconstructed the full reason→tool→answer +// transcript and that it settled to `complete`. async function readSubagents(page: Page): Promise<SubagentProbe> { return page.evaluate(() => { const ng = (window as unknown as { ng?: { getComponent?: (el: Element) => unknown } }).ng; @@ -39,11 +48,16 @@ async function readSubagents(page: Page): Promise<SubagentProbe> { name?: string; status?: () => string; messages?: () => { content?: string }[]; + toolCalls?: () => { name?: string; result?: string }[]; }; + const messages = sa.messages?.() ?? []; + const toolCalls = sa.toolCalls?.() ?? []; out.entries.push({ name: sa.name, status: sa.status?.(), - text: sa.messages?.()[0]?.content, + messageCount: messages.length, + messageTexts: messages.map((m) => (typeof m.content === 'string' ? m.content : '')), + toolCalls: toolCalls.map((tc) => ({ name: tc.name, result: tc.result })), }); }); return out; @@ -51,18 +65,20 @@ async function readSubagents(page: Page): Promise<SubagentProbe> { } // Research delegation over the AG-UI transport: the parent LLM calls the -// `research` tool, the langgraph child subgraph streams a summary, and the +// `research` tool, the langgraph child subgraph runs a genuine reason → tool → +// answer loop (an LLM call that returns a `lookup` tool_call, the offline +// `lookup` tool, then a second plain LLM call that writes the summary), and the // ag-ui server converts the subagent_activity CUSTOM events into native // ACTIVITY_SNAPSHOT/ACTIVITY_DELTA. The @threadplane/ag-ui reducer projects the -// activity to agent.subagents() (what chat-subagents renders as a live card) -// and the child's research text must stay OUT of the parent's bubble.
-test('research delegation renders a live subagent card that settles complete', async ({ +// activity to agent.subagents() (the ordered transcript chat-subagent-card +// renders) and the child's research text must stay OUT of the parent's bubble. +test('research delegation reconstructs the multi-message subagent transcript', async ({ page, }) => { await openDemo(page); const input = page.getByRole('textbox', { name: /message|prompt/i }); - await input.fill('Research the Louvre and summarize'); + await input.fill('Research Angular signals and summarize'); await page.getByRole('button', { name: /send/i }).click(); // The orchestrator dispatched the research subagent — its tool-call card is a @@ -74,28 +90,53 @@ test('research delegation renders a live subagent card that settles complete', a await expect(researchCall.first()).toBeVisible({ timeout: 30_000 }); // Wait for the parent turn to finalize so the full run (delegation + child - // stream + settle) has played out. + // reason/tool/answer loop + settle) has played out. const bubble = await waitForFinalAssistant(page); // The live subagent-card data path populated and settled: agent.subagents() - // carries the research subagent with its streamed child summary, now - // `complete`. This is the exact projection chat-subagents binds the card to. - // Poll until the research subagent reaches `complete` to avoid CI micro-races - // where signal propagation hasn't settled at the moment of the first read. - await expect.poll( - async () => { - const subs = await readSubagents(page); - const research = subs.entries.find((e) => e.name === 'research'); - return research?.status ?? null; - }, - { timeout: 15_000 }, - ).toBe('complete'); + // carries the research subagent, now `complete`. Poll until it reaches + // `complete` to avoid CI micro-races where signal propagation hasn't settled + // at the moment of the first read. 
+ await expect + .poll( + async () => { + const subs = await readSubagents(page); + const research = subs.entries.find((e) => e.name === 'research'); + return research?.status ?? null; + }, + { timeout: 15_000 } + ) + .toBe('complete'); const subs = await readSubagents(page); expect(subs.size).toBeGreaterThan(0); const research = subs.entries.find((e) => e.name === 'research'); expect(research, 'a research subagent should be projected').toBeTruthy(); - expect(research?.text).toContain(CHILD_SENTENCE); + + // Transcript shape: the reason→tool→answer loop produces at least TWO + // assistant turns — the tool-calling turn (which carries the `lookup` + // tool_call) and the final answer turn. This is the ordered transcript the + // <chat-subagent-card> renders, one `.sac__msg` per message. + expect( + research!.messageCount, + 'subagent transcript should surface the tool-call turn plus the answer turn' + ).toBeGreaterThanOrEqual(2); + + // At least one subagent tool call — the `lookup` call the child made — is + // projected with its name and (offline, canned) result. This is what each + // tool-call card inside the subagent card binds to. + expect( + research!.toolCalls.length, + 'subagent should surface its lookup tool call' + ).toBeGreaterThanOrEqual(1); + const lookup = research!.toolCalls.find((tc) => tc.name === 'lookup'); + expect(lookup, 'the lookup tool call should be projected by name').toBeTruthy(); + expect(lookup!.result, 'the lookup tool call should carry its canned result').toContain( + 'reactivity primitive' + ); + + // The streamed final answer is part of the transcript. + expect(research!.messageTexts.join('\n')).toContain(CHILD_SENTENCE); // The child research text must NOT leak into the parent's rendered markdown // surface.
Target chat-streaming-md (the assistant's answer) rather than the From 80cc64541b52906fdc9331a4269caae5f8134606 Mon Sep 17 00:00:00 2001 From: Brian Love Date: Thu, 18 Jun 2026 14:36:26 -0700 Subject: [PATCH 9/9] docs(chat): regenerate api-docs for Subagent.toolCalls; scrub plan-doc reference --- .../content/docs/chat/api/api-docs.json | 54 ++++++++++++++++--- ...6-18-ag-ui-multi-message-subagent-cards.md | 2 +- 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/apps/website/content/docs/chat/api/api-docs.json b/apps/website/content/docs/chat/api/api-docs.json index 19ef10b7..edfd55c0 100644 --- a/apps/website/content/docs/chat/api/api-docs.json +++ b/apps/website/content/docs/chat/api/api-docs.json @@ -3603,12 +3603,6 @@ "params": [], "examples": [], "properties": [ - { - "name": "latestMessageContent", - "type": "Signal", - "description": "", - "optional": false - }, { "name": "state", "type": "Signal", @@ -3622,7 +3616,47 @@ "optional": false } ], - "methods": [] + "methods": [ + { + "name": "textOf", + "signature": "textOf(m: Message): string", + "description": "", + "params": [ + { + "name": "m", + "type": "Message", + "description": "", + "optional": false + } + ] + }, + { + "name": "toolCallsFor", + "signature": "toolCallsFor(m: Message): ToolCall[]", + "description": "", + "params": [ + { + "name": "m", + "type": "Message", + "description": "", + "optional": false + } + ] + }, + { + "name": "toToolCallInfo", + "signature": "toToolCallInfo(tc: ToolCall): ToolCallInfo", + "description": "", + "params": [ + { + "name": "tc", + "type": "ToolCall", + "description": "", + "optional": false + } + ] + } + ] }, { "name": "ChatSubagentsComponent", @@ -6654,6 +6688,12 @@ "type": "string", "description": "Tool call ID that spawned this subagent.", "optional": false + }, + { + "name": "toolCalls", + "type": "Signal", + "description": "The subagent's own tool calls (name/args/result), referenced by\n`Message.toolCallIds` in `messages`. 
Optional: adapters that don't surface\nsubagent tool calls omit it; consumers default to `[]`.", + "optional": true } ], "examples": [] diff --git a/docs/superpowers/plans/2026-06-18-ag-ui-multi-message-subagent-cards.md b/docs/superpowers/plans/2026-06-18-ag-ui-multi-message-subagent-cards.md index c4040393..00be8f93 100644 --- a/docs/superpowers/plans/2026-06-18-ag-ui-multi-message-subagent-cards.md +++ b/docs/superpowers/plans/2026-06-18-ag-ui-multi-message-subagent-cards.md @@ -411,7 +411,7 @@ npx nx build chat --skip-nx-cache cd examples/ag-ui/python && uv run pytest -q && cd - npm run generate-api-docs # Subagent.toolCalls is public API ``` -- Confirm `git diff apps/website/content/docs/*/api/api-docs.json` reflects the `toolCalls` addition; `git diff | grep -i copilotkit` MUST be empty. +- Confirm `git diff apps/website/content/docs/*/api/api-docs.json` reflects the `toolCalls` addition; the diff MUST contain no references to external research repositories. - Confirm the cockpit `ag-ui/subagents` capability is unaffected (it uses the single-text/back-compat path) — run its e2e if changed: `npx nx e2e cockpit-ag-ui-subagents-angular --skip-nx-cache`. - If any `cockpit/ag-ui/*` python/graph source changed (it should NOT in this plan), regenerate the Railway deploy bundle: `npx tsx scripts/generate-ag-ui-deployment-config.ts` and commit `deployments/ag-ui-dev/`. (This plan does not touch cockpit, so expect no regen.)
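
The per-turn `text_so_far` reset that `SubagentStreamHandler` performs in this patch series can be sketched in isolation. This is only a minimal, dependency-free illustration under stated assumptions: `RunState` and `TurnBuffer` are hypothetical stand-ins (not names from the repository), and payloads are collected in a list rather than dispatched through `adispatch_custom_event`.

```python
from dataclasses import dataclass, field


@dataclass
class RunState:
    """Hypothetical stand-in for the shared run state: the subgraph
    bumps message_index as each assistant turn opens."""
    message_index: int = 0


@dataclass
class TurnBuffer:
    """Hypothetical model of the handler's buffering logic only."""
    subagent_id: str
    run_state: RunState
    _buffer: str = ""
    _buffer_index: int = 0
    emitted: list = field(default_factory=list)

    def __post_init__(self) -> None:
        # Remember which turn the (empty) buffer currently belongs to.
        self._buffer_index = self.run_state.message_index

    def on_token(self, token: str) -> None:
        if not token:
            return  # empty tokens produce no event
        index = self.run_state.message_index
        if index != self._buffer_index:
            # A new assistant turn opened: scope text_so_far to it.
            self._buffer = ""
            self._buffer_index = index
        self._buffer += token
        # Each event carries the full text so far for its own turn.
        self.emitted.append(
            {
                "subagent_id": self.subagent_id,
                "phase": "message",
                "message_index": index,
                "text": self._buffer,
            }
        )


state = RunState()
buf = TurnBuffer("tc-1", state)
buf.on_token("Paris ")
buf.on_token("is")
state.message_index = 1  # the subgraph advances to the next turn
buf.on_token("Done")
texts = [(e["message_index"], e["text"]) for e in buf.emitted]
```

Because every event carries the whole text of its own turn, a consumer can stay stateless and simply replace the content of message `message_index`, which is the property the handler's docstring relies on.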