feat(parquet): intra-file early stopping via statistics + dynamic filters#22450
feat(parquet): intra-file early stopping via statistics + dynamic filters#22450zhuqi-lucas wants to merge 6 commits into
Conversation
Two CI failures on PR apache#22450: 1. **cargo doc** — broken intra-doc link in `ParquetFileMetrics::row_groups_pruned_dynamic_filter`. Switch from `[\`row_groups_pruned_statistics\`]` to `[\`Self::row_groups_pruned_statistics\`]` so rustdoc can resolve it. 2. **sqllogictest substrait round-trip** — adding `dynamic_rg_pruning=eligible` to ParquetSource's `fmt_extra` output shifted every `EXPLAIN` line that already showed a `DynamicFilter` predicate. Add the marker to 13 SLT expectations: - clickbench, explain_analyze, limit, limit_pruning, dynamic_filter_pushdown_config, preserve_file_partitioning, projection_pushdown, push_down_filter_parquet, push_down_filter_regression, repartition_subset_satisfaction, sort_pushdown, statistics_registry, topk - 134 marker insertions total, all on `DataSourceExec:` lines whose predicate contains `DynamicFilter [`. Two summary-level analyze tests also need the new `row_groups_pruned_dynamic_filter=0` counter in their metrics block (`limit_pruning.slt`, `dynamic_filter_pushdown_config.slt`). Dev-level analyze output elides zero-valued counters so the other files don't need it. No behavior change beyond what was already in the previous commit.
|
run benchmarks |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/topk-rg-level-dynamic-pruning (691926f) to 077f08a (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/topk-rg-level-dynamic-pruning (691926f) to 077f08a (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/topk-rg-level-dynamic-pruning (691926f) to 077f08a (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
run benchmark sort_pushdown_inexact |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/topk-rg-level-dynamic-pruning (691926f) to 077f08a (merge-base) diff using: sort_pushdown_inexact File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
There was a problem hiding this comment.
Pull request overview
This PR adds runtime row-group pruning for Parquet scans driven by TopK’s dynamic filter, closing the gap where row groups selected at file open couldn’t be re-pruned after the TopK threshold tightens during execution.
Changes:
- Introduces a runtime
RowGroupPrunerthat re-evaluates a dynamic predicate at decoder-run boundaries and skips row groups proven unreachable. - Forces per-row-group decoder splitting when the predicate is dynamic so the runtime pruner has a boundary at every RG.
- Adds observability:
dynamic_rg_pruning=eligibleinEXPLAINand a new metricrow_groups_pruned_dynamic_filterinEXPLAIN ANALYZE, plus tests/SLTs updated accordingly.
Reviewed changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| datafusion/datasource-parquet/src/push_decoder.rs | Adds RowGroupPruner, tracks row-group indices per decoder run, and skips prunable runs at runtime. |
| datafusion/datasource-parquet/src/opener/mod.rs | Forces per-RG runs for dynamic predicates; wires pending runs + runtime pruner into PushDecoderStreamState. |
| datafusion/datasource-parquet/src/access_plan.rs | Extends split_runs with force_per_row_group to avoid coalescing runs for dynamic predicates. |
| datafusion/datasource-parquet/src/source.rs | Adds dynamic_rg_pruning=eligible marker in fmt_extra and unit tests for the marker. |
| datafusion/datasource-parquet/src/row_group_filter.rs | Exposes RowGroupPruningStatistics to reuse stats adapter for runtime pruning. |
| datafusion/datasource-parquet/src/metrics.rs | Adds row_groups_pruned_dynamic_filter metric to ParquetFileMetrics. |
| datafusion/core/tests/parquet/mod.rs | Adds helper to read row_groups_pruned_dynamic_filter from metrics. |
| datafusion/core/tests/parquet/dynamic_row_group_pruning.rs | New integration tests validating metric fires for TopK and stays quiet otherwise. |
| datafusion/sqllogictest/test_files/dynamic_row_group_pruning.slt | New SLT covering both EXPLAIN marker and EXPLAIN ANALYZE metric value. |
| datafusion/sqllogictest/test_files/topk.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/statistics_registry.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/sort_pushdown.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/push_down_filter_regression.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/push_down_filter_parquet.slt | Updates expected plans/metrics to include dynamic_rg_pruning=eligible and (where relevant) the new counter. |
| datafusion/sqllogictest/test_files/projection_pushdown.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/preserve_file_partitioning.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/limit.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/limit_pruning.slt | Updates expected metrics to include row_groups_pruned_dynamic_filter=0 plus eligibility marker. |
| datafusion/sqllogictest/test_files/explain_analyze.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt | Updates expected plans/metrics to include eligibility marker and row_groups_pruned_dynamic_filter=0 where applicable. |
| datafusion/sqllogictest/test_files/clickbench.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
Comments suppressed due to low confidence (1)
datafusion/datasource-parquet/src/access_plan.rs:458
split_runscomputesrow_group_needs_filteras!fully_matchedwithout considering theneeds_filterargument. Whenforce_per_row_group=trueand the scan has no row filter (needs_filter=false), this will still mark all runs asneeds_filter=true, causing the opener to treat them as filtered runs (e.g. attempting to fetch row filters / applying predicate-cache settings) even though no row-level filter exists.row_group_needs_filtershould be derived asneeds_filter && !fully_matchedso the run metadata stays consistent with the caller’s capabilities.
for (idx, (access, fully_matched)) in
row_groups.into_iter().zip(fully_matched).enumerate()
{
if !access.should_scan() {
continue;
}
let row_group_needs_filter = !fully_matched;
// Coalesce consecutive RGs into a run only when (a) they share
// the same filter requirement and (b) we're not forcing per-RG
// splitting for runtime pruning.
let can_coalesce = !force_per_row_group;
if can_coalesce
&& let Some(run) = runs
.last_mut()
.filter(|run| run.needs_filter == row_group_needs_filter)
{
run.access_plan.set(idx, access);
if fully_matched {
run.access_plan.mark_fully_matched(idx);
}
} else {
let mut run_plan = ParquetAccessPlan::new_none(num_row_groups);
run_plan.set(idx, access);
if fully_matched {
run_plan.mark_fully_matched(idx);
}
runs.push(RowGroupRun::new(row_group_needs_filter, run_plan));
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagesort_pushdown_inexact — base (merge-base)
sort_pushdown_inexact — branch
File an issue against this benchmark runner |
Per Copilot review on apache#22450: `RowGroupPruner` was using a single `predicate_creation_errors` counter for both predicate construction (`build_pruning_predicate`) AND predicate evaluation (`PruningPredicate::prune`) failures. The log message also said "Ignoring error building..." when the failure was during evaluation. This misattributed evaluation failures and made the metric semantics inconsistent with the static row-group pruning path in `RowGroupAccessPlanFilter::prune_by_statistics`, which already separates the two. `RowGroupPruner::new` now takes both counters: - `predicate_creation_errors`: bumped on `build_pruning_predicate` failures. Wired to `prepared.predicate_creation_errors` from the opener — same field the static path uses. - `predicate_evaluation_errors`: bumped on `PruningPredicate::prune` failures. Wired to `prepared.file_metrics.predicate_evaluation_errors` — same field the static `prune_by_statistics` path uses, so the two paths accumulate into a shared counter. The error log message is updated to say "evaluating" so the metric and the log agree.
|
run benchmark sort_pushdown_inexact |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/topk-rg-level-dynamic-pruning (0828f1b) to a8f03fd (merge-base) diff using: sort_pushdown_inexact File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagesort_pushdown_inexact — base (merge-base)
sort_pushdown_inexact — branch
File an issue against this benchmark runner |
|
run benchmark topk_tpch |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/topk-rg-level-dynamic-pruning (0828f1b) to a8f03fd (merge-base) diff using: topk_tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetopk_tpch — base (merge-base)
topk_tpch — branch
File an issue against this benchmark runner |
┏━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ HEAD ┃ feat_topk-rg-level-dynamic-pruning ┃ Change ┃
┡━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1 │ 2.14 / 2.74 ±0.76 / 4.10 ms │ 2.12 / 2.79 ±0.68 / 4.02 ms │ no change │
│ Q2 │ 10.66 / 11.36 ±0.68 / 12.23 ms │ 2.81 / 3.61 ±0.87 / 4.72 ms │ +3.15x faster │
│ Q3 │ 31.77 / 32.15 ±0.43 / 32.83 ms │ 31.71 / 31.92 ±0.16 / 32.18 ms │ no change │
│ Q4 │ 11.83 / 12.29 ±0.77 / 13.82 ms │ 3.13 / 3.25 ±0.13 / 3.48 ms │ +3.78x faster │
│ Q5 │ 9.94 / 10.14 ±0.18 / 10.46 ms │ 9.95 / 10.02 ±0.05 / 10.09 ms │ no change │
│ Q6 │ 17.19 / 17.39 ±0.15 / 17.56 ms │ 17.11 / 17.36 ±0.37 / 18.09 ms │ no change │
│ Q7 │ 37.07 / 38.08 ±1.17 / 40.08 ms │ 37.00 / 37.41 ±0.37 / 38.07 ms │ no change │
│ Q8 │ 28.13 / 28.59 ±0.60 / 29.71 ms │ 6.86 / 7.16 ±0.42 / 7.98 ms │ +3.99x faster │
│ Q9 │ 35.34 / 36.86 ±1.54 / 38.77 ms │ 8.36 / 8.50 ±0.08 / 8.60 ms │ +4.34x faster │
│ Q10 │ 54.13 / 55.29 ±1.83 / 58.93 ms │ 12.77 / 13.00 ±0.45 / 13.89 ms │ +4.25x faster │
│ Q11 │ 3.75 / 3.91 ±0.11 / 4.05 ms │ 3.82 / 4.08 ±0.31 / 4.68 ms │ no change │
└───────┴────────────────────────────────┴────────────────────────────────────┴───────────────┘cc @alamb @adriangb @Dandandan |
|
Nice, impressive 🚀🚀🚀 |
…mic RG pruning Adds a second integration test in `dynamic_row_group_pruning.rs` covering the **page-level `RowSelection`** path adriangb asked about: WHERE `v >= 500` engages the page index, masking out the first 5 of RG 0's 10 pages; `ORDER BY v DESC LIMIT 5` then drives runtime RG pruning, which drops RGs 0..3 in a single `into_builder` rebuild. If the rebuild lost or shifted the page-index-derived row selection, either the result rows would drift or the `page_index_pages_pruned` count would collapse. The test pins both values — pages_pruned >= 5 AND the top-5 descending values (4995..=4999) — so the rebuild's selection-preservation contract is nailed down end-to-end. The previous PR commit (`f7cb5e7bd`) covered the per-row `RowFilter` case; this one covers the page-level `RowSelection` case. Together they exercise both filter-state preservation paths arrow-rs's `into_builder` is responsible for. Tooling: extends `Unit` with a `RowGroupAndPage(rows_per_group, rows_per_page)` variant, and `make_test_file_rg` with an optional `row_per_page` argument that sets `data_page_row_count_limit` + `write_batch_size` so the writer produces multi-page RGs.
|
Addressed all comments besides:
I tried this before, but it failed for testing, i am trying more for this part. |
|
Update on this — I prototyped the per-RG toggle: push The current
Without the toggle, this off-by-one was harmless — we never used Fixing it cleanly needs one of:
I'm leaving the per-RG toggle out of this PR and will pick it up as a follow-up once the arrow-rs accessor lands (or by going with option 3 if that takes a while). The practical perf gap is narrow: arrow-rs's page-level |
makes sense, but doesn't that mean this PR is possibly a regression for some queries? |
|
run benchmarks |
|
run benchmark topk_tpch |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/topk-rg-level-dynamic-pruning (6c554cc) to 6176a6d (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/topk-rg-level-dynamic-pruning (6c554cc) to 6176a6d (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/topk-rg-level-dynamic-pruning (6c554cc) to 6176a6d (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/topk-rg-level-dynamic-pruning (6c554cc) to 6176a6d (merge-base) diff using: topk_tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetopk_tpch — base (merge-base)
topk_tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
@zhuqi-lucas these failures in clickbench are worth looking at: │ QQuery 24 │ 41.97 / 47.71 ±5.80 / 56.58 ms │ FAIL │ incomparable │ |
ClickBench Q24 / Q26 (`SELECT … WHERE x <> '' ORDER BY ts LIMIT 10`)
were failing with:
Parquet error: into_builder called mid-row-group;
check is_at_row_group_boundary() first
`transition()` in `push_decoder.rs` re-enters Step 2 on every loop
iteration — including the iterations where Step 3 returned `NeedsData`
and we pushed byte ranges but the decoder has not yet handed back a
reader. At those moments the decoder sits in `ReadingRowGroup` state
but `is_at_row_group_boundary()` is `false`, and arrow-rs's
`into_builder` errors out (it can only rebuild at a clean RG boundary).
Step 2's prune-and-rebuild block now skips when the decoder is mid-RG.
Step 3 still drives the decoder forward in that iteration, and the next
boundary re-enters Step 2 with the pruner in the same state (the pruner
is stateful but idempotent — re-evaluating costs one cached
`pp.prune` per RG, no rebuild). No semantic change for queries that
were already passing.
Add `dynamic_rg_pruner_does_not_call_into_builder_mid_row_group`: a
20-RG × 50-row file plus `ORDER BY v ASC LIMIT 10` gives the pruner
multiple boundaries to attempt rebuilds and reliably trips the failure
mode on the pre-fix code.
Thanks @adriangb for good catch, and fixed in latest PR, let me rerun it to see. |
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/topk-rg-level-dynamic-pruning (608591f) to 96a6096 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
@adriangb It works well now for the latest run. |
adriangb
left a comment
There was a problem hiding this comment.
Thanks @zhuqi-lucas, I think this is ready!
Thank you @adriangb for review, added follow-up issue for full-matched improvement |
Which issue does this PR close?
Closes #22407.
Rationale for this change
DataFusion already prunes parquet at three granularities — file
(
EarlyStoppingStream+FilePruner), row group at scan-startup(
PruningPredicate→RowGroupAccessPlanFilter), and row inside anopen RG (
RowFilter).There's a gap in the middle: once row-group pruning runs at file open, that
decision is frozen because any dynamic filter is still
lit(true)atthat point. As
TopKtightens its threshold at runtime, subsequent RGs inthe already-opened file keep getting decoded even when their stats already
prove they cannot beat the threshold. This is the dominant cost for
ORDER BY ... LIMITqueries on multi-RG files where file-level pruningcan't help (single large file, or scrambled-RG multi-file).
See the issue for a full architectural diagram and a concrete trace
showing where the wasted I/O / decompression / decode lives.
What changes are included in this PR?
A single decoder paused at row-group boundaries, with a pruner consulted
between row groups and the decoder rebuilt via
into_builder()to skipthe row groups the pruner just rejected. Three coordinated pieces:
RowGroupPruner(datafusion/datasource-parquet/src/push_decoder.rs)mirrors
FilePrunerat row-group granularity. It uses theDynamicFilterTrackerAPI from feat(physical-expr): DynamicFilterTracker for cheap dynamic-filter change detection #22460 to subscribe once to everynot-yet-complete dynamic filter in the predicate;
tracker.changed()is a single atomic load — no tree traversal per check. The cached
PruningPredicateis rebuilt only when a watched filter has actuallymoved, then evaluated against the next pending row group's statistics
via the existing
RowGroupPruningStatisticsadapter. Predicateconstruction errors and predicate evaluation errors are counted into
two separate metrics so a flaky predicate path can never silently
drop data.
Single-decoder iteration model
(
PushDecoderStreamState::transition). The opener builds oneParquetPushDecoderfrom the prepared access plan, and the streamuses arrow-rs 59's
ParquetRecordBatchReaderiterator to pause atrow-group boundaries. At each boundary the pruner is consulted
against the head of
rg_plan(the remaining row-group indices). Ifthe pruner proves the head RG unwinnable, that index is dropped from
the plan and the decoder is rebuilt via
decoder.into_builder().with_row_groups(remaining).build()so theskipped RGs are bypassed entirely — no decode, no row-filter eval.
Already-fetched buffered bytes for downstream RGs carry across the
rebuild.
Gate: build the pruner only when the predicate actually moves.
The opener creates a
RowGroupPruneronly whenDynamicFilterTracking::classify(&predicate)reportsWatching(atleast one not-yet-complete dynamic filter) and more than one row
group remains in the access plan. Static or already-complete
predicates were fully consumed by
prune_by_statisticsat file open,so re-evaluating them per RG boundary would be wasted work.
The earlier multi-decoder design (
PendingDecoderRun,ParquetAccessPlan::split_runs,force_per_row_group) is removed —arrow-rs 59's
into_builder+with_row_groupsmakes a single decoderstrictly more capable.
Observability
Countmetricrow_groups_pruned_dynamic_filteronParquetFileMetricssurfaces the runtime saving.dynamic_rg_pruning=eligiblemarker onParquetSource'sEXPLAIN(fmt_extraDefault + Verbose) signals plan-timeeligibility, emitted whenever the predicate has a still-watching
dynamic portion. Eligible rather than true because the static
plan can't predict the runtime outcome.
Benchmarks (
benchmarks/sort_pushdown_inexact, 5 iterations)ORDER BY l_orderkey DESC LIMIT 100ORDER BY l_orderkey DESC LIMIT 1000SELECT * ... DESC LIMIT 100SELECT * ... DESC LIMIT 1000Narrow-projection queries gain the most — their per-RG cost is dominated
by metadata + sort-column read, which this PR eliminates for unwinnable
RGs. Wide-projection queries gain less because the kept RG's
all-column decode dominates total time, but still see meaningful
savings.
Are these changes tested?
Three layers:
push_decoder.rs::tests:RowGroupPrunerbasic pruning,tracker-driven dynamic-filter updates, fallback when the predicate
has no analyzable bounds.
source.rs::tests:dynamic_rg_pruning=eligiblemarkerpresent on dynamic predicate, absent on static predicate, absent
when there is no predicate at all.
datafusion/core/tests/parquet/dynamic_row_group_pruning.rs:asserts
row_groups_pruned_dynamic_filter >= 1end-to-end on a 5-RGORDER BY DESC LIMIT 5scan; a regression test for theprepare_access_planreorder bug that usesORDER BY ASCagainst afile written in descending value order so the sort-pushdown reorder
is exercised; and a quiet-without-TopK test that asserts the metric
stays at 0 (no spurious firing).
datafusion/sqllogictest/test_files/dynamic_row_group_pruning.slt:asserts both
EXPLAINsurfaces — plainEXPLAINshowsdynamic_rg_pruning=eligible, andEXPLAIN ANALYZEpinsrow_groups_pruned_dynamic_filter=4(five RGs, four pruned atruntime).
cargo clippy --all-targets --all-features -- -D warningsclean.Are there any user-facing changes?
Two visible additions, both opt-in via existing dynamic-filter
infrastructure:
row_groups_pruned_dynamic_filtercounter visible inEXPLAIN ANALYZEfor queries whose plan carries aDynamicFilterPhysicalExpr(today: only TopK withenable_topk_dynamic_filter_pushdown=true, which is the default).dynamic_rg_pruning=eligiblemarker visible inEXPLAINoutput for the same queries.
No config changes, no API breakage, no behavior change for queries
without a dynamic predicate.