From c650a25c3cf65ebf81c8622f67752f50f0819b3c Mon Sep 17 00:00:00 2001
From: Max Isbey <224885523+maxisbey@users.noreply.github.com>
Date: Thu, 11 Jun 2026 15:27:35 +0000
Subject: [PATCH 1/3] Deflake child process cleanup tests with polling instead of fixed sleeps

The TestChildProcessCleanup tests asserted that spawned writers had
started after fixed sleeps (0.5s startup, 0.3s observation window). On
loaded CI runners the child interpreter can take longer than that to
boot, so the "child should be writing" assertion failed with
"assert 0 > 0" before the cleanup logic under test ever ran.

Replace the fixed sleeps with bounded polling:

- _wait_for_first_write polls until a marker file has grown, proving
  the writer reached its write loop, with a 15s timeout.
- _wait_for_writes_to_stop polls until two samples taken 0.3s apart
  (3x the writers' 0.1s write interval) observe the same size; if the
  file never stops growing the timeout fails the test, so a genuine
  cleanup failure is still reported.

Also terminate the spawned process tree in each test's finally block,
so a failed assertion can no longer leak a running process tree, and
collect garbage before leaving each test so subprocess transports are
finalized while the test's ResourceWarning filters are still active.

Removing the unconditional sleeps also makes the tests faster.
--- tests/client/test_stdio.py | 354 ++++++++++++++++++------------------- 1 file changed, 176 insertions(+), 178 deletions(-) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index ba58da7321..42be287a49 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -1,4 +1,5 @@ import errno +import gc import os import shutil import sys @@ -10,7 +11,12 @@ import pytest from mcp.client.session import ClientSession -from mcp.client.stdio import StdioServerParameters, _create_platform_compatible_process, stdio_client +from mcp.client.stdio import ( + StdioServerParameters, + _create_platform_compatible_process, + _terminate_process_tree, + stdio_client, +) from mcp.shared.exceptions import McpError from mcp.shared.message import SessionMessage from mcp.types import CONNECTION_CLOSED, JSONRPCMessage, JSONRPCRequest, JSONRPCResponse @@ -219,6 +225,36 @@ def sigint_handler(signum, frame): raise +async def _wait_for_first_write(path: str) -> None: + """Poll until the file at *path* exists and has grown beyond its initial empty state. + + The marker files below are created empty before the writer is spawned, so any + growth proves the writing process booted and reached its write loop. Polling + replaces fixed startup sleeps, which flake on loaded machines where interpreter + startup can exceed any fixed window. Bounded so a writer that never starts + fails the test instead of hanging it. + """ + with anyio.fail_after(15): + while not os.path.exists(path) or os.path.getsize(path) == 0: + await anyio.sleep(0.05) + + +async def _wait_for_writes_to_stop(path: str) -> None: + """Poll until the file at *path* stops growing. + + Returns once two consecutive samples taken 0.3 seconds apart (three times the + writers' 0.1 second write interval) observe the same size. The sentinel forces + at least one full sampling interval before the first comparison. 
If the file + never stops growing, the timeout fails the test: a writer that survives + _terminate_process_tree is a genuine cleanup failure that must not be masked. + """ + last_size = -1 + with anyio.fail_after(15): + while os.path.getsize(path) != last_size: + last_size = os.path.getsize(path) + await anyio.sleep(0.3) + + class TestChildProcessCleanup: """ Tests for child process cleanup functionality using _terminate_process_tree. @@ -259,84 +295,66 @@ async def test_basic_child_process_cleanup(self): with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: parent_marker = f.name - try: - # Parent script that spawns a child process - parent_script = textwrap.dedent( - f""" - import subprocess - import sys - import time - import os - - # Mark that parent started - with open({escape_path_for_python(parent_marker)}, 'w') as f: - f.write('parent started\\n') - - # Child script that writes continuously - child_script = f''' - import time - with open({escape_path_for_python(marker_file)}, 'a') as f: - while True: - f.write(f"{time.time()}") - f.flush() - time.sleep(0.1) - ''' - - # Start the child process - child = subprocess.Popen([sys.executable, '-c', child_script]) - - # Parent just sleeps + # Parent script that spawns a child process + parent_script = textwrap.dedent( + f""" + import subprocess + import sys + import time + import os + + # Mark that parent started + with open({escape_path_for_python(parent_marker)}, 'w') as f: + f.write('parent started\\n') + + # Child script that writes continuously + child_script = f''' + import time + with open({escape_path_for_python(marker_file)}, 'a') as f: while True: + f.write(f"{time.time()}") + f.flush() time.sleep(0.1) - """ - ) + ''' - print("\nStarting child process termination test...") + # Start the child process + child = subprocess.Popen([sys.executable, '-c', child_script]) - # Start the parent process - proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + # Parent just 
sleeps + while True: + time.sleep(0.1) + """ + ) - # Wait for processes to start - await anyio.sleep(0.5) + # Start the parent process + proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) - # Verify parent started - assert os.path.exists(parent_marker), "Parent process didn't start" + try: + # Wait for the parent to start and the child to reach its write loop + await _wait_for_first_write(parent_marker) + assert os.path.getsize(parent_marker) > 0, "Parent process didn't start" - # Verify child is writing - if os.path.exists(marker_file): # pragma: no branch - initial_size = os.path.getsize(marker_file) - await anyio.sleep(0.3) - size_after_wait = os.path.getsize(marker_file) - assert size_after_wait > initial_size, "Child process should be writing" - print(f"Child is writing (file grew from {initial_size} to {size_after_wait} bytes)") + await _wait_for_first_write(marker_file) + assert os.path.getsize(marker_file) > 0, "Child process should be writing" # Terminate using our function - print("Terminating process and children...") - from mcp.client.stdio import _terminate_process_tree - await _terminate_process_tree(proc) - # Verify processes stopped - await anyio.sleep(0.5) - if os.path.exists(marker_file): # pragma: no branch - size_after_cleanup = os.path.getsize(marker_file) - await anyio.sleep(0.5) - final_size = os.path.getsize(marker_file) - - print(f"After cleanup: file size {size_after_cleanup} -> {final_size}") - assert final_size == size_after_cleanup, ( - f"Child process still running! 
File grew by {final_size - size_after_cleanup} bytes" - ) - - print("SUCCESS: Child process was properly terminated") - + # Verify the child stopped writing; a survivor times out and fails the test + await _wait_for_writes_to_stop(marker_file) finally: + # Terminate again so no failure above can leak the spawned tree + # (safe: _terminate_process_tree tolerates an already-dead tree) + await _terminate_process_tree(proc) # Clean up files for f in [marker_file, parent_marker]: try: os.unlink(f) except OSError: # pragma: no cover pass + # Collect subprocess transports now, while this test's warning filters + # are active, so GC-time ResourceWarnings cannot hit a later test + gc.collect() @pytest.mark.anyio @pytest.mark.filterwarnings("ignore::ResourceWarning" if sys.platform == "win32" else "default") @@ -353,88 +371,78 @@ async def test_nested_process_tree(self): with tempfile.NamedTemporaryFile(mode="w", delete=False) as f3: grandchild_file = f3.name - try: - # Simple nested process tree test - # We create parent -> child -> grandchild, each writing to a file - parent_script = textwrap.dedent( - f""" - import subprocess - import sys - import time - import os - - # Child will spawn grandchild and write to child file - child_script = f'''import subprocess - import sys - import time - - # Grandchild just writes to file - grandchild_script = \"\"\"import time - with open({escape_path_for_python(grandchild_file)}, 'a') as f: - while True: - f.write(f"gc {{time.time()}}") - f.flush() - time.sleep(0.1)\"\"\" - - # Spawn grandchild - subprocess.Popen([sys.executable, '-c', grandchild_script]) - - # Child writes to its file - with open({escape_path_for_python(child_file)}, 'a') as f: - while True: - f.write(f"c {time.time()}") - f.flush() - time.sleep(0.1)''' - - # Spawn child process - subprocess.Popen([sys.executable, '-c', child_script]) - - # Parent writes to its file - with open({escape_path_for_python(parent_file)}, 'a') as f: - while True: - f.write(f"p 
{time.time()}") - f.flush() - time.sleep(0.1) - """ - ) + # Simple nested process tree test + # We create parent -> child -> grandchild, each writing to a file + parent_script = textwrap.dedent( + f""" + import subprocess + import sys + import time + import os + + # Child will spawn grandchild and write to child file + child_script = f'''import subprocess + import sys + import time + + # Grandchild just writes to file + grandchild_script = \"\"\"import time + with open({escape_path_for_python(grandchild_file)}, 'a') as f: + while True: + f.write(f"gc {{time.time()}}") + f.flush() + time.sleep(0.1)\"\"\" - # Start the parent process - proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + # Spawn grandchild + subprocess.Popen([sys.executable, '-c', grandchild_script]) - # Let all processes start - await anyio.sleep(1.0) + # Child writes to its file + with open({escape_path_for_python(child_file)}, 'a') as f: + while True: + f.write(f"c {time.time()}") + f.flush() + time.sleep(0.1)''' - # Verify all are writing - for file_path, name in [(parent_file, "parent"), (child_file, "child"), (grandchild_file, "grandchild")]: - if os.path.exists(file_path): # pragma: no branch - initial_size = os.path.getsize(file_path) - await anyio.sleep(0.3) - new_size = os.path.getsize(file_path) - assert new_size > initial_size, f"{name} process should be writing" + # Spawn child process + subprocess.Popen([sys.executable, '-c', child_script]) - # Terminate the whole tree - from mcp.client.stdio import _terminate_process_tree + # Parent writes to its file + with open({escape_path_for_python(parent_file)}, 'a') as f: + while True: + f.write(f"p {time.time()}") + f.flush() + time.sleep(0.1) + """ + ) - await _terminate_process_tree(proc) + # Start the parent process + proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) - # Verify all stopped - await anyio.sleep(0.5) + try: + # Wait for every level of the tree to reach 
its write loop for file_path, name in [(parent_file, "parent"), (child_file, "child"), (grandchild_file, "grandchild")]: - if os.path.exists(file_path): # pragma: no branch - size1 = os.path.getsize(file_path) - await anyio.sleep(0.3) - size2 = os.path.getsize(file_path) - assert size1 == size2, f"{name} still writing after cleanup!" + await _wait_for_first_write(file_path) + assert os.path.getsize(file_path) > 0, f"{name} process should be writing" - print("SUCCESS: All processes in tree terminated") + # Terminate the whole tree + await _terminate_process_tree(proc) + # Verify every level stopped writing; a survivor times out and fails the test + for file_path in (parent_file, child_file, grandchild_file): + await _wait_for_writes_to_stop(file_path) finally: + # Terminate again so no failure above can leak the spawned tree + # (safe: _terminate_process_tree tolerates an already-dead tree) + await _terminate_process_tree(proc) # Clean up all marker files for f in [parent_file, child_file, grandchild_file]: try: os.unlink(f) except OSError: # pragma: no cover pass + # Collect subprocess transports now, while this test's warning filters + # are active, so GC-time ResourceWarnings cannot hit a later test + gc.collect() @pytest.mark.anyio @pytest.mark.filterwarnings("ignore::ResourceWarning" if sys.platform == "win32" else "default") @@ -448,72 +456,62 @@ async def test_early_parent_exit(self): with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: marker_file = f.name - try: - # Parent that spawns child and waits briefly - parent_script = textwrap.dedent( - f""" - import subprocess - import sys - import time - import signal - - # Child that continues running - child_script = f'''import time - with open({escape_path_for_python(marker_file)}, 'a') as f: - while True: - f.write(f"child {time.time()}") - f.flush() - time.sleep(0.1)''' - - # Start child in same process group - subprocess.Popen([sys.executable, '-c', child_script]) - - # Parent waits a bit then 
exits on SIGTERM - def handle_term(sig, frame): - sys.exit(0) - - signal.signal(signal.SIGTERM, handle_term) - - # Wait + # Parent that spawns child and waits briefly + parent_script = textwrap.dedent( + f""" + import subprocess + import sys + import time + import signal + + # Child that continues running + child_script = f'''import time + with open({escape_path_for_python(marker_file)}, 'a') as f: while True: - time.sleep(0.1) - """ - ) + f.write(f"child {time.time()}") + f.flush() + time.sleep(0.1)''' - # Start the parent process - proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + # Start child in same process group + subprocess.Popen([sys.executable, '-c', child_script]) - # Let child start writing - await anyio.sleep(0.5) + # Parent waits a bit then exits on SIGTERM + def handle_term(sig, frame): + sys.exit(0) - # Verify child is writing - if os.path.exists(marker_file): # pragma: no cover - size1 = os.path.getsize(marker_file) - await anyio.sleep(0.3) - size2 = os.path.getsize(marker_file) - assert size2 > size1, "Child should be writing" + signal.signal(signal.SIGTERM, handle_term) - # Terminate - this will kill the process group even if parent exits first - from mcp.client.stdio import _terminate_process_tree + # Wait + while True: + time.sleep(0.1) + """ + ) - await _terminate_process_tree(proc) + # Start the parent process + proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) - # Verify child stopped - await anyio.sleep(0.5) - if os.path.exists(marker_file): # pragma: no branch - size3 = os.path.getsize(marker_file) - await anyio.sleep(0.3) - size4 = os.path.getsize(marker_file) - assert size3 == size4, "Child should be terminated" + try: + # Wait for the child to reach its write loop + await _wait_for_first_write(marker_file) + assert os.path.getsize(marker_file) > 0, "Child should be writing" - print("SUCCESS: Child terminated even with parent exit during cleanup") + # Terminate 
- this will kill the process group even if parent exits first + await _terminate_process_tree(proc) + # Verify the child stopped writing; a survivor times out and fails the test + await _wait_for_writes_to_stop(marker_file) finally: + # Terminate again so no failure above can leak the spawned tree + # (safe: _terminate_process_tree tolerates an already-dead tree) + await _terminate_process_tree(proc) # Clean up marker file try: os.unlink(marker_file) except OSError: # pragma: no cover pass + # Collect subprocess transports now, while this test's warning filters + # are active, so GC-time ResourceWarnings cannot hit a later test + gc.collect() @pytest.mark.anyio From 8fe1f69f8c09070f0a616b0e3f1a55346dda6507 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Thu, 11 Jun 2026 15:40:01 +0000 Subject: [PATCH 2/3] Harden stop detection and avoid redundant tree termination in cleanup tests Two follow-up fixes to the child process cleanup tests: Require three consecutive stable samples before declaring writes stopped. The previous check exited on the first pair of samples 0.3s apart with no growth, retrying within the 15s budget, which made it easier for a CPU-starved (but alive) writer to be mistaken for a terminated one. The counter resets on any observed growth, and a file that never stops growing still fails the test via the timeout. Only re-terminate the process tree in the finally block if the test failed before reaching its own _terminate_process_tree call. The unconditional second call ran termination against an already-closed job object handle on Windows and logged a spurious fallback warning on POSIX in every passing run. The skipped branch only executes on failing runs, so it is excluded from coverage. 
--- tests/client/test_stdio.py | 41 +++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index 42be287a49..4a93f998b9 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -242,16 +242,26 @@ async def _wait_for_first_write(path: str) -> None: async def _wait_for_writes_to_stop(path: str) -> None: """Poll until the file at *path* stops growing. - Returns once two consecutive samples taken 0.3 seconds apart (three times the - writers' 0.1 second write interval) observe the same size. The sentinel forces - at least one full sampling interval before the first comparison. If the file + Returns once the size is unchanged across three successive 0.3 second gaps + (each three times the writers' 0.1 second write interval), so a writer that + is merely starved of CPU for a single gap is not mistaken for a terminated + one. Any observed growth resets the consecutive-stable counter. The sentinel + forces at least one non-stable iteration before counting starts. If the file never stops growing, the timeout fails the test: a writer that survives _terminate_process_tree is a genuine cleanup failure that must not be masked. 
""" last_size = -1 + stable_pairs = 0 with anyio.fail_after(15): - while os.path.getsize(path) != last_size: - last_size = os.path.getsize(path) + while True: + current_size = os.path.getsize(path) + if current_size == last_size: + stable_pairs += 1 + else: + stable_pairs = 0 + last_size = current_size + if stable_pairs == 3: + return await anyio.sleep(0.3) @@ -328,6 +338,7 @@ async def test_basic_child_process_cleanup(self): # Start the parent process proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + tree_killed = False try: # Wait for the parent to start and the child to reach its write loop @@ -339,13 +350,13 @@ async def test_basic_child_process_cleanup(self): # Terminate using our function await _terminate_process_tree(proc) + tree_killed = True # Verify the child stopped writing; a survivor times out and fails the test await _wait_for_writes_to_stop(marker_file) finally: - # Terminate again so no failure above can leak the spawned tree - # (safe: _terminate_process_tree tolerates an already-dead tree) - await _terminate_process_tree(proc) + if not tree_killed: # pragma: no cover - cleanup only reached when the test failed mid-flight + await _terminate_process_tree(proc) # Clean up files for f in [marker_file, parent_marker]: try: @@ -417,6 +428,7 @@ async def test_nested_process_tree(self): # Start the parent process proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + tree_killed = False try: # Wait for every level of the tree to reach its write loop @@ -426,14 +438,14 @@ async def test_nested_process_tree(self): # Terminate the whole tree await _terminate_process_tree(proc) + tree_killed = True # Verify every level stopped writing; a survivor times out and fails the test for file_path in (parent_file, child_file, grandchild_file): await _wait_for_writes_to_stop(file_path) finally: - # Terminate again so no failure above can leak the spawned tree - # (safe: _terminate_process_tree 
tolerates an already-dead tree) - await _terminate_process_tree(proc) + if not tree_killed: # pragma: no cover - cleanup only reached when the test failed mid-flight + await _terminate_process_tree(proc) # Clean up all marker files for f in [parent_file, child_file, grandchild_file]: try: @@ -489,6 +501,7 @@ def handle_term(sig, frame): # Start the parent process proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + tree_killed = False try: # Wait for the child to reach its write loop @@ -497,13 +510,13 @@ def handle_term(sig, frame): # Terminate - this will kill the process group even if parent exits first await _terminate_process_tree(proc) + tree_killed = True # Verify the child stopped writing; a survivor times out and fails the test await _wait_for_writes_to_stop(marker_file) finally: - # Terminate again so no failure above can leak the spawned tree - # (safe: _terminate_process_tree tolerates an already-dead tree) - await _terminate_process_tree(proc) + if not tree_killed: # pragma: no cover - cleanup only reached when the test failed mid-flight + await _terminate_process_tree(proc) # Clean up marker file try: os.unlink(marker_file) From 903cc561c4c77ef5035d8f389881d31e6b34bfbf Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Thu, 11 Jun 2026 16:24:06 +0000 Subject: [PATCH 3/3] Reap and close spawned processes inside the child cleanup tests The cleanup tests killed the spawned process tree but never awaited the process or closed its pipe streams. The asyncio subprocess transports stayed referenced by the per-test event loop, became garbage once that loop closed, and were finalized during a later test on the same worker. Under filterwarnings=error the resulting ResourceWarning fails whichever test happens to be running; on Windows the warning itself can die inside the transport repr with 'I/O operation on closed pipe'. 
A gc.collect() inside the test cannot help because the transports are still referenced by the live event loop at that point. After the stop-of-writes check confirms the tree is dead, each test now waits on the process (bounded, tolerant of the already-exited process), closes stdin, drains stdout to EOF so the event loop observes the pipe closure and can close the subprocess transport, then closes stdout. The same disposal runs in the failure-path cleanup after its kill. Draining works for both process flavors used on the platforms (anyio Process and the Windows FallbackProcess, which has no aclose), and EOF is guaranteed because every process that inherited the pipe handle is already dead. --- tests/client/test_stdio.py | 47 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index 4a93f998b9..987a3ca486 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -9,6 +9,7 @@ import anyio import pytest +from anyio.abc import Process from mcp.client.session import ClientSession from mcp.client.stdio import ( @@ -17,6 +18,7 @@ _terminate_process_tree, stdio_client, ) +from mcp.os.win32.utilities import FallbackProcess from mcp.shared.exceptions import McpError from mcp.shared.message import SessionMessage from mcp.types import CONNECTION_CLOSED, JSONRPCMessage, JSONRPCRequest, JSONRPCResponse @@ -265,6 +267,39 @@ async def _wait_for_writes_to_stop(path: str) -> None: await anyio.sleep(0.3) +async def _dispose_process(proc: Process | FallbackProcess) -> None: + """Reap a dead process and close its pipe streams inside the test that spawned it. + + Without this, the subprocess transports stay referenced by the per-test event + loop, become garbage only after that loop closes, and their GC-time + ResourceWarnings fire during a later test on the same worker (on Windows + proactor the warning can itself die in __repr__ on a closed pipe). 
An in-test + gc.collect() cannot catch that, so the process is reaped and closed + deterministically here. Draining stdout to EOF guarantees the event loop has + observed the pipe closure (anyio's reader aclose alone does not close the + underlying transport), which lets asyncio close the subprocess transport + before the test returns. + + Precondition: the WHOLE process tree must already be confirmed dead. wait() + tolerates an already-exited process and returns promptly, but on the Windows + fallback path it runs popen.wait in a thread and is effectively uncancellable, + and stdout only reaches EOF once every tree member that inherited the pipe + handle is gone. The timeout fails the test rather than hanging it if that + precondition is ever violated. + """ + with anyio.fail_after(15): + await proc.wait() + assert proc.stdin is not None + await proc.stdin.aclose() + assert proc.stdout is not None + while True: + try: + await proc.stdout.receive() + except anyio.EndOfStream: + break + await proc.stdout.aclose() + + class TestChildProcessCleanup: """ Tests for child process cleanup functionality using _terminate_process_tree. 
@@ -354,9 +389,13 @@ async def test_basic_child_process_cleanup(self): # Verify the child stopped writing; a survivor times out and fails the test await _wait_for_writes_to_stop(marker_file) + + # Tree is dead: reap and close the process so nothing leaks into later tests + await _dispose_process(proc) finally: if not tree_killed: # pragma: no cover - cleanup only reached when the test failed mid-flight await _terminate_process_tree(proc) + await _dispose_process(proc) # Clean up files for f in [marker_file, parent_marker]: try: @@ -443,9 +482,13 @@ async def test_nested_process_tree(self): # Verify every level stopped writing; a survivor times out and fails the test for file_path in (parent_file, child_file, grandchild_file): await _wait_for_writes_to_stop(file_path) + + # Tree is dead: reap and close the process so nothing leaks into later tests + await _dispose_process(proc) finally: if not tree_killed: # pragma: no cover - cleanup only reached when the test failed mid-flight await _terminate_process_tree(proc) + await _dispose_process(proc) # Clean up all marker files for f in [parent_file, child_file, grandchild_file]: try: @@ -514,9 +557,13 @@ def handle_term(sig, frame): # Verify the child stopped writing; a survivor times out and fails the test await _wait_for_writes_to_stop(marker_file) + + # Tree is dead: reap and close the process so nothing leaks into later tests + await _dispose_process(proc) finally: if not tree_killed: # pragma: no cover - cleanup only reached when the test failed mid-flight await _terminate_process_tree(proc) + await _dispose_process(proc) # Clean up marker file try: os.unlink(marker_file)