Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ dependencies = [
"litellm>=1.60.0,<2.0.0",
"logfire>=4.19.0",
"psutil>=5.9.0",
# uvloop's C event loop has no self._ready.popleft() codepath, so the
# asyncpg engine-dispose race ("IndexError: pop from an empty deque") that
# crashes the Postgres backend cannot fire under it. Not available on Windows.
"uvloop>=0.21.0; sys_platform != 'win32'",
]

[project.urls]
Expand Down
25 changes: 21 additions & 4 deletions src/basic_memory/alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

import asyncio
import os
from contextlib import suppress
from logging.config import fileConfig

from loguru import logger

# Allow nested event loops (needed for pytest-asyncio and other async contexts)
# Note: nest_asyncio doesn't work with uvloop or Python 3.14+, so we handle those cases separately
import sys
Expand All @@ -13,9 +16,15 @@
import nest_asyncio

nest_asyncio.apply()
except (ImportError, ValueError):
# nest_asyncio not available or can't patch this loop type (e.g., uvloop)
pass
except (ImportError, ValueError) as exc:
# Trigger: nest_asyncio is absent (ImportError) or refuses to patch the
# running loop (ValueError, e.g. the uvloop policy installed for the
# Postgres backend - #831/#877).
# Why: the uvloop ValueError is now an *expected* path on every Postgres
# startup, so swallowing it silently hides a routine branch.
# Outcome: log at DEBUG (observable, not noisy) and fall through to the
# thread-based migration fallback.
logger.debug(f"nest_asyncio not applied ({exc!r}); using thread-based migration fallback")
# For Python 3.14+, we rely on the thread-based fallback in run_migrations_online()

from sqlalchemy import engine_from_config, pool
Expand Down Expand Up @@ -115,7 +124,15 @@ async def run_async_migrations(connectable):
"""Run migrations asynchronously with AsyncEngine."""
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
# Trigger: startup migrations on the asyncpg backend dispose the engine
# while the event loop may be tearing down around them.
# Why: that race surfaces "IndexError: pop from an empty deque" from
# base_events._run_once (#831/#877); shielding lets dispose finish atomically
# and suppressing CancelledError keeps a cancelled teardown from re-raising it.
# Outcome: the migration engine always disposes cleanly. (uvloop is the
# structural fix for the race; this hardens the teardown path.)
with suppress(asyncio.CancelledError):
await asyncio.shield(connectable.dispose())


def _run_async_migrations_with_asyncio_run(connectable) -> None:
Expand Down
8 changes: 8 additions & 0 deletions src/basic_memory/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ def app_callback(
container = CliContainer.create()
set_container(container)

# Trigger: Postgres backend resolved at CLI startup, before any asyncio.run().
# Why: uvloop must own the event-loop policy before the loop is created so the
# asyncpg engine-dispose race (#831/#877) cannot fire. No-op for SQLite.
# Outcome: subsequent asyncio.run() calls in CLI commands use uvloop on Postgres.
from basic_memory.db import maybe_install_uvloop

maybe_install_uvloop(container.config)

# Trigger: first-run init confirmation before command output.
# Why: informational "initialized" message belongs above command results, not in the upsell panel.
# Outcome: one-time plain line printed before the subcommand runs.
Expand Down
12 changes: 12 additions & 0 deletions src/basic_memory/cli/commands/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,18 @@ def _run_background_auto_update() -> None:
if transport == "stdio":
threading.Thread(target=_run_background_auto_update, daemon=True).start()

# Trigger: MCP server startup on the Postgres backend, before the transport
# creates its event loop.
# Why: the watcher/lifespan path runs startup migrations + engine.dispose() on
# asyncpg, which races stdlib asyncio teardown and crashes the container loop
# (#831/#877). uvloop's C scheduler structurally avoids that race and must own
# the loop policy before the loop is created. No-op for SQLite.
# Outcome: `basic-memory mcp` on Postgres runs on uvloop. (The CLI callback also
# installs it; this keeps the server startup seam explicit and self-contained.)
from basic_memory.db import maybe_install_uvloop

maybe_install_uvloop(ConfigManager().config)

# Run the MCP server (blocks)
# Lifespan handles: initialization, migrations, file sync, cleanup
logger.info(f"Starting MCP server with {transport.upper()} transport")
Expand Down
69 changes: 63 additions & 6 deletions src/basic_memory/db.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import os
import sys
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager, suppress
from enum import Enum, auto
from pathlib import Path
from typing import AsyncGenerator, Optional
Expand Down Expand Up @@ -39,6 +39,44 @@
if sys.platform == "win32": # pragma: no cover
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())


