Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455
* [ENHANCEMENT] Parquet: Add `-blocks-storage.bucket-store.parquet-query-concurrency` flag to configure the maximum number of concurrent goroutines applied at each level of parquet query processing in store-gateway: shard querying, row group processing, and column materialization. #7613
* [ENHANCEMENT] Parquet: Add a row ranges cache for parquet query filtering in querier and store-gateway. #7478
* [ENHANCEMENT] Ingester: Add `cortex_ingester_ingestion_delay_seconds` native histogram metric to track the delay between sample ingestion time and sample timestamp. #7443
* [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420
* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401
* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359
Expand Down
12 changes: 12 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1517,6 +1517,14 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte

var newSeries []labels.Labels

delayObserver := i.metrics.ingestionDelaySeconds.WithLabelValues(userID)
nowMs := time.Now().UnixMilli()
observeDelay := func(timestampMs int64) {
if delayMs := nowMs - timestampMs; delayMs >= 0 {
delayObserver.Observe(float64(delayMs) / 1000.0)
}
}

for _, ts := range req.Timeseries {
// The labels must be sorted (in our case, it's guaranteed a write request
// has sorted labels once hit the ingester).
Expand Down Expand Up @@ -1557,6 +1565,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
if ref != 0 {
if _, err = app.Append(ref, copiedLabels, s.TimestampMs, s.Value); err == nil {
succeededSamplesCount++
observeDelay(s.TimestampMs)
continue
}

Expand All @@ -1568,6 +1577,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
newSeries = append(newSeries, copiedLabels)
}
succeededSamplesCount++
observeDelay(s.TimestampMs)
continue
}
}
Expand Down Expand Up @@ -1615,6 +1625,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
if ref != 0 {
if _, err = app.AppendHistogram(ref, copiedLabels, hp.TimestampMs, h, fh); err == nil {
succeededHistogramsCount++
observeDelay(hp.TimestampMs)
ingestedBucketsObserver.Observe(float64(hp.BucketCount()))
continue
}
Expand All @@ -1627,6 +1638,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
newSeries = append(newSeries, copiedLabels)
}
succeededHistogramsCount++
observeDelay(hp.TimestampMs)
ingestedBucketsObserver.Observe(float64(hp.BucketCount()))
continue
}
Expand Down
187 changes: 187 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2892,6 +2892,193 @@ func TestIngester_Push_OutOfOrderLabels(t *testing.T) {
require.NoError(t, err)
}

func TestIngester_IngestionDelayMetric(t *testing.T) {
metricLabelAdapters := []cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}}
metricLabels := cortexpb.FromLabelAdaptersToLabels(metricLabelAdapters)

t.Run("float samples with delays", func(t *testing.T) {
registry := prometheus.NewRegistry()
cfg := defaultIngesterTestConfig(t)
cfg.LifecyclerConfig.JoinAfter = 0

ing, err := prepareIngesterWithBlocksStorage(t, cfg, registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any {
return ing.lifecycler.GetState()
})

userID := "user-1"
ctx := user.InjectOrgID(context.Background(), userID)

// Push samples with different delays (oldest first to avoid OOO)
now := time.Now().UnixMilli()
samples := []cortexpb.Sample{
{Value: 1, TimestampMs: now - 30000}, // 30 seconds ago
{Value: 2, TimestampMs: now - 10000}, // 10 seconds ago
{Value: 3, TimestampMs: now - 5000}, // 5 seconds ago
}

for _, sample := range samples {
req := cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{sample},
nil,
nil,
cortexpb.API,
)
_, err = ing.Push(ctx, req)
require.NoError(t, err)
}

// Verify metric exists and has 3 observations
metricFamily, err := registry.Gather()
require.NoError(t, err)

var found bool
var sampleCount uint64
var sampleSum float64
for _, mf := range metricFamily {
if mf.GetName() == "cortex_ingester_ingestion_delay_seconds" {
for _, metric := range mf.GetMetric() {
for _, label := range metric.GetLabel() {
if label.GetName() == "user" && label.GetValue() == userID {
found = true
if metric.Histogram != nil {
sampleCount = metric.Histogram.GetSampleCount()
sampleSum = metric.Histogram.GetSampleSum()
}
}
}
}
}
}

require.True(t, found, "ingestion delay metric not found")
assert.Equal(t, uint64(3), sampleCount, "expected 3 observations")

// Verify delays were actually measured (sum should be positive and reasonable)
// We sent samples 30s, 10s, and 5s old, so total delay should be ~45s (with some execution time added)
assert.Greater(t, sampleSum, 40.0, "sum of delays should be at least 40s")
})

