diff --git a/packages/opencode/src/cli/cmd/run.ts b/packages/opencode/src/cli/cmd/run.ts index 1af6784fd..233c0d2c5 100644 --- a/packages/opencode/src/cli/cmd/run.ts +++ b/packages/opencode/src/cli/cmd/run.ts @@ -29,6 +29,7 @@ import { TodoWriteTool } from "../../tool/todo" import { Locale } from "../../util/locale" import { Tracer, FileExporter, HttpExporter, type TraceExporter } from "../../altimate/observability/tracing" import { Config } from "../../config/config" +import { readStdinIfAvailable, assembleStdinMessage } from "../../util/stdin" type ToolProps = { input: Tool.InferParameters @@ -454,14 +455,11 @@ export const RunCommand = cmd({ message = [extractedParts.join("\n\n"), message].filter(Boolean).join("\n\n") } - // altimate_change start — null-safe stdin read. process.stdin can be - // undefined in embedded/child runtimes (dev-punia review, PR #937). - // Earlier revision used `!process.stdin?.isTTY`, which turned the crash - // into a stall: undefined stdin satisfied the guard and we then awaited - // Bun.stdin.text() on a stream that would never EOF. Skip the read - // entirely when there is no stdin to read from. - if (process.stdin && !process.stdin.isTTY) message += "\n" + (await Bun.stdin.text()) - // altimate_change end + // Read piped/redirected stdin without wedging on inherited-but-idle fds. + // See `src/util/stdin.ts` for the failure mode and the first-byte-race fix. + // The helper also handles `process.stdin === undefined` (embedded/child + // runtimes — dev-punia review on PR #937) by returning "". + message = assembleStdinMessage(message, await readStdinIfAvailable()) if (message.trim().length === 0 && !args.command) { UI.error("You must provide a message or a command") diff --git a/packages/opencode/src/util/stdin.ts b/packages/opencode/src/util/stdin.ts new file mode 100644 index 000000000..be2054840 --- /dev/null +++ b/packages/opencode/src/util/stdin.ts @@ -0,0 +1,194 @@ +import fs from "fs" +import type { Stats } from "fs" + +const STDIN_TIMEOUT_ENV = "ALTIMATE_STDIN_TIMEOUT_MS" +const DEFAULT_FIRST_BYTE_TIMEOUT_MS = 500 + +type Stat = Pick + +export interface ReadStdinDeps { + isTTY?: boolean + fstat?: () => Stat + // Returns "" if no first byte arrives within timeoutMs; otherwise drains + // stdin to EOF and returns the full content. Default implementation uses + // process.stdin events so that a wedged-after-first-byte case still has a + // well-defined behavior (waits for `end`); callers can inject a faster + // implementation in tests. + readStdin?: (timeoutMs: number) => Promise + timeoutMs?: number + // Called once with a human-readable note when fd 0 looked like a real + // input source (FIFO / file / socket) but no first byte arrived within + // the timeout — so a user whose pipe was silently dropped finds out. + // Default writes to stderr; tests inject a no-op to silence. + warn?: (msg: string) => void +} + +function resolveTimeoutMs(): number { + const raw = process.env[STDIN_TIMEOUT_ENV] + if (raw === undefined) return DEFAULT_FIRST_BYTE_TIMEOUT_MS + const parsed = Number.parseInt(raw, 10) + if (!Number.isFinite(parsed) || parsed < 0) return DEFAULT_FIRST_BYTE_TIMEOUT_MS + return parsed +} + +function defaultWarn(msg: string): void { + try { + process.stderr?.write(msg + "\n") + } catch {} +} + +// Read piped/redirected stdin without wedging on an inherited-but-idle fd. +// +// The failure mode this guards against: subprocess callers (Claude Code's +// Bash tool, Python `subprocess.run(..., stdin=None)`, CI, plugin hosts) +// leave stdin attached to a parent pipe that is never written to and never +// closed. A blind `Bun.stdin.text()` waits forever for an EOF that never +// arrives. +// +// Strategy — two gates: +// +// 1. fstat gate: only FIFOs (pipes), regular files (redirects), and +// sockets (process supervisors, socket activation, `nc -l`) can carry +// real input. TTYs and character devices (e.g. `< /dev/null`) skip. +// +// 2. First-byte timeout: instead of bounding the whole-stream drain, we +// wait up to `timeoutMs` for the first readable byte. If no byte +// arrives in that window, we treat stdin as inherited-idle and skip. +// If a byte arrives, we drain to EOF without further deadline — so a +// slow producer that takes >500ms total but flushes its first chunk +// within the window is not truncated. The 500ms default is chosen to +// cover realistic slow-to-first-byte producers (DB queries that need +// to plan, decompression headers, network calls with DNS+TLS handshake) +// while still failing fast enough to keep the inherited-idle wedge fix +// effective. Users can override via `ALTIMATE_STDIN_TIMEOUT_MS=N` (ms) +// for environments with even slower producers. This avoids the two +// pitfalls of a whole-stream race: (a) the orphaned `Bun.stdin.text()` +// continuing to hold fd 0 open after the loser is abandoned, and +// (b) silent mid-stream truncation of legitimate slow / large producers. +// +// 3. Stderr note on silent drop: if we got past the fstat gate (fd 0 +// looked like real input) but the read came back empty, we write one +// line to stderr so a user whose pipe was dropped finds out instead +// of silently getting no input. The note also tells them how to bump +// the timeout. This fires on the dominant inherited-idle path too — +// that's intentional, since stderr is captured but not surfaced for +// most subprocess callers (Claude Code's Bash tool, CI) and a single +// line of noise per call is worth the explicitness for the +// slow-producer case. +export async function readStdinIfAvailable(deps: ReadStdinDeps = {}): Promise { + // `process.stdin` can be undefined in embedded / child runtimes (flagged + // by dev-punia on PR #937, suryaiyer95 review on PR #935 #3). Optional + // chaining keeps the access safe; the fstat catch below covers the case + // where fd 0 itself isn't open. + const isTTY = deps.isTTY ?? Boolean(process.stdin?.isTTY) + const fstat = deps.fstat ?? (() => fs.fstatSync(0) as Stat) + const readStdin = deps.readStdin ?? defaultReadStdin + const timeoutMs = deps.timeoutMs ?? resolveTimeoutMs() + const warn = deps.warn ?? defaultWarn + + if (isTTY) return "" + + try { + const stat = fstat() + if (!stat.isFIFO() && !stat.isFile() && !stat.isSocket()) return "" + } catch { + return "" + } + + const result = await readStdin(timeoutMs) + + if (result === "") { + // An empty result can come from the timer firing (slow producer), a + // clean `end` with zero bytes (intentionally empty pipe), or a stream + // error. The previous version included a `set ${STDIN_TIMEOUT_ENV}=N` + // tip; cubic-dev-ai flagged it as misleading for the error and the + // dominant inherited-idle subprocess cases. The env var stays + // documented in code/docs for the rare slow-producer scenario. + warn(`altimate-code: stdin produced no data; proceeding without it.`) + } + + return result +} + +// Compose the final prompt from a positional message and stdin input. +// Extracted as a pure function so the regression case from PR #935 +// (`echo ctx | run "prompt"` must concatenate, not silently drop ctx) can +// be unit-tested without spawning the full run command. +export function assembleStdinMessage(positional: string, stdinInput: string): string { + if (stdinInput.trim().length === 0) return positional + if (positional.length === 0) return stdinInput + return positional + "\n" + stdinInput +} + +// Default implementation of the first-byte race over `process.stdin`. +// +// Why not `Bun.stdin.text()`: that reads the entire stream as a single +// uncancellable Promise. If we race it against a timer and the timer wins, +// the read still holds fd 0 open until the producer eventually closes, +// blocking process exit (the original wedge moved to teardown). +// +// Using `process.stdin` event listeners lets us: +// - bind the timeout to "first byte" rather than "full drain", so slow +// producers and large payloads aren't truncated; +// - cleanly remove our listeners and `unref` the stream on the no-data +// path, so an inherited-open fd doesn't pin the event loop. +function defaultReadStdin(timeoutMs: number): Promise { + return new Promise((resolve) => { + const stdin = process.stdin + // Defensive: a caller can reach this path with `deps.isTTY` set but + // `process.stdin` undefined (embedded/child runtime). The outer + // `process.stdin?.isTTY` guard short-circuits the default case but not + // this one. Treat absence as "no data" to match the function's contract. + if (!stdin) return resolve("") + const chunks: Buffer[] = [] + let firstByteReceived = false + let firstByteTimer: ReturnType | undefined + let settled = false + + const cleanup = () => { + stdin.off("data", onData) + stdin.off("end", onEnd) + stdin.off("error", onError) + if (firstByteTimer) clearTimeout(firstByteTimer) + try { + stdin.pause() + } catch {} + try { + stdin.unref?.() + } catch {} + } + + const settle = (result: string) => { + if (settled) return + settled = true + cleanup() + resolve(result) + } + + const onData = (chunk: Buffer) => { + if (!firstByteReceived) { + firstByteReceived = true + if (firstByteTimer) { + clearTimeout(firstByteTimer) + firstByteTimer = undefined + } + } + chunks.push(chunk) + } + const onEnd = () => settle(Buffer.concat(chunks).toString("utf8")) + const onError = () => settle(Buffer.concat(chunks).toString("utf8")) + + firstByteTimer = setTimeout(() => { + if (!firstByteReceived) settle("") + }, timeoutMs) + + stdin.on("data", onData) + stdin.on("end", onEnd) + stdin.on("error", onError) + try { + stdin.resume() + } catch { + settle("") + } + }) +} diff --git a/packages/opencode/test/util/stdin-e2e.test.ts b/packages/opencode/test/util/stdin-e2e.test.ts new file mode 100644 index 000000000..5f6301e7d --- /dev/null +++ b/packages/opencode/test/util/stdin-e2e.test.ts @@ -0,0 +1,130 @@ +import { describe, expect, test } from "bun:test" +import path from "path" + +// The fixture imports the real helper and prints { result, elapsed } as JSON. +// Spawning it lets us exercise the actual `process.stdin` event path against +// real fd 0 conditions — something dependency injection can't cover. +const FIXTURE = path.join(__dirname, "stdin-fixture.ts") + +type FileSink = { write(chunk: string | Uint8Array): number; end(): Promise | number } + +async function runFixture(opts: { + writeStdin?: (sink: FileSink) => Promise + killAfterMs?: number +}): Promise<{ code: number | null; result?: string; elapsed?: number; stdout: string; stderr: string }> { + const proc = Bun.spawn(["bun", "run", FIXTURE], { + stdin: "pipe", + stdout: "pipe", + stderr: "pipe", + }) + + if (opts.writeStdin) { + await opts.writeStdin(proc.stdin as unknown as FileSink) + } + + let killTimer: ReturnType | undefined + if (opts.killAfterMs) { + killTimer = setTimeout(() => proc.kill(), opts.killAfterMs) + } + const code = await proc.exited + if (killTimer) clearTimeout(killTimer) + + const stdout = await new Response(proc.stdout).text() + const stderr = await new Response(proc.stderr).text() + + try { + const parsed = JSON.parse(stdout) + return { code, result: parsed.result, elapsed: parsed.elapsed, stdout, stderr } + } catch { + return { code, stdout, stderr } + } +} + +describe("readStdinIfAvailable (spawned subprocess)", () => { + // M1-regression — the canonical wedge: an inherited pipe that's never + // written to and never closed. Pre-fix this hung forever; with the + // previous Promise.race fix, the await released at 100ms but fd 0 stayed + // open until the parent closed. With the first-byte event race + unref, + // the child exits promptly. + test( + "exits promptly with empty result when stdin is an inherited-but-idle pipe", + async () => { + const { code, result, elapsed } = await runFixture({ + // Don't write — leave the pipe open and silent. Close after 3s so + // the parent's writer isn't garbage-collected; by then the child + // should have already exited via the 500ms first-byte timeout. + writeStdin: async (sink) => { + await new Promise((r) => setTimeout(r, 3000)) + try { + await sink.end() + } catch {} + }, + killAfterMs: 8000, + }) + expect(code).toBe(0) + expect(result).toBe("") + // 1500 = timeout (500ms) × 3 for CI scheduler headroom. Well under + // "infinite hang" — the assertion guards the wedge fix, not perf. + expect(elapsed).toBeLessThan(1500) + }, + 15000, + ) + + // MAJOR-regression from PR #935: `echo ctx | run "prompt"` must still + // deliver "ctx". Verified here by writing data + closing the pipe. + test( + "returns piped data when producer writes and closes", + async () => { + const { code, result } = await runFixture({ + writeStdin: async (sink) => { + sink.write("context data") + await sink.end() + }, + }) + expect(code).toBe(0) + expect(result).toBe("context data") + }, + 10000, + ) + + // Sury PR #935 review #2: 500ms must cover realistic slow-first-byte + // producers (DB queries that need to plan, decompression, network calls + // with DNS+TLS handshake). 300ms is comfortably under the 500ms budget; + // the previous 100ms config would have truncated this. + test( + "preserves data from slow producer (first byte arrives ~300ms after spawn)", + async () => { + const { code, result } = await runFixture({ + writeStdin: async (sink) => { + await new Promise((r) => setTimeout(r, 300)) + sink.write("slow ctx") + await sink.end() + }, + }) + expect(code).toBe(0) + expect(result).toBe("slow ctx") + }, + 10000, + ) + + // Negative control: a producer slow enough to miss the first-byte window + // should yield "" (intentional cutoff, no truncation of in-flight data). + // 1200ms > 500ms timeout with margin for CI scheduler latency. + test( + "returns empty when first byte arrives after the first-byte timeout", + async () => { + const { code, result } = await runFixture({ + writeStdin: async (sink) => { + await new Promise((r) => setTimeout(r, 1200)) + try { + sink.write("too late") + await sink.end() + } catch {} + }, + }) + expect(code).toBe(0) + expect(result).toBe("") + }, + 15000, + ) +}) diff --git a/packages/opencode/test/util/stdin-fixture.ts b/packages/opencode/test/util/stdin-fixture.ts new file mode 100644 index 000000000..4be684958 --- /dev/null +++ b/packages/opencode/test/util/stdin-fixture.ts @@ -0,0 +1,9 @@ +// Subprocess fixture used by stdin-e2e.test.ts. +// Imports the real helper, invokes it against real fd 0, and prints the +// result as JSON so the test can assert on it. +import { readStdinIfAvailable } from "../../src/util/stdin" + +const start = Date.now() +const result = await readStdinIfAvailable() +const elapsed = Date.now() - start +process.stdout.write(JSON.stringify({ result, elapsed })) diff --git a/packages/opencode/test/util/stdin.test.ts b/packages/opencode/test/util/stdin.test.ts new file mode 100644 index 000000000..c38b42fd8 --- /dev/null +++ b/packages/opencode/test/util/stdin.test.ts @@ -0,0 +1,278 @@ +import { describe, expect, test } from "bun:test" +import { readStdinIfAvailable, assembleStdinMessage } from "../../src/util/stdin" + +const fifo = { isFIFO: () => true, isFile: () => false, isSocket: () => false } +const file = { isFIFO: () => false, isFile: () => true, isSocket: () => false } +const sock = { isFIFO: () => false, isFile: () => false, isSocket: () => true } +const charDev = { isFIFO: () => false, isFile: () => false, isSocket: () => false } + +describe("readStdinIfAvailable", () => { + test("returns empty when stdin is a TTY (no fstat, no read)", async () => { + let read = false + const out = await readStdinIfAvailable({ + isTTY: true, + fstat: () => { + throw new Error("fstat should not run when isTTY") + }, + readStdin: async () => { + read = true + return "should not happen" + }, + }) + expect(out).toBe("") + expect(read).toBe(false) + }) + + test("returns empty for `< /dev/null` (character device — neither FIFO, file, nor socket)", async () => { + let read = false + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => charDev, + readStdin: async () => { + read = true + return "should not happen" + }, + }) + expect(out).toBe("") + expect(read).toBe(false) + }) + + test("returns empty when fstat throws (e.g. EBADF — fd 0 not open)", async () => { + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => { + throw new Error("EBADF") + }, + readStdin: async () => "should not happen", + }) + expect(out).toBe("") + }) + + // MAJOR-regression: `echo ctx | run "prompt"` must still deliver "ctx". + test("returns piped data when stdin is a FIFO with available bytes", async () => { + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => fifo, + readStdin: async () => "context data", + }) + expect(out).toBe("context data") + }) + + test("returns redirected file contents (run < file.txt)", async () => { + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => file, + readStdin: async () => "from-file", + }) + expect(out).toBe("from-file") + }) + + // m1 fix: sockets are now accepted (used by process supervisors, socket + // activation, `nc -l`). Pre-fix the helper silently skipped them. + test("accepts socket-backed stdin (process supervisors, socket activation, nc -l)", async () => { + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => sock, + readStdin: async () => "via-socket", + }) + expect(out).toBe("via-socket") + }) + + // M1-regression: timed-out wait must propagate as "" (no orphan, no wedge). + test("returns empty when readStdin times out (idle inherited FIFO)", async () => { + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => fifo, + readStdin: async () => "", // simulates first-byte timeout + warn: () => {}, // silence the stderr note in test output + }) + expect(out).toBe("") + }) + + // M2-regression: a producer that takes >100ms total but flushes its first + // byte inside the window must NOT be truncated. The helper trusts readStdin + // to wait for EOF after first byte; we verify the no-truncation contract by + // injecting a readStdin that resolves slowly with full content. + test("returns full content from slow-but-pre-timeout producer", async () => { + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => fifo, + readStdin: () => new Promise((r) => setTimeout(() => r("ctx after 80ms"), 80)), + timeoutMs: 100, + }) + expect(out).toBe("ctx after 80ms") + }) + + test("returns empty for FIFO with immediate EOF (empty pipe)", async () => { + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => fifo, + readStdin: async () => "", + warn: () => {}, + }) + expect(out).toBe("") + }) + + test("returns whitespace-only stdin verbatim (caller decides to trim)", async () => { + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => fifo, + readStdin: async () => " \n\t ", + }) + expect(out).toBe(" \n\t ") + }) + + // PR #937 / dev-punia review: `process.stdin` can be undefined in + // embedded/child runtimes. The helper must not throw when accessing + // `process.stdin.isTTY` in that case. + test("returns empty when process.stdin is undefined (embedded/child runtime)", async () => { + const original = process.stdin + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ;(process as any).stdin = undefined + try { + const out = await readStdinIfAvailable() + expect(out).toBe("") + } finally { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ;(process as any).stdin = original + } + }) + + // cubic P2 on commit 7fceb85a1: a caller that passes `deps.isTTY` bypasses + // the outer `process.stdin?.isTTY` guard. If `process.stdin` is undefined, + // the default reader path used to crash on `stdin.on(...)`. The defensive + // check inside `defaultReadStdin` covers this. + test("default reader returns empty when process.stdin is undefined even when deps.isTTY is passed", async () => { + const original = process.stdin + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ;(process as any).stdin = undefined + try { + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => fifo, + // no readStdin → exercises defaultReadStdin + warn: () => {}, + }) + expect(out).toBe("") + } finally { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ;(process as any).stdin = original + } + }) + + // Sury PR #935 review #2: silent drop is bad UX. When fd 0 looked like + // real input but no first byte arrived, warn so the user knows. + test("warns when stdin looked like input but readStdin returned empty (silent-drop fix)", async () => { + const seen: string[] = [] + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => fifo, + readStdin: async () => "", + warn: (msg) => seen.push(msg), + }) + expect(out).toBe("") + expect(seen).toHaveLength(1) + expect(seen[0]).toContain("stdin produced no data") + }) + + test("does NOT warn when isTTY (no pipe to drop in the first place)", async () => { + const seen: string[] = [] + const out = await readStdinIfAvailable({ + isTTY: true, + readStdin: async () => "", + warn: (msg) => seen.push(msg), + }) + expect(out).toBe("") + expect(seen).toHaveLength(0) + }) + + test("does NOT warn when fstat says char device (/dev/null) — intentional skip", async () => { + const seen: string[] = [] + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => charDev, + readStdin: async () => "", + warn: (msg) => seen.push(msg), + }) + expect(out).toBe("") + expect(seen).toHaveLength(0) + }) + + test("does NOT warn when stdin delivered data (no drop happened)", async () => { + const seen: string[] = [] + const out = await readStdinIfAvailable({ + isTTY: false, + fstat: () => fifo, + readStdin: async () => "ctx", + warn: (msg) => seen.push(msg), + }) + expect(out).toBe("ctx") + expect(seen).toHaveLength(0) + }) + + // Sury PR #935 review #2: env override for slow-pipeline users. + test("ALTIMATE_STDIN_TIMEOUT_MS env override is forwarded to readStdin", async () => { + const original = process.env["ALTIMATE_STDIN_TIMEOUT_MS"] + process.env["ALTIMATE_STDIN_TIMEOUT_MS"] = "2500" + let observed: number | undefined + try { + await readStdinIfAvailable({ + isTTY: false, + fstat: () => fifo, + readStdin: async (ms) => { + observed = ms + return "data" + }, + warn: () => {}, + }) + } finally { + if (original === undefined) delete process.env["ALTIMATE_STDIN_TIMEOUT_MS"] + else process.env["ALTIMATE_STDIN_TIMEOUT_MS"] = original + } + expect(observed).toBe(2500) + }) + + test("ALTIMATE_STDIN_TIMEOUT_MS falls back to default when unparseable", async () => { + const original = process.env["ALTIMATE_STDIN_TIMEOUT_MS"] + process.env["ALTIMATE_STDIN_TIMEOUT_MS"] = "not-a-number" + let observed: number | undefined + try { + await readStdinIfAvailable({ + isTTY: false, + fstat: () => fifo, + readStdin: async (ms) => { + observed = ms + return "data" + }, + warn: () => {}, + }) + } finally { + if (original === undefined) delete process.env["ALTIMATE_STDIN_TIMEOUT_MS"] + else process.env["ALTIMATE_STDIN_TIMEOUT_MS"] = original + } + expect(observed).toBe(500) + }) +}) + +describe("assembleStdinMessage", () => { + // MAJOR-regression from PR #935: positional must NOT silently override stdin. + test("concatenates positional + stdin with newline", () => { + expect(assembleStdinMessage("summarize:", "context data")).toBe("summarize:\ncontext data") + }) + + test("returns stdin when positional is empty", () => { + expect(assembleStdinMessage("", "stdin only")).toBe("stdin only") + }) + + test("returns positional when stdin is empty", () => { + expect(assembleStdinMessage("msg", "")).toBe("msg") + }) + + test("ignores whitespace-only stdin", () => { + expect(assembleStdinMessage("msg", " \n ")).toBe("msg") + }) + + test("returns empty string when both are empty", () => { + expect(assembleStdinMessage("", "")).toBe("") + }) +})