diff --git a/packages/server/src/server/serveStdio.ts b/packages/server/src/server/serveStdio.ts index e166c4298f..7511a3ece6 100644 --- a/packages/server/src/server/serveStdio.ts +++ b/packages/server/src/server/serveStdio.ts @@ -47,6 +47,7 @@ * were written for. */ import type { + CancelledNotificationParams, JSONRPCMessage, JSONRPCNotification, JSONRPCRequest, @@ -117,6 +118,16 @@ export interface StdioServerHandle { * Per-instance channel * ------------------------------------------------------------------------ */ +/** + * How long the probe-discard path waits for the probe instance to answer the + * requests it was delivered before closing it. The wait normally settles as + * soon as the DiscoverResult is handed to the wire (or immediately, when a + * delivered cancellation already settled the probe); the bound is a backstop + * so no edge can ever hold the connection's inbound pump indefinitely behind + * the discard. + */ +const DISCARD_ANSWER_TIMEOUT_MS = 3000; + /** * The transport a pinned instance is connected to: a thin channel that writes * through to the entry-owned wire transport and receives the messages the @@ -173,21 +184,45 @@ class StdioConnectionChannel implements Transport { } if (isJSONRPCRequest(message)) { this._pendingRequests.add(message.id); + } else if (isJSONRPCNotification(message) && message.method === 'notifications/cancelled') { + // By protocol contract a cancelled request may legitimately go + // unanswered (the instance aborts the in-flight handler and writes + // nothing for it), so a delivered cancellation settles the request + // it names: nothing should keep waiting for an answer that may + // never come. Non-cancelled requests still settle only when their + // answer is handed to the wire. + const cancelledId = (message.params as CancelledNotificationParams | undefined)?.requestId; + if (cancelledId !== undefined) { + this._settle(cancelledId); + } } this.onmessage?.(message, extra); } /** * Resolves once every request delivered to the instance has been answered - * through {@linkcode send} (or the channel has been closed and nothing - * further can be answered). Used by the probe-discard path so a probe - * request the entry accepted is never silently dropped. + * through {@linkcode send}, settled by a delivered cancellation, or the + * channel has been closed and nothing further can be answered. The wait is + * bounded by `timeoutMs` as a backstop so no edge can hold the caller + * indefinitely; resolves `false` only when the bound elapsed with requests + * still unanswered. Used by the probe-discard path so a probe request the + * entry accepted is never silently dropped. */ - async whenRequestsAnswered(): Promise { + async whenRequestsAnswered(timeoutMs: number): Promise { if (this._closed || this._pendingRequests.size === 0) { - return; + return true; } - await new Promise(resolve => this._drainWaiters.push(resolve)); + return await new Promise(resolve => { + const waiter = (): void => { + clearTimeout(timer); + resolve(true); + }; + const timer = setTimeout(() => { + this._drainWaiters = this._drainWaiters.filter(pending => pending !== waiter); + resolve(false); + }, timeoutMs); + this._drainWaiters.push(waiter); + }); } async close(): Promise { @@ -405,8 +440,19 @@ export function serveStdio(factory: McpServerFactory, options: ServeStdioOptions // the instance aborts whatever it still has in flight. Let the // in-flight DiscoverResult reach the wire before the instance is // closed; the probe instance only ever receives `server/discover`, - // whose entry-installed handler always answers promptly. - await instance.channel.whenRequestsAnswered(); + // whose entry-installed handler always answers promptly. A probe + // the client cancelled is already settled by the delivered + // cancellation (a cancelled request may go unanswered), and the + // wait is bounded as a backstop so nothing can wedge the + // connection's pump behind the discard. + const answered = await instance.channel.whenRequestsAnswered(DISCARD_ANSWER_TIMEOUT_MS); + if (!answered) { + reportError( + new Error( + `Discarded the probe instance with requests still unanswered after ${DISCARD_ANSWER_TIMEOUT_MS}ms; continuing with the fallback` + ) + ); + } await instance.product.close(); } catch (error) { reportError(toError(error)); diff --git a/packages/server/test/server/serveStdio.test.ts b/packages/server/test/server/serveStdio.test.ts index 0ba6c3ecf5..e342918179 100644 --- a/packages/server/test/server/serveStdio.test.ts +++ b/packages/server/test/server/serveStdio.test.ts @@ -403,6 +403,41 @@ describe('server/discover probe window', () => { await handle.close(); }); + it('a pipelined cancellation of the probe followed by initialize still falls back to a working legacy session', async () => { + const { handle, request, notify, flush, eras, closed, errors } = await startEntry(); + + // The client pipelines all three messages without waiting for any + // answer: the probe, an enveloped cancellation naming the probe id + // (which aborts the in-flight discover handler, so the probe may + // legitimately never be answered), and the fallback 2025 handshake. + // The cancelled probe must not hold the connection: the handshake is + // answered and the legacy session is fully usable. + void request({ jsonrpc: '2.0', id: 'probe-1', method: 'server/discover', params: { _meta: envelope() } }); + void notify({ + jsonrpc: '2.0', + method: 'notifications/cancelled', + params: { requestId: 'probe-1', reason: 'negotiation aborted', _meta: envelope() } + }); + const init = await request(initializeRequest(2)); + expect(isJSONRPCResultResponse(init)).toBe(true); + if (isJSONRPCResultResponse(init)) { + expect((init.result as { protocolVersion?: string }).protocolVersion).toBe(LATEST_PROTOCOL_VERSION); + } + + // The probe instance was discarded and the fallback is served end to + // end by a fresh legacy instance. + expect(eras).toEqual(['modern', 'legacy']); + expect(closed[0]).toBe(true); + expect(closed[1]).toBe(false); + + const list = await request({ jsonrpc: '2.0', id: 3, method: 'tools/list', params: {} }); + expect(isJSONRPCResultResponse(list)).toBe(true); + await flush(); + expect(errors).toEqual([]); + + await handle.close(); + }); + it('an enveloped non-discover request after the probe still pins the modern era', async () => { const { handle, request, eras } = await startEntry();