Add unix/unixgram destinations + table-driven tests#9
Merged
Conversation
xtcp2 now supports writing records to a local unix-domain socket so that
a daemon (kubernetes daemonset, machine-local collector, etc.) can read
them off without a network hop. Two schemes:
unix:/path/to/sock SOCK_STREAM, varint-length-prefixed framing
unixgram:/path/to/sock SOCK_DGRAM, one Write == one datagram == one
record, no framing
Both follow the established destination pattern (function stored in
sync.Map, init function dialled once at startup, blocking writes, log
on error). For unixgram, init pre-checks os.Stat so the "fail loudly at
startup" contract holds even though SOCK_DGRAM dial doesn't verify the
peer.
The dest proto field's max_len cap goes from 40 to 128 to accommodate
the longer unixgram:/path strings; comment is rewritten to enumerate
all current schemes. Generated bindings regenerated via buf.
Adds the repo's first destination_test.go: one table-driven test
covering null/udp/unix/unixgram round-trip and multiple-record cases,
a varying-size stream-framing test, missing-socket / missing-daemon
sanity tests, and benchmarks for all four. Each row uses a real local
socket in t.TempDir() — no mocks. To make InitDest* paths testable
without taking the process down, a hookable x.fatalf field on XTCP
defaults to log.Fatalf and is overridden to t.Fatalf in tests; only
the two new InitDest* functions use it, existing destinations are
untouched.
go test -race ./pkg/xtcp/... -run TestDest is clean.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
… one writev
destUnix was issuing two sequential net.Conn.Write calls — first the
varint length header, then the payload. If the header write succeeded
but the payload write failed midway (peer disconnect, EPIPE during a
backpressure event, etc.) the receiver was left with a varint length
prefix promising N bytes followed by fewer than N bytes of data. That
shape is unrecoverable from the daemon-side reader's
binary.ReadUvarint + io.ReadFull pattern: the next reader pass would
read garbage into the next-frame header position and never recover.
Switch to net.Buffers{hdr, payload}.WriteTo(conn). Go's stdlib lowers
this to a single writev(2) on *net.UnixConn — header and payload land
atomically or fail as a unit. Same on-wire shape, same per-record
accounting, half the syscalls in the happy path. Removes the
TODO that explicitly called this out.
Tested:
- go build ./pkg/xtcp/... clean
- go vet ./pkg/xtcp/... clean
- go test -race -run 'TestDestinations|TestDestUnix|TestDestUnixGram' ./pkg/xtcp/
PASS
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
3 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds two stdlib-only destinations alongside the existing
kafka/udp/nsq/nats/valkey/nullset, with table-driven coverage:unix:/path/to/sock— SOCK_STREAM, framed with a varint length prefix per record. Header + payload now coalesced into onewritev(2)vianet.Buffers(see "Reliability hardening" below).unixgram:/path/to/sock— SOCK_DGRAM. One record per datagram; the kernel preserves message boundaries.CLI:
-dest unix:/run/xtcp2.sockor-dest unixgram:/run/xtcp2.sock.Architectural touches
fatalffield onXTCP(defaults tolog.Fatalf) soInit*paths can be exercised from tests without taking the process down. Existing destinations still calllog.Fatalfdirectly — opt-in.closeDestinationnowstrings.Cuts the scheme so the switch is"unix"/"unixgram"rather than the full-deststring.destmax_len bumped 40 → 128 to fitunixgram:(9 bytes) + Linuxsun_path(108 bytes).destinations.go: "implementations may assume serial access; concurrent callers are not supported without an internal mutex."Reliability hardening (the substantive small fix on top of the original commit)
destUnixwas issuing two sequentialWritecalls — varint header, then payload. If the header succeeded and the payload failed partway through (peer disconnect, EPIPE on backpressure), the receiver saw a varint header promising N bytes followed by fewer than N bytes of data — an unrecoverable torn-frame state that would wedge the daemon-sidebinary.ReadUvarint + io.ReadFullreader. Switched tonet.Buffers{hdr, payload}.WriteTo(conn), which Go's stdlib lowers to a singlewritev(2)on*net.UnixConn. Header + payload land atomically or fail as a unit. Same wire shape, same accounting, half the syscalls in the happy path.Test plan
go build ./pkg/xtcp/...cleango vet ./pkg/xtcp/...cleango test -race -run 'TestDestinations|TestDestUnix|TestDestUnixGram' ./pkg/xtcp/PASSTestDestinationscovers null + udp + unix + unixgram with single + multi-record cases.TestDestUnix_StreamFramingexercises payloads of 1 / 256 / 50 KB (the 50 KB case exercises the multi-byte varint path).TestDestUnixGram_MissingSocketandTestDestUnix_MissingDaemonverify the fail-loudly contract via the capturedfatalfhook.b.SetBytesfor per-record throughput reporting.Known follow-ups (not blocking)
setupUDPDest/setupUDPDestTBetc. are near-identical pairs because the helpers take*testing.Twhile benchmarks needtesting.TB. Takingtesting.TBdirectly would collapse the pair.InitDestUnixGram'sos.Statpre-check has a small TOCTOU window with the subsequentnet.Dial("unixgram", path). Tiny in practice (unixgram is connectionless so dial can't verify the peer anyway); worth documenting if anyone cares.🤖 Generated with Claude Code