Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
6817b96
feat: sync foundation for asyncio support (phase 1 of #1251)
AhmadMasry Jun 18, 2026
2dc0b39
Merge remote-tracking branch 'origin/main' into feat/sync-phase1
AhmadMasry Jun 18, 2026
67fa91a
fix(deps): bump locked greenlet 3.1.1->3.5.2 for Python 3.14
AhmadMasry Jun 19, 2026
16d4674
refactor: consolidate connection socket-shutdown into driver_dialect.…
AhmadMasry Jun 23, 2026
e29005d
feat: delegate driver-specific attributes via wrapper __getattr__
AhmadMasry Jun 24, 2026
c135739
chore(deps): bump opentelemetry to 1.43.0 (fixes Python 3.10/3.11 -We…
AhmadMasry Jun 24, 2026
3ca454d
test(integration): add excludePython314 configuration parameter
AhmadMasry Jun 25, 2026
90124c5
fix: leak (not close) a connection whose timed-out auxiliary-query wo…
AhmadMasry Jun 26, 2026
ac3fe98
revert: drop SQLAlchemy connection-provider transient-connect retry
AhmadMasry Jun 26, 2026
ff9f038
fix: delegate single-underscore driver attrs through sync wrapper __g…
AhmadMasry Jun 26, 2026
4a5b4f6
feat: route conn.execute/set_read_only/set_autocommit through the plu…
AhmadMasry Jun 27, 2026
9cd7140
fix(test): Optional default for abort_releases in test_decorators
AhmadMasry Jun 27, 2026
1d668b3
fix: thread the operation connection into execute()'s timeout path
AhmadMasry Jun 27, 2026
ad9f279
chore: remove unused RWS_RECHECK_READER_ROLE property
AhmadMasry Jun 27, 2026
de85a9d
fix: collapse sql_alchemy_connection_provider typing import to one line
AhmadMasry Jun 29, 2026
622ef04
fix: thread conn= into host-monitoring + limitless execute timeout paths
AhmadMasry Jun 29, 2026
b390985
docs: add CHANGELOG entries for the sync foundation (PR #1252)
AhmadMasry Jun 29, 2026
f8b0adc
docs: address SQLAlchemy review feedback (PR #1252)
AhmadMasry Jun 30, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10", "3.11", "3.12", "3.13"]
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
poetry-version: ["1.8.2"]

steps:
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/#semantic-versioning-200).

## [Unreleased]
### :magic_wand: Added
* Python 3.14 support. ([PR #1252](https://github.com/aws/aws-advanced-python-wrapper/pull/1252))

### :bug: Fixed
* Cross-thread use-after-free (SIGSEGV) when an offloaded query times out (e.g. during failover) and the connection is later closed or reused while the query is still running: on timeout the driver dialects now shut down the connection socket and wait for the worker to unwind before propagating, and leak rather than close a connection whose worker cannot be drained. ([PR #1252](https://github.com/aws/aws-advanced-python-wrapper/pull/1252))

## [3.0.0] - 2026-06-02

### :crab: Breaking Changes
Expand Down
58 changes: 21 additions & 37 deletions aws_advanced_python_wrapper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,47 +12,31 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
from logging import DEBUG, getLogger

from aws_advanced_python_wrapper.pep249 import (DatabaseError, DataError,
Error, IntegrityError,
InterfaceError, InternalError,
NotSupportedError,
OperationalError,
ProgrammingError)
from .cleanup import release_resources
from .driver_info import DriverInfo
from .utils.utils import LogUtils
from .wrapper import AwsWrapperConnection

# PEP249 compliance
connect = AwsWrapperConnection.connect
apilevel = "2.0"
threadsafety = 2
paramstyle = "pyformat"

# Public API
__all__ = [
'connect',
'AwsWrapperConnection',
'release_resources',
'set_logger',
'apilevel',
'threadsafety',
'paramstyle',
'Error',
'InterfaceError',
'DatabaseError',
'DataError',
'OperationalError',
'IntegrityError',
'InternalError',
'ProgrammingError',
'NotSupportedError'
]
from aws_advanced_python_wrapper import _dbapi
from aws_advanced_python_wrapper.cleanup import release_resources
from aws_advanced_python_wrapper.driver_info import DriverInfo
from aws_advanced_python_wrapper.utils.utils import LogUtils
from aws_advanced_python_wrapper.wrapper import AwsWrapperConnection

# Populate the full PEP 249 module surface (exceptions, type ctors/singletons,
# apilevel/threadsafety/paramstyle). `connect` stays bound to
# AwsWrapperConnection.connect for back-compat with existing callers.
_dbapi.install(sys.modules[__name__].__dict__, connect=AwsWrapperConnection.connect)

__version__ = DriverInfo.DRIVER_VERSION


def set_logger(name='aws_advanced_python_wrapper', level=DEBUG, format_string=None):
def set_logger(name="aws_advanced_python_wrapper", level=DEBUG, format_string=None):
LogUtils.setup_logger(getLogger(name), level, format_string)


__all__ = (
"AwsWrapperConnection",
"release_resources",
"set_logger",
*_dbapi._PEP249_NAMES,
"connect",
)
135 changes: 135 additions & 0 deletions aws_advanced_python_wrapper/_dbapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Canonical PEP 249 module surface shared by the top-level wrapper module
and the per-driver DBAPI submodules (`aws_advanced_python_wrapper.psycopg`,
`aws_advanced_python_wrapper.mysql_connector`).

Consumers should NOT import from this module directly. The public DBAPI
module surface lives on the top-level module and the per-driver submodules,
populated via `install()`.
"""

from __future__ import annotations

from datetime import date as Date # noqa: N812
from datetime import datetime as Timestamp # noqa: N812
from datetime import time as Time # noqa: N812
from time import localtime
from typing import Callable, Optional

from aws_advanced_python_wrapper.pep249 import DatabaseError # noqa: F401
from aws_advanced_python_wrapper.pep249 import DataError # noqa: F401
from aws_advanced_python_wrapper.pep249 import Error # noqa: F401
from aws_advanced_python_wrapper.pep249 import IntegrityError # noqa: F401
from aws_advanced_python_wrapper.pep249 import InterfaceError # noqa: F401
from aws_advanced_python_wrapper.pep249 import InternalError # noqa: F401
from aws_advanced_python_wrapper.pep249 import NotSupportedError # noqa: F401
from aws_advanced_python_wrapper.pep249 import OperationalError # noqa: F401
from aws_advanced_python_wrapper.pep249 import ProgrammingError # noqa: F401
from aws_advanced_python_wrapper.pep249 import Warning # noqa: F401

apilevel = "2.0"
threadsafety = 2
paramstyle = "pyformat"


def Binary(data: bytes) -> bytes: # noqa: N802
return bytes(data)


def DateFromTicks(ticks: float) -> Date: # noqa: N802
return Date(*localtime(ticks)[:3])


def TimeFromTicks(ticks: float) -> Time: # noqa: N802
return Time(*localtime(ticks)[3:6])


def TimestampFromTicks(ticks: float) -> Timestamp: # noqa: N802
return Timestamp(*localtime(ticks)[:6])


class _DBAPISet(frozenset):
"""Type-object singleton per PEP 249: compares equal to any contained type code."""

def __eq__(self, other: object) -> bool:
if isinstance(other, (int, str)):
return other in self
return super().__eq__(other)

def __ne__(self, other: object) -> bool:
return not self.__eq__(other)

def __hash__(self) -> int:
return super().__hash__()


# Type-code sources:
# PG: psycopg.postgres.types (OIDs)
# MySQL: mysql.connector.FieldType (ints)
# Union both into each singleton.

# PG text-like OIDs: text(25), varchar(1043), bpchar(1042), char(18),
# name(19), json(114), jsonb(3802)
# MySQL FieldType string-like: VAR_STRING(253), STRING(254), VARCHAR(15)
STRING = _DBAPISet([25, 1043, 1042, 18, 19, 114, 3802, 253, 254, 15])

# PG binary: bytea(17)
# MySQL FieldType BLOB family: TINY_BLOB(249), MEDIUM_BLOB(250),
# LONG_BLOB(251), BLOB(252)
BINARY = _DBAPISet([17, 249, 250, 251, 252])

# PG numeric: int2(21), int4(23), int8(20), float4(700), float8(701),
# numeric(1700), money(790)
# MySQL FieldType numeric: DECIMAL(0), TINY(1), SHORT(2), LONG(3),
# FLOAT(4), DOUBLE(5), LONGLONG(8), INT24(9),
# NEWDECIMAL(246)
NUMBER = _DBAPISet([21, 23, 20, 700, 701, 1700, 790, 0, 1, 2, 3, 4, 5, 8, 9, 246])

# PG datetime: date(1082), time(1083), timestamp(1114), timestamptz(1184),
# timetz(1266), interval(1186)
# MySQL FieldType datetime: DATE(10), TIME(11), DATETIME(12), YEAR(13),
# NEWDATE(14), TIMESTAMP(7)
DATETIME = _DBAPISet([1082, 1083, 1114, 1184, 1266, 1186, 10, 11, 12, 13, 14, 7])

# PG rowid: oid(26). MySQL has no ROWID equivalent; left PG-only.
ROWID = _DBAPISet([26])


_PEP249_NAMES = (
"Warning", "Error", "InterfaceError", "DatabaseError",
"DataError", "OperationalError", "IntegrityError",
"InternalError", "ProgrammingError", "NotSupportedError",
"Date", "Time", "Timestamp",
"DateFromTicks", "TimeFromTicks", "TimestampFromTicks",
"Binary", "STRING", "BINARY", "NUMBER", "DATETIME", "ROWID",
"apilevel", "threadsafety", "paramstyle",
)


def install(module_ns: dict, connect: Optional[Callable] = None) -> None:
"""Populate `module_ns` with the PEP 249 module surface.

If `connect` is provided, `module_ns['connect']` is set to it and 'connect'
is added to `module_ns['__all__']`.
"""
source = globals()
for name in _PEP249_NAMES:
module_ns[name] = source[name]
if connect is not None:
module_ns["connect"] = connect
module_ns["__all__"] = (*_PEP249_NAMES, "connect")
else:
module_ns["__all__"] = tuple(_PEP249_NAMES)
9 changes: 8 additions & 1 deletion aws_advanced_python_wrapper/cluster_topology_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from aws_advanced_python_wrapper.hostinfo import HostInfo, Topology
from aws_advanced_python_wrapper.utils import services_container
from aws_advanced_python_wrapper.utils.atomic import AtomicReference
from aws_advanced_python_wrapper.utils.decorators import \
is_connection_abandoned
from aws_advanced_python_wrapper.utils.events import (EventBase,
MonitorResetEvent)
from aws_advanced_python_wrapper.utils.messages import Messages
Expand Down Expand Up @@ -392,7 +394,12 @@ def _open_any_connection_and_update_topology(self) -> Topology:

def _close_connection(self, connection: Optional[Connection]) -> None:
try:
if connection is not None:
# Skip connections an auxiliary-query worker could not be drained off
# (is_connection_abandoned): closing one while the worker is still
# using it is a cross-thread use-after-free that crashes the process.
# The worker holds the last reference and frees it safely on its own
# thread once it finishes.
if connection is not None and not is_connection_abandoned(connection):
connection.close()
except Exception:
pass
Expand Down
11 changes: 10 additions & 1 deletion aws_advanced_python_wrapper/default_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,21 @@ def force_connect(
self._connection_provider_manager.default_provider)

def execute(self, target: object, method_name: str, execute_func: Callable, *args: Any, **kwargs: Any) -> Any:
# The connection the operation runs on, so driver_dialect.execute can
# interrupt-and-wait it on timeout instead of leaving it running on a
# worker thread for a later close/reuse to race (env-4 SIGSEGV). target is
# the raw cursor (cursor methods) or raw connection (connection methods).
driver_dialect = self._plugin_service.driver_dialect
timeout_conn = driver_dialect.get_connection_from_obj(target)
if timeout_conn is None:
timeout_conn = target

telemetry_factory = self._plugin_service.get_telemetry_factory()
context = telemetry_factory.open_telemetry_context(
self._plugin_service.driver_dialect.driver_name, TelemetryTraceLevel.NESTED)

try:
result = self._plugin_service.driver_dialect.execute(method_name, execute_func, *args, **kwargs)
result = driver_dialect.execute(method_name, execute_func, *args, conn=timeout_conn, **kwargs)
finally:
if context is not None:
context.close_context()
Expand Down
11 changes: 9 additions & 2 deletions aws_advanced_python_wrapper/driver_dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def execute(
exec_func: Callable,
*args: Any,
exec_timeout: Optional[float] = None,
conn: Optional[Connection] = None,
**kwargs: Any) -> Cursor:
if DbApiMethod.ALL.method_name not in self.network_bound_methods and method_name not in self.network_bound_methods:
return exec_func()
Expand All @@ -138,7 +139,13 @@ def execute(

if exec_timeout > 0:
try:
execute_with_timeout = timeout(self._thread_pool, exec_timeout)(exec_func)
# Pass self (the driver dialect) + conn so that, on timeout, the
# abandoned operation's socket is shut down (via
# driver_dialect.abort_connection) and its worker thread is awaited
# before we propagate -- otherwise a later close/reuse of conn races
# the still-running operation (cross-thread use-after-free in the
# driver, env-4 SIGSEGV).
execute_with_timeout = timeout(self._thread_pool, exec_timeout, self, conn)(exec_func)
return execute_with_timeout()
except TimeoutError as e:
raise QueryTimeoutError(Messages.get_formatted("DriverDialect.ExecuteTimeout", method_name)) from e
Expand All @@ -161,7 +168,7 @@ def ping(self, conn: Connection) -> bool:
try:
with conn.cursor() as cursor:
query = DriverDialect._QUERY
self.execute(DbApiMethod.CURSOR_EXECUTE.method_name, lambda: cursor.execute(query), query, exec_timeout=10)
self.execute(DbApiMethod.CURSOR_EXECUTE.method_name, lambda: cursor.execute(query), query, exec_timeout=10, conn=conn)
cursor.fetchone()
return True
except Exception:
Expand Down
23 changes: 17 additions & 6 deletions aws_advanced_python_wrapper/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from typing import Optional

from .pep249 import Error
from .pep249 import Error, InterfaceError, NotSupportedError, OperationalError


class AwsWrapperError(Error):
Expand All @@ -30,15 +30,15 @@ def __init__(self, message: str = "", original_error: Optional[Exception] = None
self.driver_error = original_error


class UnsupportedOperationError(AwsWrapperError):
class UnsupportedOperationError(AwsWrapperError, NotSupportedError):
__module__ = "aws_advanced_python_wrapper"


class QueryTimeoutError(AwsWrapperError):
class QueryTimeoutError(AwsWrapperError, OperationalError):
__module__ = "aws_advanced_python_wrapper"


class FailoverError(Error):
class FailoverError(OperationalError):
__module__ = "aws_advanced_python_wrapper"


Expand All @@ -51,12 +51,23 @@ class FailoverFailedError(FailoverError):


class FailoverSuccessError(FailoverError):
# SA classification is handled at the dialect boundary by
# ``sqlalchemy_dialects._exception_handling._FailoverSuccessRewrapMixin``,
# which catches FailoverSuccessError in ``do_execute`` /
# ``do_executemany`` and re-raises as the dialect's native
# OperationalError. Do NOT add driver-native OperationalError classes
# (psycopg / mysql.connector) as bases here: Django's
# ``wrap_database_errors`` walks ``issubclass`` against the driver's
# own error module and would swallow FailoverSuccessError before any
# user ``except FailoverSuccessError:`` handler could see it
# (regression seen in tests/integration/container/django/
# test_django_plugins.py::test_django_failover_during_query).
__module__ = "aws_advanced_python_wrapper"


class ReadWriteSplittingError(AwsWrapperError):
class ReadWriteSplittingError(AwsWrapperError, InterfaceError):
__module__ = "aws_advanced_python_wrapper"


class AwsConnectError(AwsWrapperError):
class AwsConnectError(AwsWrapperError, OperationalError):
__module__ = "aws_advanced_python_wrapper"
11 changes: 11 additions & 0 deletions aws_advanced_python_wrapper/exception_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ def is_network_exception(self, error: Optional[Exception] = None, sql_state: Opt
def is_login_exception(self, error: Optional[Exception] = None, sql_state: Optional[str] = None) -> bool:
"""
Checks whether the given error is caused by failing to authenticate the user.

Note for subclassers: some callers (notably ``HostMonitor`` in
``cluster_topology_monitor`` since commit ``724de17``) treat a
``True`` result as **bounded transient** — they retry on Aurora's
PAM-service-restart window during writer promotion before giving up.
If you override this method and intend a fast-fail "credentials are
permanently bad" signal, be aware your override may be retried up
to ~5 seconds before propagating. Callers that need fail-fast
semantics should classify those errors elsewhere (e.g. as a
dedicated non-network non-login exception).

:param error: The error raised by the target driver.
:param sql_state: The SQL State associated with the error.
:return: True if the error is caused by a login issue, False otherwise.
Expand Down
2 changes: 1 addition & 1 deletion aws_advanced_python_wrapper/host_monitoring_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ def _execute_conn_check(self, conn: Connection, timeout_sec: float):
driver_dialect = self._plugin_service.driver_dialect
with conn.cursor() as cursor:
query = Monitor._QUERY
driver_dialect.execute(DbApiMethod.CURSOR_EXECUTE.method_name, lambda: cursor.execute(query), query, exec_timeout=timeout_sec)
driver_dialect.execute(DbApiMethod.CURSOR_EXECUTE.method_name, lambda: cursor.execute(query), query, exec_timeout=timeout_sec, conn=conn)
cursor.fetchone()

def sleep(self, duration: int):
Expand Down
2 changes: 1 addition & 1 deletion aws_advanced_python_wrapper/host_monitoring_v2_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def _execute_conn_check(self, conn: Connection, timeout_sec: float):
driver_dialect = self._plugin_service.driver_dialect
with conn.cursor() as cursor:
query = HostMonitorV2._QUERY
driver_dialect.execute(DbApiMethod.CURSOR_EXECUTE.method_name, lambda: cursor.execute(query), query, exec_timeout=timeout_sec)
driver_dialect.execute(DbApiMethod.CURSOR_EXECUTE.method_name, lambda: cursor.execute(query), query, exec_timeout=timeout_sec, conn=conn)
cursor.fetchone()

def _update_host_health_status(
Expand Down
Loading