Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1472e7a
P2.1: destinations_kafka tests (gated by dest_kafka build tag)
randomizedcoder May 20, 2026
de65897
P2.2: destinations_nats tests (gated by dest_nats build tag)
randomizedcoder May 20, 2026
84ffde1
P2.3: destinations_nsq tests (gated by dest_nsq build tag)
randomizedcoder May 20, 2026
dbfad17
P2.4: destinations_valkey tests (gated by dest_valkey build tag)
randomizedcoder May 20, 2026
ebb58fb
P3: coverage ratchet in quality-report (exit code 3 on >0.5% drop)
randomizedcoder May 20, 2026
53a0a3b
quality-report: fix orchestrator exit-3 handling + lower baseline
randomizedcoder May 20, 2026
ef73716
docs: refresh quality-report.md after the Nix-target wave
randomizedcoder May 20, 2026
7d9c3af
L1: gofmt sweep across new test files
randomizedcoder May 20, 2026
e4c5ec0
L2: misspell sweep (behaviour→behavior, cancelled→canceled)
randomizedcoder May 20, 2026
4508411
L3: remaining lint fixes (unlambda, exhaustive, ST1011, SA4004, etc.)
randomizedcoder May 20, 2026
74ef58e
C1: cmd/xtcp2client coverage 87.9% → 91.5% (above 90% threshold)
randomizedcoder May 20, 2026
d380503
C2: cmd/clickhouse_protobuflist coverage 86.4% → 93.2%
randomizedcoder May 20, 2026
c003d3b
C3: cmd/kafka_to_clickhouse coverage 85.4% → 90.7%
randomizedcoder May 20, 2026
8dbb874
L4: clean up remaining 5 non-coverage lint findings
randomizedcoder May 20, 2026
a0a243c
docs: refresh quality-report.md after L1-L4 + C1-C3
randomizedcoder May 20, 2026
b43be96
D1: kafkaDest producer seam + Send/Close/ping tests
randomizedcoder May 20, 2026
b6bcf18
D2: natsDest publisher seam + Send/Close tests
randomizedcoder May 20, 2026
80d57bb
D3: nsqDest producer seam + Send/Close tests
randomizedcoder May 20, 2026
c83c1fc
D4: valkeyDest publisher seam + Send/Close tests
randomizedcoder May 20, 2026
55ba213
D5: cmd/xtcp2_kafka_client fetcher seam + EachRecord-closure tests
randomizedcoder May 20, 2026
7e56c3b
D6: tools/kafka_topic_reader fetcher seam + EachRecord-closure tests
randomizedcoder May 20, 2026
553d54b
D7: newKafkaDest factory seam + constructor happy-path tests
randomizedcoder May 20, 2026
fae1f4f
D8: newValKeyDest factory seam + constructor happy/err tests
randomizedcoder May 20, 2026
371760c
D9: newNATSDest + newNSQDest factory seams + constructor tests
randomizedcoder May 20, 2026
ad95337
D10: netlinkerSyscall loop test via socketpair fixture
randomizedcoder May 20, 2026
e9b003b
docs: refresh report after D1-D10 — overall 87.2% → 90.3%
randomizedcoder May 20, 2026
2b59573
gofmt: format dest_kafka/dest_valkey gated test files (pulled forward)
randomizedcoder Jun 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cmd/clickhouse_protobuflist/clickhouse_protobuflist.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ import (
"google.golang.org/protobuf/proto"
)

// marshalFn is the proto-marshaller seam encodeLengthDelimitedProtobufList
// uses. Defaults to proto.Marshal; tests can swap in a deliberately
// failing marshaller to exercise the error-return branch (which is
// unreachable with proto.Marshal on a valid struct). Production code
// never mutates this.
var marshalFn = proto.Marshal

