From d42e7309d9cefd4df5468ca7af1c363c8658189f Mon Sep 17 00:00:00 2001 From: yu-med Date: Tue, 9 Jun 2026 17:06:12 +0800 Subject: [PATCH 1/3] feat: add ruff and pip-audit CI gates, fix style violations, and add SECURITY.md --- .github/workflows/ci.yml | 31 ++++++++++++++ README.md | 4 +- SECURITY.md | 54 ++++++++++++++++++++++++ api/export_api.py | 12 +++--- api/projects.py | 4 +- api/search.py | 5 +-- api/sessions.py | 4 +- app.py | 12 +++--- models/export.py | 2 +- pyproject.toml | 13 ++++++ requirements-dev.txt | 2 + scripts/export.py | 42 ++++++++++++------ tests/test_api_integration.py | 2 - tests/test_cli_args.py | 17 ++++---- tests/test_cli_export_exit_codes.py | 1 - tests/test_error_propagation.py | 5 ++- tests/test_exclusion_helpers.py | 3 -- tests/test_export_engine_parity.py | 2 - tests/test_export_exclusion_filtering.py | 4 -- tests/test_export_state.py | 4 -- tests/test_jsonl_parser.py | 34 ++++++++++----- tests/test_null_usage_tokens.py | 3 +- tests/test_session_path.py | 5 ++- tests/test_xss_sanitization.py | 4 +- utils/export_day_filter.py | 5 ++- utils/jsonl_parser.py | 10 +++++ utils/md_exporter.py | 6 +-- 27 files changed, 211 insertions(+), 79 deletions(-) create mode 100644 SECURITY.md diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cefe358..d668e74 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,6 +43,37 @@ jobs: assert client.get("/").status_code == 200 PY + lint-and-audit: + name: ruff + pip-audit (${{ matrix.os }}) + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, windows-latest] + permissions: + contents: read + steps: + - uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4 + with: + persist-credentials: false + + - uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5 + with: + python-version: "3.12" + cache: pip + cache-dependency-path: | + requirements.txt + requirements-dev.txt + + - name: Install dev dependencies + run: pip install -r requirements-dev.txt + + - name: Ruff + run: ruff check . + + - name: pip-audit + run: pip-audit -r requirements.txt + pytest: name: pytest (${{ matrix.os }}) runs-on: ${{ matrix.os }} diff --git a/README.md b/README.md index f8c149c..655fb95 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,8 @@ REST endpoints for projects, sessions, search, and export are documented in **[` JSON error responses include a machine-readable `"code"` (stable `UPPER_SNAKE_CASE`) and a human-readable `"error"` message. See the [error code catalog](docs/api-reference.md#error-code-catalog) for the full table. +- **Security policy** — see [`SECURITY.md`](SECURITY.md) for supported versions and how to report vulnerabilities privately + ### CLI Export - Standalone script to export all sessions to Markdown with YAML frontmatter - Rich Markdown: token usage, tool calls, thinking blocks, model info, timestamps @@ -155,7 +157,7 @@ npm ci && npm test # only if you changed static/js/ ## Continuous integration -Every push and pull request runs **`pytest`**, **API integration tests**, and **vitest** on **Ubuntu** (Python 3.12, Node 20) via [`.github/workflows/ci.yml`](.github/workflows/ci.yml). A separate job verifies that `pip install -r requirements.txt` (production-only) is sufficient to import and boot the app. +Every push and pull request runs **`ruff`**, **`pip-audit`**, **`pytest`**, **mypy**, **API integration tests**, and **vitest** on **Ubuntu and Windows** (Python 3.12, Node 20) via [`.github/workflows/ci.yml`](.github/workflows/ci.yml). A separate job verifies that `pip install -r requirements.txt` (production-only) is sufficient to import and boot the app. ## Exported Markdown Format diff --git a/SECURITY.md b/SECURITY.md new file mode 100644 index 0000000..b4fd988 --- /dev/null +++ b/SECURITY.md @@ -0,0 +1,54 @@ +# Security Policy + +## Supported Versions + +This project is pre-release. Security fixes are applied to the **latest `master` branch only** (currently `0.1.0.dev0`). + +| Version | Supported | +| -------------- | --------- | +| latest `master`| Yes | +| older commits | No | + +## Reporting a Vulnerability + +**Please do not open public GitHub issues for security vulnerabilities.** + +Report vulnerabilities privately via [GitHub Security Advisories](https://github.com/cppalliance/claude-code-chat-browser/security/advisories/new). Private vulnerability reporting must be enabled on the repository (Settings → Security → Private vulnerability reporting). If you cannot use that form, contact the repository maintainers through an existing private channel. + +## Response Timeline + +| Stage | Target | +| ----- | ------ | +| Acknowledgment | Within **72 hours** of a valid report | +| Initial assessment | Within **7 days** | +| Fix for confirmed issues | Target **14 days** for issues affecting the default local-only deployment | + +Timelines may extend for complex issues; we will keep reporters informed. + +## Scope + +### In scope + +Security issues in this repository that affect users of the default local setup: + +- **Path traversal** — session and export paths resolved via `safe_join` in `utils/session_path.py` +- **Cross-site scripting (XSS)** — rendered session HTML in `static/js/` (mitigated by DOMPurify + SRI in `static/index.html`) +- **Export integrity** — bulk zip and per-session export in `api/export_api.py` and `utils/export_engine.py` +- **Local file boundaries** — read-only access to `~/.claude/projects/`; writes limited to export output and app state +- **Debug-mode exposure** — Flask/Werkzeug debugger when `--debug` is combined with a non-loopback `--host` (blocked at startup in `app.py`) +- **Information disclosure** — API error responses scrub internal exception details (see `api/error_codes.py`) + +### Out of scope + +- **Intentional network-facing deployment** — this tool is designed for local browsing on loopback; exposing it on untrusted networks is not a supported configuration +- **Upstream Claude Code JSONL format bugs** — malformed or hostile data from Claude Code itself (we harden parsing but do not guarantee full isolation from arbitrary JSONL) +- **Third-party CDN availability** — DOMPurify is loaded from cdnjs with SRI; CDN compromise is an infrastructure concern outside this repo + +## Existing Controls (reference) + +| Control | Location | +| ------- | -------- | +| Path guard (`safe_join`) | `utils/session_path.py` | +| HTML sanitization (DOMPurify) | `static/js/shared/markdown.js`, `static/index.html` | +| Error response scrubbing | `api/error_codes.py`, session card handling in `api/projects.py` | +| Debug + non-loopback host guard | `app.py` (`validate_startup_cli`) | diff --git a/api/export_api.py b/api/export_api.py index f71c387..6ab107c 100644 --- a/api/export_api.py +++ b/api/export_api.py @@ -1,7 +1,6 @@ """Export endpoints -- bulk zip download and single-session md/json.""" import io -import json import os import zipfile from datetime import datetime @@ -12,20 +11,21 @@ from api._flask_types import FlaskReturn, json_response from api.error_codes import ErrorCode, error_response from models.export import ExportStateDict +from utils.exclusion_rules import is_session_excluded +from utils.export_engine import EXPORT_ERRORS as _EXPORT_ERRORS +from utils.export_engine import ZipSink, run_bulk_export from utils.export_state_store import ( EXPORT_STATE_FILE, atomic_write_export_state, export_state_lock, load_export_state_from_disk, ) -from utils.session_path import get_claude_projects_dir, list_projects +from utils.json_exporter import session_to_json from utils.jsonl_parser import parse_session -from utils.exclusion_rules import is_session_excluded -from utils.session_stats import compute_stats from utils.md_exporter import session_to_markdown -from utils.json_exporter import session_to_json +from utils.session_path import get_claude_projects_dir, list_projects +from utils.session_stats import compute_stats from utils.slugify import slugify -from utils.export_engine import EXPORT_ERRORS as _EXPORT_ERRORS, ZipSink, run_bulk_export export_bp = Blueprint("export", __name__) diff --git a/api/projects.py b/api/projects.py index 94b99d5..9dfa067 100644 --- a/api/projects.py +++ b/api/projects.py @@ -2,12 +2,12 @@ from flask import Blueprint, current_app -from api._flask_types import FlaskReturn, json_error, json_response +from api._flask_types import FlaskReturn, json_response from api.error_codes import ErrorCode, error_response from models.project import ProjectSessionRowDict, SessionListItemDict from models.session import SessionDict -from utils.session_path import get_claude_projects_dir, list_projects, list_sessions, safe_join from utils.exclusion_rules import is_session_excluded +from utils.session_path import get_claude_projects_dir, list_projects, list_sessions, safe_join projects_bp = Blueprint("projects", __name__) diff --git a/api/search.py b/api/search.py index 1044e51..d6397ed 100644 --- a/api/search.py +++ b/api/search.py @@ -1,15 +1,14 @@ """Search endpoint. Brute-force substring match across all sessions.""" -import os from flask import Blueprint, current_app, request from api._flask_types import FlaskReturn, json_response from api.error_codes import ErrorCode, error_response from models.search import SearchHitDict -from utils.session_path import get_claude_projects_dir, list_projects, list_sessions -from utils.jsonl_parser import parse_session from utils.exclusion_rules import is_session_excluded +from utils.jsonl_parser import parse_session +from utils.session_path import get_claude_projects_dir, list_projects, list_sessions search_bp = Blueprint("search", __name__) diff --git a/api/sessions.py b/api/sessions.py index 5b6979f..bcbe930 100644 --- a/api/sessions.py +++ b/api/sessions.py @@ -7,10 +7,10 @@ from api._flask_types import FlaskReturn, json_response from api.error_codes import ErrorCode, error_response -from utils.session_path import get_claude_projects_dir, safe_join +from utils.exclusion_rules import is_session_excluded from utils.jsonl_parser import parse_session +from utils.session_path import get_claude_projects_dir, safe_join from utils.session_stats import compute_stats -from utils.exclusion_rules import is_session_excluded sessions_bp = Blueprint("sessions", __name__) diff --git a/app.py b/app.py index 3f8df3c..a5935bf 100644 --- a/app.py +++ b/app.py @@ -3,16 +3,15 @@ __version__ = "0.1.0.dev0" import argparse -import os import sys from flask import Flask +from api.export_api import export_bp from api.projects import projects_bp -from api.sessions import sessions_bp from api.search import search_bp -from api.export_api import export_bp -from utils.exclusion_rules import resolve_exclusion_rules_path, load_rules +from api.sessions import sessions_bp +from utils.exclusion_rules import load_rules, resolve_exclusion_rules_path def _normalize_bind_host(host: str) -> str: @@ -100,7 +99,10 @@ def build_cli_parser() -> argparse.ArgumentParser: "--debug", action="store_true", default=False, - help="Enable Flask/Werkzeug debug mode (never use with --host 0.0.0.0 on untrusted networks).", + help=( + "Enable Flask/Werkzeug debug mode " + "(never use with --host 0.0.0.0 on untrusted networks)." + ), ) parser.add_argument("--base-dir", default=None, help="Override Claude projects dir") parser.add_argument( diff --git a/models/export.py b/models/export.py index cec4513..618e176 100644 --- a/models/export.py +++ b/models/export.py @@ -1,6 +1,6 @@ """Export state file shapes.""" -from typing import NotRequired, TypedDict +from typing import TypedDict class ExportStateDict(TypedDict, total=False): diff --git a/pyproject.toml b/pyproject.toml index 25e449e..b56d188 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,3 +16,16 @@ omit = [ [tool.coverage.report] fail_under = 60 include = ["api/*", "utils/*"] + +[tool.ruff] +target-version = "py312" +line-length = 100 + +[tool.ruff.lint] +select = ["E", "F", "W", "I"] + +[tool.ruff.lint.per-file-ignores] +# CLI bootstrap: sys.path must be set before local imports. +"scripts/export.py" = ["E402"] +# Tests mirror the same path bootstrap before importing app/utils. +"tests/*.py" = ["E402"] diff --git a/requirements-dev.txt b/requirements-dev.txt index 7f64a07..20aab30 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -3,3 +3,5 @@ pytest==9.0.2 mypy==1.15.0 types-Flask==1.1.6 pytest-cov>=5.0 +ruff>=0.9.0 +pip-audit>=2.7.0 diff --git a/scripts/export.py b/scripts/export.py index 7db67ff..5b27275 100644 --- a/scripts/export.py +++ b/scripts/export.py @@ -31,13 +31,7 @@ REPO_ROOT = os.path.dirname(SCRIPT_DIR) sys.path.insert(0, REPO_ROOT) -from utils.session_path import get_claude_projects_dir, list_projects, list_sessions -from utils.jsonl_parser import parse_session -from utils.session_stats import compute_stats, format_duration -from utils.md_exporter import session_to_markdown -from utils.json_exporter import session_to_json -from utils.exclusion_rules import resolve_exclusion_rules_path, load_rules -from utils.slugify import slugify +from utils.exclusion_rules import load_rules, resolve_exclusion_rules_path from utils.export_engine import ( BulkExportResult, ExportFormat, @@ -52,6 +46,12 @@ export_state_lock, load_export_state_from_disk, ) +from utils.json_exporter import session_to_json +from utils.jsonl_parser import parse_session +from utils.md_exporter import session_to_markdown +from utils.session_path import get_claude_projects_dir, list_projects, list_sessions +from utils.session_stats import compute_stats, format_duration +from utils.slugify import slugify STATE_DIR = os.path.join(os.path.expanduser("~"), ".claude-code-chat-browser") STATE_FILE = os.path.join(STATE_DIR, "export_state.json") @@ -364,7 +364,11 @@ def _aggregate_stats(base_dir: str, project_filter: str, fmt: str): totals["total_cost"] += cost totals["has_cost"] = True except Exception as e: - print(f" Warning: failed to parse {s['id'][:10]} in {project['name']}: {e}", file=sys.stderr) + print( + f" Warning: failed to parse {s['id'][:10]} " + f"in {project['name']}: {e}", + file=sys.stderr, + ) continue if fmt == "json": @@ -379,9 +383,15 @@ def _aggregate_stats(base_dir: str, project_filter: str, fmt: str): print(f" Projects: {totals['projects']}") print(f" Sessions: {totals['sessions']}") print(f" Models: {', '.join(sorted(totals['models'])) or 'none'}") - print(f" Total tokens: {total_tokens:,} (input: {totals['input_tokens']:,} / output: {totals['output_tokens']:,})") + print( + f" Total tokens: {total_tokens:,} " + f"(input: {totals['input_tokens']:,} / output: {totals['output_tokens']:,})" + ) if totals["cache_read_tokens"]: - print(f" Cache: read: {totals['cache_read_tokens']:,} / creation: {totals['cache_creation_tokens']:,}") + print( + f" Cache: read: {totals['cache_read_tokens']:,} / " + f"creation: {totals['cache_creation_tokens']:,}" + ) print(f" Tool calls: {totals['tool_calls']:,}") if totals["tool_counts"]: breakdown = ", ".join( @@ -598,8 +608,15 @@ def build_parser() -> argparse.ArgumentParser: help="Override Claude Code projects directory") parser.add_argument("--project", default=None, help="Filter by project (substring on list display name or dir name)") - parser.add_argument("--since", choices=["all", "last", "incremental"], default=None, - help="'last' = latest UTC calendar day; 'incremental' = new since last export (mtime)") + parser.add_argument( + "--since", + choices=["all", "last", "incremental"], + default=None, + help=( + "'last' = latest UTC calendar day; " + "'incremental' = new since last export (mtime)" + ), + ) parser.add_argument("--out", default=None, help="Output directory (default: current dir)") parser.add_argument("--no-zip", action="store_true", default=False, @@ -732,4 +749,3 @@ def _die(msg: str): if __name__ == "__main__": main() - \ No newline at end of file diff --git a/tests/test_api_integration.py b/tests/test_api_integration.py index 901bf9b..80009eb 100644 --- a/tests/test_api_integration.py +++ b/tests/test_api_integration.py @@ -9,10 +9,8 @@ from __future__ import annotations - from tests.conftest import assert_error_response as _assert_error_shape - # --- /api/projects --- diff --git a/tests/test_cli_args.py b/tests/test_cli_args.py index 9350af2..3fc26a2 100644 --- a/tests/test_cli_args.py +++ b/tests/test_cli_args.py @@ -9,12 +9,11 @@ pytest tests/test_cli_args.py -v """ +import argparse import ast -import sys import os -import importlib -import argparse -import types +import sys + import pytest # Ensure the repo root is on sys.path when tests are run from any directory. @@ -92,7 +91,11 @@ def _is_sys_platform_ne_win32(node: ast.AST) -> bool: def _is_debug_and_platform_guard(node: ast.AST) -> bool: """True for ``args.debug and (sys.platform != "win32")`` in either operand order.""" - if not isinstance(node, ast.BoolOp) or not isinstance(node.op, ast.And) or len(node.values) != 2: + if ( + not isinstance(node, ast.BoolOp) + or not isinstance(node.op, ast.And) + or len(node.values) != 2 + ): return False a, b = node.values return (_is_args_debug(a) and _is_sys_platform_ne_win32(b)) or ( @@ -428,10 +431,6 @@ def test_exclude_rules_short_form(self): def test_app_py_actual_argparse_has_exclude_rules(self): """Smoke-test: import app module and verify argparse accepts -e.""" - result = os.popen( - f'{sys.executable} -c "' - 'import sys, os; sys.path.insert(0, os.path.abspath(\\\".\\\"));"' - ) # Lightweight check: parse the app.py source for the flag definition app_path = os.path.join(REPO_ROOT, "app.py") with open(app_path, "r", encoding="utf-8") as f: diff --git a/tests/test_cli_export_exit_codes.py b/tests/test_cli_export_exit_codes.py index 26c2483..73a3c4a 100644 --- a/tests/test_cli_export_exit_codes.py +++ b/tests/test_cli_export_exit_codes.py @@ -2,7 +2,6 @@ from __future__ import annotations -import os import re import sys import types diff --git a/tests/test_error_propagation.py b/tests/test_error_propagation.py index 7f05526..4cd7dc1 100644 --- a/tests/test_error_propagation.py +++ b/tests/test_error_propagation.py @@ -34,7 +34,6 @@ class names from a defensive blocklist. from api.projects import projects_bp # noqa: E402 from api.sessions import sessions_bp # noqa: E402 - # Defensive blocklist — any of these substrings appearing in a response body # would mean the leak regressed. Includes common Python builtin exception # class names plus internal-looking shapes. @@ -193,7 +192,9 @@ def _boom(*args, **kwargs): ) _assert_no_class_name_leak(json.dumps(body)) error_rows = [r for r in body if isinstance(r, dict) and r.get("error")] - assert error_rows, "Expected at least one per-session error card from the forced parse failure" + assert error_rows, ( + "Expected at least one per-session error card from the forced parse failure" + ) for row in error_rows: assert "error_detail" not in row, ( "Per-session error card still includes error_detail (issue #25)" diff --git a/tests/test_exclusion_helpers.py b/tests/test_exclusion_helpers.py index 0603452..7636cfe 100644 --- a/tests/test_exclusion_helpers.py +++ b/tests/test_exclusion_helpers.py @@ -19,8 +19,6 @@ import sys from pathlib import Path -import pytest - REPO_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(REPO_ROOT)) @@ -30,7 +28,6 @@ session_text_for_exclusion, ) - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- diff --git a/tests/test_export_engine_parity.py b/tests/test_export_engine_parity.py index aecb898..8517c38 100644 --- a/tests/test_export_engine_parity.py +++ b/tests/test_export_engine_parity.py @@ -8,8 +8,6 @@ import zipfile from pathlib import Path -import pytest - REPO_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(REPO_ROOT)) diff --git a/tests/test_export_exclusion_filtering.py b/tests/test_export_exclusion_filtering.py index f829aa8..9256fd3 100644 --- a/tests/test_export_exclusion_filtering.py +++ b/tests/test_export_exclusion_filtering.py @@ -10,14 +10,10 @@ """ import json -import os import subprocess import sys -import tempfile from pathlib import Path -import pytest - REPO_ROOT = Path(__file__).resolve().parent.parent EXPORT_SCRIPT = REPO_ROOT / "scripts" / "export.py" diff --git a/tests/test_export_state.py b/tests/test_export_state.py index d7190eb..ebc16b2 100644 --- a/tests/test_export_state.py +++ b/tests/test_export_state.py @@ -13,18 +13,14 @@ import json import os import sys -import tempfile from datetime import datetime -import pytest - REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, REPO_ROOT) # Patch STATE_FILE before importing the module so we don't touch the real one. import scripts.export as _export_mod - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- diff --git a/tests/test_jsonl_parser.py b/tests/test_jsonl_parser.py index 8accf2f..aefd4b3 100644 --- a/tests/test_jsonl_parser.py +++ b/tests/test_jsonl_parser.py @@ -22,7 +22,6 @@ quick_session_info, ) - # --------------------------------------------------------------------------- # Metadata helpers (match parse_session initialisation) # --------------------------------------------------------------------------- @@ -289,7 +288,8 @@ def test_wrong_type_returns_empty(self): class TestExtractText: def test_text_blocks_joined(self): - assert _extract_text([{"type": "text", "text": "a"}, {"type": "text", "text": "b"}]) == "a\nb" + blocks = [{"type": "text", "text": "a"}, {"type": "text", "text": "b"}] + assert _extract_text(blocks) == "a\nb" def test_tool_use_blocks_ignored(self): assert _extract_text([{"type": "tool_use", "name": "Read"}]) == "" @@ -679,10 +679,17 @@ def test_file_history_snapshot_timestamp(self): os.unlink(path) def test_entry_counts_accumulated(self): - path = _write_jsonl([ - {"type": "assistant", "timestamp": "2026-01-01T00:00:00Z", "message": {"model": "m", "content": [], "usage": {}}}, - {"type": "user", "timestamp": "2026-01-01T00:01:00Z", "message": {"content": []}}, - ]) + assistant_entry = { + "type": "assistant", + "timestamp": "2026-01-01T00:00:00Z", + "message": {"model": "m", "content": [], "usage": {}}, + } + user_entry = { + "type": "user", + "timestamp": "2026-01-01T00:01:00Z", + "message": {"content": []}, + } + path = _write_jsonl([assistant_entry, user_entry]) try: s = parse_session(path) assert s["metadata"]["entry_counts"]["assistant"] == 1 @@ -821,7 +828,11 @@ def test_large_file_last_timestamp_from_tail(self): lines.append({ "type": "assistant", "timestamp": "2026-01-01T00:00:00Z", - "message": {"model": "m", "content": [{"type": "text", "text": "x" * 80}], "usage": {}}, + "message": { + "model": "m", + "content": [{"type": "text", "text": "x" * 80}], + "usage": {}, + }, }) lines.append({ "type": "assistant", @@ -837,9 +848,12 @@ def test_large_file_last_timestamp_from_tail(self): os.unlink(path) def test_no_user_entries_returns_untitled(self): - path = _write_jsonl([ - {"type": "assistant", "timestamp": "2026-01-01T00:00:00Z", "message": {"model": "m", "content": [], "usage": {}}}, - ]) + assistant_only = { + "type": "assistant", + "timestamp": "2026-01-01T00:00:00Z", + "message": {"model": "m", "content": [], "usage": {}}, + } + path = _write_jsonl([assistant_only]) try: info = quick_session_info(path) assert info["title"] == "Untitled Session" diff --git a/tests/test_null_usage_tokens.py b/tests/test_null_usage_tokens.py index 29d9dff..cfdc2a4 100644 --- a/tests/test_null_usage_tokens.py +++ b/tests/test_null_usage_tokens.py @@ -18,10 +18,9 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) -from utils.jsonl_parser import parse_session, _process_assistant +from utils.jsonl_parser import _process_assistant, parse_session from utils.session_stats import _estimate_cost - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- diff --git a/tests/test_session_path.py b/tests/test_session_path.py index 279c9da..1662cc2 100644 --- a/tests/test_session_path.py +++ b/tests/test_session_path.py @@ -14,9 +14,10 @@ def test_get_claude_projects_dir_uses_userprofile_on_windows( monkeypatch: pytest.MonkeyPatch, tmp_path: Path ) -> None: - """Linux/Windows CI smoke: patch ``session_path.platform.system`` (needs ``import platform`` in module). + """Linux/Windows CI smoke: patch ``session_path.platform.system``. - If ``session_path`` ever switches to ``from platform import system``, this patch + Requires ``import platform`` in the module under test. If ``session_path`` ever + switches to ``from platform import system``, this patch no-ops but may still pass on Linux via ``expanduser("~")``. Real Windows behavior is covered by ``test_get_claude_projects_dir_on_windows_runner`` (win32-only). """ diff --git a/tests/test_xss_sanitization.py b/tests/test_xss_sanitization.py index 33e1ea5..479baf2 100644 --- a/tests/test_xss_sanitization.py +++ b/tests/test_xss_sanitization.py @@ -25,7 +25,9 @@ STATIC_JS_DIR = REPO_ROOT / "static" / "js" DOMPURIFY_CDN_URL = "https://cdnjs.cloudflare.com/ajax/libs/dompurify/3.2.7/purify.min.js" -DOMPURIFY_SRI = "sha512-78KH17QLT5e55GJqP76vutp1D2iAoy06WcYBXB6iBCsmO6wWzx0Qdg8EDpm8mKXv68BcvHOyeeP4wxAL0twJGQ==" +DOMPURIFY_SRI = ( + "sha512-78KH17QLT5e55GJqP76vutp1D2iAoy06WcYBXB6iBCsmO6wWzx0Qdg8EDpm8mKXv68BcvHOyeeP4wxAL0twJGQ==" +) def _all_js_files(): diff --git a/utils/export_day_filter.py b/utils/export_day_filter.py index 9916723..e18a663 100644 --- a/utils/export_day_filter.py +++ b/utils/export_day_filter.py @@ -13,7 +13,10 @@ def iso_timestamp_to_date(ts: str | None) -> date | None: - """Calendar date in UTC for an ISO-8601 *ts* (offset-aware → convert; naive → that instant's date).""" + """Calendar date in UTC for ISO-8601 *ts*. + + Offset-aware values are converted to UTC; naive values use that instant's date. + """ if not ts or not isinstance(ts, str): return None s = ts.strip() diff --git a/utils/jsonl_parser.py b/utils/jsonl_parser.py index bfb6088..a0364a6 100644 --- a/utils/jsonl_parser.py +++ b/utils/jsonl_parser.py @@ -9,10 +9,20 @@ from models.session import MessageDict, SessionDict from utils.jsonl_helpers import ( entry_message as _entry_message, +) +from utils.jsonl_helpers import ( extract_images as _extract_images, +) +from utils.jsonl_helpers import ( extract_text as _extract_text, +) +from utils.jsonl_helpers import ( infer_title as _infer_title, +) +from utils.jsonl_helpers import ( normalize_content as _normalize_content, +) +from utils.jsonl_helpers import ( strip_system_tags as _strip_system_tags, ) from utils.session_peek import quick_session_info diff --git a/utils/md_exporter.py b/utils/md_exporter.py index 55d777f..c2d5db9 100644 --- a/utils/md_exporter.py +++ b/utils/md_exporter.py @@ -2,7 +2,6 @@ (cost, files touched, commands run), and the full conversation.""" from datetime import datetime - from typing import Any from models.session import MessageDict, SessionDict @@ -399,7 +398,8 @@ def _render_tool_result(parsed: dict[str, Any]) -> str: dur = parsed.get("total_duration_ms") dur_str = f" ({dur / 1000:.1f}s)" if dur else "" tok_str = f", {parsed['total_tokens']:,} tokens" if parsed.get("total_tokens") else "" - tool_str = f", {parsed['total_tool_use_count']} tool calls" if parsed.get("total_tool_use_count") else "" + tool_count = parsed.get("total_tool_use_count") + tool_str = f", {tool_count} tool calls" if tool_count else "" if parsed.get("retrieval_status"): lines.append(f"\n**Task retrieval:** {parsed['retrieval_status']}") elif parsed.get("description"): @@ -436,7 +436,7 @@ def _render_system(msg: MessageDict) -> str: content = msg.get("content", "") if subtype == "compact_boundary": - lines.append(f"\n*--- Context compacted ---*\n") + lines.append("\n*--- Context compacted ---*\n") elif content: lines.append(f"\n*[System: {content}]*\n") From 2eebda61b6758dac9ba302cabc84d831ac2ec6e0 Mon Sep 17 00:00:00 2001 From: chen Date: Tue, 9 Jun 2026 18:15:06 +0800 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20address=20PR=20#70=20review=20?= =?UTF-8?q?=E2=80=94=20duplicate=20exclusion=20check,=20ruff=20format=20CI?= =?UTF-8?q?,=20and=20test=20cleanup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/ci.yml | 3 + SECURITY.md | 2 +- api/export_api.py | 25 +- api/projects.py | 2 + api/search.py | 19 +- api/sessions.py | 8 - app.py | 5 +- pyproject.toml | 3 + scripts/export.py | 152 +++-- scripts/gen_real_session_fixtures.py | 1 + tests/conftest.py | 4 +- tests/test_api_integration.py | 4 +- tests/test_api_routes.py | 32 +- tests/test_cli_args.py | 6 +- tests/test_cli_e2e.py | 38 +- tests/test_cli_export_exit_codes.py | 42 +- tests/test_error_propagation.py | 20 +- tests/test_exclusion_helpers.py | 24 +- tests/test_export_api_bulk.py | 16 +- tests/test_export_engine_parity.py | 36 +- tests/test_export_exclusion_filtering.py | 40 +- tests/test_export_state.py | 8 +- tests/test_export_state_store.py | 5 +- tests/test_jsonl_parser.py | 678 ++++++++++++++--------- tests/test_jsonl_validation.py | 14 +- tests/test_null_usage_tokens.py | 161 +++--- tests/test_real_session_fixtures.py | 8 +- tests/test_search.py | 18 +- tests/test_xss_sanitization.py | 20 +- utils/exclusion_rules.py | 10 +- utils/export_engine.py | 26 +- utils/export_state_store.py | 6 +- utils/jsonl_helpers.py | 28 +- utils/jsonl_parser.py | 168 +++--- utils/md_exporter.py | 24 +- utils/session_path.py | 49 +- utils/session_stats.py | 24 +- utils/tool_dispatch.py | 8 +- utils/validation.py | 4 +- 39 files changed, 949 insertions(+), 792 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d668e74..cbd89fc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -71,6 +71,9 @@ jobs: - name: Ruff run: ruff check . + - name: Ruff format check + run: ruff format --check . + - name: pip-audit run: pip-audit -r requirements.txt diff --git a/SECURITY.md b/SECURITY.md index b4fd988..8672c4e 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -6,7 +6,7 @@ This project is pre-release. Security fixes are applied to the **latest `master` | Version | Supported | | -------------- | --------- | -| latest `master`| Yes | +| latest `master` | Yes | | older commits | No | ## Reporting a Vulnerability diff --git a/api/export_api.py b/api/export_api.py index 6ab107c..1bd86d9 100644 --- a/api/export_api.py +++ b/api/export_api.py @@ -12,8 +12,7 @@ from api.error_codes import ErrorCode, error_response from models.export import ExportStateDict from utils.exclusion_rules import is_session_excluded -from utils.export_engine import EXPORT_ERRORS as _EXPORT_ERRORS -from utils.export_engine import ZipSink, run_bulk_export +from utils.export_engine import EXPORT_ERRORS as _EXPORT_ERRORS, ZipSink, run_bulk_export from utils.export_state_store import ( EXPORT_STATE_FILE, atomic_write_export_state, @@ -94,10 +93,7 @@ def bulk_export() -> FlaskReturn: since=since, ) - base = ( - current_app.config.get("CLAUDE_PROJECTS_DIR") - or get_claude_projects_dir() - ) + base = current_app.config.get("CLAUDE_PROJECTS_DIR") or get_claude_projects_dir() projects = list_projects(base) rules = current_app.config.get("EXCLUSION_RULES") or [] @@ -109,9 +105,7 @@ def bulk_export() -> FlaskReturn: buf = io.BytesIO() def _on_export_error(sid: str, exc: Exception) -> None: - current_app.logger.warning( - "Failed to export %s: %s", sid[:10], exc - ) + current_app.logger.warning("Failed to export %s: %s", sid[:10], exc) with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf: result = run_bulk_export( @@ -163,10 +157,7 @@ def _on_export_error(sid: str, exc: Exception) -> None: def export_session(project_name: str, session_id: str) -> FlaskReturn: from utils.session_path import safe_join - base = ( - current_app.config.get("CLAUDE_PROJECTS_DIR") - or get_claude_projects_dir() - ) + base = current_app.config.get("CLAUDE_PROJECTS_DIR") or get_claude_projects_dir() try: filepath = safe_join(base, project_name, f"{session_id}.jsonl") except ValueError: @@ -183,9 +174,7 @@ def export_session(project_name: str, session_id: str) -> FlaskReturn: try: session = parse_session(filepath) except _EXPORT_ERRORS: - current_app.logger.exception( - "Failed to parse session %s for export", session_id - ) + current_app.logger.exception("Failed to parse session %s for export", session_id) return error_response( ErrorCode.PARSE_ERROR, "Failed to parse session", @@ -203,9 +192,7 @@ def export_session(project_name: str, session_id: str) -> FlaskReturn: try: stats = compute_stats(session) except _EXPORT_ERRORS: - current_app.logger.exception( - "Failed to compute stats for export %s", session_id - ) + current_app.logger.exception("Failed to compute stats for export %s", session_id) return error_response( ErrorCode.INTERNAL_ERROR, "Failed to compute session stats", diff --git a/api/projects.py b/api/projects.py index 9dfa067..16eb8bb 100644 --- a/api/projects.py +++ b/api/projects.py @@ -49,6 +49,7 @@ def get_projects() -> FlaskReturn: # so the landing page matches what the workspace page shows. # Uses quick_session_info() which peeks at files without full parsing. from utils.jsonl_parser import quick_session_info + for project in projects: sessions = list_sessions(project["path"]) titled_count = 0 @@ -81,6 +82,7 @@ def get_project_sessions(project_name: str) -> FlaskReturn: sessions = list_sessions(project_dir) # Add summary preview for each session from utils.jsonl_parser import parse_session + rules = current_app.config.get("EXCLUSION_RULES") or [] result: list[ProjectSessionRowDict] = [] for s in sessions: diff --git a/api/search.py b/api/search.py index d6397ed..d3d4721 100644 --- a/api/search.py +++ b/api/search.py @@ -1,6 +1,5 @@ """Search endpoint. Brute-force substring match across all sessions.""" - from flask import Blueprint, current_app, request from api._flask_types import FlaskReturn, json_response @@ -70,14 +69,16 @@ def search() -> FlaskReturn: end = min(len(text), idx + len(query) + 80) snippet = text[start:end] - results.append({ - "project": project["name"], - "session_id": session["session_id"], - "title": session["title"], - "role": msg["role"], - "timestamp": msg.get("timestamp"), - "snippet": snippet, - }) + results.append( + { + "project": project["name"], + "session_id": session["session_id"], + "title": session["title"], + "role": msg["role"], + "timestamp": msg.get("timestamp"), + "snippet": snippet, + } + ) if len(results) >= max_results: break diff --git a/api/sessions.py b/api/sessions.py index bcbe930..dbcbc0f 100644 --- a/api/sessions.py +++ b/api/sessions.py @@ -89,14 +89,6 @@ def get_session_stats(project_name: str, session_id: str) -> FlaskReturn: 500, ) - rules = current_app.config.get("EXCLUSION_RULES") or [] - if is_session_excluded(rules, session, project_name): - return error_response( - ErrorCode.SESSION_NOT_FOUND, - "Session not found", - 404, - ) - try: stats = compute_stats(session) return json_response(stats) diff --git a/app.py b/app.py index a5935bf..bcaaab1 100644 --- a/app.py +++ b/app.py @@ -106,11 +106,12 @@ def build_cli_parser() -> argparse.ArgumentParser: ) parser.add_argument("--base-dir", default=None, help="Override Claude projects dir") parser.add_argument( - "--exclude-rules", "-e", + "--exclude-rules", + "-e", default=None, metavar="PATH", help="Path to exclusion rules file (sensitive sessions are omitted). " - "If omitted, uses ~/.claude-code-chat-browser/exclusion-rules.txt if present.", + "If omitted, uses ~/.claude-code-chat-browser/exclusion-rules.txt if present.", ) return parser diff --git a/pyproject.toml b/pyproject.toml index b56d188..5e8f63b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,9 @@ line-length = 100 [tool.ruff.lint] select = ["E", "F", "W", "I"] +[tool.ruff.lint.isort] +combine-as-imports = true + [tool.ruff.lint.per-file-ignores] # CLI bootstrap: sys.path must be set before local imports. "scripts/export.py" = ["E402"] diff --git a/scripts/export.py b/scripts/export.py index 5b27275..b6795af 100644 --- a/scripts/export.py +++ b/scripts/export.py @@ -83,16 +83,10 @@ def _zip_export_basename( if project_filter: if len(projects) == 1: p0 = projects[0] - parts.append( - slugify(p0.get("display_name") or p0["name"], default="project") - ) + parts.append(slugify(p0.get("display_name") or p0["name"], default="project")) else: - parts.append( - f"{slugify(project_filter, default='project')}-n{len(projects)}" - ) - if since == "last" and latest_day is not None and isinstance( - latest_day, date - ): + parts.append(f"{slugify(project_filter, default='project')}-n{len(projects)}") + if since == "last" and latest_day is not None and isinstance(latest_day, date): parts.append(f"last-{latest_day.strftime('%m-%d')}") if parts: return f"claude-code-export-{'-'.join(parts)}-{date_tag}.zip" @@ -114,10 +108,15 @@ def _prefixed_export_option_overrides(argv: list[str]) -> dict[str, object]: i = 0 while i < len(pre): a = pre[i] - if a == "--since" and i + 1 < len(pre) and pre[i + 1] in ( - "all", - "last", - "incremental", + if ( + a == "--since" + and i + 1 < len(pre) + and pre[i + 1] + in ( + "all", + "last", + "incremental", + ) ): opts["since"] = pre[i + 1] i += 2 @@ -171,6 +170,7 @@ def main(): else: cmd_export(args) + def cmd_list(args): """Print a table of projects, or drill into one project's sessions.""" base_dir = getattr(args, "base_dir", None) or get_claude_projects_dir() @@ -276,9 +276,7 @@ def _session_stats(session_id: str, base_dir: str, fmt: str): print(f" Tool calls: {meta['total_tool_calls']}") if meta["tool_call_counts"]: breakdown = ", ".join( - f"{t}: {c}" for t, c in sorted( - meta["tool_call_counts"].items(), key=lambda x: -x[1] - ) + f"{t}: {c}" for t, c in sorted(meta["tool_call_counts"].items(), key=lambda x: -x[1]) ) print(f" {breakdown}") if meta.get("stop_reasons"): @@ -365,8 +363,7 @@ def _aggregate_stats(base_dir: str, project_filter: str, fmt: str): totals["has_cost"] = True except Exception as e: print( - f" Warning: failed to parse {s['id'][:10]} " - f"in {project['name']}: {e}", + f" Warning: failed to parse {s['id'][:10]} in {project['name']}: {e}", file=sys.stderr, ) continue @@ -395,9 +392,7 @@ def _aggregate_stats(base_dir: str, project_filter: str, fmt: str): print(f" Tool calls: {totals['tool_calls']:,}") if totals["tool_counts"]: breakdown = ", ".join( - f"{t}: {c}" for t, c in sorted( - totals["tool_counts"].items(), key=lambda x: -x[1] - )[:10] + f"{t}: {c}" for t, c in sorted(totals["tool_counts"].items(), key=lambda x: -x[1])[:10] ) print(f" {breakdown}") print(f" Files: {len(totals['files_unique']):,} unique") @@ -423,9 +418,9 @@ def _exit_bulk_export(result: BulkExportResult) -> None: if n > 0 or k > 0: dest = sys.stderr if k > 0 else sys.stdout print(f"Exported {n} of {m} sessions ({k} failed)", file=dest) - if n == 0 and k > 0: # total failure + if n == 0 and k > 0: # total failure sys.exit(1) - elif k > 0: # partial failure + elif k > 0: # partial failure sys.exit(2) @@ -503,20 +498,14 @@ def _on_export_error(sid: str, exc: Exception) -> None: "exporting sessions that overlap that calendar day." ) if export_result.latest_day_match_count == 0: - print( - f"No sessions overlap {latest_day.isoformat()} (UTC); " - "nothing to export." - ) + print(f"No sessions overlap {latest_day.isoformat()} (UTC); nothing to export.") _exit_bulk_export(export_result) return elif since == "incremental": skipped_mtime_unchanged = export_result.skipped_mtime_unchanged_count exported = len(all_exports) - print( - f"Exporting {exported} file(s) " - f"({skipped} skipped, {total_sessions} total)" - ) + print(f"Exporting {exported} file(s) ({skipped} skipped, {total_sessions} total)") if not all_exports: print("Nothing to export.") @@ -604,79 +593,88 @@ def build_parser() -> argparse.ArgumentParser: epilog=__doc__, ) # Global options (for backward compatibility when no subcommand) - parser.add_argument("--base-dir", default=None, - help="Override Claude Code projects directory") - parser.add_argument("--project", default=None, - help="Filter by project (substring on list display name or dir name)") + parser.add_argument("--base-dir", default=None, help="Override Claude Code projects directory") + parser.add_argument( + "--project", + default=None, + help="Filter by project (substring on list display name or dir name)", + ) parser.add_argument( "--since", choices=["all", "last", "incremental"], default=None, - help=( - "'last' = latest UTC calendar day; " - "'incremental' = new since last export (mtime)" - ), + help=("'last' = latest UTC calendar day; 'incremental' = new since last export (mtime)"), + ) + parser.add_argument("--out", default=None, help="Output directory (default: current dir)") + parser.add_argument( + "--no-zip", action="store_true", default=False, help="Write individual files instead of zip" + ) + parser.add_argument( + "--format", choices=["md", "json", "both"], default=None, help="Export format (default: md)" + ) + parser.add_argument( + "--session", default=None, help="Export/stats for single session (UUID prefix)" ) - parser.add_argument("--out", default=None, - help="Output directory (default: current dir)") - parser.add_argument("--no-zip", action="store_true", default=False, - help="Write individual files instead of zip") - parser.add_argument("--format", choices=["md", "json", "both"], - default=None, help="Export format (default: md)") - parser.add_argument("--session", default=None, - help="Export/stats for single session (UUID prefix)") parser.add_argument( - "--exclude-rules", "-e", + "--exclude-rules", + "-e", default=None, metavar="PATH", dest="exclude_rules", help="Path to exclusion rules file (sensitive sessions are omitted). " - "If omitted, uses ~/.claude-code-chat-browser/exclusion-rules.txt if present.", + "If omitted, uses ~/.claude-code-chat-browser/exclusion-rules.txt if present.", ) subparsers = parser.add_subparsers(dest="command") # List subcommand list_p = subparsers.add_parser("list", help="List projects and sessions") - list_p.add_argument("--project", default=None, - help="Filter/select project (display name or dir name substring)") - list_p.add_argument("--base-dir", default=None, - help="Override Claude Code projects directory") + list_p.add_argument( + "--project", default=None, help="Filter/select project (display name or dir name substring)" + ) + list_p.add_argument("--base-dir", default=None, help="Override Claude Code projects directory") # Stats subcommand stats_p = subparsers.add_parser("stats", help="Show statistics") - stats_p.add_argument("--session", default=None, - help="Stats for specific session (UUID prefix)") - stats_p.add_argument("--format", choices=["text", "json"], default="text", - help="Output format (default: text)") - stats_p.add_argument("--project", default=None, - help="Filter by project (display name or dir name substring)") - stats_p.add_argument("--base-dir", default=None, - help="Override Claude Code projects directory") + stats_p.add_argument("--session", default=None, help="Stats for specific session (UUID prefix)") + stats_p.add_argument( + "--format", choices=["text", "json"], default="text", help="Output format (default: text)" + ) + stats_p.add_argument( + "--project", default=None, help="Filter by project (display name or dir name substring)" + ) + stats_p.add_argument("--base-dir", default=None, help="Override Claude Code projects directory") # Export subcommand (explicit) export_p = subparsers.add_parser("export", help="Export sessions") - export_p.add_argument("--since", choices=["all", "last", "incremental"], default="all", - help="'last' = latest UTC day; 'incremental' = new since last export") - export_p.add_argument("--out", default=None, - help="Output directory (default: current dir)") - export_p.add_argument("--no-zip", action="store_true", - help="Write individual files instead of zip") - export_p.add_argument("--format", choices=["md", "json", "both"], - default="md", help="Export format (default: md)") - export_p.add_argument("--session", default=None, - help="Export single session by UUID prefix") - export_p.add_argument("--project", default=None, - help="Filter by project (display name or dir name substring)") - export_p.add_argument("--base-dir", default=None, - help="Override Claude Code projects directory") export_p.add_argument( - "--exclude-rules", "-e", + "--since", + choices=["all", "last", "incremental"], + default="all", + help="'last' = latest UTC day; 'incremental' = new since last export", + ) + export_p.add_argument("--out", default=None, help="Output directory (default: current dir)") + export_p.add_argument( + "--no-zip", action="store_true", help="Write individual files instead of zip" + ) + export_p.add_argument( + "--format", choices=["md", "json", "both"], default="md", help="Export format (default: md)" + ) + export_p.add_argument("--session", default=None, help="Export single session by UUID prefix") + export_p.add_argument( + "--project", default=None, help="Filter by project (display name or dir name substring)" + ) + export_p.add_argument( + "--base-dir", default=None, help="Override Claude Code projects directory" + ) + export_p.add_argument( + "--exclude-rules", + "-e", default=None, metavar="PATH", dest="exclude_rules", help="Path to exclusion rules file (sensitive sessions are omitted). " - "If omitted, uses ~/.claude-code-chat-browser/exclusion-rules.txt if present.", + "If omitted, uses ~/.claude-code-chat-browser/exclusion-rules.txt if present.", ) return parser diff --git a/scripts/gen_real_session_fixtures.py b/scripts/gen_real_session_fixtures.py index 7dcd7bc..84c6f80 100644 --- a/scripts/gen_real_session_fixtures.py +++ b/scripts/gen_real_session_fixtures.py @@ -4,6 +4,7 @@ Each entry includes top-level ``sessionId`` (real Claude Code JSONL shape). The parser currently ignores it and uses the filename for ``session_id``. """ + from __future__ import annotations import json diff --git a/tests/conftest.py b/tests/conftest.py index 4ed36bf..2d483b9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -73,6 +73,4 @@ def client_empty(tmp_path, export_state_file): @pytest.fixture def client_thinking(tmp_path, export_state_file): """Flask test client with a session containing thinking content blocks.""" - return _make_test_client( - tmp_path, {"session_think001.jsonl": "session_with_thinking.jsonl"} - ) + return _make_test_client(tmp_path, {"session_think001.jsonl": "session_with_thinking.jsonl"}) diff --git a/tests/test_api_integration.py b/tests/test_api_integration.py index 80009eb..7fd37cb 100644 --- a/tests/test_api_integration.py +++ b/tests/test_api_integration.py @@ -75,9 +75,7 @@ def test_session_detail_includes_thinking_blocks(client_thinking): session = resp.get_json() assert "messages" in session assistant_msgs = [m for m in session["messages"] if m.get("role") == "assistant"] - assert any( - m.get("thinking") == "Considering options carefully." for m in assistant_msgs - ) + assert any(m.get("thinking") == "Considering options carefully." for m in assistant_msgs) # --- /api/search --- diff --git a/tests/test_api_routes.py b/tests/test_api_routes.py index 9b90ff8..bb2fb30 100644 --- a/tests/test_api_routes.py +++ b/tests/test_api_routes.py @@ -2,15 +2,19 @@ from __future__ import annotations +import shutil +from pathlib import Path + +from app import create_app from tests.conftest import assert_error_response +FIXTURES = Path(__file__).parent / "fixtures" + def test_index_returns_html(client): resp = client.get("/") assert resp.status_code == 200 - assert b"html" in resp.data.lower() or ( - resp.content_type and "html" in resp.content_type - ) + assert b"html" in resp.data.lower() or (resp.content_type and "html" in resp.content_type) def test_session_stats_happy_path(client): @@ -27,6 +31,23 @@ def test_session_stats_not_found(client): assert_error_response(resp, expected_code="SESSION_NOT_FOUND") +def test_session_stats_excluded_session_returns_404(tmp_path, export_state_file): + project_dir = tmp_path / "test-project" + project_dir.mkdir(parents=True) + shutil.copy(FIXTURES / "session_minimal.jsonl", project_dir / "session_abc123.jsonl") + + rules_path = tmp_path / "exclusion-rules.txt" + rules_path.write_text("integration fixture\n", encoding="utf-8") + + app = create_app(base_dir=str(tmp_path), exclusion_rules_path=str(rules_path)) + app.config["TESTING"] = True + excluded_client = app.test_client() + + resp = excluded_client.get("/api/sessions/test-project/session_abc123/stats") + assert resp.status_code == 404 + assert_error_response(resp, expected_code="SESSION_NOT_FOUND") + + def test_session_stats_invalid_path(client): resp = client.get("/api/sessions/../../etc/passwd/session_abc123/stats") assert resp.status_code == 400 @@ -41,6 +62,7 @@ def test_session_detail_invalid_path(client): def test_session_detail_parse_failure_returns_500_without_leak(client, monkeypatch): """Parser failures must return generic PARSE_ERROR, not exception internals (#25).""" + def _boom(*_args, **_kwargs): raise KeyError("internal_secret_field_id") @@ -106,9 +128,7 @@ def test_export_session_markdown_attachment(client): def test_export_session_json_format(client): - resp = client.get( - "/api/export/session/test-project/session_abc123?format=json" - ) + resp = client.get("/api/export/session/test-project/session_abc123?format=json") assert resp.status_code == 200 assert resp.mimetype == "application/json" diff --git a/tests/test_cli_args.py b/tests/test_cli_args.py index 3fc26a2..6789758 100644 --- a/tests/test_cli_args.py +++ b/tests/test_cli_args.py @@ -114,6 +114,7 @@ def _use_reloader_kwarg_tied_to_debug(call: ast.Call) -> bool: # export.py argument tests # --------------------------------------------------------------------------- + class TestExportParserFlags: """Every flag that cursor's export.py exposes must also exist here.""" @@ -300,6 +301,7 @@ def test_help_exits_zero(self): # app.py argument tests # --------------------------------------------------------------------------- + class TestAppArgparse: """app.py CLI must expose the same flags as cursor's app.py.""" @@ -324,9 +326,7 @@ def test_debug_explicit_true(self): args = parser.parse_args(["--debug"]) assert args.debug is True - @pytest.mark.parametrize( - "host", ["127.0.0.1", "localhost", "::1", "[::1]", "127.0.0.2"] - ) + @pytest.mark.parametrize("host", ["127.0.0.1", "localhost", "::1", "[::1]", "127.0.0.2"]) def test_is_loopback_host_accepts_loopback(self, host: str) -> None: assert is_loopback_host(host) diff --git a/tests/test_cli_e2e.py b/tests/test_cli_e2e.py index 4141fc6..e8b79ea 100644 --- a/tests/test_cli_e2e.py +++ b/tests/test_cli_e2e.py @@ -75,13 +75,15 @@ def test_cli_stats_exits_zero(tmp_path): def test_cli_invalid_since_exits_nonzero(tmp_path): base = _seed_base_dir(tmp_path) - proc = _run_cli([ - "export", - "--since", - "yesterday", - "--base-dir", - str(base), - ]) + proc = _run_cli( + [ + "export", + "--since", + "yesterday", + "--base-dir", + str(base), + ] + ) assert proc.returncode != 0 assert "--since" in proc.stderr assert "invalid choice" in proc.stderr.lower() @@ -91,16 +93,18 @@ def test_cli_invalid_since_exits_nonzero(tmp_path): def test_cli_export_creates_output(tmp_path): base = _seed_base_dir(tmp_path) out_dir = tmp_path / "out" - proc = _run_cli([ - "export", - "--base-dir", - str(base), - "--since", - "all", - "--no-zip", - "--out", - str(out_dir), - ]) + proc = _run_cli( + [ + "export", + "--base-dir", + str(base), + "--since", + "all", + "--no-zip", + "--out", + str(out_dir), + ] + ) assert proc.returncode == 0, proc.stderr md_files = list(out_dir.rglob("*.md")) assert len(md_files) >= 1 diff --git a/tests/test_cli_export_exit_codes.py b/tests/test_cli_export_exit_codes.py index 73a3c4a..224021b 100644 --- a/tests/test_cli_export_exit_codes.py +++ b/tests/test_cli_export_exit_codes.py @@ -12,10 +12,10 @@ REPO_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(REPO_ROOT)) -import scripts.export as export # noqa: E402 -from tests.test_cli_e2e import _run_cli, _seed_base_dir # noqa: E402 -from utils.export_engine import BulkExportResult # noqa: E402 -from utils.jsonl_parser import parse_session # noqa: E402 +import scripts.export as export +from tests.test_cli_e2e import _run_cli, _seed_base_dir +from utils.export_engine import BulkExportResult +from utils.jsonl_parser import parse_session _SUMMARY_RE = re.compile( r"Exported \d+ of \d+ sessions \(\d+ failed\)", @@ -44,16 +44,18 @@ def _export_args(tmp_path: Path, base: Path, out_dir: Path) -> types.SimpleNames def test_cli_export_clean_exits_zero(tmp_path): base = _seed_base_dir(tmp_path) out_dir = tmp_path / "out" - proc = _run_cli([ - "export", - "--base-dir", - str(base), - "--since", - "all", - "--no-zip", - "--out", - str(out_dir), - ]) + proc = _run_cli( + [ + "export", + "--base-dir", + str(base), + "--since", + "all", + "--no-zip", + "--out", + str(out_dir), + ] + ) assert proc.returncode == 0, proc.stderr assert list(out_dir.rglob("*.md")) # Success summary must go to stdout, not stderr @@ -61,9 +63,7 @@ def test_cli_export_clean_exits_zero(tmp_path): assert "Exported 1 of 1 sessions (0 failed)" in proc.stdout -def test_cli_export_partial_failure_exits_two( - tmp_path, monkeypatch, capsys -): +def test_cli_export_partial_failure_exits_two(tmp_path, monkeypatch, capsys): """One session exports; a second fails parse (simulated corrupt file).""" base = _seed_base_dir(tmp_path) project_dir = base / "test-project" @@ -95,9 +95,7 @@ def _parse(path: str): assert len(list(out_dir.rglob("*.md"))) == 1 -def test_since_last_early_return_invokes_exit_bulk_export( - tmp_path, monkeypatch, capsys -): +def test_since_last_early_return_invokes_exit_bulk_export(tmp_path, monkeypatch, capsys): """cmd_export --since last must call _exit_bulk_export on early-return paths.""" exit_calls: list[BulkExportResult] = [] @@ -134,9 +132,7 @@ def _track_exit(result: BulkExportResult) -> None: assert "Exported" not in captured.err -def test_since_last_early_return_exits_one_on_failure( - tmp_path, monkeypatch, capsys -): +def test_since_last_early_return_exits_one_on_failure(tmp_path, monkeypatch, capsys): """Since-last early-return with failure_count>0 must produce real exit code 1.""" fake_result = BulkExportResult(latest_day=None, failure_count=1) diff --git a/tests/test_error_propagation.py b/tests/test_error_propagation.py index 4cd7dc1..eb9392e 100644 --- a/tests/test_error_propagation.py +++ b/tests/test_error_propagation.py @@ -29,10 +29,10 @@ class names from a defensive blocklist. REPO_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(REPO_ROOT)) -from flask import Flask # noqa: E402 +from flask import Flask -from api.projects import projects_bp # noqa: E402 -from api.sessions import sessions_bp # noqa: E402 +from api.projects import projects_bp +from api.sessions import sessions_bp # Defensive blocklist — any of these substrings appearing in a response body # would mean the leak regressed. Includes common Python builtin exception @@ -96,8 +96,8 @@ def _write_session(tmp_path, project: str, session_id: str, content: str): # /api/sessions// # --------------------------------------------------------------------------- -class TestGetSessionErrorBody: +class TestGetSessionErrorBody: def test_500_on_parse_failure_does_not_leak_class_name(self, tmp_path, client, monkeypatch): # Force the parser to raise an exception with a class-name + message # that WOULD leak through the old f-string interpolation if the fix @@ -142,8 +142,8 @@ def test_400_on_path_traversal_attempt(self, client): # /api/sessions///stats # --------------------------------------------------------------------------- -class TestGetSessionStatsErrorBody: +class TestGetSessionStatsErrorBody: def test_500_on_parse_failure_does_not_leak_class_name(self, tmp_path, client, monkeypatch): _write_session(tmp_path, "proj", "abc", "{}") @@ -166,8 +166,8 @@ def _boom(*args, **kwargs): # /api/projects (per-session card) # --------------------------------------------------------------------------- -class TestGetProjectsErrorCard: +class TestGetProjectsErrorCard: def test_per_session_error_card_omits_error_detail(self, tmp_path, client, monkeypatch): # parse_session is tolerant of malformed lines, so to exercise the # except branch deterministically (the one that builds the error @@ -208,6 +208,7 @@ def _boom(*args, **kwargs): # Source-level guard # --------------------------------------------------------------------------- + class TestNoExceptionInterpolationInSource: """Static guard: any future PR that re-introduces the `f"...{type(e).__name__}: {e}..."` pattern in api/ fails this test.""" @@ -220,11 +221,10 @@ def test_api_files_dont_interpolate_exception_in_jsonify(self): # contains both `type(e)` or `{e}` AND the word "error". offending_patterns = [ "type(e).__name__", # the class-name expose - "{e}\"", # bare {e} ending an f-string - "{e},", # bare {e} in a dict-value f-string + '{e}"', # bare {e} ending an f-string + "{e},", # bare {e} in a dict-value f-string ] for pat in offending_patterns: assert pat not in src, ( - f"{py_file.name} contains forbidden pattern {pat!r} " - f"— see issue #25" + f"{py_file.name} contains forbidden pattern {pat!r} — see issue #25" ) diff --git a/tests/test_exclusion_helpers.py b/tests/test_exclusion_helpers.py index 7636cfe..1792d5b 100644 --- a/tests/test_exclusion_helpers.py +++ b/tests/test_exclusion_helpers.py @@ -32,6 +32,7 @@ # Helpers # --------------------------------------------------------------------------- + def _write_rules(tmp_path, *lines: str) -> str: """Write rules file and return its path. Tokenized by load_rules.""" p = tmp_path / "exclusion-rules.txt" @@ -39,8 +40,9 @@ def _write_rules(tmp_path, *lines: str) -> str: return str(p) -def _session(*, title: str = "session", models: list[str] | None = None, - messages: list[dict] | None = None) -> dict: +def _session( + *, title: str = "session", models: list[str] | None = None, messages: list[dict] | None = None +) -> dict: return { "title": title, "metadata": {"models_used": models or []}, @@ -52,8 +54,8 @@ def _session(*, title: str = "session", models: list[str] | None = None, # session_text_for_exclusion # --------------------------------------------------------------------------- -class TestSessionTextForExclusion: +class TestSessionTextForExclusion: def test_empty_session(self): assert session_text_for_exclusion({}) == "" @@ -72,12 +74,14 @@ def test_skips_whitespace_only_text(self): # Regression: this is the inconsistency the consolidation fixed — # the helper rejects whitespace-only strings, the previous inline # variants didn't. The helper version is now canonical. - s = _session(messages=[ - {"text": "alpha"}, - {"text": " "}, # whitespace-only — should be skipped - {"text": "\n\t\n"}, # whitespace-only — should be skipped - {"text": "beta"}, - ]) + s = _session( + messages=[ + {"text": "alpha"}, + {"text": " "}, # whitespace-only — should be skipped + {"text": "\n\t\n"}, # whitespace-only — should be skipped + {"text": "beta"}, + ] + ) assert session_text_for_exclusion(s) == "alpha\n\nbeta" def test_skips_non_string_text(self): @@ -89,8 +93,8 @@ def test_skips_non_string_text(self): # is_session_excluded # --------------------------------------------------------------------------- -class TestIsSessionExcluded: +class TestIsSessionExcluded: def test_returns_false_when_rules_empty(self, tmp_path): s = _session(title="anything", messages=[{"text": "anything"}]) assert is_session_excluded([], s, "any project") is False diff --git a/tests/test_export_api_bulk.py b/tests/test_export_api_bulk.py index 4eb2695..6cd5ef7 100644 --- a/tests/test_export_api_bulk.py +++ b/tests/test_export_api_bulk.py @@ -11,9 +11,9 @@ REPO_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(REPO_ROOT)) -from flask import Flask # noqa: E402 +from flask import Flask -from api.export_api import export_bp # noqa: E402 +from api.export_api import export_bp @pytest.fixture @@ -71,11 +71,13 @@ def test_bulk_export_empty_returns_422_json(isolated_state, tmp_path): def test_export_state_json_fields(isolated_state): isolated_state.write_text( - json.dumps({ - "lastExportTime": "2026-01-01T12:00:00", - "exportedCount": 5, - "sessions": {}, - }), + json.dumps( + { + "lastExportTime": "2026-01-01T12:00:00", + "exportedCount": 5, + "sessions": {}, + } + ), encoding="utf-8", ) app = Flask(__name__) diff --git a/tests/test_export_engine_parity.py b/tests/test_export_engine_parity.py index 8517c38..02e97f7 100644 --- a/tests/test_export_engine_parity.py +++ b/tests/test_export_engine_parity.py @@ -11,17 +11,17 @@ REPO_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(REPO_ROOT)) -from flask import Flask # noqa: E402 +from flask import Flask -from api.export_api import export_bp # noqa: E402 -from tests.test_cli_e2e import _run_cli, _seed_base_dir # noqa: E402 -from utils.export_engine import ( # noqa: E402 +from api.export_api import export_bp +from tests.test_cli_e2e import _run_cli, _seed_base_dir +from utils.export_engine import ( MANIFEST_SHARED_KEYS, NoopSink, manifest_shared_subset, run_bulk_export, ) -from utils.session_path import list_projects # noqa: E402 +from utils.session_path import list_projects def _markdown_from_exports(exports: list[tuple[str, str]]) -> str: @@ -60,9 +60,7 @@ def test_engine_api_vs_cli_layout_same_markdown_and_manifest(tmp_path: Path) -> assert api_result.exported_session_count == 1 assert cli_result.exported_session_count == 1 - assert _markdown_from_exports(api_result.exports) == _markdown_from_exports( - cli_result.exports - ) + assert _markdown_from_exports(api_result.exports) == _markdown_from_exports(cli_result.exports) api_core = manifest_shared_subset(api_result.manifest[0]) cli_core = manifest_shared_subset(cli_result.manifest[0]) @@ -94,16 +92,18 @@ def test_http_post_export_matches_cli_no_zip(tmp_path: Path, monkeypatch) -> Non assert len(http_manifest) == 1, f"expected one manifest row, got {len(http_manifest)}" out_dir = tmp_path / "cli_out" - proc = _run_cli([ - "export", - "--base-dir", - str(base), - "--since", - "all", - "--no-zip", - "--out", - str(out_dir), - ]) + proc = _run_cli( + [ + "export", + "--base-dir", + str(base), + "--since", + "all", + "--no-zip", + "--out", + str(out_dir), + ] + ) assert proc.returncode == 0, proc.stderr cli_md_files = list(out_dir.rglob("*.md")) diff --git a/tests/test_export_exclusion_filtering.py b/tests/test_export_exclusion_filtering.py index 9256fd3..7748acb 100644 --- a/tests/test_export_exclusion_filtering.py +++ b/tests/test_export_exclusion_filtering.py @@ -22,6 +22,7 @@ # Helpers # --------------------------------------------------------------------------- + def _write_session(project_dir: Path, session_id: str, messages: list[dict]) -> Path: """Write a minimal JSONL session file and return its path.""" path = project_dir / f"{session_id}.jsonl" @@ -74,10 +75,13 @@ def _run_export( cmd = [ sys.executable, str(EXPORT_SCRIPT), - "--base-dir", str(base_dir), - "--since", "all", + "--base-dir", + str(base_dir), + "--since", + "all", "--no-zip", - "--out", str(out_dir), + "--out", + str(out_dir), ] if rules_path: cmd += ["--exclude-rules", str(rules_path)] @@ -99,6 +103,7 @@ def _collect_md(out_dir: Path) -> list[Path]: # Tests # --------------------------------------------------------------------------- + class TestExclusionRulesFiltering: def test_matched_session_is_excluded(self, tmp_path): """A session whose content matches an exclusion rule must not be exported.""" @@ -152,11 +157,15 @@ def test_short_flag_e_works(self, tmp_path): cmd = [ sys.executable, str(EXPORT_SCRIPT), - "--base-dir", str(tmp_path / "projects"), - "--since", "all", + "--base-dir", + str(tmp_path / "projects"), + "--since", + "all", "--no-zip", - "--out", str(out_dir), - "-e", str(rules_file), + "--out", + str(out_dir), + "-e", + str(rules_file), ] proc = subprocess.run(cmd, cwd=str(REPO_ROOT), capture_output=True, text=True) assert proc.returncode == 0, proc.stderr @@ -239,11 +248,15 @@ def test_exclude_rules_subcommand(self, tmp_path): sys.executable, str(EXPORT_SCRIPT), "export", - "--base-dir", str(tmp_path / "projects"), - "--since", "all", + "--base-dir", + str(tmp_path / "projects"), + "--since", + "all", "--no-zip", - "--out", str(out_dir), - "--exclude-rules", str(rules_file), + "--out", + str(out_dir), + "--exclude-rules", + str(rules_file), ] proc = subprocess.run(cmd, cwd=str(REPO_ROOT), capture_output=True, text=True) assert proc.returncode == 0, proc.stderr @@ -267,14 +280,13 @@ def test_state_saved_after_export_with_rules(self, tmp_path): # by running with a custom STATE_DIR via monkeypatching in the same # process — we test the _save_state API directly. import scripts.export as exp + original_state_file = exp.STATE_FILE original_state_dir = exp.STATE_DIR exp.STATE_FILE = str(state_dir / "export_state.json") exp.STATE_DIR = str(state_dir) try: - exp._save_state( - sessions={sid: 1740000000.0}, count=1, out_dir=str(out_dir) - ) + exp._save_state(sessions={sid: 1740000000.0}, count=1, out_dir=str(out_dir)) with open(exp.STATE_FILE) as f: state = json.load(f) for key in ("lastExportTime", "exportedCount", "exportDir", "sessions"): diff --git a/tests/test_export_state.py b/tests/test_export_state.py index ebc16b2..99cb778 100644 --- a/tests/test_export_state.py +++ b/tests/test_export_state.py @@ -25,6 +25,7 @@ # Helpers # --------------------------------------------------------------------------- + def _tmp_state_file(tmp_path): """Return a temporary state file path and patch the module to use it.""" path = str(tmp_path / "export_state.json") @@ -37,6 +38,7 @@ def _tmp_state_file(tmp_path): # _save_state tests # --------------------------------------------------------------------------- + class TestSaveState: def test_writes_last_export_time(self, tmp_path): _tmp_state_file(tmp_path) @@ -98,6 +100,7 @@ def test_sessions_not_at_top_level(self, tmp_path): # _load_state tests # --------------------------------------------------------------------------- + class TestLoadState: def test_returns_empty_dict_when_no_file(self, tmp_path): _tmp_state_file(tmp_path) @@ -165,15 +168,14 @@ def test_roundtrip_save_then_load(self, tmp_path): # _save_state → _load_state → since-last filtering integration # --------------------------------------------------------------------------- + class TestSinceLastFiltering: """Verify the since-last flow: save state, reload, new session skipped.""" def test_session_skipped_after_save(self, tmp_path): _tmp_state_file(tmp_path) mtime = 1740000000.0 - _export_mod._save_state( - sessions={"sess-known": mtime}, count=1, out_dir="/tmp" - ) + _export_mod._save_state(sessions={"sess-known": mtime}, count=1, out_dir="/tmp") state = _export_mod._load_state() last_export = state.get("sessions", {}) diff --git a/tests/test_export_state_store.py b/tests/test_export_state_store.py index 8c57556..4eededc 100644 --- a/tests/test_export_state_store.py +++ b/tests/test_export_state_store.py @@ -30,6 +30,7 @@ def _assert_file_stays_absent(path: Path, timeout: float = 0.5) -> None: raise AssertionError(f"{path} appeared while parent still holds the lock") time.sleep(0.01) + from utils.export_state_store import ( atomic_write_export_state, export_state_lock, @@ -86,9 +87,7 @@ def test_load_legacy_flat_dict_unchanged_shape(tmp_path: Path): assert out == {"sessions": legacy} -def test_export_state_lock_windows_branch_uses_msvcrt_when_no_fcntl( - monkeypatch, tmp_path: Path -): +def test_export_state_lock_windows_branch_uses_msvcrt_when_no_fcntl(monkeypatch, tmp_path: Path): """When ``fcntl`` is absent, use ``msvcrt.locking`` (cross-process on Windows).""" import utils.export_state_store as mod diff --git a/tests/test_jsonl_parser.py b/tests/test_jsonl_parser.py index aefd4b3..cf40e82 100644 --- a/tests/test_jsonl_parser.py +++ b/tests/test_jsonl_parser.py @@ -7,7 +7,7 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) -from utils.jsonl_parser import ( # noqa: E402 +from utils.jsonl_parser import ( _extract_images, _extract_text, _infer_title, @@ -26,6 +26,7 @@ # Metadata helpers (match parse_session initialisation) # --------------------------------------------------------------------------- + def _fresh_metadata() -> dict: return { "session_id": "x", @@ -61,9 +62,7 @@ def _fresh_metadata() -> dict: def _write_jsonl(entries: list) -> str: - f = tempfile.NamedTemporaryFile( - mode="w", suffix=".jsonl", delete=False, encoding="utf-8" - ) + f = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False, encoding="utf-8") for entry in entries: f.write(json.dumps(entry) + "\n") f.close() @@ -74,6 +73,7 @@ def _write_jsonl(entries: list) -> str: # _parse_tool_result # --------------------------------------------------------------------------- + class TestParseToolResult: def test_bash_with_stdout(self): r = _parse_tool_result( @@ -92,13 +92,15 @@ def test_bash_with_stderr_only(self): assert r.get("stdout") == "" def test_bash_with_exit_code_and_interrupted(self): - r = _parse_tool_result({ - "stdout": "", - "stderr": "", - "exitCode": 130, - "interrupted": True, - "is_error": True, - }) + r = _parse_tool_result( + { + "stdout": "", + "stderr": "", + "exitCode": 130, + "interrupted": True, + "is_error": True, + } + ) assert r["exit_code"] == 130 assert r["interrupted"] is True assert r["is_error"] is True @@ -109,11 +111,13 @@ def test_file_edit_with_structured_patch(self): assert r["file_path"] == "/a.py" def test_file_edit_with_old_new_string(self): - r = _parse_tool_result({ - "filePath": "/b.ts", - "newString": "y", - "replaceAll": True, - }) + r = _parse_tool_result( + { + "filePath": "/b.ts", + "newString": "y", + "replaceAll": True, + } + ) assert r["result_type"] == "file_edit" assert r["replace_all"] is True @@ -123,12 +127,14 @@ def test_file_write_content(self): assert r["file_path"] == "/c.txt" def test_glob_result(self): - r = _parse_tool_result({ - "filenames": ["a", "b"], - "numFiles": 2, - "truncated": False, - "durationMs": 12, - }) + r = _parse_tool_result( + { + "filenames": ["a", "b"], + "numFiles": 2, + "truncated": False, + "durationMs": 12, + } + ) assert r["result_type"] == "glob" assert r["filenames"] == ["a", "b"] assert r["num_files"] == 2 @@ -138,34 +144,40 @@ def test_glob_truncated(self): assert r["truncated"] is True def test_grep_result(self): - r = _parse_tool_result({ - "mode": "content", - "numFiles": 3, - "numLines": 10, - "content": "matches", - }) + r = _parse_tool_result( + { + "mode": "content", + "numFiles": 3, + "numLines": 10, + "content": "matches", + } + ) assert r["result_type"] == "grep" assert r["mode"] == "content" assert r["content"] == "matches" def test_file_read_result(self): - r = _parse_tool_result({ - "file": { - "filePath": "/r.md", - "numLines": 5, - "content": "body", + r = _parse_tool_result( + { + "file": { + "filePath": "/r.md", + "numLines": 5, + "content": "body", + } } - }) + ) assert r["result_type"] == "file_read" assert r["file_path"] == "/r.md" assert r["content"] == "body" def test_web_search_result(self): - r = _parse_tool_result({ - "query": "q", - "results": [{"url": "u"}], - "durationSeconds": 1.5, - }) + r = _parse_tool_result( + { + "query": "q", + "results": [{"url": "u"}], + "durationSeconds": 1.5, + } + ) assert r["result_type"] == "web_search" assert r["query"] == "q" assert r["result_count"] == 1 @@ -188,32 +200,38 @@ def test_task_message_variant(self): assert r["task_id"] == "t1" def test_task_retrieval_variant(self): - r = _parse_tool_result({ - "retrieval_status": "ok", - "task": {"task_id": "tid"}, - }) + r = _parse_tool_result( + { + "retrieval_status": "ok", + "task": {"task_id": "tid"}, + } + ) assert r["result_type"] == "task" assert r["task_id"] == "tid" def test_task_completed_subagent(self): - r = _parse_tool_result({ - "agentId": "ag", - "totalDurationMs": 500, - "status": "completed", - "totalTokens": 100, - "totalToolUseCount": 2, - }) + r = _parse_tool_result( + { + "agentId": "ag", + "totalDurationMs": 500, + "status": "completed", + "totalTokens": 100, + "totalToolUseCount": 2, + } + ) assert r["result_type"] == "task" assert r["agent_id"] == "ag" assert r["total_duration_ms"] == 500 def test_task_async_launched(self): - r = _parse_tool_result({ - "agentId": "ag2", - "isAsync": True, - "status": "running", - "description": "bg", - }) + r = _parse_tool_result( + { + "agentId": "ag2", + "isAsync": True, + "status": "running", + "description": "bg", + } + ) assert r["result_type"] == "task" assert r["agent_id"] == "ag2" @@ -223,10 +241,12 @@ def test_todo_write_result(self): assert r["todo_count"] == 1 def test_user_input_result(self): - r = _parse_tool_result({ - "questions": [{"id": "q"}], - "answers": {"q": "a"}, - }) + r = _parse_tool_result( + { + "questions": [{"id": "q"}], + "answers": {"q": "a"}, + } + ) assert r["result_type"] == "user_input" def test_plan_result(self): @@ -235,11 +255,13 @@ def test_plan_result(self): def test_plan_with_content_not_classified_as_file_write(self): """plan is registered before file_write in _TOOL_RESULT_DISPATCH.""" - r = _parse_tool_result({ - "plan": [], - "filePath": "/plan.md", - "content": "plan body", - }) + r = _parse_tool_result( + { + "plan": [], + "filePath": "/plan.md", + "content": "plan body", + } + ) assert r["result_type"] == "plan" assert r["file_path"] == "/plan.md" @@ -260,6 +282,7 @@ def test_slug_preserved(self): # _normalize_content, _extract_text, _extract_images # --------------------------------------------------------------------------- + class TestNormalizeContent: def test_plain_string(self): assert _normalize_content("hi") == [{"type": "text", "text": "hi"}] @@ -303,21 +326,31 @@ def test_empty_content(self): class TestExtractImages: def test_base64_image_extracted(self): - imgs = _extract_images([{ - "type": "image", - "source": {"type": "base64", "data": "AAA", "media_type": "image/png"}, - }]) + imgs = _extract_images( + [ + { + "type": "image", + "source": {"type": "base64", "data": "AAA", "media_type": "image/png"}, + } + ] + ) assert len(imgs) == 1 assert imgs[0]["data"] == "AAA" def test_nested_tool_result_image_extracted(self): - imgs = _extract_images([{ - "type": "tool_result", - "content": [{ - "type": "image", - "source": {"type": "base64", "data": "BBB"}, - }], - }]) + imgs = _extract_images( + [ + { + "type": "tool_result", + "content": [ + { + "type": "image", + "source": {"type": "base64", "data": "BBB"}, + } + ], + } + ] + ) assert len(imgs) == 1 assert imgs[0]["data"] == "BBB" @@ -329,12 +362,15 @@ def test_non_image_skipped(self): # _infer_title, _strip_system_tags # --------------------------------------------------------------------------- + class TestInferTitle: def test_first_user_message_used(self): - title = _infer_title([ - {"role": "assistant", "text": "a"}, - {"role": "user", "text": "My title line\nmore"}, - ]) + title = _infer_title( + [ + {"role": "assistant", "text": "a"}, + {"role": "user", "text": "My title line\nmore"}, + ] + ) assert title == "My title line" def test_truncated_to_100_chars(self): @@ -375,26 +411,35 @@ def test_clean_text_unchanged(self): # _process_user # --------------------------------------------------------------------------- + class TestProcessUser: def test_metadata_captured_from_first_entry_only(self): messages = [] meta = _fresh_metadata() - _process_user({ - "type": "user", - "version": 1, - "cwd": "/first", - "gitBranch": "main", - "permissionMode": "default", - "message": {"content": [{"type": "text", "text": "a"}]}, - }, messages, meta) - _process_user({ - "type": "user", - "version": 2, - "cwd": "/second", - "gitBranch": "dev", - "permissionMode": "all", - "message": {"content": [{"type": "text", "text": "b"}]}, - }, messages, meta) + _process_user( + { + "type": "user", + "version": 1, + "cwd": "/first", + "gitBranch": "main", + "permissionMode": "default", + "message": {"content": [{"type": "text", "text": "a"}]}, + }, + messages, + meta, + ) + _process_user( + { + "type": "user", + "version": 2, + "cwd": "/second", + "gitBranch": "dev", + "permissionMode": "all", + "message": {"content": [{"type": "text", "text": "b"}]}, + }, + messages, + meta, + ) assert meta["version"] == 1 assert meta["cwd"] == "/first" assert meta["git_branch"] == "main" @@ -410,15 +455,21 @@ def test_missing_message_key_no_crash(self): def test_tool_use_result_images_extracted(self): messages = [] meta = _fresh_metadata() - _process_user({ - "message": {"content": []}, - "toolUseResult": { - "content": [{ - "type": "image", - "source": {"type": "base64", "data": "IMG"}, - }], + _process_user( + { + "message": {"content": []}, + "toolUseResult": { + "content": [ + { + "type": "image", + "source": {"type": "base64", "data": "IMG"}, + } + ], + }, }, - }, messages, meta) + messages, + meta, + ) assert messages[0]["images"] assert messages[0]["images"][0]["data"] == "IMG" @@ -427,113 +478,150 @@ def test_tool_use_result_images_extracted(self): # _process_assistant # --------------------------------------------------------------------------- + class TestProcessAssistant: def test_content_plain_string_normalized(self): messages = [] meta = _fresh_metadata() - _process_assistant({ - "message": { - "model": "m", - "content": "plain string body", - "usage": {}, + _process_assistant( + { + "message": { + "model": "m", + "content": "plain string body", + "usage": {}, + }, }, - }, messages, meta) + messages, + meta, + ) assert messages[0]["text"] == "plain string body" def test_synthetic_model_not_added(self): meta = _fresh_metadata() - _process_assistant({ - "message": { - "model": "", - "content": [{"type": "text", "text": "x"}], - "usage": {}, + _process_assistant( + { + "message": { + "model": "", + "content": [{"type": "text", "text": "x"}], + "usage": {}, + }, }, - }, [], meta) + [], + meta, + ) assert meta["models_used"] == set() def test_thinking_blocks_joined(self): messages = [] meta = _fresh_metadata() - _process_assistant({ - "message": { - "model": "m", - "content": [ - {"type": "thinking", "thinking": "t1"}, - {"type": "thinking", "thinking": "t2"}, - ], - "usage": {}, + _process_assistant( + { + "message": { + "model": "m", + "content": [ + {"type": "thinking", "thinking": "t1"}, + {"type": "thinking", "thinking": "t2"}, + ], + "usage": {}, + }, }, - }, messages, meta) + messages, + meta, + ) assert messages[0]["thinking"] == "t1\n\nt2" def test_tool_use_counts_accumulated(self): meta = _fresh_metadata() - _process_assistant({ - "message": { - "model": "m", - "content": [ - {"type": "tool_use", "name": "Read", "input": {"file_path": "/a"}}, - {"type": "tool_use", "name": "Read", "input": {"file_path": "/b"}}, - ], - "usage": {}, + _process_assistant( + { + "message": { + "model": "m", + "content": [ + {"type": "tool_use", "name": "Read", "input": {"file_path": "/a"}}, + {"type": "tool_use", "name": "Read", "input": {"file_path": "/b"}}, + ], + "usage": {}, + }, }, - }, [], meta) + [], + meta, + ) assert meta["total_tool_calls"] == 2 assert meta["tool_call_counts"]["Read"] == 2 def test_api_error_flag_increments_api_errors(self): meta = _fresh_metadata() - _process_assistant({ - "isApiErrorMessage": True, - "message": {"model": "m", "content": [], "usage": {}}, - }, [], meta) + _process_assistant( + { + "isApiErrorMessage": True, + "message": {"model": "m", "content": [], "usage": {}}, + }, + [], + meta, + ) assert meta["api_errors"] == 1 def test_stop_reason_accumulated(self): meta = _fresh_metadata() - _process_assistant({ - "message": { - "model": "m", - "content": [], - "stop_reason": "max_tokens", - "usage": {}, + _process_assistant( + { + "message": { + "model": "m", + "content": [], + "stop_reason": "max_tokens", + "usage": {}, + }, }, - }, [], meta) - _process_assistant({ - "message": { - "model": "m", - "content": [], - "stop_reason": "max_tokens", - "usage": {}, + [], + meta, + ) + _process_assistant( + { + "message": { + "model": "m", + "content": [], + "stop_reason": "max_tokens", + "usage": {}, + }, }, - }, [], meta) + [], + meta, + ) assert meta["stop_reasons"]["max_tokens"] == 2 def test_service_tier_added(self): meta = _fresh_metadata() - _process_assistant({ - "message": { - "model": "m", - "content": [], - "usage": {"service_tier": "priority"}, + _process_assistant( + { + "message": { + "model": "m", + "content": [], + "usage": {"service_tier": "priority"}, + }, }, - }, [], meta) + [], + meta, + ) assert "priority" in meta["service_tiers"] def test_ephemeral_cache_tokens_accumulated(self): meta = _fresh_metadata() - _process_assistant({ - "message": { - "model": "m", - "content": [], - "usage": { - "cache_creation": { - "ephemeral_5m_input_tokens": 7, - "ephemeral_1h_input_tokens": 3, + _process_assistant( + { + "message": { + "model": "m", + "content": [], + "usage": { + "cache_creation": { + "ephemeral_5m_input_tokens": 7, + "ephemeral_1h_input_tokens": 3, + }, }, }, }, - }, [], meta) + [], + meta, + ) assert meta["total_ephemeral_5m_tokens"] == 7 assert meta["total_ephemeral_1h_tokens"] == 3 @@ -542,6 +630,7 @@ def test_ephemeral_cache_tokens_accumulated(self): # _track_file_activity # --------------------------------------------------------------------------- + class TestTrackFileActivity: def _meta(self): return { @@ -592,15 +681,20 @@ def test_empty_file_path_not_added(self): # _process_system # --------------------------------------------------------------------------- + class TestProcessSystem: def test_compact_boundary_increments_compaction(self): messages = [] meta = _fresh_metadata() - _process_system({ - "subtype": "compact_boundary", - "timestamp": "2026-01-01T00:00:00Z", - "compactMetadata": {"trigger": "size", "preTokens": 100}, - }, messages, meta) + _process_system( + { + "subtype": "compact_boundary", + "timestamp": "2026-01-01T00:00:00Z", + "compactMetadata": {"trigger": "size", "preTokens": 100}, + }, + messages, + meta, + ) assert meta["compactions"] == 1 assert len(meta["compact_boundaries"]) == 1 assert meta["compact_boundaries"][0]["trigger"] == "size" @@ -608,10 +702,14 @@ def test_compact_boundary_increments_compaction(self): def test_compact_boundary_missing_metadata_no_crash(self): messages = [] meta = _fresh_metadata() - _process_system({ - "subtype": "compact_boundary", - "compactMetadata": None, - }, messages, meta) + _process_system( + { + "subtype": "compact_boundary", + "compactMetadata": None, + }, + messages, + meta, + ) assert meta["compactions"] == 1 assert meta["compact_boundaries"] == [] @@ -627,6 +725,7 @@ def test_other_subtype_no_compaction_increment(self): # parse_session (integration) # --------------------------------------------------------------------------- + class TestParseSession: def test_empty_file_returns_skeleton(self): path = _write_jsonl([]) @@ -639,9 +738,11 @@ def test_empty_file_returns_skeleton(self): os.unlink(path) def test_unknown_entry_type_silently_ignored(self): - path = _write_jsonl([ - {"type": "custom", "timestamp": "2026-01-01T00:00:00Z"}, - ]) + path = _write_jsonl( + [ + {"type": "custom", "timestamp": "2026-01-01T00:00:00Z"}, + ] + ) try: s = parse_session(path) assert s["messages"] == [] @@ -650,14 +751,16 @@ def test_unknown_entry_type_silently_ignored(self): os.unlink(path) def test_is_sidechain_increments_counter(self): - path = _write_jsonl([ - { - "type": "user", - "isSidechain": True, - "timestamp": "2026-01-01T00:00:00Z", - "message": {"content": [{"type": "text", "text": "s"}]}, - }, - ]) + path = _write_jsonl( + [ + { + "type": "user", + "isSidechain": True, + "timestamp": "2026-01-01T00:00:00Z", + "message": {"content": [{"type": "text", "text": "s"}]}, + }, + ] + ) try: s = parse_session(path) assert s["metadata"]["sidechain_messages"] == 1 @@ -665,12 +768,14 @@ def test_is_sidechain_increments_counter(self): os.unlink(path) def test_file_history_snapshot_timestamp(self): - path = _write_jsonl([ - { - "type": "file-history-snapshot", - "snapshot": {"timestamp": "2026-01-02T12:00:00Z"}, - }, - ]) + path = _write_jsonl( + [ + { + "type": "file-history-snapshot", + "snapshot": {"timestamp": "2026-01-02T12:00:00Z"}, + }, + ] + ) try: s = parse_session(path) assert s["metadata"]["first_timestamp"] == "2026-01-02T12:00:00Z" @@ -698,10 +803,12 @@ def test_entry_counts_accumulated(self): os.unlink(path) def test_wall_time_computed(self): - path = _write_jsonl([ - {"type": "user", "timestamp": "2026-01-01T00:00:00Z", "message": {"content": []}}, - {"type": "user", "timestamp": "2026-01-01T01:00:00Z", "message": {"content": []}}, - ]) + path = _write_jsonl( + [ + {"type": "user", "timestamp": "2026-01-01T00:00:00Z", "message": {"content": []}}, + {"type": "user", "timestamp": "2026-01-01T01:00:00Z", "message": {"content": []}}, + ] + ) try: s = parse_session(path) assert s["metadata"]["session_wall_time_seconds"] == 3600.0 @@ -713,11 +820,16 @@ def test_invalid_json_line_skipped(self): # append bad line with open(path, "a", encoding="utf-8") as f: f.write("{not json\n") - f.write(json.dumps({ - "type": "user", - "timestamp": "2026-01-01T00:00:00Z", - "message": {"content": [{"type": "text", "text": "ok"}]}, - }) + "\n") + f.write( + json.dumps( + { + "type": "user", + "timestamp": "2026-01-01T00:00:00Z", + "message": {"content": [{"type": "text", "text": "ok"}]}, + } + ) + + "\n" + ) try: s = parse_session(path) assert any(m.get("text") == "ok" for m in s["messages"]) @@ -725,9 +837,11 @@ def test_invalid_json_line_skipped(self): os.unlink(path) def test_missing_type_key_no_crash(self): - path = _write_jsonl([ - {"timestamp": "2026-01-01T00:00:00Z"}, - ]) + path = _write_jsonl( + [ + {"timestamp": "2026-01-01T00:00:00Z"}, + ] + ) try: s = parse_session(path) assert s["messages"] == [] @@ -735,13 +849,15 @@ def test_missing_type_key_no_crash(self): os.unlink(path) def test_missing_usage_dict_no_crash(self): - path = _write_jsonl([ - { - "type": "assistant", - "timestamp": "2026-01-01T00:00:00Z", - "message": {"model": "m", "content": [], "usage": None}, - }, - ]) + path = _write_jsonl( + [ + { + "type": "assistant", + "timestamp": "2026-01-01T00:00:00Z", + "message": {"model": "m", "content": [], "usage": None}, + }, + ] + ) try: s = parse_session(path) assert s["metadata"]["total_input_tokens"] == 0 @@ -749,13 +865,15 @@ def test_missing_usage_dict_no_crash(self): os.unlink(path) def test_null_message_assistant_no_crash(self): - path = _write_jsonl([ - { - "type": "assistant", - "timestamp": "2026-01-01T00:00:00Z", - "message": None, - }, - ]) + path = _write_jsonl( + [ + { + "type": "assistant", + "timestamp": "2026-01-01T00:00:00Z", + "message": None, + }, + ] + ) try: s = parse_session(path) assert s["metadata"]["total_input_tokens"] == 0 @@ -765,13 +883,15 @@ def test_null_message_assistant_no_crash(self): os.unlink(path) def test_non_dict_message_assistant_no_crash(self): - path = _write_jsonl([ - { - "type": "assistant", - "timestamp": "2026-01-01T00:00:00Z", - "message": "not-a-dict", - }, - ]) + path = _write_jsonl( + [ + { + "type": "assistant", + "timestamp": "2026-01-01T00:00:00Z", + "message": "not-a-dict", + }, + ] + ) try: s = parse_session(path) assert s["metadata"]["total_input_tokens"] == 0 @@ -781,13 +901,15 @@ def test_non_dict_message_assistant_no_crash(self): os.unlink(path) def test_non_dict_usage_assistant_no_crash(self): - path = _write_jsonl([ - { - "type": "assistant", - "timestamp": "2026-01-01T00:00:00Z", - "message": {"model": "m", "content": [], "usage": "invalid"}, - }, - ]) + path = _write_jsonl( + [ + { + "type": "assistant", + "timestamp": "2026-01-01T00:00:00Z", + "message": {"model": "m", "content": [], "usage": "invalid"}, + }, + ] + ) try: s = parse_session(path) assert s["metadata"]["total_input_tokens"] == 0 @@ -799,20 +921,23 @@ def test_non_dict_usage_assistant_no_crash(self): # quick_session_info # --------------------------------------------------------------------------- + class TestQuickSessionInfo: def test_small_file_title_and_timestamps(self): - path = _write_jsonl([ - { - "type": "user", - "timestamp": "2026-01-01T00:00:00Z", - "message": {"content": [{"type": "text", "text": "Hello Title"}]}, - }, - { - "type": "assistant", - "timestamp": "2026-01-01T00:30:00Z", - "message": {"model": "m", "content": [], "usage": {}}, - }, - ]) + path = _write_jsonl( + [ + { + "type": "user", + "timestamp": "2026-01-01T00:00:00Z", + "message": {"content": [{"type": "text", "text": "Hello Title"}]}, + }, + { + "type": "assistant", + "timestamp": "2026-01-01T00:30:00Z", + "message": {"model": "m", "content": [], "usage": {}}, + }, + ] + ) try: info = quick_session_info(path) assert info["title"] == "Hello Title" @@ -825,20 +950,24 @@ def test_large_file_last_timestamp_from_tail(self): # Build >10000 bytes; early timestamps, last line has later ts lines = [] for i in range(200): - lines.append({ + lines.append( + { + "type": "assistant", + "timestamp": "2026-01-01T00:00:00Z", + "message": { + "model": "m", + "content": [{"type": "text", "text": "x" * 80}], + "usage": {}, + }, + } + ) + lines.append( + { "type": "assistant", - "timestamp": "2026-01-01T00:00:00Z", - "message": { - "model": "m", - "content": [{"type": "text", "text": "x" * 80}], - "usage": {}, - }, - }) - lines.append({ - "type": "assistant", - "timestamp": "2026-12-31T23:59:59Z", - "message": {"model": "m", "content": [], "usage": {}}, - }) + "timestamp": "2026-12-31T23:59:59Z", + "message": {"model": "m", "content": [], "usage": {}}, + } + ) path = _write_jsonl(lines) try: assert os.path.getsize(path) > 10000 @@ -865,11 +994,14 @@ def test_no_user_entries_returns_untitled(self): # Extra malformed cases (Gap 9) # --------------------------------------------------------------------------- + class TestMalformedPartialEntries: def test_assistant_missing_message_key(self): - path = _write_jsonl([ - {"type": "assistant", "timestamp": "2026-01-01T00:00:00Z"}, - ]) + path = _write_jsonl( + [ + {"type": "assistant", "timestamp": "2026-01-01T00:00:00Z"}, + ] + ) try: s = parse_session(path) assert len(s["messages"]) == 1 @@ -880,17 +1012,25 @@ def test_assistant_missing_message_key(self): def test_tool_use_result_null_returns_none_in_message(self): messages = [] meta = _fresh_metadata() - _process_user({ - "message": {"content": []}, - "toolUseResult": None, - }, messages, meta) + _process_user( + { + "message": {"content": []}, + "toolUseResult": None, + }, + messages, + meta, + ) assert messages[0]["tool_result_parsed"] is None def test_tool_use_result_string_returns_none(self): messages = [] meta = _fresh_metadata() - _process_user({ - "message": {"content": []}, - "toolUseResult": "oops", - }, messages, meta) + _process_user( + { + "message": {"content": []}, + "toolUseResult": "oops", + }, + messages, + meta, + ) assert messages[0]["tool_result_parsed"] is None diff --git a/tests/test_jsonl_validation.py b/tests/test_jsonl_validation.py index c9899a2..bb400e2 100644 --- a/tests/test_jsonl_validation.py +++ b/tests/test_jsonl_validation.py @@ -8,9 +8,9 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) -from models.errors import SessionValidationError # noqa: E402 -from utils.jsonl_parser import parse_session # noqa: E402 -from utils.validation import validate_session_dict # noqa: E402 +from models.errors import SessionValidationError +from utils.jsonl_parser import parse_session +from utils.validation import validate_session_dict FIXTURES = os.path.join(os.path.dirname(__file__), "fixtures") @@ -47,16 +47,12 @@ def test_null_session_id(self): def test_null_role_in_message(self): with pytest.raises(SessionValidationError) as exc_info: - validate_session_dict( - _valid_payload(messages=[{"role": None, "text": "x"}]) - ) + validate_session_dict(_valid_payload(messages=[{"role": None, "text": "x"}])) assert exc_info.value.path == "messages[0].role" def test_missing_role_in_message(self): with pytest.raises(SessionValidationError) as exc_info: - validate_session_dict( - _valid_payload(messages=[{"text": "no role key"}]) - ) + validate_session_dict(_valid_payload(messages=[{"text": "no role key"}])) assert exc_info.value.path == "messages[0].role" def test_metadata_not_dict(self): diff --git a/tests/test_null_usage_tokens.py b/tests/test_null_usage_tokens.py index cfdc2a4..666df15 100644 --- a/tests/test_null_usage_tokens.py +++ b/tests/test_null_usage_tokens.py @@ -25,6 +25,7 @@ # Helpers # --------------------------------------------------------------------------- + def _fresh_metadata() -> dict: """Return a minimal metadata dict matching what parse_session initialises.""" return { @@ -79,14 +80,20 @@ def _assistant_entry(usage: dict) -> dict: # _process_assistant: null fields must not raise # --------------------------------------------------------------------------- + class TestProcessAssistantNullUsage: """Unit tests for _process_assistant with null token values.""" def test_null_cache_read_tokens(self): meta = _fresh_metadata() - entry = _assistant_entry({"input_tokens": 100, "output_tokens": 50, - "cache_read_input_tokens": None, - "cache_creation_input_tokens": 0}) + entry = _assistant_entry( + { + "input_tokens": 100, + "output_tokens": 50, + "cache_read_input_tokens": None, + "cache_creation_input_tokens": 0, + } + ) _process_assistant(entry, [], meta) assert meta["total_input_tokens"] == 100 assert meta["total_output_tokens"] == 50 @@ -94,9 +101,14 @@ def test_null_cache_read_tokens(self): def test_null_cache_creation_tokens(self): meta = _fresh_metadata() - entry = _assistant_entry({"input_tokens": 200, "output_tokens": 80, - "cache_read_input_tokens": 0, - "cache_creation_input_tokens": None}) + entry = _assistant_entry( + { + "input_tokens": 200, + "output_tokens": 80, + "cache_read_input_tokens": 0, + "cache_creation_input_tokens": None, + } + ) _process_assistant(entry, [], meta) assert meta["total_cache_creation_tokens"] == 0 @@ -116,12 +128,14 @@ def test_null_output_tokens(self): def test_all_null_usage_fields(self): meta = _fresh_metadata() - entry = _assistant_entry({ - "input_tokens": None, - "output_tokens": None, - "cache_read_input_tokens": None, - "cache_creation_input_tokens": None, - }) + entry = _assistant_entry( + { + "input_tokens": None, + "output_tokens": None, + "cache_read_input_tokens": None, + "cache_creation_input_tokens": None, + } + ) _process_assistant(entry, [], meta) assert meta["total_input_tokens"] == 0 assert meta["total_output_tokens"] == 0 @@ -130,14 +144,16 @@ def test_all_null_usage_fields(self): def test_null_ephemeral_tokens(self): meta = _fresh_metadata() - entry = _assistant_entry({ - "input_tokens": 10, - "output_tokens": 5, - "cache_creation": { - "ephemeral_5m_input_tokens": None, - "ephemeral_1h_input_tokens": None, - }, - }) + entry = _assistant_entry( + { + "input_tokens": 10, + "output_tokens": 5, + "cache_creation": { + "ephemeral_5m_input_tokens": None, + "ephemeral_1h_input_tokens": None, + }, + } + ) _process_assistant(entry, [], meta) assert meta["total_ephemeral_5m_tokens"] == 0 assert meta["total_ephemeral_1h_tokens"] == 0 @@ -146,12 +162,14 @@ def test_per_message_usage_dict_has_no_null(self): """The usage dict stored on the message itself must never contain None.""" messages = [] meta = _fresh_metadata() - entry = _assistant_entry({ - "input_tokens": None, - "output_tokens": None, - "cache_read_input_tokens": None, - "cache_creation_input_tokens": None, - }) + entry = _assistant_entry( + { + "input_tokens": None, + "output_tokens": None, + "cache_read_input_tokens": None, + "cache_creation_input_tokens": None, + } + ) _process_assistant(entry, messages, meta) assert len(messages) == 1 usage = messages[0]["usage"] @@ -164,12 +182,14 @@ def test_normal_values_still_accumulate(self): """Sanity check: valid integer values are accumulated correctly.""" meta = _fresh_metadata() for _ in range(3): - entry = _assistant_entry({ - "input_tokens": 100, - "output_tokens": 50, - "cache_read_input_tokens": 20, - "cache_creation_input_tokens": 10, - }) + entry = _assistant_entry( + { + "input_tokens": 100, + "output_tokens": 50, + "cache_read_input_tokens": 20, + "cache_creation_input_tokens": 10, + } + ) _process_assistant(entry, [], meta) assert meta["total_input_tokens"] == 300 assert meta["total_output_tokens"] == 150 @@ -181,27 +201,30 @@ def test_normal_values_still_accumulate(self): # parse_session (integration): null usage survives round-trip via temp file # --------------------------------------------------------------------------- + class TestParseSessionNullUsage: """Integration tests: parse_session must not raise on null usage fields.""" def _write_session(self, entries: list) -> str: - f = tempfile.NamedTemporaryFile( - mode="w", suffix=".jsonl", delete=False, encoding="utf-8" - ) + f = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False, encoding="utf-8") for entry in entries: f.write(json.dumps(entry) + "\n") f.close() return f.name def test_null_cache_read_does_not_crash(self): - path = self._write_session([ - _assistant_entry({ - "input_tokens": 500, - "output_tokens": 100, - "cache_read_input_tokens": None, - "cache_creation_input_tokens": None, - }) - ]) + path = self._write_session( + [ + _assistant_entry( + { + "input_tokens": 500, + "output_tokens": 100, + "cache_read_input_tokens": None, + "cache_creation_input_tokens": None, + } + ) + ] + ) try: session = parse_session(path) assert session["metadata"]["total_input_tokens"] == 500 @@ -212,12 +235,16 @@ def test_null_cache_read_does_not_crash(self): def test_mixed_null_and_normal_entries(self): """A session with some null-usage entries and some normal ones should accumulate only the non-null values.""" - path = self._write_session([ - _assistant_entry({"input_tokens": 100, "output_tokens": 40, - "cache_read_input_tokens": None}), - _assistant_entry({"input_tokens": 200, "output_tokens": 80, - "cache_read_input_tokens": 30}), - ]) + path = self._write_session( + [ + _assistant_entry( + {"input_tokens": 100, "output_tokens": 40, "cache_read_input_tokens": None} + ), + _assistant_entry( + {"input_tokens": 200, "output_tokens": 80, "cache_read_input_tokens": 30} + ), + ] + ) try: session = parse_session(path) assert session["metadata"]["total_input_tokens"] == 300 @@ -231,41 +258,49 @@ def test_mixed_null_and_normal_entries(self): # _estimate_cost: null tokens must not crash cost calculation # --------------------------------------------------------------------------- + class TestEstimateCostNullUsage: """Unit tests for _estimate_cost with null token values.""" def _make_messages(self, usage_list: list) -> list: return [ - {"role": "assistant", "model": model, "usage": usage} - for model, usage in usage_list + {"role": "assistant", "model": model, "usage": usage} for model, usage in usage_list ] def test_null_output_tokens_with_valid_input(self): - messages = self._make_messages([ - ("claude-sonnet-4-5", {"input_tokens": 1_000_000, "output_tokens": None}), - ]) + messages = self._make_messages( + [ + ("claude-sonnet-4-5", {"input_tokens": 1_000_000, "output_tokens": None}), + ] + ) cost = _estimate_cost(messages, {}) assert cost is not None assert cost == pytest.approx(3.0, rel=1e-3) def test_null_input_tokens_with_valid_output(self): - messages = self._make_messages([ - ("claude-sonnet-4-5", {"input_tokens": None, "output_tokens": 1_000_000}), - ]) + messages = self._make_messages( + [ + ("claude-sonnet-4-5", {"input_tokens": None, "output_tokens": 1_000_000}), + ] + ) cost = _estimate_cost(messages, {}) assert cost is not None assert cost == pytest.approx(15.0, rel=1e-3) def test_all_null_tokens_returns_none(self): - messages = self._make_messages([ - ("claude-sonnet-4-5", {"input_tokens": None, "output_tokens": None}), - ]) + messages = self._make_messages( + [ + ("claude-sonnet-4-5", {"input_tokens": None, "output_tokens": None}), + ] + ) cost = _estimate_cost(messages, {}) assert cost is None def test_normal_values_unaffected(self): - messages = self._make_messages([ - ("claude-sonnet-4-5", {"input_tokens": 1_000_000, "output_tokens": 1_000_000}), - ]) + messages = self._make_messages( + [ + ("claude-sonnet-4-5", {"input_tokens": 1_000_000, "output_tokens": 1_000_000}), + ] + ) cost = _estimate_cost(messages, {}) assert cost == pytest.approx(18.0, rel=1e-3) diff --git a/tests/test_real_session_fixtures.py b/tests/test_real_session_fixtures.py index f6da553..9696dbc 100644 --- a/tests/test_real_session_fixtures.py +++ b/tests/test_real_session_fixtures.py @@ -124,9 +124,7 @@ def test_real_session_nested_tools_has_sidechain_and_tool_use() -> None: def test_real_session_unknown_fields_tolerated() -> None: session = parse_session(_fixture_path("real_session_unknown_fields.jsonl")) _assert_session_shape(session) - assert len(session["messages"]) == _FIXTURE_MESSAGE_COUNTS[ - "real_session_unknown_fields.jsonl" - ] + assert len(session["messages"]) == _FIXTURE_MESSAGE_COUNTS["real_session_unknown_fields.jsonl"] def test_real_session_malformed_lines_skips_bad_lines() -> None: @@ -135,9 +133,7 @@ def test_real_session_malformed_lines_skips_bad_lines() -> None: texts = [m.get("text") or "" for m in session["messages"] if m["role"] == "user"] assert any("before malformed" in t for t in texts) assert any("after malformed" in t for t in texts) - assert len(session["messages"]) == _FIXTURE_MESSAGE_COUNTS[ - "real_session_malformed_lines.jsonl" - ] + assert len(session["messages"]) == _FIXTURE_MESSAGE_COUNTS["real_session_malformed_lines.jsonl"] def test_task_retrieval_not_misclassified_as_task_message() -> None: diff --git a/tests/test_search.py b/tests/test_search.py index 52a605e..3442b32 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -7,14 +7,16 @@ from tests.conftest import assert_error_response -_SEARCH_HIT_KEYS = frozenset({ - "project", - "session_id", - "title", - "role", - "timestamp", - "snippet", -}) +_SEARCH_HIT_KEYS = frozenset( + { + "project", + "session_id", + "title", + "role", + "timestamp", + "snippet", + } +) def _assert_search_hits(results: list, *, max_items: int) -> None: diff --git a/tests/test_xss_sanitization.py b/tests/test_xss_sanitization.py index 479baf2..5de87c3 100644 --- a/tests/test_xss_sanitization.py +++ b/tests/test_xss_sanitization.py @@ -26,25 +26,21 @@ DOMPURIFY_CDN_URL = "https://cdnjs.cloudflare.com/ajax/libs/dompurify/3.2.7/purify.min.js" DOMPURIFY_SRI = ( - "sha512-78KH17QLT5e55GJqP76vutp1D2iAoy06WcYBXB6iBCsmO6wWzx0Qdg8EDpm8mKXv68BcvHOyeeP4wxAL0twJGQ==" + "sha512-78KH17QLT5e55GJqP76vutp1D2iAoy06WcYBXB6iBCsmO6wWzx0Qdg8EDpm8mKXv68BcvHOyee" + "P4wxAL0twJGQ==" ) def _all_js_files(): """Return all .js files under static/js/, excluding node_modules.""" - return [ - p for p in STATIC_JS_DIR.rglob("*.js") - if "node_modules" not in p.parts - ] + return [p for p in STATIC_JS_DIR.rglob("*.js") if "node_modules" not in p.parts] class TestDomPurifyInHTML: - def test_dompurify_cdn_url_present(self): html = INDEX_HTML.read_text(encoding="utf-8") assert DOMPURIFY_CDN_URL in html, ( - f"DOMPurify CDN URL not found in index.html. " - f"Expected: {DOMPURIFY_CDN_URL}" + f"DOMPurify CDN URL not found in index.html. Expected: {DOMPURIFY_CDN_URL}" ) def test_dompurify_sri_hash_present(self): @@ -58,19 +54,18 @@ def test_dompurify_crossorigin_anonymous(self): html = INDEX_HTML.read_text(encoding="utf-8") # Find the script tag that loads DOMPurify and check crossorigin attribute script_re = re.compile( - r']*' + re.escape("dompurify") + r'[^>]*>', + r"]*" + re.escape("dompurify") + r"[^>]*>", re.DOTALL | re.IGNORECASE, ) m = script_re.search(html) assert m, "No