From df85899dd09ffd5773a252bbc512575e63553ef7 Mon Sep 17 00:00:00 2001 From: randomizedcoder Date: Wed, 13 May 2026 08:12:11 -0700 Subject: [PATCH 1/2] Add unix/unixgram destinations + table-driven destination tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit xtcp2 now supports writing records to a local unix-domain socket so that a daemon (kubernetes daemonset, machine-local collector, etc.) can read them off without a network hop. Two schemes: unix:/path/to/sock SOCK_STREAM, varint-length-prefixed framing unixgram:/path/to/sock SOCK_DGRAM, one Write == one datagram == one record, no framing Both follow the established destination pattern (function stored in sync.Map, init function dialled once at startup, blocking writes, log on error). For unixgram, init pre-checks os.Stat so the "fail loudly at startup" contract holds even though SOCK_DGRAM dial doesn't verify the peer. The dest proto field's max_len cap goes from 40 to 128 to accommodate the longer unixgram:/path strings; comment is rewritten to enumerate all current schemes. Generated bindings regenerated via buf. Adds the repo's first destination_test.go: one table-driven test covering null/udp/unix/unixgram round-trip and multiple-record cases, a varying-size stream-framing test, missing-socket / missing-daemon sanity tests, and benchmarks for all four. Each row uses a real local socket in t.TempDir() — no mocks. To make InitDest* paths testable without taking the process down, a hookable x.fatalf field on XTCP defaults to log.Fatalf and is overridden to t.Fatalf in tests; only the two new InitDest* functions use it, existing destinations are untouched. go test -race ./pkg/xtcp/... -run TestDest is clean. Co-Authored-By: Claude Opus 4.7 --- dart/xtcp_config/v1/xtcp_config.pb.dart | 6 +- dart/xtcp_config/v1/xtcp_config.pbjson.dart | 28 +- gen/xtcp_config/v1/xtcp_config.pb.cc | 74 +-- .../clickhouse_protolist.pb.go | 2 +- pkg/xtcp/destinations.go | 71 +++ pkg/xtcp/destinations_test.go | 596 ++++++++++++++++++ pkg/xtcp/init_destinations.go | 69 +- pkg/xtcp/xtcp.go | 23 +- pkg/xtcp_config/xtcp_config.pb.go | 15 +- pkg/xtcp_config/xtcp_config_grpc.pb.go | 10 +- pkg/xtcp_flat_record/xtcp_flat_record.pb.go | 2 +- .../xtcp_flat_record_grpc.pb.go | 8 +- proto/xtcp_config/v1/xtcp_config.proto | 8 +- python/xtcp_config/v1/xtcp_config_pb2.py | 18 +- xtcp_config/v1/xtcp_config.swagger.json | 2 +- 15 files changed, 845 insertions(+), 87 deletions(-) create mode 100644 pkg/xtcp/destinations_test.go diff --git a/dart/xtcp_config/v1/xtcp_config.pb.dart b/dart/xtcp_config/v1/xtcp_config.pb.dart index 8c30c85..7fbeba2 100644 --- a/dart/xtcp_config/v1/xtcp_config.pb.dart +++ b/dart/xtcp_config/v1/xtcp_config.pb.dart @@ -648,7 +648,11 @@ class XtcpConfig extends $pb.GeneratedMessage { @$pb.TagNumber(121) void clearProtobufListLengthDelimit() => clearField(121); - /// kafka:127.0.0.1:9092, udp:127.0.0.1:13000, or nsq:127.0.0.1:4150, or null: + /// kafka:127.0.0.1:9092, udp:127.0.0.1:13000, nsq:127.0.0.1:4150, + /// nats:nats://127.0.0.1:4222, valkey:127.0.0.1:6379, null:, + /// unix:/path/to/sock (SOCK_STREAM, length-prefixed via varint), or + /// unixgram:/path/to/sock (SOCK_DGRAM, one record per datagram). + /// max_len 128 leaves room for unixgram: (9 bytes) + Linux sun_path (108 bytes). @$pb.TagNumber(130) $core.String get dest => $_getSZ(14); @$pb.TagNumber(130) diff --git a/dart/xtcp_config/v1/xtcp_config.pbjson.dart b/dart/xtcp_config/v1/xtcp_config.pbjson.dart index 5e3aabe..765420a 100644 --- a/dart/xtcp_config/v1/xtcp_config.pbjson.dart +++ b/dart/xtcp_config/v1/xtcp_config.pbjson.dart @@ -142,20 +142,20 @@ final $typed_data.Uint8List xtcpConfigDescriptor = $convert.base64Decode( 'VzEi8KDGNhcHR1cmVfcGF0aBhkIAEoCUIMukgJyAEAcgQQARhQUgtjYXB0dXJlUGF0aBIoCgdt' 'b2R1bHVzGG4gASgEQg66SAvIAQEyBhjAhD0oAVIHbW9kdWx1cxIrCgptYXJzaGFsX3RvGHggAS' 'gJQgy6SAnIAQFyBBAEGChSCW1hcnNoYWxUbxJHChxwcm90b2J1Zl9saXN0X2xlbmd0aF9kZWxp' - 'bWl0GHkgASgIQga6SAPIAQBSGXByb3RvYnVmTGlzdExlbmd0aERlbGltaXQSIQoEZGVzdBiCAS' - 'ABKAlCDLpICcgBAXIEEAQYKFIEZGVzdBI4ChBkZXN0X3dyaXRlX2ZpbGVzGIcBIAEoDUINukgK' - 'yAEAKgUY6AcoAFIOZGVzdFdyaXRlRmlsZXMSIwoFdG9waWMYjAEgASgJQgy6SAnIAQByBBABGC' - 'hSBXRvcGljEjUKD3h0Y3BfcHJvdG9fZmlsZRiPASABKAlCDLpICcgBAHIEEAEYUFINeHRjcFBy' - 'b3RvRmlsZRI3ChBrYWZrYV9zY2hlbWFfdXJsGJEBIAEoCUIMukgJyAEAcgQQARg8Ug5rYWZrYV' - 'NjaGVtYVVybBJgChVrYWZrYV9wcm9kdWNlX3RpbWVvdXQYlgEgASgLMhkuZ29vZ2xlLnByb3Rv' - 'YnVmLkR1cmF0aW9uQhC6SA3IAQCqAQciAwjYBDIAUhNrYWZrYVByb2R1Y2VUaW1lb3V0Ei8KC2' - 'RlYnVnX2xldmVsGKABIAEoDUINukgKyAEBKgUY6AcoAFIKZGVidWdMZXZlbBIhCgVsYWJlbBiq' - 'ASABKAlCCrpIB8gBAHICGChSBWxhYmVsEh0KA3RhZxi0ASABKAlCCrpIB8gBAHICGChSA3RhZx' - 'IsCglncnBjX3BvcnQYvgEgASgNQg66SAvIAQEqBhj//wMoAVIIZ3JwY1BvcnQSYgoVZW5hYmxl' - 'ZF9kZXNlcmlhbGl6ZXJzGMgBIAEoCzIkLnh0Y3BfY29uZmlnLnYxLkVuYWJsZWREZXNlcmlhbG' - 'l6ZXJzQga6SAPIAQBSFGVuYWJsZWREZXNlcmlhbGl6ZXJzOnO6SHAabgoPWHRjcENvbmZpZy5w' - 'b2xsEjJQb2xsIHRpbWVvdXQgbXVzdCBiZSBsZXNzIHRoYW4gcG9sbCBwb2xsX2ZyZXF1ZW5jeR' - 'ondGhpcy5wb2xsX2ZyZXF1ZW5jeSA+IHRoaXMucG9sbF90aW1lb3V0'); + 'bWl0GHkgASgIQga6SAPIAQBSGXByb3RvYnVmTGlzdExlbmd0aERlbGltaXQSIgoEZGVzdBiCAS' + 'ABKAlCDbpICsgBAXIFEAQYgAFSBGRlc3QSOAoQZGVzdF93cml0ZV9maWxlcxiHASABKA1CDbpI' + 'CsgBACoFGOgHKABSDmRlc3RXcml0ZUZpbGVzEiMKBXRvcGljGIwBIAEoCUIMukgJyAEAcgQQAR' + 'goUgV0b3BpYxI1Cg94dGNwX3Byb3RvX2ZpbGUYjwEgASgJQgy6SAnIAQByBBABGFBSDXh0Y3BQ' + 'cm90b0ZpbGUSNwoQa2Fma2Ffc2NoZW1hX3VybBiRASABKAlCDLpICcgBAHIEEAEYPFIOa2Fma2' + 'FTY2hlbWFVcmwSYAoVa2Fma2FfcHJvZHVjZV90aW1lb3V0GJYBIAEoCzIZLmdvb2dsZS5wcm90' + 'b2J1Zi5EdXJhdGlvbkIQukgNyAEAqgEHIgMI2AQyAFITa2Fma2FQcm9kdWNlVGltZW91dBIvCg' + 'tkZWJ1Z19sZXZlbBigASABKA1CDbpICsgBASoFGOgHKABSCmRlYnVnTGV2ZWwSIQoFbGFiZWwY' + 'qgEgASgJQgq6SAfIAQByAhgoUgVsYWJlbBIdCgN0YWcYtAEgASgJQgq6SAfIAQByAhgoUgN0YW' + 'cSLAoJZ3JwY19wb3J0GL4BIAEoDUIOukgLyAEBKgYY//8DKAFSCGdycGNQb3J0EmIKFWVuYWJs' + 'ZWRfZGVzZXJpYWxpemVycxjIASABKAsyJC54dGNwX2NvbmZpZy52MS5FbmFibGVkRGVzZXJpYW' + 'xpemVyc0IGukgDyAEAUhRlbmFibGVkRGVzZXJpYWxpemVyczpzukhwGm4KD1h0Y3BDb25maWcu' + 'cG9sbBIyUG9sbCB0aW1lb3V0IG11c3QgYmUgbGVzcyB0aGFuIHBvbGwgcG9sbF9mcmVxdWVuY3' + 'kaJ3RoaXMucG9sbF9mcmVxdWVuY3kgPiB0aGlzLnBvbGxfdGltZW91dA=='); @$core.Deprecated('Use enabledDeserializersDescriptor instead') const EnabledDeserializers$json = { diff --git a/gen/xtcp_config/v1/xtcp_config.pb.cc b/gen/xtcp_config/v1/xtcp_config.pb.cc index f1fca9b..8e2c247 100644 --- a/gen/xtcp_config/v1/xtcp_config.pb.cc +++ b/gen/xtcp_config/v1/xtcp_config.pb.cc @@ -471,7 +471,7 @@ const char descriptor_table_protodef_xtcp_5fconfig_2fv1_2fxtcp_5fconfig_2eproto[ " than poll poll_frequency\032\'this.poll_tim" "eout < this.poll_frequency\"N\n\030SetPollFre" "quencyResponse\0222\n\006config\030\001 \001(\0132\032.xtcp_co" - "nfig.v1.XtcpConfigR\006config\"\203\014\n\nXtcpConfi" + "nfig.v1.XtcpConfigR\006config\"\204\014\n\nXtcpConfi" "g\022F\n\027nl_timeout_milliseconds\030\n \001(\004B\016\272H\0132" "\006\030\240\215\006(\000\310\001\001R\025nlTimeoutMilliseconds\022S\n\016pol" "l_frequency\030\024 \001(\0132\031.google.protobuf.Dura" @@ -492,41 +492,41 @@ const char descriptor_table_protodef_xtcp_5fconfig_2fv1_2fxtcp_5fconfig_2eproto[ "\310\001\001R\007modulus\022+\n\nmarshal_to\030x \001(\tB\014\272H\tr\004\020" "\004\030(\310\001\001R\tmarshalTo\022G\n\034protobuf_list_lengt" "h_delimit\030y \001(\010B\006\272H\003\310\001\000R\031protobufListLen" - "gthDelimit\022!\n\004dest\030\202\001 \001(\tB\014\272H\tr\004\020\004\030(\310\001\001R" - "\004dest\0228\n\020dest_write_files\030\207\001 \001(\rB\r\272H\n*\005\030" - "\350\007(\000\310\001\000R\016destWriteFiles\022#\n\005topic\030\214\001 \001(\tB" - "\014\272H\tr\004\020\001\030(\310\001\000R\005topic\0225\n\017xtcp_proto_file\030" - "\217\001 \001(\tB\014\272H\tr\004\020\001\030P\310\001\000R\rxtcpProtoFile\0227\n\020k" - "afka_schema_url\030\221\001 \001(\tB\014\272H\tr\004\020\001\030<\310\001\000R\016ka" - "fkaSchemaUrl\022`\n\025kafka_produce_timeout\030\226\001" - " \001(\0132\031.google.protobuf.DurationB\020\272H\r\252\001\007\"" - "\003\010\330\0042\000\310\001\000R\023kafkaProduceTimeout\022/\n\013debug_" - "level\030\240\001 \001(\rB\r\272H\n*\005\030\350\007(\000\310\001\001R\ndebugLevel\022" - "!\n\005label\030\252\001 \001(\tB\n\272H\007r\002\030(\310\001\000R\005label\022\035\n\003ta" - "g\030\264\001 \001(\tB\n\272H\007r\002\030(\310\001\000R\003tag\022,\n\tgrpc_port\030\276" - "\001 \001(\rB\016\272H\013*\006\030\377\377\003(\001\310\001\001R\010grpcPort\022b\n\025enabl" - "ed_deserializers\030\310\001 \001(\0132$.xtcp_config.v1" - ".EnabledDeserializersB\006\272H\003\310\001\000R\024enabledDe" - "serializers:s\272Hp\032n\n\017XtcpConfig.poll\0222Pol" - "l timeout must be less than poll poll_fr" - "equency\032\'this.poll_frequency > this.poll" - "_timeout\"\237\001\n\024EnabledDeserializers\022K\n\007ena" - "bled\030\001 \003(\01321.xtcp_config.v1.EnabledDeser" - "ializers.EnabledEntryR\007enabled\032:\n\014Enable" - "dEntry\022\020\n\003key\030\001 \001(\tR\003key\022\024\n\005value\030\002 \001(\010R" - "\005value:\0028\0012\341\002\n\rConfigService\022]\n\003Get\022\032.xt" - "cp_config.v1.GetRequest\032\033.xtcp_config.v1" - ".GetResponse\"\035\202\323\344\223\002\027\032\022/ConfigService/Get" - ":\001*\022]\n\003Set\022\032.xtcp_config.v1.SetRequest\032\033" - ".xtcp_config.v1.SetResponse\"\035\202\323\344\223\002\027\032\022/Co" - "nfigService/Set:\001*\022\221\001\n\020SetPollFrequency\022" - "\'.xtcp_config.v1.SetPollFrequencyRequest" - "\032(.xtcp_config.v1.SetPollFrequencyRespon" - "se\"*\202\323\344\223\002$\032\037/ConfigService/SetPollFreque" - "ncy:\001*B\215\001\n\022com.xtcp_config.v1B\017XtcpConfi" - "gProtoP\001Z\021./pkg/xtcp_config\242\002\003XXX\252\002\rXtcp" - "Config.V1\312\002\rXtcpConfig\\V1\342\002\031XtcpConfig\\V" - "1\\GPBMetadata\352\002\016XtcpConfig::V1b\006proto3" + "gthDelimit\022\"\n\004dest\030\202\001 \001(\tB\r\272H\nr\005\020\004\030\200\001\310\001\001" + "R\004dest\0228\n\020dest_write_files\030\207\001 \001(\rB\r\272H\n*\005" + "\030\350\007(\000\310\001\000R\016destWriteFiles\022#\n\005topic\030\214\001 \001(\t" + "B\014\272H\tr\004\020\001\030(\310\001\000R\005topic\0225\n\017xtcp_proto_file" + "\030\217\001 \001(\tB\014\272H\tr\004\020\001\030P\310\001\000R\rxtcpProtoFile\0227\n\020" + "kafka_schema_url\030\221\001 \001(\tB\014\272H\tr\004\020\001\030<\310\001\000R\016k" + "afkaSchemaUrl\022`\n\025kafka_produce_timeout\030\226" + "\001 \001(\0132\031.google.protobuf.DurationB\020\272H\r\252\001\007" + "\"\003\010\330\0042\000\310\001\000R\023kafkaProduceTimeout\022/\n\013debug" + "_level\030\240\001 \001(\rB\r\272H\n*\005\030\350\007(\000\310\001\001R\ndebugLevel" + "\022!\n\005label\030\252\001 \001(\tB\n\272H\007r\002\030(\310\001\000R\005label\022\035\n\003t" + "ag\030\264\001 \001(\tB\n\272H\007r\002\030(\310\001\000R\003tag\022,\n\tgrpc_port\030" + "\276\001 \001(\rB\016\272H\013*\006\030\377\377\003(\001\310\001\001R\010grpcPort\022b\n\025enab" + "led_deserializers\030\310\001 \001(\0132$.xtcp_config.v" + "1.EnabledDeserializersB\006\272H\003\310\001\000R\024enabledD" + "eserializers:s\272Hp\032n\n\017XtcpConfig.poll\0222Po" + "ll timeout must be less than poll poll_f" + "requency\032\'this.poll_frequency > this.pol" + "l_timeout\"\237\001\n\024EnabledDeserializers\022K\n\007en" + "abled\030\001 \003(\01321.xtcp_config.v1.EnabledDese" + "rializers.EnabledEntryR\007enabled\032:\n\014Enabl" + "edEntry\022\020\n\003key\030\001 \001(\tR\003key\022\024\n\005value\030\002 \001(\010" + "R\005value:\0028\0012\341\002\n\rConfigService\022]\n\003Get\022\032.x" + "tcp_config.v1.GetRequest\032\033.xtcp_config.v" + "1.GetResponse\"\035\202\323\344\223\002\027\032\022/ConfigService/Ge" + "t:\001*\022]\n\003Set\022\032.xtcp_config.v1.SetRequest\032" + "\033.xtcp_config.v1.SetResponse\"\035\202\323\344\223\002\027\032\022/C" + "onfigService/Set:\001*\022\221\001\n\020SetPollFrequency" + "\022\'.xtcp_config.v1.SetPollFrequencyReques" + "t\032(.xtcp_config.v1.SetPollFrequencyRespo" + "nse\"*\202\323\344\223\002$\032\037/ConfigService/SetPollFrequ" + "ency:\001*B\215\001\n\022com.xtcp_config.v1B\017XtcpConf" + "igProtoP\001Z\021./pkg/xtcp_config\242\002\003XXX\252\002\rXtc" + "pConfig.V1\312\002\rXtcpConfig\\V1\342\002\031XtcpConfig\\" + "V1\\GPBMetadata\352\002\016XtcpConfig::V1b\006proto3" }; static const ::_pbi::DescriptorTable* const descriptor_table_xtcp_5fconfig_2fv1_2fxtcp_5fconfig_2eproto_deps[3] = { @@ -538,7 +538,7 @@ static ::absl::once_flag descriptor_table_xtcp_5fconfig_2fv1_2fxtcp_5fconfig_2ep PROTOBUF_CONSTINIT const ::_pbi::DescriptorTable descriptor_table_xtcp_5fconfig_2fv1_2fxtcp_5fconfig_2eproto = { false, false, - 2958, + 2959, descriptor_table_protodef_xtcp_5fconfig_2fv1_2fxtcp_5fconfig_2eproto, "xtcp_config/v1/xtcp_config.proto", &descriptor_table_xtcp_5fconfig_2fv1_2fxtcp_5fconfig_2eproto_once, diff --git a/pkg/clickhouse_protolist/clickhouse_protolist.pb.go b/pkg/clickhouse_protolist/clickhouse_protolist.pb.go index 4d6ce59..a825a49 100644 --- a/pkg/clickhouse_protolist/clickhouse_protolist.pb.go +++ b/pkg/clickhouse_protolist/clickhouse_protolist.pb.go @@ -13,7 +13,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.6 +// protoc-gen-go v1.36.11 // protoc (unknown) // source: clickhouse_protolist/v1/clickhouse_protolist.proto diff --git a/pkg/xtcp/destinations.go b/pkg/xtcp/destinations.go index 2b763ac..5b548cd 100644 --- a/pkg/xtcp/destinations.go +++ b/pkg/xtcp/destinations.go @@ -2,12 +2,18 @@ package xtcp import ( "context" + "encoding/binary" "log" "time" "github.com/twmb/franz-go/pkg/kgo" ) +// Destination functions are invoked from a single deserializer goroutine in +// production. The implementations below assume serial access to their +// underlying net.Conn / client field. Concurrent callers are not supported +// without adding a mutex. + // destNull sends the protobuf to nowhere! func (x *XTCP) destNull(_ context.Context, xtcpRecordBinary *[]byte) (n int, err error) { @@ -237,3 +243,68 @@ func (x *XTCP) destValKey(ctx context.Context, xtcpRecordBinary *[]byte) (n int, return 1, err } + +// destUnixGram sends the protobuf record to a Unix datagram socket. +// One Write == one datagram == one record; no framing is required because +// the kernel preserves message boundaries. Records exceeding SO_SNDBUF +// (≈208 KB on Linux by default) fail with EMSGSIZE; xtcp records today +// are well below that. +// +// TODO: reconnect on persistent write failure (currently dial-once, fail- +// loudly at startup; runtime errors are logged and the next record is +// attempted). +func (x *XTCP) destUnixGram(_ context.Context, xtcpRecordBinary *[]byte) (n int, err error) { + + written, err := x.unixGramConn.Write(*xtcpRecordBinary) + if err != nil { + x.pC.WithLabelValues("destUnixGram", "Write", "error").Inc() + if x.debugLevel > 100 { + log.Printf("destUnixGram Write err:%v", err) + } + return 0, err + } + + x.pC.WithLabelValues("destUnixGram", "Writes", "count").Inc() + x.pC.WithLabelValues("destUnixGram", "WriteBytes", "count").Add(float64(written)) + + return 1, nil +} + +// destUnix sends the protobuf record to a Unix stream socket, framed with +// a varint length prefix so the daemon reader can recover record +// boundaries. Wire format per record: +// +// [varint(len(payload))] [payload bytes...] +// +// Daemon-side: read the varint via binary.ReadUvarint, then exactly that +// many payload bytes via io.ReadFull. +// +// TODO: coalesce the header and payload into a single net.Buffers write if +// profiling shows the two syscalls matter; reconnect on persistent failure. +func (x *XTCP) destUnix(_ context.Context, xtcpRecordBinary *[]byte) (n int, err error) { + + var hdr [binary.MaxVarintLen64]byte + hdrLen := binary.PutUvarint(hdr[:], uint64(len(*xtcpRecordBinary))) + + if _, err = x.unixConn.Write(hdr[:hdrLen]); err != nil { + x.pC.WithLabelValues("destUnix", "Write", "error").Inc() + if x.debugLevel > 100 { + log.Printf("destUnix header Write err:%v", err) + } + return 0, err + } + + written, err := x.unixConn.Write(*xtcpRecordBinary) + if err != nil { + x.pC.WithLabelValues("destUnix", "Write", "error").Inc() + if x.debugLevel > 100 { + log.Printf("destUnix payload Write err:%v", err) + } + return 0, err + } + + x.pC.WithLabelValues("destUnix", "Writes", "count").Inc() + x.pC.WithLabelValues("destUnix", "WriteBytes", "count").Add(float64(hdrLen + written)) + + return 1, nil +} diff --git a/pkg/xtcp/destinations_test.go b/pkg/xtcp/destinations_test.go new file mode 100644 index 0000000..0fdbd1b --- /dev/null +++ b/pkg/xtcp/destinations_test.go @@ -0,0 +1,596 @@ +package xtcp + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "io" + "net" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/randomizedcoder/xtcp2/pkg/xtcp_config" +) + +// destSetupResult is what each row's setup closure returns. +type destSetupResult struct { + dest string // value to assign to x.config.Dest, e.g. "udp:127.0.0.1:12345" + recv func() ([]byte, error) + cleanup func() +} + +// destCase describes one row of the destination-round-trip table. +type destCase struct { + name string + scheme string // "null", "udp", "unix", "unixgram" + setup func(t *testing.T, dir string) destSetupResult + expectFrame func(payload []byte) []byte // identity for null/udp/unixgram; varint-prefixed for unix +} + +// newTestXTCP builds the minimal XTCP fixture needed to drive a destination: +// fresh Prometheus registry (so counters don't collide across rows), the +// destination function maps populated by InitDests' first half, and a fatalf +// hook that flips startup failures into t.Fatalf. +func newTestXTCP(t *testing.T, dest string) *XTCP { + t.Helper() + + x := new(XTCP) + x.config = &xtcp_config.XtcpConfig{Dest: dest} + x.debugLevel = 0 + x.fatalf = func(format string, args ...any) { + t.Fatalf(format, args...) + } + + reg := prometheus.NewRegistry() + x.pC = promauto.With(reg).NewCounterVec( + prometheus.CounterOpts{Subsystem: "xtcp_test", Name: "counts", Help: "test counts"}, + []string{"function", "variable", "type"}, + ) + x.pH = promauto.With(reg).NewSummaryVec( + prometheus.SummaryOpts{ + Subsystem: "xtcp_test", Name: "histograms", Help: "test histograms", + Objectives: map[float64]float64{0.5: quantileError, 0.99: quantileError}, + MaxAge: summaryVecMaxAge, + }, + []string{"function", "variable", "type"}, + ) + + return x +} + +// setupNullDest returns a no-op setup — destNull doesn't need a listener. +func setupNullDest(_ *testing.T, _ string) destSetupResult { + return destSetupResult{ + dest: "null:", + recv: func() ([]byte, error) { return nil, nil }, + cleanup: func() {}, + } +} + +// setupUDPDest spins up a UDP listener on a free localhost port, returns a +// recv() closure that reads one datagram with a short deadline. +func setupUDPDest(t *testing.T, _ string) destSetupResult { + t.Helper() + + pc, err := net.ListenPacket("udp", "127.0.0.1:0") + if err != nil { + t.Fatalf("ListenPacket udp: %v", err) + } + addr := pc.LocalAddr().String() + + return destSetupResult{ + dest: "udp:" + addr, + recv: func() ([]byte, error) { + if err := pc.SetReadDeadline(time.Now().Add(2 * time.Second)); err != nil { + return nil, err + } + buf := make([]byte, 1<<16) + n, _, err := pc.ReadFrom(buf) + if err != nil { + return nil, err + } + return buf[:n], nil + }, + cleanup: func() { pc.Close() }, + } +} + +// setupUnixGramDest creates a SOCK_DGRAM Unix socket under dir, listens for +// datagrams. recv() reads one datagram with a deadline. +func setupUnixGramDest(t *testing.T, dir string) destSetupResult { + t.Helper() + + path := filepath.Join(dir, "ug.sock") + addr := &net.UnixAddr{Name: path, Net: "unixgram"} + conn, err := net.ListenUnixgram("unixgram", addr) + if err != nil { + t.Fatalf("ListenUnixgram %s: %v", path, err) + } + + return destSetupResult{ + dest: "unixgram:" + path, + recv: func() ([]byte, error) { + if err := conn.SetReadDeadline(time.Now().Add(2 * time.Second)); err != nil { + return nil, err + } + buf := make([]byte, 1<<16) + n, _, err := conn.ReadFromUnix(buf) + if err != nil { + return nil, err + } + return buf[:n], nil + }, + cleanup: func() { conn.Close() }, + } +} + +// setupUnixDest creates a SOCK_STREAM Unix socket listener and accepts a +// single client connection in a goroutine. recv() reads one length-prefixed +// (varint) record off that connection. +func setupUnixDest(t *testing.T, dir string) destSetupResult { + t.Helper() + + path := filepath.Join(dir, "u.sock") + ln, err := net.Listen("unix", path) + if err != nil { + t.Fatalf("Listen unix %s: %v", path, err) + } + + connCh := make(chan net.Conn, 1) + errCh := make(chan error, 1) + go func() { + c, err := ln.Accept() + if err != nil { + errCh <- err + return + } + connCh <- c + }() + + var ( + clientConn net.Conn + acceptOnce sync.Once + ) + getConn := func() (net.Conn, error) { + var firstErr error + acceptOnce.Do(func() { + select { + case c := <-connCh: + clientConn = c + case err := <-errCh: + firstErr = err + case <-time.After(2 * time.Second): + firstErr = fmt.Errorf("timeout waiting for client to dial unix socket") + } + }) + if firstErr != nil { + return nil, firstErr + } + return clientConn, nil + } + + return destSetupResult{ + dest: "unix:" + path, + recv: func() ([]byte, error) { + c, err := getConn() + if err != nil { + return nil, err + } + if err := c.SetReadDeadline(time.Now().Add(2 * time.Second)); err != nil { + return nil, err + } + br := newByteReader(c) + length, err := binary.ReadUvarint(br) + if err != nil { + return nil, fmt.Errorf("read varint: %w", err) + } + payload := make([]byte, length) + if _, err := io.ReadFull(c, payload); err != nil { + return nil, fmt.Errorf("read payload: %w", err) + } + return payload, nil + }, + cleanup: func() { + if clientConn != nil { + clientConn.Close() + } + ln.Close() + }, + } +} + +// byteReader adapts a net.Conn to io.ByteReader so binary.ReadUvarint can +// consume the length prefix one byte at a time without over-reading into the +// payload. +type byteReader struct{ r io.Reader } + +func newByteReader(r io.Reader) *byteReader { return &byteReader{r: r} } + +func (br *byteReader) ReadByte() (byte, error) { + var b [1]byte + if _, err := io.ReadFull(br.r, b[:]); err != nil { + return 0, err + } + return b[0], nil +} + +// runDestRow exercises one row: setup → init → write payload(s) → verify. +func runDestRow(t *testing.T, c destCase, payloads [][]byte) { + t.Helper() + + dir := t.TempDir() + setup := c.setup(t, dir) + defer setup.cleanup() + + x := newTestXTCP(t, setup.dest) + ctx := context.Background() + + // Register both runtime closures and init closures so we can both dial + // and write through x.Destination — this mirrors what InitDests does in + // production for the relevant schemes. + switch c.scheme { + case "null": + x.Destinations.Store("null", func(ctx context.Context, b *[]byte) (int, error) { + return x.destNull(ctx, b) + }) + f, _ := x.Destinations.Load("null") + x.Destination = f.(func(context.Context, *[]byte) (int, error)) + case "udp": + x.Destinations.Store("udp", func(ctx context.Context, b *[]byte) (int, error) { + return x.destUDP(ctx, b) + }) + x.InitDestUDP(ctx) + f, _ := x.Destinations.Load("udp") + x.Destination = f.(func(context.Context, *[]byte) (int, error)) + case "unix": + x.Destinations.Store("unix", func(ctx context.Context, b *[]byte) (int, error) { + return x.destUnix(ctx, b) + }) + x.InitDestUnix(ctx) + f, _ := x.Destinations.Load("unix") + x.Destination = f.(func(context.Context, *[]byte) (int, error)) + case "unixgram": + x.Destinations.Store("unixgram", func(ctx context.Context, b *[]byte) (int, error) { + return x.destUnixGram(ctx, b) + }) + x.InitDestUnixGram(ctx) + f, _ := x.Destinations.Load("unixgram") + x.Destination = f.(func(context.Context, *[]byte) (int, error)) + default: + t.Fatalf("unknown scheme %q", c.scheme) + } + defer x.closeDestination() + + for i, payload := range payloads { + buf := append([]byte(nil), payload...) + n, err := x.Destination(ctx, &buf) + if err != nil { + t.Fatalf("payload[%d] Destination err: %v", i, err) + } + if c.scheme == "null" { + if n != len(payload) { + t.Errorf("payload[%d] destNull n=%d want=%d", i, n, len(payload)) + } + continue + } + if n != 1 { + t.Errorf("payload[%d] Destination n=%d want=1", i, n) + } + + got, err := setup.recv() + if err != nil { + t.Fatalf("payload[%d] recv err: %v", i, err) + } + want := c.expectFrame(payload) + if !bytes.Equal(got, want) { + t.Errorf("payload[%d] bytes mismatch\n got: %x\nwant: %x", i, got, want) + } + } +} + +// TestDestinations exercises every destination we can stand up with stdlib +// only: null, udp, unix, unixgram. Kafka, nsq, nats, valkey are deferred to +// a follow-up that brings in embedded servers / testcontainers. +func TestDestinations(t *testing.T) { + identity := func(p []byte) []byte { return p } + + cases := []destCase{ + {name: "null", scheme: "null", setup: setupNullDest, expectFrame: identity}, + {name: "udp_round_trip", scheme: "udp", setup: setupUDPDest, expectFrame: identity}, + {name: "udp_multiple", scheme: "udp", setup: setupUDPDest, expectFrame: identity}, + {name: "unixgram_round_trip", scheme: "unixgram", setup: setupUnixGramDest, expectFrame: identity}, + {name: "unixgram_multiple", scheme: "unixgram", setup: setupUnixGramDest, expectFrame: identity}, + // For unix, recv() already strips the varint length prefix and returns + // the raw payload; the framing is exercised inside recv()'s + // binary.ReadUvarint + io.ReadFull. So the comparison is identity. + {name: "unix_round_trip", scheme: "unix", setup: setupUnixDest, expectFrame: identity}, + {name: "unix_multiple", scheme: "unix", setup: setupUnixDest, expectFrame: identity}, + } + + single := [][]byte{[]byte("hello-xtcp2-record")} + triple := [][]byte{ + []byte("first record"), + []byte("second record with slightly more bytes"), + []byte("third"), + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + payloads := single + switch c.name { + case "udp_multiple", "unixgram_multiple", "unix_multiple": + payloads = triple + } + runDestRow(t, c, payloads) + }) + } +} + +// TestDestUnix_StreamFraming sends records of varying sizes through the +// stream socket and confirms each is recovered intact. Exercises the +// multi-byte varint path (~50KB record produces a 3-byte length prefix). +func TestDestUnix_StreamFraming(t *testing.T) { + dir := t.TempDir() + setup := setupUnixDest(t, dir) + defer setup.cleanup() + + x := newTestXTCP(t, setup.dest) + ctx := context.Background() + x.InitDestUnix(ctx) + defer x.closeDestination() + x.Destinations.Store("unix", func(ctx context.Context, b *[]byte) (int, error) { + return x.destUnix(ctx, b) + }) + + sizes := []int{1, 256, 50 * 1024} + for _, size := range sizes { + payload := make([]byte, size) + for i := range payload { + payload[i] = byte(i & 0xff) + } + buf := append([]byte(nil), payload...) + if _, err := x.destUnix(ctx, &buf); err != nil { + t.Fatalf("size=%d destUnix err: %v", size, err) + } + got, err := setup.recv() + if err != nil { + t.Fatalf("size=%d recv err: %v", size, err) + } + if !bytes.Equal(got, payload) { + t.Errorf("size=%d payload mismatch (first 16 bytes got=%x want=%x)", size, got[:min(16, len(got))], payload[:min(16, len(payload))]) + } + } +} + +// TestDestUnixGram_MissingSocket confirms InitDestUnixGram fails when the +// socket file doesn't exist — that's our "fail loudly at startup" contract. +func TestDestUnixGram_MissingSocket(t *testing.T) { + dir := t.TempDir() + missing := filepath.Join(dir, "does-not-exist.sock") + + x := newTestXTCP(t, "unixgram:"+missing) + // Override fatalf to capture instead of failing the test. + var captured string + x.fatalf = func(format string, args ...any) { + captured = fmt.Sprintf(format, args...) + } + x.InitDestUnixGram(context.Background()) + if captured == "" { + t.Fatalf("expected fatalf to be called for missing socket %q", missing) + } +} + +// TestDestUnix_MissingDaemon confirms InitDestUnix fails when nothing is +// listening on the path. +func TestDestUnix_MissingDaemon(t *testing.T) { + dir := t.TempDir() + missing := filepath.Join(dir, "no-daemon.sock") + + x := newTestXTCP(t, "unix:"+missing) + var captured string + x.fatalf = func(format string, args ...any) { + captured = fmt.Sprintf(format, args...) + } + x.InitDestUnix(context.Background()) + if captured == "" { + t.Fatalf("expected fatalf to be called for missing daemon at %q", missing) + } +} + +// Benchmarks. Each allocates a 256-byte payload (representative of an xtcp +// record) and runs b.N writes through the destination; a goroutine on the +// receiver side drains so the write side isn't blocked by kernel buffer +// saturation. b.SetBytes() reports per-record throughput. + +func benchDest(b *testing.B, setup func(t testing.TB, dir string) destSetupResult, fn func(*XTCP, context.Context, *[]byte) (int, error)) { + b.Helper() + + dir := b.TempDir() + tb := testingTB{TB: b} + s := setup(tb, dir) + defer s.cleanup() + + x := new(XTCP) + x.config = &xtcp_config.XtcpConfig{Dest: s.dest} + x.fatalf = func(format string, args ...any) { + b.Fatalf(format, args...) + } + reg := prometheus.NewRegistry() + x.pC = promauto.With(reg).NewCounterVec( + prometheus.CounterOpts{Subsystem: "xtcp_bench", Name: "counts", Help: "bench counts"}, + []string{"function", "variable", "type"}, + ) + x.pH = promauto.With(reg).NewSummaryVec( + prometheus.SummaryOpts{Subsystem: "xtcp_bench", Name: "histograms", Help: "bench histograms"}, + []string{"function", "variable", "type"}, + ) + + ctx := context.Background() + switch { + case bytes.HasPrefix([]byte(s.dest), []byte("udp:")): + x.InitDestUDP(ctx) + case bytes.HasPrefix([]byte(s.dest), []byte("unix:")): + x.InitDestUnix(ctx) + case bytes.HasPrefix([]byte(s.dest), []byte("unixgram:")): + x.InitDestUnixGram(ctx) + } + defer x.closeDestination() + + // Drain receiver in the background so the writer doesn't block on kernel + // buffer saturation. For "null" there's no recv. + stop := make(chan struct{}) + if s.recv != nil { + go func() { + for { + select { + case <-stop: + return + default: + } + _, _ = s.recv() + } + }() + } + defer close(stop) + + payload := make([]byte, 256) + for i := range payload { + payload[i] = byte(i) + } + + b.SetBytes(int64(len(payload))) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + buf := append([]byte(nil), payload...) + if _, err := fn(x, ctx, &buf); err != nil { + b.Fatalf("write err: %v", err) + } + } +} + +// testingTB lets the setup helpers (which take *testing.T) be reused from +// benchmarks. testing.TB is the common interface. +type testingTB struct{ testing.TB } + +func (t testingTB) Helper() {} +func (t testingTB) TempDir() string { return t.TB.TempDir() } +func (t testingTB) Fatalf(format string, args ...any) { t.TB.Fatalf(format, args...) } + +// Adapt the *testing.T setup signatures to testing.TB. +func setupNullDestTB(t testing.TB, _ string) destSetupResult { + return destSetupResult{dest: "null:", recv: nil, cleanup: func() {}} +} +func setupUDPDestTB(t testing.TB, dir string) destSetupResult { + pc, err := net.ListenPacket("udp", "127.0.0.1:0") + if err != nil { + t.Fatalf("ListenPacket udp: %v", err) + } + addr := pc.LocalAddr().String() + return destSetupResult{ + dest: "udp:" + addr, + recv: func() ([]byte, error) { + pc.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) + buf := make([]byte, 1<<16) + n, _, err := pc.ReadFrom(buf) + if err != nil { + return nil, err + } + return buf[:n], nil + }, + cleanup: func() { pc.Close() }, + } +} +func setupUnixGramDestTB(t testing.TB, dir string) destSetupResult { + path := filepath.Join(dir, "ug.sock") + addr := &net.UnixAddr{Name: path, Net: "unixgram"} + conn, err := net.ListenUnixgram("unixgram", addr) + if err != nil { + t.Fatalf("ListenUnixgram %s: %v", path, err) + } + return destSetupResult{ + dest: "unixgram:" + path, + recv: func() ([]byte, error) { + conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) + buf := make([]byte, 1<<16) + n, _, err := conn.ReadFromUnix(buf) + if err != nil { + return nil, err + } + return buf[:n], nil + }, + cleanup: func() { conn.Close() }, + } +} +func setupUnixDestTB(t testing.TB, dir string) destSetupResult { + path := filepath.Join(dir, "u.sock") + ln, err := net.Listen("unix", path) + if err != nil { + t.Fatalf("Listen unix %s: %v", path, err) + } + type connOrErr struct { + c net.Conn + err error + } + ch := make(chan connOrErr, 1) + go func() { + c, err := ln.Accept() + ch <- connOrErr{c, err} + }() + var ( + conn net.Conn + acceptOnce sync.Once + ) + getConn := func() net.Conn { + acceptOnce.Do(func() { + ce := <-ch + if ce.err != nil { + t.Fatalf("Accept: %v", ce.err) + } + conn = ce.c + }) + return conn + } + return destSetupResult{ + dest: "unix:" + path, + recv: func() ([]byte, error) { + c := getConn() + c.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) + br := newByteReader(c) + length, err := binary.ReadUvarint(br) + if err != nil { + return nil, err + } + payload := make([]byte, length) + if _, err := io.ReadFull(c, payload); err != nil { + return nil, err + } + return payload, nil + }, + cleanup: func() { + if conn != nil { + conn.Close() + } + ln.Close() + }, + } +} + +func BenchmarkDestNull(b *testing.B) { + benchDest(b, setupNullDestTB, (*XTCP).destNull) +} +func BenchmarkDestUDP(b *testing.B) { + benchDest(b, setupUDPDestTB, (*XTCP).destUDP) +} +func BenchmarkDestUnixGram(b *testing.B) { + benchDest(b, setupUnixGramDestTB, (*XTCP).destUnixGram) +} +func BenchmarkDestUnix(b *testing.B) { + benchDest(b, setupUnixDestTB, (*XTCP).destUnix) +} diff --git a/pkg/xtcp/init_destinations.go b/pkg/xtcp/init_destinations.go index b7fe820..73519f5 100644 --- a/pkg/xtcp/init_destinations.go +++ b/pkg/xtcp/init_destinations.go @@ -38,12 +38,14 @@ const ( var ( validDestinationsMap = map[string]bool{ - "null": true, - "kafka": true, - "nsq": true, - "udp": true, - "nats": true, - "valkey": true, + "null": true, + "kafka": true, + "nsq": true, + "udp": true, + "nats": true, + "valkey": true, + "unix": true, + "unixgram": true, } ) @@ -87,6 +89,12 @@ func (x *XTCP) InitDests(ctx context.Context, wg *sync.WaitGroup) { x.Destinations.Store("valkey", func(ctx context.Context, xtcpRecordBinary *[]byte) (n int, err error) { return x.destValKey(ctx, xtcpRecordBinary) }) + x.Destinations.Store("unix", func(ctx context.Context, xtcpRecordBinary *[]byte) (n int, err error) { + return x.destUnix(ctx, xtcpRecordBinary) + }) + x.Destinations.Store("unixgram", func(ctx context.Context, xtcpRecordBinary *[]byte) (n int, err error) { + return x.destUnixGram(ctx, xtcpRecordBinary) + }) f, ok := x.Destinations.Load(dest) if !ok { @@ -110,6 +118,12 @@ func (x *XTCP) InitDests(ctx context.Context, wg *sync.WaitGroup) { x.InitDestinations.Store("valkey", func(ctx context.Context) { x.InitDestValKey(ctx) }) + x.InitDestinations.Store("unix", func(ctx context.Context) { + x.InitDestUnix(ctx) + }) + x.InitDestinations.Store("unixgram", func(ctx context.Context) { + x.InitDestUnixGram(ctx) + }) if f, ok := x.InitDestinations.Load(dest); ok { f.(func(ctx context.Context))(ctx) @@ -489,3 +503,46 @@ func (x *XTCP) pingKafka(ctx context.Context) (err error) { } return err } + +// InitDestUnix dials a Unix stream socket where the daemon is listening. +// Fails loudly (x.fatalf) when nothing is listening on the path so the +// process doesn't silently drop records on startup. +func (x *XTCP) InitDestUnix(ctx context.Context) { + + path := strings.TrimPrefix(x.config.Dest, "unix:") + + if x.debugLevel > 10 { + log.Printf("InitDestUnix config.Dest:%s path:%s", x.config.Dest, path) + } + + conn, err := net.Dial("unix", path) + if err != nil { + x.fatalf("InitDestUnix net.Dial(unix, %q) err:%v", path, err) + return + } + x.unixConn = conn +} + +// InitDestUnixGram dials a Unix datagram socket. Because dialing unixgram +// does not verify the peer exists, we pre-check the socket file via +// os.Stat so that the "fail loudly at startup" contract is preserved. +func (x *XTCP) InitDestUnixGram(ctx context.Context) { + + path := strings.TrimPrefix(x.config.Dest, "unixgram:") + + if x.debugLevel > 10 { + log.Printf("InitDestUnixGram config.Dest:%s path:%s", x.config.Dest, path) + } + + if _, err := os.Stat(path); err != nil { + x.fatalf("InitDestUnixGram socket %q does not exist: %v", path, err) + return + } + + conn, err := net.Dial("unixgram", path) + if err != nil { + x.fatalf("InitDestUnixGram net.Dial(unixgram, %q) err:%v", path, err) + return + } + x.unixGramConn = conn +} diff --git a/pkg/xtcp/xtcp.go b/pkg/xtcp/xtcp.go index ea24b4a..e034cc3 100644 --- a/pkg/xtcp/xtcp.go +++ b/pkg/xtcp/xtcp.go @@ -4,6 +4,7 @@ import ( "context" "log" "net" + "strings" "sync" "sync/atomic" "time" @@ -97,9 +98,18 @@ type XTCP struct { schemaID int nsqProducer *nsq.Producer udpConn net.Conn + unixConn net.Conn + unixGramConn net.Conn natsClient *nats.Conn valKeyClient *redis.Client + // fatalf is the function used by InitDest* helpers to abort on startup + // errors. Defaults to log.Fatalf; tests override it with t.Fatalf so they + // can drive the init paths without taking down the process. Only the new + // InitDestUnix/InitDestUnixGram use this hook today; existing destinations + // still call log.Fatalf directly. + fatalf func(format string, args ...any) + flatRecordService *xtcpFlatRecordService configService *xtcpConfigService @@ -134,6 +144,7 @@ func NewXTCP(ctx context.Context, cancel context.CancelFunc, config *xtcp_config x.config = config x.debugLevel = x.config.DebugLevel + x.fatalf = log.Fatalf x.Init(ctx) @@ -146,6 +157,7 @@ func NewNsTestingXTCP(ctx context.Context, cancel context.CancelFunc, debugLevel x.ctx = ctx x.cancel = cancel + x.fatalf = log.Fatalf x.config = &xtcp_config.XtcpConfig{ NlTimeoutMilliseconds: 5000, @@ -244,12 +256,21 @@ func (x *XTCP) checkDoneNonBlocking(ctx context.Context) (netlinkerDone bool) { } func (x *XTCP) closeDestination() { - switch x.config.Dest { + scheme, _, _ := strings.Cut(x.config.Dest, ":") + switch scheme { case "kafka": x.kClient.Close() case "nsq": x.nsqProducer.Stop() case "udp": x.udpConn.Close() + case "unix": + if x.unixConn != nil { + x.unixConn.Close() + } + case "unixgram": + if x.unixGramConn != nil { + x.unixGramConn.Close() + } } } diff --git a/pkg/xtcp_config/xtcp_config.pb.go b/pkg/xtcp_config/xtcp_config.pb.go index 5c52c68..7c49592 100644 --- a/pkg/xtcp_config/xtcp_config.pb.go +++ b/pkg/xtcp_config/xtcp_config.pb.go @@ -10,7 +10,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.6 +// protoc-gen-go v1.36.11 // protoc (unknown) // source: xtcp_config/v1/xtcp_config.proto @@ -348,7 +348,11 @@ type XtcpConfig struct { MarshalTo string `protobuf:"bytes,120,opt,name=marshal_to,json=marshalTo,proto3" json:"marshal_to,omitempty"` // protobufListMarshal can optionally not length delimit ProtobufListLengthDelimit bool `protobuf:"varint,121,opt,name=protobuf_list_length_delimit,json=protobufListLengthDelimit,proto3" json:"protobuf_list_length_delimit,omitempty"` - // kafka:127.0.0.1:9092, udp:127.0.0.1:13000, or nsq:127.0.0.1:4150, or null: + // kafka:127.0.0.1:9092, udp:127.0.0.1:13000, nsq:127.0.0.1:4150, + // nats:nats://127.0.0.1:4222, valkey:127.0.0.1:6379, null:, + // unix:/path/to/sock (SOCK_STREAM, length-prefixed via varint), or + // unixgram:/path/to/sock (SOCK_DGRAM, one record per datagram). + // max_len 128 leaves room for unixgram: (9 bytes) + Linux sun_path (108 bytes). Dest string `protobuf:"bytes,130,opt,name=dest,proto3" json:"dest,omitempty"` // Write marhselled data to writeFiles number of files ( to allow debugging of the serialization ) // xtcp will capture this many examples of the marshalled data @@ -645,7 +649,7 @@ const file_xtcp_config_v1_xtcp_config_proto_rawDesc = "" + "\fpoll_timeout\x18\x1e \x01(\v2\x19.google.protobuf.DurationB\x11\xbaH\x0e\xc8\x01\x01\xaa\x01\b\"\x04\b\x80\xf5$2\x00R\vpollTimeout:s\xbaHp\x1an\n" + "\x0fXtcpConfig.poll\x122Poll timeout must be less than poll poll_frequency\x1a'this.poll_timeout < this.poll_frequency\"N\n" + "\x18SetPollFrequencyResponse\x122\n" + - "\x06config\x18\x01 \x01(\v2\x1a.xtcp_config.v1.XtcpConfigR\x06config\"\x83\f\n" + + "\x06config\x18\x01 \x01(\v2\x1a.xtcp_config.v1.XtcpConfigR\x06config\"\x84\f\n" + "\n" + "XtcpConfig\x12F\n" + "\x17nl_timeout_milliseconds\x18\n" + @@ -670,8 +674,9 @@ const file_xtcp_config_v1_xtcp_config_proto_rawDesc = "" + "\amodulus\x18n \x01(\x04B\x0e\xbaH\v\xc8\x01\x012\x06\x18\xc0\x84=(\x01R\amodulus\x12+\n" + "\n" + "marshal_to\x18x \x01(\tB\f\xbaH\t\xc8\x01\x01r\x04\x10\x04\x18(R\tmarshalTo\x12G\n" + - "\x1cprotobuf_list_length_delimit\x18y \x01(\bB\x06\xbaH\x03\xc8\x01\x00R\x19protobufListLengthDelimit\x12!\n" + - "\x04dest\x18\x82\x01 \x01(\tB\f\xbaH\t\xc8\x01\x01r\x04\x10\x04\x18(R\x04dest\x128\n" + + "\x1cprotobuf_list_length_delimit\x18y \x01(\bB\x06\xbaH\x03\xc8\x01\x00R\x19protobufListLengthDelimit\x12\"\n" + + "\x04dest\x18\x82\x01 \x01(\tB\r\xbaH\n" + + "\xc8\x01\x01r\x05\x10\x04\x18\x80\x01R\x04dest\x128\n" + "\x10dest_write_files\x18\x87\x01 \x01(\rB\r\xbaH\n" + "\xc8\x01\x00*\x05\x18\xe8\a(\x00R\x0edestWriteFiles\x12#\n" + "\x05topic\x18\x8c\x01 \x01(\tB\f\xbaH\t\xc8\x01\x00r\x04\x10\x01\x18(R\x05topic\x125\n" + diff --git a/pkg/xtcp_config/xtcp_config_grpc.pb.go b/pkg/xtcp_config/xtcp_config_grpc.pb.go index d508526..1056c5b 100644 --- a/pkg/xtcp_config/xtcp_config_grpc.pb.go +++ b/pkg/xtcp_config/xtcp_config_grpc.pb.go @@ -10,7 +10,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.5.1 +// - protoc-gen-go-grpc v1.6.2 // - protoc (unknown) // source: xtcp_config/v1/xtcp_config.proto @@ -99,13 +99,13 @@ type ConfigServiceServer interface { type UnimplementedConfigServiceServer struct{} func (UnimplementedConfigServiceServer) Get(context.Context, *GetRequest) (*GetResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Get not implemented") + return nil, status.Error(codes.Unimplemented, "method Get not implemented") } func (UnimplementedConfigServiceServer) Set(context.Context, *SetRequest) (*SetResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Set not implemented") + return nil, status.Error(codes.Unimplemented, "method Set not implemented") } func (UnimplementedConfigServiceServer) SetPollFrequency(context.Context, *SetPollFrequencyRequest) (*SetPollFrequencyResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method SetPollFrequency not implemented") + return nil, status.Error(codes.Unimplemented, "method SetPollFrequency not implemented") } func (UnimplementedConfigServiceServer) mustEmbedUnimplementedConfigServiceServer() {} func (UnimplementedConfigServiceServer) testEmbeddedByValue() {} @@ -118,7 +118,7 @@ type UnsafeConfigServiceServer interface { } func RegisterConfigServiceServer(s grpc.ServiceRegistrar, srv ConfigServiceServer) { - // If the following call pancis, it indicates UnimplementedConfigServiceServer was + // If the following call panics, it indicates UnimplementedConfigServiceServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. diff --git a/pkg/xtcp_flat_record/xtcp_flat_record.pb.go b/pkg/xtcp_flat_record/xtcp_flat_record.pb.go index 8f6174f..22d5264 100644 --- a/pkg/xtcp_flat_record/xtcp_flat_record.pb.go +++ b/pkg/xtcp_flat_record/xtcp_flat_record.pb.go @@ -19,7 +19,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.6 +// protoc-gen-go v1.36.11 // protoc (unknown) // source: xtcp_flat_record/v1/xtcp_flat_record.proto diff --git a/pkg/xtcp_flat_record/xtcp_flat_record_grpc.pb.go b/pkg/xtcp_flat_record/xtcp_flat_record_grpc.pb.go index 81e2aa4..6be729d 100644 --- a/pkg/xtcp_flat_record/xtcp_flat_record_grpc.pb.go +++ b/pkg/xtcp_flat_record/xtcp_flat_record_grpc.pb.go @@ -19,7 +19,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.5.1 +// - protoc-gen-go-grpc v1.6.2 // - protoc (unknown) // source: xtcp_flat_record/v1/xtcp_flat_record.proto @@ -111,10 +111,10 @@ type XTCPFlatRecordServiceServer interface { type UnimplementedXTCPFlatRecordServiceServer struct{} func (UnimplementedXTCPFlatRecordServiceServer) FlatRecords(*FlatRecordsRequest, grpc.ServerStreamingServer[FlatRecordsResponse]) error { - return status.Errorf(codes.Unimplemented, "method FlatRecords not implemented") + return status.Error(codes.Unimplemented, "method FlatRecords not implemented") } func (UnimplementedXTCPFlatRecordServiceServer) PollFlatRecords(grpc.BidiStreamingServer[PollFlatRecordsRequest, PollFlatRecordsResponse]) error { - return status.Errorf(codes.Unimplemented, "method PollFlatRecords not implemented") + return status.Error(codes.Unimplemented, "method PollFlatRecords not implemented") } func (UnimplementedXTCPFlatRecordServiceServer) mustEmbedUnimplementedXTCPFlatRecordServiceServer() {} func (UnimplementedXTCPFlatRecordServiceServer) testEmbeddedByValue() {} @@ -127,7 +127,7 @@ type UnsafeXTCPFlatRecordServiceServer interface { } func RegisterXTCPFlatRecordServiceServer(s grpc.ServiceRegistrar, srv XTCPFlatRecordServiceServer) { - // If the following call pancis, it indicates UnimplementedXTCPFlatRecordServiceServer was + // If the following call panics, it indicates UnimplementedXTCPFlatRecordServiceServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. diff --git a/proto/xtcp_config/v1/xtcp_config.proto b/proto/xtcp_config/v1/xtcp_config.proto index efca22b..11a1c79 100644 --- a/proto/xtcp_config/v1/xtcp_config.proto +++ b/proto/xtcp_config/v1/xtcp_config.proto @@ -254,12 +254,16 @@ message XtcpConfig { (buf.validate.field).required = false ]; - // kafka:127.0.0.1:9092, udp:127.0.0.1:13000, or nsq:127.0.0.1:4150, or null: + // kafka:127.0.0.1:9092, udp:127.0.0.1:13000, nsq:127.0.0.1:4150, + // nats:nats://127.0.0.1:4222, valkey:127.0.0.1:6379, null:, + // unix:/path/to/sock (SOCK_STREAM, length-prefixed via varint), or + // unixgram:/path/to/sock (SOCK_DGRAM, one record per datagram). + // max_len 128 leaves room for unixgram: (9 bytes) + Linux sun_path (108 bytes). string dest = 130 [ (buf.validate.field).required = true, (buf.validate.field).string = { min_len: 4, - max_len: 40, + max_len: 128, }]; // Write marhselled data to writeFiles number of files ( to allow debugging of the serialization ) diff --git a/python/xtcp_config/v1/xtcp_config_pb2.py b/python/xtcp_config/v1/xtcp_config_pb2.py index 2b628da..8f1e4d4 100644 --- a/python/xtcp_config/v1/xtcp_config_pb2.py +++ b/python/xtcp_config/v1/xtcp_config_pb2.py @@ -27,7 +27,7 @@ from buf.validate import validate_pb2 as buf_dot_validate_dot_validate__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n xtcp_config/v1/xtcp_config.proto\x12\x0extcp_config.v1\x1a\x1egoogle/protobuf/duration.proto\x1a\x1cgoogle/api/annotations.proto\x1a\x1b\x62uf/validate/validate.proto\"\x0c\n\nGetRequest\"A\n\x0bGetResponse\x12\x32\n\x06\x63onfig\x18\x01 \x01(\x0b\x32\x1a.xtcp_config.v1.XtcpConfigR\x06\x63onfig\"@\n\nSetRequest\x12\x32\n\x06\x63onfig\x18\x01 \x01(\x0b\x32\x1a.xtcp_config.v1.XtcpConfigR\x06\x63onfig\"A\n\x0bSetResponse\x12\x32\n\x06\x63onfig\x18\x01 \x01(\x0b\x32\x1a.xtcp_config.v1.XtcpConfigR\x06\x63onfig\"\xb4\x02\n\x17SetPollFrequencyRequest\x12S\n\x0epoll_frequency\x18\x14 \x01(\x0b\x32\x19.google.protobuf.DurationB\x11\xbaH\x0e\xaa\x01\x08\"\x04\x08\x80\xf5$2\x00\xc8\x01\x01R\rpollFrequency\x12O\n\x0cpoll_timeout\x18\x1e \x01(\x0b\x32\x19.google.protobuf.DurationB\x11\xbaH\x0e\xaa\x01\x08\"\x04\x08\x80\xf5$2\x00\xc8\x01\x01R\x0bpollTimeout:s\xbaHp\x1an\n\x0fXtcpConfig.poll\x12\x32Poll timeout must be less than poll poll_frequency\x1a\'this.poll_timeout < this.poll_frequency\"N\n\x18SetPollFrequencyResponse\x12\x32\n\x06\x63onfig\x18\x01 \x01(\x0b\x32\x1a.xtcp_config.v1.XtcpConfigR\x06\x63onfig\"\x83\x0c\n\nXtcpConfig\x12\x46\n\x17nl_timeout_milliseconds\x18\n \x01(\x04\x42\x0e\xbaH\x0b\x32\x06\x18\xa0\x8d\x06(\x00\xc8\x01\x01R\x15nlTimeoutMilliseconds\x12S\n\x0epoll_frequency\x18\x14 \x01(\x0b\x32\x19.google.protobuf.DurationB\x11\xbaH\x0e\xaa\x01\x08\"\x04\x08\x80\xf5$*\x00\xc8\x01\x01R\rpollFrequency\x12O\n\x0cpoll_timeout\x18\x1e \x01(\x0b\x32\x19.google.protobuf.DurationB\x11\xbaH\x0e\xaa\x01\x08\"\x04\x08\x80\xf5$*\x00\xc8\x01\x01R\x0bpollTimeout\x12+\n\tmax_loops\x18( \x01(\x04\x42\x0e\xbaH\x0b\x32\x06\x18\xa0\x8d\x06(\x00\xc8\x01\x00R\x08maxLoops\x12,\n\nnetlinkers\x18\x32 \x01(\rB\x0c\xbaH\t*\x04\x18\x64(\x01\xc8\x01\x01R\nnetlinkers\x12H\n\x19netlinkers_done_chan_size\x18\x33 \x01(\rB\r\xbaH\n*\x05\x18\xe8\x07(\x01\xc8\x01\x01R\x16netlinkersDoneChanSize\x12*\n\tnlmsg_seq\x18< \x01(\rB\r\xbaH\n*\x05\x18\x90N(\x00\xc8\x01\x01R\x08nlmsgSeq\x12/\n\x0bpacket_size\x18\x46 \x01(\x04\x42\x0e\xbaH\x0b\x32\x06\x18\xc0\x84=(\x00\xc8\x01\x00R\npacketSize\x12\x36\n\x10packet_size_mply\x18P \x01(\rB\x0c\xbaH\t*\x04\x18\x64(\x00\xc8\x01\x00R\x0epacketSizeMply\x12.\n\x0bwrite_files\x18Z \x01(\rB\r\xbaH\n*\x05\x18\xe8\x07(\x00\xc8\x01\x00R\nwriteFiles\x12/\n\x0c\x63\x61pture_path\x18\x64 \x01(\tB\x0c\xbaH\tr\x04\x10\x01\x18P\xc8\x01\x00R\x0b\x63\x61pturePath\x12(\n\x07modulus\x18n \x01(\x04\x42\x0e\xbaH\x0b\x32\x06\x18\xc0\x84=(\x01\xc8\x01\x01R\x07modulus\x12+\n\nmarshal_to\x18x \x01(\tB\x0c\xbaH\tr\x04\x10\x04\x18(\xc8\x01\x01R\tmarshalTo\x12G\n\x1cprotobuf_list_length_delimit\x18y \x01(\x08\x42\x06\xbaH\x03\xc8\x01\x00R\x19protobufListLengthDelimit\x12!\n\x04\x64\x65st\x18\x82\x01 \x01(\tB\x0c\xbaH\tr\x04\x10\x04\x18(\xc8\x01\x01R\x04\x64\x65st\x12\x38\n\x10\x64\x65st_write_files\x18\x87\x01 \x01(\rB\r\xbaH\n*\x05\x18\xe8\x07(\x00\xc8\x01\x00R\x0e\x64\x65stWriteFiles\x12#\n\x05topic\x18\x8c\x01 \x01(\tB\x0c\xbaH\tr\x04\x10\x01\x18(\xc8\x01\x00R\x05topic\x12\x35\n\x0fxtcp_proto_file\x18\x8f\x01 \x01(\tB\x0c\xbaH\tr\x04\x10\x01\x18P\xc8\x01\x00R\rxtcpProtoFile\x12\x37\n\x10kafka_schema_url\x18\x91\x01 \x01(\tB\x0c\xbaH\tr\x04\x10\x01\x18<\xc8\x01\x00R\x0ekafkaSchemaUrl\x12`\n\x15kafka_produce_timeout\x18\x96\x01 \x01(\x0b\x32\x19.google.protobuf.DurationB\x10\xbaH\r\xaa\x01\x07\"\x03\x08\xd8\x04\x32\x00\xc8\x01\x00R\x13kafkaProduceTimeout\x12/\n\x0b\x64\x65\x62ug_level\x18\xa0\x01 \x01(\rB\r\xbaH\n*\x05\x18\xe8\x07(\x00\xc8\x01\x01R\ndebugLevel\x12!\n\x05label\x18\xaa\x01 \x01(\tB\n\xbaH\x07r\x02\x18(\xc8\x01\x00R\x05label\x12\x1d\n\x03tag\x18\xb4\x01 \x01(\tB\n\xbaH\x07r\x02\x18(\xc8\x01\x00R\x03tag\x12,\n\tgrpc_port\x18\xbe\x01 \x01(\rB\x0e\xbaH\x0b*\x06\x18\xff\xff\x03(\x01\xc8\x01\x01R\x08grpcPort\x12\x62\n\x15\x65nabled_deserializers\x18\xc8\x01 \x01(\x0b\x32$.xtcp_config.v1.EnabledDeserializersB\x06\xbaH\x03\xc8\x01\x00R\x14\x65nabledDeserializers:s\xbaHp\x1an\n\x0fXtcpConfig.poll\x12\x32Poll timeout must be less than poll poll_frequency\x1a\'this.poll_frequency > this.poll_timeout\"\x9f\x01\n\x14\x45nabledDeserializers\x12K\n\x07\x65nabled\x18\x01 \x03(\x0b\x32\x31.xtcp_config.v1.EnabledDeserializers.EnabledEntryR\x07\x65nabled\x1a:\n\x0c\x45nabledEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\x08R\x05value:\x02\x38\x01\x32\xe1\x02\n\rConfigService\x12]\n\x03Get\x12\x1a.xtcp_config.v1.GetRequest\x1a\x1b.xtcp_config.v1.GetResponse\"\x1d\x82\xd3\xe4\x93\x02\x17\x1a\x12/ConfigService/Get:\x01*\x12]\n\x03Set\x12\x1a.xtcp_config.v1.SetRequest\x1a\x1b.xtcp_config.v1.SetResponse\"\x1d\x82\xd3\xe4\x93\x02\x17\x1a\x12/ConfigService/Set:\x01*\x12\x91\x01\n\x10SetPollFrequency\x12\'.xtcp_config.v1.SetPollFrequencyRequest\x1a(.xtcp_config.v1.SetPollFrequencyResponse\"*\x82\xd3\xe4\x93\x02$\x1a\x1f/ConfigService/SetPollFrequency:\x01*B\x8d\x01\n\x12\x63om.xtcp_config.v1B\x0fXtcpConfigProtoP\x01Z\x11./pkg/xtcp_config\xa2\x02\x03XXX\xaa\x02\rXtcpConfig.V1\xca\x02\rXtcpConfig\\V1\xe2\x02\x19XtcpConfig\\V1\\GPBMetadata\xea\x02\x0eXtcpConfig::V1b\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n xtcp_config/v1/xtcp_config.proto\x12\x0extcp_config.v1\x1a\x1egoogle/protobuf/duration.proto\x1a\x1cgoogle/api/annotations.proto\x1a\x1b\x62uf/validate/validate.proto\"\x0c\n\nGetRequest\"A\n\x0bGetResponse\x12\x32\n\x06\x63onfig\x18\x01 \x01(\x0b\x32\x1a.xtcp_config.v1.XtcpConfigR\x06\x63onfig\"@\n\nSetRequest\x12\x32\n\x06\x63onfig\x18\x01 \x01(\x0b\x32\x1a.xtcp_config.v1.XtcpConfigR\x06\x63onfig\"A\n\x0bSetResponse\x12\x32\n\x06\x63onfig\x18\x01 \x01(\x0b\x32\x1a.xtcp_config.v1.XtcpConfigR\x06\x63onfig\"\xb4\x02\n\x17SetPollFrequencyRequest\x12S\n\x0epoll_frequency\x18\x14 \x01(\x0b\x32\x19.google.protobuf.DurationB\x11\xbaH\x0e\xaa\x01\x08\"\x04\x08\x80\xf5$2\x00\xc8\x01\x01R\rpollFrequency\x12O\n\x0cpoll_timeout\x18\x1e \x01(\x0b\x32\x19.google.protobuf.DurationB\x11\xbaH\x0e\xaa\x01\x08\"\x04\x08\x80\xf5$2\x00\xc8\x01\x01R\x0bpollTimeout:s\xbaHp\x1an\n\x0fXtcpConfig.poll\x12\x32Poll timeout must be less than poll poll_frequency\x1a\'this.poll_timeout < this.poll_frequency\"N\n\x18SetPollFrequencyResponse\x12\x32\n\x06\x63onfig\x18\x01 \x01(\x0b\x32\x1a.xtcp_config.v1.XtcpConfigR\x06\x63onfig\"\x84\x0c\n\nXtcpConfig\x12\x46\n\x17nl_timeout_milliseconds\x18\n \x01(\x04\x42\x0e\xbaH\x0b\x32\x06\x18\xa0\x8d\x06(\x00\xc8\x01\x01R\x15nlTimeoutMilliseconds\x12S\n\x0epoll_frequency\x18\x14 \x01(\x0b\x32\x19.google.protobuf.DurationB\x11\xbaH\x0e\xaa\x01\x08\"\x04\x08\x80\xf5$*\x00\xc8\x01\x01R\rpollFrequency\x12O\n\x0cpoll_timeout\x18\x1e \x01(\x0b\x32\x19.google.protobuf.DurationB\x11\xbaH\x0e\xaa\x01\x08\"\x04\x08\x80\xf5$*\x00\xc8\x01\x01R\x0bpollTimeout\x12+\n\tmax_loops\x18( \x01(\x04\x42\x0e\xbaH\x0b\x32\x06\x18\xa0\x8d\x06(\x00\xc8\x01\x00R\x08maxLoops\x12,\n\nnetlinkers\x18\x32 \x01(\rB\x0c\xbaH\t*\x04\x18\x64(\x01\xc8\x01\x01R\nnetlinkers\x12H\n\x19netlinkers_done_chan_size\x18\x33 \x01(\rB\r\xbaH\n*\x05\x18\xe8\x07(\x01\xc8\x01\x01R\x16netlinkersDoneChanSize\x12*\n\tnlmsg_seq\x18< \x01(\rB\r\xbaH\n*\x05\x18\x90N(\x00\xc8\x01\x01R\x08nlmsgSeq\x12/\n\x0bpacket_size\x18\x46 \x01(\x04\x42\x0e\xbaH\x0b\x32\x06\x18\xc0\x84=(\x00\xc8\x01\x00R\npacketSize\x12\x36\n\x10packet_size_mply\x18P \x01(\rB\x0c\xbaH\t*\x04\x18\x64(\x00\xc8\x01\x00R\x0epacketSizeMply\x12.\n\x0bwrite_files\x18Z \x01(\rB\r\xbaH\n*\x05\x18\xe8\x07(\x00\xc8\x01\x00R\nwriteFiles\x12/\n\x0c\x63\x61pture_path\x18\x64 \x01(\tB\x0c\xbaH\tr\x04\x10\x01\x18P\xc8\x01\x00R\x0b\x63\x61pturePath\x12(\n\x07modulus\x18n \x01(\x04\x42\x0e\xbaH\x0b\x32\x06\x18\xc0\x84=(\x01\xc8\x01\x01R\x07modulus\x12+\n\nmarshal_to\x18x \x01(\tB\x0c\xbaH\tr\x04\x10\x04\x18(\xc8\x01\x01R\tmarshalTo\x12G\n\x1cprotobuf_list_length_delimit\x18y \x01(\x08\x42\x06\xbaH\x03\xc8\x01\x00R\x19protobufListLengthDelimit\x12\"\n\x04\x64\x65st\x18\x82\x01 \x01(\tB\r\xbaH\nr\x05\x10\x04\x18\x80\x01\xc8\x01\x01R\x04\x64\x65st\x12\x38\n\x10\x64\x65st_write_files\x18\x87\x01 \x01(\rB\r\xbaH\n*\x05\x18\xe8\x07(\x00\xc8\x01\x00R\x0e\x64\x65stWriteFiles\x12#\n\x05topic\x18\x8c\x01 \x01(\tB\x0c\xbaH\tr\x04\x10\x01\x18(\xc8\x01\x00R\x05topic\x12\x35\n\x0fxtcp_proto_file\x18\x8f\x01 \x01(\tB\x0c\xbaH\tr\x04\x10\x01\x18P\xc8\x01\x00R\rxtcpProtoFile\x12\x37\n\x10kafka_schema_url\x18\x91\x01 \x01(\tB\x0c\xbaH\tr\x04\x10\x01\x18<\xc8\x01\x00R\x0ekafkaSchemaUrl\x12`\n\x15kafka_produce_timeout\x18\x96\x01 \x01(\x0b\x32\x19.google.protobuf.DurationB\x10\xbaH\r\xaa\x01\x07\"\x03\x08\xd8\x04\x32\x00\xc8\x01\x00R\x13kafkaProduceTimeout\x12/\n\x0b\x64\x65\x62ug_level\x18\xa0\x01 \x01(\rB\r\xbaH\n*\x05\x18\xe8\x07(\x00\xc8\x01\x01R\ndebugLevel\x12!\n\x05label\x18\xaa\x01 \x01(\tB\n\xbaH\x07r\x02\x18(\xc8\x01\x00R\x05label\x12\x1d\n\x03tag\x18\xb4\x01 \x01(\tB\n\xbaH\x07r\x02\x18(\xc8\x01\x00R\x03tag\x12,\n\tgrpc_port\x18\xbe\x01 \x01(\rB\x0e\xbaH\x0b*\x06\x18\xff\xff\x03(\x01\xc8\x01\x01R\x08grpcPort\x12\x62\n\x15\x65nabled_deserializers\x18\xc8\x01 \x01(\x0b\x32$.xtcp_config.v1.EnabledDeserializersB\x06\xbaH\x03\xc8\x01\x00R\x14\x65nabledDeserializers:s\xbaHp\x1an\n\x0fXtcpConfig.poll\x12\x32Poll timeout must be less than poll poll_frequency\x1a\'this.poll_frequency > this.poll_timeout\"\x9f\x01\n\x14\x45nabledDeserializers\x12K\n\x07\x65nabled\x18\x01 \x03(\x0b\x32\x31.xtcp_config.v1.EnabledDeserializers.EnabledEntryR\x07\x65nabled\x1a:\n\x0c\x45nabledEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\x08R\x05value:\x02\x38\x01\x32\xe1\x02\n\rConfigService\x12]\n\x03Get\x12\x1a.xtcp_config.v1.GetRequest\x1a\x1b.xtcp_config.v1.GetResponse\"\x1d\x82\xd3\xe4\x93\x02\x17\x1a\x12/ConfigService/Get:\x01*\x12]\n\x03Set\x12\x1a.xtcp_config.v1.SetRequest\x1a\x1b.xtcp_config.v1.SetResponse\"\x1d\x82\xd3\xe4\x93\x02\x17\x1a\x12/ConfigService/Set:\x01*\x12\x91\x01\n\x10SetPollFrequency\x12\'.xtcp_config.v1.SetPollFrequencyRequest\x1a(.xtcp_config.v1.SetPollFrequencyResponse\"*\x82\xd3\xe4\x93\x02$\x1a\x1f/ConfigService/SetPollFrequency:\x01*B\x8d\x01\n\x12\x63om.xtcp_config.v1B\x0fXtcpConfigProtoP\x01Z\x11./pkg/xtcp_config\xa2\x02\x03XXX\xaa\x02\rXtcpConfig.V1\xca\x02\rXtcpConfig\\V1\xe2\x02\x19XtcpConfig\\V1\\GPBMetadata\xea\x02\x0eXtcpConfig::V1b\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -70,7 +70,7 @@ _globals['_XTCPCONFIG'].fields_by_name['protobuf_list_length_delimit']._loaded_options = None _globals['_XTCPCONFIG'].fields_by_name['protobuf_list_length_delimit']._serialized_options = b'\272H\003\310\001\000' _globals['_XTCPCONFIG'].fields_by_name['dest']._loaded_options = None - _globals['_XTCPCONFIG'].fields_by_name['dest']._serialized_options = b'\272H\tr\004\020\004\030(\310\001\001' + _globals['_XTCPCONFIG'].fields_by_name['dest']._serialized_options = b'\272H\nr\005\020\004\030\200\001\310\001\001' _globals['_XTCPCONFIG'].fields_by_name['dest_write_files']._loaded_options = None _globals['_XTCPCONFIG'].fields_by_name['dest_write_files']._serialized_options = b'\272H\n*\005\030\350\007(\000\310\001\000' _globals['_XTCPCONFIG'].fields_by_name['topic']._loaded_options = None @@ -114,11 +114,11 @@ _globals['_SETPOLLFREQUENCYRESPONSE']._serialized_start=668 _globals['_SETPOLLFREQUENCYRESPONSE']._serialized_end=746 _globals['_XTCPCONFIG']._serialized_start=749 - _globals['_XTCPCONFIG']._serialized_end=2288 - _globals['_ENABLEDDESERIALIZERS']._serialized_start=2291 - _globals['_ENABLEDDESERIALIZERS']._serialized_end=2450 - _globals['_ENABLEDDESERIALIZERS_ENABLEDENTRY']._serialized_start=2392 - _globals['_ENABLEDDESERIALIZERS_ENABLEDENTRY']._serialized_end=2450 - _globals['_CONFIGSERVICE']._serialized_start=2453 - _globals['_CONFIGSERVICE']._serialized_end=2806 + _globals['_XTCPCONFIG']._serialized_end=2289 + _globals['_ENABLEDDESERIALIZERS']._serialized_start=2292 + _globals['_ENABLEDDESERIALIZERS']._serialized_end=2451 + _globals['_ENABLEDDESERIALIZERS_ENABLEDENTRY']._serialized_start=2393 + _globals['_ENABLEDDESERIALIZERS_ENABLEDENTRY']._serialized_end=2451 + _globals['_CONFIGSERVICE']._serialized_start=2454 + _globals['_CONFIGSERVICE']._serialized_end=2807 # @@protoc_insertion_point(module_scope) diff --git a/xtcp_config/v1/xtcp_config.swagger.json b/xtcp_config/v1/xtcp_config.swagger.json index da10ee2..dd782f6 100644 --- a/xtcp_config/v1/xtcp_config.swagger.json +++ b/xtcp_config/v1/xtcp_config.swagger.json @@ -272,7 +272,7 @@ }, "dest": { "type": "string", - "title": "kafka:127.0.0.1:9092, udp:127.0.0.1:13000, or nsq:127.0.0.1:4150, or null:" + "description": "kafka:127.0.0.1:9092, udp:127.0.0.1:13000, nsq:127.0.0.1:4150,\nnats:nats://127.0.0.1:4222, valkey:127.0.0.1:6379, null:,\nunix:/path/to/sock (SOCK_STREAM, length-prefixed via varint), or\nunixgram:/path/to/sock (SOCK_DGRAM, one record per datagram).\nmax_len 128 leaves room for unixgram: (9 bytes) + Linux sun_path (108 bytes)." }, "destWriteFiles": { "type": "integer", From 25fb71b8aa75eb4109adcb99e6c44ef081b09928 Mon Sep 17 00:00:00 2001 From: "randomizedcoder dave.seddon.ca@gmail.com" Date: Wed, 3 Jun 2026 14:29:01 -0700 Subject: [PATCH 2/2] pkg/xtcp: destUnix uses net.Buffers to coalesce header + payload into one writev MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit destUnix was issuing two sequential net.Conn.Write calls — first the varint length header, then the payload. If the header write succeeded but the payload write failed midway (peer disconnect, EPIPE during a backpressure event, etc.) the receiver was left with a varint length prefix promising N bytes followed by fewer than N bytes of data. That shape is unrecoverable from the daemon-side reader's binary.ReadUvarint + io.ReadFull pattern: the next reader pass would read garbage into the next-frame header position and never recover. Switch to net.Buffers{hdr, payload}.WriteTo(conn). Go's stdlib lowers this to a single writev(2) on *net.UnixConn — header and payload land atomically or fail as a unit. Same on-wire shape, same per-record accounting, half the syscalls in the happy path. Removes the TODO that explicitly called this out. Tested: - go build ./pkg/xtcp/... clean - go vet ./pkg/xtcp/... clean - go test -race -run 'TestDestinations|TestDestUnix|TestDestUnixGram' ./pkg/xtcp/ PASS Co-Authored-By: Claude Opus 4.7 --- pkg/xtcp/destinations.go | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/pkg/xtcp/destinations.go b/pkg/xtcp/destinations.go index 5b548cd..7dcbf4f 100644 --- a/pkg/xtcp/destinations.go +++ b/pkg/xtcp/destinations.go @@ -4,6 +4,7 @@ import ( "context" "encoding/binary" "log" + "net" "time" "github.com/twmb/franz-go/pkg/kgo" @@ -279,32 +280,32 @@ func (x *XTCP) destUnixGram(_ context.Context, xtcpRecordBinary *[]byte) (n int, // Daemon-side: read the varint via binary.ReadUvarint, then exactly that // many payload bytes via io.ReadFull. // -// TODO: coalesce the header and payload into a single net.Buffers write if -// profiling shows the two syscalls matter; reconnect on persistent failure. +// Header and payload are written through a net.Buffers, which the standard +// library lowers to a single writev(2) on a *net.UnixConn. That keeps the +// frame atomic on the wire: a partial-write failure can't leave a varint +// header on the receiver without its payload, which would otherwise wedge +// the receiver's binary.ReadUvarint + io.ReadFull recovery loop. +// +// TODO: reconnect on persistent write failure (currently dial-once, fail- +// loudly at startup; runtime errors are logged and the next record is +// attempted). func (x *XTCP) destUnix(_ context.Context, xtcpRecordBinary *[]byte) (n int, err error) { var hdr [binary.MaxVarintLen64]byte hdrLen := binary.PutUvarint(hdr[:], uint64(len(*xtcpRecordBinary))) - if _, err = x.unixConn.Write(hdr[:hdrLen]); err != nil { - x.pC.WithLabelValues("destUnix", "Write", "error").Inc() - if x.debugLevel > 100 { - log.Printf("destUnix header Write err:%v", err) - } - return 0, err - } - - written, err := x.unixConn.Write(*xtcpRecordBinary) + bufs := net.Buffers{hdr[:hdrLen], *xtcpRecordBinary} + written, err := bufs.WriteTo(x.unixConn) if err != nil { x.pC.WithLabelValues("destUnix", "Write", "error").Inc() if x.debugLevel > 100 { - log.Printf("destUnix payload Write err:%v", err) + log.Printf("destUnix WriteTo err:%v written:%d", err, written) } return 0, err } x.pC.WithLabelValues("destUnix", "Writes", "count").Inc() - x.pC.WithLabelValues("destUnix", "WriteBytes", "count").Add(float64(hdrLen + written)) + x.pC.WithLabelValues("destUnix", "WriteBytes", "count").Add(float64(written)) return 1, nil }