-
Notifications
You must be signed in to change notification settings - Fork 862
Parquet: Skip parquet conversion for blocks with too many labels #7524
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b3d87bb
8871ac9
72d34b5
97e9407
d7b9836
416a562
8f6615c
57ea94d
64d4477
b1c3b5d
909274c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,142 @@ | ||
| //go:build integration | ||
|
|
||
| package integration | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "math/rand" | ||
| "path/filepath" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/prometheus/prometheus/model/labels" | ||
| "github.com/stretchr/testify/require" | ||
| "github.com/thanos-io/objstore" | ||
| "github.com/thanos-io/thanos/pkg/block" | ||
| "github.com/thanos-io/thanos/pkg/block/metadata" | ||
|
|
||
| "github.com/cortexproject/cortex/integration/e2e" | ||
| e2ecache "github.com/cortexproject/cortex/integration/e2e/cache" | ||
| e2edb "github.com/cortexproject/cortex/integration/e2e/db" | ||
| "github.com/cortexproject/cortex/integration/e2ecortex" | ||
| "github.com/cortexproject/cortex/pkg/storage/bucket" | ||
| "github.com/cortexproject/cortex/pkg/storage/tsdb" | ||
| "github.com/cortexproject/cortex/pkg/util/log" | ||
| cortex_testutil "github.com/cortexproject/cortex/pkg/util/test" | ||
| ) | ||
|
|
||
| func TestParquetConverter_NoConvertMarkWithTooManyLabels(t *testing.T) { | ||
| s, err := e2e.NewScenario(networkName) | ||
| require.NoError(t, err) | ||
| defer s.Close() | ||
|
|
||
| consul := e2edb.NewConsulWithName("consul") | ||
| memcached := e2ecache.NewMemcached() | ||
| require.NoError(t, s.StartAndWaitReady(consul, memcached)) | ||
|
|
||
| baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags()) | ||
| flags := mergeFlags( | ||
| baseFlags, | ||
| map[string]string{ | ||
| "-target": "all,parquet-converter", | ||
| "-blocks-storage.tsdb.block-ranges-period": "1m,24h", | ||
| "-blocks-storage.tsdb.ship-interval": "1s", | ||
| "-blocks-storage.bucket-store.sync-interval": "1s", | ||
| "-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s", | ||
| "-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s", | ||
| "-blocks-storage.bucket-store.bucket-index.enabled": "true", | ||
| "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, | ||
| // compactor | ||
| "-compactor.cleanup-interval": "1s", | ||
| // Ingester. | ||
| "-ring.store": "consul", | ||
| "-consul.hostname": consul.NetworkHTTPEndpoint(), | ||
| // Distributor. | ||
| "-distributor.replication-factor": "1", | ||
| // Store-gateway. | ||
| "-store-gateway.sharding-enabled": "false", | ||
| "--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways | ||
| // alert manager | ||
| "-alertmanager.web.external-url": "http://localhost/alertmanager", | ||
| // Enable vertical sharding. | ||
| "-frontend.query-vertical-shard-size": "3", | ||
| "-frontend.max-cache-freshness": "1m", | ||
| // enable experimental promQL funcs | ||
| "-querier.enable-promql-experimental-functions": "true", | ||
| // parquet-converter | ||
| "-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(), | ||
| "-parquet-converter.conversion-interval": "1s", | ||
| "-parquet-converter.enabled": "true", | ||
| "-parquet-converter.max-block-label-names": "1", | ||
| // Querier | ||
| "-querier.enable-parquet-queryable": "true", | ||
| // Enable cache for parquet labels and chunks | ||
| "-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached", | ||
| "-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), | ||
| "-blocks-storage.bucket-store.chunks-cache.backend": "inmemory,memcached", | ||
| "-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), | ||
| }, | ||
| ) | ||
|
|
||
| // make alert manager config dir | ||
| require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) | ||
|
|
||
| ctx := context.Background() | ||
| rnd := rand.New(rand.NewSource(time.Now().Unix())) | ||
| dir := filepath.Join(s.SharedDir(), "data") | ||
| lbls := []labels.Labels{ | ||
| labels.FromStrings("__name__", "test_series_a", "job", "test"), | ||
| } | ||
|
|
||
| numSamples := 60 | ||
| scrapeInterval := time.Minute | ||
| now := time.Now() | ||
| start := now.Add(-time.Hour * 24) | ||
| end := now.Add(-time.Hour) | ||
|
|
||
| minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) | ||
| require.NoError(t, s.StartAndWaitReady(minio)) | ||
|
|
||
| cortex := e2ecortex.NewSingleBinary("cortex", flags, "") | ||
| require.NoError(t, s.StartAndWaitReady(cortex)) | ||
| storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"]) | ||
| require.NoError(t, err) | ||
| bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil) | ||
|
|
||
| id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, | ||
| start.UnixMilli(), | ||
| end.UnixMilli(), | ||
| scrapeInterval.Milliseconds(), 10, | ||
| ) | ||
| require.NoError(t, err) | ||
|
|
||
| err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc) | ||
| require.NoError(t, err) | ||
|
|
||
| // Wait for the converter to write the no-convert marker | ||
| cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} { | ||
| noConvertMarkerPath := fmt.Sprintf("%s/parquet-no-convert-mark.json", id.String()) | ||
| found := false | ||
| err := bkt.Iter(ctx, "", func(name string) error { | ||
| if name == noConvertMarkerPath { | ||
| found = true | ||
| } | ||
| return nil | ||
| }, objstore.WithRecursiveIter()) | ||
| require.NoError(t, err) | ||
| return found | ||
| }) | ||
|
|
||
| // confirm the conversion did not happen (check both paths) | ||
| blockID := id.String() | ||
| markerPaths := []string{ | ||
| fmt.Sprintf("%s/parquet-converter-mark.json", blockID), | ||
| fmt.Sprintf("parquet-markers/%s-parquet-converter-mark.json", blockID), | ||
| } | ||
| for _, markerPath := range markerPaths { | ||
| exists, err := bkt.Exists(ctx, markerPath) | ||
| require.NoError(t, err) | ||
| require.False(t, exists, "converter mark should not exist at %s", markerPath) | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,6 +49,9 @@ const ( | |
| ringKey = "parquet-converter" | ||
|
|
||
| converterMetaPrefix = "converter-meta-" | ||
|
|
||
| parquetConverterDataColumnDuration = time.Hour * 8 | ||
| parquetConverterSystemColumnCount = 2 // s_col_indexes and s_series_hash. | ||
| ) | ||
|
|
||
| var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) | ||
|
|
@@ -139,7 +142,7 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex | |
| metrics: newMetrics(registerer), | ||
| bkt: bkt, | ||
| baseConverterOptions: []convert.ConvertOption{ | ||
| convert.WithColDuration(time.Hour * 8), | ||
| convert.WithColDuration(parquetConverterDataColumnDuration), | ||
| convert.WithRowGroupSize(cfg.MaxRowsPerRowGroup), | ||
| }, | ||
| } | ||
|
|
@@ -388,14 +391,32 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin | |
| } | ||
|
|
||
| // We don't convert blocks again if they already have a valid converter mark. | ||
| if cortex_parquet.ValidConverterMarkVersion(marker.Version) { | ||
| if cortex_parquet.ValidNoConvertMarkVersion(marker.Version) { | ||
| level.Debug(logger).Log("msg", "skipping block, no-convert marker already exists", "block", b.ULID.String()) | ||
| c.metrics.skippedBlocks.WithLabelValues(userID, cortex_parquet.NoConvertReasonTooManyLabels).Inc() | ||
| continue | ||
| } | ||
|
|
||
| if !cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.blockRanges) { | ||
| continue | ||
| } | ||
|
|
||
| configuredMaxBlockLabelNames := c.limits.ParquetConverterMaxBlockLabelNames(userID) | ||
| maxBlockLabelNames := effectiveMaxBlockLabelNames(configuredMaxBlockLabelNames, b.MinTime, b.MaxTime) | ||
|
|
||
| // If the threshold is enabled, check for no-convert mark | ||
| if configuredMaxBlockLabelNames > 0 { | ||
|
|
||
| noConvertMark, err := cortex_parquet.ReadNoConvertMark(ctx, b.ULID, uBucket, logger) | ||
| if err != nil { | ||
| level.Error(logger).Log("msg", "failed to read parquet no-convert marker", "block", b.ULID.String(), "err", err) | ||
| continue | ||
| } | ||
| if cortex_parquet.ValidNoConvertMarkVersion(noConvertMark.Version) { | ||
| continue | ||
| } | ||
| } | ||
|
friedrichg marked this conversation as resolved.
|
||
|
|
||
| if err := os.RemoveAll(c.compactRootDir()); err != nil { | ||
| level.Error(logger).Log("msg", "failed to remove work directory", "path", c.compactRootDir(), "err", err) | ||
| if c.checkConvertError(userID, err) { | ||
|
|
@@ -425,6 +446,33 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin | |
| continue | ||
| } | ||
|
|
||
| if configuredMaxBlockLabelNames > 0 { | ||
| labelNames, err := tsdbBlock.LabelNames(ctx) | ||
| if err != nil { | ||
| _ = tsdbBlock.Close() | ||
| level.Error(logger).Log("msg", "failed to get label names", "block", b.ULID.String(), "err", err) | ||
| if c.checkConvertError(userID, err) { | ||
| return err | ||
| } | ||
| continue | ||
| } | ||
| labelNamesCount := len(labelNames) | ||
| if labelNamesCount > maxBlockLabelNames { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A note. Today the max column limit in parquet go is like 32767 IIRC. But since our parquet file has additional system columns, when configuring the max block label names we need to keep some buffer
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. Is it 32767 + |
||
| if err := cortex_parquet.WriteNoConvertMark(ctx, b.ULID, uBucket, labelNamesCount, maxBlockLabelNames); err != nil { | ||
| _ = tsdbBlock.Close() | ||
| level.Error(logger).Log("msg", "failed to write parquet no-convert marker", "block", b.ULID.String(), "err", err) | ||
| if c.checkConvertError(userID, err) { | ||
| return err | ||
| } | ||
| continue | ||
| } | ||
| level.Debug(logger).Log("msg", "skipping parquet conversion for block with too many label names", "block", b.ULID.String(), "label_names", labelNamesCount, "limit", maxBlockLabelNames) | ||
| c.metrics.skippedBlocks.WithLabelValues(userID, cortex_parquet.NoConvertReasonTooManyLabels).Inc() | ||
| _ = tsdbBlock.Close() | ||
| continue | ||
| } | ||
| } | ||
|
|
||
| level.Info(logger).Log("msg", "converting block", "block", b.ULID.String(), "dir", bdir) | ||
| start := time.Now() | ||
|
|
||
|
|
@@ -486,6 +534,25 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin | |
| return nil | ||
| } | ||
|
|
||
| func effectiveMaxBlockLabelNames(configuredMaxBlockLabelNames int, mint, maxt int64) int { | ||
| if configuredMaxBlockLabelNames <= 0 { | ||
| return configuredMaxBlockLabelNames | ||
| } | ||
|
|
||
| dataColumnCount := 0 | ||
| if maxt >= mint { | ||
| dataColumnCount = int((maxt-mint)/parquetConverterDataColumnDuration.Milliseconds()) + 1 | ||
| } | ||
|
|
||
| // Reserve for s_col_indexes, s_series_hash, and generated s_data_* columns. | ||
| maxBlockLabelNames := max(parquet.MaxColumnIndex-parquetConverterSystemColumnCount-dataColumnCount, 0) | ||
|
|
||
| if configuredMaxBlockLabelNames > maxBlockLabelNames { | ||
| return maxBlockLabelNames | ||
| } | ||
| return configuredMaxBlockLabelNames | ||
| } | ||
|
|
||
| func (c *Converter) checkConvertError(userID string, err error) (terminate bool) { | ||
| if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || c.isCausedByPermissionDenied(err) { | ||
| terminate = true | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the right log here?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep the log needs to be removed