diff --git a/src/mcp/shared/inbound.py b/src/mcp/shared/inbound.py index 1c70e3d92..a2a2a9c27 100644 --- a/src/mcp/shared/inbound.py +++ b/src/mcp/shared/inbound.py @@ -191,10 +191,20 @@ def find_invalid_x_mcp_header(input_schema: Any) -> str | None: return f"{X_MCP_HEADER_KEY} found at a schema position not reachable via a pure `properties` chain" where = ".".join(path) header = schema[X_MCP_HEADER_KEY] - if not isinstance(header, str) or not _RFC9110_TOKEN.fullmatch(header): + # Wrong type and malformed value are distinct failures with distinct messages: the + # non-str arm returns before any interpolation, because `repr` of an arbitrary + # schema value is not total (a large `int` exceeds `sys.get_int_max_str_digits`). + if not isinstance(header, str): + return f"property {where!r}: {X_MCP_HEADER_KEY} must be a string, not {type(header).__name__}" + if not _RFC9110_TOKEN.fullmatch(header): return f"property {where!r}: {X_MCP_HEADER_KEY} {header!r} is not an RFC 9110 token" prop_type = schema.get("type") - if not isinstance(prop_type, str) or prop_type not in _X_MCP_HEADER_PRIMITIVE_TYPES: + if not isinstance(prop_type, str): + return ( + f"property {where!r}: {X_MCP_HEADER_KEY} is only permitted on " + f"integer/string/boolean properties (the type keyword is {type(prop_type).__name__}, not a string)" + ) + if prop_type not in _X_MCP_HEADER_PRIMITIVE_TYPES: return ( f"property {where!r}: {X_MCP_HEADER_KEY} is only permitted on " f"integer/string/boolean properties (got {prop_type!r})" diff --git a/tests/shared/test_inbound.py b/tests/shared/test_inbound.py index 93ab6ecc2..8478c3733 100644 --- a/tests/shared/test_inbound.py +++ b/tests/shared/test_inbound.py @@ -26,6 +26,9 @@ from mcp_types.version import LATEST_HANDSHAKE_VERSION, LATEST_MODERN_VERSION, MODERN_PROTOCOL_VERSIONS from mcp.shared.inbound import ( + _SUBSCHEMA_LIST, + _SUBSCHEMA_MAP, + _SUBSCHEMA_SINGLE, ERROR_CODE_HTTP_STATUS, MCP_METHOD_HEADER, MCP_NAME_HEADER, @@ -378,6 +381,10 @@ def _schema(**props: Any) -> dict[str, Any]: _schema(a={"type": "string", "const": {"x-mcp-header": "ignored"}}), id="annotation-lookalike-in-const-is-data", ), + pytest.param( + _schema(a={"type": "string", "enum": [{"x-mcp-header": "ignored"}]}), + id="annotation-lookalike-in-enum-is-data", + ), pytest.param( {"properties": {"a": {"type": "string", "x-mcp-header": "R"}}, "$ref": "#/$defs/loop"}, id="ref-is-not-dereferenced", @@ -404,12 +411,14 @@ def test_find_invalid_x_mcp_header_accepts_valid_or_absent_annotations(input_sch pytest.param(_schema(a={"type": "string", "x-mcp-header": "Région"}), id="non-ascii"), pytest.param(_schema(a={"type": "string", "x-mcp-header": "Region\t1"}), id="control-char"), pytest.param(_schema(a={"type": "string", "x-mcp-header": 42}), id="non-string"), + pytest.param(_schema(a={"type": "string", "x-mcp-header": 10**5000}), id="oversized-int-header"), pytest.param(_schema(a={"type": "object", "x-mcp-header": "Data"}), id="on-object"), pytest.param(_schema(a={"type": "array", "x-mcp-header": "Items"}), id="on-array"), pytest.param(_schema(a={"type": "null", "x-mcp-header": "Nil"}), id="on-null"), pytest.param(_schema(a={"type": "number", "x-mcp-header": "Ratio"}), id="on-number"), pytest.param(_schema(a={"type": ["string", "null"], "x-mcp-header": "Maybe"}), id="array-type"), pytest.param(_schema(a={"type": {"not": "valid"}, "x-mcp-header": "Bad"}), id="dict-type"), + pytest.param(_schema(a={"type": 10**5000, "x-mcp-header": "Big"}), id="oversized-int-type"), pytest.param(_schema(a={"x-mcp-header": "NoType"}), id="missing-type"), pytest.param( _schema(a={"type": "string", "x-mcp-header": "Region"}, b={"type": "string", "x-mcp-header": "Region"}), @@ -420,28 +429,8 @@ def test_find_invalid_x_mcp_header_accepts_valid_or_absent_annotations(input_sch id="duplicate-diff-case", ), pytest.param( - _schema(a={"type": "array", "items": {"type": "string", "x-mcp-header": "X"}}), - id="under-items", - ), - pytest.param( - {"allOf": [{"properties": {"a": {"type": "string", "x-mcp-header": "X"}}}]}, - id="under-allOf", - ), - pytest.param( - {"oneOf": [{"type": "string", "x-mcp-header": "X"}]}, - id="under-oneOf", - ), - pytest.param( - _schema(a={"if": {"type": "string", "x-mcp-header": "X"}}), - id="under-if", - ), - pytest.param( - {"$defs": {"T": {"type": "string", "x-mcp-header": "X"}}, "properties": {}}, - id="under-defs", - ), - pytest.param( - {"patternProperties": {"^a": {"type": "string", "x-mcp-header": "X"}}}, - id="under-patternProperties", + {"allOf": [{"type": "object", "properties": {"a": {"type": "string", "x-mcp-header": "X"}}}]}, + id="properties-chain-not-restored-below-an-applicator", ), pytest.param( {"type": "string", "x-mcp-header": "X"}, @@ -470,6 +459,47 @@ def test_find_invalid_x_mcp_header_rejects_malformed_annotations(input_schema: d assert isinstance(find_invalid_x_mcp_header(input_schema), str) +# Keyword → a value of that keyword's own JSON Schema shape carrying an annotated subschema. +# Deliberately a literal table, independent of the `_SUBSCHEMA_*` sets in `inbound.py`: +# dropping a keyword from the walk must FAIL its case here, not shrink the parametrization. +_ANNOTATED = {"type": "string", "x-mcp-header": "Region"} +_APPLICATOR_CASES: dict[str, Any] = { + "$defs": {"T": _ANNOTATED}, + "additionalProperties": _ANNOTATED, + "allOf": [_ANNOTATED], + "anyOf": [_ANNOTATED], + "contains": _ANNOTATED, + "contentSchema": _ANNOTATED, + "definitions": {"T": _ANNOTATED}, + "dependentSchemas": {"k": _ANNOTATED}, + "else": _ANNOTATED, + "if": _ANNOTATED, + "items": _ANNOTATED, + "not": _ANNOTATED, + "oneOf": [_ANNOTATED], + "patternProperties": {"^a": _ANNOTATED}, + "prefixItems": [_ANNOTATED], + "propertyNames": _ANNOTATED, + "then": _ANNOTATED, + "unevaluatedItems": _ANNOTATED, + "unevaluatedProperties": _ANNOTATED, +} + + +@pytest.mark.parametrize("keyword", sorted(_APPLICATOR_CASES)) +def test_find_invalid_x_mcp_header_rejects_annotations_under_every_non_properties_applicator(keyword: str) -> None: + """Spec-mandated: a property reached through any applicator other than `properties` is not + statically reachable, so its annotation invalidates the whole tool definition.""" + schema = _schema(ok={"type": "string"}) | {keyword: _APPLICATOR_CASES[keyword]} + assert isinstance(find_invalid_x_mcp_header(schema), str) + + +def test_schema_walk_applicator_keywords_match_the_pinned_reject_cases() -> None: + """SDK-defined: a keyword added to the walk must gain a literal reject case above (a removed + keyword already fails its case there).""" + assert _SUBSCHEMA_LIST | _SUBSCHEMA_MAP | _SUBSCHEMA_SINGLE == set(_APPLICATOR_CASES) + + def test_find_invalid_x_mcp_header_reports_dotted_path_for_nested_property() -> None: """SDK-defined: the reason string names the nested property by its dotted `properties` path.""" schema = _schema(outer={"type": "object", "properties": {"r": {"type": "object", "x-mcp-header": "R"}}})