Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
b880950
Deflake the session-level timeout test with trio's virtual clock (#2788)
maxisbey Jun 5, 2026
ea6a479
tests: avoid abandoned-async-generator warnings under the trio backend
maxisbey Jun 11, 2026
8d73896
tests: re-export StreamingASGITransport as the sanctioned bridge impo…
maxisbey Jun 11, 2026
6a6e7b7
Run transport security tests in process instead of over sockets (#2764)
maxisbey Jun 11, 2026
fd71a10
Filter known anyio stream teardown warnings in streamable HTTP securi…
maxisbey Jun 11, 2026
92c4fe0
Run SSE and Unicode transport tests in process instead of over socket…
maxisbey Jun 11, 2026
a94f3d5
Guard in-process HTTP tests against sse-starlette's global exit event
maxisbey Jun 11, 2026
43cf9ec
Run StreamableHTTP transport tests in process instead of over sockets…
maxisbey Jun 11, 2026
b80a9bd
Force per-test GC so leaked-stream warnings stay inside the scoped fi…
maxisbey Jun 11, 2026
9dc96d6
Guard streamable-HTTP test modules against sse-starlette's module-glo…
maxisbey Jun 11, 2026
600c96e
Extend the per-test GC fixture to the SSE and Unicode HTTP test modules
maxisbey Jun 11, 2026
39938a2
Harden the streaming ASGI bridge against trailing responses
maxisbey Jun 11, 2026
b732867
Verify SSE request-context isolation inside each connection block
maxisbey Jun 11, 2026
3e53636
Exclude the connection loop's exit arc from branch coverage
maxisbey Jun 11, 2026
16ceeea
Exclude post-teardown header checks from 3.11 coverage measurement
maxisbey Jun 11, 2026
e155e1b
Deflake child process cleanup tests with polling instead of fixed sleeps
maxisbey Jun 11, 2026
4a17006
Harden stop detection and avoid redundant tree termination in cleanup…
maxisbey Jun 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
314 changes: 153 additions & 161 deletions tests/client/test_http_unicode.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,80 @@
(server→client and client→server) using the streamable HTTP transport.
"""

import multiprocessing
import socket
from collections.abc import Generator
import gc
from collections.abc import AsyncIterator, Iterator
from contextlib import asynccontextmanager
from typing import Any

import httpx
import pytest
from sse_starlette.sse import AppStatus
from starlette.applications import Starlette
from starlette.routing import Mount

import mcp.types as types
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamable_http_client
from tests.test_helpers import wait_for_server
from mcp.server import Server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from mcp.types import CallToolResult, TextContent, Tool
from tests.interaction.transports import StreamingASGITransport

# The in-process app is mounted at this origin purely so URLs are well-formed; nothing listens here.
BASE_URL = "http://127.0.0.1:8000"

# v1's streamable-HTTP server transport leaks a handful of anyio memory streams on teardown when
# run in process; the old subprocess harness never observed them. The interaction suite registers
# the same two scoped filters globally from tests/interaction/conftest.py (see the comment there),
# but they only take effect when that package's conftest is loaded; these markers keep the tests
# themselves passing in isolated runs. Markers are item-scoped, so the autouse
# `_collect_leaked_streams` fixture below garbage-collects each test's leaks inside its own
# teardown, where these filters apply; without it, leaks GC'd at session cleanup escape the
# scoped ignores. The filters are scoped to anyio's MemoryObject*Stream leak signature so an
# unrelated leak still fails the suite.
pytestmark = [
pytest.mark.filterwarnings("ignore:.*MemoryObject(Send|Receive)Stream:pytest.PytestUnraisableExceptionWarning"),
pytest.mark.filterwarnings("ignore:.*MemoryObject(Send|Receive)Stream:ResourceWarning"),
]


@pytest.fixture(autouse=True)
def _collect_leaked_streams() -> Iterator[None]:
"""Garbage-collect each test's leaked memory streams inside its own teardown.
The filterwarnings marks above only apply while a test in this file is the
active warning context. The leaked streams sit in reference cycles, so without
a forced collection their deallocator warnings fire wherever the garbage
collector happens to run next: during an unrelated test (failing it, since the
global ``filterwarnings = ["error"]`` has no ignore there) or at pytest's
session-unconfigure unraisable sweep (exit code 1 after all tests passed when
running without xdist, e.g. ``-n 0`` for ``--pdb`` debugging).
"""
yield
gc.collect()


@pytest.fixture(autouse=True)
def _reset_sse_starlette_exit_event() -> Iterator[None]:
"""Reset sse-starlette's module-global exit Event around each test.
sse-starlette <3.0 (allowed by this branch's dependency floor; CI's lowest-direct leg
installs it) stores an `anyio.Event` on the `AppStatus` class the first time an
`EventSourceResponse` runs; that Event is bound to the test's event loop and breaks every
subsequent in-process SSE response (and `json_response=False` below means every request
in this module is served as one). sse-starlette 3.x switched to a ContextVar and has no
such attribute. Resetting on both sides of the test keeps this module immune to a stale
Event left behind by an earlier test on the same worker as well as cleaning up after its
own. This mirrors the autouse fixtures in tests/shared/test_sse.py and
tests/interaction/conftest.py.
"""
if hasattr(AppStatus, "should_exit_event"): # pragma: no branch
# setattr keeps pyright happy: the locked sse-starlette 3.x has no such attribute.
setattr(AppStatus, "should_exit_event", None) # pragma: lax no cover
yield
if hasattr(AppStatus, "should_exit_event"): # pragma: no branch
setattr(AppStatus, "should_exit_event", None) # pragma: lax no cover


# Test constants with various Unicode characters
UNICODE_TEST_STRINGS = {
Expand All @@ -35,28 +100,12 @@
}


def run_unicode_server(port: int) -> None: # pragma: no cover
"""Run the Unicode test server in a separate process."""
# Import inside the function since this runs in a separate process
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from typing import Any

import uvicorn
from starlette.applications import Starlette
from starlette.routing import Mount

import mcp.types as types
from mcp.server import Server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from mcp.types import TextContent, Tool

# Need to recreate the server setup in this process
server = Server(name="unicode_test_server")
def make_unicode_server() -> Server[object, object]:
"""The Unicode echo server: tool and prompt contents that exercise non-ASCII round trips."""
server: Server[object, object] = Server(name="unicode_test_server")

@server.list_tools()
async def list_tools() -> list[Tool]:
"""List tools with Unicode descriptions."""
async def handle_list_tools() -> list[Tool]:
return [
Tool(
name="echo_unicode",
Expand All @@ -72,22 +121,12 @@ async def list_tools() -> list[Tool]:
]

@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any] | None) -> list[TextContent]:
"""Handle tool calls with Unicode content."""
if name == "echo_unicode":
text = arguments.get("text", "") if arguments else ""
return [
TextContent(
type="text",
text=f"Echo: {text}",
)
]
else:
raise ValueError(f"Unknown tool: {name}")
async def handle_call_tool(name: str, arguments: dict[str, Any]) -> CallToolResult:
assert name == "echo_unicode"
return CallToolResult(content=[TextContent(type="text", text=f"Echo: {arguments['text']}")])

@server.list_prompts()
async def list_prompts() -> list[types.Prompt]:
"""List prompts with Unicode names and descriptions."""
async def handle_list_prompts() -> list[types.Prompt]:
return [
types.Prompt(
name="unicode_prompt",
Expand All @@ -97,137 +136,90 @@ async def list_prompts() -> list[types.Prompt]:
]

@server.get_prompt()
async def get_prompt(name: str, arguments: dict[str, Any] | None) -> types.GetPromptResult:
"""Get a prompt with Unicode content."""
if name == "unicode_prompt":
return types.GetPromptResult(
messages=[
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text="Hello世界🌍Привет안녕مرحباשלום",
),
)
]
)
raise ValueError(f"Unknown prompt: {name}")

# Create the session manager
session_manager = StreamableHTTPSessionManager(
app=server,
json_response=False, # Use SSE for testing
)

@asynccontextmanager
async def lifespan(app: Starlette) -> AsyncGenerator[None, None]:
async with session_manager.run():
yield

# Create an ASGI application
app = Starlette(
debug=True,
routes=[
Mount("/mcp", app=session_manager.handle_request),
],
lifespan=lifespan,
)

# Run the server
config = uvicorn.Config(
app=app,
host="127.0.0.1",
port=port,
log_level="error",
)
uvicorn_server = uvicorn.Server(config)
uvicorn_server.run()


@pytest.fixture
def unicode_server_port() -> int:
"""Find an available port for the Unicode test server."""
with socket.socket() as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]


@pytest.fixture
def running_unicode_server(unicode_server_port: int) -> Generator[str, None, None]:
"""Start a Unicode test server in a separate process."""
proc = multiprocessing.Process(target=run_unicode_server, kwargs={"port": unicode_server_port}, daemon=True)
proc.start()

# Wait for server to be ready
wait_for_server(unicode_server_port)

try:
yield f"http://127.0.0.1:{unicode_server_port}"
finally:
# Clean up - try graceful termination first
proc.terminate()
proc.join(timeout=2)
if proc.is_alive(): # pragma: no cover
proc.kill()
proc.join(timeout=1)
async def handle_get_prompt(name: str, arguments: dict[str, str] | None) -> types.GetPromptResult:
assert name == "unicode_prompt"
return types.GetPromptResult(
messages=[
types.PromptMessage(
role="user",
content=types.TextContent(type="text", text="Hello世界🌍Привет안녕مرحباשלום"),
)
]
)

return server


@asynccontextmanager
async def unicode_session() -> AsyncIterator[ClientSession]:
"""Yield an initialized ClientSession speaking streamable HTTP (SSE responses) to the
Unicode test server, entirely in process."""
# SSE response mode, so Unicode rides the SSE event encoding rather than a plain JSON body.
session_manager = StreamableHTTPSessionManager(app=make_unicode_server(), json_response=False)
app = Starlette(routes=[Mount("/mcp", app=session_manager.handle_request)])

async with (
session_manager.run(),
# follow_redirects matches the SDK's own client factory; Starlette's Mount 307-redirects
# the bare /mcp path to /mcp/.
httpx.AsyncClient(
transport=StreamingASGITransport(app), base_url=BASE_URL, follow_redirects=True
) as http_client,
streamable_http_client(f"{BASE_URL}/mcp", http_client=http_client) as (
read_stream,
write_stream,
_get_session_id,
),
ClientSession(read_stream, write_stream) as session,
):
await session.initialize()
yield session


@pytest.mark.anyio
async def test_streamable_http_client_unicode_tool_call(running_unicode_server: str) -> None:
async def test_streamable_http_client_unicode_tool_call() -> None:
"""Test that Unicode text is correctly handled in tool calls via streamable HTTP."""
base_url = running_unicode_server
endpoint_url = f"{base_url}/mcp"

async with streamable_http_client(endpoint_url) as (read_stream, write_stream, _get_session_id):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()

# Test 1: List tools (server→client Unicode in descriptions)
tools = await session.list_tools()
assert len(tools.tools) == 1
async with unicode_session() as session:
# Test 1: List tools (server→client Unicode in descriptions)
tools = await session.list_tools()
assert len(tools.tools) == 1

# Check Unicode in tool descriptions
echo_tool = tools.tools[0]
assert echo_tool.name == "echo_unicode"
assert echo_tool.description is not None
assert "🔤" in echo_tool.description
assert "👋" in echo_tool.description
# Check Unicode in tool descriptions
echo_tool = tools.tools[0]
assert echo_tool.name == "echo_unicode"
assert echo_tool.description is not None
assert "🔤" in echo_tool.description
assert "👋" in echo_tool.description

# Test 2: Send Unicode text in tool call (client→server→client)
for test_name, test_string in UNICODE_TEST_STRINGS.items():
result = await session.call_tool("echo_unicode", arguments={"text": test_string})
# Test 2: Send Unicode text in tool call (client→server→client)
for test_name, test_string in UNICODE_TEST_STRINGS.items():
result = await session.call_tool("echo_unicode", arguments={"text": test_string})

# Verify server correctly received and echoed back Unicode
assert len(result.content) == 1
content = result.content[0]
assert content.type == "text"
assert f"Echo: {test_string}" == content.text, f"Failed for {test_name}"
# Verify server correctly received and echoed back Unicode
assert len(result.content) == 1
content = result.content[0]
assert content.type == "text"
assert f"Echo: {test_string}" == content.text, f"Failed for {test_name}"


@pytest.mark.anyio
async def test_streamable_http_client_unicode_prompts(running_unicode_server: str) -> None:
async def test_streamable_http_client_unicode_prompts() -> None:
"""Test that Unicode text is correctly handled in prompts via streamable HTTP."""
base_url = running_unicode_server
endpoint_url = f"{base_url}/mcp"

async with streamable_http_client(endpoint_url) as (read_stream, write_stream, _get_session_id):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()

# Test 1: List prompts (server→client Unicode in descriptions)
prompts = await session.list_prompts()
assert len(prompts.prompts) == 1

prompt = prompts.prompts[0]
assert prompt.name == "unicode_prompt"
assert prompt.description is not None
assert "Слой хранилища, где располагаются" in prompt.description

# Test 2: Get prompt with Unicode content (server→client)
result = await session.get_prompt("unicode_prompt", arguments={})
assert len(result.messages) == 1

message = result.messages[0]
assert message.role == "user"
assert message.content.type == "text"
assert message.content.text == "Hello世界🌍Привет안녕مرحباשלום"
async with unicode_session() as session:
# Test 1: List prompts (server→client Unicode in descriptions)
prompts = await session.list_prompts()
assert len(prompts.prompts) == 1

prompt = prompts.prompts[0]
assert prompt.name == "unicode_prompt"
assert prompt.description is not None
assert "Слой хранилища, где располагаются" in prompt.description

# Test 2: Get prompt with Unicode content (server→client)
result = await session.get_prompt("unicode_prompt", arguments={})
assert len(result.messages) == 1

message = result.messages[0]
assert message.role == "user"
assert message.content.type == "text"
assert message.content.text == "Hello世界🌍Привет안녕مرحباשלום"
Loading
Loading