diff --git a/app/features/agents/service.py b/app/features/agents/service.py index cdc83882..20625e01 100644 --- a/app/features/agents/service.py +++ b/app/features/agents/service.py @@ -459,153 +459,183 @@ async def stream_chat( agent_type=session.agent_type, ) - # Stream the response + # Stream the response. Ollama's OpenAI-compat endpoint rejects + # PydanticAI's streamed request with 400 "invalid message content type: + # <nil>" (#342), while the non-streaming run() path works — so fall back + # to run() for the ollama provider and emit the result as a single + # text_delta plus the usual approval/complete events. Cloud providers + # keep the true token-streaming path. + default_model = self.settings.agent_default_model + provider = default_model.split(":", 1)[0] if ":" in default_model else "" + stream_supported = provider != "ollama" try: with _sequential_tool_execution(): async with asyncio.timeout(self.settings.agent_timeout_seconds): - async with agent.run_stream( - message, - deps=deps, - message_history=message_history, - ) as result: - try: - async for text in result.stream_text(): - yield StreamEvent( - event_type="text_delta", - data={"delta": text}, - timestamp=datetime.now(UTC), + final_result: Any + usage: Any + all_messages: list[ModelMessage] + + if stream_supported: + async with agent.run_stream( + message, + deps=deps, + message_history=message_history, + ) as result: + try: + async for text in result.stream_text(): + yield StreamEvent( + event_type="text_delta", + data={"delta": text}, + timestamp=datetime.now(UTC), + ) + except Exception as e: + # Structured output agents (output_type=...) cannot + # stream raw text deltas. Skip delta streaming and + # only emit the final complete event. + logger.info( + "agents.stream_chat_text_delta_unavailable", + session_id=session_id, + error=str(e), + error_type=type(e).__name__, ) - except Exception as e: - # Structured output agents (output_type=...) cannot stream raw text deltas. 
- # In that case we skip delta streaming and only emit the final complete event. - logger.info( - "agents.stream_chat_text_delta_unavailable", - session_id=session_id, - error=str(e), - error_type=type(e).__name__, + # NOTE: PydanticAI exposes get_output() on StreamedRunResult. + final_result = await result.get_output() + usage = result.usage() + all_messages = result.all_messages() + else: + # #342 — non-streaming fallback for the ollama provider. + run_result = await agent.run( + message, + deps=deps, + message_history=message_history, + ) + final_result = run_result.output + usage = run_result.usage() + all_messages = run_result.all_messages() + + # Update session (shared by both paths) + session.message_history = self._serialize_messages(all_messages) + session.total_tokens_used += usage.total_tokens or 0 + session.tool_calls_count += deps.tool_call_count + session.last_activity = datetime.now(UTC) + session.expires_at = session.last_activity + timedelta( + minutes=self.settings.agent_session_ttl_minutes + ) + + await db.flush() + + # Check for pending approval actions (mirror chat() logic) + pending_action = None + pending_approval = False + stream_now = datetime.now(UTC) + + # Primary trigger (#336): a gated tool recorded a + # machine-readable approval request on deps. Deterministic + # — the experiment agent's ExperimentReport output has no + # pending_action field, so the legacy triggers below never + # fired and the approval_required event was never emitted. + if deps.pending_action: + pending_approval = True + pending_action = self._record_pending_action( + session, + action_type=str(deps.pending_action.get("action_type", "unknown")), + arguments=deps.pending_action.get("arguments") or {}, + description=str( + deps.pending_action.get("description") + or "Agent requested approval for " + f"{deps.pending_action.get('action_type', 'unknown')}" + ), + now=stream_now, + ) + # Legacy trigger: structured output carried pending_action. 
+ elif hasattr(final_result, "pending_action") and final_result.pending_action: + pending_approval = True + pending_action_data = final_result.pending_action + # Extract action details - support both dict and object with attributes + if isinstance(pending_action_data, dict): + action_type = pending_action_data.get("action_type", "unknown") + arguments = pending_action_data.get("arguments", {}) + description = pending_action_data.get( + "description", f"Agent requested approval for {action_type}" + ) + else: + action_type = getattr(pending_action_data, "action_type", "unknown") + arguments = getattr(pending_action_data, "arguments", {}) + description = getattr( + pending_action_data, + "description", + f"Agent requested approval for {action_type}", ) - # Get final result and update session - # NOTE: PydanticAI v1.48 exposes get_output() on StreamedRunResult. - final_result: Any = await result.get_output() - usage = result.usage() - - session.message_history = self._serialize_messages(result.all_messages()) - session.total_tokens_used += usage.total_tokens or 0 - session.tool_calls_count += deps.tool_call_count - session.last_activity = datetime.now(UTC) - session.expires_at = session.last_activity + timedelta( - minutes=self.settings.agent_session_ttl_minutes + pending_action = self._record_pending_action( + session, action_type, arguments, description, stream_now + ) + # Fallback: check approval_required flag (legacy trigger) + elif ( + hasattr(final_result, "approval_required") + and final_result.approval_required + ): + pending_approval = True + pending_action = self._record_pending_action( + session, + "unknown", + {}, + "Agent requested approval for an action", + stream_now, ) - await db.flush() - - # Check for pending approval actions (mirror chat() logic) - pending_action = None - pending_approval = False - stream_now = datetime.now(UTC) - - # Primary trigger (#336): a gated tool recorded a - # machine-readable approval request on deps. 
Deterministic - # — the experiment agent's ExperimentReport output has no - # pending_action field, so the legacy triggers below never - # fired and the approval_required event was never emitted. - if deps.pending_action: - pending_approval = True - pending_action = self._record_pending_action( - session, - action_type=str(deps.pending_action.get("action_type", "unknown")), - arguments=deps.pending_action.get("arguments") or {}, - description=str( - deps.pending_action.get("description") - or "Agent requested approval for " - f"{deps.pending_action.get('action_type', 'unknown')}" - ), - now=stream_now, - ) - # Legacy trigger: structured output carried pending_action. - elif ( - hasattr(final_result, "pending_action") and final_result.pending_action - ): - pending_approval = True - pending_action_data = final_result.pending_action - # Extract action details - support both dict and object with attributes - if isinstance(pending_action_data, dict): - action_type = pending_action_data.get("action_type", "unknown") - arguments = pending_action_data.get("arguments", {}) - description = pending_action_data.get( - "description", f"Agent requested approval for {action_type}" - ) - else: - action_type = getattr(pending_action_data, "action_type", "unknown") - arguments = getattr(pending_action_data, "arguments", {}) - description = getattr( - pending_action_data, - "description", - f"Agent requested approval for {action_type}", - ) + await db.flush() - pending_action = self._record_pending_action( - session, action_type, arguments, description, stream_now - ) - # Fallback: check approval_required flag (legacy trigger) + # Build the response text (shared by both paths). + response_message: str = "No response generated." 
+ if final_result: + if hasattr(final_result, "answer") and final_result.answer: + response_message = str(final_result.answer) + elif hasattr(final_result, "summary") and final_result.summary: + response_message = str(final_result.summary) elif ( - hasattr(final_result, "approval_required") - and final_result.approval_required + hasattr(final_result, "recommendations") + and final_result.recommendations ): - pending_approval = True - pending_action = self._record_pending_action( - session, - "unknown", - {}, - "Agent requested approval for an action", - stream_now, - ) - - await db.flush() - - # If approval is required, emit approval_required event - if pending_approval and pending_action: - yield StreamEvent( - event_type="approval_required", - data={ - "action": pending_action, - "message": "Human approval required before proceeding.", - }, - timestamp=stream_now, - ) - - # Yield completion event - response_message: str = "No response generated." - if final_result: - if hasattr(final_result, "answer") and final_result.answer: - response_message = str(final_result.answer) - elif hasattr(final_result, "summary") and final_result.summary: - response_message = str(final_result.summary) - elif ( - hasattr(final_result, "recommendations") - and final_result.recommendations - ): - recommendations = final_result.recommendations - if isinstance(recommendations, list) and recommendations: - response_message = "\n".join( - str(item) for item in recommendations - ) - else: - response_message = str(final_result) + recommendations = final_result.recommendations + if isinstance(recommendations, list) and recommendations: + response_message = "\n".join(str(item) for item in recommendations) else: response_message = str(final_result) + else: + response_message = str(final_result) + # #342 — the ollama (non-streaming) path produced no token + # deltas; emit the full text once so the FE renders the reply. + # Cloud streaming behavior is unchanged. 
+ if not stream_supported and response_message != "No response generated.": yield StreamEvent( - event_type="complete", + event_type="text_delta", + data={"delta": response_message}, + timestamp=datetime.now(UTC), + ) + + # If approval is required, emit approval_required event + if pending_approval and pending_action: + yield StreamEvent( + event_type="approval_required", data={ - "message": response_message, - "tokens_used": usage.total_tokens or 0, - "tool_calls_count": deps.tool_call_count, - "pending_approval": pending_approval, + "action": pending_action, + "message": "Human approval required before proceeding.", }, - timestamp=datetime.now(UTC), + timestamp=stream_now, ) + + yield StreamEvent( + event_type="complete", + data={ + "message": response_message, + "tokens_used": usage.total_tokens or 0, + "tool_calls_count": deps.tool_call_count, + "pending_approval": pending_approval, + }, + timestamp=datetime.now(UTC), + ) except TimeoutError as e: raise TimeoutError( f"Agent response timed out after {self.settings.agent_timeout_seconds} seconds" diff --git a/app/features/agents/tests/test_service.py b/app/features/agents/tests/test_service.py index 47b90c31..74709be2 100644 --- a/app/features/agents/tests/test_service.py +++ b/app/features/agents/tests/test_service.py @@ -386,6 +386,7 @@ class TestAgentServiceStreamChat: async def test_stream_chat_model_misbehavior_yields_error_event( self, sample_active_session: AgentSession, + monkeypatch: pytest.MonkeyPatch, ) -> None: """A misbehaving model should yield a recoverable `error` event, not crash. @@ -394,6 +395,9 @@ async def test_stream_chat_model_misbehavior_yields_error_event( raw exception string to the client. """ service = AgentService() + # Pin a streaming-capable (cloud) provider so this exercises the + # run_stream path regardless of the local .env (#342). 
+ monkeypatch.setattr(service.settings, "agent_default_model", "anthropic:claude-test") mock_db = AsyncMock() mock_result = MagicMock() @@ -434,6 +438,7 @@ async def __aexit__(self, *exc: object) -> bool: async def test_stream_chat_runs_tools_sequentially( self, sample_active_session: AgentSession, + monkeypatch: pytest.MonkeyPatch, ) -> None: """stream_chat() must also run the agent under sequential tool execution. @@ -442,6 +447,9 @@ async def test_stream_chat_runs_tools_sequentially( concurrent-session bug from issue #172. """ service = AgentService() + # Pin a streaming-capable (cloud) provider so this exercises the + # run_stream path regardless of the local .env (#342). + monkeypatch.setattr(service.settings, "agent_default_model", "anthropic:claude-test") mock_db = AsyncMock() mock_result = MagicMock() @@ -484,6 +492,120 @@ async def __aexit__(self, *exc: object) -> bool: mock_mode.assert_called_once_with("sequential") + @pytest.mark.asyncio + async def test_stream_chat_ollama_uses_nonstreaming_path( + self, + sample_active_session: AgentSession, + sample_experiment_report: ExperimentReport, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """#342 — an ollama agent uses agent.run() (not run_stream). + + Ollama's OpenAI-compat endpoint rejects PydanticAI's streamed request + with 400 "invalid message content type: <nil>". The service must fall + back to the non-streaming run() path and still emit text_delta + + approval_required (from deps.pending_action, #336) + complete. + """ + service = AgentService() + monkeypatch.setattr(service.settings, "agent_default_model", "ollama:qwen3:8b") + mock_db = AsyncMock() + mock_result = MagicMock() + mock_result.scalar_one_or_none.return_value = sample_active_session + mock_db.execute.return_value = mock_result + + def _run(message: str, *, deps: AgentDeps, message_history: Any) -> MagicMock: + # A gated tool fired during the run and recorded an approval request. 
+ deps.set_pending_action( + "save_scenario", + {"name": "p", "run_id": "r", "store_id": 1, "product_id": 2}, + "Save scenario plan 'p'", + ) + res = MagicMock() + res.output = sample_experiment_report # has a non-empty summary + usage = MagicMock() + usage.total_tokens = 11 + res.usage.return_value = usage + res.all_messages.return_value = [] + return res + + mock_agent = MagicMock() + mock_agent.run = AsyncMock(side_effect=_run) + mock_agent.run_stream = MagicMock( + side_effect=AssertionError("run_stream must not be called for the ollama provider") + ) + + with patch.object(service, "_get_agent", return_value=mock_agent): + events = [ + event + async for event in service.stream_chat( + db=mock_db, + session_id=sample_active_session.session_id, + message="Save a what-if scenario plan", + ) + ] + + types = [e.event_type for e in events] + assert "text_delta" in types # full reply emitted as one delta + assert "approval_required" in types + assert types[-1] == "complete" + approval = next(e for e in events if e.event_type == "approval_required") + assert approval.data["action"].action_type == "save_scenario" + mock_agent.run.assert_awaited_once() + mock_agent.run_stream.assert_not_called() + assert sample_active_session.status == SessionStatus.AWAITING_APPROVAL.value + + @pytest.mark.asyncio + async def test_stream_chat_cloud_keeps_streaming_path( + self, + sample_active_session: AgentSession, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Regression guard for #342 — a cloud provider keeps the run_stream path.""" + service = AgentService() + monkeypatch.setattr(service.settings, "agent_default_model", "anthropic:claude-test") + mock_db = AsyncMock() + mock_result = MagicMock() + mock_result.scalar_one_or_none.return_value = sample_active_session + mock_db.execute.return_value = mock_result + + class _StubStream: + async def __aenter__(self) -> MagicMock: + stream = MagicMock() + + async def _stream_text() -> AsyncIterator[str]: + yield "hello" + + 
stream.stream_text = _stream_text + stream.get_output = AsyncMock(return_value=None) + usage = MagicMock() + usage.total_tokens = 1 + stream.usage.return_value = usage + stream.all_messages.return_value = [] + return stream + + async def __aexit__(self, *exc: object) -> bool: + return False + + mock_agent = MagicMock() + mock_agent.run_stream = MagicMock(return_value=_StubStream()) + mock_agent.run = AsyncMock( + side_effect=AssertionError("run must not be called for a cloud provider") + ) + + with patch.object(service, "_get_agent", return_value=mock_agent): + events = [ + event + async for event in service.stream_chat( + db=mock_db, + session_id=sample_active_session.session_id, + message="hello", + ) + ] + + mock_agent.run_stream.assert_called_once() + mock_agent.run.assert_not_called() + assert any(e.event_type == "complete" for e in events) + class TestAgentServiceApproval: """Tests for approval workflow.""" @@ -871,9 +993,13 @@ async def test_stream_chat_emits_approval_required_from_deps( self, sample_active_session: AgentSession, sample_experiment_report: ExperimentReport, + monkeypatch: pytest.MonkeyPatch, ) -> None: """stream_chat() must emit approval_required from deps.pending_action.""" service = AgentService() + # Pin a streaming-capable (cloud) provider so this exercises the + # run_stream path regardless of the local .env (#342). + monkeypatch.setattr(service.settings, "agent_default_model", "anthropic:claude-test") mock_db = AsyncMock() mock_result = MagicMock() mock_result.scalar_one_or_none.return_value = sample_active_session