t.Run("future timestamps are filtered", func(t *testing.T) {
registry := prometheus.NewRegistry()
cfg := defaultIngesterTestConfig(t)
cfg.LifecyclerConfig.JoinAfter = 0

ing, err := prepareIngesterWithBlocksStorage(t, cfg, registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any {
return ing.lifecycler.GetState()
})

userID := "user-future"
ctx := user.InjectOrgID(context.Background(), userID)

// Push sample with future timestamp
futureTimestamp := time.Now().UnixMilli() + 60000 // 60 seconds in the future
req := cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 1, TimestampMs: futureTimestamp}},
nil,
nil,
cortexpb.API,
)

_, err = ing.Push(ctx, req)
require.NoError(t, err)

// Verify metric has 0 observations (future timestamp filtered)
metricFamily, err := registry.Gather()
require.NoError(t, err)

for _, mf := range metricFamily {
if mf.GetName() == "cortex_ingester_ingestion_delay_seconds" {
for _, metric := range mf.GetMetric() {
for _, label := range metric.GetLabel() {
if label.GetName() == "user" && label.GetValue() == userID {
if metric.Histogram != nil {
assert.Equal(t, uint64(0), metric.Histogram.GetSampleCount(),
"future timestamp should not be observed")
}
}
}
}
}
}
})

t.Run("per-user isolation", func(t *testing.T) {
registry := prometheus.NewRegistry()
cfg := defaultIngesterTestConfig(t)
cfg.LifecyclerConfig.JoinAfter = 0

ing, err := prepareIngesterWithBlocksStorage(t, cfg, registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any {
return ing.lifecycler.GetState()
})

// Push samples for two users
baseTime := time.Now().UnixMilli()
for _, u := range []struct {
id string
numSamples int
delayMs int64
}{
{"user-a", 2, 5000},
{"user-b", 3, 30000},
} {
ctx := user.InjectOrgID(context.Background(), u.id)
// Send in chronological order (oldest first)
for idx := u.numSamples - 1; idx >= 0; idx-- {
timestamp := baseTime - u.delayMs - int64(idx*1000)
req := cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: float64(idx + 1), TimestampMs: timestamp}},
nil,
nil,
cortexpb.API,
)
_, err := ing.Push(ctx, req)
require.NoError(t, err)
}
}

// Verify each user has their own metric
metricFamily, err := registry.Gather()
require.NoError(t, err)

userCounts := make(map[string]uint64)
for _, mf := range metricFamily {
if mf.GetName() == "cortex_ingester_ingestion_delay_seconds" {
for _, metric := range mf.GetMetric() {
for _, label := range metric.GetLabel() {
if label.GetName() == "user" {
userID := label.GetValue()
if metric.Histogram != nil {
userCounts[userID] = metric.Histogram.GetSampleCount()
}
}
}
}
}
}

assert.Equal(t, uint64(2), userCounts["user-a"])
assert.Equal(t, uint64(3), userCounts["user-b"])
})
}

func BenchmarkIngesterPush(b *testing.B) {
limits := defaultLimitsTestConfig()
benchmarkIngesterPush(b, limits, false)
Expand Down
10 changes: 10 additions & 0 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ingesterMetrics struct {
ingestedExemplarsFail prometheus.Counter
ingestedMetadataFail prometheus.Counter
ingestedHistogramBuckets *prometheus.HistogramVec
ingestionDelaySeconds *prometheus.HistogramVec
oooLabelsTotal *prometheus.CounterVec
queries prometheus.Counter
queriedSamples prometheus.Histogram
Expand Down Expand Up @@ -149,6 +150,14 @@ func newIngesterMetrics(r prometheus.Registerer,
NativeHistogramMinResetDuration: 1 * time.Hour,
Buckets: prometheus.ExponentialBuckets(1, 2, 10), // 1 to 512 buckets
}, []string{"user"}),
ingestionDelaySeconds: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_ingester_ingestion_delay_seconds",
Help: "Delay in seconds between sample ingestion time and sample timestamp.",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1,
Buckets: []float64{1, 5, 10, 30, 60, 120, 300, 600}, // 1s, 5s, 10s, 30s, 1m, 2m, 5m, 10m
}, []string{"user"}),
oooLabelsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_out_of_order_labels_total",
Help: "The total number of out of order label found per user.",
Expand Down Expand Up @@ -413,6 +422,7 @@ func (m *ingesterMetrics) deletePerUserMetrics(userID string) {
m.limitsPerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID})
m.pushErrorsTotal.DeletePartialMatch(prometheus.Labels{"user": userID})
m.ingestedHistogramBuckets.DeleteLabelValues(userID)
m.ingestionDelaySeconds.DeleteLabelValues(userID)

if m.memSeriesCreatedTotal != nil {
m.memSeriesCreatedTotal.DeleteLabelValues(userID)
Expand Down
Loading