diff --git a/CHANGELOG.md b/CHANGELOG.md index 974a16d07a..e8f6f67bd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ * [ENHANCEMENT] Distributor: Added `cortex_distributor_received_histogram_buckets` metric to track number of buckets in received native histogram samples before validation, per user. #7569 * [ENHANCEMENT] Distributor: Add `WrappedHistogram` with configurable size limit (`-validation.max-native-histogram-size-bytes`) to cap native histogram protobuf size before unmarshalling. #7570 * [ENHANCEMENT] Ingester: Add lazy regex evaluation on head postings cache miss. Defers expensive regex matchers on high-cardinality labels to per-series filtering when a selective equality matcher already narrows the result set. Configured via `-blocks-storage.expanded_postings_cache.head.lazy-matcher-max-cardinality` (disabled by default). #7553 +* [ENHANCEMENT] Store Gateway: Resolve the parquet shard count from the bucket index instead of reading the converter mark for each block, reducing object storage calls when the bucket index is enabled. #7648 * [ENHANCEMENT] Query Frontend: Improve the slow query log with `source`, `user_agent`, `engine_type`, `block_store_type`, and query stats fields to aid slow query diagnosis. #7601 * [ENHANCEMENT] Ring: Add ring metric to count number of duplicate tokens. #7626 * [ENHANCEMENT] Metrics: Add native histogram support to all remaining production histograms, enabling dual-format (classic + native) exposition across all Cortex components. #7636 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 73c3e2e272..fa063b3651 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -2006,13 +2006,14 @@ blocks_storage: [enabled: | default = true] # How frequently a bucket index, which previously failed to load, should - # be tried to load again. This option is used only by querier. + # be tried to load again. This option is used by querier and store-gateway + # parquet mode. # CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-error-interval [update_on_error_interval: | default = 1m] # How long a unused bucket index should be cached. Once this timeout # expires, the unused bucket index is removed from the in-memory cache. - # This option is used only by querier. + # This option is used by querier and store-gateway parquet mode. # CLI flag: -blocks-storage.bucket-store.bucket-index.idle-timeout [idle_timeout: | default = 1h] diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 4e7e640ed2..a558d10265 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -2064,13 +2064,14 @@ blocks_storage: [enabled: | default = true] # How frequently a bucket index, which previously failed to load, should - # be tried to load again. This option is used only by querier. + # be tried to load again. This option is used by querier and store-gateway + # parquet mode. # CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-error-interval [update_on_error_interval: | default = 1m] # How long a unused bucket index should be cached. Once this timeout # expires, the unused bucket index is removed from the in-memory cache. - # This option is used only by querier. + # This option is used by querier and store-gateway parquet mode. # CLI flag: -blocks-storage.bucket-store.bucket-index.idle-timeout [idle_timeout: | default = 1h] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index bf35836c9b..8d318794f6 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2650,13 +2650,14 @@ bucket_store: [enabled: | default = true] # How frequently a bucket index, which previously failed to load, should be - # tried to load again. This option is used only by querier. + # tried to load again. This option is used by querier and store-gateway + # parquet mode. # CLI flag: -blocks-storage.bucket-store.bucket-index.update-on-error-interval [update_on_error_interval: | default = 1m] # How long a unused bucket index should be cached. Once this timeout # expires, the unused bucket index is removed from the in-memory cache. This - # option is used only by querier. + # option is used by querier and store-gateway parquet mode. # CLI flag: -blocks-storage.bucket-store.bucket-index.idle-timeout [idle_timeout: | default = 1h] diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 646a988cad..bd92140080 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -457,8 +457,8 @@ type BucketIndexConfig struct { func (cfg *BucketIndexConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.BoolVar(&cfg.Enabled, prefix+"enabled", true, "True to enable querier and store-gateway to discover blocks in the storage via bucket index instead of bucket scanning. Disabling the bucket index is not recommended for production.") - f.DurationVar(&cfg.UpdateOnErrorInterval, prefix+"update-on-error-interval", time.Minute, "How frequently a bucket index, which previously failed to load, should be tried to load again. This option is used only by querier.") - f.DurationVar(&cfg.IdleTimeout, prefix+"idle-timeout", time.Hour, "How long a unused bucket index should be cached. Once this timeout expires, the unused bucket index is removed from the in-memory cache. This option is used only by querier.") + f.DurationVar(&cfg.UpdateOnErrorInterval, prefix+"update-on-error-interval", time.Minute, "How frequently a bucket index, which previously failed to load, should be tried to load again. This option is used by querier and store-gateway parquet mode.") + f.DurationVar(&cfg.IdleTimeout, prefix+"idle-timeout", time.Hour, "How long a unused bucket index should be cached. Once this timeout expires, the unused bucket index is removed from the in-memory cache. This option is used by querier and store-gateway parquet mode.") f.DurationVar(&cfg.MaxStalePeriod, prefix+"max-stale-period", time.Hour, "The maximum allowed age of a bucket index (last updated) before queries start failing because the bucket index is too old. The bucket index is periodically updated by the compactor, while this check is enforced in the querier (at query time).") } diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index c905ff9239..20ef4b020e 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -49,6 +49,7 @@ type BucketStores interface { storepb.StoreServer SyncBlocks(ctx context.Context) error InitialSync(ctx context.Context) error + Stop() error } // ThanosBucketStores is a multi-tenant wrapper of Thanos BucketStore. @@ -230,6 +231,9 @@ func (u *ThanosBucketStores) SyncBlocks(ctx context.Context) error { }) } +// Stop implements BucketStores. ThanosBucketStores has no background services to stop. +func (u *ThanosBucketStores) Stop() error { return nil } + func (u *ThanosBucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *store.BucketStore) error) error { retries := backoff.New(ctx, backoff.Config{ MinBackoff: 1 * time.Second, diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 14724d60b5..857f91d24b 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -391,6 +391,9 @@ func (g *StoreGateway) running(ctx context.Context) error { } func (g *StoreGateway) stopping(_ error) error { + if g.stores != nil { + _ = g.stores.Stop() + } if g.subservices != nil { return services.StopManagerAndAwaitStopped(context.Background(), g.subservices) } diff --git a/pkg/storegateway/parquet_bucket_store.go b/pkg/storegateway/parquet_bucket_store.go index 5a5d740bc4..abace8c6be 100644 --- a/pkg/storegateway/parquet_bucket_store.go +++ b/pkg/storegateway/parquet_bucket_store.go @@ -27,16 +27,20 @@ import ( "google.golang.org/grpc/status" cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util/parquetutil" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/validation" ) type parquetBucketStore struct { - logger log.Logger - bucket objstore.InstrumentedBucket - limits *validation.Overrides - concurrency int + logger log.Logger + bucket objstore.InstrumentedBucket + indexLoader *bucketindex.Loader // nil when bucket index disabled + limits *validation.Overrides + userID string + bucketIndexEnabled bool + concurrency int chunksDecoder *schema.PrometheusParquetChunksDecoder @@ -67,20 +71,15 @@ func (p *parquetBucketStore) findParquetBlocks(ctx context.Context, blockMatcher bucketOpener := parquet_storage.NewParquetBucketOpener(p.bucket) noopQuota := search.NewQuota(search.NoopQuotaLimitFunc(ctx)) - // Read converter marks and expand to per-shard (blockID, shardID) lists. - // TODO(Sungjin1212): Read the shard count from the bucket index instead of reading the converter mark for each block. + shardCounts, err := p.resolveShardCounts(ctx, blockIDs) + if err != nil { + return nil, err + } + var shardBlockIDs []string var shardIDs []int for _, blockID := range blockIDs { - uid, err := ulid.Parse(blockID) - if err != nil { - return nil, errors.Wrapf(err, "failed to parse block ID %s", blockID) - } - marker, err := cortex_parquet.ReadConverterMark(ctx, uid, p.bucket, p.logger) - if err != nil { - return nil, errors.Wrapf(err, "failed to read converter mark for block %s", blockID) - } - numShards := marker.Shards + numShards := shardCounts[blockID] if numShards <= 0 { // backward compatibility: blocks without a shard count have one shard numShards = 1 @@ -112,6 +111,44 @@ func (p *parquetBucketStore) findParquetBlocks(ctx context.Context, blockMatcher return parquetBlocks, nil } +// resolveShardCounts returns the number of parquet shards for each requested block ID. +// +// When the bucket index is enabled, the shard count is read from the bucket index. +// When the bucket index is disabled, it falls back to reading the converter mark +// for each block. +func (p *parquetBucketStore) resolveShardCounts(ctx context.Context, blockIDs []string) (map[string]int, error) { + shardCounts := make(map[string]int, len(blockIDs)) + + if p.bucketIndexEnabled && p.indexLoader != nil { + idx, _, err := p.indexLoader.GetIndex(ctx, p.userID) + if err != nil { + return nil, errors.Wrap(err, "failed to get bucket index") + } + for _, b := range idx.Blocks { + numShards := 1 + if b.Parquet != nil && b.Parquet.Shards > 0 { + numShards = b.Parquet.Shards + } + shardCounts[b.ID.String()] = numShards + } + return shardCounts, nil + } + + // Fallback: read the converter mark for each block. + for _, blockID := range blockIDs { + uid, err := ulid.Parse(blockID) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse block ID %s", blockID) + } + marker, err := cortex_parquet.ReadConverterMark(ctx, uid, p.bucket, p.logger) + if err != nil { + return nil, errors.Wrapf(err, "failed to read converter mark for block %s", blockID) + } + shardCounts[blockID] = marker.Shards + } + return shardCounts, nil +} + // Series implements the store interface for a single parquet bucket store func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) { spanLog, ctx := spanlogger.New(seriesSrv.Context(), "ParquetBucketStore.Series") diff --git a/pkg/storegateway/parquet_bucket_stores.go b/pkg/storegateway/parquet_bucket_stores.go index f0c488ba68..548415f0f7 100644 --- a/pkg/storegateway/parquet_bucket_stores.go +++ b/pkg/storegateway/parquet_bucket_stores.go @@ -5,6 +5,7 @@ import ( "fmt" "sort" "sync" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -31,9 +32,11 @@ import ( "github.com/cortexproject/cortex/pkg/querysharding" "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_util "github.com/cortexproject/cortex/pkg/util" cortex_errors "github.com/cortexproject/cortex/pkg/util/errors" "github.com/cortexproject/cortex/pkg/util/parquetutil" + "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/users" "github.com/cortexproject/cortex/pkg/util/validation" @@ -60,6 +63,10 @@ type ParquetBucketStores struct { rowRangesCache search.RowRangesForConstraintsCache inflightRequests *cortex_util.InflightRequestTracker + + // indexLoader lazily loads and caches the per-tenant bucket index in memory + // It is non-nil only when BucketIndex.Enabled. + indexLoader *bucketindex.Loader } // newParquetBucketStores creates a new multi-tenant parquet bucket stores @@ -108,6 +115,18 @@ func newParquetBucketStores(cfg tsdb.BlocksStorageConfig, bucketClient objstore. u.matcherCache = storecache.NoopMatchersCache } + if cfg.BucketStore.BucketIndex.Enabled { + // Add a component label so the bucket index Loader metrics don't collide + // with the querier's Loader metrics when running in single-binary mode. + loaderReg := prometheus.WrapRegistererWith(prometheus.Labels{"component": "parquet-storegateway"}, reg) + u.indexLoader = bucketindex.NewLoader(bucketindex.LoaderConfig{ + CheckInterval: time.Minute, + UpdateOnStaleInterval: cfg.BucketStore.SyncInterval, + UpdateOnErrorInterval: cfg.BucketStore.BucketIndex.UpdateOnErrorInterval, + IdleTimeout: cfg.BucketStore.BucketIndex.IdleTimeout, + }, bucketClient, limits, logger, loaderReg) + } + return u, nil } @@ -218,6 +237,18 @@ func (u *ParquetBucketStores) SyncBlocks(ctx context.Context) error { // InitialSync implements BucketStores func (u *ParquetBucketStores) InitialSync(ctx context.Context) error { + if u.indexLoader != nil { + // Start indexLoader + return services.StartAndAwaitRunning(ctx, u.indexLoader) + } + return nil +} + +// Stop implements BucketStores +func (u *ParquetBucketStores) Stop() error { + if u.indexLoader != nil { + return services.StopAndAwaitTerminated(context.Background(), u.indexLoader) + } return nil } @@ -270,14 +301,17 @@ func (u *ParquetBucketStores) createParquetBucketStore(userID string, userLogger userBucket := bucket.NewUserBucketClient(userID, u.bucket, u.limits) store := &parquetBucketStore{ - logger: userLogger, - bucket: userBucket, - limits: u.limits, - concurrency: u.cfg.BucketStore.ParquetQueryConcurrency, - chunksDecoder: u.chunksDecoder, - matcherCache: u.matcherCache, - parquetShardCache: u.parquetShardCache, - rowRangesCache: u.rowRangesCache, + logger: userLogger, + bucket: userBucket, + indexLoader: u.indexLoader, + limits: u.limits, + userID: userID, + bucketIndexEnabled: u.cfg.BucketStore.BucketIndex.Enabled, + concurrency: u.cfg.BucketStore.ParquetQueryConcurrency, + chunksDecoder: u.chunksDecoder, + matcherCache: u.matcherCache, + parquetShardCache: u.parquetShardCache, + rowRangesCache: u.rowRangesCache, } return store, nil diff --git a/pkg/storegateway/parquet_bucket_stores_test.go b/pkg/storegateway/parquet_bucket_stores_test.go index d577de0655..797bd4c87c 100644 --- a/pkg/storegateway/parquet_bucket_stores_test.go +++ b/pkg/storegateway/parquet_bucket_stores_test.go @@ -1,6 +1,7 @@ package storegateway import ( + "bytes" "context" "errors" "fmt" @@ -30,6 +31,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util/users" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -485,3 +487,67 @@ func TestParquetBucketStores_Series_MultiShard(t *testing.T) { require.NoError(t, err) assert.Equal(t, numSeries, len(series), "all series from all shards must be returned") } + +// TestParquetBucketStores_Series_MultiShard_BucketIndex verifies that, when the bucket +// index is enabled, the Store Gateway resolves the parquet shard count from the bucket +// index instead of reading the per-block converter mark. +func TestParquetBucketStores_Series_MultiShard_BucketIndex(t *testing.T) { + const ( + userID = "user-1" + metricName = "test_metric" + numSeries = 6 // 6 unique series + // numRowGroups=1, maxRowsPerRowGroup=2 → ceil(6/1*2) = 3 shards + numRowGroups = 1 + maxRowsPerRowGroup = 2 + expectedShards = 3 + ) + + cfg := prepareStorageConfig(t) + cfg.BucketStore.BucketStoreType = string(cortex_tsdb.ParquetBucketStore) + cfg.BucketStore.BucketIndex.Enabled = true + + storageDir := t.TempDir() + + // Create a block with 6 unique series. + generateStorageBlockWithMultipleSeries(t, storageDir, userID, metricName, numSeries, 0, 100, 15) + + bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + + overrides := validation.NewOverrides(validation.Limits{}, nil) + uBucket := bucket.NewUserBucketClient(userID, bkt, overrides) + + // Convert to parquet with 3 shards and write the (correct) converter mark. + userPath := filepath.Join(storageDir, userID) + blockIDs, err := convertToParquetBlocksWithShardsForTesting(userPath, uBucket, numRowGroups, maxRowsPerRowGroup) + require.NoError(t, err) + require.Len(t, blockIDs, 1) + + uidV2, err := ulidv2.Parse(blockIDs[0]) + require.NoError(t, err) + + // The bucket index discovers parquet blocks via the global markers location + // (parquet-markers/-parquet-converter-mark.json), so upload it there too. + require.NoError(t, uBucket.Upload(context.Background(), bucketindex.ConverterMarkFilePath(uidV2), bytes.NewReader([]byte("{}")))) + + // Build the bucket index (with parquet info) so the shard count is recorded there. + idx, _, _, err := bucketindex.NewUpdater(bkt, userID, nil, log.NewNopLogger()).EnableParquet().UpdateIndex(context.Background(), nil) + require.NoError(t, err) + require.NoError(t, bucketindex.WriteIndex(context.Background(), bkt, userID, nil, idx)) + + // The bucket index must record the actual number of shards (3). + require.Len(t, idx.Blocks, 1) + require.NotNil(t, idx.Blocks[0].Parquet) + require.Equal(t, expectedShards, idx.Blocks[0].Parquet.Shards, "bucket index should record 3 shards") + + // Overwrite the converter mark with a wrong shard count (1). If the Store Gateway + // used the converter mark instead of the bucket index, it would only read 1 shard. + require.NoError(t, cortex_parquet.WriteConverterMark(context.Background(), uidV2, uBucket, 1)) + + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bkt), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + + series, _, err := querySeries(stores, userID, metricName, 0, 100, blockIDs...) + require.NoError(t, err) + assert.Equal(t, numSeries, len(series), "all series must be returned using the bucket index shard count") +} diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index e3825a12e7..69e23be243 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -1094,7 +1094,7 @@ }, "idle_timeout": { "default": "1h0m0s", - "description": "How long a unused bucket index should be cached. Once this timeout expires, the unused bucket index is removed from the in-memory cache. This option is used only by querier.", + "description": "How long a unused bucket index should be cached. Once this timeout expires, the unused bucket index is removed from the in-memory cache. This option is used by querier and store-gateway parquet mode.", "type": "string", "x-cli-flag": "blocks-storage.bucket-store.bucket-index.idle-timeout", "x-format": "duration" @@ -1108,7 +1108,7 @@ }, "update_on_error_interval": { "default": "1m0s", - "description": "How frequently a bucket index, which previously failed to load, should be tried to load again. This option is used only by querier.", + "description": "How frequently a bucket index, which previously failed to load, should be tried to load again. This option is used by querier and store-gateway parquet mode.", "type": "string", "x-cli-flag": "blocks-storage.bucket-store.bucket-index.update-on-error-interval", "x-format": "duration"