Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions tests/test_021_issue_594_access_token_uaf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.

Regression test for issue #594:
Passing SQL_COPT_SS_ACCESS_TOKEN (1256) via attrs_before triggered a
use-after-free during SQLDriverConnect. PR #568 copied the token
bytes into a stack-local std::string in Connection::setAttribute that
was freed when setAttribute returned, but SQL_COPT_SS_ACCESS_TOKEN is
a *deferred* ODBC attribute: the driver stashes the caller's pointer
at SQLSetConnectAttr time and only dereferences it later, when
SQLDriverConnect builds the FedAuth Login7 packet.

Observed symptoms in 1.7.1:
* macOS arm64: Fatal Python error: Bus error (SIGBUS)
* Windows x64: "Authentication token is missing in the federated
authentication message"
* Azure SQL DB: TCP Provider error 0x2746 (server-side reset)

Strategy
--------
A subprocess driver (tests/tools/_issue_594_helper.py) brings up the
local mock TDS server, performs a few FedAuth connects with a known
sentinel token, and asserts that the bytes the server actually
received in the Login7 FedAuth feature extension match the bytes that
were passed in. Running it as a subprocess means a SIGBUS in a buggy
build surfaces as a nonzero exit code (rc=138 / -10 on macOS) instead
of taking down the pytest worker; on platforms where the symptom is
silent corruption (Linux) or a driver-side error (Windows), the
token-integrity check in the helper still catches it.

Against stock 1.7.1 on macOS arm64 a single iteration deterministically
SIGBUSes. A handful of iterations is used as defense in depth for other
platforms and future allocator variance.
"""
from __future__ import annotations

import subprocess
import sys
import time
from pathlib import Path

import pytest

TOOLS_DIR = Path(__file__).parent / "tools"
sys.path.insert(0, str(TOOLS_DIR))

from mock_tds_server import generate_self_signed_cert # noqa: E402


@pytest.fixture(scope="module")
def cert_pair(tmp_path_factory):
d = tmp_path_factory.mktemp("mocktds-cert")
cert = d / "server.pem"
key = d / "server.key"
generate_self_signed_cert(str(cert), str(key))
return str(cert), str(key)


def test_access_token_round_trips_intact(cert_pair):
"""A FedAuth connect via SQL_COPT_SS_ACCESS_TOKEN (1256) must deliver
the token bytes unchanged to the server. Pre-fix the stack-local
buffer was freed before SQLDriverConnect read it, causing SIGBUS on
macOS, "Authentication token is missing" on Windows, or a TCP
reset (0x2746) against Azure SQL DB."""
cert, key = cert_pair
helper = TOOLS_DIR / "_issue_594_helper.py"
iters = "3"

started = time.monotonic()
p = subprocess.run(
[sys.executable, str(helper), cert, key, iters],
capture_output=True,
text=True,
timeout=120,
)
elapsed = time.monotonic() - started

# rc 138 (macOS) / -10 (signal) / -7 (SIGBUS on linux) all mean the
# helper crashed in native code — the classic #594 signature.
assert p.returncode == 0, (
f"helper exited with rc={p.returncode} after {elapsed:.1f}s. "
f"A negative rc or rc==138 indicates a native crash (SIGBUS / "
f"use-after-free, issue #594). rc=4 indicates the token was "
f"corrupted in flight; rc=5 indicates the driver aborted before "
f"Login7.\nstdout={p.stdout!r}\nstderr={p.stderr[-600:]!r}"
)
assert p.stdout.startswith("OK "), (
f"unexpected helper output: stdout={p.stdout!r} stderr={p.stderr!r}"
)
Empty file added tests/tools/__init__.py
Empty file.
124 changes: 124 additions & 0 deletions tests/tools/_issue_594_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""
Subprocess helper for tests/test_021_issue_594_access_token_uaf.py.

Runs N FedAuth connects against the local mock TDS server using
SQL_COPT_SS_ACCESS_TOKEN (1256) and verifies that the access token
bytes arrive at the server byte-identical to what we passed in.

Run in a subprocess so a SIGBUS on a buggy build (issue #594) surfaces
as a nonzero exit code (rc=138 / -10 on macOS) instead of taking down
the pytest worker.

Usage:
python _issue_594_helper.py <cert_path> <key_path> [iters]

Exit codes:
0 - all iterations succeeded and the token round-tripped intact
4 - token mismatch (UAF corrupted the token bytes)
5 - mock server never received a token (connect aborted before Login7)
other nonzero / signal - crash (SIGBUS, etc.) on a buggy build

Output: one line "OK <iters>" on success, otherwise a diagnostic line.
"""
from __future__ import annotations

