From 4ad8fc258188ba953393d353ab32820468ae63b3 Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Tue, 21 Apr 2026 05:48:16 +0000 Subject: [PATCH 1/2] adding ingestion delay metric Signed-off-by: Shvejan Mutheboyina --- CHANGELOG.md | 1 + pkg/ingester/ingester.go | 12 +++ pkg/ingester/ingester_test.go | 187 ++++++++++++++++++++++++++++++++++ pkg/ingester/metrics.go | 10 ++ 4 files changed, 210 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 974a16d07a..2c955d3b91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ * [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455 * [ENHANCEMENT] Parquet: Add `-blocks-storage.bucket-store.parquet-query-concurrency` flag to configure the maximum number of concurrent goroutines applied at each level of parquet query processing in store-gateway: shard querying, row group processing, and column materialization. #7613 * [ENHANCEMENT] Parquet: Add a row ranges cache for parquet query filtering in querier and store-gateway. #7478 +* [ENHANCEMENT] Ingester: Add `cortex_ingester_ingestion_delay_seconds` native histogram metric to track the delay between sample ingestion time and sample timestamp. #6748 * [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420 * [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 06d9f13747..a6963cd5f6 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1517,6 +1517,14 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte var newSeries []labels.Labels + delayObserver := i.metrics.ingestionDelaySeconds.WithLabelValues(userID) + nowMs := time.Now().UnixMilli() + observeDelay := func(timestampMs int64) { + if delayMs := nowMs - timestampMs; delayMs >= 0 { + delayObserver.Observe(float64(delayMs) / 1000.0) + } + } + for _, ts := range req.Timeseries { // The labels must be sorted (in our case, it's guaranteed a write request // has sorted labels once hit the ingester). @@ -1557,6 +1565,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if ref != 0 { if _, err = app.Append(ref, copiedLabels, s.TimestampMs, s.Value); err == nil { succeededSamplesCount++ + observeDelay(s.TimestampMs) continue } @@ -1568,6 +1577,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte newSeries = append(newSeries, copiedLabels) } succeededSamplesCount++ + observeDelay(s.TimestampMs) continue } } @@ -1615,6 +1625,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if ref != 0 { if _, err = app.AppendHistogram(ref, copiedLabels, hp.TimestampMs, h, fh); err == nil { succeededHistogramsCount++ + observeDelay(hp.TimestampMs) ingestedBucketsObserver.Observe(float64(hp.BucketCount())) continue } @@ -1627,6 +1638,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte newSeries = append(newSeries, copiedLabels) } succeededHistogramsCount++ + observeDelay(hp.TimestampMs) ingestedBucketsObserver.Observe(float64(hp.BucketCount())) continue } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 1f33a0ebc9..a886afedfe 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -2892,6 +2892,193 @@ func TestIngester_Push_OutOfOrderLabels(t *testing.T) { require.NoError(t, err) } +func TestIngester_IngestionDelayMetric(t *testing.T) { + metricLabelAdapters := []cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}} + metricLabels := cortexpb.FromLabelAdaptersToLabels(metricLabelAdapters) + + t.Run("float samples with delays", func(t *testing.T) { + registry := prometheus.NewRegistry() + cfg := defaultIngesterTestConfig(t) + cfg.LifecyclerConfig.JoinAfter = 0 + + ing, err := prepareIngesterWithBlocksStorage(t, cfg, registry) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) + defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck + + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any { + return ing.lifecycler.GetState() + }) + + userID := "user-1" + ctx := user.InjectOrgID(context.Background(), userID) + + // Push samples with different delays (oldest first to avoid OOO) + now := time.Now().UnixMilli() + samples := []cortexpb.Sample{ + {Value: 1, TimestampMs: now - 30000}, // 30 seconds ago + {Value: 2, TimestampMs: now - 10000}, // 10 seconds ago + {Value: 3, TimestampMs: now - 5000}, // 5 seconds ago + } + + for _, sample := range samples { + req := cortexpb.ToWriteRequest( + []labels.Labels{metricLabels}, + []cortexpb.Sample{sample}, + nil, + nil, + cortexpb.API, + ) + _, err = ing.Push(ctx, req) + require.NoError(t, err) + } + + // Verify metric exists and has 3 observations + metricFamily, err := registry.Gather() + require.NoError(t, err) + + var found bool + var sampleCount uint64 + var sampleSum float64 + for _, mf := range metricFamily { + if mf.GetName() == "cortex_ingester_ingestion_delay_seconds" { + for _, metric := range mf.GetMetric() { + for _, label := range metric.GetLabel() { + if label.GetName() == "user" && label.GetValue() == userID { + found = true + if metric.Histogram != nil { + sampleCount = metric.Histogram.GetSampleCount() + sampleSum = metric.Histogram.GetSampleSum() + } + } + } + } + } + } + + require.True(t, found, "ingestion delay metric not found") + assert.Equal(t, uint64(3), sampleCount, "expected 3 observations") + + // Verify delays were actually measured (sum should be positive and reasonable) + // We sent samples 30s, 10s, and 5s old, so total delay should be ~45s (with some execution time added) + assert.Greater(t, sampleSum, 40.0, "sum of delays should be at least 40s") + }) + + t.Run("future timestamps are filtered", func(t *testing.T) { + registry := prometheus.NewRegistry() + cfg := defaultIngesterTestConfig(t) + cfg.LifecyclerConfig.JoinAfter = 0 + + ing, err := prepareIngesterWithBlocksStorage(t, cfg, registry) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) + defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck + + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any { + return ing.lifecycler.GetState() + }) + + userID := "user-future" + ctx := user.InjectOrgID(context.Background(), userID) + + // Push sample with future timestamp + futureTimestamp := time.Now().UnixMilli() + 60000 // 60 seconds in the future + req := cortexpb.ToWriteRequest( + []labels.Labels{metricLabels}, + []cortexpb.Sample{{Value: 1, TimestampMs: futureTimestamp}}, + nil, + nil, + cortexpb.API, + ) + + _, err = ing.Push(ctx, req) + require.NoError(t, err) + + // Verify metric has 0 observations (future timestamp filtered) + metricFamily, err := registry.Gather() + require.NoError(t, err) + + for _, mf := range metricFamily { + if mf.GetName() == "cortex_ingester_ingestion_delay_seconds" { + for _, metric := range mf.GetMetric() { + for _, label := range metric.GetLabel() { + if label.GetName() == "user" && label.GetValue() == userID { + if metric.Histogram != nil { + assert.Equal(t, uint64(0), metric.Histogram.GetSampleCount(), + "future timestamp should not be observed") + } + } + } + } + } + } + }) + + t.Run("per-user isolation", func(t *testing.T) { + registry := prometheus.NewRegistry() + cfg := defaultIngesterTestConfig(t) + cfg.LifecyclerConfig.JoinAfter = 0 + + ing, err := prepareIngesterWithBlocksStorage(t, cfg, registry) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) + defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck + + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any { + return ing.lifecycler.GetState() + }) + + // Push samples for two users + baseTime := time.Now().UnixMilli() + for _, u := range []struct { + id string + numSamples int + delayMs int64 + }{ + {"user-a", 2, 5000}, + {"user-b", 3, 30000}, + } { + ctx := user.InjectOrgID(context.Background(), u.id) + // Send in chronological order (oldest first) + for idx := u.numSamples - 1; idx >= 0; idx-- { + timestamp := baseTime - u.delayMs - int64(idx*1000) + req := cortexpb.ToWriteRequest( + []labels.Labels{metricLabels}, + []cortexpb.Sample{{Value: float64(idx + 1), TimestampMs: timestamp}}, + nil, + nil, + cortexpb.API, + ) + _, err := ing.Push(ctx, req) + require.NoError(t, err) + } + } + + // Verify each user has their own metric + metricFamily, err := registry.Gather() + require.NoError(t, err) + + userCounts := make(map[string]uint64) + for _, mf := range metricFamily { + if mf.GetName() == "cortex_ingester_ingestion_delay_seconds" { + for _, metric := range mf.GetMetric() { + for _, label := range metric.GetLabel() { + if label.GetName() == "user" { + userID := label.GetValue() + if metric.Histogram != nil { + userCounts[userID] = metric.Histogram.GetSampleCount() + } + } + } + } + } + } + + assert.Equal(t, uint64(2), userCounts["user-a"]) + assert.Equal(t, uint64(3), userCounts["user-b"]) + }) +} + func BenchmarkIngesterPush(b *testing.B) { limits := defaultLimitsTestConfig() benchmarkIngesterPush(b, limits, false) diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index bd0ed95ebf..3cc9f5bf01 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -42,6 +42,7 @@ type ingesterMetrics struct { ingestedExemplarsFail prometheus.Counter ingestedMetadataFail prometheus.Counter ingestedHistogramBuckets *prometheus.HistogramVec + ingestionDelaySeconds *prometheus.HistogramVec oooLabelsTotal *prometheus.CounterVec queries prometheus.Counter queriedSamples prometheus.Histogram @@ -149,6 +150,14 @@ func newIngesterMetrics(r prometheus.Registerer, NativeHistogramMinResetDuration: 1 * time.Hour, Buckets: prometheus.ExponentialBuckets(1, 2, 10), // 1 to 512 buckets }, []string{"user"}), + ingestionDelaySeconds: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Name: "cortex_ingester_ingestion_delay_seconds", + Help: "Delay in seconds between sample ingestion time and sample timestamp.", + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 1, + Buckets: []float64{1, 5, 10, 30, 60, 120, 300, 600}, // 1s, 5s, 10s, 30s, 1m, 2m, 5m, 10m + }, []string{"user"}), oooLabelsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingester_out_of_order_labels_total", Help: "The total number of out of order label found per user.", @@ -413,6 +422,7 @@ func (m *ingesterMetrics) deletePerUserMetrics(userID string) { m.limitsPerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID}) m.pushErrorsTotal.DeletePartialMatch(prometheus.Labels{"user": userID}) m.ingestedHistogramBuckets.DeleteLabelValues(userID) + m.ingestionDelaySeconds.DeleteLabelValues(userID) if m.memSeriesCreatedTotal != nil { m.memSeriesCreatedTotal.DeleteLabelValues(userID) From 27bf2afad7547e51f8e7f8f1746268377d4d5a10 Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Mon, 29 Jun 2026 00:52:43 +0000 Subject: [PATCH 2/2] updating pr id Signed-off-by: Shvejan Mutheboyina --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c955d3b91..e80213b0d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,7 +20,7 @@ * [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455 * [ENHANCEMENT] Parquet: Add `-blocks-storage.bucket-store.parquet-query-concurrency` flag to configure the maximum number of concurrent goroutines applied at each level of parquet query processing in store-gateway: shard querying, row group processing, and column materialization. #7613 * [ENHANCEMENT] Parquet: Add a row ranges cache for parquet query filtering in querier and store-gateway. #7478 -* [ENHANCEMENT] Ingester: Add `cortex_ingester_ingestion_delay_seconds` native histogram metric to track the delay between sample ingestion time and sample timestamp. #6748 +* [ENHANCEMENT] Ingester: Add `cortex_ingester_ingestion_delay_seconds` native histogram metric to track the delay between sample ingestion time and sample timestamp. #7443 * [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420 * [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359