diff --git a/pyproject.toml b/pyproject.toml index 9ff8e6815..cc486e26d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,10 @@ dependencies = [ "litellm>=1.60.0,<2.0.0", "logfire>=4.19.0", "psutil>=5.9.0", + # uvloop's C event loop has no self._ready.popleft() codepath, so the + # asyncpg engine-dispose race ("IndexError: pop from an empty deque") that + # crashes the Postgres backend cannot fire under it. Not available on Windows. + "uvloop>=0.21.0; sys_platform != 'win32'", ] [project.urls] diff --git a/src/basic_memory/alembic/env.py b/src/basic_memory/alembic/env.py index 700ecef50..153adb0ce 100644 --- a/src/basic_memory/alembic/env.py +++ b/src/basic_memory/alembic/env.py @@ -2,8 +2,11 @@ import asyncio import os +from contextlib import suppress from logging.config import fileConfig +from loguru import logger + # Allow nested event loops (needed for pytest-asyncio and other async contexts) # Note: nest_asyncio doesn't work with uvloop or Python 3.14+, so we handle those cases separately import sys @@ -13,9 +16,15 @@ import nest_asyncio nest_asyncio.apply() - except (ImportError, ValueError): - # nest_asyncio not available or can't patch this loop type (e.g., uvloop) - pass + except (ImportError, ValueError) as exc: + # Trigger: nest_asyncio is absent (ImportError) or refuses to patch the + # running loop (ValueError, e.g. the uvloop policy installed for the + # Postgres backend - #831/#877). + # Why: the uvloop ValueError is now an *expected* path on every Postgres + # startup, so swallowing it silently hides a routine branch. + # Outcome: log at DEBUG (observable, not noisy) and fall through to the + # thread-based migration fallback. + logger.debug(f"nest_asyncio not applied ({exc!r}); using thread-based migration fallback") # For Python 3.14+, we rely on the thread-based fallback in run_migrations_online() from sqlalchemy import engine_from_config, pool @@ -115,7 +124,15 @@ async def run_async_migrations(connectable): """Run migrations asynchronously with AsyncEngine.""" async with connectable.connect() as connection: await connection.run_sync(do_run_migrations) - await connectable.dispose() + # Trigger: startup migrations on the asyncpg backend dispose the engine + # while the event loop may be tearing down around them. + # Why: that race surfaces "IndexError: pop from an empty deque" from + # base_events._run_once (#831/#877); shielding lets dispose finish atomically + # and suppressing CancelledError keeps a cancelled teardown from re-raising it. + # Outcome: the migration engine always disposes cleanly. (uvloop is the + # structural fix for the race; this hardens the teardown path.) + with suppress(asyncio.CancelledError): + await asyncio.shield(connectable.dispose()) def _run_async_migrations_with_asyncio_run(connectable) -> None: diff --git a/src/basic_memory/cli/app.py b/src/basic_memory/cli/app.py index 2e88c810a..5c8addc24 100644 --- a/src/basic_memory/cli/app.py +++ b/src/basic_memory/cli/app.py @@ -57,6 +57,14 @@ def app_callback( container = CliContainer.create() set_container(container) + # Trigger: Postgres backend resolved at CLI startup, before any asyncio.run(). + # Why: uvloop must own the event-loop policy before the loop is created so the + # asyncpg engine-dispose race (#831/#877) cannot fire. No-op for SQLite. + # Outcome: subsequent asyncio.run() calls in CLI commands use uvloop on Postgres. + from basic_memory.db import maybe_install_uvloop + + maybe_install_uvloop(container.config) + # Trigger: first-run init confirmation before command output. # Why: informational "initialized" message belongs above command results, not in the upsell panel. # Outcome: one-time plain line printed before the subcommand runs. diff --git a/src/basic_memory/cli/commands/mcp.py b/src/basic_memory/cli/commands/mcp.py index 9a0731142..9a256ac51 100644 --- a/src/basic_memory/cli/commands/mcp.py +++ b/src/basic_memory/cli/commands/mcp.py @@ -98,6 +98,18 @@ def _run_background_auto_update() -> None: if transport == "stdio": threading.Thread(target=_run_background_auto_update, daemon=True).start() + # Trigger: MCP server startup on the Postgres backend, before the transport + # creates its event loop. + # Why: the watcher/lifespan path runs startup migrations + engine.dispose() on + # asyncpg, which races stdlib asyncio teardown and crashes the container loop + # (#831/#877). uvloop's C scheduler structurally avoids that race and must own + # the loop policy before the loop is created. No-op for SQLite. + # Outcome: `basic-memory mcp` on Postgres runs on uvloop. (The CLI callback also + # installs it; this keeps the server startup seam explicit and self-contained.) + from basic_memory.db import maybe_install_uvloop + + maybe_install_uvloop(ConfigManager().config) + # Run the MCP server (blocks) # Lifespan handles: initialization, migrations, file sync, cleanup logger.info(f"Starting MCP server with {transport.upper()} transport") diff --git a/src/basic_memory/db.py b/src/basic_memory/db.py index 6133e9530..d9f56a05b 100644 --- a/src/basic_memory/db.py +++ b/src/basic_memory/db.py @@ -1,7 +1,7 @@ import asyncio import os import sys -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager, suppress from enum import Enum, auto from pathlib import Path from typing import AsyncGenerator, Optional @@ -39,6 +39,44 @@ if sys.platform == "win32": # pragma: no cover asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + +def maybe_install_uvloop(config: BasicMemoryConfig) -> bool: + """Install the uvloop event-loop policy for the Postgres backend. + + Trigger: process entrypoint starting with database_backend == postgres, + uvloop importable, and a non-Windows platform. + Why: asyncpg engine teardown (engine.dispose()) races the stdlib asyncio + loop shutdown and surfaces "IndexError: pop from an empty deque" from + base_events._run_once (see #831/#877). uvloop's C scheduler has no + self._ready.popleft() codepath, so that class of crash cannot fire under it. + Outcome: Postgres deployments run on uvloop; SQLite users keep the default + loop (no behavior change, smaller blast radius). Must run before the event + loop is created, i.e. before asyncio.run(). + + Returns: + True if the uvloop policy was installed, False otherwise. + """ + # uvloop is not available on Windows; the default loop already differs there. + if sys.platform == "win32": # pragma: no cover + return False + + # Limit the change to the backend that actually hits the asyncpg dispose race. + if config.database_backend != DatabaseBackend.POSTGRES: + return False + + # Deferred import: uvloop is an optional, platform-gated dependency and the + # default (SQLite) path must not require it to be installed. + try: + import uvloop + except ImportError: # pragma: no cover + logger.warning("uvloop not available - using default event loop for Postgres backend") + return False + + asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) + logger.info("Installed uvloop event-loop policy for Postgres backend") + return True + + # Module level state _engine: Optional[AsyncEngine] = None _session_maker: Optional[async_sessionmaker[AsyncSession]] = None @@ -320,7 +358,15 @@ async def shutdown_db() -> None: # pragma: no cover global _engine, _session_maker if _engine: - await _engine.dispose() + # Trigger: teardown can run while the surrounding task is being cancelled + # (e.g. lifespan shutdown, unshielded CLI cleanup). + # Why: a cancellation landing mid-dispose surfaces the asyncpg + # "IndexError: pop from an empty deque" race (#831/#877); shielding lets + # dispose finish atomically, and suppressing CancelledError keeps a + # cancelled shutdown from re-raising the underlying race. + # Outcome: connections always close cleanly even under cancellation. + with suppress(asyncio.CancelledError): + await asyncio.shield(_engine.dispose()) _engine = None _session_maker = None @@ -364,7 +410,14 @@ async def engine_session_factory( yield created_engine, created_session_maker finally: - await created_engine.dispose() + # Trigger: context-manager teardown can run while the surrounding task is + # being cancelled (e.g. a test aborting mid-fixture). + # Why: on the asyncpg backend a cancellation landing mid-dispose surfaces + # the "IndexError: pop from an empty deque" race (#831/#877); shield the + # dispose and suppress CancelledError to match the other dispose seams. + # Outcome: the per-context engine always disposes cleanly under cancellation. + with suppress(asyncio.CancelledError): + await asyncio.shield(created_engine.dispose()) # Only clear module-level globals if they still point to this context's # engine/session. This avoids clobbering newer globals from other callers. @@ -433,7 +486,11 @@ async def run_migrations( # Trigger: run_migrations() created a temporary engine while module-level # session maker was not initialized. # Why: temporary aiosqlite worker threads can outlive CLI command execution - # and block process shutdown if the engine is not disposed. - # Outcome: always dispose temporary engines after migration work completes. + # and block process shutdown if the engine is not disposed. On the asyncpg + # backend a cancellation landing mid-dispose surfaces the same "IndexError: + # pop from an empty deque" race as the other dispose seams (#831/#877), so + # shield the dispose and suppress CancelledError to match them. + # Outcome: always dispose temporary engines cleanly, even under cancellation. if temp_engine is not None: - await temp_engine.dispose() + with suppress(asyncio.CancelledError): + await asyncio.shield(temp_engine.dispose()) diff --git a/test-int/test_postgres_dispose_cycle.py b/test-int/test_postgres_dispose_cycle.py new file mode 100644 index 000000000..05adf72b5 --- /dev/null +++ b/test-int/test_postgres_dispose_cycle.py @@ -0,0 +1,69 @@ +"""Integration test for the asyncpg engine-dispose race (issue #831 / #877). + +On the Postgres backend (``postgresql+asyncpg``), the "open async engine -> work +-> await engine.dispose()" cycle could crash with +``IndexError: pop from an empty deque`` raised from SQLAlchemy's async dispose +machinery as it races event-loop teardown. This made Postgres + file watcher +unusable (container restart loop). + +These tests exercise the real create-engine -> query -> dispose cycle against a +live Postgres instance (testcontainers) for many iterations. The race is +timing-dependent, so we loop enough to be meaningful and assert clean +completion. They are skipped on SQLite, which is unaffected. + +Run with: + BASIC_MEMORY_TEST_POSTGRES=1 uv run pytest \ + test-int/test_postgres_dispose_cycle.py -q +""" + +import pytest +from sqlalchemy import text + +from basic_memory import db +from basic_memory.config import DatabaseBackend +from basic_memory.db import DatabaseType, _create_postgres_engine + + +@pytest.mark.asyncio +async def test_create_query_dispose_cycle_is_clean(app_config, db_backend): + """Repeatedly create -> query -> dispose an asyncpg engine without crashing.""" + if db_backend != "postgres": + pytest.skip("Postgres-specific test - asyncpg dispose race does not affect SQLite") + + db_url = DatabaseType.get_db_url(app_config.database_path, DatabaseType.POSTGRES, app_config) + + # Loop enough iterations to make the timing-dependent dispose race observable. + for _ in range(50): + engine = _create_postgres_engine(db_url, app_config) + async with engine.connect() as conn: + result = await conn.execute(text("SELECT 1")) + assert result.scalar() == 1 + # This dispose is the call site that raced loop teardown in #831/#877. + await engine.dispose() + + +@pytest.mark.asyncio +async def test_shutdown_db_dispose_is_clean(app_config, config_manager, db_backend): + """Repeated get_or_create_db -> query -> shutdown_db cycles complete cleanly. + + Exercises the shielded ``shutdown_db`` teardown path against asyncpg. The + autouse ``cleanup_global_db_after_test`` fixture in conftest restores module + state afterwards. + """ + if db_backend != "postgres": + pytest.skip("Postgres-specific test - asyncpg dispose race does not affect SQLite") + + assert app_config.database_backend == DatabaseBackend.POSTGRES + + for _ in range(25): + engine, session_maker = await db.get_or_create_db( + app_config.database_path, + db_type=DatabaseType.POSTGRES, + ensure_migrations=False, + config=app_config, + ) + async with db.scoped_session(session_maker) as session: + result = await session.execute(text("SELECT 1")) + assert result.scalar() == 1 + # Shielded dispose - must not surface the asyncpg deque race. + await db.shutdown_db() diff --git a/tests/cli/test_cli_exit.py b/tests/cli/test_cli_exit.py index 1cc02aeb2..a4e643df0 100644 --- a/tests/cli/test_cli_exit.py +++ b/tests/cli/test_cli_exit.py @@ -15,7 +15,11 @@ def test_bm_version_exits_cleanly(): ["uv", "run", "bm", "--version"], capture_output=True, text=True, - timeout=10, + # `uv run` startup under full-suite load (especially on Windows runners) + # can exceed a tight 10s budget even though --version short-circuits + # before any heavy work. This test guards against hangs, not a startup + # performance budget, so match the looser --help timeout below. + timeout=20, cwd=Path(__file__).parent.parent.parent, # Project root ) assert result.returncode == 0 diff --git a/tests/cli/test_cli_telemetry.py b/tests/cli/test_cli_telemetry.py index 1e3ab5ae3..884521df8 100644 --- a/tests/cli/test_cli_telemetry.py +++ b/tests/cli/test_cli_telemetry.py @@ -2,6 +2,7 @@ from __future__ import annotations +from types import SimpleNamespace from typing import Any, cast import logfire @@ -29,8 +30,16 @@ def test_app_callback_registers_command_operation(monkeypatch) -> None: resource = object() monkeypatch.setattr(cli_app, "init_cli_logging", lambda: None) - monkeypatch.setattr(cli_app.CliContainer, "create", staticmethod(lambda: object())) + # Container needs a `config` attribute because app_callback passes it to the + # uvloop policy installer; the helper itself is stubbed below for this + # telemetry-focused test. + monkeypatch.setattr( + cli_app.CliContainer, "create", staticmethod(lambda: SimpleNamespace(config=object())) + ) monkeypatch.setattr(cli_app, "set_container", lambda container: None) + # app_callback installs the uvloop policy for the Postgres backend; stub the + # helper so this telemetry test does not depend on event-loop policy state. + monkeypatch.setattr("basic_memory.db.maybe_install_uvloop", lambda config: False) monkeypatch.setattr(cli_app, "maybe_show_init_line", lambda command_name: None) monkeypatch.setattr(cli_app, "maybe_show_cloud_promo", lambda command_name: None) monkeypatch.setattr(cli_app, "maybe_run_periodic_auto_update", lambda command_name: None) diff --git a/tests/cli/test_db_reindex.py b/tests/cli/test_db_reindex.py index 16cf838e8..49cf2c696 100644 --- a/tests/cli/test_db_reindex.py +++ b/tests/cli/test_db_reindex.py @@ -9,6 +9,7 @@ from typer.testing import CliRunner from basic_memory.cli.app import app +from basic_memory.config import DatabaseBackend import basic_memory.cli.commands.db as db_cmd # noqa: F401 @@ -21,6 +22,8 @@ def _stub_app_config(*, semantic_search_enabled: bool = True) -> SimpleNamespace semantic_search_enabled=semantic_search_enabled, database_path=Path("/tmp/basic-memory.db"), get_project_mode=lambda project_name: None, + # app_callback reads this to decide whether to install the uvloop policy. + database_backend=DatabaseBackend.SQLITE, ) diff --git a/tests/db/test_uvloop_policy.py b/tests/db/test_uvloop_policy.py new file mode 100644 index 000000000..b093a5cd3 --- /dev/null +++ b/tests/db/test_uvloop_policy.py @@ -0,0 +1,83 @@ +"""Unit tests for the uvloop event-loop policy installer. + +Covers the structural fix for issue #831 / #877: the asyncpg engine-dispose +race ("IndexError: pop from an empty deque") is avoided by running the Postgres +backend on uvloop, whose C scheduler has no self._ready.popleft() codepath. + +These tests verify the gating logic of ``maybe_install_uvloop`` without touching +a real database. The global event-loop policy is saved and restored around each +test so installing uvloop here cannot leak into the rest of the suite. +""" + +import asyncio +import sys + +import pytest + +from basic_memory.config import BasicMemoryConfig, DatabaseBackend +from basic_memory.db import maybe_install_uvloop + + +@pytest.fixture +def restore_event_loop_policy(): + """Save/restore the global event-loop policy around a test.""" + original = asyncio.get_event_loop_policy() + try: + yield + finally: + asyncio.set_event_loop_policy(original) + + +def _postgres_config() -> BasicMemoryConfig: + return BasicMemoryConfig( + env="test", + database_backend=DatabaseBackend.POSTGRES, + database_url="postgresql+asyncpg://user:pass@localhost/db", + ) + + +def _sqlite_config() -> BasicMemoryConfig: + return BasicMemoryConfig(env="test", database_backend=DatabaseBackend.SQLITE) + + +@pytest.mark.skipif(sys.platform == "win32", reason="uvloop is not available on Windows") +def test_installs_uvloop_for_postgres_backend(restore_event_loop_policy): + """uvloop policy is installed when backend is Postgres and uvloop is available.""" + import uvloop + + installed = maybe_install_uvloop(_postgres_config()) + + assert installed is True + assert isinstance(asyncio.get_event_loop_policy(), uvloop.EventLoopPolicy) + + +def test_no_uvloop_for_sqlite_backend(restore_event_loop_policy): + """SQLite users keep the default loop - the helper is a no-op.""" + before = asyncio.get_event_loop_policy() + + installed = maybe_install_uvloop(_sqlite_config()) + + assert installed is False + # Policy must be unchanged for the default (SQLite) path. + assert asyncio.get_event_loop_policy() is before + + +@pytest.mark.skipif(sys.platform == "win32", reason="uvloop is not available on Windows") +def test_uvloop_unavailable_is_a_safe_noop(restore_event_loop_policy, monkeypatch): + """When uvloop cannot be imported the helper returns False without raising.""" + import builtins + + real_import = builtins.__import__ + + def _fail_uvloop_import(name, *args, **kwargs): + if name == "uvloop": + raise ImportError("simulated missing uvloop") + return real_import(name, *args, **kwargs) + + monkeypatch.setattr(builtins, "__import__", _fail_uvloop_import) + + before = asyncio.get_event_loop_policy() + installed = maybe_install_uvloop(_postgres_config()) + + assert installed is False + assert asyncio.get_event_loop_policy() is before diff --git a/uv.lock b/uv.lock index b004a0194..51bb382b4 100644 --- a/uv.lock +++ b/uv.lock @@ -304,6 +304,7 @@ dependencies = [ { name = "sqlite-vec" }, { name = "typer" }, { name = "unidecode" }, + { name = "uvloop", marker = "sys_platform != 'win32'" }, { name = "watchfiles" }, ] @@ -370,6 +371,7 @@ requires-dist = [ { name = "sqlite-vec", specifier = ">=0.1.6" }, { name = "typer", specifier = ">=0.9.0" }, { name = "unidecode", specifier = ">=1.3.8" }, + { name = "uvloop", marker = "sys_platform != 'win32'", specifier = ">=0.21.0" }, { name = "watchfiles", specifier = ">=1.0.4" }, ]