From 6c19b6b99e90cf6c6098d5b0decb06a7becfc546 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 17:47:08 -0700 Subject: [PATCH 01/30] Fix two netlinker syscall-path bugs (#25, #26) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #25: Diagnostic packet captures wrote the full pool-buffer (e.g. 8 KiB) instead of the n bytes Recvfrom actually filled. Pcap-like consumers parsed trailing garbage past the real end of every capture. #26: At goroutine exit the pool Put used (*packetBuffer)[:0] — a zero-length slice. A later Get from a freshly spawned netlinker would call syscall.Recvfrom on it, which panics on &p[0] when len(p)==0. iouringPrefillRecvs already restored cap defensively on Get; the syscall producer must Put in usable shape. Both bugs only fired in long-running daemons with namespace add/remove churn — first netlinker exits cleanly, second namespace gets the corrupted buffer. Co-Authored-By: Claude Opus 4.7 --- pkg/xtcp/netlinker.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pkg/xtcp/netlinker.go b/pkg/xtcp/netlinker.go index 4328519..0bac4b7 100644 --- a/pkg/xtcp/netlinker.go +++ b/pkg/xtcp/netlinker.go @@ -90,9 +90,14 @@ func (x *XTCP) netlinkerSyscall(ctx context.Context, wg *sync.WaitGroup, nsName if wf > 0 { now := time.Now() + // Capture only the n bytes Recvfrom actually filled. Writing the + // raw *packetBuffer here used to dump the full pool-buffer size + // (e.g. 8 KiB), trailing the real packet with stale bytes from a + // previous Recvfrom — pcap-like consumers parsed garbage past + // the real end of the message. 
err = os.WriteFile( x.config.CapturePath+"netlink."+now.Format(time.RFC3339Nano), - *(packetBuffer), + (*packetBuffer)[:n], writeFilesPermissionsCst) if err != nil { // Diagnostic capture-to-file is a side feature; a disk- @@ -146,7 +151,14 @@ func (x *XTCP) netlinkerSyscall(ctx context.Context, wg *sync.WaitGroup, nsName } } - *packetBuffer = (*packetBuffer)[:0] + // Restore the slice header to full capacity before returning it to + // the pool. (*packetBuffer)[:0] would Put a zero-length slice — a + // later Get from a fresh netlinker would call syscall.Recvfrom on + // it, which panics on &p[0] when len(p)==0. iouringPrefillRecvs + // (netlinker_iouring.go) already restores cap on Get as a defensive + // measure, but the syscall path is the producer of these buffers + // and must Put them in usable shape. + *packetBuffer = (*packetBuffer)[:cap(*packetBuffer)] x.packetBufferPool.Put(packetBuffer) x.pC.WithLabelValues("Netlinker", "complete", "count").Inc() From 767adc5c93f35ff7e3717a52c5ac5a5a5dd55087 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 17:48:57 -0700 Subject: [PATCH 02/30] Bug 27: getLatestSchemaID had no timeout and ignored ctx MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit http.Get uses the DefaultClient with no per-request timeout — a hung or wedged Schema Registry would block daemon startup forever. Add a NewRequestWithContext + 10s hard ceiling so the init-time call observes ctx cancellation and fails cleanly. Also surface non-2xx (other than 404) as an explicit error instead of letting a 401/403/5xx fall through to a confusing JSON-decode failure. 
Co-Authored-By: Claude Opus 4.7 --- pkg/xtcp/destinations_kafka.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/pkg/xtcp/destinations_kafka.go b/pkg/xtcp/destinations_kafka.go index 4bb92aa..998a4b1 100644 --- a/pkg/xtcp/destinations_kafka.go +++ b/pkg/xtcp/destinations_kafka.go @@ -53,7 +53,7 @@ func newKafkaDest(ctx context.Context, x *XTCP) (Destination, error) { return nil, err } - schemaID, err := d.getLatestSchemaID() + schemaID, err := d.getLatestSchemaID(ctx) if err != nil { return nil, fmt.Errorf("newKafkaDest getLatestSchemaID: %w", err) } @@ -160,13 +160,23 @@ func (d *kafkaDest) Close() error { return nil } -func (d *kafkaDest) getLatestSchemaID() (int, error) { +func (d *kafkaDest) getLatestSchemaID(ctx context.Context) (int, error) { url := fmt.Sprintf("%s/subjects/%s-value/versions/latest", d.x.config.KafkaSchemaUrl, d.x.config.Topic) if d.x.debugLevel > 10 { log.Printf("getLatestSchemaID url:%s\n", url) } - resp, err := http.Get(url) + // http.Get used the DefaultClient which has no timeout — a hung + // Schema Registry would block daemon startup indefinitely. Build + // the request with the caller's ctx + a hard 10s ceiling so the + // init-time call observes shutdown and never wedges. 
+ reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, url, nil) + if err != nil { + return 0, err + } + resp, err := http.DefaultClient.Do(req) if err != nil { return 0, err } @@ -174,6 +184,9 @@ func (d *kafkaDest) getLatestSchemaID() (int, error) { if resp.StatusCode == http.StatusNotFound { return 0, fmt.Errorf("getLatestSchemaID http.StatusNotFound url:%s", url) } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return 0, fmt.Errorf("getLatestSchemaID url:%s unexpected status:%d", url, resp.StatusCode) + } var result struct { ID int `json:"id"` } From f55ce878848c54249551c91f4c30f09684be925a Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 17:53:08 -0700 Subject: [PATCH 03/30] Bug 28: kafkaDest.Close dropped in-flight records (no Flush) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit franz-go's Close cancels in-flight produces without waiting for broker acks — any records buffered when SIGTERM hit were silently dropped. Add a bounded (5s) Flush before Close so the daemon's last poll cycle is durably delivered. Co-Authored-By: Claude Opus 4.7 --- pkg/xtcp/destinations_kafka.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/xtcp/destinations_kafka.go b/pkg/xtcp/destinations_kafka.go index 998a4b1..d8dc638 100644 --- a/pkg/xtcp/destinations_kafka.go +++ b/pkg/xtcp/destinations_kafka.go @@ -155,6 +155,18 @@ func (d *kafkaDest) Send(ctx context.Context, b *[]byte) (int, error) { func (d *kafkaDest) Close() error { if d.client != nil { + // franz-go's Close cancels in-flight produces without waiting for + // their broker acks — records buffered when shutdown fires were + // silently dropped. Flush first with a bounded timeout so the + // daemon's last poll cycle is durably delivered before teardown. 
+ flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if err := d.client.Flush(flushCtx); err != nil { + d.x.pC.WithLabelValues("destKafka", "FlushOnClose", "error").Inc() + if d.x.debugLevel > 10 { + log.Printf("destKafka Flush on Close: %v", err) + } + } + cancel() d.client.Close() } return nil From fc4541fe44825fca87ff41aa5077ade8c45e8ff0 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 17:54:08 -0700 Subject: [PATCH 04/30] Bug 29: getLatestSchemaIDAt silently returned id:0 on 4xx/5xx MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Schema Registries return a JSON error body on non-2xx (e.g. {"error_code":40402,"message":"Subject not found."}). The previous code skipped the status check and decoded that error body straight into {id int}, producing a silent id:0 success — the CLI printed "id:0" for a missing subject instead of reporting the registry error. Co-Authored-By: Claude Opus 4.7 --- cmd/register_schema/register_schema.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cmd/register_schema/register_schema.go b/cmd/register_schema/register_schema.go index f3480c2..b35e33a 100644 --- a/cmd/register_schema/register_schema.go +++ b/cmd/register_schema/register_schema.go @@ -70,6 +70,14 @@ func getLatestSchemaIDAt(client *http.Client, baseURL, subject string) (int, err return 0, err } defer resp.Body.Close() + // Schema Registries return a JSON error body on 4xx/5xx (e.g. + // {"error_code":40402,"message":"Subject not found."}). The previous + // code skipped the status check and decoded the error body straight + // into the {id int} struct, producing a silent id:0 success — the + // CLI printed "id:0" for a missing subject. Reject non-2xx upfront. 
+ if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return 0, fmt.Errorf("getLatestSchemaIDAt %s: unexpected status:%d", url, resp.StatusCode) + } var result struct { ID int `json:"id"` } From 0e95f93cae0d064bc784fba735d32547ccc054bf Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 17:55:13 -0700 Subject: [PATCH 05/30] =?UTF-8?q?Bug=2030:=20kafka=5Fto=5Fclickhouse=20get?= =?UTF-8?q?LatestSchemaIDAt=20same=204xx/5xx=20=E2=86=92=20id:0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Same pattern as bug 29 in cmd/register_schema. Downstream impact here is more harmful: kafka_to_clickhouse stamps every Kafka record's Confluent header with the returned schemaID. A silent id:0 would make every produced record claim schema 0 — ClickHouse's protobuflist input then mis-parses or drops every row. Co-Authored-By: Claude Opus 4.7 --- cmd/kafka_to_clickhouse/kafka_to_clickhouse.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cmd/kafka_to_clickhouse/kafka_to_clickhouse.go b/cmd/kafka_to_clickhouse/kafka_to_clickhouse.go index 78d4613..52cf85b 100644 --- a/cmd/kafka_to_clickhouse/kafka_to_clickhouse.go +++ b/cmd/kafka_to_clickhouse/kafka_to_clickhouse.go @@ -442,6 +442,14 @@ func getLatestSchemaIDAt(ctx context.Context, client *http.Client, baseURL, subj } defer resp.Body.Close() + // Schema Registries return a JSON error body on 4xx/5xx; decoding it + // into the {id int} struct silently yields id:0. Reject non-2xx + // upfront so kafka_to_clickhouse's downstream Produce path doesn't + // stamp every Kafka record with a bogus schemaID:0 magic header. 
+ if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return 0, fmt.Errorf("getLatestSchemaIDAt %s: unexpected status:%d", url, resp.StatusCode) + } + var result struct { ID int `json:"id"` } From 6abc4ae38d3b23ec356535f1776cd4dc177ad8ff Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 17:57:21 -0700 Subject: [PATCH 06/30] Bug 31: clickhouse insert wrong FORMAT for envelope=true (default) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit cmd/clickhouse_http_insert_protobuflist hardwired FORMAT Protobuf for both envelope (Envelope with repeated Rows) and non-envelope (per-row length-prefixed Record) payloads. With the binary's default -envelope=true, the request sent an Envelope-shaped protobuf at a URL declaring the schema as Record — ClickHouse rejected every insert. Switch to FORMAT ProtobufList when useEnvelope is true and Protobuf otherwise. Add TestInsertIntoCHAt_formatSelection so the URL string is asserted in both modes. 
Co-Authored-By: Claude Opus 4.7 --- .../clickhouse_http_insert_protobuflist.go | 16 +++++++-- ...lickhouse_http_insert_protobuflist_test.go | 34 ++++++++++++++++--- 2 files changed, 43 insertions(+), 7 deletions(-) diff --git a/cmd/clickhouse_http_insert_protobuflist/clickhouse_http_insert_protobuflist.go b/cmd/clickhouse_http_insert_protobuflist/clickhouse_http_insert_protobuflist.go index 5acf9c2..76d5e1e 100644 --- a/cmd/clickhouse_http_insert_protobuflist/clickhouse_http_insert_protobuflist.go +++ b/cmd/clickhouse_http_insert_protobuflist/clickhouse_http_insert_protobuflist.go @@ -179,14 +179,24 @@ func writeDataToFile(ctx context.Context, filename string, data []byte) error { var ErrClickHouseHTTPPost = errors.New("clickhouse http post failed") func insertIntoCH(ctx context.Context, c config, binaryData []byte) error { - return insertIntoCHAt(ctx, http.DefaultClient, "http://"+c.connectStr, binaryData) + return insertIntoCHAt(ctx, http.DefaultClient, "http://"+c.connectStr, binaryData, c.envelope) } // insertIntoCHAt POSTs a binary protobuf payload to ClickHouse's HTTP // endpoint. Extracted so tests can drive it against httptest.Server (the // production wrapper picks the baseURL from config.connectStr). -func insertIntoCHAt(ctx context.Context, client *http.Client, baseURL string, binaryData []byte) error { - clickhouseURL := baseURL + "/?query=INSERT%20INTO%20clickhouse_protolist.clickhouse_protolist%20FORMAT%20Protobuf&format_schema=clickhouse_protolist.proto:clickhouse_protolist.v1.Record" +// +// useEnvelope picks the ClickHouse format: ProtobufList for envelope mode +// (a single Envelope with a repeated Rows field), Protobuf for the +// length-prefixed per-row mode. The previous code hardwired FORMAT +// Protobuf regardless, so the default envelope=true path mailed an +// Envelope at a Record schema — ClickHouse rejected every insert. 
+func insertIntoCHAt(ctx context.Context, client *http.Client, baseURL string, binaryData []byte, useEnvelope bool) error { + format := "Protobuf" + if useEnvelope { + format = "ProtobufList" + } + clickhouseURL := baseURL + "/?query=INSERT%20INTO%20clickhouse_protolist.clickhouse_protolist%20FORMAT%20" + format + "&format_schema=clickhouse_protolist.proto:clickhouse_protolist.v1.Record" req, err := http.NewRequestWithContext(ctx, http.MethodPost, clickhouseURL, bytes.NewReader(binaryData)) if err != nil { diff --git a/cmd/clickhouse_http_insert_protobuflist/clickhouse_http_insert_protobuflist_test.go b/cmd/clickhouse_http_insert_protobuflist/clickhouse_http_insert_protobuflist_test.go index 41b26dc..d349c5f 100644 --- a/cmd/clickhouse_http_insert_protobuflist/clickhouse_http_insert_protobuflist_test.go +++ b/cmd/clickhouse_http_insert_protobuflist/clickhouse_http_insert_protobuflist_test.go @@ -135,7 +135,7 @@ func TestInsertIntoCHAt_success(t *testing.T) { w.WriteHeader(http.StatusOK) })) defer srv.Close() - if err := insertIntoCHAt(t.Context(), srv.Client(), srv.URL, []byte("payload")); err != nil { + if err := insertIntoCHAt(t.Context(), srv.Client(), srv.URL, []byte("payload"), true); err != nil { t.Errorf("err = %v, want nil", err) } } @@ -146,7 +146,7 @@ func TestInsertIntoCHAt_serverError(t *testing.T) { _, _ = w.Write([]byte("oops")) //nolint:errcheck // test plumbing })) defer srv.Close() - err := insertIntoCHAt(t.Context(), srv.Client(), srv.URL, []byte("payload")) + err := insertIntoCHAt(t.Context(), srv.Client(), srv.URL, []byte("payload"), true) if err == nil { t.Error("500 should produce error") } @@ -159,15 +159,41 @@ func TestInsertIntoCHAt_connRefused(t *testing.T) { srv := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})) url := srv.URL srv.Close() - if err := insertIntoCHAt(t.Context(), http.DefaultClient, url, []byte("payload")); err == nil { + if err := insertIntoCHAt(t.Context(), http.DefaultClient, url, 
[]byte("payload"), true); err == nil { t.Error("conn-refused should produce error") } } func TestInsertIntoCHAt_badURL(t *testing.T) { // Malformed URL forces http.NewRequestWithContext to fail. - err := insertIntoCHAt(t.Context(), http.DefaultClient, "://not a url", []byte("p")) + err := insertIntoCHAt(t.Context(), http.DefaultClient, "://not a url", []byte("p"), true) if err == nil { t.Error("malformed URL should produce error") } } + +// Verify the URL contains FORMAT ProtobufList when useEnvelope is true, +// and FORMAT Protobuf when false — the bug under fix. +func TestInsertIntoCHAt_formatSelection(t *testing.T) { + cases := []struct { + useEnvelope bool + wantFormat string + }{ + {true, "ProtobufList"}, + {false, "Protobuf&"}, + } + for _, tc := range cases { + var gotURL string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotURL = r.URL.String() + w.WriteHeader(http.StatusOK) + })) + if err := insertIntoCHAt(t.Context(), srv.Client(), srv.URL, []byte("p"), tc.useEnvelope); err != nil { + t.Errorf("useEnvelope=%v: err = %v", tc.useEnvelope, err) + } + if !strings.Contains(gotURL, "FORMAT%20"+tc.wantFormat) { + t.Errorf("useEnvelope=%v: URL = %s, want FORMAT %s", tc.useEnvelope, gotURL, tc.wantFormat) + } + srv.Close() + } +} From 0de00096d641f475019decfd1d0425a96bdd311d Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 17:58:54 -0700 Subject: [PATCH 07/30] Bug 32: pollStreamRecv spun CPU on persistent stream errors A broken bidi stream typically returns the same error immediately on every Recv (e.g. "rpc error: stream closed"). The previous loop re-called Recv with zero backoff, pegging a core at 100% until ctx was canceled. Add a ctx-aware 100ms backoff between retries. 
Co-Authored-By: Claude Opus 4.7 --- cmd/xtcp2client/xtcp2client.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cmd/xtcp2client/xtcp2client.go b/cmd/xtcp2client/xtcp2client.go index 516542b..1a9c12d 100644 --- a/cmd/xtcp2client/xtcp2client.go +++ b/cmd/xtcp2client/xtcp2client.go @@ -222,11 +222,15 @@ breakPoint: log.Printf("pollStreamRecv err:%v", err) } + // A broken stream typically returns the same error immediately + // on every subsequent Recv (e.g. "rpc error: stream closed"). + // The previous code looped without backoff, pegging a CPU core + // at 100% until ctx was canceled. Sleep briefly between + // retries so the loop is ctx-cancellable AND non-spinny. select { case <-ctx.Done(): break breakPoint - default: - // non-blocking + case <-time.After(100 * time.Millisecond): } continue } From 888b9eaf9ea872030d9e50e7bbcd2c6b22d19136 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 17:59:48 -0700 Subject: [PATCH 08/30] Bug 33: handleRecvCQE nil-deref on orphan CQEs drainOnce appends a Result with Buf=nil when the CQE's RequestID isn't in the in-flight map (e.g. post-Close stragglers or, at the >2^32 SQE horizon, a request-ID wrap collision). handleRecvCQE then dereferenced res.Buf without a guard, panicking on (*nil)[:n]. Skip orphan CQEs with a counter for visibility. Co-Authored-By: Claude Opus 4.7 --- pkg/xtcp/netlinker_iouring.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/xtcp/netlinker_iouring.go b/pkg/xtcp/netlinker_iouring.go index 627fab3..40d7df5 100644 --- a/pkg/xtcp/netlinker_iouring.go +++ b/pkg/xtcp/netlinker_iouring.go @@ -235,6 +235,15 @@ func (x *XTCP) handleRecvCQE(ctx context.Context, ring *xio.Ring, nsName *string x.pC.WithLabelValues("NetlinkerIoUring", "packets", "count").Inc() x.pC.WithLabelValues("NetlinkerIoUring", "n", "count").Add(float64(n)) + // If drainOnce couldn't match the CQE to an in-flight entry (e.g. 
+ // post-Close stragglers, or — at >2^32 SQEs — a request-ID wrap + // collision), res.Buf is nil. Dereferencing it would panic. Count + // the orphan and skip; the buffer was never ours to return. + if res.Buf == nil { + x.pC.WithLabelValues("NetlinkerIoUring", "OrphanCQE", "error").Inc() + return + } + b := (*res.Buf)[:n] _, errD := x.Deserialize(ctx, DeserializeArgs{ ns: nsName, From 8f4d5d8a1b7912d925c59232d9670f79374d6f37 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 18:01:12 -0700 Subject: [PATCH 09/30] =?UTF-8?q?Bug=2034:=20runReceiver=20hung=20on=20shu?= =?UTF-8?q?tdown=20=E2=80=94=20ReadFromUDP=20isn't=20ctx-aware?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The top-of-loop select on ctx.Done only fired between reads. A ReadFromUDP already in flight when ctx was canceled blocked forever (UDP has no read timeout by default), so runMain's wg/defer would never return — the process hung on SIGTERM. Watch ctx in a sibling goroutine and Close the conn on cancel; the blocking Read returns with EBADF / "use of closed network connection", which the loop already converts to a clean nil return via ctx.Err(). 
Co-Authored-By: Claude Opus 4.7 --- tools/udp_receiver_server/udp_receiver_server.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tools/udp_receiver_server/udp_receiver_server.go b/tools/udp_receiver_server/udp_receiver_server.go index 021a10c..a99f280 100644 --- a/tools/udp_receiver_server/udp_receiver_server.go +++ b/tools/udp_receiver_server/udp_receiver_server.go @@ -93,6 +93,21 @@ func runReceiver(ctx context.Context, conn *net.UDPConn) error { xtcpRecord, _ := xtcpRecordPool.Get().(*xtcp_flat_record.Envelope_XtcpFlatRecord) //nolint:errcheck // pool.Get returns the type from pool.New defer xtcpRecordPool.Put(xtcpRecord) + // Close the connection on ctx cancel so the blocking ReadFromUDP + // returns with a "use of closed network connection" error and the + // loop can observe ctx.Err(). Previously the top-of-loop ctx select + // only fired between reads — if a Read was already in flight when + // ctx was canceled, the goroutine hung forever. + stopCloseWatcher := make(chan struct{}) + defer close(stopCloseWatcher) + go func() { + select { + case <-ctx.Done(): + _ = conn.Close() //nolint:errcheck // shutdown path + case <-stopCloseWatcher: + } + }() + for i := 0; ; i++ { select { case <-ctx.Done(): From 1a4c77377ea7edb52ccbeb3267386b6be4ddc28b Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 18:07:30 -0700 Subject: [PATCH 10/30] Bug 35: handleRecord cross-contaminated decoded records (no Reset) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit proto.Unmarshal merges into the destination rather than overwriting. xtcpRecord is reused across the consume loop (pool entry), so fields that were SET on record N but UNSET on record N+1 stayed populated in the decoded output — adjacent Kafka records bled into each other. Reset the destination before each Unmarshal. 
Co-Authored-By: Claude Opus 4.7 --- tools/kafka_topic_reader/kafka_topic_reader.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tools/kafka_topic_reader/kafka_topic_reader.go b/tools/kafka_topic_reader/kafka_topic_reader.go index 0abaea0..c3abb62 100644 --- a/tools/kafka_topic_reader/kafka_topic_reader.go +++ b/tools/kafka_topic_reader/kafka_topic_reader.go @@ -135,6 +135,12 @@ func handleRecord(i, j, records int, record *kgo.Record, xtcpRecord *xtcp_flat_r fmt.Printf("i:%d j:%d records:%d Received message from topic %s, partition %d, offset %d\n", i, j, records, record.Topic, record.Partition, record.Offset) + // proto.Unmarshal merges into the destination; without Reset, fields + // that were SET on the previous record but UNSET on this one would + // stay populated, producing decoded output that mixes adjacent + // records. xtcpRecord is reused across the consume loop (pool entry) + // so this is reachable in practice. + proto.Reset(xtcpRecord) if err := proto.Unmarshal(record.Value, xtcpRecord); err != nil { log.Printf("Error unmarshalling protobuf message: %v", err) return From 16a02b50b3ebb85cb3390a190408246673ad7a00 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 18:08:01 -0700 Subject: [PATCH 11/30] Bug 36: udp_receiver_server runReceiver same proto.Reset miss Same pattern as bug 35 in tools/kafka_topic_reader. xtcpRecord is acquired once from a pool and reused for every packet's Unmarshal; without Reset, populated fields from packet N persist into packet N+1. Demo output mixed records. 
Co-Authored-By: Claude Opus 4.7 --- tools/udp_receiver_server/udp_receiver_server.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tools/udp_receiver_server/udp_receiver_server.go b/tools/udp_receiver_server/udp_receiver_server.go index a99f280..6a7d23f 100644 --- a/tools/udp_receiver_server/udp_receiver_server.go +++ b/tools/udp_receiver_server/udp_receiver_server.go @@ -124,6 +124,10 @@ func runReceiver(ctx context.Context, conn *net.UDPConn) error { return err } + // proto.Unmarshal merges; without Reset, fields set on record N + // linger into record N+1 because xtcpRecord is reused across the + // loop (pool entry). Reset before each Unmarshal. + proto.Reset(xtcpRecord) if uerr := proto.Unmarshal((*packetBuffer)[:n], xtcpRecord); uerr != nil { return fmt.Errorf("%w: %v", ErrDecode, uerr) } From 29833ee0013de1466856c226c3bc9a00b29f80a0 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 18:38:34 -0700 Subject: [PATCH 12/30] Bug 37: pollAllNetlinkSockets miscounted polled fds (off-by-1 from xtcpNS) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit GetNetlinkSocketFDs pulls every fd from nsMap including the xtcpNS fd, but the loop skips xtcpNS (it's xtcp's own bookkeeping namespace, not a target for inet_diag dumps). The function then returned len(socketFDs) — counting the skipped fd as if it had been polled. The Poller used this count to track how many "done" signals to expect, so every poll cycle waited for one extra done that never arrived, falling back to the PollTimeoutTimer instead of advancing on natural signals. Return the actual polled-fd count. Update + extend the existing TestPollAllNetlinkSockets_skipsXtcpNS to assert the correct count, and add a mixed test (one xtcpNS + one regular fd → count=1). 
Co-Authored-By: Claude Opus 4.7 --- pkg/xtcp/poller.go | 10 +++++++++- pkg/xtcp/poller_pure_test.go | 31 +++++++++++++++++++++++++------ 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/pkg/xtcp/poller.go b/pkg/xtcp/poller.go index 377952b..af2f2e5 100644 --- a/pkg/xtcp/poller.go +++ b/pkg/xtcp/poller.go @@ -166,6 +166,7 @@ func (x *XTCP) pollAllNetlinkSockets(pollingLoops uint64) (count int) { x.updateNetlinkRequestSequenceNumber(pollingLoops) socketFDs := x.GetNetlinkSocketFDs() + polled := 0 for i, socketFD := range socketFDs { if ns, ok := x.fdToNsMap.Load(socketFD); ok { nsStr, _ := ns.(string) //nolint:errcheck // fdToNsMap values are strings @@ -177,6 +178,7 @@ func (x *XTCP) pollAllNetlinkSockets(pollingLoops uint64) (count int) { continue } x.poll(socketFD) + polled++ if x.debugLevel > 10 { log.Printf("pollAllNetlinkSockets Poll i:%d fd:%d", i, socketFD) } @@ -186,7 +188,13 @@ func (x *XTCP) pollAllNetlinkSockets(pollingLoops uint64) (count int) { // restart the timeout timer x.pollTimeoutTimer.Reset(x.config.PollTimeout.AsDuration()) - return len(socketFDs) + // Return the count of fds we actually issued a poll against, NOT + // len(socketFDs). The xtcpNS fd is in socketFDs but is skipped above + // — counting it would tell Poller to expect one more done signal + // than will ever arrive, so count never drops to 0 and every cycle + // waits for the PollTimeoutTimer to fire instead of advancing on + // the natural netlinker-done signals. + return polled } func (x *XTCP) updateNetlinkRequestSequenceNumber(pollingLoops uint64) { diff --git a/pkg/xtcp/poller_pure_test.go b/pkg/xtcp/poller_pure_test.go index 0543e4a..1eae8f4 100644 --- a/pkg/xtcp/poller_pure_test.go +++ b/pkg/xtcp/poller_pure_test.go @@ -71,18 +71,37 @@ func TestPollAllNetlinkSockets_emptyFDs(t *testing.T) { } } -// pollAllNetlinkSockets skips the xtcpNS namespace's fd. 
GetNetlinkSocketFDs -// pulls socketFD values out of nsMap (not fdToNsMap), so we have to seed -// both: nsMap gives the fd; fdToNsMap gives the ns name lookup used by -// the skip check. +// pollAllNetlinkSockets skips the xtcpNS namespace's fd AND excludes it +// from the returned count. GetNetlinkSocketFDs pulls socketFD values out +// of nsMap (not fdToNsMap), so we have to seed both: nsMap gives the fd; +// fdToNsMap gives the ns name lookup used by the skip check. +// +// The returned count must equal the number of fds actually polled, not +// len(socketFDs). Counting the skipped xtcpNS fd would make Poller wait +// for one extra "done" signal that never arrives, forcing every poll +// cycle to fall back to the PollTimeoutTimer. func TestPollAllNetlinkSockets_skipsXtcpNS(t *testing.T) { x := newPollerFixture(t) x.nsMap.Store(linuxNetNSDirCst+xtcpNSName, netNSitem{socketFD: 42}) x.fdToNsMap.Store(42, linuxNetNSDirCst+xtcpNSName) x.debugLevel = 200 // hit the skip-log branch got := x.pollAllNetlinkSockets(0) - if got != 1 { // 1 socket in nsMap, but it was skipped (not polled) - t.Errorf("count = %d, want 1", got) + if got != 0 { + t.Errorf("count = %d, want 0 (xtcpNS was skipped and must not count)", got) + } +} + +// Mixed case: one xtcpNS fd (skipped) + one regular fd (polled). Count +// should be 1 — the polled one. 
+func TestPollAllNetlinkSockets_skipsXtcpNSAmongstOthers(t *testing.T) { + x := newPollerFixture(t) + x.nsMap.Store(linuxNetNSDirCst+xtcpNSName, netNSitem{socketFD: 42}) + x.fdToNsMap.Store(42, linuxNetNSDirCst+xtcpNSName) + x.nsMap.Store("/run/netns/other", netNSitem{socketFD: 7}) + x.fdToNsMap.Store(7, "/run/netns/other") + got := x.pollAllNetlinkSockets(0) + if got != 1 { + t.Errorf("count = %d, want 1 (one polled, one skipped)", got) } } From e3e58545db425494736982f1e2069cd441c4ead3 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 18:40:09 -0700 Subject: [PATCH 13/30] Bug 38: checkDirectoryExists nil-deref on non-not-exist Stat errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit os.Stat returns (nil, err) on any failure. The previous body only special-cased ErrNotExist (returning false), then fell through to info.IsDir() — which panics on a nil receiver. Permission-denied on a restricted mount or EIO on a flaky filesystem both reach that code path. Treat any Stat error as "no" and only dereference info on the success path. Co-Authored-By: Claude Opus 4.7 --- pkg/xtcp/ns_watch.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/xtcp/ns_watch.go b/pkg/xtcp/ns_watch.go index f172b5b..52a4bfa 100644 --- a/pkg/xtcp/ns_watch.go +++ b/pkg/xtcp/ns_watch.go @@ -102,10 +102,16 @@ breakPoint: return nil } -// checkDirectoryExists checks if a directory exists +// checkDirectoryExists checks if a directory exists. +// +// The previous body only special-cased ErrNotExist, then unconditionally +// dereferenced info — a non-not-exist Stat error (EACCES on a permission- +// restricted mount, EIO on a flaky filesystem) leaves info==nil and the +// info.IsDir() call panics. Treat any Stat error as "no" and only +// dereference info on the success path. 
func checkDirectoryExists(dir string) bool { info, err := os.Stat(dir) - if os.IsNotExist(err) { + if err != nil { return false } return info.IsDir() From c89b02de86dbd81549d1fa6b0a109eab2fc63533 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 18:42:33 -0700 Subject: [PATCH 14/30] Bug 39: SOCKOPT attribute used DeserializeCGroupIDXTCP placeholder pkg/xtcp/deserializers.go registered DeserializeCGroupIDXTCP under SockOptEnumValueCst (22) instead of DeserializeSockOptXTCP. The correct SockOpt parser couldn't be plugged in because its signature took *Envelope_XtcpFlatRecord rather than *XtcpFlatRecord, so it didn't match the dispatch map's function-pointer type. Runtime consequence: every INET_DIAG_SOCKOPT attribute (2 bytes wide) was fed to the CGroupID parser, which requires 8 bytes; CGroupID returned ErrCGroupIDSmall every time, SockOpt was never populated in the emitted XtcpFlatRecord, and per-socket TCP socket-option metadata silently disappeared from every record. Fix both: re-type DeserializeSockOptXTCP to take *XtcpFlatRecord, and register it under the SockOpt enum. Co-Authored-By: Claude Opus 4.7 --- pkg/xtcp/deserializers.go | 8 +++++++- pkg/xtcpnl/xtcpnl_inet_diag_sockopt.go | 9 ++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/pkg/xtcp/deserializers.go b/pkg/xtcp/deserializers.go index 7c48a40..b34b71c 100644 --- a/pkg/xtcp/deserializers.go +++ b/pkg/xtcp/deserializers.go @@ -154,9 +154,15 @@ func (x *XTCP) InitDeserializers(wg *sync.WaitGroup) { } // INET_DIAG_SOCKOPT 22 + // Previously registered DeserializeCGroupIDXTCP here as a workaround + // because DeserializeSockOptXTCP had the wrong target type + // (*Envelope_XtcpFlatRecord). With the sockopt deserializer's + // signature corrected, register the actual SockOpt parser — the + // CGroupID one was decoding 8 bytes against the 2-byte SOCKOPT + // payload, silently erroring out and leaving SockOpt unpopulated. 
key = dsKeySockopt if _, exists := x.config.EnabledDeserializers.Enabled[key]; exists { - x.RTATypeDeserializer[xtcpnl.SockOptEnumValueCst] = xtcpnl.DeserializeCGroupIDXTCP + x.RTATypeDeserializer[xtcpnl.SockOptEnumValueCst] = xtcpnl.DeserializeSockOptXTCP x.RTATypeDeserializerStr[xtcpnl.SockOptEnumValueCst] = key } diff --git a/pkg/xtcpnl/xtcpnl_inet_diag_sockopt.go b/pkg/xtcpnl/xtcpnl_inet_diag_sockopt.go index 22dfecb..b99b8a3 100644 --- a/pkg/xtcpnl/xtcpnl_inet_diag_sockopt.go +++ b/pkg/xtcpnl/xtcpnl_inet_diag_sockopt.go @@ -94,7 +94,14 @@ func DeserializeSockOptReflection(data []byte, c *SockOpt) (n int, err error) { return n, err } -func DeserializeSockOptXTCP(data []byte, x *xtcp_flat_record.Envelope_XtcpFlatRecord) (err error) { +// DeserializeSockOptXTCP reads an INET_DIAG_SOCKOPT (22) attribute into +// XtcpFlatRecord.SockOpt. Previously typed against the wrong target +// (*Envelope_XtcpFlatRecord), which didn't match the runtime dispatch +// map signature in pkg/xtcp/deserializers.go — the dispatch entry had +// to be filled with DeserializeCGroupIDXTCP as a placeholder, so the +// SockOpt field was never populated in production (and the CGroupID +// parser silently errored on the 2-byte payload). +func DeserializeSockOptXTCP(data []byte, x *xtcp_flat_record.XtcpFlatRecord) (err error) { if len(data) < SockOptSizeCst { return ErrSockOptSmall From 425b09ca80943559dda654e7fdd0513855e3ec29 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 18:46:48 -0700 Subject: [PATCH 15/30] Bug 40: netNamespaceInstance leaked one fd + goroutine per nsDelete MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit netNamespaceInstance blocked on the parent (daemon-lifetime) ctx, but nsDelete's cancel() reached only the inner nsCtx created inside createNetlinkersAndStore — which the netlinkers used, not the namespace-instance goroutine. 
Result: deletes stopped the netlinkers but left netNamespaceInstance blocked, so its deferred closeSocket(socketFD) never fired. Each namespace removal leaked one netlink fd + one Go runtime goroutine until daemon shutdown. With namespace churn (short-lived containers), this exhausts fd limits. Lift the nsCtx/nsCancel creation up into netNamespaceInstance, pass both into createNetlinkersAndStore, and block on the nsCtx. nsDelete already calls nsi.cancel() — which is now the same cancel, so both the netlinkers AND the namespace-instance goroutine exit cleanly, firing the deferred close. Co-Authored-By: Claude Opus 4.7 --- pkg/xtcp/ns_createNetlinkersAndStore.go | 11 ++++++++--- pkg/xtcp/ns_extra_test.go | 8 ++++++-- pkg/xtcp/ns_net_namespace.go | 15 +++++++++++---- 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/pkg/xtcp/ns_createNetlinkersAndStore.go b/pkg/xtcp/ns_createNetlinkersAndStore.go index 4c29e50..2ba6f9a 100644 --- a/pkg/xtcp/ns_createNetlinkersAndStore.go +++ b/pkg/xtcp/ns_createNetlinkersAndStore.go @@ -6,7 +6,14 @@ import ( "sync" ) -func (x *XTCP) createNetlinkersAndStore(ctx context.Context, nsName *string, fd int) { +// createNetlinkersAndStore takes the per-ns context/cancel from the caller +// (netNamespaceInstance) so that nsDelete's cancel() reaches BOTH the +// netlinkers AND netNamespaceInstance's blocking <-nsCtx.Done(). Previously +// the cancel was created locally here and only reached the netlinkers, +// leaving netNamespaceInstance blocked on the parent (daemon-lifetime) +// context — its deferred closeSocket(fd) never fired on a delete, leaking +// one netlink fd + one goroutine per namespace removed. 
+func (x *XTCP) createNetlinkersAndStore(nsCtx context.Context, nsCancel context.CancelFunc, nsName *string, fd int) { x.pC.WithLabelValues("createWorksAndStore", "start", "counter").Inc() @@ -14,8 +21,6 @@ func (x *XTCP) createNetlinkersAndStore(ctx context.Context, nsName *string, fd x.setSocketTimeoutViaSyscall(int64(x.config.NlTimeoutMilliseconds), fd) } - nsCtx, nsCancel := context.WithCancel(ctx) - wg := new(sync.WaitGroup) nsi := netNSitem{ name: nsName, diff --git a/pkg/xtcp/ns_extra_test.go b/pkg/xtcp/ns_extra_test.go index e856729..131d788 100644 --- a/pkg/xtcp/ns_extra_test.go +++ b/pkg/xtcp/ns_extra_test.go @@ -78,7 +78,9 @@ func TestCreateNetlinkersAndStore_zeroNetlinkers(t *testing.T) { x.Netlinker = func(_ context.Context, _ *sync.WaitGroup, _ *string, _ int, _ uint32) {} x.debugLevel = 11 // hit the log branch nsName := "test-ns" - x.createNetlinkersAndStore(context.Background(), &nsName, fds[0]) + nsCtx, nsCancel := context.WithCancel(context.Background()) + defer nsCancel() + x.createNetlinkersAndStore(nsCtx, nsCancel, &nsName, fds[0]) if _, ok := x.nsMap.Load(nsName); !ok { t.Error("nsMap should contain the new ns entry") @@ -136,7 +138,9 @@ func TestCreateNetlinkersAndStore_spawnsNetlinkers(t *testing.T) { } x.debugLevel = 11 nsName := "spawn-store-ns" - x.createNetlinkersAndStore(context.Background(), &nsName, fds[0]) + nsCtx, nsCancel := context.WithCancel(context.Background()) + defer nsCancel() + x.createNetlinkersAndStore(nsCtx, nsCancel, &nsName, fds[0]) ran.Wait() if _, ok := x.nsMap.Load(nsName); !ok { diff --git a/pkg/xtcp/ns_net_namespace.go b/pkg/xtcp/ns_net_namespace.go index ee95c71..7f68125 100644 --- a/pkg/xtcp/ns_net_namespace.go +++ b/pkg/xtcp/ns_net_namespace.go @@ -89,14 +89,20 @@ func (x *XTCP) netNamespaceInstance(ctx context.Context, nsName *string) { return } - x.createNetlinkersAndStore(ctx, nsName, socketFD) + // Derive a per-ns context here so that nsDelete's cancel() reaches + // the <-nsCtx.Done() below; otherwise 
this goroutine blocks on the + // daemon's parent ctx and the deferred closeSocket(socketFD) never + // fires on a per-namespace removal — leaking one fd + one goroutine + // per nsDelete. + nsCtx, nsCancel := context.WithCancel(ctx) + x.createNetlinkersAndStore(nsCtx, nsCancel, nsName, socketFD) x.pH.WithLabelValues("netNamespaceInstance", "store", "counter").Observe(time.Since(startTime).Seconds()) x.closeFD(fd) - // block waiting for done - <-ctx.Done() + // block waiting for done (per-ns ctx, not parent — see comment above) + <-nsCtx.Done() x.pC.WithLabelValues("netNamespaceInstance", "ctx.Done", "count").Inc() } @@ -138,7 +144,8 @@ func (x *XTCP) openDefaultNetLinkSocket(ctx context.Context) { } df := "default" - x.createNetlinkersAndStore(ctx, &df, socketFD) + nsCtx, nsCancel := context.WithCancel(ctx) + x.createNetlinkersAndStore(nsCtx, nsCancel, &df, socketFD) if x.debugLevel > 10 { log.Printf("openDefaultNetLinkSocket default net namespace netlink socket stored") From 9b1e4e7552e2efe61127cdd6de39b67e0332c015 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 18:48:21 -0700 Subject: [PATCH 16/30] Bug 41: reconcileMaps leaked netlinkers + fd on backstop deletes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When fsnotify misses a namespace removal, reconcileMaps catches it on the next 5-minute tick and Deletes the destMap entry. The previous code Deleted but didn't call the netNSitem's cancel — so the netNamespaceInstance goroutine (post bug 40 fix) and its in-flight netlinkers kept running with no map registration, and the netlink socket fd never got closed. Cancel the netNSitem before deleting. testing=true callers may pass arbitrary value types, so type-assert first and only cancel when the value is actually a netNSitem. 
Co-Authored-By: Claude Opus 4.7 --- pkg/xtcp/ns_reconcile.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pkg/xtcp/ns_reconcile.go b/pkg/xtcp/ns_reconcile.go index ba07946..64209fa 100644 --- a/pkg/xtcp/ns_reconcile.go +++ b/pkg/xtcp/ns_reconcile.go @@ -79,6 +79,19 @@ func (x *XTCP) reconcileMaps(ctx context.Context, srcMap, destMap *sync.Map, tes // discoverNamespaces stores all its values as nil. srcValue, ok := srcMap.Load(key) if !ok || (srcValue != nil && srcValue != value) { + // In production, destMap values are netNSitem structs that + // own a cancel func + an in-flight netNamespaceInstance + // goroutine + open netlink socketFD. Just deleting the map + // entry leaves all of that orphaned. Cancel first so the + // per-ns ctx fires, the netlinkers exit, and the deferred + // closeSocket in netNamespaceInstance runs. testing=true + // callers may pass arbitrary value types; only invoke + // cancel when the value is actually a netNSitem. + if !testing { + if item, isItem := value.(netNSitem); isItem && item.cancel != nil { + item.cancel() + } + } destMap.Delete(key) deleteCount++ } From bb38765aa30ca6a3e741e590ba52ff70beee1a27 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 19:14:55 -0700 Subject: [PATCH 17/30] =?UTF-8?q?Bug=2042:=20stream()=20spun=20CPU=20on=20?= =?UTF-8?q?io.EOF=20=E2=80=94=20should=20break,=20not=20continue?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the server closes the stream cleanly, Recv returns io.EOF. Subsequent Recv calls on the same closed stream keep returning EOF, so the `if err == io.EOF { continue }` branch pegged a core forever. The intended behavior is to break out and let singleStreamingClient's reconnect-with-sleep loop establish a fresh stream. 
Co-Authored-By: Claude Opus 4.7 --- cmd/xtcp2client/xtcp2client.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cmd/xtcp2client/xtcp2client.go b/cmd/xtcp2client/xtcp2client.go index 1a9c12d..87c3035 100644 --- a/cmd/xtcp2client/xtcp2client.go +++ b/cmd/xtcp2client/xtcp2client.go @@ -370,7 +370,12 @@ breakPoint: var flatRecordsResponse *xtcp_flat_record.FlatRecordsResponse flatRecordsResponse, err = stream.Recv() if err == io.EOF { - continue + // End of stream — the server closed cleanly. Subsequent + // Recv calls keep returning io.EOF, so `continue` here + // pegged a CPU core. Break out of the loop so the + // reconnect-with-sleep path in singleStreamingClient + // re-establishes a fresh stream. + break breakPoint } if err != nil { From 0ee30f141372c6245ba0b54f5ec58dd7c7b8fa4d Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 19:17:55 -0700 Subject: [PATCH 18/30] Bug 43: kafka_to_clickhouse primaryFunction sleep ignored ctx time.Sleep(c.loopsSleep) at the bottom of the loop wasn't ctx-aware, so SIGTERM took up to loopsSleep (default 10s) to be observed. Switch to a select with ctx.Done so shutdown is prompt. Co-Authored-By: Claude Opus 4.7 --- cmd/kafka_to_clickhouse/kafka_to_clickhouse.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/cmd/kafka_to_clickhouse/kafka_to_clickhouse.go b/cmd/kafka_to_clickhouse/kafka_to_clickhouse.go index 52cf85b..cfcfcf5 100644 --- a/cmd/kafka_to_clickhouse/kafka_to_clickhouse.go +++ b/cmd/kafka_to_clickhouse/kafka_to_clickhouse.go @@ -172,7 +172,14 @@ func primaryFunction(ctx context.Context, c config) { log.Printf("primaryFunction i:%d", i) } fileOrKafka(ctx, c, &binaryData) - time.Sleep(c.loopsSleep) + // Bug fix: time.Sleep ignored ctx, so SIGTERM took up to + // loopsSleep (default 10s) to be observed. Use a ctx-aware + // wait so shutdown is prompt. 
+ select { + case <-ctx.Done(): + return + case <-time.After(c.loopsSleep): + } } } From bbb4d045dcd99340fff94e3ecd959e3a452e5a19 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 19:20:29 -0700 Subject: [PATCH 19/30] Bug 44: SetPollFrequency could wedge gRPC handler forever MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit *c.changePollFrequencyCh <- ... is a blocking send. The channel has a buffer of 2, so the third call without a poller draining (poller stopped, paused, mid-shutdown) blocks forever — pegging the gRPC handler goroutine and never returning to the client. Switch to a select with the request ctx, the server ctx, and a non-blocking default. If the channel is full, drop the coalesced frequency-change and surface a metrics counter; if either ctx fires, return a clean Canceled / Unavailable status. Co-Authored-By: Claude Opus 4.7 --- pkg/xtcp/grpc_configService.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/pkg/xtcp/grpc_configService.go b/pkg/xtcp/grpc_configService.go index ea4db2c..52c5d5e 100644 --- a/pkg/xtcp/grpc_configService.go +++ b/pkg/xtcp/grpc_configService.go @@ -148,7 +148,24 @@ func (c *xtcpConfigService) SetPollFrequency( c.config.PollFrequency = in.PollFrequency c.config.PollTimeout = in.PollTimeout - *c.changePollFrequencyCh <- c.config.PollFrequency.AsDuration() + // Send the new poll frequency to the poller. The channel is buffered + // (size 2), so two sends can succeed without a reader; the third + // would block forever — pegging the gRPC handler goroutine — if the + // poller stopped reading (mid-shutdown, paused, etc.). Use ctx-aware + // select with a non-blocking default fallback so a coalesced + // frequency-change is dropped (the next caller will resend) rather + // than wedging the RPC. 
+ select { + case *c.changePollFrequencyCh <- c.config.PollFrequency.AsDuration(): + case <-ctx.Done(): + c.pC.WithLabelValues("SetPollFrequency", "ctxDone", "count").Inc() + return nil, status.Error(codes.Canceled, ctx.Err().Error()) + case <-c.ctx.Done(): + c.pC.WithLabelValues("SetPollFrequency", "serverCtxDone", "count").Inc() + return nil, status.Error(codes.Unavailable, "server shutting down") + default: + c.pC.WithLabelValues("SetPollFrequency", "chFull", "count").Inc() + } if c.debugLevel > 10 { log.Printf("SetPollFrequency c.config.PollFrequency:%0.2f c.config.PollTimeout:%0.2f", From f6643ab9cfae0867e230f8c68a2f9b8efcc2a34e Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 19:21:36 -0700 Subject: [PATCH 20/30] Bug 45: PollFlatRecords blocking send wedged gRPC stream handler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit *s.pollRequestCh <- struct{}{} same shape as bug 44 — buffered chan of size 2, third call without a poller drain blocks the streaming-RPC handler forever. PollFlatRecords clients periodically request polls; during a quiescent or shutting-down poller, the third request hung the bidi stream loop. Switch to a select with the stream ctx, the service ctx, and a non-blocking default. Drop coalesced pokes (the poller's tick will re-fire) rather than blocking. Co-Authored-By: Claude Opus 4.7 --- pkg/xtcp/grpc_flatRecordService.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pkg/xtcp/grpc_flatRecordService.go b/pkg/xtcp/grpc_flatRecordService.go index 3701f7f..7cf96b8 100644 --- a/pkg/xtcp/grpc_flatRecordService.go +++ b/pkg/xtcp/grpc_flatRecordService.go @@ -174,7 +174,22 @@ func (s *xtcpFlatRecordService) PollFlatRecords( } continue } - *s.pollRequestCh <- struct{}{} + // Buffered channel of size 2 — third in-flight poke would block + // forever if the poller isn't draining (mid-shutdown, paused, + // already-polling state). 
Non-blocking send so the RPC handler + // stays responsive; observe both the stream ctx and the + // service-level ctx so coalesced pokes don't wedge teardown. + select { + case *s.pollRequestCh <- struct{}{}: + case <-ctx.Done(): + s.pC.WithLabelValues("PollFlatRecords", "ctxDone", "count").Inc() + return ctx.Err() + case <-s.ctx.Done(): + s.pC.WithLabelValues("PollFlatRecords", "serverCtxDone", "count").Inc() + return s.ctx.Err() + default: + s.pC.WithLabelValues("PollFlatRecords", "chFull", "count").Inc() + } if s.debugLevel > 10 { log.Printf("PollFlatRecords *s.pollRequestCh <- struct{}{}") } From f2ea5352a5a65e89d3742e3269db2bc23abbe77b Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 19:40:43 -0700 Subject: [PATCH 21/30] Bug 46: encodeLengthDelimitedProtobufList dropped varint return value MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit protowire.AppendVarint returns the appended slice; the previous code called it as if it mutated in place and ignored the return. The length prefix was therefore never written. Non-envelope output was raw protobuf bytes — ClickHouse readers expecting the function's namesake LengthDelimited format misparsed every file. Use the return value the way encodeLengthDelimitedEnvelope already does. 
Co-Authored-By: Claude Opus 4.7 --- cmd/clickhouse_protobuflist/clickhouse_protobuflist.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/cmd/clickhouse_protobuflist/clickhouse_protobuflist.go b/cmd/clickhouse_protobuflist/clickhouse_protobuflist.go index e965652..9a3dfc5 100644 --- a/cmd/clickhouse_protobuflist/clickhouse_protobuflist.go +++ b/cmd/clickhouse_protobuflist/clickhouse_protobuflist.go @@ -26,7 +26,14 @@ func encodeLengthDelimitedProtobufList(r *clickhouse_protolist.Envelope_Record) } log.Printf("AppendVarint of length:%d", len(recordBytes)) - protowire.AppendVarint(result, uint64(len(recordBytes))) + // protowire.AppendVarint returns the appended slice — the previous + // code dropped the return value, so the length prefix was never + // actually written. Non-envelope mode emitted raw record bytes + // without the length-delim wrapper its name advertises (ClickHouse + // readers expecting LengthDelimited misparsed the file). Use the + // return value the way encodeLengthDelimitedEnvelope below already + // does. + result = protowire.AppendVarint(result, uint64(len(recordBytes))) result = append(result, recordBytes...) From fd1257e4a37529221c07306ae6bee7505ff0b658 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 19:42:32 -0700 Subject: [PATCH 22/30] Bug 47: Ring.Close leaked in-flight buffers when drain deadline expired MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If pending CQEs hadn't arrived by drainTimeout, Close called QueueExit and threw away r.inFlight. The kernel released its SQEs, but the userspace-side packetBufferPool buffers tracked in inFlight were never handed back to the caller — each ring teardown leaked up to inFlightCap (= sqEntries * 2, default 512) buffers per close. For a daemon with namespace add/remove churn this drains the packet pool. 
Iterate any remaining in-flight entries before QueueExit and feed them through the onDrain callback with Res=-ETIME so the caller knows they were abandoned at teardown. onRingClosedResult already handles Buf!=nil correctly — the buffers get Put back to packetBufferPool / destBytesPool as usual. Co-Authored-By: Claude Opus 4.7 --- pkg/io_uring/ring.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/pkg/io_uring/ring.go b/pkg/io_uring/ring.go index 6c7dc3e..dac3683 100644 --- a/pkg/io_uring/ring.go +++ b/pkg/io_uring/ring.go @@ -180,6 +180,26 @@ func (r *Ring) Close(drainTimeout time.Duration, onDrain func(Result)) { } } } + // Drain any in-flight entries that didn't get a CQE within the + // deadline. The kernel still owns these buffers until QueueExit + // reclaims the ring, so we can't safely reuse them yet — but we + // MUST hand them back to the caller's drain callback before the + // inFlight map vanishes, otherwise the userspace-side packet pool + // leaks N buffers per ring close (up to inFlightCap each). Mark + // these as a synthetic ETIME-style result (Res=-syscall.ETIME) so + // the callback can tell apart "normal CQE with bytes" from "abandoned + // at teardown". + if onDrain != nil { + for reqID, entry := range r.inFlight { + onDrain(Result{ + Op: entry.op, + Res: -int32(syscall.ETIME), + Buf: entry.buf, + HdrBytes: entry.wvHdr, + }) + delete(r.inFlight, reqID) + } + } r.r.QueueExit() r.r = nil } From 445efa51a2985bda573a0e4d35494c67469d445b Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 19:45:41 -0700 Subject: [PATCH 23/30] Bug 48: netlinkerIoUring log.Fatalf paths killed whole daemon MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two failure points called log.Fatalf — ring init failure (xio.New) and SQE prefill failure (iouringPrefillRecvs). A single namespace's io_uring setup failure (kernel build doesn't have the required ops, fd quota hit, etc.) 
would take down the gRPC services, the poller, and every other namespace's netlinkers along with it. Demote both to log.Printf + return so deferred wg.Done + UnlockOSThread + ring.Close (where applicable) fire cleanly. The explicit pre-Fatalf wg.Done() / UnlockOSThread() the previous code did was also incorrect — in mocked-fatalf tests, the function would continue with ring==nil and double-decrement the WaitGroup. Co-Authored-By: Claude Opus 4.7 --- pkg/xtcp/netlinker_iouring.go | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/pkg/xtcp/netlinker_iouring.go b/pkg/xtcp/netlinker_iouring.go index 40d7df5..e7d31d0 100644 --- a/pkg/xtcp/netlinker_iouring.go +++ b/pkg/xtcp/netlinker_iouring.go @@ -70,9 +70,21 @@ func (x *XTCP) netlinkerIoUring(ctx context.Context, wg *sync.WaitGroup, nsName CQEBatchSize: cqeBatch, }) if err != nil { - wg.Done() // release the WG explicitly; log.Fatalf skips the deferred Done - runtime.UnlockOSThread() // unpin before exit; deferred UnlockOSThread skipped - log.Fatalf("netlinkerIoUring %d ring init: %v", id, err) //nolint:gocritic // exitAfterDefer: deferred wg.Done() + UnlockOSThread released explicitly above + // The previous code did `wg.Done(); UnlockOSThread(); log.Fatalf(...)` + // with the explicit cleanup justified as "log.Fatalf skips the + // deferred Done." That reasoning fails in two directions: + // * Production: log.Fatalf calls os.Exit, killing every + // goroutine — wg cleanup is irrelevant either way. + // * Tests where fatalf is mocked to return: the function would + // continue past Fatalf with ring==nil, panic on ring.Close + // in the deferred teardown, and the deferred wg.Done would + // then DOUBLE-decrement the WaitGroup that the explicit + // wg.Done already touched. + // Just log + return; the deferred wg.Done and UnlockOSThread + // handle cleanup once, and a mocked fatalf no longer leaves the + // function half-initialised. 
+ log.Printf("netlinkerIoUring %d ring init: %v", id, err) + return } x.rings.Store(id, ring) defer func() { @@ -88,7 +100,12 @@ func (x *XTCP) netlinkerIoUring(ctx context.Context, wg *sync.WaitGroup, nsName // gets pinned in the ring's in-flight map; the kernel will fill them // as netlink datagrams arrive. if perr := x.iouringPrefillRecvs(ring, fd, batch); perr != nil { - log.Fatalf("netlinkerIoUring %d prefill: %v", id, perr) + // Demoted from log.Fatalf: a single namespace's prefill failure + // shouldn't kill the whole daemon (gRPC services, poller, every + // other namespace's netlinkers). Log + return so the deferred + // ring.Close + wg.Done + UnlockOSThread fire normally. + log.Printf("netlinkerIoUring %d prefill: %v", id, perr) + return } if _, serr := ring.Submit(); serr != nil { log.Printf("netlinkerIoUring %d initial Submit: %v", id, serr) From 5fe5d896d1d5af5272c49f8356802304ca0a30a1 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 19:47:49 -0700 Subject: [PATCH 24/30] Bug 49: setSocketTimeoutViaSyscall log.Fatalf killed whole daemon MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Called per-namespace from createNetlinkersAndStore. A single namespace's SO_RCVTIMEO setsockopt failure (rare — maybe fd quota, maybe a closed fd race) used to tear down every gRPC service, every other namespace's netlinkers, and the poller. Demote to a counter + debug log so the affected namespace's netlinker won't observe ctx cancellation between recvs (visible via the counter), but the rest of the daemon stays up. 
Co-Authored-By: Claude Opus 4.7 --- pkg/xtcp/ns_net_namespace.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/xtcp/ns_net_namespace.go b/pkg/xtcp/ns_net_namespace.go index 7f68125..f8668b3 100644 --- a/pkg/xtcp/ns_net_namespace.go +++ b/pkg/xtcp/ns_net_namespace.go @@ -340,6 +340,17 @@ func (x *XTCP) setSocketTimeoutViaSyscall(timeout int64, socketFD int) { // https://godoc.org/golang.org/x/sys/unix#SetsockoptTimeval err := syscall.SetsockoptTimeval(socketFD, syscall.SOL_SOCKET, syscall.SO_RCVTIMEO, &tv) if err != nil { - log.Fatalf("SetSocketTimeoutViaSyscall SetsockopttimeSpec %s", err) + // Demoted from log.Fatalf: this is called per-namespace from + // createNetlinkersAndStore. A SO_RCVTIMEO setsockopt failure on + // one namespace's fd shouldn't tear down the whole daemon (every + // gRPC service, every other namespace's netlinkers, the poller). + // Without the timeout the netlinker can't observe ctx + // cancellation between recv calls — record that in a counter so + // the operator can see the affected namespace can't shut down + // cleanly, but keep the rest of the daemon running. + x.pC.WithLabelValues("setSocketTimeoutViaSyscall", "SetsockoptTimeval", "error").Inc() + if x.debugLevel > 10 { + log.Printf("setSocketTimeoutViaSyscall SetsockoptTimeval err: %v", err) + } } } From 9f3234d846c1fc162b7040b735fa19f533c72e27 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 19:51:59 -0700 Subject: [PATCH 25/30] Bug 50: BBRv2/v3 congestion records mis-labelled as BBRv1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DeserializeCongInfoXTCP matched on the 3-byte "bbr" prefix and always assigned CONGESTION_ALGORITHM_BBR1, even though the XtcpFlatRecord proto defines distinct BBR1 / BBR2 / BBR3 enum values. 
The original comment hand-waved this as "preserving original behavior" — operators running BBRv2/v3 had no way to distinguish them from BBRv1 in any downstream dashboard or alert. Inspect data[3] (the 4th byte of the null-terminated algorithm name): '2' → BBR2, '3' → BBR3, else → BBR1. Guarded by len(data) >= 4 so the 4-byte minimum guarantee from CongInfoSizeCst still holds. Update the existing bbr2 test to assert BBR2 and add bbr1 / bbr3 round-trip tests. Co-Authored-By: Claude Opus 4.7 --- pkg/xtcpnl/xtcpnl_extra_test.go | 26 ++++++++++++++++++++++++- pkg/xtcpnl/xtcpnl_inet_diag_conginfo.go | 19 +++++++++++++----- 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/pkg/xtcpnl/xtcpnl_extra_test.go b/pkg/xtcpnl/xtcpnl_extra_test.go index 064f88a..782d414 100644 --- a/pkg/xtcpnl/xtcpnl_extra_test.go +++ b/pkg/xtcpnl/xtcpnl_extra_test.go @@ -221,11 +221,35 @@ func TestDeserializeCongInfoXTCP_vegas(t *testing.T) { func TestDeserializeCongInfoXTCP_bbr2(t *testing.T) { x := &xtcp_flat_record.XtcpFlatRecord{} - // "bbr2" — 3-byte prefix "bbr" matches the bbr branch + // "bbr2" — 3-byte prefix "bbr" matches the bbr branch, and data[3]=='2' + // selects the BBR2 enum variant (previously bucketed into BBR1 — bug 50). 
data := []byte("bbr2") if err := DeserializeCongInfoXTCP(data, x); err != nil { t.Fatalf("err = %v", err) } + if x.CongestionAlgorithmEnum != xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR2 { + t.Errorf("alg = %v, want BBR2", x.CongestionAlgorithmEnum) + } +} + +func TestDeserializeCongInfoXTCP_bbr3(t *testing.T) { + x := &xtcp_flat_record.XtcpFlatRecord{} + data := []byte("bbr3") + if err := DeserializeCongInfoXTCP(data, x); err != nil { + t.Fatalf("err = %v", err) + } + if x.CongestionAlgorithmEnum != xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR3 { + t.Errorf("alg = %v, want BBR3", x.CongestionAlgorithmEnum) + } +} + +func TestDeserializeCongInfoXTCP_bbr1(t *testing.T) { + x := &xtcp_flat_record.XtcpFlatRecord{} + // "bbr\0" — bbr1 (no '2' / '3' in data[3]). + data := []byte{'b', 'b', 'r', 0} + if err := DeserializeCongInfoXTCP(data, x); err != nil { + t.Fatalf("err = %v", err) + } if x.CongestionAlgorithmEnum != xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR1 { t.Errorf("alg = %v, want BBR1", x.CongestionAlgorithmEnum) } diff --git a/pkg/xtcpnl/xtcpnl_inet_diag_conginfo.go b/pkg/xtcpnl/xtcpnl_inet_diag_conginfo.go index 35094bd..eb986ef 100644 --- a/pkg/xtcpnl/xtcpnl_inet_diag_conginfo.go +++ b/pkg/xtcpnl/xtcpnl_inet_diag_conginfo.go @@ -92,15 +92,24 @@ func DeserializeCongInfoXTCP(data []byte, x *xtcp_flat_record.XtcpFlatRecord) (e // Match on the first 3 bytes — the kernel attribute is a null-terminated // algorithm name like "cubic", "bbr", "dctcp", "vegas". Comparing data[0:4] // against 3-char strings would never match, so we use the 3-char prefix. - // "bbr2" (the BBRv2 variant) is also checked, with the longer literal - // taking precedence via the bbr1/bbr2 fall-through (matched first). switch string(data[0:3]) { case "cub": x.CongestionAlgorithmEnum = xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_CUBIC case "bbr": - // data[3] == '2' selects BBRv2; otherwise BBRv1. 
Both currently use - // the same enum value (BBR1) — preserving original behavior. - x.CongestionAlgorithmEnum = xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR1 + // Distinguish bbr1 / bbr2 / bbr3 via the 4th byte. The XtcpFlatRecord + // proto defines BBR1/BBR2/BBR3 as separate enum values — previously + // every "bbr*" prefix was bucketed into BBR1, so operators couldn't + // tell BBRv2/v3 connections apart from BBRv1 in their dashboards. + // CongInfoSizeCst is 4 (sizeof "bbr\0"); only inspect data[3] when + // the buffer is long enough. + switch { + case len(data) >= 4 && data[3] == '3': + x.CongestionAlgorithmEnum = xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR3 + case len(data) >= 4 && data[3] == '2': + x.CongestionAlgorithmEnum = xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR2 + default: + x.CongestionAlgorithmEnum = xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR1 + } case "dct": x.CongestionAlgorithmEnum = xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_DCTCP case "veg": From 403eb1371f6910c979160c79ceb26345ec3fc2ac Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 19:57:49 -0700 Subject: [PATCH 26/30] Test: consolidate CongInfoXTCP dispatch tests into one table-driven case Eight near-identical one-off tests collapsed into a single TestDeserializeCongInfoXTCP_dispatch with a {name, data, wantAlg} table. The new row "bbr_garbage_byte_falls_back_to_bbr1" also covers the default-case of the inner BBR sub-discriminator switch (bug 50) that the previous tests didn't reach. 
Co-Authored-By: Claude Opus 4.7 --- pkg/xtcpnl/xtcpnl_extra_test.go | 126 +++++++++----------------------- 1 file changed, 35 insertions(+), 91 deletions(-) diff --git a/pkg/xtcpnl/xtcpnl_extra_test.go b/pkg/xtcpnl/xtcpnl_extra_test.go index 782d414..8f77963 100644 --- a/pkg/xtcpnl/xtcpnl_extra_test.go +++ b/pkg/xtcpnl/xtcpnl_extra_test.go @@ -166,8 +166,8 @@ func TestDeserializeInetDiagMsgXTCPWG(t *testing.T) { wg.Wait() } -// DeserializeCongInfoXTCP: 4-byte prefix dispatches to one of 5 congestion -// algorithm enums. +// TestDeserializeCongInfoXTCP_short exercises the length-guard branch +// separately — every other case has the 4-byte minimum. func TestDeserializeCongInfoXTCP_short(t *testing.T) { x := &xtcp_flat_record.XtcpFlatRecord{} if err := DeserializeCongInfoXTCP([]byte{0x01}, x); err != ErrCongInfoSmall { @@ -175,94 +175,38 @@ func TestDeserializeCongInfoXTCP_short(t *testing.T) { } } -func TestDeserializeCongInfoXTCP_cubic(t *testing.T) { - x := &xtcp_flat_record.XtcpFlatRecord{} - data := []byte("cub\x00") - if err := DeserializeCongInfoXTCP(data, x); err != nil { - t.Fatalf("err = %v", err) - } - if x.CongestionAlgorithmEnum != xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_CUBIC { - t.Errorf("alg = %v, want CUBIC", x.CongestionAlgorithmEnum) - } -} - -func TestDeserializeCongInfoXTCP_bbr(t *testing.T) { - x := &xtcp_flat_record.XtcpFlatRecord{} - data := []byte("bbr\x00") - if err := DeserializeCongInfoXTCP(data, x); err != nil { - t.Fatalf("err = %v", err) - } - if x.CongestionAlgorithmEnum != xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR1 { - t.Errorf("alg = %v, want BBR1", x.CongestionAlgorithmEnum) - } -} - -func TestDeserializeCongInfoXTCP_dctcp(t *testing.T) { - x := &xtcp_flat_record.XtcpFlatRecord{} - data := []byte("dct\x00") - if err := DeserializeCongInfoXTCP(data, x); err != nil { - t.Fatalf("err = %v", err) - } - if x.CongestionAlgorithmEnum != xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_DCTCP { - t.Errorf("alg 
= %v, want DCTCP", x.CongestionAlgorithmEnum) - } -} - -func TestDeserializeCongInfoXTCP_vegas(t *testing.T) { - x := &xtcp_flat_record.XtcpFlatRecord{} - data := []byte("veg\x00") - if err := DeserializeCongInfoXTCP(data, x); err != nil { - t.Fatalf("err = %v", err) - } - if x.CongestionAlgorithmEnum != xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_VEGAS { - t.Errorf("alg = %v, want VEGAS", x.CongestionAlgorithmEnum) - } -} - -func TestDeserializeCongInfoXTCP_bbr2(t *testing.T) { - x := &xtcp_flat_record.XtcpFlatRecord{} - // "bbr2" — 3-byte prefix "bbr" matches the bbr branch, and data[3]=='2' - // selects the BBR2 enum variant (previously bucketed into BBR1 — bug 50). - data := []byte("bbr2") - if err := DeserializeCongInfoXTCP(data, x); err != nil { - t.Fatalf("err = %v", err) - } - if x.CongestionAlgorithmEnum != xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR2 { - t.Errorf("alg = %v, want BBR2", x.CongestionAlgorithmEnum) - } -} - -func TestDeserializeCongInfoXTCP_bbr3(t *testing.T) { - x := &xtcp_flat_record.XtcpFlatRecord{} - data := []byte("bbr3") - if err := DeserializeCongInfoXTCP(data, x); err != nil { - t.Fatalf("err = %v", err) - } - if x.CongestionAlgorithmEnum != xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR3 { - t.Errorf("alg = %v, want BBR3", x.CongestionAlgorithmEnum) - } -} - -func TestDeserializeCongInfoXTCP_bbr1(t *testing.T) { - x := &xtcp_flat_record.XtcpFlatRecord{} - // "bbr\0" — bbr1 (no '2' / '3' in data[3]). - data := []byte{'b', 'b', 'r', 0} - if err := DeserializeCongInfoXTCP(data, x); err != nil { - t.Fatalf("err = %v", err) - } - if x.CongestionAlgorithmEnum != xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR1 { - t.Errorf("alg = %v, want BBR1", x.CongestionAlgorithmEnum) - } -} - -func TestDeserializeCongInfoXTCP_unknown(t *testing.T) { - // Unknown prefix: switch falls through to no-op (enum stays zero). 
- x := &xtcp_flat_record.XtcpFlatRecord{} - data := []byte("xxxx") - if err := DeserializeCongInfoXTCP(data, x); err != nil { - t.Fatalf("err = %v", err) - } - if x.CongestionAlgorithmEnum != 0 { - t.Errorf("unknown prefix should leave enum at 0; got %v", x.CongestionAlgorithmEnum) +// TestDeserializeCongInfoXTCP_dispatch is the table-driven combination of +// the previous 8 one-off tests. Each row exercises one prefix → enum +// mapping; the BBR rows cover the data[3] sub-discriminator added in +// bug 50. A zero wantAlg means "enum should stay at zero" (unknown +// prefix branch). +func TestDeserializeCongInfoXTCP_dispatch(t *testing.T) { + cases := []struct { + name string + data []byte + wantAlg xtcp_flat_record.XtcpFlatRecord_CongestionAlgorithm + }{ + {"cubic", []byte("cub\x00"), xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_CUBIC}, + {"bbr1_explicit_nul", []byte{'b', 'b', 'r', 0}, xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR1}, + {"bbr1_prefix", []byte("bbr\x00"), xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR1}, + {"bbr2", []byte("bbr2"), xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR2}, + {"bbr3", []byte("bbr3"), xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR3}, + {"dctcp", []byte("dct\x00"), xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_DCTCP}, + {"vegas", []byte("veg\x00"), xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_VEGAS}, + {"unknown_prefix_stays_zero", []byte("xxxx"), 0}, + // "bbr" with garbage 4th byte (anything but '2' or '3') falls + // back to BBR1 — covers the default-case of the inner switch. 
+ {"bbr_garbage_byte_falls_back_to_bbr1", []byte("bbrX"), xtcp_flat_record.XtcpFlatRecord_CONGESTION_ALGORITHM_BBR1}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + x := &xtcp_flat_record.XtcpFlatRecord{} + if err := DeserializeCongInfoXTCP(tc.data, x); err != nil { + t.Fatalf("err = %v", err) + } + if x.CongestionAlgorithmEnum != tc.wantAlg { + t.Errorf("alg = %v, want %v", x.CongestionAlgorithmEnum, tc.wantAlg) + } + }) } } From f40faa3934bc27d48f9202dd649c417616b75827 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 19:59:58 -0700 Subject: [PATCH 27/30] Bug 51: dialWithRetry off-by-one + non-positive-attempts pretty-print MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The for-loop bound was `for r := 1; r < attempts; r++` — one short of the advertised count. attempts=10 ran 9 iterations; attempts=1 ran 0 iterations and returned a confusing "dial X:Y: %!w(<nil>)" formatted error because lastErr was never assigned. attempts<=0 hit the same trap. Fix: loop r := 0..attempts-1 (matches the count contract); reject attempts<=0 up front with an explicit error so the report doesn't contain a `%!w` placeholder. Add table-driven tests covering the zero/negative/one cases that previously slipped through. Co-Authored-By: Claude Opus 4.7 --- tools/tcp_client/tcp_client.go | 16 ++++++++-- tools/tcp_client/tcp_client_test.go | 48 +++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/tools/tcp_client/tcp_client.go b/tools/tcp_client/tcp_client.go index 819fa94..958b4df 100644 --- a/tools/tcp_client/tcp_client.go +++ b/tools/tcp_client/tcp_client.go @@ -118,11 +118,21 @@ func buildMessage(port, pads int) []byte { // dialWithRetry retries dial up to `attempts` times with linearly-increasing // timeout. Returns the first successful conn or the last non-timeout error. 
+// +// The previous loop bound was `for r := 1; r < attempts; r++` — one short +// of the advertised count (attempts=10 ran 9 iterations). With attempts=1 +// the loop didn't run at all, lastErr stayed nil, and the function +// returned a nonsense "dial X:Y: %!w(<nil>)" error. Loop r := 0..attempts-1 +// matches the comment, and if attempts <= 0 we report it instead of +// pretending we ran a loop. func dialWithRetry(bind string, port, attempts int, baseTimeout time.Duration) (net.Conn, error) { - timeout := baseTimeout addr := fmt.Sprintf("%s:%d", bind, port) + if attempts <= 0 { + return nil, fmt.Errorf("dial %s: attempts must be > 0, got %d", addr, attempts) + } + timeout := baseTimeout var lastErr error - for r := 1; r < attempts; r++ { + for r := 0; r < attempts; r++ { dialer := net.Dialer{Timeout: timeout} dialCtx, cancel := context.WithTimeout(context.Background(), timeout) conn, err := dialer.DialContext(dialCtx, "tcp", addr) @@ -133,7 +143,7 @@ func dialWithRetry(bind string, port, attempts int, baseTimeout time.Duration) ( lastErr = err var netErr net.Error if errors.As(err, &netErr) && netErr.Timeout() { - timeout = baseTimeout + (baseTimeout * time.Duration(r)) + timeout = baseTimeout + (baseTimeout * time.Duration(r+1)) continue } return nil, err diff --git a/tools/tcp_client/tcp_client_test.go b/tools/tcp_client/tcp_client_test.go index 675c48a..23cf129 100644 --- a/tools/tcp_client/tcp_client_test.go +++ b/tools/tcp_client/tcp_client_test.go @@ -63,6 +63,54 @@ func TestDialWithRetry_connRefused(t *testing.T) { } } +// dialWithRetry rejects non-positive attempts cleanly. Previously the +// for-loop bound was `for r := 1; r < attempts; r++` so attempts <= 1 +// ran zero iterations, lastErr stayed nil, and the function returned a +// confusing `dial X:Y: %!w(<nil>)` formatted error. 
+func TestDialWithRetry_nonPositiveAttempts(t *testing.T) { + cases := []struct { + name string + attempts int + }{ + {"zero_attempts", 0}, + {"negative_attempts", -3}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + _, err := dialWithRetry("127.0.0.1", 1, tc.attempts, 10*time.Millisecond) + if err == nil { + t.Fatalf("attempts=%d should produce an error", tc.attempts) + } + // The error should explain what went wrong, not contain + // a formatting placeholder. + if errMsg := err.Error(); strings.Contains(errMsg, "%!w") { + t.Errorf("error contains formatting placeholder: %q", errMsg) + } + }) + } +} + +// dialWithRetry with attempts=1 must execute exactly one dial. Previously +// the loop ran zero times (off-by-one) and returned a stale nil-wrapped +// error. +func TestDialWithRetry_attemptsOne(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + port := ln.Addr().(*net.TCPAddr).Port + _ = ln.Close() //nolint:errcheck // test plumbing + + // Conn-refused on attempts=1 must return a real error, not nil-wrapped. + _, err = dialWithRetry("127.0.0.1", port, 1, 50*time.Millisecond) + if err == nil { + t.Fatal("conn-refused should error") + } + if strings.Contains(err.Error(), "%!w") { + t.Errorf("attempts=1 returned formatting-placeholder error: %q", err.Error()) + } +} + func TestClientOnce_happy(t *testing.T) { a, b := net.Pipe() defer func() { _ = a.Close() }() //nolint:errcheck // test plumbing From 3d9ecfebea2268dae94f22c26a4d019fc248ef63 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 20:01:25 -0700 Subject: [PATCH 28/30] Test: table-driven TestAtoiOr0 + TestBytesIndex Convert two scalar helper tests into table-driven form so coverage gaps are obvious at a glance and edge cases are easy to add. 
Picks up new coverage of: - atoiOr0: whitespace-only, float-like, trailing-garbage inputs - bytesIndex: empty-both, prefix/suffix matches, needle-longer-than- haystack Co-Authored-By: Claude Opus 4.7 --- tools/quality-report/main_test.go | 66 +++++++++++++++++++------------ 1 file changed, 40 insertions(+), 26 deletions(-) diff --git a/tools/quality-report/main_test.go b/tools/quality-report/main_test.go index c28321f..372888a 100644 --- a/tools/quality-report/main_test.go +++ b/tools/quality-report/main_test.go @@ -12,20 +12,26 @@ import ( // ─────────────────────────────────────────────────────────────────────── func TestAtoiOr0(t *testing.T) { - if got := atoiOr0("42"); got != 42 { - t.Errorf("atoiOr0(42) = %d", got) - } - if got := atoiOr0(""); got != 0 { - t.Errorf("atoiOr0(empty) = %d", got) - } - if got := atoiOr0("not-a-number"); got != 0 { - t.Errorf("atoiOr0(bad) = %d", got) - } - if got := atoiOr0("0"); got != 0 { - t.Errorf("atoiOr0(0) = %d", got) + cases := []struct { + in string + want int + }{ + {"42", 42}, + {"", 0}, + {"not-a-number", 0}, + {"0", 0}, + {"-123", -123}, + {" ", 0}, // whitespace-only also unparseable + {"1.5", 0}, // floats aren't integers + {"42a", 0}, // trailing garbage + {"2147483647", 2147483647}, // int32 max round-trips } - if got := atoiOr0("-123"); got != -123 { - t.Errorf("atoiOr0(-123) = %d", got) + for _, tc := range cases { + t.Run(tc.in, func(t *testing.T) { + if got := atoiOr0(tc.in); got != tc.want { + t.Errorf("atoiOr0(%q) = %d, want %d", tc.in, got, tc.want) + } + }) } } @@ -44,20 +50,28 @@ func TestFileExists(t *testing.T) { } func TestBytesIndex(t *testing.T) { - if got := bytesIndex([]byte("hello world"), []byte("world")); got != 6 { - t.Errorf("bytesIndex = %d, want 6", got) - } - if got := bytesIndex([]byte("abc"), []byte("d")); got != -1 { - t.Errorf("bytesIndex(missing) = %d, want -1", got) - } - if got := bytesIndex([]byte("abc"), []byte("")); got != 0 { - t.Errorf("bytesIndex(empty needle) = %d, want 0", 
got) - } - if got := bytesIndex([]byte(""), []byte("x")); got != -1 { - t.Errorf("bytesIndex(empty haystack) = %d, want -1", got) + cases := []struct { + name string + haystack []byte + needle []byte + want int + }{ + {"middle", []byte("hello world"), []byte("world"), 6}, + {"missing", []byte("abc"), []byte("d"), -1}, + {"empty_needle", []byte("abc"), []byte(""), 0}, + {"empty_haystack", []byte(""), []byte("x"), -1}, + {"empty_both", []byte(""), []byte(""), 0}, + {"exact_match", []byte("abc"), []byte("abc"), 0}, + {"prefix_match", []byte("abcdef"), []byte("abc"), 0}, + {"suffix_match", []byte("abcdef"), []byte("def"), 3}, + {"needle_longer_than_haystack", []byte("ab"), []byte("abcd"), -1}, } - if got := bytesIndex([]byte("abc"), []byte("abc")); got != 0 { - t.Errorf("bytesIndex(exact) = %d, want 0", got) + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if got := bytesIndex(tc.haystack, tc.needle); got != tc.want { + t.Errorf("bytesIndex(%q, %q) = %d, want %d", tc.haystack, tc.needle, got, tc.want) + } + }) } } From ecd1097952ce057fc500f936905e48869d65584b Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Mon, 18 May 2026 20:03:11 -0700 Subject: [PATCH 29/30] Test: regression tests for bug 41 reconcileMaps cancel-on-delete MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two new subtests under TestReconcileMaps: - backstop_delete_calls_netNSitem_cancel: asserts the production delete path invokes netNSitem.cancel() so orphaned goroutines / fds wind down (the bug 41 fix). - backstop_delete_non_netNSitem_value_is_safe: guards the type-assertion branch — table tests pass raw strings, and the cancel logic must skip them without panicking. 
Co-Authored-By: Claude Opus 4.7 --- pkg/xtcp/ns_reconcile_test.go | 49 +++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/pkg/xtcp/ns_reconcile_test.go b/pkg/xtcp/ns_reconcile_test.go index 645eb14..5838f92 100644 --- a/pkg/xtcp/ns_reconcile_test.go +++ b/pkg/xtcp/ns_reconcile_test.go @@ -132,6 +132,55 @@ func TestReconcileMaps(t *testing.T) { } }) + // Bug 41 regression: a backstop delete (key in dest, not in src) + // must call netNSitem.cancel() so the orphaned per-ns goroutine + // + netlinkers + socketFD wind down. testing=false uses the + // production path which now invokes cancel before Delete. + t.Run("backstop_delete_calls_netNSitem_cancel", func(t *testing.T) { + srcMap := &sync.Map{} // empty src → every dest entry is "gone" + destMap := &sync.Map{} + + // Build a netNSitem with an observable cancel func. + var cancelCalled bool + nsName := "/run/netns/stale" + destMap.Store(nsName, netNSitem{ + name: &nsName, + cancel: func() { cancelCalled = true }, + }) + + // Need a stub XTCP with the pC CounterVec the production + // path increments via nsAdd → but with empty src nothing is + // added. Just enough to call reconcileMaps; the cancel branch + // runs before any counter increments. + x2 := newPollerFixture(t) // reuses the test helper with metrics + dels, _ := x2.reconcileMaps(context.Background(), srcMap, destMap, false) + if dels != 1 { + t.Errorf("dels = %d, want 1", dels) + } + if !cancelCalled { + t.Error("netNSitem.cancel() was not called — bug 41 regression") + } + if _, ok := destMap.Load(nsName); ok { + t.Errorf("destMap still has %q after backstop delete", nsName) + } + }) + + // Bug 41 negative case: testing=true callers may pass arbitrary + // value types (raw strings, in the table tests above). The cancel + // branch must skip the type-assertion safely instead of panicking. 
+ t.Run("backstop_delete_non_netNSitem_value_is_safe", func(t *testing.T) { + srcMap := &sync.Map{} + destMap := &sync.Map{} + destMap.Store("k", "raw-string-not-netNSitem") + // testing=true is the path the table tests use. Even if a + // caller forgets and passes testing=false with non-netNSitem + // values, no panic should occur. + dels, _ := x.reconcileMaps(context.Background(), srcMap, destMap, false) + if dels != 1 { + t.Errorf("dels = %d, want 1", dels) + } + }) + for _, test := range tests { t.Run(test.name, func(t *testing.T) { srcMap := &sync.Map{} From c75837368fd9098807b0e5e0767484ad3445ecab Mon Sep 17 00:00:00 2001 From: "randomizedcoder dave.seddon.ca@gmail.com" Date: Sun, 14 Jun 2026 15:51:07 -0700 Subject: [PATCH 30/30] gofmt: format quality-report main_test.go (pulled forward from later in layer) Co-Authored-By: Claude Opus 4.8 --- tools/quality-report/main_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/quality-report/main_test.go b/tools/quality-report/main_test.go index 372888a..425795d 100644 --- a/tools/quality-report/main_test.go +++ b/tools/quality-report/main_test.go @@ -21,9 +21,9 @@ func TestAtoiOr0(t *testing.T) { {"not-a-number", 0}, {"0", 0}, {"-123", -123}, - {" ", 0}, // whitespace-only also unparseable - {"1.5", 0}, // floats aren't integers - {"42a", 0}, // trailing garbage + {" ", 0}, // whitespace-only also unparseable + {"1.5", 0}, // floats aren't integers + {"42a", 0}, // trailing garbage {"2147483647", 2147483647}, // int32 max round-trips } for _, tc := range cases {