import struct
import sys
import time
from itertools import chain, repeat
from pathlib import Path

HERE = Path(__file__).parent
sys.path.insert(0, str(HERE))

from mock_tds_server import MockTdsServer # noqa: E402

import mssql_python # noqa: E402

# Pooling would cache the underlying ODBC connection and skip Login7 on
# subsequent connects, never re-exercising the access-token handoff.
mssql_python.pooling(enabled=False)

SQL_COPT_SS_ACCESS_TOKEN = 1256


class _CapturingMockTdsServer(MockTdsServer):
"""Records every access token seen during Login7."""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.received_tokens = [] # list[str]

def resolve_token_username(self, token):
self.received_tokens.append(token)
return super().resolve_token_username(token)


def _pack_access_token(token_str: str) -> bytes:
token_bytes = bytes(token_str, "utf-8")
encoded = bytes(chain.from_iterable(zip(token_bytes, repeat(0))))
return struct.pack("<i", len(encoded)) + encoded


def main() -> int:
if len(sys.argv) < 3:
print("usage: _issue_594_helper.py CERT KEY [ITERS]", file=sys.stderr)
return 2

cert = sys.argv[1]
key = sys.argv[2]
iters = int(sys.argv[3]) if len(sys.argv) > 3 else 3

srv = _CapturingMockTdsServer(host="127.0.0.1", port=0, cert_file=cert, key_file=key)

Check notice

Code scanning / devskim

Accessing localhost could indicate debug code, or could hinder scaling. Note test

Do not leave debug code in production
srv.start_background()
deadline = time.monotonic() + 5.0
while srv.port == 0 and time.monotonic() < deadline:
time.sleep(0.01)
if srv.port == 0:
print("mock server failed to bind", file=sys.stderr)
return 3

# ~1500-char sentinel matches the size class of a real Azure AD
# bearer token, which is what triggered the original UAF.
sentinel = "MSSQL-PYTHON-ISSUE-594-SENTINEL-" + ("A" * 1500)
attrs = {SQL_COPT_SS_ACCESS_TOKEN: _pack_access_token(sentinel)}
cs = (
f"Server=127.0.0.1,{srv.port};Database=mockdb;"

Check notice

Code scanning / devskim

Accessing localhost could indicate debug code, or could hinder scaling. Note test

Do not leave debug code in production
"Encrypt=Yes;TrustServerCertificate=Yes"
)

try:
for _ in range(iters):
c = mssql_python.connect(cs, attrs_before=attrs, autocommit=True, timeout=10)
cur = c.cursor()
cur.execute("SELECT 1")
cur.fetchone()
cur.close()
c.close()
finally:
srv.stop()

if not srv.received_tokens:
print("FAIL: mock server never received a FedAuth token", file=sys.stderr)
return 5

for i, recv in enumerate(srv.received_tokens):
if recv != sentinel:
div = next(
(j for j in range(min(len(recv), len(sentinel))) if recv[j] != sentinel[j]),
min(len(recv), len(sentinel)),
)
print(
f"FAIL: corrupted access token on connect #{i+1}: "
f"len_sent={len(sentinel)} len_recv={len(recv)} "
f"first_diff_at_idx={div} recv_head={recv[:64]!r}",
file=sys.stderr,
)
return 4

print(f"OK {iters}")
return 0


if __name__ == "__main__":
sys.exit(main())
Loading
Loading