def maybe_install_uvloop(config: BasicMemoryConfig) -> bool:
"""Install the uvloop event-loop policy for the Postgres backend.

Trigger: process entrypoint starting with database_backend == postgres,
uvloop importable, and a non-Windows platform.
Why: asyncpg engine teardown (engine.dispose()) races the stdlib asyncio
loop shutdown and surfaces "IndexError: pop from an empty deque" from
base_events._run_once (see #831/#877). uvloop's C scheduler has no
self._ready.popleft() codepath, so that class of crash cannot fire under it.
Outcome: Postgres deployments run on uvloop; SQLite users keep the default
loop (no behavior change, smaller blast radius). Must run before the event
loop is created, i.e. before asyncio.run().

Returns:
True if the uvloop policy was installed, False otherwise.
"""
# uvloop is not available on Windows; the default loop already differs there.
if sys.platform == "win32": # pragma: no cover
return False

# Limit the change to the backend that actually hits the asyncpg dispose race.
if config.database_backend != DatabaseBackend.POSTGRES:
return False

# Deferred import: uvloop is an optional, platform-gated dependency and the
# default (SQLite) path must not require it to be installed.
try:
import uvloop
except ImportError: # pragma: no cover
logger.warning("uvloop not available - using default event loop for Postgres backend")
return False

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
logger.info("Installed uvloop event-loop policy for Postgres backend")
return True


# Module level state
_engine: Optional[AsyncEngine] = None
_session_maker: Optional[async_sessionmaker[AsyncSession]] = None
Expand Down Expand Up @@ -320,7 +358,15 @@ async def shutdown_db() -> None: # pragma: no cover
global _engine, _session_maker

if _engine:
await _engine.dispose()
# Trigger: teardown can run while the surrounding task is being cancelled
# (e.g. lifespan shutdown, unshielded CLI cleanup).
# Why: a cancellation landing mid-dispose surfaces the asyncpg
# "IndexError: pop from an empty deque" race (#831/#877); shielding lets
# dispose finish atomically, and suppressing CancelledError keeps a
# cancelled shutdown from re-raising the underlying race.
# Outcome: connections always close cleanly even under cancellation.
with suppress(asyncio.CancelledError):
await asyncio.shield(_engine.dispose())
_engine = None
_session_maker = None

