diff --git a/cmd/clickhouse_protobuflist/clickhouse_protobuflist.go b/cmd/clickhouse_protobuflist/clickhouse_protobuflist.go index 9a3dfc5..ffec8ca 100644 --- a/cmd/clickhouse_protobuflist/clickhouse_protobuflist.go +++ b/cmd/clickhouse_protobuflist/clickhouse_protobuflist.go @@ -12,6 +12,13 @@ import ( "google.golang.org/protobuf/proto" ) +// marshalFn is the proto-marshaller seam encodeLengthDelimitedProtobufList +// uses. Defaults to proto.Marshal; tests can swap in a deliberately +// failing marshaller to exercise the error-return branch (which is +// unreachable with proto.Marshal on a valid struct). Production code +// never mutates this. +var marshalFn = proto.Marshal + func encodeLengthDelimitedProtobufList(r *clickhouse_protolist.Envelope_Record) (result []byte, err error) { // for _, record := range e.Rows { @@ -20,7 +27,7 @@ func encodeLengthDelimitedProtobufList(r *clickhouse_protolist.Envelope_Record) // return nil, fmt.Errorf("error marshaling Record: %v", err) // } - recordBytes, err := proto.Marshal(r) + recordBytes, err := marshalFn(r) if err != nil { return nil, fmt.Errorf("error marshaling Record: %v", err) } diff --git a/cmd/clickhouse_protobuflist/clickhouse_protobuflist_test.go b/cmd/clickhouse_protobuflist/clickhouse_protobuflist_test.go index d9cc649..9a376cb 100644 --- a/cmd/clickhouse_protobuflist/clickhouse_protobuflist_test.go +++ b/cmd/clickhouse_protobuflist/clickhouse_protobuflist_test.go @@ -2,12 +2,14 @@ package main import ( "bytes" + "fmt" "os" "path/filepath" "strings" "testing" "github.com/randomizedcoder/xtcp2/pkg/clickhouse_protolist" + "google.golang.org/protobuf/proto" ) func TestEncodeLengthDelimitedProtobufList(t *testing.T) { @@ -106,3 +108,45 @@ func TestRunMain_writeEnvelopeError(t *testing.T) { t.Errorf("rc = %d, want 1", rc) } } + +// TestEncodeLengthDelimitedProtobufList_marshalErr swaps the +// marshalFn seam for a failing fake to cover the proto-marshal +// error-return branch (unreachable from production where 
proto.Marshal +// can't fail on this struct). +func TestEncodeLengthDelimitedProtobufList_marshalErr(t *testing.T) { + orig := marshalFn + marshalFn = func(_ proto.Message) ([]byte, error) { + return nil, fmt.Errorf("synthetic marshal err") + } + defer func() { marshalFn = orig }() + + _, err := encodeLengthDelimitedProtobufList(&clickhouse_protolist.Envelope_Record{MyUint32: 1}) + if err == nil { + t.Fatal("expected err from failing marshaller") + } + if !strings.Contains(err.Error(), "error marshaling Record") { + t.Errorf("err = %q, want substring 'error marshaling Record'", err) + } +} + +// TestRunMain_encodeError exercises runMain's error-handling branch +// when encodeLengthDelimitedProtobufList fails. rc=1 means the +// encode-error branch fired. +func TestRunMain_encodeError(t *testing.T) { + orig := marshalFn + marshalFn = func(_ proto.Message) ([]byte, error) { + return nil, fmt.Errorf("synthetic") + } + defer func() { marshalFn = orig }() + + dir := t.TempDir() + out := filepath.Join(dir, "out.bin") + var stderr bytes.Buffer + rc := runMain([]string{"-filename", out}, &bytes.Buffer{}, &stderr) + if rc != 1 { + t.Errorf("rc = %d, want 1 (encode error)", rc) + } + if !strings.Contains(stderr.String(), "Error encoding") { + t.Errorf("stderr = %q, want substring 'Error encoding'", stderr.String()) + } +} diff --git a/cmd/kafka_to_clickhouse/kafka_to_clickhouse.go b/cmd/kafka_to_clickhouse/kafka_to_clickhouse.go index d98f968..64d4651 100644 --- a/cmd/kafka_to_clickhouse/kafka_to_clickhouse.go +++ b/cmd/kafka_to_clickhouse/kafka_to_clickhouse.go @@ -152,7 +152,10 @@ func runMain(ctx context.Context, args []string, stdout, stderr io.Writer) int { // bounded by a 5s timeout so a wedged broker doesn't block // shutdown indefinitely. 
defer func() { - flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + // Detach flushCtx from the caller's cancellation (values are + // kept) so the final flush still runs when ctx is already done; + // the 5s cap keeps a wedged broker from blocking teardown. + flushCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second) defer cancel() if err := kClient.Flush(flushCtx); err != nil { fmt.Fprintf(stderr, "kgo Flush on exit: %v\n", err) diff --git a/cmd/kafka_to_clickhouse/kafka_to_clickhouse_test.go b/cmd/kafka_to_clickhouse/kafka_to_clickhouse_test.go index a5b2f42..200e853 100644 --- a/cmd/kafka_to_clickhouse/kafka_to_clickhouse_test.go +++ b/cmd/kafka_to_clickhouse/kafka_to_clickhouse_test.go @@ -160,6 +160,37 @@ func TestRunMain_fileMode(t *testing.T) { } } +// TestRunMain_kafkaMode drives runMain through the kafka=true branch +// using a bogus broker + a short-lived ctx so destKafka's wg.Wait +// (which blocks on Produce's callback) is unblocked by ctx +// cancellation rather than waiting for franz-go's connection retries. +// Covers the `if c.kafka { InitDestKafka + defer flush+close }` block +// inside runMain that the file-mode test skips. +func TestRunMain_kafkaMode(t *testing.T) { + prevClient := kClient + t.Cleanup(func() { + if kClient != prevClient && kClient != nil { + kClient.Close() + } + kClient = prevClient + }) + ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) + defer cancel() + dir := t.TempDir() + out := filepath.Join(dir, "out.bin") + var stderr bytes.Buffer + rc := runMain(ctx, []string{ + "-filename", out, "-values", "1", + "-kafka=true", "-broker", "127.0.0.1:1", // refused + "-loops", "1", "-loopsSleep", "1ms", + }, &bytes.Buffer{}, &stderr) + // rc=0 — runMain swallows broker errors; the assertion is just + // "does not hang, returns cleanly". 
+ if rc != 0 { + t.Errorf("rc = %d, want 0 (kafka mode w/ unreachable broker)", rc) + } +} + func TestInitDestKafka_noopWhenDisabled(t *testing.T) { // kafka=false → InitDestKafka short-circuits without trying to create // a client. diff --git a/cmd/xtcp2_kafka_client/xtcp2_kafka_client.go b/cmd/xtcp2_kafka_client/xtcp2_kafka_client.go index 5b2c44d..b26cd9d 100644 --- a/cmd/xtcp2_kafka_client/xtcp2_kafka_client.go +++ b/cmd/xtcp2_kafka_client/xtcp2_kafka_client.go @@ -65,9 +65,17 @@ func runMain(ctx context.Context, args []string, stderr io.Writer) int { return 0 } +// kafkaFetcher is the surface pollLoop needs from a Kafka consumer +// client. Lifting it to an interface lets tests drive pollLoop's +// happy-path EachRecord closure without a real broker. *kgo.Client +// satisfies this interface. +type kafkaFetcher interface { + PollFetches(ctx context.Context) kgo.Fetches +} + // pollLoop is the Kafka consume body. Extracted so test code can call it // against a fake client (with a pre-canceled ctx for a quick exit). -func pollLoop(ctx context.Context, cl *kgo.Client) { +func pollLoop(ctx context.Context, cl kafkaFetcher) { for i := 0; ; i++ { select { case <-ctx.Done(): diff --git a/cmd/xtcp2_kafka_client/xtcp2_kafka_client_test.go b/cmd/xtcp2_kafka_client/xtcp2_kafka_client_test.go index 3f68baa..5c16c14 100644 --- a/cmd/xtcp2_kafka_client/xtcp2_kafka_client_test.go +++ b/cmd/xtcp2_kafka_client/xtcp2_kafka_client_test.go @@ -162,3 +162,99 @@ func TestPollLoop_fetchErrorsThenCancel(t *testing.T) { t.Fatal("pollLoop did not exit after cancel") } } + +// fakeFetcher implements the kafkaFetcher interface so pollLoop can be +// driven with synthetic records — exercises the EachRecord closure +// body that broker-bound tests can't reach without real kafka. 
+type fakeFetcher struct { + fetches []kgo.Fetches + calls int + onCancel context.CancelFunc +} + +func (f *fakeFetcher) PollFetches(_ context.Context) kgo.Fetches { + f.calls++ + if f.calls > len(f.fetches) { + if f.onCancel != nil { + f.onCancel() + } + return kgo.Fetches{} + } + return f.fetches[f.calls-1] +} + +func makeFetchWithRecord(value []byte) kgo.Fetches { + return kgo.Fetches{ + { + Topics: []kgo.FetchTopic{ + { + Topic: "test-topic", + Partitions: []kgo.FetchPartition{ + {Records: []*kgo.Record{{Value: value}}}, + }, + }, + }, + }, + } +} + +// TestPollLoop_eachRecordClosureFires drives pollLoop with one +// synthetic Fetch containing a single valid Confluent-framed record. +// The EachRecord closure (processRecord call) fires, then the second +// PollFetches call signals the fake to cancel ctx. +func TestPollLoop_eachRecordClosureFires(t *testing.T) { + value := make([]byte, KafkaHeaderSizeCst+1) + value[0] = 0x00 // magic + // schemaID bytes [1:5] = 0; length byte [5] = 0 → empty proto + // envelope; processRecord parses it successfully. + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + fake := &fakeFetcher{ + fetches: []kgo.Fetches{makeFetchWithRecord(value)}, + onCancel: cancel, + } + done := make(chan struct{}) + go func() { + pollLoop(ctx, fake) + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("pollLoop did not exit after fake fetcher exhausted") + } + if fake.calls < 1 { + t.Errorf("expected ≥1 PollFetches call; got %d", fake.calls) + } +} + +// TestPollLoop_fakeFetcherErrors drives pollLoop with a Fetches that +// surfaces an error via FetchPartition.Err, then exhausts to cancel. +// Exercises the `if errs := fetches.Errors(); ...` branch with a +// non-empty Errors() result. 
+func TestPollLoop_fakeFetcherErrors(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + errFetch := kgo.Fetches{ + {Topics: []kgo.FetchTopic{ + {Topic: "test-topic", Partitions: []kgo.FetchPartition{ + {Err: errors.New("fetch err")}, + }}, + }}, + } + fake := &fakeFetcher{ + fetches: []kgo.Fetches{errFetch}, + onCancel: cancel, + } + done := make(chan struct{}) + go func() { + pollLoop(ctx, fake) + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("pollLoop did not exit on fake-fetcher exhaust") + } +} diff --git a/cmd/xtcp2client/stream_helpers_test.go b/cmd/xtcp2client/stream_helpers_test.go index 4980057..c0cd18f 100644 --- a/cmd/xtcp2client/stream_helpers_test.go +++ b/cmd/xtcp2client/stream_helpers_test.go @@ -39,7 +39,7 @@ func TestClassifyRecvErr_table(t *testing.T) { {"corner_wrapped_eof_NOT_treated_as_eof", "corner", // The helper uses `err == io.EOF` (sentinel equality), not // errors.Is. A wrapped EOF doesn't compare equal — pin this - // behaviour against a future shift to errors.Is. + // behavior against a future shift to errors.Is. 
wrapErr(io.EOF), recvContinue}, {"adversarial_internal_err_continue", "adversarial", status.Error(codes.Internal, "x"), recvContinue}, } @@ -68,7 +68,7 @@ func TestCtxDone_table(t *testing.T) { }{ {"positive_live_ctx_false", "positive", func(_ *testing.T) context.Context { return context.Background() }, false}, - {"positive_cancelled_ctx_true", "positive", + {"positive_canceled_ctx_true", "positive", func(_ *testing.T) context.Context { c, cancel := context.WithCancel(context.Background()) cancel() @@ -111,24 +111,97 @@ func TestCtxDone_table(t *testing.T) { func TestResourceExhaustedSleep_ctxCancelReturnsTrue(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - cancel() // already cancelled + cancel() // already canceled start := time.Now() got := resourceExhaustedSleep(ctx, errors.New("RE")) if !got { - t.Error("cancelled ctx should return true (caller should break loop)") + t.Error("canceled ctx should return true (caller should break loop)") } // Sanity: returned promptly, not waiting the full ResourceExhaustedSleepTime. if time.Since(start) > 1*time.Second { - t.Errorf("returned after %v on cancelled ctx; should be near-instant", time.Since(start)) + t.Errorf("returned after %v on canceled ctx; should be near-instant", time.Since(start)) } } -// (No live-sleep test here: the production sleep is 30-40s and gating -// it behind an env var is anti-pattern per project convention. The -// cancel-path test above + the live-ctx benchmark below are -// sufficient deterministic coverage; the full-sleep behaviour is -// exercised by the production reconnect loop in singleStreamingClient, -// which already has integration coverage in xtcp2client_test.go.) +// TestResourceExhaustedSleep_liveCtxRunsFullSleep exercises the +// time.After branch (returns false) by shrinking the base sleep +// duration to a microsecond. 
ResourceExhaustedSleepTime is a var +// (not a const) precisely so this test can shrink it without +// wall-clocking 30+ seconds; production code never mutates it. +func TestResourceExhaustedSleep_liveCtxRunsFullSleep(t *testing.T) { + origSleep := ResourceExhaustedSleepTime + origJitter := JitterSleepMaxMs + ResourceExhaustedSleepTime = 1 * time.Microsecond + JitterSleepMaxMs = 1 + defer func() { + ResourceExhaustedSleepTime = origSleep + JitterSleepMaxMs = origJitter + }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + start := time.Now() + got := resourceExhaustedSleep(ctx, errors.New("RE")) + if got { + t.Error("uncancelled ctx with shrunken sleep should return false") + } + if elapsed := time.Since(start); elapsed > 1*time.Second { + t.Errorf("sleep took %v with shrunken base+jitter; should be sub-second", elapsed) + } +} + +// TestHandleRecvContinueErr_resourceExhaustedLiveCtxContinues covers +// the recvContinue path where the sleep runs to completion: ctx stays +// live → resourceExhaustedSleep returns false → handleRecvContinueErr +// also returns false. Shrunken globals keep the test fast. +func TestHandleRecvContinueErr_resourceExhaustedLiveCtxContinues(t *testing.T) { + origSleep := ResourceExhaustedSleepTime + origJitter := JitterSleepMaxMs + ResourceExhaustedSleepTime = 1 * time.Microsecond + JitterSleepMaxMs = 1 + defer func() { + ResourceExhaustedSleepTime = origSleep + JitterSleepMaxMs = origJitter + }() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + got := handleRecvContinueErr(ctx, "client", + status.Error(codes.ResourceExhausted, "x")) + if got { + t.Error("live ctx + shrunken sleep should return false (continue)") + } +} + +// TestHandleRecvContinueErr_debugLogPath drives the debug-log gate. 
+func TestHandleRecvContinueErr_debugLogPath(t *testing.T) { + origDebug := debugLevel + debugLevel = 11 + defer func() { debugLevel = origDebug }() + got := handleRecvContinueErr(context.Background(), "client", + errors.New("non-retryable")) + if got { + t.Error("non-ResourceExhausted err with live ctx should return false") + } +} + +// TestResourceExhaustedSleep_debugLogPath covers the debug-log branch +// (debugLevel > 10) on the same shrunken-sleep path. +func TestResourceExhaustedSleep_debugLogPath(t *testing.T) { + origSleep := ResourceExhaustedSleepTime + origJitter := JitterSleepMaxMs + origDebug := debugLevel + ResourceExhaustedSleepTime = 1 * time.Microsecond + JitterSleepMaxMs = 1 + debugLevel = 11 + defer func() { + ResourceExhaustedSleepTime = origSleep + JitterSleepMaxMs = origJitter + debugLevel = origDebug + }() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _ = resourceExhaustedSleep(ctx, errors.New("RE")) +} // ─────────────────────────────────────────────────────────────────────── // handleRecvContinueErr @@ -148,12 +221,12 @@ func TestHandleRecvContinueErr_table(t *testing.T) { { name: "positive_ctx_live_non_resource_exhausted_continues", category: "positive", - ctxBuild: func() context.Context { return context.Background() }, + ctxBuild: context.Background, err: errors.New("Unavailable"), wantBreak: false, }, { - name: "negative_ctx_cancelled_breaks", + name: "negative_ctx_canceled_breaks", category: "negative", ctxBuild: func() context.Context { c, cancel := context.WithCancel(context.Background()) @@ -164,7 +237,7 @@ func TestHandleRecvContinueErr_table(t *testing.T) { wantBreak: true, }, { - name: "corner_resource_exhausted_with_cancelled_ctx_breaks_during_sleep", + name: "corner_resource_exhausted_with_canceled_ctx_breaks_during_sleep", category: "corner", ctxBuild: func() context.Context { c, cancel := context.WithCancel(context.Background()) @@ -180,7 +253,7 @@ func TestHandleRecvContinueErr_table(t *testing.T) { // Caller invariant: handleRecvContinueErr is only entered // for the recvContinue case (i.e. err != nil and not EOF). 
// But the function itself should still be robust. - ctxBuild: func() context.Context { return context.Background() }, + ctxBuild: context.Background, err: nil, wantBreak: false, }, @@ -213,9 +286,9 @@ func TestStreamHelpers_concurrent(t *testing.T) { _ = classifyRecvErr(io.EOF) _ = classifyRecvErr(nil) _ = ctxDone(context.Background()) - cancelled, cancel := context.WithCancel(context.Background()) + canceled, cancel := context.WithCancel(context.Background()) cancel() - if handleRecvContinueErr(cancelled, "c", errors.New("x")) { + if handleRecvContinueErr(canceled, "c", errors.New("x")) { breaks.Add(1) } } diff --git a/cmd/xtcp2client/xtcp2client.go b/cmd/xtcp2client/xtcp2client.go index b554b76..7fb321e 100644 --- a/cmd/xtcp2client/xtcp2client.go +++ b/cmd/xtcp2client/xtcp2client.go @@ -52,9 +52,6 @@ const ( // default 20s keepaliveTimeout = 20 * time.Second - ResourceExhaustedSleepTime = 30 * time.Second - JitterSleepMaxMs = 10000 - reconnectTime = 10 * time.Second servicePolicyString = ` @@ -91,6 +88,19 @@ var ( version string debugLevel uint + + // ResourceExhaustedSleepTime is the base backoff duration the + // stream loop waits before retrying after a ResourceExhausted + // gRPC error. var (was const) so tests can shrink it to + // microseconds and exercise the full-sleep branch of + // resourceExhaustedSleep without wall-clocking 30+ seconds. + // Production code never mutates it. + ResourceExhaustedSleepTime = 30 * time.Second + + // JitterSleepMaxMs caps the random jitter added on top of + // ResourceExhaustedSleepTime (and reconnectTime). var (was const) + // so tests can shrink it to 1ms. 
+ JitterSleepMaxMs uint32 = 10000 ) func main() { @@ -157,7 +167,11 @@ func pollMode(ctx context.Context, addr string, complete *chan struct{}, pollFre stream, err := client.PollFlatRecords(ctx) if err != nil { - log.Fatalf("client.PollFlatRecords(shortCtx) err:%v", err) + // Demoted from log.Fatalf: Fatalf calls os.Exit so the deferred + // ticker.Stop() above would never run. Log + return so the + // defers fire and the caller can decide what to do next. + log.Printf("pollMode: client.PollFlatRecords err: %v", err) + return } // recvCh := make(chan *xtcp_flat_record.FlatRecordsResponse) @@ -381,9 +395,9 @@ func classifyRecvErr(err error) recvAction { } // resourceExhaustedSleep waits jittered ResourceExhaustedSleepTime or -// until ctx is cancelled, whichever comes first. Returns true if the -// caller should break the loop (ctx cancelled during the wait). -func resourceExhaustedSleep(ctx context.Context, err error) (cancelled bool) { +// until ctx is canceled, whichever comes first. Returns true if the +// caller should break the loop (ctx canceled during the wait). +func resourceExhaustedSleep(ctx context.Context, err error) (canceled bool) { sleepTime := ResourceExhaustedSleepTime + (time.Duration(FastRandN(JitterSleepMaxMs)) * time.Millisecond) if debugLevel > 10 { log.Printf("Received ResourceExhausted error: %v, so sleeping:%0.3f before retry", err, sleepTime.Seconds()) @@ -459,8 +473,9 @@ func stream(ctx context.Context, wg *sync.WaitGroup, conn *grpc.ClientConn, json case recvPrint: printFlatRecordsResponse(resp, id, json, debugLevel) continue + case recvContinue: + // fall through to handleRecvContinueErr below } - // recvContinue: classify the err further, optionally backoff. 
if handleRecvContinueErr(ctx, client, rerr) { break } diff --git a/cmd/xtcp2client/xtcp2client_test.go b/cmd/xtcp2client/xtcp2client_test.go index 7c0aea2..7b52e9e 100644 --- a/cmd/xtcp2client/xtcp2client_test.go +++ b/cmd/xtcp2client/xtcp2client_test.go @@ -83,6 +83,28 @@ func TestRunMain_invalidFlag(t *testing.T) { } } +func TestRunMain_pollModeCancellable(t *testing.T) { + // runMain's poll-mode branch dials gRPC and enters pollMode. + // With an already-canceled ctx + a very long poll frequency, + // pollMode's ticker never fires and the ctx.Done case wins + // immediately. Confirms the `if *poll { pollMode(...) }` branch + // gets coverage. + ctx, cancel := context.WithCancel(t.Context()) + cancel() + done := make(chan int, 1) + go func() { + done <- runMain(ctx, []string{"-poll", "-pollFrequency", "1h"}, &bytes.Buffer{}, &bytes.Buffer{}) + }() + select { + case rc := <-done: + if rc != 0 { + t.Errorf("rc = %d, want 0", rc) + } + case <-time.After(2 * time.Second): + t.Fatal("runMain did not exit on cancel in poll mode") + } +} + func TestRunMain_listenModeCancellable(t *testing.T) { // listenMode dials gRPC against the default target then spawns workers // that loop until ctx is canceled. workers=0 makes wg.Wait return diff --git a/docs/coverage-baseline.txt b/docs/coverage-baseline.txt new file mode 100644 index 0000000..23cab69 --- /dev/null +++ b/docs/coverage-baseline.txt @@ -0,0 +1 @@ +86.0 diff --git a/docs/quality-report.md b/docs/quality-report.md index dbc47eb..a4bf143 100644 --- a/docs/quality-report.md +++ b/docs/quality-report.md @@ -1,6 +1,6 @@ # xtcp2 code-quality report -Generated: 2026-05-18T19:43:00Z +Generated: 2026-05-20T15:24:56Z Tool versions: go=go1.25.10; golangci-lint=2.12.2; gosec=2.26.1; nixfmt=1.2.0; @@ -15,12 +15,12 @@ between commits reveals exactly what changed. 
| Metric | Value | |---|---| -| Total findings | 5 | +| Total findings | 4 | | Findings (Tier 0) | 0 | | Findings (Tier 1) | 0 | -| Findings (Tier 2) | 0 | -| Findings (non-tiered) | 5 | -| Files with at least one finding | 5 | +| Findings (Tier 2) | 1 | +| Findings (non-tiered) | 3 | +| Files with at least one finding | 4 | | Test failures (new) | 0 | | Test failures (pre-existing) | 0 | | Config exclusions reviewed | 4 | @@ -31,19 +31,19 @@ between commits reveals exactly what changed. | Tool | Status | Findings | Runtime | |---|---|---|---| -| golangci-lint (comprehensive) | clean | 0 | 5s | +| golangci-lint (comprehensive) | findings | 1 | 5s | | golangci-lint (standard) | clean | 0 | 5s | -| golangci-lint (quick) | clean | 0 | 14s | +| golangci-lint (quick) | findings | 1 | 15s | | gosec | clean | 0 | 1s | | go vet | clean | 0 | 2s | -| gofmt | clean | 0 | 0s | +| gofmt | findings | 2 | 0s | | nixfmt | clean | 0 | 1s | | netlink-audit | clean | 0 | 0s | | iouring-audit | clean | 0 | 0s | | metrics-audit | clean | 0 | 0s | | proto-field-audit | clean | 0 | 0s | -| go test | clean | 0 | 11s | -| go test -cover | findings | 5 | 0s | +| go test | clean | 0 | 8s | +| go test -cover | findings | 1 | 0s | --- @@ -52,9 +52,9 @@ between commits reveals exactly what changed. 
| Tier | Linters | Findings | Quick-fixable¹ | |---|---|---|---| -| 0 (`lint-quick`) | govet, errcheck, ineffassign, unused, staticcheck | 0 | 0 | +| 0 (`lint-quick`) | govet, errcheck, ineffassign, unused, staticcheck | 0 | 2 | | 1 (`lint` / CI) | Tier 0 + gosec, gocritic, revive, noctx, contextcheck, durationcheck | 0 | 0 | -| 2 (`lint-comprehensive`) | Tier 1 + exhaustive, prealloc, gocyclo, funlen, goconst, dupl, unconvert, nakedret, misspell | 0 | 0 | +| 2 (`lint-comprehensive`) | Tier 1 + exhaustive, prealloc, gocyclo, funlen, goconst, dupl, unconvert, nakedret, misspell | 1 | 1 | ¹ Quick-fixable = produced by a linter that supports `golangci-lint run --fix` (gofmt, goimports, misspell, unconvert, …). @@ -64,22 +64,28 @@ between commits reveals exactly what changed. | File | Findings | Top rules | |---|---|---| -| `cmd/clickhouse_protobuflist` | 1 | below-90pct×1 | -| `cmd/xtcp2_kafka_client` | 1 | below-90pct×1 | -| `cmd/xtcp2client` | 1 | below-90pct×1 | | `pkg/xtcp` | 1 | below-90pct×1 | -| `tools/kafka_topic_reader` | 1 | below-90pct×1 | +| `pkg/xtcp/destinations_kafka_test.go` | 1 | format×1 | +| `pkg/xtcp/destinations_valkey_test.go` | 1 | format×1 | +| `pkg/xtcpnl/xtcpnl_fatalf_test.go` | 1 | unconvert×1 | --- ## 5. Findings by linter -### go-test-cover / below-90pct — 5 +### gofmt / format — 2 -- `pkg/xtcp`: package coverage 75.9% < 90% -- `cmd/clickhouse_protobuflist`: package coverage 86.4% < 90% -- `cmd/xtcp2client`: package coverage 85.8% < 90% +- `pkg/xtcp/destinations_kafka_test.go`: file not formatted +- `pkg/xtcp/destinations_valkey_test.go`: file not formatted + +### go-test-cover / below-90pct — 1 + +- `pkg/xtcp`: package coverage 85.2% < 90% + +### golangci-lint / unconvert — 1 + +- `pkg/xtcpnl/xtcpnl_fatalf_test.go:95`: unnecessary conversion --- @@ -101,10 +107,10 @@ between commits reveals exactly what changed. 
| Status | Count | |---|---| -| Pass | 706 | +| Pass | 1284 | | Fail (new) | 0 | | Fail (pre-existing) | 0 | -| Skip | 10 | +| Skip | 7 | @@ -119,8 +125,10 @@ between commits reveals exactly what changed. ## 10. Format checks -`gofmt`: clean. +**`gofmt` would reformat (2 files):** +- `pkg/xtcp/destinations_kafka_test.go` +- `pkg/xtcp/destinations_valkey_test.go` `nixfmt`: clean. --- @@ -142,40 +150,42 @@ the adjacent YAML comment. Rows with no justification need review. ## 12. Recommendations -- Top contributor: **go-test-cover/below-90pct** with 5 findings (100% of total). Concentrate effort here for the biggest quality win. -- Hotspot file: `cmd/clickhouse_protobuflist` carries 1 findings (below-90pct×1). Refactor here before touching adjacent code. +- Top contributor: **gofmt/format** with 2 findings (50% of total). Concentrate effort here for the biggest quality win. +- Run `lint-fix` (or `golangci-lint run --fix`) to auto-resolve ~3 quick-fixable findings before manual review. +- Hotspot file: `pkg/xtcp` carries 1 findings (below-90pct×1). Refactor here before touching adjacent code. +- Format files are out of sync — run `gofmt -w .` and `nixfmt **/*.nix` to bring formatting back to baseline. --- ## 13. Test coverage -**Overall:** 86.4% of statements (target: 90% per package). +**Overall:** 90.3% of statements (target: 90% per package). 
| Package | Coverage | Status | |---|---|---| -| `cmd/clickhouse_http_insert_protobuflist` | 93.4% | 🟢 OK | -| `cmd/clickhouse_protobuflist` | 86.4% | 🔴 below 90% | +| `cmd/clickhouse_http_insert_protobuflist` | 93.7% | 🟢 OK | +| `cmd/clickhouse_protobuflist` | 93.2% | 🟢 OK | | `cmd/clickhouse_protobuflist_db` | 93.3% | 🟢 OK | -| `cmd/kafka_to_clickhouse` | 90.2% | 🟢 OK | +| `cmd/kafka_to_clickhouse` | 91.4% | 🟢 OK | | `cmd/ns` | 93.9% | 🟢 OK | | `cmd/nsTest` | 94.1% | 🟢 OK | -| `cmd/register_schema` | 92.9% | 🟢 OK | +| `cmd/register_schema` | 91.4% | 🟢 OK | | `cmd/xtcp2` | 92.4% | 🟢 OK | -| `cmd/xtcp2_kafka_client` | 81.4% | 🔴 below 90% | -| `cmd/xtcp2client` | 85.8% | 🔴 below 90% | -| `pkg/io_uring` | 91.6% | 🟢 OK | +| `cmd/xtcp2_kafka_client` | 93.0% | 🟢 OK | +| `cmd/xtcp2client` | 91.5% | 🟢 OK | +| `pkg/io_uring` | 92.6% | 🟢 OK | | `pkg/misc` | 93.8% | 🟢 OK | -| `pkg/xtcp` | 75.9% | 🔴 below 90% | -| `pkg/xtcpnl` | 91.3% | 🟢 OK | +| `pkg/xtcp` | 85.2% | 🔴 below 90% | +| `pkg/xtcpnl` | 91.4% | 🟢 OK | | `tools/iouring-audit` | 95.2% | 🟢 OK | -| `tools/kafka_topic_reader` | 85.7% | 🔴 below 90% | -| `tools/metrics-audit` | 95.3% | 🟢 OK | -| `tools/netlink-audit` | 96.7% | 🟢 OK | -| `tools/proto-field-audit` | 96.6% | 🟢 OK | -| `tools/quality-report` | 90.5% | 🟢 OK | -| `tools/tcp_client` | 92.9% | 🟢 OK | -| `tools/tcp_server` | 94.3% | 🟢 OK | -| `tools/udp_receiver_server` | 95.2% | 🟢 OK | +| `tools/kafka_topic_reader` | 94.7% | 🟢 OK | +| `tools/metrics-audit` | 97.2% | 🟢 OK | +| `tools/netlink-audit` | 95.8% | 🟢 OK | +| `tools/proto-field-audit` | 96.7% | 🟢 OK | +| `tools/quality-report` | 94.5% | 🟢 OK | +| `tools/tcp_client` | 90.3% | 🟢 OK | +| `tools/tcp_server` | 94.6% | 🟢 OK | +| `tools/udp_receiver_server` | 97.9% | 🟢 OK | diff --git a/nix/quality-report/default.nix b/nix/quality-report/default.nix index c9d6c45..e2b8d5f 100644 --- a/nix/quality-report/default.nix +++ b/nix/quality-report/default.nix @@ -285,12 +285,32 @@ pkgs.runCommand "xtcp2-quality-report" 
: > "$RAW/cli-help-smoke.out" # ── Aggregate ───────────────────────────────────────────────────── + # Coverage ratchet: if the total falls below + # docs/coverage-baseline.txt by more than coverage-max-drop points, + # quality-report exits with code 3. The orchestrator below treats + # that as a non-fatal report — the markdown is still produced — + # but the non-zero exit propagates up to CI so the regression is + # surfaced. Operator manually bumps the baseline file when they + # intentionally raise the floor. mkdir -p $out + # `|| qr_rc=$?` is the bash-portable way to capture a non-zero exit + # without aborting under `set -eu`. The earlier `set +e`/`set -e` + # dance interacted badly with Nix's runCommand wrapper (the + # WARNING echo never ran on a ratchet breach). + qr_rc=0 go run ./tools/quality-report \ -raw-dir "$RAW" \ -repo-root . \ -known-failures ./tools/quality-report/known-failures.txt \ - > $out/quality-report.md 2>>"$RAW/stderr.log" + -coverage-baseline ./docs/coverage-baseline.txt \ + -coverage-max-drop 0.5 \ + > $out/quality-report.md 2>>"$RAW/stderr.log" || qr_rc=$? + if [ "$qr_rc" -eq 3 ]; then + echo "WARNING: coverage ratchet breach (see stderr above); report still emitted" >&2 + elif [ "$qr_rc" -ne 0 ]; then + cat "$RAW/stderr.log" >&2 + exit "$qr_rc" + fi mkdir -p $out/raw cp -r "$RAW"/. $out/raw/ || true diff --git a/nix/tests/go-test-flavors.nix b/nix/tests/go-test-flavors.nix index ceb5371..267bcbc 100644 --- a/nix/tests/go-test-flavors.nix +++ b/nix/tests/go-test-flavors.nix @@ -30,15 +30,26 @@ let # binary so cross-flavor symbols (registry init order, marshaller # dispatch) get exercised. 
flavors = { - kafka = { tags = "dest_kafka"; }; - nats = { tags = "dest_nats"; }; - nsq = { tags = "dest_nsq"; }; - valkey = { tags = "dest_valkey"; }; - all = { tags = "dest_kafka dest_nats dest_nsq dest_valkey"; }; + kafka = { + tags = "dest_kafka"; + }; + nats = { + tags = "dest_nats"; + }; + nsq = { + tags = "dest_nsq"; + }; + valkey = { + tags = "dest_valkey"; + }; + all = { + tags = "dest_kafka dest_nats dest_nsq dest_valkey"; + }; }; mkFlavorTest = - name: { tags }: + name: + { tags }: pkgs.runCommand "xtcp2-test-go-flavor-${name}" { nativeBuildInputs = [ versions.go ]; diff --git a/nix/tests/go-test-per-package.nix b/nix/tests/go-test-per-package.nix index 1d1484c..fdefe91 100644 --- a/nix/tests/go-test-per-package.nix +++ b/nix/tests/go-test-per-package.nix @@ -32,17 +32,17 @@ let # binaries that already get coverage via their existing # `_test.go` files are listed too. packages = { - "pkg-xtcp" = "./pkg/xtcp/..."; - "pkg-xtcpnl" = "./pkg/xtcpnl/..."; - "pkg-io-uring" = "./pkg/io_uring/..."; - "pkg-misc" = "./pkg/misc/..."; + "pkg-xtcp" = "./pkg/xtcp/..."; + "pkg-xtcpnl" = "./pkg/xtcpnl/..."; + "pkg-io-uring" = "./pkg/io_uring/..."; + "pkg-misc" = "./pkg/misc/..."; "tools-quality-report" = "./tools/quality-report/..."; - "tools-netlink-audit" = "./tools/netlink-audit/..."; - "tools-iouring-audit" = "./tools/iouring-audit/..."; - "tools-metrics-audit" = "./tools/metrics-audit/..."; + "tools-netlink-audit" = "./tools/netlink-audit/..."; + "tools-iouring-audit" = "./tools/iouring-audit/..."; + "tools-metrics-audit" = "./tools/metrics-audit/..."; "tools-proto-field-audit" = "./tools/proto-field-audit/..."; - "cmd-xtcp2" = "./cmd/xtcp2/..."; - "cmd-xtcp2client" = "./cmd/xtcp2client/..."; + "cmd-xtcp2" = "./cmd/xtcp2/..."; + "cmd-xtcp2client" = "./cmd/xtcp2client/..."; }; mkPkgTest = diff --git a/pkg/io_uring/ring.go b/pkg/io_uring/ring.go index 2b60dfd..b4de133 100644 --- a/pkg/io_uring/ring.go +++ b/pkg/io_uring/ring.go @@ -144,10 +144,10 @@ func 
requireProbe() error { // Close drains pending CQEs (best-effort, up to drainTimeout), releases // any in-flight pool buffers back to the caller's drain callback, then // unmaps the ring. Safe to call multiple times. -// closeDrainStepMs caps each blocking WaitCQETimeout call inside the +// closeDrainStep caps each blocking WaitCQETimeout call inside the // Close drain loop. Kept small so a sluggish kernel can't stall ring // teardown past the caller's overall deadline. -const closeDrainStepMs = 50 * time.Millisecond +const closeDrainStep = 50 * time.Millisecond // waitForNextDrainCQE blocks for one CQE up to `step` (or `deadline`, // whichever is shorter). Returns drainContinue when the caller should @@ -170,8 +170,8 @@ func (r *Ring) waitForNextDrainCQE(deadline time.Time) ([]Result, drainOutcome) return nil, drainAbort } step := remaining - if step > closeDrainStepMs { - step = closeDrainStepMs + if step > closeDrainStep { + step = closeDrainStep } ts := syscall.NsecToTimespec(int64(step)) if _, err := r.r.WaitCQETimeout(&ts); err != nil { diff --git a/pkg/xtcp/deserializers_table_test.go b/pkg/xtcp/deserializers_table_test.go index 1631972..47a823c 100644 --- a/pkg/xtcp/deserializers_table_test.go +++ b/pkg/xtcp/deserializers_table_test.go @@ -15,7 +15,7 @@ import ( // // gocyclo took InitDeserializers from 17 → 5 by replacing 13 repeated // `if _, exists := Enabled[key]; exists { ... }` blocks with a single -// walk over dispatchTable. These tests pin the resulting behaviour +// walk over dispatchTable. These tests pin the resulting behavior // from every direction so a future refactor cannot silently regress. // allKnownDispatchKeys is the canonical key list the production @@ -357,7 +357,7 @@ func TestInitDeserializers_idempotent(t *testing.T) { } // TestInitDeserializers_concurrentDifferentInstances exercises the -// race detector. Two goroutines initialising *separate* XTCP fixtures +// race detector. 
Two goroutines initializing *separate* XTCP fixtures // should never race — they only share the read-only dispatchTable. // Run with `go test -race`. func TestInitDeserializers_concurrentDifferentInstances(t *testing.T) { diff --git a/pkg/xtcp/destinations_kafka.go b/pkg/xtcp/destinations_kafka.go index 2556a49..0ea3df9 100644 --- a/pkg/xtcp/destinations_kafka.go +++ b/pkg/xtcp/destinations_kafka.go @@ -29,18 +29,45 @@ const ( KafkaHeaderSizeCst = 6 ) +// kafkaProducer captures the surface of *kgo.Client that kafkaDest +// actually calls. Lifting it to an interface lets the destination's +// Send/Close/pingKafkaWithRetries paths run against an in-process +// fake without a real broker — see destinations_kafka_test.go. +// Production uses *kgo.Client which satisfies this interface via its +// concrete methods. +type kafkaProducer interface { + Produce(ctx context.Context, r *kgo.Record, promise func(*kgo.Record, error)) + Flush(ctx context.Context) error + Close() + Ping(ctx context.Context) error + AllowRebalance() +} + // kafkaDest produces each marshalled record to a Kafka topic via franz-go. // Construction registers the proto schema with the Schema Registry, dials // the broker, and primes a sync.Pool of kgo.Record so each send avoids // allocation. type kafkaDest struct { x *XTCP - client *kgo.Client + client kafkaProducer regClient *sr.Client schemaID int recordPool sync.Pool } +// newKafkaProducerFn is the factory tests swap to inject a fake +// kafkaProducer without standing up a real kgo.Client. Production +// callers leave this at the default (newKafkaProducerReal). +var newKafkaProducerFn = newKafkaProducerReal + +// newKafkaProducerReal is the production factory: it constructs a real +// kgo.Client wired with the production options. Extracted so the test +// suite can substitute newKafkaProducerFn with a fake-returning +// closure and exercise newKafkaDest without a broker. 
+func newKafkaProducerReal(opts ...kgo.Opt) (kafkaProducer, error) {
+	client, err := kgo.NewClient(opts...)
+	if err != nil {
+		// Return a literal nil: forwarding a nil *kgo.Client would wrap
+		// it in a non-nil kafkaProducer interface value, defeating any
+		// `!= nil` check made by the caller.
+		return nil, err
+	}
+	return client, nil
+}
+
 func newKafkaDest(ctx context.Context, x *XTCP) (Destination, error) {
 	d := &kafkaDest{
 		x: x,
@@ -92,7 +119,7 @@ func newKafkaDest(ctx context.Context, x *XTCP) (Destination, error) {
 		})),
 	}
 
-	d.client, err = kgo.NewClient(opts...)
+	d.client, err = newKafkaProducerFn(opts...)
 	if err != nil {
 		return nil, fmt.Errorf("newKafkaDest kgo.NewClient: %w", err)
 	}
diff --git a/pkg/xtcp/destinations_kafka_test.go b/pkg/xtcp/destinations_kafka_test.go
new file mode 100644
index 0000000..fd4826d
--- /dev/null
+++ b/pkg/xtcp/destinations_kafka_test.go
@@ -0,0 +1,776 @@
+//go:build dest_kafka
+
+package xtcp
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/twmb/franz-go/pkg/kgo"
+	"google.golang.org/protobuf/types/known/durationpb"
+
+	"github.com/prometheus/client_golang/prometheus/testutil"
+)
+
+// destinations_kafka_test.go exercises pkg/xtcp/destinations_kafka.go
+// — only built under the `dest_kafka` build tag, which the
+// `nix build .#test-go-flavor-kafka` target sets explicitly. The
+// default `go test ./...` skips this file entirely (matches the
+// production behavior: only kafka-flavor builds compile it in).
+//
+// Scope: everything here runs without a real Kafka broker. The
+// schema-registry HTTP exchange is driven through httptest.Server,
+// and the broker-bound paths (newKafkaDest, Send, Close,
+// pingKafkaWithRetries) run against fakeKafkaProducer via the
+// kafkaProducer / newKafkaProducerFn seams. Behavior against a real
+// client is left to the microvm lifecycle harness.
+
+// ───────────────────────────────────────────────────────────────────────
+// init() side effect — dispatch table contains "kafka" with this build
+// tag set.
Pin so a future RegisterDestination rename in +// destinations_core.go fails this test loudly. +// ─────────────────────────────────────────────────────────────────────── + +func TestKafkaDest_initRegistersScheme(t *testing.T) { + if !IsKnownScheme(schemeKafka) { + t.Errorf("scheme %q should be registered under build tag dest_kafka", schemeKafka) + } + _, status := lookupDestinationFactory(schemeKafka) + if status != destLookupFound { + t.Errorf("lookupDestinationFactory(%q) status = %d, want destLookupFound", schemeKafka, status) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// KafkaHeaderSizeCst — constant that the wire-format depends on. Pin +// against accidental tweaks. +// ─────────────────────────────────────────────────────────────────────── + +func TestKafkaHeaderSizeCst(t *testing.T) { + // Confluent's protobuf wire format prefixes records with a 1-byte + // magic + 4-byte schema ID + 1-byte first-message-index varint = 6. + // See https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format + if KafkaHeaderSizeCst != 6 { + t.Errorf("KafkaHeaderSizeCst = %d, want 6", KafkaHeaderSizeCst) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// newKafkaDestFixture — assembles an XTCP whose KafkaSchemaUrl points +// at the given httptest.Server and whose XtcpProtoFile is a tempfile +// containing the supplied proto-source bytes. Reusable across the +// schema-registry tests below. 
+// ─────────────────────────────────────────────────────────────────────── + +func newKafkaDestFixture(t *testing.T, schemaSrv *httptest.Server, protoSrc string) *XTCP { + t.Helper() + x := newTestXTCP(t, "kafka:127.0.0.1:9092") + x.config.Topic = "xtcp-test" + x.config.KafkaSchemaUrl = schemaSrv.URL + + tmp := filepath.Join(t.TempDir(), "x.proto") + if err := os.WriteFile(tmp, []byte(protoSrc), 0o600); err != nil { + t.Fatalf("write tmp proto: %v", err) + } + x.config.XtcpProtoFile = tmp + return x +} + +// ─────────────────────────────────────────────────────────────────────── +// getLatestSchemaID — table-driven against an httptest.Server. Covers +// the four meaningful response shapes: 200/ok, 404, other 5xx, and +// malformed JSON. +// ─────────────────────────────────────────────────────────────────────── + +func TestGetLatestSchemaID_table(t *testing.T) { + cases := []struct { + name string + category string + handler http.HandlerFunc + wantID int + wantErr bool + }{ + { + name: "positive_200_with_id", + category: "positive", + handler: func(w http.ResponseWriter, r *http.Request) { + _ = json.NewEncoder(w).Encode(map[string]int{"id": 42}) + }, + wantID: 42, + }, + { + name: "positive_200_id_zero", + category: "positive", + handler: func(w http.ResponseWriter, r *http.Request) { + _ = json.NewEncoder(w).Encode(map[string]int{"id": 0}) + }, + wantID: 0, + }, + { + name: "negative_404_returns_err", + category: "negative", + handler: func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNotFound) }, + wantErr: true, + }, + { + name: "negative_500_returns_err", + category: "negative", + handler: func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) }, + wantErr: true, + }, + { + name: "boundary_300_redirect_unexpected_status", + category: "boundary", + handler: func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusMultipleChoices) }, + wantErr: true, + }, + { + name: 
"corner_malformed_json", + category: "corner", + handler: func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte("not json")) + }, + wantErr: true, + }, + { + name: "corner_empty_body_200", + category: "corner", + handler: func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }, + wantErr: true, // empty body fails json.Decode + }, + { + name: "adversarial_giant_id_int_overflow_safe", + category: "adversarial", + handler: func(w http.ResponseWriter, r *http.Request) { + // Very large JSON number; Go's int64 fits anything < 2^63. + _ = json.NewEncoder(w).Encode(map[string]int64{"id": 1 << 62}) + }, + wantID: 1 << 62, + }, + } + for _, tc := range cases { + tc := tc + t.Run(tc.category+"/"+tc.name, func(t *testing.T) { + srv := httptest.NewServer(tc.handler) + defer srv.Close() + x := newKafkaDestFixture(t, srv, "syntax = \"proto3\";") + d := &kafkaDest{x: x} + gotID, err := d.getLatestSchemaID(context.Background()) + if (err != nil) != tc.wantErr { + t.Errorf("err = %v, wantErr = %v", err, tc.wantErr) + } + if !tc.wantErr && gotID != tc.wantID { + t.Errorf("id = %d, want %d", gotID, tc.wantID) + } + }) + } +} + +// TestGetLatestSchemaID_buildsURL pins the URL shape so a refactor of +// the schema-registry endpoint pattern fails this test loudly. 
+func TestGetLatestSchemaID_buildsURL(t *testing.T) {
+	var sawPath string
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		sawPath = r.URL.Path
+		_ = json.NewEncoder(w).Encode(map[string]int{"id": 1})
+	}))
+	defer srv.Close()
+	x := newKafkaDestFixture(t, srv, "")
+	x.config.Topic = "my-topic"
+	d := &kafkaDest{x: x}
+	if _, err := d.getLatestSchemaID(context.Background()); err != nil {
+		t.Fatal(err)
+	}
+	wantPath := "/subjects/my-topic-value/versions/latest"
+	if sawPath != wantPath {
+		t.Errorf("URL path = %q, want %q", sawPath, wantPath)
+	}
+}
+
+// TestGetLatestSchemaID_ctxCancel verifies that the caller's ctx
+// deadline preempts the helper's own 10s ceiling.
+func TestGetLatestSchemaID_ctxCancel(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		// Stall well past the client's 50ms deadline, but return as
+		// soon as the client hangs up: the deferred srv.Close blocks
+		// until every in-flight handler has returned, so an
+		// unconditional sleep would pin the test at the full stall.
+		select {
+		case <-r.Context().Done():
+			return
+		case <-time.After(2 * time.Second):
+		}
+		w.WriteHeader(http.StatusOK)
+	}))
+	defer srv.Close()
+	x := newKafkaDestFixture(t, srv, "")
+	d := &kafkaDest{x: x}
+	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+	defer cancel()
+	start := time.Now()
+	_, err := d.getLatestSchemaID(ctx)
+	if err == nil {
+		t.Error("expected error on ctx-cancel")
+	}
+	if elapsed := time.Since(start); elapsed > 1*time.Second {
+		t.Errorf("returned in %v; ctx-cancel should be < 1s", elapsed)
+	}
+}
+
+// ───────────────────────────────────────────────────────────────────────
+// registerProtobufSchema — table-driven via sr.Client → httptest.Server.
+// The franz-go sr package POSTs to /subjects/{subject}/versions and decodes
+// the {"id": N} response.
+// ─────────────────────────────────────────────────────────────────────── + +// schemaRegistryHandler returns a path-aware HTTP handler that mimics +// the three endpoints franz-go's sr.CreateSchema touches: +// +// POST /subjects//versions → returns {"id": N} +// GET /schemas/ids//versions → returns [{subject, version}] +// GET /subjects//versions/ → returns full SubjectSchema +// +// The `createStatus` arg lets a test override the POST response code +// to drive error paths; the GET endpoints stay well-formed so the +// happy-path tests see exactly the failure surface they're aiming at. +// +// The matching uses substring/prefix checks since franz-go varies +// minor path details across versions. +func schemaRegistryHandler(id int, createStatus int) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodPost && strings.Contains(r.URL.Path, "/subjects/") && strings.HasSuffix(r.URL.Path, "/versions"): + if createStatus != 0 && createStatus != http.StatusOK { + w.WriteHeader(createStatus) + return + } + _ = json.NewEncoder(w).Encode(map[string]any{"id": id}) + case r.Method == http.MethodGet && strings.HasPrefix(r.URL.Path, "/schemas/ids/"): + _ = json.NewEncoder(w).Encode([]map[string]any{ + {"subject": "xtcp-test-value", "version": 1}, + }) + case r.Method == http.MethodGet && strings.HasPrefix(r.URL.Path, "/subjects/"): + // GET /subjects//versions/ → full SubjectSchema + _ = json.NewEncoder(w).Encode(map[string]any{ + "subject": "xtcp-test-value", + "version": 1, + "id": id, + "schema": `syntax = "proto3"; message M {}`, + "schemaType": "PROTOBUF", + }) + default: + w.WriteHeader(http.StatusNotFound) + } + } +} + +func TestRegisterProtobufSchema_table(t *testing.T) { + const protoSrc = `syntax = "proto3"; +package test; +message M { string a = 1; }` + cases := []struct { + name string + category string + handler http.HandlerFunc + wantErr bool + wantID int + }{ + { + name: 
"positive_register_returns_id", + category: "positive", + handler: schemaRegistryHandler(7, 0), + wantID: 7, + }, + { + name: "negative_4xx_error", + category: "negative", + handler: schemaRegistryHandler(0, http.StatusBadRequest), + wantErr: true, + }, + { + name: "negative_5xx_error", + category: "negative", + handler: schemaRegistryHandler(0, http.StatusServiceUnavailable), + wantErr: true, + }, + { + name: "boundary_id_zero", + category: "boundary", + handler: schemaRegistryHandler(0, 0), + wantID: 0, + }, + { + name: "corner_malformed_json_response", + category: "corner", + handler: func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte("garbage")) + }, + wantErr: true, + }, + } + for _, tc := range cases { + tc := tc + t.Run(tc.category+"/"+tc.name, func(t *testing.T) { + srv := httptest.NewServer(tc.handler) + defer srv.Close() + x := newKafkaDestFixture(t, srv, protoSrc) + d := &kafkaDest{x: x} + err := d.registerProtobufSchema(context.Background()) + if (err != nil) != tc.wantErr { + t.Errorf("err = %v, wantErr = %v", err, tc.wantErr) + } + if !tc.wantErr && d.schemaID != tc.wantID { + t.Errorf("schemaID = %d, want %d", d.schemaID, tc.wantID) + } + }) + } +} + +// TestRegisterProtobufSchema_missingProtoFile pins the disk-read error +// branch: a non-existent XtcpProtoFile must surface as an err, not a +// panic or silent zero-schema-id. 
+func TestRegisterProtobufSchema_missingProtoFile(t *testing.T) {
+	srv := httptest.NewServer(schemaRegistryHandler(1, 0))
+	defer srv.Close()
+	x := newKafkaDestFixture(t, srv, "")
+	x.config.XtcpProtoFile = "/no/such/file/proto/xtcp.proto"
+	d := &kafkaDest{x: x}
+	err := d.registerProtobufSchema(context.Background())
+	if err == nil {
+		// Fatal, not Error: the substring check below calls err.Error()
+		// and would panic on a nil error.
+		t.Fatal("expected error on missing proto file")
+	}
+	if !strings.Contains(err.Error(), "read proto") {
+		t.Errorf("err = %v, want substring 'read proto'", err)
+	}
+}
+
+// pingKafkaWithRetries — retry loop with ctx-cancel awareness — only
+// reaches the broker through d.client.Ping(), which is part of the
+// kafkaProducer interface. The retry and ctx-cancel logic is therefore
+// covered further down (TestPingKafkaWithRetries_*) against
+// fakeKafkaProducer; behavior against a real broker is left to the
+// lifecycle microvm harness.
+
+// ───────────────────────────────────────────────────────────────────────
+// Race — drive the pure HTTP helpers concurrently.
+// ───────────────────────────────────────────────────────────────────────
+
+// TestKafkaSchemaRegistryHelpers_concurrent hammers getLatestSchemaID
+// from several goroutines, each with its own kafkaDest, against one
+// shared httptest.Server. Run with `go test -race`.
+func TestKafkaSchemaRegistryHelpers_concurrent(t *testing.T) {
+	const protoSrc = `syntax = "proto3"; package test; message M {}`
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		_ = json.NewEncoder(w).Encode(map[string]any{"id": 1})
+	}))
+	defer srv.Close()
+
+	const goroutines = 16
+	var wg sync.WaitGroup
+	var calls atomic.Int64
+	wg.Add(goroutines)
+	for i := 0; i < goroutines; i++ {
+		// Build the fixture on the test goroutine: newKafkaDestFixture
+		// can call t.Fatalf, which must not run on a goroutine spawned
+		// by the test.
+		d := &kafkaDest{x: newKafkaDestFixture(t, srv, protoSrc)}
+		go func() {
+			defer wg.Done()
+			for j := 0; j < 20; j++ {
+				_, _ = d.getLatestSchemaID(context.Background())
+				calls.Add(1)
+			}
+		}()
+	}
+	wg.Wait()
+	if got := calls.Load(); got != goroutines*20 {
+		t.Errorf("calls = %d, want %d", got, goroutines*20)
+	}
+}
+
+// ───────────────────────────────────────────────────────────────────────
+// Benchmarks
+// ───────────────────────────────────────────────────────────────────────
+
+// BenchmarkGetLatestSchemaID measures one schema-registry lookup per
+// iteration against a local httptest.Server returning a fixed id.
+func BenchmarkGetLatestSchemaID(b *testing.B) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		fmt.Fprint(w, `{"id":42}`)
+	}))
+	defer srv.Close()
+	// NOTE(review): the zero-value testing.T is never driven by the test
+	// runner, so its TempDir cleanup presumably never fires and a
+	// t.Fatalf inside the fixture would not fail b. Consider widening
+	// newKafkaDestFixture to accept testing.TB so b can be passed.
+	x := newKafkaDestFixture(&testing.T{}, srv, "")
+	d := &kafkaDest{x: x}
+	ctx := context.Background()
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		_, _ = d.getLatestSchemaID(ctx)
+	}
+}
+
+// ───────────────────────────────────────────────────────────────────────
+// fakeKafkaProducer — implements the kafkaProducer interface so Send /
+// Close / pingKafkaWithRetries run without a real broker.
+// ───────────────────────────────────────────────────────────────────────
+
+// fakeKafkaProducer is an in-memory kafkaProducer. Every method bumps
+// its own call counter; the error fields are the canned results the
+// methods hand back, so tests can assert both call counts and error
+// handling.
+type fakeKafkaProducer struct {
+	produceErr  error // passed to the promise callback on every Produce
+	pingErr     error // returned by Ping for the first failFirstNPings calls
+	flushErr    error // returned by every Flush
+	produces    atomic.Int64
+	flushes     atomic.Int64
+	closes      atomic.Int64
+	pings       atomic.Int64
+	allowRebals atomic.Int64
+	// failFirstNPings makes the first N Ping calls return pingErr,
+	// then subsequent calls succeed. Lets tests drive the
+	// pingKafkaWithRetries retry path then recovery.
+	failFirstNPings int
+}
+
+// Produce counts the call and, when a promise callback is supplied,
+// invokes it synchronously with r and produceErr.
+func (f *fakeKafkaProducer) Produce(_ context.Context, r *kgo.Record, cb func(*kgo.Record, error)) {
+	f.produces.Add(1)
+	if cb != nil {
+		cb(r, f.produceErr)
+	}
+}
+
+// Flush counts the call and returns flushErr.
+func (f *fakeKafkaProducer) Flush(_ context.Context) error { f.flushes.Add(1); return f.flushErr }
+
+// Close counts the call.
+func (f *fakeKafkaProducer) Close() { f.closes.Add(1) }
+
+// Ping counts the call and returns pingErr for calls 1..failFirstNPings,
+// nil afterwards. With failFirstNPings == 0 it always returns nil, even
+// when pingErr is set.
+func (f *fakeKafkaProducer) Ping(_ context.Context) error {
+	n := f.pings.Add(1)
+	if f.failFirstNPings > 0 && int(n) <= f.failFirstNPings {
+		return f.pingErr
+	}
+	return nil
+}
+
+// AllowRebalance counts the call.
+func (f *fakeKafkaProducer) AllowRebalance() { f.allowRebals.Add(1) }
+
+// newKafkaDestForTest assembles a kafkaDest with the fake producer +
+// a sync.Pool of kgo.Record and a populated x.destBytesPool so Send's
+// pool-return path runs cleanly.
+func newKafkaDestForTest(t *testing.T, fake *fakeKafkaProducer) *kafkaDest { + t.Helper() + x := newTestXTCP(t, "kafka:127.0.0.1:9092") + x.config.Topic = "xtcp-test" + x.destBytesPool = sync.Pool{New: func() any { b := make([]byte, 0, 128); return &b }} + d := &kafkaDest{ + x: x, + client: fake, + recordPool: sync.Pool{ + New: func() any { return new(kgo.Record) }, + }, + } + return d +} + +// ─────────────────────────────────────────────────────────────────────── +// Send +// ─────────────────────────────────────────────────────────────────────── + +func TestKafkaDest_Send_table(t *testing.T) { + cases := []struct { + name string + category string + produceErr error + produceTimeout time.Duration + wantOKCounter float64 + wantErrCounter float64 + }{ + {"positive_clean_produce", "positive", nil, 0, 1, 0}, + {"positive_with_produce_timeout", "positive", nil, 100 * time.Millisecond, 1, 0}, + {"negative_produce_err_bumps_error", "negative", errors.New("broker EOF"), 0, 0, 1}, + {"boundary_zero_timeout_uses_ctx_directly", "boundary", nil, 0, 1, 0}, + {"corner_long_produce_timeout", "corner", nil, 24 * time.Hour, 1, 0}, + {"adversarial_huge_produce_error_string", "adversarial", + errors.New(strings.Repeat("e", 1<<16)), 0, 0, 1}, + } + for _, tc := range cases { + tc := tc + t.Run(tc.category+"/"+tc.name, func(t *testing.T) { + fake := &fakeKafkaProducer{produceErr: tc.produceErr} + d := newKafkaDestForTest(t, fake) + if tc.produceTimeout > 0 { + d.x.config.KafkaProduceTimeout = durationpb.New(tc.produceTimeout) + } + // destBytesPool.Put requires the caller to pass a *[]byte; + // allocate one here so Send's pool-return path runs. + buf := d.x.destBytesPool.Get().(*[]byte) + *buf = append((*buf)[:0], []byte("payload")...) 
+ n, err := d.Send(context.Background(), buf) + if err != nil { + t.Fatalf("Send err = %v (Send itself never errors; only the callback does)", err) + } + if n != 1 { + t.Errorf("n = %d, want 1", n) + } + if fake.produces.Load() != 1 { + t.Errorf("Produce calls = %d, want 1", fake.produces.Load()) + } + gotOK := testutil.ToFloat64(d.x.pC.WithLabelValues("destKafka", "Produce", "count")) + gotErr := testutil.ToFloat64(d.x.pC.WithLabelValues("destKafka", "Produce", "error")) + if gotOK != tc.wantOKCounter { + t.Errorf("OK counter = %v, want %v", gotOK, tc.wantOKCounter) + } + if gotErr != tc.wantErrCounter { + t.Errorf("Err counter = %v, want %v", gotErr, tc.wantErrCounter) + } + }) + } +} + +// TestKafkaDest_Send_debugLog covers the debugLevel>10 branch. +func TestKafkaDest_Send_debugLog(t *testing.T) { + fake := &fakeKafkaProducer{produceErr: errors.New("err")} + d := newKafkaDestForTest(t, fake) + d.x.debugLevel = 11 + buf := d.x.destBytesPool.Get().(*[]byte) + *buf = append((*buf)[:0], []byte("x")...) 
+ _, _ = d.Send(context.Background(), buf) +} + +// ─────────────────────────────────────────────────────────────────────── +// Close +// ─────────────────────────────────────────────────────────────────────── + +func TestKafkaDest_Close_table(t *testing.T) { + t.Parallel() + cases := []struct { + name string + category string + flushErr error + }{ + {"positive_clean_close_flushes_then_closes", "positive", nil}, + {"negative_flush_err_still_closes", "negative", errors.New("flush failed")}, + } + for _, tc := range cases { + tc := tc + t.Run(tc.category+"/"+tc.name, func(t *testing.T) { + t.Parallel() + fake := &fakeKafkaProducer{flushErr: tc.flushErr} + d := newKafkaDestForTest(t, fake) + if err := d.Close(); err != nil { + t.Errorf("Close err = %v, want nil", err) + } + if fake.flushes.Load() != 1 { + t.Errorf("Flush calls = %d, want 1", fake.flushes.Load()) + } + if fake.closes.Load() != 1 { + t.Errorf("Close calls = %d, want 1", fake.closes.Load()) + } + }) + } +} + +// TestKafkaDest_CloseNilClient pins the safety check (d.client != nil). +func TestKafkaDest_CloseNilClient(t *testing.T) { + x := newTestXTCP(t, "kafka:127.0.0.1:9092") + d := &kafkaDest{x: x, client: nil} + if err := d.Close(); err != nil { + t.Errorf("Close on nil client should be nil; got %v", err) + } +} + +// TestKafkaDest_Close_debugLog covers the debugLevel>10 branch in +// the FlushOnClose error path. +func TestKafkaDest_Close_debugLog(t *testing.T) { + fake := &fakeKafkaProducer{flushErr: errors.New("flush")} + d := newKafkaDestForTest(t, fake) + d.x.debugLevel = 11 + _ = d.Close() +} + +// ─────────────────────────────────────────────────────────────────────── +// pingKafkaWithRetries — drives the retry loop via failFirstNPings. 
+// ─────────────────────────────────────────────────────────────────────── + +func TestPingKafkaWithRetries_table(t *testing.T) { + cases := []struct { + name string + category string + failFirstN int + retries int + wantErr bool + wantTotalPings int + }{ + {"positive_first_ping_succeeds", "positive", 0, 3, false, 1}, + {"positive_third_ping_recovers", "positive", 2, 5, false, 3}, + {"negative_all_pings_fail", "negative", 5, 3, true, 3}, + {"boundary_retries_zero", "boundary", 0, 0, false, 0}, + {"corner_exact_match_recovers", "corner", 2, 3, false, 3}, + } + for _, tc := range cases { + tc := tc + t.Run(tc.category+"/"+tc.name, func(t *testing.T) { + fake := &fakeKafkaProducer{ + pingErr: errors.New("connection refused"), + failFirstNPings: tc.failFirstN, + } + d := newKafkaDestForTest(t, fake) + err := d.pingKafkaWithRetries(context.Background(), tc.retries, 1*time.Microsecond) + if (err != nil) != tc.wantErr { + t.Errorf("err = %v, wantErr = %v", err, tc.wantErr) + } + if int(fake.pings.Load()) != tc.wantTotalPings { + t.Errorf("ping calls = %d, want %d", fake.pings.Load(), tc.wantTotalPings) + } + }) + } +} + +// TestPingKafkaWithRetries_ctxCancelAbortsSleep verifies the +// ctx-cancel-during-sleep branch. +func TestPingKafkaWithRetries_ctxCancelAbortsSleep(t *testing.T) { + fake := &fakeKafkaProducer{ + pingErr: errors.New("refused"), + failFirstNPings: 100, // always fail + } + d := newKafkaDestForTest(t, fake) + ctx, cancel := context.WithCancel(context.Background()) + cancel() // pre-cancelled → first sleep aborts + err := d.pingKafkaWithRetries(ctx, 10, 100*time.Millisecond) + if err == nil { + t.Error("expected ctx-cancel err") + } + // First ping fired, then ctx-cancel aborted the sleep before the + // next ping. Want pings ≤ 2 (some implementations might let the + // second ping run before checking ctx). 
+ if got := fake.pings.Load(); got > 2 { + t.Errorf("pings = %d, want ≤ 2 (ctx-cancel should abort retries)", got) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// newKafkaDest end-to-end — registers schema, looks up id, builds the +// (fake) producer via newKafkaProducerFn, then pingKafkaWithRetries +// succeeds against the fake. Exercises the constructor's full happy +// path without a real broker. +// ─────────────────────────────────────────────────────────────────────── + +func TestNewKafkaDest_happy(t *testing.T) { + const protoSrc = `syntax = "proto3"; package t; message M {}` + srv := httptest.NewServer(schemaRegistryHandler(7, 0)) + defer srv.Close() + + x := newTestXTCP(t, "kafka:127.0.0.1:9092") + x.config.Topic = "xtcp-test" + x.config.KafkaSchemaUrl = srv.URL + tmp := filepath.Join(t.TempDir(), "x.proto") + if err := os.WriteFile(tmp, []byte(protoSrc), 0o600); err != nil { + t.Fatalf("write tmp proto: %v", err) + } + x.config.XtcpProtoFile = tmp + + fake := &fakeKafkaProducer{} + origFactory := newKafkaProducerFn + newKafkaProducerFn = func(_ ...kgo.Opt) (kafkaProducer, error) { return fake, nil } + defer func() { newKafkaProducerFn = origFactory }() + + d, err := newKafkaDest(context.Background(), x) + if err != nil { + t.Fatalf("newKafkaDest err = %v", err) + } + if d == nil { + t.Fatal("dest is nil") + } + if fake.allowRebals.Load() != 1 { + t.Errorf("AllowRebalance calls = %d, want 1", fake.allowRebals.Load()) + } + if fake.pings.Load() < 1 { + t.Errorf("pings = %d, want ≥1", fake.pings.Load()) + } + _ = d.Close() +} + +// TestNewKafkaDest_factoryErr drives the `newKafkaProducerFn err → +// fmt.Errorf("newKafkaDest kgo.NewClient: ...")` branch. 
+func TestNewKafkaDest_factoryErr(t *testing.T) { + srv := httptest.NewServer(schemaRegistryHandler(1, 0)) + defer srv.Close() + x := newTestXTCP(t, "kafka:127.0.0.1:9092") + x.config.Topic = "xtcp-test" + x.config.KafkaSchemaUrl = srv.URL + tmp := filepath.Join(t.TempDir(), "x.proto") + _ = os.WriteFile(tmp, []byte(`syntax = "proto3";`), 0o600) + x.config.XtcpProtoFile = tmp + + origFactory := newKafkaProducerFn + newKafkaProducerFn = func(_ ...kgo.Opt) (kafkaProducer, error) { + return nil, errors.New("factory failed") + } + defer func() { newKafkaProducerFn = origFactory }() + + d, err := newKafkaDest(context.Background(), x) + if err == nil { + t.Fatal("expected err on factory failure") + } + if d != nil { + t.Error("dest should be nil on factory err") + } + if !strings.Contains(err.Error(), "kgo.NewClient") { + t.Errorf("err = %q, want substring 'kgo.NewClient'", err) + } +} + +// TestNewKafkaDest_pingFailExhaustsRetries drives the +// pingKafkaWithRetries-exhausted branch via a fake that fails every +// ping. Shrinks pingRetrySleep via stubbed retry count. +func TestNewKafkaDest_pingFailExhaustsRetries(t *testing.T) { + srv := httptest.NewServer(schemaRegistryHandler(1, 0)) + defer srv.Close() + x := newTestXTCP(t, "kafka:127.0.0.1:9092") + x.config.Topic = "xtcp-test" + x.config.KafkaSchemaUrl = srv.URL + tmp := filepath.Join(t.TempDir(), "x.proto") + _ = os.WriteFile(tmp, []byte(`syntax = "proto3";`), 0o600) + x.config.XtcpProtoFile = tmp + + fake := &fakeKafkaProducer{ + pingErr: errors.New("refused"), + failFirstNPings: 100, // always fail + } + origFactory := newKafkaProducerFn + newKafkaProducerFn = func(_ ...kgo.Opt) (kafkaProducer, error) { return fake, nil } + defer func() { newKafkaProducerFn = origFactory }() + + // The constructor uses kafkaPingRetriesCst (5) + kafkaPingRetrySleepCst (1s). + // With 5 retries + 1s sleep + jitter, the test wall-clocks ~10s. + // That's acceptable for one focused test. 
+ d, err := newKafkaDest(context.Background(), x) + if err == nil { + t.Fatal("expected ping-exhaust err") + } + if d != nil { + t.Error("dest should be nil on ping exhaust") + } + if !strings.Contains(err.Error(), "pingKafka") { + t.Errorf("err = %q, want substring 'pingKafka'", err) + } +} + +// TestNewKafkaProducerReal_returnsKgoClient pins that the production +// factory returns a usable *kgo.Client. kgo.NewClient is lazy (no +// dial at construction) so this runs without a broker. The return +// value satisfies the kafkaProducer interface via *kgo.Client's +// concrete methods. +func TestNewKafkaProducerReal_returnsKgoClient(t *testing.T) { + p, err := newKafkaProducerReal(kgo.SeedBrokers("127.0.0.1:9092")) + if err != nil { + t.Fatalf("newKafkaProducerReal err = %v", err) + } + if p == nil { + t.Fatal("producer nil") + } + defer p.Close() +} + +// TestPingKafkaWithRetries_debugLog covers the debug-log branch. +func TestPingKafkaWithRetries_debugLog(t *testing.T) { + fake := &fakeKafkaProducer{ + pingErr: errors.New("refused"), + failFirstNPings: 2, + } + d := newKafkaDestForTest(t, fake) + d.x.debugLevel = 11 + _ = d.pingKafkaWithRetries(context.Background(), 3, 1*time.Microsecond) +} diff --git a/pkg/xtcp/destinations_nats.go b/pkg/xtcp/destinations_nats.go index 8f48258..f23711c 100644 --- a/pkg/xtcp/destinations_nats.go +++ b/pkg/xtcp/destinations_nats.go @@ -17,10 +17,32 @@ const ( natsTimeoutCst = 1 * time.Second ) +// natsPublisher captures the surface of *nats.Conn that natsDest +// actually calls. Lifting it to an interface lets the destination's +// Send/Close paths run against an in-process fake without a real +// NATS server — see destinations_nats_test.go. *nats.Conn satisfies +// this interface via its concrete methods. +type natsPublisher interface { + Publish(subj string, data []byte) error + FlushTimeout(timeout time.Duration) error + Close() +} + // natsDest publishes each marshalled record to a NATS subject. 
type natsDest struct { x *XTCP - client *nats.Conn + client natsPublisher +} + +// newNATSConnFn is the factory tests swap to inject a fake +// natsPublisher without dialing a real NATS server. Production +// callers leave this at the default (newNATSConnReal). +var newNATSConnFn = newNATSConnReal + +// newNATSConnReal is the production factory: opens a real *nats.Conn +// via nats.Options.Connect with the canonical retry config. +func newNATSConnReal(opts nats.Options) (natsPublisher, error) { + return opts.Connect() } func newNATSDest(_ context.Context, x *XTCP) (Destination, error) { @@ -39,7 +61,7 @@ func newNATSDest(_ context.Context, x *XTCP) (Destination, error) { RetryOnFailedConnect: true, Timeout: natsTimeoutCst, } - client, err := opts.Connect() + client, err := newNATSConnFn(opts) if err != nil { return nil, fmt.Errorf("newNATSDest opts.Connect: %w", err) } diff --git a/pkg/xtcp/destinations_nats_test.go b/pkg/xtcp/destinations_nats_test.go new file mode 100644 index 0000000..b17af33 --- /dev/null +++ b/pkg/xtcp/destinations_nats_test.go @@ -0,0 +1,363 @@ +//go:build dest_nats + +package xtcp + +import ( + "context" + "errors" + "net" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + nats "github.com/nats-io/nats.go" + "github.com/prometheus/client_golang/prometheus/testutil" +) + +// destinations_nats_test.go exercises pkg/xtcp/destinations_nats.go +// under the `dest_nats` build tag. natsDest reaches NATS through the +// natsPublisher interface and the newNATSConnFn factory, so Send and +// Close run against an in-process fake in this file. End-to-end +// publish coverage runs against a real nats-server inside the +// microvm lifecycle harness.
+// +// What we CAN cover here: +// - init() side effect (dispatch registered) +// - constant values (natsReconnectsCst, natsTimeoutCst) +// - Close on nil client is safe (no panic) +// - newNATSDest fails-fast (or fast-enough) on an unreachable URL +// so callers don't hang forever during daemon startup + +// ─────────────────────────────────────────────────────────────────────── +// init() side effect +// ─────────────────────────────────────────────────────────────────────── + +func TestNATSDest_initRegistersScheme(t *testing.T) { + if !IsKnownScheme(schemeNats) { + t.Errorf("scheme %q should be registered under build tag dest_nats", schemeNats) + } + _, status := lookupDestinationFactory(schemeNats) + if status != destLookupFound { + t.Errorf("lookupDestinationFactory(%q) status = %d, want destLookupFound", schemeNats, status) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// Constants — pin against unintended timeout / reconnect-count changes +// ─────────────────────────────────────────────────────────────────────── + +func TestNATSDestConstants(t *testing.T) { + // 5 reconnects @ 2s + jitter = bounded recovery window + if natsReconnectsCst != 5 { + t.Errorf("natsReconnectsCst = %d, want 5", natsReconnectsCst) + } + // 1s per connection attempt — keeps startup from hanging. + if natsTimeoutCst != 1*time.Second { + t.Errorf("natsTimeoutCst = %v, want 1s", natsTimeoutCst) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// Close-on-nil-client must not panic. Pin the nil-client guard in +// natsDest.Close (`if d.client != nil { … }`).
+// ─────────────────────────────────────────────────────────────────────── + +func TestNATSDest_CloseNilClient(t *testing.T) { + d := &natsDest{x: newTestXTCP(t, "nats:127.0.0.1:4222"), client: nil} + if err := d.Close(); err != nil { + t.Errorf("Close on nil client should be nil; got %v", err) + } +} + +// (A "newNATSDest returns within natsTimeoutCst on unreachable URL" +// test would require precise control over nats.go's +// RetryOnFailedConnect semantics, which vary across versions and can +// block for MaxReconnect * ReconnectWait = 10s on connection refusal. +// The stripsScheme test below indirectly covers the bounded-time +// behaviour by completing within natsTimeoutCst + 2s grace once the +// fake listener accepts.) + +// TestNewNATSDest_stripsScheme verifies that the "nats:" scheme prefix +// is removed before being passed to nats.Options.Url. Without a real +// server we observe this indirectly: a URL of "nats:127.0.0.1:65535" +// must NOT result in nats trying to dial literally "nats:127.0.0.1:65535" +// (which would fail with "no such host" rather than "connection +// refused"). +// +// We test this by setting up a TCP listener on 127.0.0.1:0, deriving +// the addr, then asking newNATSDest to connect to "nats:<addr>" — if +// the prefix stripping works, the listener will receive a connection +// attempt within ~natsTimeoutCst. (We don't speak NATS protocol on +// the listener side; the goal is just to observe the dial reached the +// right host:port.)
+func TestNewNATSDest_stripsScheme(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + defer func() { _ = ln.Close() }() + + connected := make(chan struct{}, 1) + go func() { + conn, err := ln.Accept() + if err != nil { + return + } + connected <- struct{}{} + _ = conn.Close() + }() + + x := newTestXTCP(t, "nats:"+ln.Addr().String()) + done := make(chan struct{}) + go func() { + defer close(done) + d, _ := newNATSDest(context.Background(), x) + if d != nil { + _ = d.Close() + } + }() + + select { + case <-connected: + // Dial reached our fake listener at the stripped host:port. + // Cleanup the dialer goroutine. + <-done + case <-time.After(natsTimeoutCst + 2*time.Second): + t.Fatal("nats client did not dial the stripped host:port within natsTimeoutCst + 2s") + } +} + +// ─────────────────────────────────────────────────────────────────────── +// Race — drive the helpers concurrently. Each goroutine builds its +// own natsDest with a nil client and exercises Close. 
+// ─────────────────────────────────────────────────────────────────────── + +func TestNATSDest_concurrentCloseOnNil(t *testing.T) { + const goroutines = 16 + var wg sync.WaitGroup + var calls atomic.Int64 + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < 100; j++ { + d := &natsDest{x: newTestXTCP(t, "nats:127.0.0.1:4222"), client: nil} + if err := d.Close(); err != nil { + return + } + calls.Add(1) + } + }() + } + wg.Wait() + if got := calls.Load(); got != goroutines*100 { + t.Errorf("calls = %d, want %d", got, goroutines*100) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// Benchmark +// ─────────────────────────────────────────────────────────────────────── + +func BenchmarkNATSDest_CloseNilClient(b *testing.B) { + x := newTestXTCP(&testing.T{}, "nats:127.0.0.1:4222") + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + d := &natsDest{x: x, client: nil} + _ = d.Close() + } +} + +// ─────────────────────────────────────────────────────────────────────── +// fakeNATSPublisher + Send/Close tests via the natsPublisher interface +// seam. Same shape as fakeKafkaProducer in destinations_kafka_test.go. +// ─────────────────────────────────────────────────────────────────────── + +type fakeNATSPublisher struct { + publishErr error + flushErr error + publishes atomic.Int64 + flushes atomic.Int64 + closes atomic.Int64 + lastSubj string + lastData []byte +} + +func (f *fakeNATSPublisher) Publish(subj string, data []byte) error { + f.publishes.Add(1) + f.lastSubj = subj + f.lastData = append(f.lastData[:0], data...) 
+ return f.publishErr +} +func (f *fakeNATSPublisher) FlushTimeout(_ time.Duration) error { + f.flushes.Add(1) + return f.flushErr +} +func (f *fakeNATSPublisher) Close() { f.closes.Add(1) } + +func newNATSDestForTest(t *testing.T, fake *fakeNATSPublisher) *natsDest { + t.Helper() + x := newTestXTCP(t, "nats:127.0.0.1:4222") + x.config.Topic = "xtcp-test" + return &natsDest{x: x, client: fake} +} + +// ─────────────────────────────────────────────────────────────────────── +// Send +// ─────────────────────────────────────────────────────────────────────── + +func TestNATSDest_Send_table(t *testing.T) { + t.Parallel() + cases := []struct { + name string + category string + publishErr error + wantN int + wantErr bool + wantOKCounter float64 + wantErrCounter float64 + }{ + {"positive_clean_publish", "positive", nil, 1, false, 1, 0}, + {"negative_publish_err", "negative", errors.New("no servers"), 0, true, 0, 1}, + {"boundary_publish_returns_eof", "boundary", errors.New("EOF"), 0, true, 0, 1}, + } + for _, tc := range cases { + tc := tc + t.Run(tc.category+"/"+tc.name, func(t *testing.T) { + t.Parallel() + fake := &fakeNATSPublisher{publishErr: tc.publishErr} + d := newNATSDestForTest(t, fake) + payload := []byte("payload") + n, err := d.Send(context.Background(), &payload) + if n != tc.wantN { + t.Errorf("n = %d, want %d", n, tc.wantN) + } + if (err != nil) != tc.wantErr { + t.Errorf("err = %v, wantErr = %v", err, tc.wantErr) + } + if fake.publishes.Load() != 1 { + t.Errorf("Publish calls = %d, want 1", fake.publishes.Load()) + } + if fake.lastSubj != "xtcp-test" { + t.Errorf("subj = %q, want xtcp-test", fake.lastSubj) + } + gotOK := testutil.ToFloat64(d.x.pC.WithLabelValues("destNATS", "Publish", "count")) + gotErr := testutil.ToFloat64(d.x.pC.WithLabelValues("destNATS", "Publish", "error")) + if gotOK != tc.wantOKCounter { + t.Errorf("OK counter = %v, want %v", gotOK, tc.wantOKCounter) + } + if gotErr != tc.wantErrCounter { + t.Errorf("Err counter = %v, want %v", 
gotErr, tc.wantErrCounter) + } + }) + } +} + +// TestNATSDest_Send_debugLog covers the debugLevel>10 branch. +func TestNATSDest_Send_debugLog(t *testing.T) { + fake := &fakeNATSPublisher{} + d := newNATSDestForTest(t, fake) + d.x.debugLevel = 11 + payload := []byte("x") + _, _ = d.Send(context.Background(), &payload) +} + +// ─────────────────────────────────────────────────────────────────────── +// Close +// ─────────────────────────────────────────────────────────────────────── + +func TestNATSDest_Close_table(t *testing.T) { + t.Parallel() + cases := []struct { + name string + category string + flushErr error + }{ + {"positive_clean_close", "positive", nil}, + {"negative_flush_err_still_closes", "negative", errors.New("flush timeout")}, + } + for _, tc := range cases { + tc := tc + t.Run(tc.category+"/"+tc.name, func(t *testing.T) { + t.Parallel() + fake := &fakeNATSPublisher{flushErr: tc.flushErr} + d := newNATSDestForTest(t, fake) + if err := d.Close(); err != nil { + t.Errorf("Close err = %v, want nil", err) + } + if fake.flushes.Load() != 1 { + t.Errorf("FlushTimeout calls = %d, want 1", fake.flushes.Load()) + } + if fake.closes.Load() != 1 { + t.Errorf("Close calls = %d, want 1", fake.closes.Load()) + } + }) + } +} + +// TestNATSDest_Close_debugLog covers the debug-log branch on flush err. +func TestNATSDest_Close_debugLog(t *testing.T) { + fake := &fakeNATSPublisher{flushErr: errors.New("err")} + d := newNATSDestForTest(t, fake) + d.x.debugLevel = 11 + _ = d.Close() +} + +// TestNewNATSDest_happy drives the full constructor path via the +// newNATSConnFn factory seam. 
+func TestNewNATSDest_happy(t *testing.T) { + fake := &fakeNATSPublisher{} + orig := newNATSConnFn + newNATSConnFn = func(_ nats.Options) (natsPublisher, error) { return fake, nil } + defer func() { newNATSConnFn = orig }() + + x := newTestXTCP(t, "nats:127.0.0.1:4222") + d, err := newNATSDest(context.Background(), x) + if err != nil { + t.Fatalf("newNATSDest err = %v", err) + } + if d == nil { + t.Fatal("dest nil") + } + _ = d.Close() +} + +// TestNewNATSDest_factoryErr drives the constructor's error-wrap path; a nil err is fatal because the substring check below dereferences it. +func TestNewNATSDest_factoryErr(t *testing.T) { + orig := newNATSConnFn + newNATSConnFn = func(_ nats.Options) (natsPublisher, error) { + return nil, errors.New("synthetic") + } + defer func() { newNATSConnFn = orig }() + + x := newTestXTCP(t, "nats:127.0.0.1:4222") + d, err := newNATSDest(context.Background(), x) + if err == nil { + t.Fatal("expected err") + } + if d != nil { + t.Error("dest should be nil") + } + if !strings.Contains(err.Error(), "opts.Connect") { + t.Errorf("err = %q, want substring 'opts.Connect'", err) + } +} + +// TestNewNATSDest_debugLog covers the debug-log gate in newNATSDest. +func TestNewNATSDest_debugLog(t *testing.T) { + fake := &fakeNATSPublisher{} + orig := newNATSConnFn + newNATSConnFn = func(_ nats.Options) (natsPublisher, error) { return fake, nil } + defer func() { newNATSConnFn = orig }() + + x := newTestXTCP(t, "nats:127.0.0.1:4222") + x.debugLevel = 11 + d, err := newNATSDest(context.Background(), x) + if err != nil { + t.Fatalf("newNATSDest err = %v", err) + } + _ = d.Close() +} diff --git a/pkg/xtcp/destinations_nsq.go b/pkg/xtcp/destinations_nsq.go index 29de6fe..47dfa61 100644 --- a/pkg/xtcp/destinations_nsq.go +++ b/pkg/xtcp/destinations_nsq.go @@ -12,10 +12,31 @@ import ( nsq "github.com/nsqio/go-nsq" ) +// nsqProducer captures the surface of *nsq.Producer that nsqDest +// actually calls.
Lifting it to an interface lets the destination's +// Send/Close paths run against an in-process fake without a real +// nsqd — see destinations_nsq_test.go. *nsq.Producer satisfies this +// interface via its concrete methods. +type nsqProducer interface { + Publish(topic string, body []byte) error + Stop() +} + // nsqDest publishes each marshalled record to an NSQ topic. type nsqDest struct { x *XTCP - producer *nsq.Producer + producer nsqProducer +} + +// newNSQProducerFn is the factory tests swap to inject a fake +// nsqProducer without spinning up an nsqd. Production callers leave +// this at the default (newNSQProducerReal). +var newNSQProducerFn = newNSQProducerReal + +// newNSQProducerReal is the production factory: nsq.NewProducer is +// lazy (no dial at construction), so this is a pure wrapper. +func newNSQProducerReal(addr string, cfg *nsq.Config) (nsqProducer, error) { + return nsq.NewProducer(addr, cfg) } func newNSQDest(_ context.Context, x *XTCP) (Destination, error) { @@ -26,7 +47,7 @@ func newNSQDest(_ context.Context, x *XTCP) (Destination, error) { log.Println("nsq addr:", addr) } cfg := nsq.NewConfig() - producer, err := nsq.NewProducer(addr, cfg) + producer, err := newNSQProducerFn(addr, cfg) if err != nil { return nil, fmt.Errorf("newNSQDest nsq.NewProducer: %w", err) } diff --git a/pkg/xtcp/destinations_nsq_test.go b/pkg/xtcp/destinations_nsq_test.go new file mode 100644 index 0000000..263454f --- /dev/null +++ b/pkg/xtcp/destinations_nsq_test.go @@ -0,0 +1,344 @@ +//go:build dest_nsq + +package xtcp + +import ( + "context" + "errors" + "net" + "sync" + "sync/atomic" + "testing" + "time" + + nsq "github.com/nsqio/go-nsq" + "github.com/prometheus/client_golang/prometheus/testutil" +) + +// destinations_nsq_test.go exercises pkg/xtcp/destinations_nsq.go +// under the `dest_nsq` build tag. Scope mirrors P2.1/P2.2: cover +// what's testable without a real nsqd. 
The end-to-end Publish flow +// runs against a real nsqd inside the microvm lifecycle harness. +// +// Conveniently, nsq.NewProducer is lazy — it validates the addr +// format but does not connect until Publish() is called — so the +// constructor and Close path are unit-testable. Against the real +// producer Send can only fail (no broker); fakeNSQProducer covers success. + +// ─────────────────────────────────────────────────────────────────────── +// init() side effect +// ─────────────────────────────────────────────────────────────────────── + +func TestNSQDest_initRegistersScheme(t *testing.T) { + if !IsKnownScheme(schemeNsq) { + t.Errorf("scheme %q should be registered under build tag dest_nsq", schemeNsq) + } + _, status := lookupDestinationFactory(schemeNsq) + if status != destLookupFound { + t.Errorf("lookupDestinationFactory(%q) status = %d, want destLookupFound", schemeNsq, status) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// newNSQDest — happy path constructor + error path +// ─────────────────────────────────────────────────────────────────────── + +func TestNewNSQDest_table(t *testing.T) { + cases := []struct { + name string + category string + dest string + }{ + {"positive_host_port", "positive", "nsq:127.0.0.1:4150"}, + {"positive_localhost", "positive", "nsq:localhost:4150"}, + {"boundary_high_port", "boundary", "nsq:127.0.0.1:65535"}, + // Documented permissive behaviour: nsq.NewProducer doesn't dial + // at construction time and accepts almost any addr string; + // errors surface later via Publish. Pin that — a future + // NewProducer that pre-validates would catch these rows.
+ {"corner_empty_after_prefix", "corner", "nsq:"}, + {"corner_addr_without_port", "corner", "nsq:127.0.0.1"}, + {"adversarial_only_colon", "adversarial", "nsq::"}, + } + for _, tc := range cases { + tc := tc + t.Run(tc.category+"/"+tc.name, func(t *testing.T) { + x := newTestXTCP(t, tc.dest) + d, err := newNSQDest(context.Background(), x) + if err != nil { + t.Errorf("newNSQDest err = %v; current NSQ behaviour is permissive at construction time", err) + } + if d != nil { + _ = d.Close() + } + }) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// Close-on-nil-producer must not panic. +// ─────────────────────────────────────────────────────────────────────── + +func TestNSQDest_CloseNilProducer(t *testing.T) { + d := &nsqDest{x: newTestXTCP(t, "nsq:127.0.0.1:4150"), producer: nil} + if err := d.Close(); err != nil { + t.Errorf("Close on nil producer should be nil; got %v", err) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// Send against an unreachable broker. The Publish call is synchronous +// — it dials, fails, returns the error. We pin: the metric counter +// for the error path is incremented exactly once. 
+// ─────────────────────────────────────────────────────────────────────── + +func TestNSQDest_SendUnreachableIncrementsErrCounter(t *testing.T) { + x := newTestXTCP(t, "nsq:127.0.0.1:1") // port 1 → refused + x.config.Topic = "xtcp-test" + d, err := newNSQDest(context.Background(), x) + if err != nil { + t.Fatalf("newNSQDest: %v", err) + } + defer func() { _ = d.Close() }() + + payload := []byte("hello") + n, sendErr := d.Send(context.Background(), &payload) + if sendErr == nil { + t.Error("expected Send to err against unreachable broker") + } + if n != 0 { + t.Errorf("n = %d on error, want 0", n) + } + got := testutil.ToFloat64(x.pC.WithLabelValues("destNSQ", "Publish", "error")) + if got != 1 { + t.Errorf("err counter = %v, want 1", got) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// "nsq:" scheme stripping — verified via a fake listener that accepts +// the TCP connection NSQ producer attempts when Publish is called. +// ─────────────────────────────────────────────────────────────────────── + +func TestNewNSQDest_stripsScheme(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + defer func() { _ = ln.Close() }() + + connected := make(chan struct{}, 1) + go func() { + conn, err := ln.Accept() + if err != nil { + return + } + connected <- struct{}{} + _ = conn.Close() + }() + + x := newTestXTCP(t, "nsq:"+ln.Addr().String()) + x.config.Topic = "xtcp-test" + d, err := newNSQDest(context.Background(), x) + if err != nil { + t.Fatalf("newNSQDest: %v", err) + } + defer func() { _ = d.Close() }() + + // Trigger an actual dial by calling Publish; we don't care if it + // completes — we only want to see the listener accept. 
+ payload := []byte("ping") + go func() { _, _ = d.Send(context.Background(), &payload) }() + + select { + case <-connected: + // dial reached the stripped host:port + case <-time.After(3 * time.Second): + t.Fatal("nsq producer did not dial the stripped host:port within 3s") + } +} + +// ─────────────────────────────────────────────────────────────────────── +// Race +// ─────────────────────────────────────────────────────────────────────── + +func TestNSQDest_concurrentCloseOnNil(t *testing.T) { + const goroutines = 16 + var wg sync.WaitGroup + var calls atomic.Int64 + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < 100; j++ { + d := &nsqDest{x: newTestXTCP(t, "nsq:127.0.0.1:4150"), producer: nil} + if err := d.Close(); err != nil { + return + } + calls.Add(1) + } + }() + } + wg.Wait() + if got := calls.Load(); got != goroutines*100 { + t.Errorf("calls = %d, want %d", got, goroutines*100) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// Benchmark +// ─────────────────────────────────────────────────────────────────────── + +func BenchmarkNSQDest_NewAndClose(b *testing.B) { + x := newTestXTCP(&testing.T{}, "nsq:127.0.0.1:4150") + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + d, _ := newNSQDest(context.Background(), x) + if d != nil { + _ = d.Close() + } + } +} + +// ─────────────────────────────────────────────────────────────────────── +// fakeNSQProducer + Send/Close tests via the nsqProducer interface seam. +// ─────────────────────────────────────────────────────────────────────── + +type fakeNSQProducer struct { + publishErr error + publishes atomic.Int64 + stops atomic.Int64 + lastTopic string + lastBody []byte +} + +func (f *fakeNSQProducer) Publish(topic string, body []byte) error { + f.publishes.Add(1) + f.lastTopic = topic + f.lastBody = append(f.lastBody[:0], body...) 
+ return f.publishErr +} +func (f *fakeNSQProducer) Stop() { f.stops.Add(1) } + +func newNSQDestForTest(t *testing.T, fake *fakeNSQProducer) *nsqDest { + t.Helper() + x := newTestXTCP(t, "nsq:127.0.0.1:4150") + x.config.Topic = "xtcp-test" + return &nsqDest{x: x, producer: fake} +} + +func TestNSQDest_Send_table(t *testing.T) { + t.Parallel() + cases := []struct { + name string + category string + publishErr error + wantN int + wantErr bool + wantOKCounter float64 + wantErrCounter float64 + }{ + {"positive_clean_publish", "positive", nil, 1, false, 1, 0}, + {"negative_publish_err", "negative", errors.New("broker EOF"), 0, true, 0, 1}, + } + for _, tc := range cases { + tc := tc + t.Run(tc.category+"/"+tc.name, func(t *testing.T) { + t.Parallel() + fake := &fakeNSQProducer{publishErr: tc.publishErr} + d := newNSQDestForTest(t, fake) + payload := []byte("payload") + n, err := d.Send(context.Background(), &payload) + if n != tc.wantN { + t.Errorf("n = %d, want %d", n, tc.wantN) + } + if (err != nil) != tc.wantErr { + t.Errorf("err = %v, wantErr = %v", err, tc.wantErr) + } + if fake.lastTopic != "xtcp-test" { + t.Errorf("topic = %q, want xtcp-test", fake.lastTopic) + } + gotOK := testutil.ToFloat64(d.x.pC.WithLabelValues("destNSQ", "Publish", "count")) + gotErr := testutil.ToFloat64(d.x.pC.WithLabelValues("destNSQ", "Publish", "error")) + if gotOK != tc.wantOKCounter { + t.Errorf("OK counter = %v, want %v", gotOK, tc.wantOKCounter) + } + if gotErr != tc.wantErrCounter { + t.Errorf("Err counter = %v, want %v", gotErr, tc.wantErrCounter) + } + }) + } +} + +func TestNSQDest_Send_debugLog(t *testing.T) { + fake := &fakeNSQProducer{} + d := newNSQDestForTest(t, fake) + d.x.debugLevel = 11 + payload := []byte("x") + _, _ = d.Send(context.Background(), &payload) +} + +func TestNSQDest_Close_stopsProducer(t *testing.T) { + fake := &fakeNSQProducer{} + d := newNSQDestForTest(t, fake) + if err := d.Close(); err != nil { + t.Errorf("Close err = %v, want nil", err) + } + if 
fake.stops.Load() != 1 { + t.Errorf("Stop calls = %d, want 1", fake.stops.Load()) + } +} + +// TestNewNSQDest_happy drives the constructor via the newNSQProducerFn +// factory seam. +func TestNewNSQDest_happy(t *testing.T) { + fake := &fakeNSQProducer{} + orig := newNSQProducerFn + newNSQProducerFn = func(_ string, _ *nsq.Config) (nsqProducer, error) { return fake, nil } + defer func() { newNSQProducerFn = orig }() + + x := newTestXTCP(t, "nsq:127.0.0.1:4150") + d, err := newNSQDest(context.Background(), x) + if err != nil { + t.Fatalf("newNSQDest err = %v", err) + } + if d == nil { + t.Fatal("dest nil") + } + _ = d.Close() +} + +// TestNewNSQDest_factoryErr drives the error-wrap branch. +func TestNewNSQDest_factoryErr(t *testing.T) { + orig := newNSQProducerFn + newNSQProducerFn = func(_ string, _ *nsq.Config) (nsqProducer, error) { + return nil, errors.New("synthetic") + } + defer func() { newNSQProducerFn = orig }() + + x := newTestXTCP(t, "nsq:127.0.0.1:4150") + d, err := newNSQDest(context.Background(), x) + if err == nil { + t.Error("expected err") + } + if d != nil { + t.Error("dest should be nil") + } +} + +// TestNewNSQDest_debugLog covers the debug-log gate. +func TestNewNSQDest_debugLog(t *testing.T) { + fake := &fakeNSQProducer{} + orig := newNSQProducerFn + newNSQProducerFn = func(_ string, _ *nsq.Config) (nsqProducer, error) { return fake, nil } + defer func() { newNSQProducerFn = orig }() + + x := newTestXTCP(t, "nsq:127.0.0.1:4150") + x.debugLevel = 11 + d, err := newNSQDest(context.Background(), x) + if err != nil { + t.Fatalf("newNSQDest err = %v", err) + } + _ = d.Close() +} diff --git a/pkg/xtcp/destinations_valkey.go b/pkg/xtcp/destinations_valkey.go index 653d7e9..2f7cbc5 100644 --- a/pkg/xtcp/destinations_valkey.go +++ b/pkg/xtcp/destinations_valkey.go @@ -18,11 +18,56 @@ const ( valkeyTimeoutCst = 1 * time.Second ) +// valkeyPublisher is the surface valkeyDest needs from a Valkey/Redis +// client: an error-returning Publish, Ping, and Close. 
*redis.Client +// returns *redis.IntCmd / *redis.StatusCmd chains; we adapt those to +// flat error returns via redisClientAdapter below so tests can mock +// the whole thing without standing up a real Valkey server. +type valkeyPublisher interface { + Publish(ctx context.Context, channel string, msg []byte) error + Ping(ctx context.Context) error + Close() error +} + +// redisClientAdapter wraps a *redis.Client so it satisfies +// valkeyPublisher. Production-only; the test fake bypasses this. +type redisClientAdapter struct { + c *redis.Client +} + +func (a *redisClientAdapter) Publish(ctx context.Context, channel string, msg []byte) error { + return a.c.Publish(ctx, channel, msg).Err() +} + +func (a *redisClientAdapter) Ping(ctx context.Context) error { + _, err := a.c.Ping(ctx).Result() + return err +} + +func (a *redisClientAdapter) Close() error { return a.c.Close() } + // valkeyDest publishes each marshalled record to a Valkey (Redis-protocol) // pub/sub channel. type valkeyDest struct { x *XTCP - client *redis.Client + client valkeyPublisher +} + +// newValkeyClientFn is the factory tests swap to inject a fake +// valkeyPublisher without spinning up a real *redis.Client. Production +// callers leave this at the default (newValkeyClientReal). +var newValkeyClientFn = newValkeyClientReal + +// newValkeyClientReal is the production factory: builds a real +// *redis.Client wrapped in redisClientAdapter so it satisfies +// valkeyPublisher. 
+func newValkeyClientReal(addr string) valkeyPublisher { + return &redisClientAdapter{c: redis.NewClient(&redis.Options{ + Addr: addr, + Password: "", + DB: 0, + MaxIdleConns: valkeyMaxIdleConnsCst, + })} } func newValKeyDest(ctx context.Context, x *XTCP) (Destination, error) { @@ -32,17 +77,12 @@ func newValKeyDest(ctx context.Context, x *XTCP) (Destination, error) { log.Println("config.Dest:", x.config.Dest) log.Println("valkey addr:", addr) } - client := redis.NewClient(&redis.Options{ - Addr: addr, - Password: "", - DB: 0, - MaxIdleConns: valkeyMaxIdleConnsCst, - }) + client := newValkeyClientFn(addr) pCtx, cancel := context.WithTimeout(ctx, valkeyPingTimeoutCst) defer cancel() start := time.Now() - if _, err := client.Ping(pCtx).Result(); err != nil { + if err := client.Ping(pCtx); err != nil { return nil, fmt.Errorf("newValKeyDest ping (%0.6fs): %w", time.Since(start).Seconds(), err) } @@ -56,7 +96,7 @@ func (d *valkeyDest) Send(ctx context.Context, b *[]byte) (int, error) { start := time.Now() pCtx, cancel := context.WithTimeout(ctx, valkeyTimeoutCst) defer cancel() - err := d.client.Publish(pCtx, d.x.config.Topic, *b).Err() + err := d.client.Publish(pCtx, d.x.config.Topic, *b) dur := time.Since(start) if err != nil { d.x.pH.WithLabelValues("destValKey", "Publish", "error").Observe(dur.Seconds()) diff --git a/pkg/xtcp/destinations_valkey_test.go b/pkg/xtcp/destinations_valkey_test.go new file mode 100644 index 0000000..02c1545 --- /dev/null +++ b/pkg/xtcp/destinations_valkey_test.go @@ -0,0 +1,324 @@ +//go:build dest_valkey + +package xtcp + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus/testutil" +) + +// destinations_valkey_test.go exercises pkg/xtcp/destinations_valkey.go +// under the `dest_valkey` build tag. 
+// +// Scope notes: newValKeyDest calls client.Ping() at construction +// time, and go-redis v9 does a multi-step RESP3 negotiation (HELLO, +// CLIENT SETINFO …) before PING. Faking that handshake in-process is +// hard to keep robust across go-redis upgrades, so this file only +// covers what's testable without a real Valkey/Redis server: +// +// - init() side effect (dispatch registered) +// - constants (pool size + ping/IO timeouts) +// - Close-on-nil-client safety +// - newValKeyDest against an unreachable URL: returns an err within +// valkeyPingTimeoutCst + grace, never hangs +// +// The happy-path Publish flow runs against a real Valkey inside the +// microvm lifecycle harness (where the kgo/sr / NATS / NSQ / Valkey +// integration tests all share an actual service). + +// ─────────────────────────────────────────────────────────────────────── +// init() side effect +// ─────────────────────────────────────────────────────────────────────── + +func TestValkeyDest_initRegistersScheme(t *testing.T) { + if !IsKnownScheme(schemeValkey) { + t.Errorf("scheme %q should be registered under build tag dest_valkey", schemeValkey) + } + _, status := lookupDestinationFactory(schemeValkey) + if status != destLookupFound { + t.Errorf("lookupDestinationFactory(%q) status = %d, want destLookupFound", schemeValkey, status) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// Constants +// ─────────────────────────────────────────────────────────────────────── + +func TestValkeyDestConstants(t *testing.T) { + if valkeyPingTimeoutCst != 2*time.Second { + t.Errorf("valkeyPingTimeoutCst = %v, want 2s", valkeyPingTimeoutCst) + } + if valkeyTimeoutCst != 1*time.Second { + t.Errorf("valkeyTimeoutCst = %v, want 1s", valkeyTimeoutCst) + } + if valkeyMaxIdleConnsCst != 20 { + t.Errorf("valkeyMaxIdleConnsCst = %d, want 20", valkeyMaxIdleConnsCst) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// Close on nil 
client must not panic. +// ─────────────────────────────────────────────────────────────────────── + +func TestValkeyDest_CloseNilClient(t *testing.T) { + d := &valkeyDest{x: newTestXTCP(t, "valkey:127.0.0.1:6379"), client: nil} + if err := d.Close(); err != nil { + t.Errorf("Close on nil client should be nil; got %v", err) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// newValKeyDest against an unreachable URL — must surface an err +// within valkeyPingTimeoutCst (2s) + 1s grace, never hang. +// ─────────────────────────────────────────────────────────────────────── + +func TestNewValKeyDest_unreachableURL(t *testing.T) { + x := newTestXTCP(t, "valkey:127.0.0.1:1") // port 1 → connection refused + start := time.Now() + d, err := newValKeyDest(context.Background(), x) + if err == nil { + t.Error("expected err on unreachable URL") + } + if d != nil { + _ = d.Close() + } + if elapsed := time.Since(start); elapsed > valkeyPingTimeoutCst+1*time.Second { + t.Errorf("returned in %v; expected ≤ %v", elapsed, valkeyPingTimeoutCst+1*time.Second) + } +} + +func TestNewValKeyDest_emptyAddrAfterPrefix(t *testing.T) { + x := newTestXTCP(t, "valkey:") + d, err := newValKeyDest(context.Background(), x) + if err == nil { + t.Error("expected err on empty addr") + } + if d != nil { + _ = d.Close() + } +} + +// ─────────────────────────────────────────────────────────────────────── +// Race — concurrent Close-on-nil +// ─────────────────────────────────────────────────────────────────────── + +func TestValkeyDest_concurrentCloseOnNil(t *testing.T) { + const goroutines = 16 + var wg sync.WaitGroup + var calls atomic.Int64 + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < 100; j++ { + d := &valkeyDest{x: newTestXTCP(t, "valkey:127.0.0.1:6379"), client: nil} + if err := d.Close(); err != nil { + return + } + calls.Add(1) + } + }() + } + wg.Wait() + if got := calls.Load(); got != 
goroutines*100 { + t.Errorf("calls = %d, want %d", got, goroutines*100) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// Benchmark +// ─────────────────────────────────────────────────────────────────────── + +func BenchmarkValkeyDest_CloseNilClient(b *testing.B) { + x := newTestXTCP(&testing.T{}, "valkey:127.0.0.1:6379") + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + d := &valkeyDest{x: x, client: nil} + _ = d.Close() + } +} + +// ─────────────────────────────────────────────────────────────────────── +// fakeValkeyPublisher + Send/Close tests via the valkeyPublisher +// interface seam. Same shape as fakeKafkaProducer / fakeNATSPublisher +// / fakeNSQProducer above. +// ─────────────────────────────────────────────────────────────────────── + +type fakeValkeyPublisher struct { + publishErr error + pingErr error + closeErr error + publishes atomic.Int64 + pings atomic.Int64 + closes atomic.Int64 + lastChan string + lastMsg []byte +} + +func (f *fakeValkeyPublisher) Publish(_ context.Context, channel string, msg []byte) error { + f.publishes.Add(1) + f.lastChan = channel + f.lastMsg = append(f.lastMsg[:0], msg...) 
+ return f.publishErr +} +func (f *fakeValkeyPublisher) Ping(_ context.Context) error { f.pings.Add(1); return f.pingErr } +func (f *fakeValkeyPublisher) Close() error { f.closes.Add(1); return f.closeErr } + +func newValkeyDestForTest(t *testing.T, fake *fakeValkeyPublisher) *valkeyDest { + t.Helper() + x := newTestXTCP(t, "valkey:127.0.0.1:6379") + x.config.Topic = "xtcp-test" + return &valkeyDest{x: x, client: fake} +} + +func TestValkeyDest_Send_table(t *testing.T) { + t.Parallel() + cases := []struct { + name string + category string + publishErr error + wantN int + wantErr bool + wantOKCounter float64 + wantErrCounter float64 + }{ + {"positive_clean_publish", "positive", nil, 1, false, 1, 0}, + {"negative_publish_err", "negative", errors.New("connection refused"), 0, true, 0, 1}, + } + for _, tc := range cases { + tc := tc + t.Run(tc.category+"/"+tc.name, func(t *testing.T) { + t.Parallel() + fake := &fakeValkeyPublisher{publishErr: tc.publishErr} + d := newValkeyDestForTest(t, fake) + payload := []byte("payload") + n, err := d.Send(context.Background(), &payload) + if n != tc.wantN { + t.Errorf("n = %d, want %d", n, tc.wantN) + } + if (err != nil) != tc.wantErr { + t.Errorf("err = %v, wantErr = %v", err, tc.wantErr) + } + if fake.lastChan != "xtcp-test" { + t.Errorf("channel = %q, want xtcp-test", fake.lastChan) + } + gotOK := testutil.ToFloat64(d.x.pC.WithLabelValues("destValKey", "Publish", "count")) + gotErr := testutil.ToFloat64(d.x.pC.WithLabelValues("destValKey", "Publish", "error")) + if gotOK != tc.wantOKCounter { + t.Errorf("OK counter = %v, want %v", gotOK, tc.wantOKCounter) + } + if gotErr != tc.wantErrCounter { + t.Errorf("Err counter = %v, want %v", gotErr, tc.wantErrCounter) + } + }) + } +} + +func TestValkeyDest_Send_debugLog(t *testing.T) { + fake := &fakeValkeyPublisher{} + d := newValkeyDestForTest(t, fake) + d.x.debugLevel = 11 + payload := []byte("x") + _, _ = d.Send(context.Background(), &payload) +} + +func TestValkeyDest_Close_table(t 
*testing.T) { + t.Parallel() + cases := []struct { + name string + category string + closeErr error + }{ + {"positive_clean_close", "positive", nil}, + {"negative_close_err", "negative", errors.New("close failed")}, + } + for _, tc := range cases { + tc := tc + t.Run(tc.category+"/"+tc.name, func(t *testing.T) { + t.Parallel() + fake := &fakeValkeyPublisher{closeErr: tc.closeErr} + d := newValkeyDestForTest(t, fake) + err := d.Close() + if (err != nil) != (tc.closeErr != nil) { + t.Errorf("Close err = %v, want non-nil=%v", err, tc.closeErr != nil) + } + if fake.closes.Load() != 1 { + t.Errorf("Close calls = %d, want 1", fake.closes.Load()) + } + }) + } +} + +// TestValkeyDest_RedisClientAdapter pins the production adapter wraps +// a real *redis.Client through the interface. The adapter itself +// can't deeply exercise the real client without a server, but the +// type check + factory call should still work. +func TestValkeyDest_RedisClientAdapter_satisfiesIface(t *testing.T) { + adapter := &redisClientAdapter{c: nil} + var _ valkeyPublisher = adapter +} + +// TestNewValKeyDest_happy drives the constructor end-to-end via the +// newValkeyClientFn factory seam. Fake Ping succeeds so the +// constructor returns a fully-built dest. +func TestNewValKeyDest_happy(t *testing.T) { + fake := &fakeValkeyPublisher{} + orig := newValkeyClientFn + newValkeyClientFn = func(_ string) valkeyPublisher { return fake } + defer func() { newValkeyClientFn = orig }() + + x := newTestXTCP(t, "valkey:127.0.0.1:6379") + d, err := newValKeyDest(context.Background(), x) + if err != nil { + t.Fatalf("newValKeyDest err = %v", err) + } + if d == nil { + t.Fatal("dest nil") + } + if fake.pings.Load() != 1 { + t.Errorf("pings = %d, want 1", fake.pings.Load()) + } + _ = d.Close() +} + +// TestNewValKeyDest_pingErr drives the constructor's ping-fails-→ +// return-err branch. 
+func TestNewValKeyDest_pingErr(t *testing.T) { + fake := &fakeValkeyPublisher{pingErr: errors.New("refused")} + orig := newValkeyClientFn + newValkeyClientFn = func(_ string) valkeyPublisher { return fake } + defer func() { newValkeyClientFn = orig }() + + x := newTestXTCP(t, "valkey:127.0.0.1:6379") + d, err := newValKeyDest(context.Background(), x) + if err == nil { + t.Error("expected ping err") + } + if d != nil { + t.Error("dest should be nil on ping err") + } +} + +// TestNewValKeyDest_debugLog covers the debug-log gate during +// successful construction. +func TestNewValKeyDest_debugLog(t *testing.T) { + fake := &fakeValkeyPublisher{} + orig := newValkeyClientFn + newValkeyClientFn = func(_ string) valkeyPublisher { return fake } + defer func() { newValkeyClientFn = orig }() + + x := newTestXTCP(t, "valkey:127.0.0.1:6379") + x.debugLevel = 11 + d, err := newValKeyDest(context.Background(), x) + if err != nil { + t.Fatalf("newValKeyDest err = %v", err) + } + _ = d.Close() +} diff --git a/pkg/xtcp/netlinker_helpers_test.go b/pkg/xtcp/netlinker_helpers_test.go index 01dc447..79697b1 100644 --- a/pkg/xtcp/netlinker_helpers_test.go +++ b/pkg/xtcp/netlinker_helpers_test.go @@ -221,16 +221,14 @@ func TestCaptureToFileIfEnabled_payloadLength(t *testing.T) { if len(entries) == 0 { t.Fatal("expected one capture file, found none") } - for _, e := range entries { - full := filepath.Join(filepath.Dir(x.config.CapturePath), e.Name()) - data, err := os.ReadFile(full) //nolint:gosec // test code under t.TempDir - if err != nil { - t.Fatalf("read capture: %v", err) - } - if string(data) != "abc" { - t.Errorf("capture contents = %q, want %q", data, "abc") - } - break + first := entries[0] + full := filepath.Join(filepath.Dir(x.config.CapturePath), first.Name()) + data, err := os.ReadFile(full) //nolint:gosec // test code under t.TempDir + if err != nil { + t.Fatalf("read capture: %v", err) + } + if string(data) != "abc" { + t.Errorf("capture contents = %q, want %q", data, 
"abc") } } diff --git a/pkg/xtcp/netlinker_iouring.go b/pkg/xtcp/netlinker_iouring.go index 4c00760..1d28dd8 100644 --- a/pkg/xtcp/netlinker_iouring.go +++ b/pkg/xtcp/netlinker_iouring.go @@ -166,7 +166,7 @@ func (x *XTCP) netlinkerIoUring(ctx context.Context, wg *sync.WaitGroup, nsName // wg.Done already touched. // Just log + return; the deferred wg.Done and UnlockOSThread // handle cleanup once, and a mocked fatalf no longer leaves the - // function half-initialised. + // function half-initialized. log.Printf("netlinkerIoUring %d ring init: %v", id, err) return } diff --git a/pkg/xtcp/netlinker_loop_test.go b/pkg/xtcp/netlinker_loop_test.go new file mode 100644 index 0000000..5ba0d9e --- /dev/null +++ b/pkg/xtcp/netlinker_loop_test.go @@ -0,0 +1,170 @@ +package xtcp + +import ( + "context" + "sync" + "syscall" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/randomizedcoder/xtcp2/pkg/xtcp_config" +) + +// netlinker_loop_test.go drives the full netlinkerSyscall loop using a +// real socketpair fixture. The peer writes a few bytes (which the +// loop's Recvfrom returns), x.Deserialize fails to parse them (it's +// not a valid netlink message) and bumps the err counter, then ctx +// cancellation breaks the loop on the next iteration. +// +// Goal: exercise the loop scaffolding (header logs + metrics + capture +// branch + Deserialize-err branch + maybeForceGC + pool put on exit) +// without standing up a real netlink socket. + +// newNetlinkerLoopFixture wires the minimal XTCP fields netlinkerSyscall +// needs: pools (via InitSyncPools), prom metrics, fdToNsMap. +func newNetlinkerLoopFixture(t *testing.T) *XTCP { + t.Helper() + x := newTestXTCP(t, "null:") + x.config = &xtcp_config.XtcpConfig{ + // Same defaults the production daemon picks if InitSyncPools sees + // these as zero. 
+ PacketSize: 64 * 1024, + WriteFiles: 0, + Modulus: 1, // 0 would divide-by-zero in Deserialize + EnabledDeserializers: &xtcp_config.EnabledDeserializers{Enabled: map[string]bool{}}, + } + // Mirror the prom seam newTestXTCP set up; newTestXTCP zeroed config above. + tx := newTestXTCP(t, "null:") + x.pC = tx.pC + x.pH = tx.pH + + x.fdToNsMap = &sync.Map{} + + var wg sync.WaitGroup + wg.Add(1) + x.InitSyncPools(&wg) + wg.Wait() + // InitSyncPools relies on RTATypeDeserializer for the rta pool — drive + // InitDeserializers too so subsequent Deserialize calls don't panic + // when consulting RTATypeDeserializer. + wg.Add(1) + x.InitDeserializers(&wg) + wg.Wait() + return x +} + +// TestNetlinkerSyscall_loopDrivesViaSocketpair pumps a few bytes +// through a socketpair to drive the netlinkerSyscall loop. Deserialize +// rejects the garbage payload, hits the err counter, then we cancel +// ctx so the loop exits. +// setShortRecvTimeout matches what setSocketTimeoutViaSyscall does in +// production but with a 50ms timeout so the loop returns to its ctx +// check every 50ms (otherwise Recvfrom blocks indefinitely on a +// socketpair with no more data). +func setShortRecvTimeout(t *testing.T, fd int) { + t.Helper() + tv := syscall.Timeval{Sec: 0, Usec: 50 * 1000} // 50ms + if err := syscall.SetsockoptTimeval(fd, syscall.SOL_SOCKET, syscall.SO_RCVTIMEO, &tv); err != nil { + t.Fatalf("SetsockoptTimeval: %v", err) + } +} + +func TestNetlinkerSyscall_loopDrivesViaSocketpair(t *testing.T) { + x := newNetlinkerLoopFixture(t) + readFD, writeFD, _ := makeSocketPair(t) + setShortRecvTimeout(t, readFD) + + // Map the readFD to a namespace name so the debug-log helper has + // something to print at debug>100 (kept low here, but the lookup + // path still runs). + x.fdToNsMap.Store(readFD, "test-ns") + + // Pre-write a few bytes so the first Recvfrom returns them. 
+ if _, err := syscall.Write(writeFD, []byte("garbage-netlink-bytes")); err != nil { + t.Fatalf("write: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + nsName := "test-ns" + go x.netlinkerSyscall(ctx, &wg, &nsName, readFD, 7) + + // Give the loop a couple of iterations to chew through the payload + // + hit the Deserialize-err path, then cancel. + time.Sleep(50 * time.Millisecond) + cancel() + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("netlinkerSyscall did not exit after ctx cancel") + } + + // At least one packet was received + the Deserialize-err branch + // fired (garbage bytes aren't a valid netlink message). + if got := testutil.ToFloat64(x.pC.WithLabelValues("Netlinker", "packets", "count")); got < 1 { + t.Errorf("packets counter = %v, want ≥1", got) + } + if got := testutil.ToFloat64(x.pC.WithLabelValues("Netlinker", "complete", "count")); got != 1 { + t.Errorf("complete counter = %v, want 1", got) + } +} + +// TestNetlinkerSyscall_immediateCancelExitsCleanly verifies the +// goroutine returns within one tick when ctx is already canceled +// before entry. 
+func TestNetlinkerSyscall_immediateCancelExitsCleanly(t *testing.T) { + x := newNetlinkerLoopFixture(t) + readFD, _, _ := makeSocketPair(t) + setShortRecvTimeout(t, readFD) + ctx, cancel := context.WithCancel(context.Background()) + cancel() // pre-cancel + var wg sync.WaitGroup + wg.Add(1) + nsName := "ns" + done := make(chan struct{}) + go func() { + x.netlinkerSyscall(ctx, &wg, &nsName, readFD, 0) + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("netlinkerSyscall did not exit on pre-canceled ctx") + } +} + +// TestNetlinkerSyscall_captureBranchFires drives the WriteFiles > 0 +// branch of captureToFileIfEnabled by setting WriteFiles=2 + a valid +// CapturePath, then writing one packet to the socketpair. +func TestNetlinkerSyscall_captureBranchFires(t *testing.T) { + x := newNetlinkerLoopFixture(t) + x.config.WriteFiles = 2 + x.config.CapturePath = t.TempDir() + "/cap_" + readFD, writeFD, _ := makeSocketPair(t) + setShortRecvTimeout(t, readFD) + if _, err := syscall.Write(writeFD, []byte("xy")); err != nil { + t.Fatalf("write: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + nsName := "ns" + go x.netlinkerSyscall(ctx, &wg, &nsName, readFD, 0) + time.Sleep(50 * time.Millisecond) + cancel() + done := make(chan struct{}) + go func() { wg.Wait(); close(done) }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("netlinkerSyscall did not exit on ctx cancel") + } +} diff --git a/pkg/xtcp/ns_map_count.go b/pkg/xtcp/ns_map_count.go index 914e0df..6d70083 100644 --- a/pkg/xtcp/ns_map_count.go +++ b/pkg/xtcp/ns_map_count.go @@ -13,8 +13,6 @@ import ( const ( xtcpNSName = "xtcpNS" - - goRoutineReporterFrequency = 1 * time.Minute ) // guageUpdateFrequency + reconcileFrequency are var (not const) so tests diff --git a/pkg/xtcp/poller_helpers_test.go b/pkg/xtcp/poller_helpers_test.go index 2db5b27..6e2c01a 100644 --- 
a/pkg/xtcp/poller_helpers_test.go +++ b/pkg/xtcp/poller_helpers_test.go @@ -92,7 +92,7 @@ func TestHandleChangePollFrequency_table(t *testing.T) { // ticker.Reset panics on d <= 0. Wrap so panics convert to // test-pass + counter-skip; the production poller doesn't // validate the duration either, so this codifies the - // behaviour. + // behavior. func() { defer func() { if r := recover(); r != nil { diff --git a/pkg/xtcpnl/xtcpnl_fatalf_test.go b/pkg/xtcpnl/xtcpnl_fatalf_test.go index dd6cfa1..2b7cc64 100644 --- a/pkg/xtcpnl/xtcpnl_fatalf_test.go +++ b/pkg/xtcpnl/xtcpnl_fatalf_test.go @@ -92,7 +92,7 @@ func TestMillisToTimeval_table(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { tv := millisToTimeval(tc.millis) - if int64(tv.Sec) != tc.wantSec || int64(tv.Usec) != tc.wantUs { + if tv.Sec != tc.wantSec || int64(tv.Usec) != tc.wantUs { t.Errorf("millisToTimeval(%d) = {Sec:%d, Usec:%d}; want {Sec:%d, Usec:%d}", tc.millis, tv.Sec, tv.Usec, tc.wantSec, tc.wantUs) } diff --git a/pkg/xtcpnl/xtcpnl_readfile.go b/pkg/xtcpnl/xtcpnl_readfile.go index 0186c8b..2f62f84 100644 --- a/pkg/xtcpnl/xtcpnl_readfile.go +++ b/pkg/xtcpnl/xtcpnl_readfile.go @@ -14,7 +14,7 @@ import ( // single underlying read can return a short count, and Readfile // would error spuriously. Test fixtures stayed under 4 KB so the bug // never tripped, but Readfile's name implies a "give me the whole -// file" contract that the bufio approach can't honour. +// file" contract that the bufio approach can't honor. // // io.ReadFull loops over the underlying Read until the buffer is full // or an error / EOF is hit, which is what we actually want. 
diff --git a/tools/kafka_topic_reader/kafka_topic_reader.go b/tools/kafka_topic_reader/kafka_topic_reader.go index c3abb62..78b8e86 100644 --- a/tools/kafka_topic_reader/kafka_topic_reader.go +++ b/tools/kafka_topic_reader/kafka_topic_reader.go @@ -83,7 +83,15 @@ func runMain(ctx context.Context, args []string, stdout, stderr io.Writer) int { // pollLoop is the Kafka consumer body. Extracted so test code can call it // against a fake client (or skip it entirely via the runMain happy paths). -func pollLoop(ctx context.Context, client *kgo.Client, consumeTimeout time.Duration) { +// kafkaFetcher is the surface pollLoop needs from a Kafka consumer +// client. Lifting it to an interface lets tests drive pollLoop's +// happy-path EachRecord closure (which calls handleRecord) without a +// real broker. *kgo.Client satisfies this interface. +type kafkaFetcher interface { + PollFetches(ctx context.Context) kgo.Fetches +} + +func pollLoop(ctx context.Context, client kafkaFetcher, consumeTimeout time.Duration) { kgoFetchesPool := sync.Pool{ New: func() any { return new(kgo.Fetches) diff --git a/tools/kafka_topic_reader/kafka_topic_reader_test.go b/tools/kafka_topic_reader/kafka_topic_reader_test.go index a3906c4..d1b52ad 100644 --- a/tools/kafka_topic_reader/kafka_topic_reader_test.go +++ b/tools/kafka_topic_reader/kafka_topic_reader_test.go @@ -166,3 +166,66 @@ func TestPollLoop_emptyFetches(t *testing.T) { t.Fatal("pollLoop did not exit after ctx timeout") } } + +// fakeFetcher implements the kafkaFetcher interface so pollLoop can be +// driven with synthetic Fetches — exercises the EachRecord closure +// body (j++; records++; handleRecord(...)) that broker-bound tests +// couldn't reach. 
+type fakeFetcher struct { + fetches []kgo.Fetches + calls int + onCancel context.CancelFunc +} + +func (f *fakeFetcher) PollFetches(_ context.Context) kgo.Fetches { + f.calls++ + if f.calls > len(f.fetches) { + if f.onCancel != nil { + f.onCancel() + } + return kgo.Fetches{} + } + return f.fetches[f.calls-1] +} + +func makeFetchWithRecord(value []byte) kgo.Fetches { + return kgo.Fetches{ + { + Topics: []kgo.FetchTopic{ + { + Topic: "test-topic", + Partitions: []kgo.FetchPartition{ + {Records: []*kgo.Record{{Value: value}}}, + }, + }, + }, + }, + } +} + +// TestPollLoop_eachRecordClosureFires drives pollLoop with a synthetic +// Fetch containing a single record. handleRecord will fail to unmarshal +// the random bytes but that's fine — it returns nil and the closure +// completes. After the fake exhausts its fetches it cancels ctx. +func TestPollLoop_eachRecordClosureFires(t *testing.T) { + value := []byte{0x00, 0x01, 0x02} + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + fake := &fakeFetcher{ + fetches: []kgo.Fetches{makeFetchWithRecord(value)}, + onCancel: cancel, + } + done := make(chan struct{}) + go func() { + pollLoop(ctx, fake, 50*time.Millisecond) + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("pollLoop did not exit after fake fetcher exhausted") + } + if fake.calls < 1 { + t.Errorf("expected ≥1 PollFetches call; got %d", fake.calls) + } +} diff --git a/tools/proto-field-audit/helpers_test.go b/tools/proto-field-audit/helpers_test.go index 77cc42c..da1f14d 100644 --- a/tools/proto-field-audit/helpers_test.go +++ b/tools/proto-field-audit/helpers_test.go @@ -181,7 +181,7 @@ message Foo { // Documented limitation: the regex's `[\w.<>,]+` set does // not include space, so `map` (with space // after the comma) doesn't match. `map` would. - // Pin the current behaviour so a future regex tweak surfaces + // Pin the current behavior so a future regex tweak surfaces // this case. 
src: `message M { map m = 1; diff --git a/tools/quality-report/emit_helpers_test.go b/tools/quality-report/emit_helpers_test.go index 9d0e0f2..419627c 100644 --- a/tools/quality-report/emit_helpers_test.go +++ b/tools/quality-report/emit_helpers_test.go @@ -7,7 +7,7 @@ import ( // emit_helpers_test.go covers the eight helpers extracted from emit in // the gocyclo-27 → 1 refactor. Each helper has a positive / negative / -// boundary / corner / adversarial table where the behaviour is +// boundary / corner / adversarial table where the behavior is // meaningfully bounded, plus race + benchmarks. // ─────────────────────────────────────────────────────────────────────── diff --git a/tools/quality-report/ingest_test.go b/tools/quality-report/ingest_test.go index 376476d..11548c2 100644 --- a/tools/quality-report/ingest_test.go +++ b/tools/quality-report/ingest_test.go @@ -11,7 +11,7 @@ import ( // from runMain in the gocyclo-25 → 3 refactor. Each helper has a // positive / negative / boundary / corner / adversarial table. // Pre-existing main_test.go drives parsers; this file pins the -// ingestion-layer behaviour around them. +// ingestion-layer behavior around them. // writeRaw seeds a file under rawDir for one ingestion test. 
func writeRaw(t *testing.T, rawDir, name, contents string) { @@ -61,7 +61,7 @@ func TestParseRunMainFlags_table(t *testing.T) { t.Run(tc.category+"/"+tc.name, func(t *testing.T) { t.Parallel() var stderr trapWriter - raw, _, _, ec := parseRunMainFlags(tc.args, &stderr) + raw, _, _, _, _, ec := parseRunMainFlags(tc.args, &stderr) if ec != tc.wantExit { t.Errorf("exit = %d, want %d", ec, tc.wantExit) } @@ -325,7 +325,7 @@ func BenchmarkParseRunMainFlags(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - _, _, _, _ = parseRunMainFlags(args, &w) + _, _, _, _, _, _ = parseRunMainFlags(args, &w) } } diff --git a/tools/quality-report/main.go b/tools/quality-report/main.go index 4799a74..55403b5 100644 --- a/tools/quality-report/main.go +++ b/tools/quality-report/main.go @@ -236,7 +236,7 @@ type ingestCtx struct { } func runMain(args []string, stdout, stderr io.Writer) int { - rawDir, repoRoot, knownFile, parseErr := parseRunMainFlags(args, stderr) + rawDir, repoRoot, knownFile, baselineFile, maxDropAbs, parseErr := parseRunMainFlags(args, stderr) if parseErr != 0 { return parseErr } @@ -274,27 +274,77 @@ func runMain(args []string, stdout, stderr io.Writer) int { fmt.Fprintf(stderr, "quality-report: emit: %v\n", err) return 2 } + + // Coverage ratchet: refuse to land a report whose Total has dropped + // by more than maxDropAbs absolute points from the operator-set + // baseline in baselineFile. Skipped when the baseline file is + // missing (first run on a new branch) or coverage data is absent. + if baselineFile != "" && in.Coverage.Available { + if msg, breached := evaluateCoverageRatchet(baselineFile, in.Coverage.Total, maxDropAbs); breached { + fmt.Fprintf(stderr, "quality-report: coverage ratchet: %s\n", msg) + return 3 + } + } return 0 } +// evaluateCoverageRatchet reads the recorded baseline from baselineFile +// and compares it to current. Returns (msg, true) when the absolute +// drop exceeds maxDropAbs; (string, false) otherwise. 
Missing or +// unparseable baseline is treated as "no baseline" → pass. +func evaluateCoverageRatchet(baselineFile string, current, maxDropAbs float64) (string, bool) { + baseline, ok := readCoverageBaseline(baselineFile) + if !ok { + return "", false + } + drop := baseline - current + if drop <= maxDropAbs { + return "", false + } + return fmt.Sprintf("coverage dropped from %.2f%% (baseline %s) to %.2f%% (current); drop %.2f%% > allowed %.2f%%", + baseline, baselineFile, current, drop, maxDropAbs), true +} + +// readCoverageBaseline reads a single float from baselineFile, +// tolerating whitespace and a trailing percent sign. Returns ok=false +// on read or parse error so the caller can skip the check. +func readCoverageBaseline(path string) (float64, bool) { + if path == "" { + return 0, false + } + b, err := os.ReadFile(path) //nolint:gosec // operator-supplied path + if err != nil { + return 0, false + } + s := strings.TrimSpace(string(b)) + s = strings.TrimSuffix(s, "%") + v, err := strconv.ParseFloat(s, 64) + if err != nil { + return 0, false + } + return v, true +} + // parseRunMainFlags isolates the flag-parsing block from the rest of // runMain so the orchestration is easier to test. Returns (rawDir, -// repoRoot, knownFailuresFile, exitCode). exitCode==0 means continue; -// otherwise caller returns it. -func parseRunMainFlags(args []string, stderr io.Writer) (string, string, string, int) { +// repoRoot, knownFailuresFile, baselineFile, maxDropAbs, exitCode). +// exitCode==0 means continue; otherwise caller returns it. 
+func parseRunMainFlags(args []string, stderr io.Writer) (string, string, string, string, float64, int) { fset := flag.NewFlagSet("quality-report", flag.ContinueOnError) fset.SetOutput(stderr) rawDir := fset.String("raw-dir", "", "directory with per-tool raw outputs") repoRoot := fset.String("repo-root", ".", "repo root (used to relativise paths)") knownFile := fset.String("known-failures", "", "file listing pre-existing test failures (Package/Test per line)") + baselineFile := fset.String("coverage-baseline", "", "path to coverage baseline file (single float, e.g. \"73.5\"); empty disables the ratchet") + maxDropAbs := fset.Float64("coverage-max-drop", 0.5, "max allowed absolute drop in Total coverage from baseline (percentage points)") if err := fset.Parse(args); err != nil { - return "", "", "", 2 + return "", "", "", "", 0, 2 } if *rawDir == "" { fmt.Fprintln(stderr, "quality-report: -raw-dir is required") - return "", "", "", 2 + return "", "", "", "", 0, 2 } - return *rawDir, *repoRoot, *knownFile, 0 + return *rawDir, *repoRoot, *knownFile, *baselineFile, *maxDropAbs, 0 } // ingestGolangciTiers ingests findings from the comprehensive tier diff --git a/tools/quality-report/main_test.go b/tools/quality-report/main_test.go index d49c49a..693c3fb 100644 --- a/tools/quality-report/main_test.go +++ b/tools/quality-report/main_test.go @@ -349,9 +349,9 @@ func TestSeverityOrder(t *testing.T) { // Ordering invariants: a sort-stable comparator must yield // error < warning < info, regardless of which vocabulary the // upstream linter uses. 
- if !(severityOrder(severityError) < severityOrder(severityWarning) && - severityOrder(severityWarning) < severityOrder(severityInfo) && - severityOrder(severityInfo) < severityOrder("unknown")) { + if severityOrder(severityError) >= severityOrder(severityWarning) || + severityOrder(severityWarning) >= severityOrder(severityInfo) || + severityOrder(severityInfo) >= severityOrder("unknown") { t.Errorf("severity ordering invariant broken: error/warning/info/unknown = %d/%d/%d/%d", severityOrder(severityError), severityOrder(severityWarning), severityOrder(severityInfo), severityOrder("unknown")) diff --git a/tools/quality-report/ratchet_test.go b/tools/quality-report/ratchet_test.go new file mode 100644 index 0000000..ce42573 --- /dev/null +++ b/tools/quality-report/ratchet_test.go @@ -0,0 +1,209 @@ +package main + +import ( + "os" + "path/filepath" + "testing" +) + +// ratchet_test.go covers the coverage-ratchet helpers added to +// tools/quality-report/main.go: +// - readCoverageBaseline (parses the baseline file) +// - evaluateCoverageRatchet (decides pass / breach) +// Wired into runMain via the -coverage-baseline + -coverage-max-drop +// flags; the orchestrator in nix/quality-report/default.nix passes the +// flag pointing at ./docs/coverage-baseline.txt inside the Nix +// sandbox. + +// ─────────────────────────────────────────────────────────────────────── +// readCoverageBaseline +// ─────────────────────────────────────────────────────────────────────── + +func TestReadCoverageBaseline_table(t *testing.T) { + t.Parallel() + cases := []struct { + name string + category string + writeBody string + writeFile bool + path string // override (e.g. 
"" for empty-path corner) + wantVal float64 + wantOK bool + }{ + {"positive_plain_float", "positive", "73.5", true, "", 73.5, true}, + {"positive_with_percent_suffix", "positive", "82.4%", true, "", 82.4, true}, + {"positive_whitespace", "positive", " 90.0 \n", true, "", 90.0, true}, + {"positive_zero", "positive", "0", true, "", 0.0, true}, + {"positive_three_decimals", "positive", "73.500", true, "", 73.5, true}, + {"negative_missing_file", "negative", "", false, "", 0, false}, + {"negative_empty_string_path", "negative", "ignored", false, "", 0, false}, + {"negative_unparseable", "negative", "not a number", true, "", 0, false}, + {"boundary_high_value", "boundary", "100.0", true, "", 100.0, true}, + {"boundary_negative_value", "boundary", "-5.0", true, "", -5.0, true}, + {"corner_only_percent_sign", "corner", "%", true, "", 0, false}, + {"adversarial_giant_value", "adversarial", "9999999.99", true, "", 9999999.99, true}, + } + for _, tc := range cases { + tc := tc + t.Run(tc.category+"/"+tc.name, func(t *testing.T) { + t.Parallel() + path := tc.path + if tc.writeFile { + dir := t.TempDir() + path = filepath.Join(dir, "baseline.txt") + if err := os.WriteFile(path, []byte(tc.writeBody), 0o600); err != nil { + t.Fatalf("write: %v", err) + } + } + gotVal, gotOK := readCoverageBaseline(path) + if gotOK != tc.wantOK { + t.Errorf("ok = %v, want %v", gotOK, tc.wantOK) + } + if gotOK && gotVal != tc.wantVal { + t.Errorf("val = %v, want %v", gotVal, tc.wantVal) + } + }) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// evaluateCoverageRatchet +// ─────────────────────────────────────────────────────────────────────── + +func TestEvaluateCoverageRatchet_table(t *testing.T) { + t.Parallel() + cases := []struct { + name string + category string + baselineBody string + writeFile bool + baselinePath string + current float64 + maxDropAbs float64 + wantBreached bool + }{ + {"positive_current_above_baseline_passes", "positive", "70.0", true, 
"", 75.0, 0.5, false}, + {"positive_current_equals_baseline_passes", "positive", "73.5", true, "", 73.5, 0.5, false}, + {"positive_within_grace_passes", "positive", "73.5", true, "", 73.2, 0.5, false}, + {"positive_exactly_at_grace_passes", "positive", "74.0", true, "", 73.5, 0.5, false}, + {"negative_drop_just_over_grace_breaches", "negative", "74.0", true, "", 73.0, 0.5, true}, + {"negative_large_drop_breaches", "negative", "90.0", true, "", 50.0, 0.5, true}, + {"boundary_zero_baseline_passes", "boundary", "0", true, "", 5.0, 0.5, false}, + {"boundary_zero_max_drop_strict", "boundary", "70.0", true, "", 69.99, 0.0, true}, + {"corner_no_baseline_file_passes", "corner", "", false, "", 50.0, 0.5, false}, + {"corner_unparseable_baseline_passes", "corner", "garbage", true, "", 50.0, 0.5, false}, + {"adversarial_huge_drop_breaches", "adversarial", "99.9", true, "", 0.1, 0.5, true}, + } + for _, tc := range cases { + tc := tc + t.Run(tc.category+"/"+tc.name, func(t *testing.T) { + t.Parallel() + path := tc.baselinePath + if tc.writeFile { + dir := t.TempDir() + path = filepath.Join(dir, "baseline.txt") + if err := os.WriteFile(path, []byte(tc.baselineBody), 0o600); err != nil { + t.Fatalf("write: %v", err) + } + } + msg, breached := evaluateCoverageRatchet(path, tc.current, tc.maxDropAbs) + if breached != tc.wantBreached { + t.Errorf("breached = %v (msg=%q), want %v", breached, msg, tc.wantBreached) + } + if breached && msg == "" { + t.Error("breached=true but msg is empty") + } + }) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// End-to-end: runMain exits with code 3 when a ratchet breach occurs. +// ─────────────────────────────────────────────────────────────────────── + +func TestRunMain_coverageRatchetBreach(t *testing.T) { + // Seed a high baseline that no minimal raw-dir can plausibly hit. 
+ // runMain's ingestCoverage step looks for $RAW/coverage.out and + // $RAW/coverage-func.out; if both are missing, in.Coverage.Available + // is false and the ratchet is skipped. Provide a tiny synthetic + // coverage profile so Available=true and Total is computed. + dir := t.TempDir() + if err := os.WriteFile(filepath.Join(dir, "coverage-per-package.tsv"), + []byte("pkg/x\t50.0\n"), 0o600); err != nil { + t.Fatalf("write tsv: %v", err) + } + if err := os.WriteFile(filepath.Join(dir, "coverage-func.out"), + []byte("total: (statements) 50.0%\n"), 0o600); err != nil { + t.Fatalf("write func: %v", err) + } + baseline := filepath.Join(dir, "baseline.txt") + if err := os.WriteFile(baseline, []byte("99.0"), 0o600); err != nil { + t.Fatalf("write baseline: %v", err) + } + + var stdout, stderr trapWriter + args := []string{ + "-raw-dir", dir, + "-coverage-baseline", baseline, + "-coverage-max-drop", "0.5", + } + rc := runMain(args, &stdout, &stderr) + if rc != 3 { + t.Errorf("rc = %d, want 3 (ratchet breach)", rc) + } +} + +// TestRunMain_coverageRatchetPasses confirms the happy path: a +// baseline close to the synthetic 50.0% coverage does NOT trigger +// the ratchet. 
+func TestRunMain_coverageRatchetPasses(t *testing.T) { + dir := t.TempDir() + if err := os.WriteFile(filepath.Join(dir, "coverage-per-package.tsv"), + []byte("pkg/x\t50.0\n"), 0o600); err != nil { + t.Fatalf("write tsv: %v", err) + } + if err := os.WriteFile(filepath.Join(dir, "coverage-func.out"), + []byte("total: (statements) 50.0%\n"), 0o600); err != nil { + t.Fatalf("write func: %v", err) + } + baseline := filepath.Join(dir, "baseline.txt") + if err := os.WriteFile(baseline, []byte("49.8"), 0o600); err != nil { + t.Fatalf("write baseline: %v", err) + } + + var stdout, stderr trapWriter + args := []string{ + "-raw-dir", dir, + "-coverage-baseline", baseline, + "-coverage-max-drop", "0.5", + } + rc := runMain(args, &stdout, &stderr) + if rc != 0 { + t.Errorf("rc = %d, want 0 (ratchet passes)", rc) + } +} + +// ─────────────────────────────────────────────────────────────────────── +// Benchmark +// ─────────────────────────────────────────────────────────────────────── + +func BenchmarkEvaluateCoverageRatchet_baselineMissing(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = evaluateCoverageRatchet("/no/such/path", 75.0, 0.5) + } +} + +func BenchmarkReadCoverageBaseline_hit(b *testing.B) { + dir := b.TempDir() + path := filepath.Join(dir, "baseline.txt") + if err := os.WriteFile(path, []byte("73.5"), 0o600); err != nil { + b.Fatal(err) + } + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = readCoverageBaseline(path) + } +}