Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dart/xtcp_config/v1/xtcp_config.pb.dart

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 14 additions & 14 deletions dart/xtcp_config/v1/xtcp_config.pbjson.dart

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 37 additions & 37 deletions gen/xtcp_config/v1/xtcp_config.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/clickhouse_protolist/clickhouse_protolist.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 72 additions & 0 deletions pkg/xtcp/destinations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@ package xtcp

import (
"context"
"encoding/binary"
"log"
"net"
"time"

"github.com/twmb/franz-go/pkg/kgo"
)

// Destination functions are invoked from a single deserializer goroutine in
// production. The implementations below assume serial access to their
// underlying net.Conn / client field. Concurrent callers are not supported
// without adding a mutex.

// destNull sends the protobuf to nowhere!
func (x *XTCP) destNull(_ context.Context, xtcpRecordBinary *[]byte) (n int, err error) {

Expand Down Expand Up @@ -237,3 +244,68 @@ func (x *XTCP) destValKey(ctx context.Context, xtcpRecordBinary *[]byte) (n int,

return 1, err
}

// destUnixGram sends the protobuf record to a Unix datagram socket.
// One Write == one datagram == one record; no framing is required because
// the kernel preserves message boundaries. Records exceeding SO_SNDBUF
// (≈208 KB on Linux by default) fail with EMSGSIZE; xtcp records today
// are well below that.
//
// TODO: reconnect on persistent write failure (currently dial-once, fail-
// loudly at startup; runtime errors are logged and the next record is
// attempted).
func (x *XTCP) destUnixGram(_ context.Context, xtcpRecordBinary *[]byte) (n int, err error) {

written, err := x.unixGramConn.Write(*xtcpRecordBinary)
if err != nil {
x.pC.WithLabelValues("destUnixGram", "Write", "error").Inc()
if x.debugLevel > 100 {
log.Printf("destUnixGram Write err:%v", err)
}
return 0, err
}

x.pC.WithLabelValues("destUnixGram", "Writes", "count").Inc()
x.pC.WithLabelValues("destUnixGram", "WriteBytes", "count").Add(float64(written))

return 1, nil
}

// destUnix sends the protobuf record to a Unix stream socket, framed with
// a varint length prefix so the daemon reader can recover record
// boundaries. Wire format per record:
//
// [varint(len(payload))] [payload bytes...]
//
// Daemon-side: read the varint via binary.ReadUvarint, then exactly that
// many payload bytes via io.ReadFull.
//
// Header and payload are written through a net.Buffers, which the standard
// library lowers to a single writev(2) on a *net.UnixConn. That keeps the
// frame atomic on the wire: a partial-write failure can't leave a varint
// header on the receiver without its payload, which would otherwise wedge
// the receiver's binary.ReadUvarint + io.ReadFull recovery loop.
//
// TODO: reconnect on persistent write failure (currently dial-once, fail-
// loudly at startup; runtime errors are logged and the next record is
// attempted).
func (x *XTCP) destUnix(_ context.Context, xtcpRecordBinary *[]byte) (n int, err error) {

var hdr [binary.MaxVarintLen64]byte
hdrLen := binary.PutUvarint(hdr[:], uint64(len(*xtcpRecordBinary)))

bufs := net.Buffers{hdr[:hdrLen], *xtcpRecordBinary}
written, err := bufs.WriteTo(x.unixConn)
if err != nil {
x.pC.WithLabelValues("destUnix", "Write", "error").Inc()
if x.debugLevel > 100 {
log.Printf("destUnix WriteTo err:%v written:%d", err, written)
}
return 0, err
}

x.pC.WithLabelValues("destUnix", "Writes", "count").Inc()
x.pC.WithLabelValues("destUnix", "WriteBytes", "count").Add(float64(written))

return 1, nil
}
Loading