Expand Down Expand Up @@ -364,7 +410,14 @@ async def engine_session_factory(

yield created_engine, created_session_maker
finally:
await created_engine.dispose()
# Trigger: context-manager teardown can run while the surrounding task is
# being cancelled (e.g. a test aborting mid-fixture).
# Why: on the asyncpg backend a cancellation landing mid-dispose surfaces
# the "IndexError: pop from an empty deque" race (#831/#877); shield the
# dispose and suppress CancelledError to match the other dispose seams.
# Outcome: the per-context engine always disposes cleanly under cancellation.
with suppress(asyncio.CancelledError):
await asyncio.shield(created_engine.dispose())

# Only clear module-level globals if they still point to this context's
# engine/session. This avoids clobbering newer globals from other callers.
Expand Down Expand Up @@ -433,7 +486,11 @@ async def run_migrations(
# Trigger: run_migrations() created a temporary engine while module-level
# session maker was not initialized.
# Why: temporary aiosqlite worker threads can outlive CLI command execution
# and block process shutdown if the engine is not disposed.
# Outcome: always dispose temporary engines after migration work completes.
# and block process shutdown if the engine is not disposed. On the asyncpg
# backend a cancellation landing mid-dispose surfaces the same "IndexError:
# pop from an empty deque" race as the other dispose seams (#831/#877), so
# shield the dispose and suppress CancelledError to match them.
# Outcome: always dispose temporary engines cleanly, even under cancellation.
if temp_engine is not None:
await temp_engine.dispose()
with suppress(asyncio.CancelledError):
await asyncio.shield(temp_engine.dispose())
69 changes: 69 additions & 0 deletions test-int/test_postgres_dispose_cycle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Integration test for the asyncpg engine-dispose race (issue #831 / #877).

On the Postgres backend (``postgresql+asyncpg``), the "open async engine -> work
-> await engine.dispose()" cycle could crash with
``IndexError: pop from an empty deque`` raised from SQLAlchemy's async dispose
machinery as it races event-loop teardown. This made Postgres + file watcher
unusable (container restart loop).

These tests exercise the real create-engine -> query -> dispose cycle against a
live Postgres instance (testcontainers) for many iterations. The race is
timing-dependent, so we loop enough to be meaningful and assert clean
completion. They are skipped on SQLite, which is unaffected.

Run with:
BASIC_MEMORY_TEST_POSTGRES=1 uv run pytest \
test-int/test_postgres_dispose_cycle.py -q
"""

import pytest
from sqlalchemy import text

from basic_memory import db
from basic_memory.config import DatabaseBackend
from basic_memory.db import DatabaseType, _create_postgres_engine


@pytest.mark.asyncio
async def test_create_query_dispose_cycle_is_clean(app_config, db_backend):
"""Repeatedly create -> query -> dispose an asyncpg engine without crashing."""
if db_backend != "postgres":
pytest.skip("Postgres-specific test - asyncpg dispose race does not affect SQLite")

db_url = DatabaseType.get_db_url(app_config.database_path, DatabaseType.POSTGRES, app_config)

# Loop enough iterations to make the timing-dependent dispose race observable.
for _ in range(50):
engine = _create_postgres_engine(db_url, app_config)
async with engine.connect() as conn:
result = await conn.execute(text("SELECT 1"))
assert result.scalar() == 1
# This dispose is the call site that raced loop teardown in #831/#877.
await engine.dispose()


@pytest.mark.asyncio
async def test_shutdown_db_dispose_is_clean(app_config, config_manager, db_backend):
"""Repeated get_or_create_db -> query -> shutdown_db cycles complete cleanly.

Exercises the shielded ``shutdown_db`` teardown path against asyncpg. The
autouse ``cleanup_global_db_after_test`` fixture in conftest restores module
state afterwards.
"""
if db_backend != "postgres":
pytest.skip("Postgres-specific test - asyncpg dispose race does not affect SQLite")

assert app_config.database_backend == DatabaseBackend.POSTGRES

for _ in range(25):
engine, session_maker = await db.get_or_create_db(
app_config.database_path,
db_type=DatabaseType.POSTGRES,
ensure_migrations=False,
config=app_config,
)
async with db.scoped_session(session_maker) as session:
result = await session.execute(text("SELECT 1"))
assert result.scalar() == 1
# Shielded dispose - must not surface the asyncpg deque race.
await db.shutdown_db()
6 changes: 5 additions & 1 deletion tests/cli/test_cli_exit.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ def test_bm_version_exits_cleanly():
["uv", "run", "bm", "--version"],
capture_output=True,
text=True,
timeout=10,
# `uv run` startup under full-suite load (especially on Windows runners)
# can exceed a tight 10s budget even though --version short-circuits
# before any heavy work. This test guards against hangs, not a startup
# performance budget, so match the looser --help timeout below.
timeout=20,
cwd=Path(__file__).parent.parent.parent, # Project root
)
assert result.returncode == 0
Expand Down
11 changes: 10 additions & 1 deletion tests/cli/test_cli_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

from types import SimpleNamespace
from typing import Any, cast

import logfire
Expand Down Expand Up @@ -29,8 +30,16 @@ def test_app_callback_registers_command_operation(monkeypatch) -> None:
resource = object()

monkeypatch.setattr(cli_app, "init_cli_logging", lambda: None)
monkeypatch.setattr(cli_app.CliContainer, "create", staticmethod(lambda: object()))
# Container needs a `config` attribute because app_callback passes it to the
# uvloop policy installer; the helper itself is stubbed below for this
# telemetry-focused test.
monkeypatch.setattr(
cli_app.CliContainer, "create", staticmethod(lambda: SimpleNamespace(config=object()))
)
monkeypatch.setattr(cli_app, "set_container", lambda container: None)
# app_callback installs the uvloop policy for the Postgres backend; stub the
# helper so this telemetry test does not depend on event-loop policy state.
monkeypatch.setattr("basic_memory.db.maybe_install_uvloop", lambda config: False)
monkeypatch.setattr(cli_app, "maybe_show_init_line", lambda command_name: None)
monkeypatch.setattr(cli_app, "maybe_show_cloud_promo", lambda command_name: None)
monkeypatch.setattr(cli_app, "maybe_run_periodic_auto_update", lambda command_name: None)
Expand Down
3 changes: 3 additions & 0 deletions tests/cli/test_db_reindex.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typer.testing import CliRunner

from basic_memory.cli.app import app
from basic_memory.config import DatabaseBackend
import basic_memory.cli.commands.db as db_cmd # noqa: F401


Expand All @@ -21,6 +22,8 @@ def _stub_app_config(*, semantic_search_enabled: bool = True) -> SimpleNamespace
semantic_search_enabled=semantic_search_enabled,
database_path=Path("/tmp/basic-memory.db"),
get_project_mode=lambda project_name: None,
# app_callback reads this to decide whether to install the uvloop policy.
database_backend=DatabaseBackend.SQLITE,
)


Expand Down
Loading
Loading