Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
6c19b6b
Fix two netlinker syscall-path bugs (#25, #26)
randomizedcoder May 19, 2026
767adc5
Bug 27: getLatestSchemaID had no timeout and ignored ctx
randomizedcoder May 19, 2026
f55ce87
Bug 28: kafkaDest.Close dropped in-flight records (no Flush)
randomizedcoder May 19, 2026
fc4541f
Bug 29: getLatestSchemaIDAt silently returned id:0 on 4xx/5xx
randomizedcoder May 19, 2026
0e95f93
Bug 30: kafka_to_clickhouse getLatestSchemaIDAt same 4xx/5xx → id:0
randomizedcoder May 19, 2026
6abc4ae
Bug 31: clickhouse insert wrong FORMAT for envelope=true (default)
randomizedcoder May 19, 2026
0de0009
Bug 32: pollStreamRecv spun CPU on persistent stream errors
randomizedcoder May 19, 2026
888b9ea
Bug 33: handleRecvCQE nil-deref on orphan CQEs
randomizedcoder May 19, 2026
8f4d5d8
Bug 34: runReceiver hung on shutdown — ReadFromUDP isn't ctx-aware
randomizedcoder May 19, 2026
1a4c773
Bug 35: handleRecord cross-contaminated decoded records (no Reset)
randomizedcoder May 19, 2026
16a02b5
Bug 36: udp_receiver_server runReceiver same proto.Reset miss
randomizedcoder May 19, 2026
29833ee
Bug 37: pollAllNetlinkSockets miscounted polled fds (off-by-1 from xt…
randomizedcoder May 19, 2026
e3e5854
Bug 38: checkDirectoryExists nil-deref on non-not-exist Stat errors
randomizedcoder May 19, 2026
c89b02d
Bug 39: SOCKOPT attribute used DeserializeCGroupIDXTCP placeholder
randomizedcoder May 19, 2026
425b09c
Bug 40: netNamespaceInstance leaked one fd + goroutine per nsDelete
randomizedcoder May 19, 2026
9b1e4e7
Bug 41: reconcileMaps leaked netlinkers + fd on backstop deletes
randomizedcoder May 19, 2026
bb38765
Bug 42: stream() spun CPU on io.EOF — should break, not continue
randomizedcoder May 19, 2026
0ee30f1
Bug 43: kafka_to_clickhouse primaryFunction sleep ignored ctx
randomizedcoder May 19, 2026
bbb4d04
Bug 44: SetPollFrequency could wedge gRPC handler forever
randomizedcoder May 19, 2026
f6643ab
Bug 45: PollFlatRecords blocking send wedged gRPC stream handler
randomizedcoder May 19, 2026
f2ea535
Bug 46: encodeLengthDelimitedProtobufList dropped varint return value
randomizedcoder May 19, 2026
fd1257e
Bug 47: Ring.Close leaked in-flight buffers when drain deadline expired
randomizedcoder May 19, 2026
445efa5
Bug 48: netlinkerIoUring log.Fatalf paths killed whole daemon
randomizedcoder May 19, 2026
5fe5d89
Bug 49: setSocketTimeoutViaSyscall log.Fatalf killed whole daemon
randomizedcoder May 19, 2026
9f3234d
Bug 50: BBRv2/v3 congestion records mis-labelled as BBRv1
randomizedcoder May 19, 2026
403eb13
Test: consolidate CongInfoXTCP dispatch tests into one table-driven case
randomizedcoder May 19, 2026
f40faa3
Bug 51: dialWithRetry off-by-one + non-positive-attempts pretty-print
randomizedcoder May 19, 2026
3d9ecfe
Test: table-driven TestAtoiOr0 + TestBytesIndex
randomizedcoder May 19, 2026
ecd1097
Test: regression tests for bug 41 reconcileMaps cancel-on-delete
randomizedcoder May 19, 2026
c758373
gofmt: format quality-report main_test.go (pulled forward from later …
randomizedcoder Jun 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,24 @@ func writeDataToFile(ctx context.Context, filename string, data []byte) error {
var ErrClickHouseHTTPPost = errors.New("clickhouse http post failed")

func insertIntoCH(ctx context.Context, c config, binaryData []byte) error {
return insertIntoCHAt(ctx, http.DefaultClient, "http://"+c.connectStr, binaryData)
return insertIntoCHAt(ctx, http.DefaultClient, "http://"+c.connectStr, binaryData, c.envelope)
}

// insertIntoCHAt POSTs a binary protobuf payload to ClickHouse's HTTP
// endpoint. Extracted so tests can drive it against httptest.Server (the
// production wrapper picks the baseURL from config.connectStr).
func insertIntoCHAt(ctx context.Context, client *http.Client, baseURL string, binaryData []byte) error {
clickhouseURL := baseURL + "/?query=INSERT%20INTO%20clickhouse_protolist.clickhouse_protolist%20FORMAT%20Protobuf&format_schema=clickhouse_protolist.proto:clickhouse_protolist.v1.Record"
//
// useEnvelope picks the ClickHouse format: ProtobufList for envelope mode
// (a single Envelope with a repeated Rows field), Protobuf for the
// length-prefixed per-row mode. The previous code hardwired FORMAT
// Protobuf regardless, so the default envelope=true path sent an
// Envelope payload under the per-row Protobuf format — ClickHouse rejected every insert.
func insertIntoCHAt(ctx context.Context, client *http.Client, baseURL string, binaryData []byte, useEnvelope bool) error {
format := "Protobuf"
if useEnvelope {
format = "ProtobufList"
}
clickhouseURL := baseURL + "/?query=INSERT%20INTO%20clickhouse_protolist.clickhouse_protolist%20FORMAT%20" + format + "&format_schema=clickhouse_protolist.proto:clickhouse_protolist.v1.Record"

req, err := http.NewRequestWithContext(ctx, http.MethodPost, clickhouseURL, bytes.NewReader(binaryData))
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestInsertIntoCHAt_success(t *testing.T) {
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()
if err := insertIntoCHAt(t.Context(), srv.Client(), srv.URL, []byte("payload")); err != nil {
if err := insertIntoCHAt(t.Context(), srv.Client(), srv.URL, []byte("payload"), true); err != nil {
t.Errorf("err = %v, want nil", err)
}
}
Expand All @@ -146,7 +146,7 @@ func TestInsertIntoCHAt_serverError(t *testing.T) {
_, _ = w.Write([]byte("oops")) //nolint:errcheck // test plumbing
}))
defer srv.Close()
err := insertIntoCHAt(t.Context(), srv.Client(), srv.URL, []byte("payload"))
err := insertIntoCHAt(t.Context(), srv.Client(), srv.URL, []byte("payload"), true)
if err == nil {
t.Error("500 should produce error")
}
Expand All @@ -159,15 +159,41 @@ func TestInsertIntoCHAt_connRefused(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}))
url := srv.URL
srv.Close()
if err := insertIntoCHAt(t.Context(), http.DefaultClient, url, []byte("payload")); err == nil {
if err := insertIntoCHAt(t.Context(), http.DefaultClient, url, []byte("payload"), true); err == nil {
t.Error("conn-refused should produce error")
}
}

func TestInsertIntoCHAt_badURL(t *testing.T) {
// Malformed URL forces http.NewRequestWithContext to fail.
err := insertIntoCHAt(t.Context(), http.DefaultClient, "://not a url", []byte("p"))
err := insertIntoCHAt(t.Context(), http.DefaultClient, "://not a url", []byte("p"), true)
if err == nil {
t.Error("malformed URL should produce error")
}
}

// TestInsertIntoCHAt_formatSelection checks the query string that
// insertIntoCHAt sends: FORMAT ProtobufList when useEnvelope is true and
// FORMAT Protobuf when it is false. The false case matches "Protobuf&" so
// that the "Protobuf" prefix of "ProtobufList" cannot satisfy it.
func TestInsertIntoCHAt_formatSelection(t *testing.T) {
	type formatCase struct {
		useEnvelope bool
		wantFormat  string
	}
	table := []formatCase{
		{useEnvelope: true, wantFormat: "ProtobufList"},
		{useEnvelope: false, wantFormat: "Protobuf&"},
	}
	for i := range table {
		envelope, want := table[i].useEnvelope, table[i].wantFormat

		// Record the request URL exactly as the stub server receives it.
		var seenURL string
		handler := func(w http.ResponseWriter, r *http.Request) {
			seenURL = r.URL.String()
			w.WriteHeader(http.StatusOK)
		}
		stub := httptest.NewServer(http.HandlerFunc(handler))

		err := insertIntoCHAt(t.Context(), stub.Client(), stub.URL, []byte("p"), envelope)
		if err != nil {
			t.Errorf("useEnvelope=%v: err = %v", envelope, err)
		}
		if needle := "FORMAT%20" + want; !strings.Contains(seenURL, needle) {
			t.Errorf("useEnvelope=%v: URL = %s, want FORMAT %s", envelope, seenURL, want)
		}
		stub.Close()
	}
}
9 changes: 8 additions & 1 deletion cmd/clickhouse_protobuflist/clickhouse_protobuflist.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,14 @@ func encodeLengthDelimitedProtobufList(r *clickhouse_protolist.Envelope_Record)
}

log.Printf("AppendVarint of length:%d", len(recordBytes))
protowire.AppendVarint(result, uint64(len(recordBytes)))
// protowire.AppendVarint returns the appended slice — the previous
// code dropped the return value, so the length prefix was never
// actually written. Non-envelope mode emitted raw record bytes
// without the length-delim wrapper its name advertises (ClickHouse
// readers expecting LengthDelimited misparsed the file). Use the
// return value the way encodeLengthDelimitedEnvelope below already
// does.
result = protowire.AppendVarint(result, uint64(len(recordBytes)))

result = append(result, recordBytes...)

Expand Down
17 changes: 16 additions & 1 deletion cmd/kafka_to_clickhouse/kafka_to_clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,14 @@ func primaryFunction(ctx context.Context, c config) {
log.Printf("primaryFunction i:%d", i)
}
fileOrKafka(ctx, c, &binaryData)
time.Sleep(c.loopsSleep)
// Bug fix: time.Sleep ignored ctx, so SIGTERM took up to
// loopsSleep (default 10s) to be observed. Use a ctx-aware
// wait so shutdown is prompt.
select {
case <-ctx.Done():
return
case <-time.After(c.loopsSleep):
}
}
}

