Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 159 additions & 129 deletions app/features/agents/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,153 +459,183 @@ async def stream_chat(
agent_type=session.agent_type,
)

# Stream the response
# Stream the response. Ollama's OpenAI-compat endpoint rejects
# PydanticAI's streamed request with 400 "invalid message content type:
# <nil>" (#342), while the non-streaming run() path works — so fall back
# to run() for the ollama provider and emit the result as a single
# text_delta plus the usual approval/complete events. Cloud providers
# keep the true token-streaming path.
default_model = self.settings.agent_default_model
provider = default_model.split(":", 1)[0] if ":" in default_model else ""
stream_supported = provider != "ollama"
try:
with _sequential_tool_execution():
async with asyncio.timeout(self.settings.agent_timeout_seconds):
async with agent.run_stream(
message,
deps=deps,
message_history=message_history,
) as result:
try:
async for text in result.stream_text():
yield StreamEvent(
event_type="text_delta",
data={"delta": text},
timestamp=datetime.now(UTC),
final_result: Any
usage: Any
all_messages: list[ModelMessage]

if stream_supported:
async with agent.run_stream(
message,
deps=deps,
message_history=message_history,
) as result:
try:
async for text in result.stream_text():
yield StreamEvent(
event_type="text_delta",
data={"delta": text},
timestamp=datetime.now(UTC),
)
except Exception as e:
# Structured output agents (output_type=...) cannot
# stream raw text deltas. Skip delta streaming and
# only emit the final complete event.
logger.info(
"agents.stream_chat_text_delta_unavailable",
session_id=session_id,
error=str(e),
error_type=type(e).__name__,
)
except Exception as e:
# Structured output agents (output_type=...) cannot stream raw text deltas.
# In that case we skip delta streaming and only emit the final complete event.
logger.info(
"agents.stream_chat_text_delta_unavailable",
session_id=session_id,
error=str(e),
error_type=type(e).__name__,
# NOTE: PydanticAI exposes get_output() on StreamedRunResult.
final_result = await result.get_output()
usage = result.usage()
all_messages = result.all_messages()
else:
# #342 — non-streaming fallback for the ollama provider.
run_result = await agent.run(
message,
deps=deps,
message_history=message_history,
)
final_result = run_result.output
usage = run_result.usage()
all_messages = run_result.all_messages()

# Update session (shared by both paths)
session.message_history = self._serialize_messages(all_messages)
session.total_tokens_used += usage.total_tokens or 0
session.tool_calls_count += deps.tool_call_count
session.last_activity = datetime.now(UTC)
session.expires_at = session.last_activity + timedelta(
minutes=self.settings.agent_session_ttl_minutes
)

await db.flush()

# Check for pending approval actions (mirror chat() logic)
pending_action = None
pending_approval = False
stream_now = datetime.now(UTC)

# Primary trigger (#336): a gated tool recorded a
# machine-readable approval request on deps. Deterministic
# — the experiment agent's ExperimentReport output has no
# pending_action field, so the legacy triggers below never
# fired and the approval_required event was never emitted.
if deps.pending_action:
pending_approval = True
pending_action = self._record_pending_action(
session,
action_type=str(deps.pending_action.get("action_type", "unknown")),
arguments=deps.pending_action.get("arguments") or {},
description=str(
deps.pending_action.get("description")
or "Agent requested approval for "
f"{deps.pending_action.get('action_type', 'unknown')}"
),
now=stream_now,
)
# Legacy trigger: structured output carried pending_action.
elif hasattr(final_result, "pending_action") and final_result.pending_action:
pending_approval = True
pending_action_data = final_result.pending_action
# Extract action details - support both dict and object with attributes
if isinstance(pending_action_data, dict):
action_type = pending_action_data.get("action_type", "unknown")
arguments = pending_action_data.get("arguments", {})
description = pending_action_data.get(
"description", f"Agent requested approval for {action_type}"
)
else:
action_type = getattr(pending_action_data, "action_type", "unknown")
arguments = getattr(pending_action_data, "arguments", {})
description = getattr(
pending_action_data,
"description",
f"Agent requested approval for {action_type}",
)

# Get final result and update session
# NOTE: PydanticAI v1.48 exposes get_output() on StreamedRunResult.
final_result: Any = await result.get_output()
usage = result.usage()

session.message_history = self._serialize_messages(result.all_messages())
session.total_tokens_used += usage.total_tokens or 0
session.tool_calls_count += deps.tool_call_count
session.last_activity = datetime.now(UTC)
session.expires_at = session.last_activity + timedelta(
minutes=self.settings.agent_session_ttl_minutes
pending_action = self._record_pending_action(
session, action_type, arguments, description, stream_now
)
# Fallback: check approval_required flag (legacy trigger)
elif (
hasattr(final_result, "approval_required")
and final_result.approval_required
):
pending_approval = True
pending_action = self._record_pending_action(
session,
"unknown",
{},
"Agent requested approval for an action",
stream_now,
)

await db.flush()

# Check for pending approval actions (mirror chat() logic)
pending_action = None
pending_approval = False
stream_now = datetime.now(UTC)

# Primary trigger (#336): a gated tool recorded a
# machine-readable approval request on deps. Deterministic
# — the experiment agent's ExperimentReport output has no
# pending_action field, so the legacy triggers below never
# fired and the approval_required event was never emitted.
if deps.pending_action:
pending_approval = True
pending_action = self._record_pending_action(
session,
action_type=str(deps.pending_action.get("action_type", "unknown")),
arguments=deps.pending_action.get("arguments") or {},
description=str(
deps.pending_action.get("description")
or "Agent requested approval for "
f"{deps.pending_action.get('action_type', 'unknown')}"
),
now=stream_now,
)
# Legacy trigger: structured output carried pending_action.
elif (
hasattr(final_result, "pending_action") and final_result.pending_action
):
pending_approval = True
pending_action_data = final_result.pending_action
# Extract action details - support both dict and object with attributes
if isinstance(pending_action_data, dict):
action_type = pending_action_data.get("action_type", "unknown")
arguments = pending_action_data.get("arguments", {})
description = pending_action_data.get(
"description", f"Agent requested approval for {action_type}"
)
else:
action_type = getattr(pending_action_data, "action_type", "unknown")
arguments = getattr(pending_action_data, "arguments", {})
description = getattr(
pending_action_data,
"description",
f"Agent requested approval for {action_type}",
)
await db.flush()

pending_action = self._record_pending_action(
session, action_type, arguments, description, stream_now
)
# Fallback: check approval_required flag (legacy trigger)
# Build the response text (shared by both paths).
response_message: str = "No response generated."
if final_result:
if hasattr(final_result, "answer") and final_result.answer:
response_message = str(final_result.answer)
elif hasattr(final_result, "summary") and final_result.summary:
response_message = str(final_result.summary)
elif (
hasattr(final_result, "approval_required")
and final_result.approval_required
hasattr(final_result, "recommendations")
and final_result.recommendations
):
pending_approval = True
pending_action = self._record_pending_action(
session,
"unknown",
{},
"Agent requested approval for an action",
stream_now,
)

await db.flush()

# If approval is required, emit approval_required event
if pending_approval and pending_action:
yield StreamEvent(
event_type="approval_required",
data={
"action": pending_action,
"message": "Human approval required before proceeding.",
},
timestamp=stream_now,
)

# Yield completion event
response_message: str = "No response generated."
if final_result:
if hasattr(final_result, "answer") and final_result.answer:
response_message = str(final_result.answer)
elif hasattr(final_result, "summary") and final_result.summary:
response_message = str(final_result.summary)
elif (
hasattr(final_result, "recommendations")
and final_result.recommendations
):
recommendations = final_result.recommendations
if isinstance(recommendations, list) and recommendations:
response_message = "\n".join(
str(item) for item in recommendations
)
else:
response_message = str(final_result)
recommendations = final_result.recommendations
if isinstance(recommendations, list) and recommendations:
response_message = "\n".join(str(item) for item in recommendations)
else:
response_message = str(final_result)
else:
response_message = str(final_result)

# #342 — the ollama (non-streaming) path produced no token
# deltas; emit the full text once so the FE renders the reply.
# Cloud streaming behavior is unchanged.
if not stream_supported and response_message != "No response generated.":
yield StreamEvent(
event_type="complete",
event_type="text_delta",
data={"delta": response_message},
timestamp=datetime.now(UTC),
)

# If approval is required, emit approval_required event
if pending_approval and pending_action:
yield StreamEvent(
event_type="approval_required",
data={
"message": response_message,
"tokens_used": usage.total_tokens or 0,
"tool_calls_count": deps.tool_call_count,
"pending_approval": pending_approval,
"action": pending_action,
"message": "Human approval required before proceeding.",
},
timestamp=datetime.now(UTC),
timestamp=stream_now,
)

yield StreamEvent(
event_type="complete",
data={
"message": response_message,
"tokens_used": usage.total_tokens or 0,
"tool_calls_count": deps.tool_call_count,
"pending_approval": pending_approval,
},
timestamp=datetime.now(UTC),
)
except TimeoutError as e:
raise TimeoutError(
f"Agent response timed out after {self.settings.agent_timeout_seconds} seconds"
Expand Down
Loading