Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
a5536ad
xtcp: s3parquet destination — direct Parquet → MinIO (retires Vector)
randomizedcoder May 24, 2026
511d1d1
microvm: s3parquet-long runner — long-soak parquet test with per-minu…
randomizedcoder May 24, 2026
bf61b6b
microvm: s3parquet-long → 63 MiB production flush threshold
randomizedcoder May 24, 2026
91ab699
xtcp + microvm: Pyroscope continuous-profiling agent + in-VM server
randomizedcoder May 24, 2026
817b545
xtcp ns: fix OS-thread leak in netNamespaceInstance under heavy ns churn
randomizedcoder May 24, 2026
7b34df3
xtcp2.service: grant CAP_SYS_ADMIN — root cause of the ns thread leak
randomizedcoder May 24, 2026
3435a69
xtcp ns: regression test + forbidigo lint guard for the LockOSThread/…
randomizedcoder May 24, 2026
03dd56e
xtcp2: fail-early capability check with per-cap diagnostics + capchec…
randomizedcoder May 24, 2026
29d06cc
nix/checks: lightweight capability-check derivations (no microvm needed)
randomizedcoder May 24, 2026
778a5df
microvm: 3-knob aggressive soak workload — exercises the full parquet…
randomizedcoder May 25, 2026
33e2b44
microvm: fix ordering cycle in xtcp2-soak-ns-traffic systemd unit
randomizedcoder May 25, 2026
182d81f
nsTest: -traffic flag — in-process loopback connection per new ns
randomizedcoder May 26, 2026
17bab54
nsTest: -conns N flag — N persistent loopback conns per ns with varie…
randomizedcoder May 26, 2026
91fe0d6
microvm: clickhouse-pipeline-parquet flavor — kafka + parquet side by…
randomizedcoder May 26, 2026
6b63bec
microvm: forward + open :9089 :8890 for the parquet xtcp2 instance; l…
randomizedcoder May 26, 2026
dc81345
microvm: bump clickhouse-pipeline-parquet memory budgets — 12 GiB VM …
randomizedcoder May 27, 2026
05d9e4e
clickhouse-pipeline-parquet: disable chatty internal log tables to ta…
randomizedcoder May 27, 2026
cf77240
clickhouse-kafka: cap per-poll memory with kafka_poll_max_batch_size=256
randomizedcoder May 27, 2026
506b33a
clickhouse-pipeline: schema-warm barrier + lower kafka_poll_max_batch…
randomizedcoder May 27, 2026
621fe06
Revert: clickpipe-up schema-warm barrier
randomizedcoder May 27, 2026
1a8eb0a
docs/SQL: correct the kafka MV "halt" framing from the prior commits
randomizedcoder May 28, 2026
b021fe4
clickhouse-pipeline-parquet: container 12000m→14000m + batch_size 64→16
randomizedcoder May 28, 2026
1ed1b0a
docs: note that bigger ClickHouse memory does NOT reduce OOMs
randomizedcoder May 28, 2026
72d2722
clickhouse + redpanda: explicit memory caps
randomizedcoder May 29, 2026
5466cd1
clickhouse kafka_engine: shrink Block to one envelope — eliminates OOMs
randomizedcoder May 29, 2026
945c8c0
docs: document the actual kafka_max_block_size root cause
randomizedcoder May 29, 2026
9544419
microvm prometheus: forward :19090 + scrape both xtcp2 instances
randomizedcoder May 29, 2026
448e395
build/scripts: add Prometheus probe + stability-summary tooling
randomizedcoder May 29, 2026
25c0103
microvm clickpipe: bound redpanda + persist CH data across restarts
randomizedcoder May 30, 2026
fbf59e5
clickhouse kafka: extend max.poll.interval.ms to tolerate slow MV ins…
randomizedcoder May 30, 2026
b8f234f
clickhouse: MALLOC_CONF to return jemalloc-retained chunks to OS
randomizedcoder May 30, 2026
96dfc76
microvm clickpipe-parquet: bigger docker disk + dedicated MinIO disk
randomizedcoder May 31, 2026
01e83a1
gofmt + nixfmt: format s3parquet destination + microvm files
randomizedcoder Jun 15, 2026
a7070c2
pkg/xtcp: capability-check test seam so Init-driven tests run unprivi…
randomizedcoder Jun 15, 2026
f999a61
cmd/xtcp2: init s3/pyroscope flag fields in TestPrintFlags + TestBuil…
randomizedcoder Jun 15, 2026
38032e4
pkg/xtcp: de-flake TestS3ParquetDest_corner_queueFull (2s → 30s deadl…
randomizedcoder Jun 15, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ linters:
- noctx
- contextcheck
- durationcheck
# Project-specific: forbid runtime.UnlockOSThread in code paths that
# also do setns/unshare, because the pairing is dangerous (see the
# forbidigo block below for the why).
- forbidigo

settings:
errcheck:
Expand Down Expand Up @@ -103,6 +107,37 @@ linters:
- name: increment-decrement
- name: var-declaration

# Forbidigo guards a specific landmine: `runtime.UnlockOSThread` paired
# with `unix.Setns` / `unix.Unshare`. If the goroutine modifies
# thread-global state (network namespace) and then unconditionally
# unlocks, a *tainted* M can be returned to Go's scheduler — the
# runtime can't safely reuse it, parks it, and spawns a new one.
# Under heavy ns churn this leaks OS threads up to the SetMaxThreads
# cap and crashes the daemon (incident: 12 h s3parquet-long soak hit
# `fatal error: thread exhaustion` at 1 h 45 min).
#
# The safe pattern is: do *not* defer UnlockOSThread; instead, inside
# the deferred restore func, call UnlockOSThread ONLY after Setns
# confirms the original namespace was restored. On restore failure,
# exit the goroutine with the lock still held — the Go runtime then
# terminates the OS thread (documented LockOSThread behaviour),
# which is exactly what we want for a tainted M.
#
# If you have a legitimate non-netns use of UnlockOSThread (e.g.
# io_uring SQ thread pinning), opt in with `//nolint:forbidigo //
# <reason>` at the call site. Anyone touching the netns path will
# then see why the rule exists.
forbidigo:
forbid:
- pattern: '^runtime\.UnlockOSThread$'
msg: |
runtime.UnlockOSThread is unsafe to defer unconditionally in
code that calls unix.Setns / unix.Unshare — the M can be
returned to the scheduler still in a modified namespace,
triggering an unbounded thread leak. Pair it with a
*conditional* unlock inside the restore defer, or
`//nolint:forbidigo // <reason>` if unrelated.

exclusions:
warn-unused: true
paths:
Expand Down
26 changes: 26 additions & 0 deletions build/containers/clickhouse/config.d/disable_chatty_logs.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?xml version="1.0"?>
<!--
Disable ClickHouse's chatty internal observability tables.

Why: under heavy ingest (the mixed clickhouse-pipeline-parquet flavor
drives ~500 envelopeRows/sec through the kafka_engine MV), the periodic
flushes into system.latency_log + system.metric_log + system.asynchronous_metric_log
accumulate millions of rows. Background merges on those parts then need
GiBs of RAM and trip the per-server max-memory cap before the kafka MV
ever gets a chance.

Removing these log tables removes both the writes AND the background
merges. We keep query_log (low-volume, useful for debugging the kafka
consumer) and error_log.

Reference: docs.clickhouse.com/en/operations/server-configuration-parameters/settings
`<table_name remove="1"/>` removes the entry from the merged config so
the corresponding system table is never created.
-->
<clickhouse>
<latency_log remove="1"/>
<metric_log remove="1"/>
<asynchronous_metric_log remove="1"/>
<processors_profile_log remove="1"/>
<trace_log remove="1"/>
</clickhouse>
31 changes: 31 additions & 0 deletions build/containers/clickhouse/config.d/kafka_client_tuning.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?xml version="1.0"?>
<!--
Extend the librdkafka consumer's poll-interval and session-timeout
windows. Defaults (5 min / 45 s respectively) are too tight for our
mixed-flavor workload — an occasional MV insert that takes 30-150 s
(memory pressure, parts merge, ZSTD batch) blows past
max.poll.interval.ms, the consumer is kicked from the group, rejoins
at the last committed offset, re-reads the same batch, fails again
→ indefinite rebalance death loop. A 24 h soak attempt was stuck at
~21 k rows after 1 h because of this; the 4 h soak earlier happened
to avoid the trap by luck.

Settings (librdkafka names; ClickHouse passes them through):
* max.poll.interval.ms 5 min → 15 min (900000)
Time the consumer may stay in a poll before kafka considers
it dead. Must comfortably exceed worst-case per-flush time.
* session.timeout.ms 45 s → 5 min (300000)
Heartbeat-loss window. Smaller is faster failure detection
but should be << max.poll.interval.ms to be meaningful.
* heartbeat.interval.ms 3 s default; explicit 10 s for clarity.

These are server-wide kafka client defaults applied to every kafka_engine
consumer; individual tables can override via their own settings if needed.
-->
<clickhouse>
<kafka>
<max_poll_interval_ms>900000</max_poll_interval_ms>
<session_timeout_ms>300000</session_timeout_ms>
<heartbeat_interval_ms>10000</heartbeat_interval_ms>
</kafka>
</clickhouse>
46 changes: 46 additions & 0 deletions build/containers/clickhouse/config.d/limit_memory.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?xml version="1.0"?>
<!--
Constrain ClickHouse's discretionary memory use.

Why: under heavy mixed-flavor ingest, ClickHouse's MemoryTracking
parks just under the container-derived per-server cap regardless of
actual workload. The 131 MiB kafka_engine per-batch decode buffer
allocation then occasionally tips the tracker over and throws
MEMORY_LIMIT_EXCEEDED (~2/min). Bumping container memory didn't
fix this — CH just grew to fill the new cap. Past ~20 GiB the
larger heap also slowed MV inserts past max.poll.interval.ms,
triggering kafka consumer rebalance loops.

The real fix is to keep CH's caches small so MemoryTracking sits
well below the cap, leaving room for transient batch allocations.

Our working set:
* Main table `xtcp_flat_records`: ~55 MiB on disk total
* Errors table: ~55 MiB on disk
* No interactive queries; no joins; no aggregations of size
So the 5 GiB default mark cache is hilariously oversized.

Reference: https://clickhouse.com/docs/en/operations/server-configuration-parameters/settings
-->
<clickhouse>
<!-- Mark cache: 5 GiB default → 256 MiB. Marks for ~55 MiB of data
across two MergeTree tables fit easily in 256 MiB. -->
<mark_cache_size>268435456</mark_cache_size>

<!-- Uncompressed block cache: already 0 by default; be explicit. -->
<uncompressed_cache_size>0</uncompressed_cache_size>

<!-- Index-related caches: default ~5 GiB each. Same reasoning. -->
<index_mark_cache_size>134217728</index_mark_cache_size>
<index_uncompressed_cache_size>0</index_uncompressed_cache_size>

<!-- Compiled-expression cache: default 128 MiB; keep. -->
<compiled_expression_cache_size>134217728</compiled_expression_cache_size>

<!-- Leave max_server_memory_usage_to_ram_ratio at its 0.9 default.
At 14000m container that gives a ~12.3 GiB cap. With caches capped
above, MemoryTracking parks ~9.3 GiB, leaving ~3 GiB headroom for
the kafka_engine 131-256 MiB per-batch decode allocations. -->
<!-- max_server_memory_usage_to_ram_ratio default = 0.9 -->

</clickhouse>
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,25 @@ SETTINGS
kafka_num_consumers = 1,
kafka_thread_per_consumer = 0,
kafka_skip_broken_messages = 0,
kafka_handle_error_mode = 'stream';
kafka_handle_error_mode = 'stream',
-- ProtobufList already batches: each kafka message is an Envelope
-- containing ~100-1000 XtcpFlatRecord rows. The kafka_engine's
-- own Block accumulation (kafka_max_block_size, default 65,505 rows)
-- is therefore mostly redundant on top — it just holds rows in memory
-- across many kafka messages before pushing the MV. Combined with
-- the per-poll batch (kafka_poll_max_batch_size, 16 messages here),
-- a single MV flush at 65K rows was the source of 131 MiB chunk
-- allocations that tipped CH's per-server memory cap.
-- Settings:
-- kafka_poll_max_batch_size = 16 ~16 kafka messages per poll
-- kafka_max_block_size = 1024 ~1 envelope per flush
-- kafka_flush_interval_ms = 2000 backstop: flush at most every 2 s
-- With ~430 envelopeRows/sec from xtcp2 the Block fills in ~2.4 s on
-- average, so flushes happen at the row-threshold most of the time
-- and the time-backstop kicks in only when the producer is quiet.
kafka_max_block_size = 1024,
kafka_poll_max_batch_size = 16,
kafka_flush_interval_ms = 2000;

-- SHOW CREATE TABLE xtcp.xtcp_flat_records_kafka;
-- SELECT * FROM system.kafka_consumers FORMAT Vertical;
Expand Down
19 changes: 19 additions & 0 deletions build/scripts/clickpipe-prom-probe.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env bash
PROM=http://127.0.0.1:19090
fmt() {
curl -sS --max-time 5 -G "$PROM/api/v1/query" --data-urlencode "query=$1" 2>/dev/null | \
python3 -c "
import json, sys
d = json.load(sys.stdin)
parts = []
for r in d.get('data',{}).get('result',[]):
inst = r['metric'].get('instance','?')
val = r['value'][1]
parts.append(inst + '=' + val)
print(' '.join(parts))
"
}
g=$(fmt 'go_goroutines{job="xtcp2"}')
h=$(fmt 'floor(go_memstats_heap_inuse_bytes{job="xtcp2"}/1048576)')
t=$(fmt 'go_threads{job="xtcp2"}')
echo "go_routines=[$g] heap_MiB=[$h] go_threads=[$t]"
81 changes: 81 additions & 0 deletions build/scripts/clickpipe-stability-summary.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#!/usr/bin/env bash
# Query in-VM Prometheus (via host:19090) for soak-stability metrics.
# Outputs a compact report: goroutines, heap, GC, RSS — start vs end,
# min/max, and a pass/fail judgement.
#
# Usage: bash /tmp/cppq-stability.sh [SOAK_START_TS] [SOAK_END_TS]
# timestamps as unix seconds; default = "soak started ~5min ago,
# ended now" which matches a smoke. For real soaks, pass them.

PROM=http://127.0.0.1:19090
NOW=$(date +%s)
START=${1:-$((NOW - 14400))} # default: 4h ago
END=${2:-$NOW}

# Promql query helper: returns the .value[1] of the first result, or "?"
q() {
local res
res=$(curl -sS --max-time 10 -G "$PROM/api/v1/query" --data-urlencode "query=$1" 2>/dev/null)
echo "$res" | python3 -c '
import json, sys
try:
d = json.load(sys.stdin)
r = d["data"]["result"]
if not r: print("?"); sys.exit()
for entry in r:
inst = entry["metric"].get("instance", "?")
val = entry["value"][1]
print(f"{inst}={val}")
except Exception as e:
print(f"err:{e}")
' 2>/dev/null
}

echo "=== xtcp2 stability summary ==="
date -d @"$START" +"start: %F %T"
date -d @"$END" +"end: %F %T"
echo

# --- Goroutines: start / end / max over window ---
echo "goroutines (current):"
q "go_goroutines"
echo
echo "goroutines (max over soak window):"
q "max_over_time(go_goroutines[${SOAK_DUR_MIN:-240}m])"
echo

# --- OS threads ---
echo "go_threads (current):"
q "go_threads"
echo
echo "go_threads (max over soak window):"
q "max_over_time(go_threads[${SOAK_DUR_MIN:-240}m])"
echo

# --- Heap memory ---
echo "heap inuse (current MB):"
q "go_memstats_heap_inuse_bytes / 1024 / 1024"
echo
echo "heap inuse (max MB over soak):"
q "max_over_time((go_memstats_heap_inuse_bytes/1024/1024)[${SOAK_DUR_MIN:-240}m:])"
echo

# --- GC pauses ---
echo "GC pause sum (seconds total since start):"
q "go_gc_duration_seconds_sum"
echo
echo "GC pause p99 (recent seconds):"
q "go_gc_duration_seconds{quantile=\"1\"}"
echo

# --- Process RSS ---
echo "process RSS (current MB):"
q "process_resident_memory_bytes / 1024 / 1024"
echo
echo "process RSS (max MB over soak):"
q "max_over_time((process_resident_memory_bytes/1024/1024)[${SOAK_DUR_MIN:-240}m:])"
echo

# --- Sample counts to validate data range ---
echo "prom sample count (soak window):"
q "count_over_time(go_goroutines[${SOAK_DUR_MIN:-240}m])"
17 changes: 17 additions & 0 deletions cmd/ns/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package main

import (
"os"
"testing"

"github.com/randomizedcoder/xtcp2/pkg/xtcp"
)

// TestMain disables xtcp's hard startup capability check so the tests
// that construct a real XTCP via xtcp.NewNsTestingXTCP (e.g.
// TestRunDaemonDefault_constructs) run to completion on unprivileged CI
// sandboxes that lack CAP_SYS_ADMIN / CAP_NET_ADMIN.
func TestMain(m *testing.M) {
xtcp.SetCapabilityCheck(func(*xtcp.XTCP) error { return nil })
os.Exit(m.Run())
}
Loading