Expand Down Expand Up @@ -442,6 +449,14 @@ func getLatestSchemaIDAt(ctx context.Context, client *http.Client, baseURL, subj
}
defer resp.Body.Close()

// Schema Registries return a JSON error body on 4xx/5xx; decoding it
// into the {id int} struct silently yields id:0. Reject non-2xx
// upfront so kafka_to_clickhouse's downstream Produce path doesn't
// stamp every Kafka record with a bogus schemaID:0 magic header.
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return 0, fmt.Errorf("getLatestSchemaIDAt %s: unexpected status:%d", url, resp.StatusCode)
}

var result struct {
ID int `json:"id"`
}
Expand Down
8 changes: 8 additions & 0 deletions cmd/register_schema/register_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ func getLatestSchemaIDAt(client *http.Client, baseURL, subject string) (int, err
return 0, err
}
defer resp.Body.Close()
// Schema Registries return a JSON error body on 4xx/5xx (e.g.
// {"error_code":40402,"message":"Subject not found."}). The previous
// code skipped the status check and decoded the error body straight
// into the {id int} struct, producing a silent id:0 success — the
// CLI printed "id:0" for a missing subject. Reject non-2xx upfront.
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return 0, fmt.Errorf("getLatestSchemaIDAt %s: unexpected status:%d", url, resp.StatusCode)
}
var result struct {
ID int `json:"id"`
}
Expand Down
15 changes: 12 additions & 3 deletions cmd/xtcp2client/xtcp2client.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,15 @@ breakPoint:
log.Printf("pollStreamRecv err:%v", err)
}