func encodeLengthDelimitedProtobufList(r *clickhouse_protolist.Envelope_Record) (result []byte, err error) {

// for _, record := range e.Rows {
Expand All @@ -20,7 +27,7 @@ func encodeLengthDelimitedProtobufList(r *clickhouse_protolist.Envelope_Record)
// return nil, fmt.Errorf("error marshaling Record: %v", err)
// }

recordBytes, err := proto.Marshal(r)
recordBytes, err := marshalFn(r)
if err != nil {
return nil, fmt.Errorf("error marshaling Record: %v", err)
}
Expand Down
44 changes: 44 additions & 0 deletions cmd/clickhouse_protobuflist/clickhouse_protobuflist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package main

import (
"bytes"
"fmt"
"os"
"path/filepath"
"strings"
"testing"

"github.com/randomizedcoder/xtcp2/pkg/clickhouse_protolist"
"google.golang.org/protobuf/proto"
)

func TestEncodeLengthDelimitedProtobufList(t *testing.T) {
Expand Down Expand Up @@ -106,3 +108,45 @@ func TestRunMain_writeEnvelopeError(t *testing.T) {
t.Errorf("rc = %d, want 1", rc)
}
}

// TestEncodeLengthDelimitedProtobufList_marshalErr swaps the
// marshalFn seam for a failing fake to cover the proto-marshal
// error-return branch (unreachable from production where proto.Marshal
// can't fail on this struct).
func TestEncodeLengthDelimitedProtobufList_marshalErr(t *testing.T) {
orig := marshalFn
marshalFn = func(_ proto.Message) ([]byte, error) {
return nil, fmt.Errorf("synthetic marshal err")
}
defer func() { marshalFn = orig }()

_, err := encodeLengthDelimitedProtobufList(&clickhouse_protolist.Envelope_Record{MyUint32: 1})
if err == nil {
t.Fatal("expected err from failing marshaller")
}
if !strings.Contains(err.Error(), "error marshaling Record") {
t.Errorf("err = %q, want substring 'error marshaling Record'", err)
}
}

// TestRunMain_encodeError exercises runMain's error-handling branch
// when encodeLengthDelimitedProtobufList fails. rc=1 means the
// encode-error branch fired.
func TestRunMain_encodeError(t *testing.T) {
orig := marshalFn
marshalFn = func(_ proto.Message) ([]byte, error) {
return nil, fmt.Errorf("synthetic")
}
defer func() { marshalFn = orig }()

dir := t.TempDir()
out := filepath.Join(dir, "out.bin")
var stderr bytes.Buffer
rc := runMain([]string{"-filename", out}, &bytes.Buffer{}, &stderr)
if rc != 1 {
t.Errorf("rc = %d, want 1 (encode error)", rc)
}
if !strings.Contains(stderr.String(), "Error encoding") {
t.Errorf("stderr = %q, want substring 'Error encoding'", stderr.String())
}
}
5 changes: 4 additions & 1 deletion cmd/kafka_to_clickhouse/kafka_to_clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ func runMain(ctx context.Context, args []string, stdout, stderr io.Writer) int {
// bounded by a 5s timeout so a wedged broker doesn't block
// shutdown indefinitely.
defer func() {
flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
// Derive flushCtx from the caller's ctx so cancellation
// propagates correctly; cap at 5s so a wedged broker
// doesn't block teardown indefinitely.
flushCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := kClient.Flush(flushCtx); err != nil {
fmt.Fprintf(stderr, "kgo Flush on exit: %v\n", err)
Expand Down
31 changes: 31 additions & 0 deletions cmd/kafka_to_clickhouse/kafka_to_clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,37 @@ func TestRunMain_fileMode(t *testing.T) {
}
}

// TestRunMain_kafkaMode drives runMain through the kafka=true branch
// using a bogus broker + a short-lived ctx so destKafka's wg.Wait
// (which blocks on Produce's callback) is unblocked by ctx
// cancellation rather than waiting for franz-go's connection retries.
// Covers the `if c.kafka { InitDestKafka + defer flush+close }` block
// inside runMain that the file-mode test skips.
func TestRunMain_kafkaMode(t *testing.T) {
prevClient := kClient
t.Cleanup(func() {
if kClient != prevClient && kClient != nil {
kClient.Close()
}
kClient = prevClient
})
ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second)
defer cancel()
dir := t.TempDir()
out := filepath.Join(dir, "out.bin")
var stderr bytes.Buffer
rc := runMain(ctx, []string{
"-filename", out, "-values", "1",
"-kafka=true", "-broker", "127.0.0.1:1", // refused
"-loops", "1", "-loopsSleep", "1ms",
}, &bytes.Buffer{}, &stderr)
// rc=0 — runMain swallows broker errors; the assertion is just
// "does not hang, returns cleanly".
if rc != 0 {
t.Errorf("rc = %d, want 0 (kafka mode w/ unreachable broker)", rc)
}
}

func TestInitDestKafka_noopWhenDisabled(t *testing.T) {
// kafka=false → InitDestKafka short-circuits without trying to create
// a client.
Expand Down
10 changes: 9 additions & 1 deletion cmd/xtcp2_kafka_client/xtcp2_kafka_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,17 @@ func runMain(ctx context.Context, args []string, stderr io.Writer) int {
return 0
}

// kafkaFetcher is the surface pollLoop needs from a Kafka consumer
// client. Lifting it to an interface lets tests drive pollLoop's
// happy-path EachRecord closure without a real broker. *kgo.Client
// satisfies this interface.
type kafkaFetcher interface {
PollFetches(ctx context.Context) kgo.Fetches
}

// pollLoop is the Kafka consume body. Extracted so test code can call it
// against a fake client (with a pre-canceled ctx for a quick exit).
func pollLoop(ctx context.Context, cl *kgo.Client) {
func pollLoop(ctx context.Context, cl kafkaFetcher) {
for i := 0; ; i++ {
select {
case <-ctx.Done():
Expand Down
96 changes: 96 additions & 0 deletions cmd/xtcp2_kafka_client/xtcp2_kafka_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,99 @@ func TestPollLoop_fetchErrorsThenCancel(t *testing.T) {
t.Fatal("pollLoop did not exit after cancel")
}
}

// fakeFetcher implements the kafkaFetcher interface so pollLoop can be
// driven with synthetic records — exercises the EachRecord closure
// body that broker-bound tests can't reach without real kafka.
type fakeFetcher struct {
fetches []kgo.Fetches
calls int
onCancel context.CancelFunc
}

func (f *fakeFetcher) PollFetches(_ context.Context) kgo.Fetches {
f.calls++
if f.calls > len(f.fetches) {
if f.onCancel != nil {
f.onCancel()
}
return kgo.Fetches{}
}
return f.fetches[f.calls-1]
}

func makeFetchWithRecord(value []byte) kgo.Fetches {
return kgo.Fetches{
{
Topics: []kgo.FetchTopic{
{
Topic: "test-topic",
Partitions: []kgo.FetchPartition{
{Records: []*kgo.Record{{Value: value}}},
},
},
},
},
}
}

// TestPollLoop_eachRecordClosureFires drives pollLoop with one
// synthetic Fetch containing a single valid Confluent-framed record.
// The EachRecord closure (processRecord call) fires, then the second
// PollFetches call signals the fake to cancel ctx.
func TestPollLoop_eachRecordClosureFires(t *testing.T) {
value := make([]byte, KafkaHeaderSizeCst+1)
value[0] = 0x00 // magic
// schemaID bytes [1:5] = 0; length byte [5] = 0 → empty proto
// envelope; processRecord parses it successfully.

ctx, cancel := context.WithCancel(t.Context())
defer cancel()
fake := &fakeFetcher{
fetches: []kgo.Fetches{makeFetchWithRecord(value)},
onCancel: cancel,
}
done := make(chan struct{})
go func() {
pollLoop(ctx, fake)
close(done)
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("pollLoop did not exit after fake fetcher exhausted")
}
if fake.calls < 1 {
t.Errorf("expected ≥1 PollFetches call; got %d", fake.calls)
}
}

// TestPollLoop_fakeFetcherErrors drives pollLoop with a Fetches that
// surfaces an error via FetchPartition.Err, then exhausts to cancel.
// Exercises the `if errs := fetches.Errors(); ...` branch with a
// non-empty Errors() result.
func TestPollLoop_fakeFetcherErrors(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
errFetch := kgo.Fetches{
{Topics: []kgo.FetchTopic{
{Topic: "test-topic", Partitions: []kgo.FetchPartition{
{Err: errors.New("fetch err")},
}},
}},
}
fake := &fakeFetcher{
fetches: []kgo.Fetches{errFetch},
onCancel: cancel,
}
done := make(chan struct{})
go func() {
pollLoop(ctx, fake)
close(done)
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("pollLoop did not exit on fake-fetcher exhaust")
}
}
Loading