[python] Add compaction module with Ray distributed executor#7771
[python] Add compaction module with Ray distributed executor#7771TheR1sing3un wants to merge 12 commits into
Conversation
Lay the protocol-level groundwork for upcoming compaction work: - CommitMessage gains compact_before / compact_after fields so a single message can carry both deletion and addition of files in a compact result. - FileStoreCommit emits ADD entries for compact_after and DELETE entries for compact_before; commit() auto-selects COMPACT kind when no new_files are present, and a dedicated commit_compact() helper enforces COMPACT-only semantics with no row-id assignment. - DataFileMeta exposes to_dict / from_dict round-trip plus tagged-value encoding (bytes, decimal, datetime, date, time, Timestamp) so file metas can be shipped JSON-safely between processes. - New CommitMessageSerializer wraps the JSON form for use as a CompactTask payload (Phase 4 will consume it from the Ray executor). No write/read behavior changes for existing callers.
… values Promote encode_value/decode_value to the public DataFileMeta API and reuse them for CommitMessage.partition. Without this, partitions containing DATE/DECIMAL/bytes/Timestamp would crash json.dumps once Phase 4 ships CommitMessage payloads through Ray workers. Tests: round-trip date/Decimal/bytes and Timestamp partition tuples.
End-to-end Append-only compaction in-process, exposed as
table.new_compact_job(...).execute(). The plumbing follows the same
Coordinator → Executor → Driver-commit shape Spark uses, so plugging
in a Ray backend in Phase 4 only swaps out the executor.
New compact package layout (kept stable as Phase 3 will plug PK):
pypaimon/compact/
options.py
coordinator/{coordinator.py, append_compact_coordinator.py}
task/{compact_task.py, append_compact_task.py}
rewriter/{rewriter.py, append_compact_rewriter.py}
executor/{executor.py, local_executor.py}
job/compact_job.py
Behavior:
- Coordinator scans the latest snapshot via FileScanner.plan_files,
groups by (partition, bucket), filters out already target_file_size+
files, and chunks each bucket at max_file_num. full_compaction=True
rewrites every file regardless of size or count.
- Rewriter feeds files batch-by-batch into AppendOnlyDataWriter so the
writer's existing target_file_size rolling produces correctly sized
output without a separate rolling layer.
- AppendCompactTask captures the in-process FileStoreTable directly;
to_dict/from_dict are stubbed and raise — Phase 4 will fill them in
once Ray needs serialization.
- CompactJob assembles CommitMessage(compact_before, compact_after)
from each task and calls FileStoreCommit.commit_compact for a single
atomic snapshot tagged COMPACT.
Tests cover threshold behavior, full_compaction override, max_file_num
chunking, PK rejection, partitioned/unpartitioned e2e (file count
shrinks, data identity preserved, snapshot kind=COMPACT), and the
no-op path.
- Rewriter: stop mutating manifest-owned DataFileMeta. Resolve the read path locally each iteration, preferring external_path (matches SplitRead.file_reader_supplier) over file_path, and never write back. - Rewriter: seed sequence_generator per bucket_mode — 0 for BUCKET_UNAWARE and max(input.max_seq) for HASH_FIXED — matching FileStoreWrite._create_data_writer instead of always using max. - Rewriter: abort the AppendOnlyDataWriter on failure so partial output files don't leak when an executor raises mid-rewrite. - CompactOptions: validate min_file_num >= 1 and max_file_num >= min_file_num at construction so misconfiguration fails loudly instead of being silently rounded up. - AppendCompactCoordinator: drop the silent max(min, max) rescue and document that the trailing chunk below min_file_num is intentionally dropped (deferred to a future change). - CompactTask: align docstring with reality — JSON serialization is declared on the base class but concrete subclasses may defer it until distributed execution arrives in Phase 4. Tests: rewriter must not mutate input metadata; rewriter must abort output on failure; CompactOptions validation. All 15 compact tests plus 60 commit/manifest/scanner regression tests pass.
…strategy
End-to-end primary-key compaction in-process. table.new_compact_job(...)
on a PK table now plans a MergeTreeCompactTask per (partition, bucket)
that is eligible under UniversalCompaction's three-stage decision
(size-amp / size-ratio / file-num), rewrites it via SortMergeReader +
MergeFunction, and commits the result with snapshot kind=COMPACT.
New modules:
pypaimon/compact/levels.py
Direct port of Java mergetree.Levels — L0 ordered by maxSeq DESC,
L1..N hold one SortedRun each, update() routes per-level.
pypaimon/compact/strategy/
compact_unit.py + strategy.py + universal_compaction.py — full
Universal Compaction algorithm (size-amp, size-ratio, file-num,
force-pick-L0). EarlyFullCompaction / OffPeak left for later.
pypaimon/compact/rewriter/merge_tree_rolling_writer.py
Subclass of DataWriter that consumes pre-merged KV batches; rewrites
each appended file's metadata with the strategy's output_level, the
actual min/max sequence numbers and retract count.
pypaimon/compact/rewriter/merge_tree_compact_rewriter.py
Drives IntervalPartition → per-section ConcatRecordReader →
SortMergeReader (with the table's MergeFunction) → optional
DropDeleteRecordReader → buffered RecordBatch → rolling writer.
pypaimon/compact/coordinator/merge_tree_compact_coordinator.py
Per-(partition, bucket) Levels build + strategy.pick + drop_delete
rule (output_level >= non_empty_highest_level).
pypaimon/compact/task/merge_tree_compact_task.py
Carries one CompactUnit; assembles CommitMessage(compact_before,
compact_after) for the driver to commit atomically.
Read path:
pypaimon/read/reader/merge_function.py
Abstract MergeFunction + Factory; DeduplicateMergeFunction migrated
from sort_merge_reader.py. PartialUpdate / Aggregate / FirstRow are
stubbed so configured tables fail loudly with a Phase 6 message.
SortMergeReaderWithMinHeap accepts an optional merge_function
(default DeduplicateMergeFunction → existing read path unchanged).
KeyValue.row_tuple exposes the underlying physical tuple so the
rewriter can buffer KVs back into a RecordBatch.
CompactJob now routes PK tables to MergeTreeCompactCoordinator.
Tests: 19 unit (Levels semantics, UniversalCompaction trigger
algorithm, MergeFunction registry + stubs) + 2 PK e2e (full-compaction
dedup keeps latest values & promotes level; below-trigger no-op).
99-test combined regression on commit/manifest/scanner/reader paths.
- Rewriter: count_retract_rows now matches RowKind.is_add_byte (only UPDATE_BEFORE=1 and DELETE=3 are retracts). The previous != 0 check wrongly inflated delete_row_count by counting UPDATE_AFTER rows, which would skew downstream size-amplification estimates and metrics. - Levels.update: reject out-of-range levels with a clear ValueError instead of letting an IndexError leak from _update_level when a buggy strategy hands back an output_level above number_of_levels(). - Extract build_kv_file_fields() to split_read.py and consume it from both SplitRead._create_key_value_fields and the merge-tree rewriter, so the on-disk KV file schema layout (key cols / seq / kind / value) cannot drift between read and compact paths.
Compact jobs can now run their work on Ray. table.new_compact_job(..., executor=RayExecutor(), catalog_options=..., table_identifier=...).execute() plans on the driver, ships JSON-serialized CompactTask payloads through ray.remote, rebuilds the FileStoreTable inside each worker via the configured catalog, runs the rewriter, and returns CommitMessages back to the driver for one atomic commit. CompactTask base class: - with_table_loader(catalog_options, table_identifier) attaches the spec a worker uses to rebuild its table. - to_dict / from_dict are now concrete: a base envelope holding type + loader spec + payload, with subclasses owning _to_payload / _from_payload. CompactTask.deserialize(payload) returns the right subclass via the registry. - _resolve_table_via_loader() centralizes catalog rebuild so subclasses share a single in-process-vs-distributed branch. AppendCompactTask / MergeTreeCompactTask: - replace the Phase 3 NotImplementedError stubs with real payload encoders that round-trip files via DataFileMeta.to_dict and partition tuples via encode_value/decode_value (handles DATE / Decimal / Timestamp partition columns). - _resolve_table prefers the in-process table when LocalExecutor attached one and falls back to the loader otherwise. CompactJob: - Accepts catalog_options + table_identifier and propagates them onto every task before dispatch when present. LocalExecutor path unchanged. RayExecutor: - Top-level _run_task_payload worker so Ray pickling stays cheap and worker code can't capture driver state. - ray.init only when not already initialized; respects ray_init_args. - num_cpus_per_task + ray_remote_args expose the usual Ray knobs. DataFileMeta serialization: - Tolerate manifest-side BinaryRow (lazy-decoded) in addition to GenericRow, and pyarrow Array-like null_counts. Without this the Ray round trip fails on files that were just produced by the writer. setup.py already declared ray as an optional extra (pip install pypaimon[ray]); no packaging changes required. Tests: - compact_task_serde_test (5 tests): round-trip Append + MergeTree payloads with loader spec and non-JSON-native partitions; clear error when neither table nor loader was attached; unknown-type rejection in the registry. - ray_executor_test (1 test): end-to-end Append-only compaction via a real ray.init(local_mode=True), asserting commit_kind=COMPACT and data identity. Skipped automatically if ray isn't installed.
- CompactJob.table_identifier default uses Identifier.get_full_name()
instead of str(identifier). Identifier is a dataclass with no custom
__str__, so str(...) returns its repr ("Identifier(database='db',
...)") and Identifier.from_string would refuse to parse that on the
worker side. The default path was untested in Phase 4 (e2e passed
only because the test explicitly passed table_identifier=...) — this
fixup also drops that explicit kwarg from the e2e so the default is
exercised.
- RayExecutor module imports the AppendCompactTask / MergeTreeCompactTask
modules at the top level so their @register_compact_task side effects
populate the task registry inside Ray worker processes. Without this,
a real (non-local_mode) Ray cluster would unpickle _run_task_payload
in a fresh process whose registry is empty and CompactTask.deserialize
would raise "Unknown CompactTask type".
- MergeTreeCompactTask docstring updated — it no longer says
"Phase 4 will plumb the loader fields" since Phase 4 already did.
…ctIncrement Restructure CommitMessage to mirror org.apache.paimon.table.sink.CommitMessageImpl exactly: instead of dropping new_files / compact_before / compact_after onto CommitMessage as flat fields, package them inside DataIncrement and CompactIncrement value objects that match their Java counterparts field-for-field. This makes Python and Java messages structurally identical and gives later phases a single, unambiguous slot to plug deletion vectors, changelog files, and global index deltas into without inventing parallel field names. New value objects: - DataIncrement(new_files, deleted_files, changelog_files, new_index_files, deleted_index_files) — direct port of org.apache.paimon.io.DataIncrement. - CompactIncrement(compact_before, compact_after, changelog_files, new_index_files, deleted_index_files) — direct port of org.apache.paimon.io.CompactIncrement. CommitMessage now holds (partition, bucket, total_buckets, data_increment, compact_increment, check_from_snapshot). Convenience properties (new_files, compact_before, compact_after, changelog_files, ...) keep call-sites readable without leaking the increment shape. Migration: - FileStoreWrite.prepare_commit, TableUpdate.prepare_commit, AppendCompactTask.run, MergeTreeCompactTask.run all build their CommitMessage through DataIncrement / CompactIncrement and now also populate total_buckets the way Java does. - CommitMessageSerializer wire format bumps to version=2 and round-trips the full increment shape, including index file lists. IndexFileMeta serialization covers identity fields only — dv_ranges / global_index_meta will be wired up alongside the deletion-vector and changelog phases. Tests updated to construct messages via increments. No behavior changes for the existing commit / read paths: FileStoreCommit still reads message.new_files / compact_before / compact_after through the new convenience properties.
This PR has not landed yet, so there is no on-disk / cross-process payload from a prior version to stay compatible with — VERSION still denotes "first shipped wire format". Bump it once we actually need to break compat with a released version.
Replace the count-based chunking in _pick_files_for_bucket with the size-based bin-packing algorithm Java's AppendCompactCoordinator .SubCoordinator.pack uses, so plans produced by the Python coordinator match Java's task shape on the same input: - Sort candidates by file_size ascending instead of by sequence number, so smaller files lead and the packer has the most room to grow each bin before overshooting. - Drain a bin as soon as it has >1 file AND its weighted size hits target_file_size * 2. The hardcoded ×2 is Java's "each task should yield ~2 target-sized output files" constant. - Account for source.split.open-file-cost in bin size, matching Java's per-file IO weight: a bucket of many tiny files now fans out into several tasks instead of being packed into one giant task. - Trailing bin emits only when it has at least min_file_num files; shorter tails wait for company on the next plan. full_compaction=True drops that minimum to 1 so a "rewrite this bucket" intent always produces at least one task. CompactOptions: - Drop max_file_num — Java has no such concept and size-based packing caps each task at ~2x target naturally. - Drop the now-irrelevant max>=min check; the only invariant left is min_file_num >= 1. Tests: - New append_compact_packing_test (9 cases) drives the algorithm directly with hand-built DataFileMeta lists, mirroring the kind of coverage Java's AppendCompactCoordinatorTest has for pack(). - E2E coordinator/rewriter/Ray tests now zero source.split.open-file-cost on their tiny test tables (default 4 MB would dominate the 1 KB parquet files and trigger spurious mid-loop drains). - Drop test_chunks_when_exceeding_max_file_num (max_file_num is gone) in favor of test_many_small_files_pack_into_single_task which documents the realistic tiny-file behavior.
… Java Java BaseAppendFileStoreWrite.compactRewrite seeds its RowDataRollingFileWriter with LongCounter(toCompact[0].minSequenceNumber()) and increments per row written. Each rolled output file therefore carries a precise [first_row_seq, last_row_seq] range and the union across all output files is contiguous: [seed, seed + total_input_rows - 1]. The previous Python rewriter: - seeded the writer with bucket-mode-dependent values (UNAWARE→0, HASH_FIXED→max(input.max_seq)) which had no Java analog; - never advanced sequence_generator.current — so every committed file ended up with min_seq == max_seq, i.e. compact output threw away the per-row seq information Java preserves. This commit introduces AppendCompactRollingWriter, an AppendOnlyDataWriter subclass that: - treats sequence_generator.start as Java's "next-to-assign" counter, so a slice of N rows is laid out as [seq_start, seq_start + N - 1]; - works around the base SequenceGenerator's off-by-one quirk by setting current = seq_end before super()._write_data_to_file (so the parent reads min/max as seq_start/seq_end exactly) and bumping both fields to seq_end + 1 afterwards (so the next slice picks up where this one ended); - stamps file_source = COMPACT on the just-appended DataFileMeta, the same shape MergeTreeRollingWriter uses on the PK side, instead of mutating it back in the rewriter. Rewriter: - seed_seq = files[0].min_sequence_number, matching Java's toCompact.get(0).minSequenceNumber(); - drops the bucket-mode-dependent _initial_max_seq helper. Tests: - new test_output_seq_range_starts_at_input0_min_seq_and_spans_total_rows enforces the Java contract: per-file (max - min + 1 == row_count) and cross-file (no gaps/overlaps, range = [seed, seed+total-1]); - new test_output_files_tagged_compact_source verifies file_source is set by the writer, not the rewriter. Out of scope (still NOTEd in the rewriter docstring): Java's reader path runs through ReadForCompact for schema-evolution + DV awareness; pypaimon still reads parquet directly. Both will be wired up alongside the broader schema-evolution / deletion-vector phases.
|
This pr is for tracking the full of work about compaction. I will initiate and apply for reviews based on the minimum separable pr |
JingsongLi
left a comment
There was a problem hiding this comment.
Review: [python] Add compaction module with Ray distributed executor
This is a substantial, well-structured contribution (5k+ lines) that brings end-to-end compaction to pypaimon through a clean Coordinator -> Task -> Executor -> Driver-commit architecture that mirrors Spark's CompactProcedure. The phased approach (append-only, then PK merge-tree, then Ray distribution) keeps the design coherent. Below are findings grouped by category.
Correctness
-
UniversalCompaction._pick_for_size_ratio_with_countuses floating-point arithmeticif candidate_size * (100.0 + self.size_ratio) / 100.0 < nxt.total_size():
Java uses integer arithmetic (
candidateSize * (100 + sizeRatio) / 100). For large file sizes (petabyte-scale warehouses), floating-point precision loss could cause a different pick decision than Java. Consider:if candidate_size * (100 + self.size_ratio) < nxt.total_size() * 100:
This avoids division entirely and keeps everything in integer domain.
-
RayExecutor: no cleanup of orphan files on partial failure
If one Ray task fails,ray.get(futures)raises the first exception, but other tasks may have already written output files. UnlikeLocalExecutorwhere the rewriter'sabort()cleans up, there is no mechanism to abort the remaining tasks or clean their partial output. Consider wrappingray.getwithray.cancelon the remaining futures in a try/except, or document this as a known limitation that relies on snapshot consistency (orphan files never become visible because the commit never happens). -
CompactTaskclass-level mutable defaults_catalog_loader_options: Optional[Dict[str, str]] = None _table_identifier: Optional[str] = None
These are class variables, not instance variables. While
self._catalog_loader_options = ...inwith_table_loader()correctly creates an instance attribute that shadows the class variable, the pattern is fragile. If a subclass ever reads viacls._catalog_loader_optionsor forgetsself., it would see stale state. Consider initializing these in an__init__or__init_subclass__. -
Levels.non_empty_highest_level()edge case
Returns0when only L0 has files,-1when everything is empty._should_drop_deletedoesunit.output_level >= levels.non_empty_highest_level(). Ifnon_empty_highest_level()returns -1, drop_delete would be True for any output_level. This path is unreachable (no files = no compaction), but a defensive guard or assertion would clarify intent.
Design
-
CommitMessagebreaking change is well-handled
Convertingnew_filesfrom a constructor field to a@propertydelegating todata_increment.new_filesis a clean migration. All internal callers are updated. Consider adding a deprecation note in the class docstring for any external pypaimon users who might constructCommitMessagedirectly. -
CompactRewriteras a marker base class
The docstring explains why there is no abstractrewrite()method (different signatures for append vs merge-tree). This is reasonable, but an alternative would be to make it aProtocolor simply remove the base class entirely, since it provides no contract enforcement. Not blocking, just a style observation. -
Ray worker registry bootstrapping via side-effect imports
from pypaimon.compact.task import append_compact_task as _append_task # noqa: F401 from pypaimon.compact.task import merge_tree_compact_task as _mt_task # noqa: F401
This relies on
ray_executor.pybeing imported in the worker process so the task subclasses register themselves. If Ray ever changes its serialization strategy (e.g., cloudpickle the function without importing the module), the registry would be empty on the worker. A safer pattern is to haveCompactTask.deserialize()perform a lazy import based on thetypefield, eliminating the need for a global registry. That said, the current approach works with Ray's actual behavior today. -
build_kv_file_fieldscentralization -- Good refactoring. Extracting this from_create_key_value_fieldsinto a shared utility prevents schema drift between the read and compact paths.
Minor / Nits
-
_level0_comparetreatsNonecreation_time as smallest -- Java's TreeSet comparator for L0 usesLong.compare(creationTime, ...)which would NPE on null. If Java never stores null creation times this is fine, but worth a comment explaining the divergence. -
MergeTreeRollingWriter._extract_seq_bounds-- Ifdatais empty (0 rows),pc.min(seq).as_py()returnsNone, which would fail downstream. The caller guards withif data.num_rows == 0: returnbut the static method itself is unguarded. -
Test
_make_coordbypasses__init__viaAppendCompactCoordinator.__new__(...)-- This is fine for unit-testing the pure algorithm but is brittle if the constructor ever adds required initialization. Consider a lightweight mock/stub table instead.
Summary
Overall this is a high-quality contribution with solid test coverage (unit, integration, e2e, and Ray). The architecture cleanly separates planning from execution, and the commit protocol correctly mirrors Java's ADD/DELETE manifest entry semantics. The main actionable items are (1) switching the size-ratio comparison to integer arithmetic for Java parity, (2) documenting or addressing the Ray partial-failure orphan-file scenario, and (3) converting class-level loader fields to instance variables in CompactTask.
|
Super big, split this to multiple PRs. |
Summary
Brings Apache Paimon's compaction story to pypaimon end-to-end:
CommitMessagewithcompact_before/compact_after;FileStoreCommitemits ADD + DELETE manifest entries and a newcommit_compact()helper produces snapshots withcommit_kind=COMPACT.DataFileMetagains JSON-friendlyto_dict/from_dictand aCommitMessageSerializerfor cross-process transport.table.new_compact_job(...).execute(), plumbed through aCoordinator → Task → Executor → Driver-commitshape that mirrors SparkCompactProcedure. Ships aLocalExecutorfor in-process / test usage.Levels+UniversalCompaction(size-amp / size-ratio / file-num three-stage decision). New abstractMergeFunction+ factory;DeduplicateMergeFunctionmigrated,PartialUpdate/Aggregate/FirstRowstubbed so configured tables fail loudly with a Phase 6 message.RayExecutorwires the sameCompactJobto Ray. Driver serializes eachCompactTaskto JSON (table + payload + catalog loader spec), workers rebuild theirFileStoreTablevia the catalog and run the rewriter, driver collects messages for one atomic commit.Each phase landed as a separate commit, with a follow-up
*-fixupcommit addressing the review findings inline. Eight commits total — a single PR keeps the design coherent for review while commits stay small enough to walk through.Out of scope (later PRs):
python -m pypaimon.compact.entrypointforray job submitPartialUpdate/Aggregate/FirstRowMergeFunctionbodiesPlan / design doc: `/Users/lcy/.claude/plans/paimon-compaction-java-spark-python-com-cached-cosmos.md` (local).
Test plan