// A broken stream typically returns the same error immediately
// on every subsequent Recv (e.g. "rpc error: stream closed").
// The previous code looped without backoff, pegging a CPU core
// at 100% until ctx was canceled. Sleep briefly between
// retries so the loop is ctx-cancellable AND non-spinny.
select {
case <-ctx.Done():
break breakPoint
default:
// non-blocking
case <-time.After(100 * time.Millisecond):
}
continue
}
Expand Down Expand Up @@ -366,7 +370,12 @@ breakPoint:
var flatRecordsResponse *xtcp_flat_record.FlatRecordsResponse
flatRecordsResponse, err = stream.Recv()
if err == io.EOF {
continue
// End of stream — the server closed cleanly. Subsequent
// Recv calls keep returning io.EOF, so `continue` here
// pegged a CPU core. Break out of the loop so the
// reconnect-with-sleep path in singleStreamingClient
// re-establishes a fresh stream.
break breakPoint
}

if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions pkg/io_uring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,26 @@ func (r *Ring) Close(drainTimeout time.Duration, onDrain func(Result)) {
}
}
}
// Drain any in-flight entries that didn't get a CQE within the
// deadline. The kernel still owns these buffers until QueueExit
// reclaims the ring, so we can't safely reuse them yet — but we
// MUST hand them back to the caller's drain callback before the
// inFlight map vanishes, otherwise the userspace-side packet pool
// leaks N buffers per ring close (up to inFlightCap each). Mark
// these as a synthetic ETIME-style result (Res=-syscall.ETIME) so
// the callback can tell apart "normal CQE with bytes" from "abandoned
// at teardown".
if onDrain != nil {
for reqID, entry := range r.inFlight {
onDrain(Result{
Op: entry.op,
Res: -int32(syscall.ETIME),
Buf: entry.buf,
HdrBytes: entry.wvHdr,
})
delete(r.inFlight, reqID)
}
}
r.r.QueueExit()
r.r = nil
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/xtcp/deserializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,15 @@ func (x *XTCP) InitDeserializers(wg *sync.WaitGroup) {
}

// INET_DIAG_SOCKOPT 22
// Previously registered DeserializeCGroupIDXTCP here as a workaround
// because DeserializeSockOptXTCP had the wrong target type
// (*Envelope_XtcpFlatRecord). With the sockopt deserializer's
// signature corrected, register the actual SockOpt parser — the
// CGroupID one was decoding 8 bytes against the 2-byte SOCKOPT
// payload, silently erroring out and leaving SockOpt unpopulated.
key = dsKeySockopt
if _, exists := x.config.EnabledDeserializers.Enabled[key]; exists {
x.RTATypeDeserializer[xtcpnl.SockOptEnumValueCst] = xtcpnl.DeserializeCGroupIDXTCP
x.RTATypeDeserializer[xtcpnl.SockOptEnumValueCst] = xtcpnl.DeserializeSockOptXTCP
x.RTATypeDeserializerStr[xtcpnl.SockOptEnumValueCst] = key
}

Expand Down
31 changes: 28 additions & 3 deletions pkg/xtcp/destinations_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func newKafkaDest(ctx context.Context, x *XTCP) (Destination, error) {
return nil, err
}

schemaID, err := d.getLatestSchemaID()
schemaID, err := d.getLatestSchemaID(ctx)
if err != nil {
return nil, fmt.Errorf("newKafkaDest getLatestSchemaID: %w", err)
}
Expand Down Expand Up @@ -155,25 +155,50 @@ func (d *kafkaDest) Send(ctx context.Context, b *[]byte) (int, error) {

// Close flushes the Kafka client and then closes it. It is a no-op when the
// client was never created, and it always returns nil.
//
// Flush runs first, bounded by a 5s timeout, because (per the original
// author's note) franz-go's Close cancels in-flight produces without waiting
// for their broker acks, which silently dropped records buffered at shutdown.
// A Flush failure is counted and, at debugLevel > 10, logged; it does not
// prevent the client from being closed.
func (d *kafkaDest) Close() error {
	if d.client == nil {
		return nil
	}

	flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	flushErr := d.client.Flush(flushCtx)
	if flushErr != nil {
		d.x.pC.WithLabelValues("destKafka", "FlushOnClose", "error").Inc()
		if d.x.debugLevel > 10 {
			log.Printf("destKafka Flush on Close: %v", flushErr)
		}
	}
	// Release the timeout's resources before tearing the client down.
	cancel()

	d.client.Close()
	return nil
}

func (d *kafkaDest) getLatestSchemaID() (int, error) {
func (d *kafkaDest) getLatestSchemaID(ctx context.Context) (int, error) {
url := fmt.Sprintf("%s/subjects/%s-value/versions/latest",
d.x.config.KafkaSchemaUrl, d.x.config.Topic)
if d.x.debugLevel > 10 {
log.Printf("getLatestSchemaID url:%s\n", url)
}
resp, err := http.Get(url)
// http.Get used the DefaultClient which has no timeout — a hung
// Schema Registry would block daemon startup indefinitely. Build
// the request with the caller's ctx + a hard 10s ceiling so the
// init-time call observes shutdown and never wedges.
reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, url, nil)
if err != nil {
return 0, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return 0, err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
return 0, fmt.Errorf("getLatestSchemaID http.StatusNotFound url:%s", url)
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return 0, fmt.Errorf("getLatestSchemaID url:%s unexpected status:%d", url, resp.StatusCode)
}
var result struct {
ID int `json:"id"`
}
Expand Down
19 changes: 18 additions & 1 deletion pkg/xtcp/grpc_configService.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,24 @@ func (c *xtcpConfigService) SetPollFrequency(
c.config.PollFrequency = in.PollFrequency
c.config.PollTimeout = in.PollTimeout

*c.changePollFrequencyCh <- c.config.PollFrequency.AsDuration()
// Send the new poll frequency to the poller. The channel is buffered
// (size 2), so two sends can succeed without a reader; the third
// would block forever — wedging the gRPC handler goroutine — if the
// poller stopped reading (mid-shutdown, paused, etc.). The select is
// non-blocking: because of the default case it never waits, so when
// the channel is full the frequency change is dropped (the next
// caller will resend) rather than wedging the RPC. The ctx cases are
// only taken when a context is already done at the time of the call.
select {
case *c.changePollFrequencyCh <- c.config.PollFrequency.AsDuration():
case <-ctx.Done():
c.pC.WithLabelValues("SetPollFrequency", "ctxDone", "count").Inc()
return nil, status.Error(codes.Canceled, ctx.Err().Error())
case <-c.ctx.Done():
c.pC.WithLabelValues("SetPollFrequency", "serverCtxDone", "count").Inc()
return nil, status.Error(codes.Unavailable, "server shutting down")
default:
c.pC.WithLabelValues("SetPollFrequency", "chFull", "count").Inc()
}

if c.debugLevel > 10 {
log.Printf("SetPollFrequency c.config.PollFrequency:%0.2f c.config.PollTimeout:%0.2f",
Expand Down
17 changes: 16 additions & 1 deletion pkg/xtcp/grpc_flatRecordService.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,22 @@ func (s *xtcpFlatRecordService) PollFlatRecords(
}
continue
}
*s.pollRequestCh <- struct{}{}
// Buffered channel of size 2 — third in-flight poke would block
// forever if the poller isn't draining (mid-shutdown, paused,
// already-polling state). Non-blocking send so the RPC handler
// stays responsive; observe both the stream ctx and the
// service-level ctx so coalesced pokes don't wedge teardown.
select {
case *s.pollRequestCh <- struct{}{}:
case <-ctx.Done():
s.pC.WithLabelValues("PollFlatRecords", "ctxDone", "count").Inc()
return ctx.Err()
case <-s.ctx.Done():
s.pC.WithLabelValues("PollFlatRecords", "serverCtxDone", "count").Inc()
return s.ctx.Err()
default:
s.pC.WithLabelValues("PollFlatRecords", "chFull", "count").Inc()
}
if s.debugLevel > 10 {
log.Printf("PollFlatRecords *s.pollRequestCh <- struct{}{}")
}
Expand Down
16 changes: 14 additions & 2 deletions pkg/xtcp/netlinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,14 @@ func (x *XTCP) netlinkerSyscall(ctx context.Context, wg *sync.WaitGroup, nsName

if wf > 0 {
now := time.Now()
// Capture only the n bytes Recvfrom actually filled. Writing the
// raw *packetBuffer here used to dump the full pool-buffer size
// (e.g. 8 KiB), trailing the real packet with stale bytes from a
// previous Recvfrom — pcap-like consumers parsed garbage past
// the real end of the message.
err = os.WriteFile(
x.config.CapturePath+"netlink."+now.Format(time.RFC3339Nano),
*(packetBuffer),
(*packetBuffer)[:n],
writeFilesPermissionsCst)
if err != nil {
// Diagnostic capture-to-file is a side feature; a disk-
Expand Down Expand Up @@ -146,7 +151,14 @@ func (x *XTCP) netlinkerSyscall(ctx context.Context, wg *sync.WaitGroup, nsName
}
}

*packetBuffer = (*packetBuffer)[:0]
// Restore the slice header to full capacity before returning it to
// the pool. (*packetBuffer)[:0] would Put a zero-length slice — a
// later Get from a fresh netlinker would call syscall.Recvfrom on
// it, which panics on &p[0] when len(p)==0. iouringPrefillRecvs
// (netlinker_iouring.go) already restores cap on Get as a defensive
// measure, but the syscall path is the producer of these buffers
// and must Put them in usable shape.
*packetBuffer = (*packetBuffer)[:cap(*packetBuffer)]
x.packetBufferPool.Put(packetBuffer)

x.pC.WithLabelValues("Netlinker", "complete", "count").Inc()
Expand Down
Loading