[Tracking] feat(contrib): Native Delta Lake scan via delta-kernel-rs (Iceberg-style contrib) #4366

Draft
schenksj wants to merge 108 commits into apache:main from schenksj:contrib-delta-direct

@schenksj schenksj commented May 19, 2026

📋 Tracking PR — complete Delta contrib work product (being split for review)

This is the tracking / umbrella PR. It holds the complete native Delta
Lake scan work product on schenksj:contrib-delta-direct, validated end-to-end on
the fork (CI on schenksj#2).
Because the full diff is too large to review as one unit (~27k lines, 120+ files),
it is being split into a sequence of independently-reviewable, independently-mergeable
PRs to main. This PR stays open as a draft so reviewers can see the whole diff
and track progress; it closes when the final unit merges.

The complete product is frozen on the fork at branch ref/delta-complete and tag
delta-complete-2026-06-10 (never rebased). The split is enabled by the build gate
(-Pcontrib-delta / --features contrib-delta): default builds carry zero Delta
surface, so every intermediate slice is safe to land on main.

Split sequence & status

Legend: 📋 not started · 🔨 in progress · 🔎 in review · ✅ merged

| Unit | Title | Review size | Depends on | PR | Status |
|---|---|---|---|---|---|
| A.1 | Core SPI for contrib leaf scans (CometScanWithPlanData trait, leaf-scan match, DPP rewrite, reflective injector slot) | ~300–400 | main | | 📋 |
| A.2 | Build gate + inert Delta wiring (Maven profile, Cargo feature, proto messages, native shim, DeltaIntegration bridge, rule hooks, gate script + CI, stub crate) | ~800 + stubs | A.1 | | 📋 |
| A.3a | Rust: driver-side planning (error/engine/predicate/scan/jni.rs, 54 unit tests) | ~2.6k | A.2 | | 📋 |
| A.3b | Rust: executor-side read path (dv_reader/kernel_scan/planner.rs, 35 unit tests) | ~2.4k | A.3a | | 📋 |
| A.4a | Scala: claim/decline + reflection (DeltaConf/DeltaReflection/DeltaScanMetadata/CometDeltaScanMarker/RowTrackingAugmentedFileIndex/DeltaScanRule) | ~2.3k | A.2 (∥ A.3x) | | 📋 |
| A.4b | Scala: execution, end-to-end native reads (CometDeltaNativeScan serde, Native.scala, exec node, DeltaPlanDataInjector + suites) | ~2.4k | A.3b, A.4a | | 📋 |
| A.5 | CDF native reads (CometDeltaCdfScanExec, CometExecRule CDF hook, CDC suites) | ~1.2k | A.4b | | 📋 |
| A.6a | Contrib test battery (repro/audit suites, delta_contrib_test.yml, check-suites.py) | ~4k | A.4b/A.5 + all extractions | | 📋 |
| A.6b | Delta own-suite regression harness (dev/diffs/*, run-regression.sh, run-test.sh, workflow) | ~2.1k | A.4b/A.5 + all extractions | | 📋 |
| A.7 | Docs (contrib/delta/docs/*, user-guide pages) | ~3k prose | A.4b | | 📋 |
| A.8 | Follow-up: FAILED_READ_FILE parity for Delta | ~150 | #4536 merged | | 📋 |

extractions (#4524…#4588, %-path) ─────────────────────────────────┐ (gate A.6a/A.6b only)
                                                                    ▼
A.1 ─► A.2 ─┬─► A.3a ─► A.3b ─┐                           A.6a   A.6b   A.8
            └─► A.4a ─────────┴─► A.4b ─► A.5 ─► A.7  ─►  (gated on extractions)

Core-change PRs extracted from this work (land independently)

Small, self-contained core/shuffle fixes carved out of the monolith so the core touchpoints
are reviewed on their own rather than buried in the Delta diff. The open ones are not
front-of-cycle dependencies — they only gate the final test/regression units (A.6a/A.6b).

| PR | Title | Touches | Status |
|---|---|---|---|
| #4523 | correct GetStructField null handling for null parent struct | core | ✅ merged |
| #4531 | rebalance deep AND/OR chains (protobuf recursion limit) | core | ✅ merged |
| #4524 | shuffle: tolerate non-UTF-8 bytes in get_string (lossy decode) | shuffle | 🔎 open · awaiting review |
| #4525 | decline native V1 scans on object_store-unsupported FS schemes | core | 🔎 open · awaiting review |
| #4532 | materialize ConstantColumnVector on serialize/export paths | core | 🔎 open · awaiting review |
| #4533 | decline CreateArray with struct-nullability-divergent children | core | 🔎 open · awaiting review (mergeable) |
| #4535 | O(1) PlanDataInjector lookup by op kind | core | 🔎 open · awaiting review |
| #4536 | surface native parquet read failures as FAILED_READ_FILE | core | 🔎 open · awaiting review (mergeable); gates A.8 |
| #4588 | rebalance associative bitwise/Add/Multiply chains (protobuf recursion) | core | 🔎 open · awaiting review |
| %-path | read local files whose path contains a literal % / spaces | core | ⏸️ deferred into A.6a. Verified a no-op on current main (core read test passes with and without it; object_store 0.13.2's from_url_path already handles %/spaces). Folds in only if a Delta %-suite goes red without it |

Complete work product — technical reference (the full monolith on this branch)

Briefing

This PR lands a native Delta Lake scan for Comet. It supersedes #3932 — the
SPI/registry design discussed there was rejected in favor of the Iceberg-style
contrib pattern this PR uses (typed proto variant + ~40 lines of feature-gated
core touchpoints + standalone contrib/delta/ tree). Default builds are
entirely unaware of this code: no SPI lookups, no ServiceLoader scans, no
contrib surface at runtime. Only when the -Pcontrib-delta Maven profile (and
parallel contrib-delta Cargo feature) is activated do the contrib classes
land on the classpath and the reflection bridge resolve.

The integration reads Delta metadata via delta-kernel-rs on the driver,
encodes the resolved file list (with column mappings, DV info, partition
values, row-tracking baseRowId) into a typed OpStruct::DeltaScan proto, and
executes via DataFusion's parquet reader on each executor.

Status & recent correctness fixes

All Delta 4.1 own-suite regression failure families triaged this round are
fixed, and each has a lightweight contrib regression guard. A full Delta
own-suite re-run is in progress to confirm this end-to-end. Deliberate tradeoffs
and any remaining limitations are tracked in
contrib/delta/docs/08-known-limitations.md for post-merge issues.

| Area | Fix | Scope |
|---|---|---|
| DPP on partitioned tables (MERGE/join) | no longer crashes (CometSubqueryAdaptiveBroadcastExec); prunes to required partitions even inside a native block | contrib |
| Time travel | reads the pinned scannedSnapshot, not head | contrib |
| Row tracking | reads MATERIALIZED _row-id-col-* / _row-commit-version-col-* from parquet (stable IDs across rewrites) | contrib |
| DV / row_index + pushed data filter | data-filter pushdown suppressed when a physical-position synthetic is emitted (was wrong row IDs) | contrib |
| CM-mode + shuffle | logicalLink propagated so AQE doesn't assert | contrib |
| Deep boolean predicates | balanced And/Or serialization (protobuf recursion limit) | core Comet |
| Corrupted/short file reads | wrapped as FAILED_READ_FILE.NO_HINT | core Comet |
| HDFS / libhdfs native scans | FS-scheme allowlist honors fs.comet.libhdfs.schemes | core Comet |

Reviewers: the bottom three rows touch CORE Comet code (not just the
contrib subtree) and affect all Comet queries/scans — see "Core touchpoints"
in Review strategy.

Coverage

Supported, fully native (broad):

  • Deletion vectors — kernel resolves the bitmap on the driver, DeltaDvFilterExec filters rows on executors. DV filter is chained AFTER synthetic emission (so row_index reflects original file positions) when both are needed
  • Column mapping both name AND id mode. name rewrites logical→physical names in the planner; id translates Delta's delta.columnMapping.id to parquet's PARQUET:field_id on every StructField (including nested struct/array/map) so the parquet reader matches by ID
  • Type widening — DataFusion's parquet schema adapter handles the read-time cast
  • Row tracking — supported in three modes:
    • Materialised: rewritten to read the physical _row-id-col-<uuid> column from parquet
    • Unmaterialised: row_id = base_row_id + physical_row_index per file, all synthesised natively — base_row_id is emitted as a per-file Int64 constant from AddFile.baseRowId and _row-id-col-<uuid> is emitted as all-NULL so Delta's GenerateRowIDs Project falls back to the computed expression
    • Mixed within one query: the wrapped exec's emit order matches scan.requiredSchema ordinal-by-ordinal so the upstream Filter(__delta_internal_is_row_deleted = 0) binds correctly
  • Native synthesis of Delta's internal columns __delta_internal_row_index / __delta_internal_is_row_deleted for UPDATE/DELETE/MERGE flows. is_row_deleted is emitted as Int8 (matching Delta's ByteType) to avoid DataFusion's interval-propagator panicking on Int32 vs Int8 mismatches in stats pushdown
  • Native synthesis of Spark _metadata.* virtual columns (file_path / file_name / file_size / file_block_start / file_block_length / file_modification_time) detected from scan.output even when not in scan.requiredSchema
  • Output column reorder when synthetics aren't already a canonical suffix — proto carries final_output_indices, native dispatcher wraps with a ProjectionExec so downstream operators that bind by ordinal don't silently misread one synthetic as another
  • General-purpose Parquet field-ID matching when spark.sql.parquet.fieldId.read.enabled=true (same wiring as CM-id)
  • Partition pruning, including DPP (resolved after AQE broadcasts are ready)
  • Predicate pushdown into parquet (with synthetic-column filters excluded — those are handled by the upstream Filter after synthetic emission)
  • Multi-task-per-partition packing for cluster utilisation
  • input_file_name() and friends — one-task-per-partition + a per-task InputFileBlockHolder hook in CometExecRDD + CometDeltaNativeScanExec plumbs per-partition file paths through to the RDD
  • FAILED_READ_FILE.NO_HINT exception wrapping with file path
  • Encryption that routes through the shared CometParquetUtils config check
  • _delta_log, _change_data, and _commits parquet reads via the same scan
  • S3A Hadoop credential chain (SimpleAWS / TemporaryAWS / AssumedRole / IAMInstance) resolved Scala-side at planning time so kernel log replay authenticates under the same chain as data reads. Reflective lookup against S3AUtils.createAWSCredentialProviderList; cached Method handles
  • checkLatestSchemaOnRead=false — our path is pinned to a single snapshot version via extractSnapshotVersion(relation) so the Delta-side at-read check doesn't apply to us
  • Time travel (versionAsOf / timestampAsOf) and snapshot reads — files are resolved from the snapshot the scan was prepared against (preparedScan.scannedSnapshot), exactly what vanilla Spark+Delta reads; re-querying filesForScan picks up the freshest DV descriptors that snapshot carries
  • All public S3 / Azure / GCS / OSS schemes; local file://

Falls back to Spark's reader (with withInfo reason surfaced in explain-fallback):

Correctness fallbacks — load-bearing, do not remove:

  • DV materialisation failure (DV file missing / unsupported version / read error) — kernel can't give us the deleted-row indexes, so we can't filter
  • Reflective AddFile extraction failure — no file list, nothing to scan
  • Kernel-rs log-replay error — same
  • Phase 6 reader-feature gate — currently an empty list; defense-in-depth for any future kernel-rs return of unsupported feature names

Shared Comet limits (apply to any native scan, not Delta-specific) — each is its own per-case work in core:

  • Encryption with unsupported KMS config — shared CometParquetUtils.isEncryptionConfigSupported
  • Custom Hadoop FS schemes (fake:// etc.) — object_store has no Hadoop FS plugin layer; would need a bridge
  • CometScanTypeChecker rejections (ShortType under default config, string collation, variant struct) — each is a Comet-core feature gap, not a Delta-contrib problem. Variant in particular: arrow-rs has parquet-variant crates but Comet hasn't integrated them yet

External:

  • TahoeLogFileIndexWithCloudFetch — Databricks-proprietary file index, not in OSS Delta. Defensive guard for DBR users only

Workaround tracked upstream:

  • CreateArray with mismatched element types — caller-side decline for apache/datafusion#22366. Removable once upstream lands

User off-switches:

  • spark.comet.scan.deltaNative.enabled=false, spark.comet.exec.enabled=false

Shape

| Layer | Path | Lives in |
|---|---|---|
| Typed proto variant delta_scan = 117 | native/proto/src/proto/operator.proto | Core |
| Reflection bridge | spark/.../comet/rules/DeltaIntegration.scala | Core |
| Scan-rule arm | spark/.../comet/rules/CometScanRule.scala | Core (one block) |
| Exec-rule arm | spark/.../comet/rules/CometExecRule.scala | Core (one case) |
| PlanDataInjector.opStructCase | spark/.../sql/comet/operators.scala | Core (one method) |
| Per-partition file paths | CometExecRDD, CometNativeScanExec, CometExecIterator, ShimSparkErrorConverter | Core (load-bearing for input_file_name() and FAILED_READ_FILE.NO_HINT wrapping in any native scan) |
| Native dispatcher arm (DV / synthetic / reorder / CM-id) | contrib/delta/native/src/core_glue.rs (compiled into core via #[path]; see "Why the dispatcher file lives in contrib but compiles in core" below) | Compiled into core, file homed in contrib |
| Delta scan rule, exec, serde | contrib/delta/src/main/scala/... | Contrib |
| Kernel-rs engine + cache, scan, DV filter, synthetic columns, planner | contrib/delta/native/src/*.rs | Contrib |
| Maven profile, Cargo feature | spark/pom.xml, contrib/delta/native/Cargo.toml, native/core/Cargo.toml | Build |
| Build-gate verification | dev/verify-contrib-delta-gate.sh | Build |
| Regression harness | contrib/delta/dev/run-regression.sh + dev/diffs/delta/4.1.0.diff | Contrib |

Key design decisions

Iceberg-style contrib, not SPI. Static helper objects with stable names
(DeltaScanRule.transformV1IfDelta, CometDeltaNativeScan.MODULE$); a single
reflection bridge in core resolves and caches Method handles once per JVM.
No registry, no ServiceLoader, no extension points beyond what core already
exposes. The contrib is just classpath-or-not.

Typed proto, not an envelope. OpStruct::DeltaScan is a first-class
variant. Avoids the ContribOp { kind, payload } envelope discussed in #3932;
PlanDataInjector keys by OpStructCase for O(1) dispatch.
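
A minimal sketch of the typed-variant dispatch described above, written in Rust for illustration only. The real PlanDataInjector lives on the Scala side; the enum shapes, the `Injector` alias, `inject_delta`, `build_registry` and `dispatch` are hypothetical names, not this PR's API. The point it tries to show is that keying a map by the variant's case tag gives one hash lookup per operator instead of probing every injector in turn.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the proto-generated operator enum.
#[derive(Debug)]
enum OpStruct {
    NativeScan { path: String },
    DeltaScan { table_root: String },
}

// Hypothetical stand-in for the generated "which variant is set" tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum OpStructCase {
    NativeScan,
    DeltaScan,
}

impl OpStruct {
    fn case(&self) -> OpStructCase {
        match self {
            OpStruct::NativeScan { .. } => OpStructCase::NativeScan,
            OpStruct::DeltaScan { .. } => OpStructCase::DeltaScan,
        }
    }
}

// An injector combines the common operator block with per-partition bytes.
type Injector = fn(&OpStruct, &[u8]) -> String;

fn inject_delta(op: &OpStruct, partition_bytes: &[u8]) -> String {
    match op {
        OpStruct::DeltaScan { table_root } => {
            format!("{table_root}: {} task bytes", partition_bytes.len())
        }
        other => format!("unexpected operator {other:?}"),
    }
}

// The registry is keyed by case tag, so lookup is a single hash probe.
fn build_registry() -> HashMap<OpStructCase, Injector> {
    let mut registry: HashMap<OpStructCase, Injector> = HashMap::new();
    registry.insert(OpStructCase::DeltaScan, inject_delta as Injector);
    registry
}

// Returns None when no injector is registered for the operator's variant.
fn dispatch(
    registry: &HashMap<OpStructCase, Injector>,
    op: &OpStruct,
    partition_bytes: &[u8],
) -> Option<String> {
    registry.get(&op.case()).map(|inject| inject(op, partition_bytes))
}
```

Because the variant is typed, there is no `kind` string to parse and no opaque payload to decode before the lookup, which is the contrast with the envelope design drawn above.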

Split-mode plan serialization. CometDeltaNativeScan.convert emits a
DeltaScan proto with the common block only (schemas, table root, filters);
each partition's tasks ride in a per-partition byte array via
PlanDataInjector at execution time. Avoids closure-capturing every file in
every partition.

Native synthetic-column synthesis. DeltaSyntheticColumnsExec (in
contrib/delta/native/src/synthetic_columns.rs) emits the standard four
Delta internals (__delta_internal_row_index as Int64, __delta_internal_is_row_deleted
as Int8, row_id, row_commit_version) PLUS Spark _metadata.* virtual
columns PLUS row-tracking-specific synthetics (base_row_id per-file
constant from AddFile.baseRowId, _row-id-col-<uuid> / _row-commit-version-col-<uuid>
as NULL-filled). When emit is on, each file gets its own FileGroup so the
per-file row offset / baseRowId arithmetic is well-defined.
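
The per-file arithmetic for unmaterialised row tracking (row_id = base_row_id + physical_row_index) can be sketched as follows. This is an illustrative sketch, not the code in synthetic_columns.rs: `FileTask`, `row_ids_for_batch` and `row_ids_for_file` are hypothetical names, and plain vectors stand in for Arrow arrays.

```rust
/// Per-file inputs. Field names are illustrative, not the contrib's types.
struct FileTask {
    /// Taken from AddFile.baseRowId in the Delta log.
    base_row_id: i64,
    /// Number of physical rows in the parquet file.
    num_rows: usize,
}

/// Row IDs for one batch. `batch_start` is the physical index, within the
/// file, of the batch's first row.
fn row_ids_for_batch(base_row_id: i64, batch_start: i64, batch_len: usize) -> Vec<i64> {
    (0..batch_len as i64)
        .map(|i| base_row_id + batch_start + i)
        .collect()
}

/// Walks one file in batches, carrying the row offset across batches.
/// This only works if a file's batches arrive in physical order and are not
/// interleaved with another file's, hence one file per group.
fn row_ids_for_file(file: &FileTask, batch_size: usize) -> Vec<Vec<i64>> {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut batches = Vec::new();
    let mut offset = 0usize;
    while offset < file.num_rows {
        let len = batch_size.min(file.num_rows - offset);
        batches.push(row_ids_for_batch(file.base_row_id, offset as i64, len));
        offset += len;
    }
    batches
}
```

For a file with baseRowId 100 and five rows read in batches of two, the sketch yields `[100, 101]`, `[102, 103]`, `[104]`.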

Synthetic-suffix ordering matters. The wrapped exec's output ordering is
checked against scan.requiredSchema AND the canonical native emit order. If
the synthetic block isn't already in canonical order at the right ordinals,
the proto carries final_output_indices and the native dispatcher wraps with
a ProjectionExec to reorder. Without this, an upstream
Filter(__delta_internal_is_row_deleted = 0) binding by ordinal would silently
misread row_index as is_row_deleted (caught and fixed mid-PR; the
DV-after-DELETE test bisected the bug to a one-ordinal swap).
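
As a rough illustration of the reorder step, the sketch below derives a `final_output_indices`-style permutation from the emitted column order and the required column order, and returns nothing when no reorder is needed. The function names and the use of column-name slices are assumptions made for the example; the actual proto field is populated on the Scala side and applied natively by a ProjectionExec.

```rust
/// For each required column, find its position in the emitted schema.
/// Returns Ok(None) when the emitted order already matches, so no
/// projection wrapper is needed.
fn final_output_indices(
    emitted: &[&str],
    required: &[&str],
) -> Result<Option<Vec<usize>>, String> {
    let mut indices = Vec::with_capacity(required.len());
    for name in required {
        let pos = emitted
            .iter()
            .position(|e| e == name)
            .ok_or_else(|| format!("column {name} not emitted"))?;
        indices.push(pos);
    }
    let identity = emitted.len() == required.len()
        && indices.iter().enumerate().all(|(i, &p)| i == p);
    Ok(if identity { None } else { Some(indices) })
}

/// Applies the permutation to one row, standing in for a projection.
fn project<T: Clone>(row: &[T], indices: &[usize]) -> Vec<T> {
    indices.iter().map(|&i| row[i].clone()).collect()
}
```

With emitted order `id, __delta_internal_row_index, __delta_internal_is_row_deleted` and a required order that swaps the last two, the permutation is `[0, 2, 1]`; without it, an ordinal-bound filter on the third column would read the row index instead of the deleted flag.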

DV filter chained after synthetic emission, not mutually exclusive. When
both synthetics and a DV are present, we previously chose one wrapper or the
other — which meant any read that surfaced _tmp_metadata_row_index got
NO DV filtering applied. The wrappers are now chained:
parquet → DeltaSyntheticColumnsExec → DeltaDvFilterExec (the DV filter is skipped when
emit_is_row_deleted is on so UPDATE/DELETE/MERGE writers still see every row).

CM-name rename before synthetics. Synthetic columns have fixed names
(never CM-renamed) and are appended AFTER the parquet read; the rename
projection has to apply to the parquet output BEFORE the append so the
length-match check works correctly.

Spark _metadata.* driven from scan.output, not just scan.requiredSchema.
Delta's PreprocessTableWithDVs strategy can append _metadata.file_path to
scan.output without putting it in scan.requiredSchema. The synthetic
exec detects these from scan.output so the wrapped exec's output schema
includes them and downstream attribute resolution works.

is_row_deleted is Int8, not Int32. Delta declares the column as
ByteType. Emitting Int32 trips DataFusion's interval propagator with
Only intervals with the same data type are intersectable, lhs:Int32, rhs:Int8
whenever the upstream Filter pushes stats. Caught by the CM + DV combined
coverage test.

InputFileBlockHolder thread-local hook in CometExecRDD.compute.
Comet's native scans bypass Spark's FileScanRDD, so the standard
input_file_name() thread-local would otherwise be empty for any native
scan (not just Delta). One small but load-bearing core change that fixes
both Delta's UPDATE/DELETE/MERGE flows AND the FAILED_READ_FILE.NO_HINT
error wrapping. CometDeltaNativeScanExec plumbs its per-partition file
paths through to CometExecRDD so InputFileBlockHolder.set(path) fires
correctly.

Read from the prepared snapshot (not head) for PreparedDeltaFileIndex.
preparedScan.files caches the AddFile list at FileIndex construction time.
DeltaReflection.preparedSnapshotFiles re-queries
preparedScan.scannedSnapshot.filesForScan(filters, false).files — the snapshot
the scan was prepared against, which is exactly what vanilla reads — to pick up
the freshest DV descriptors that snapshot carries, falling back to the cached
preparedScan.files if reflection fails. An earlier revision refreshed to head
via deltaLog.update(), but that returned current data for a time-travel query
(versionAsOf=0 yielded head's rows) and diverged from vanilla on the
consecutive-DELETE / DeltaLog-cache-staleness case; reading scannedSnapshot
matches vanilla in every case.

Engine cache by (scheme, authority, DeltaStorageConfig). kernel-rs's
DefaultEngine<TokioBackgroundExecutor> spawns one OS thread per executor.
Without caching, hundreds of scans/min leaked threads faster than tokio
reaped them, tripping pthread_create EAGAIN ~2h into regression. The cache
bounds live thread count by table-storage diversity instead of by request
count.
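
A minimal sketch of such a keyed cache, assuming a placeholder `Engine` type in place of kernel-rs's DefaultEngine and a sorted map of storage options in place of DeltaStorageConfig. `EngineKey`, `EngineCache` and `get_or_create` are hypothetical names used only for this example.

```rust
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex};

/// Cache key: table storage identity, not the individual request.
#[derive(Clone, PartialEq, Eq, Hash)]
struct EngineKey {
    scheme: String,
    authority: String,
    /// Stand-in for the storage config; a BTreeMap is hashable and ordered.
    storage_config: BTreeMap<String, String>,
}

/// Placeholder for an expensive engine that owns background threads.
struct Engine {
    id: usize,
}

#[derive(Default)]
struct EngineCache {
    engines: Mutex<HashMap<EngineKey, Arc<Engine>>>,
}

impl EngineCache {
    /// Returns the shared engine for `key`, creating it on first use only.
    fn get_or_create(&self, key: EngineKey) -> Arc<Engine> {
        let mut engines = self.engines.lock().unwrap();
        let next_id = engines.len();
        Arc::clone(
            engines
                .entry(key)
                .or_insert_with(|| Arc::new(Engine { id: next_id })),
        )
    }

    /// Number of live engines, bounded by distinct keys seen.
    fn len(&self) -> usize {
        self.engines.lock().unwrap().len()
    }
}
```

Repeated scans against the same bucket reuse one engine, so the number of engines grows with the number of distinct (scheme, authority, config) combinations rather than with the number of scans. The sketch has no eviction, which mirrors the TTL follow-up listed later in this description.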

DV filter ordering safeguards. DeltaDvFilterExec tracks
current_row_offset across batches, which assumes physical-order input.
The exec overrides maintains_input_order() = [true] and
benefits_from_input_partitioning() = [false] so any future optimizer that
wants to insert a RepartitionExec is forced to bail rather than silently
re-order rows.
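
To make the physical-order assumption concrete, here is a small sketch of a filter that carries a row offset across batches. It is not the DeltaDvFilterExec implementation: `DvFilter` and `filter_batch` are hypothetical, a `BTreeSet<u64>` stands in for the deletion-vector bitmap, and slices stand in for record batches.

```rust
use std::collections::BTreeSet;

/// Drops rows whose physical file position is marked deleted.
struct DvFilter {
    /// Deleted physical row indexes for one file.
    deleted: BTreeSet<u64>,
    /// Physical index of the first row of the next batch.
    current_row_offset: u64,
}

impl DvFilter {
    fn new(deleted: impl IntoIterator<Item = u64>) -> Self {
        DvFilter {
            deleted: deleted.into_iter().collect(),
            current_row_offset: 0,
        }
    }

    /// Filters one batch. Correct only if batches arrive in file order,
    /// because the position of each row is derived from the running offset.
    fn filter_batch<T: Clone>(&mut self, batch: &[T]) -> Vec<T> {
        let start = self.current_row_offset;
        let kept: Vec<T> = batch
            .iter()
            .enumerate()
            .filter(|(i, _)| !self.deleted.contains(&(start + *i as u64)))
            .map(|(_, value)| value.clone())
            .collect();
        self.current_row_offset += batch.len() as u64;
        kept
    }
}
```

If the two batches in the usage below were swapped, the offset would attribute positions 0 and 1 to the wrong rows and a different pair of rows would be dropped, which is the silent failure the order-preservation overrides guard against.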

One new core trait method. PlanDataInjector.opStructCase is the only
core trait addition. It keys the existing injector map for O(1) dispatch.

Why the dispatcher file lives in contrib but compiles in core

contrib/delta/native/src/core_glue.rs is physically co-located with the
rest of the Delta integration but is compiled as a module of the core crate
via #[cfg(feature = "contrib-delta")] #[path = "../../../../contrib/delta/native/src/core_glue.rs"] mod contrib_delta_scan;. The reason: this file implements
PhysicalPlanner::plan_delta_scan and reaches into core's pub(crate)
helpers (create_expr, init_datasource_exec,
prepare_object_store_with_configs). A true cross-crate impl block is
forbidden by Rust, and a contrib → core cargo dependency would create a
cycle with core's optional contrib-delta dep on contrib, so #[path] is
the available tool that lets the FILE's home be with Delta while its
COMPILATION unit stays in core. Build gate (cfg(feature = "contrib-delta"))
is preserved exactly — default builds carry zero Delta surface (see
"Validation" below).

Audit of remaining Delta references in core

After moving the dispatcher body into contrib/, every Delta reference left
in native/core/src/ is either feature-gated or a structural one-line arm
in an exhaustive match OpStruct:

| File | Reference | Why it's there |
|---|---|---|
| planner.rs:33-35 | mod contrib_delta_scan; | The #[path]-relocated module declaration. #[cfg(feature = "contrib-delta")]. |
| planner.rs:1512-1527 | OpStruct::DeltaScan dispatcher arm | Both halves feature-gated. Default-build half returns "Received a DeltaScan operator but core was built without the contrib-delta Cargo feature" so a misconfigured driver gets a clear error. |
| jni_api.rs:op_name | OpStruct::DeltaScan(_) => "DeltaScan" | Exhaustive enum match; returns a string for tracing. No contrib logic. |
| planner/operator_registry.rs:to_operator_type | OpStruct::DeltaScan(_) => None | Exhaustive enum match; signals "not in OperatorType enum". No contrib logic. |

OpStruct is a proto-generated enum (in datafusion-comet-proto); Rust
requires exhaustive matches everywhere it appears. Keeping the structural
arms un-gated is intentional — it lets default builds identify a misrouted
DeltaScan operator by name in the error message.
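
The pattern in the table above, a gated dispatcher arm next to un-gated structural arms, might look roughly like this. The two-variant `OpStruct`, the `plan` function and its string results are simplified stand-ins for illustration; only the error text is taken from the description above.

```rust
// Simplified stand-in for the proto-generated enum.
enum OpStruct {
    Filter,
    DeltaScan,
}

// Structural arm: always compiled, no contrib logic, used for messages.
fn op_name(op: &OpStruct) -> &'static str {
    match op {
        OpStruct::Filter => "Filter",
        OpStruct::DeltaScan => "DeltaScan",
    }
}

// Dispatcher: the DeltaScan arm exists in both builds, but only the
// feature-enabled half would call into the contrib code.
fn plan(op: &OpStruct) -> Result<String, String> {
    match op {
        OpStruct::Filter => Ok("FilterExec".to_string()),
        #[cfg(feature = "contrib-delta")]
        OpStruct::DeltaScan => Ok("DeltaScanExec".to_string()),
        #[cfg(not(feature = "contrib-delta"))]
        OpStruct::DeltaScan => Err(format!(
            "Received a {} operator but core was built without the contrib-delta Cargo feature",
            op_name(op)
        )),
    }
}
```

Exactly one of the two `DeltaScan` arms survives conditional compilation, so the match stays exhaustive in both builds, and a default build still names the misrouted operator in its error.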

Validation

The build gate is enforced by dev/verify-contrib-delta-gate.sh, which runs
6 independent checks across 3 layers and exits non-zero on the first
failure. Designed to be wired into CI.

# Requires a JDK ≥17 on PATH (and as JAVA_HOME for the Maven sub-runs).
dev/verify-contrib-delta-gate.sh

What the script asserts:

| Layer | Check |
|---|---|
| Cargo | cargo tree -p datafusion-comet --no-default-features has zero comet-contrib-delta / delta_kernel entries |
| Cargo | cargo tree -p datafusion-comet --features contrib-delta correctly pulls both (catches accidental off) |
| Maven | mvn -Pspark-4.1 dependency:list has zero io.delta:* deps |
| Maven | mvn -Pspark-4.1,contrib-delta dependency:list correctly pulls io.delta:delta-spark |
| Maven | Default test-compile produces no org/apache/comet/contrib/**.class and no CometDeltaNativeScan* / DeltaScanRule* / DeltaReflection* classes (only the always-present DeltaIntegration reflection bridge) |
| Native | Default libcomet.dylib is meaningfully smaller (~57 MB delta on macOS arm64 debug build) AND has zero comet_contrib_delta / delta_kernel / DeltaDvFilter* / DeltaSynthetic* external symbols |

Current run on this branch: all 6 PASS.

Running the contrib Scala test suite

49 tests across four suites (24 coverage + 25 feature/native/column-mapping):

# JDK 17, contrib + spark-4.1 profiles
JAVA_HOME=$(/usr/libexec/java_home -v 17) \
  mvn -Pspark-4.1,contrib-delta -pl spark -am test \
    -Dsuites='org.apache.comet.contrib.delta.CometDeltaFeaturesSuite,
              org.apache.comet.contrib.delta.CometDeltaNativeSuite,
              org.apache.comet.contrib.delta.CometDeltaColumnMappingSuite,
              org.apache.comet.contrib.delta.CometDeltaCoverageSuite' \
    -Djava.version=17 -Dmaven.compiler.source=17 -Dmaven.compiler.target=17

Current run: 49/49 pass.

CometDeltaCoverageSuite is the accelerator-coverage matrix — each test
asserts BOTH (a) the executed plan contains CometDeltaNativeScanExec
(actually engaged, no silent fall-back) AND (b) the rows match vanilla
Spark+Delta exactly. Covers: SELECT */column-prune/arithmetic/LIMIT/DISTINCT,
filters (eq/neq/IN/IS NULL/BETWEEN/LIKE/AND/OR/NOT), ORDER BY, aggregates
(count/sum/avg/min/max/GROUP BY/HAVING/COUNT DISTINCT), joins
(self/inner/left/leftsemi/leftanti), set ops (UNION/INTERSECT/EXCEPT),
window functions, scalar + IN subqueries, CTEs, partition-pruned reads,
column-mapping reads, DV-bearing reads, nested data (struct/array/map).

Running the contrib Rust test suite

cargo test -p comet-contrib-delta
# Plus the integration tests that exercise plan_delta_scan against a
# real parquet + _delta_log tree:
cargo test -p datafusion-comet --features contrib-delta

What the in-PR validation looks like end-to-end

  1. dev/verify-contrib-delta-gate.sh — proves default builds carry zero Delta surface.
  2. Contrib Scala suite (4 suites, 49 tests) — proves accelerator engages and matches vanilla across the SQL surface area.
  3. Contrib Rust unit + integration tests — proves the kernel-rs engine cache, DV filter, synthetic columns, predicate, and CM-rewriter behave correctly in isolation.
  4. Full Delta 4.1 regression (contrib/delta/dev/run-regression.sh against dev/diffs/delta/4.1.0.diff) — proves we don't regress anything in Delta's own test suite.

Review strategy

Suggested order with different bars:

  1. Core touchpoints (~10 minutes, high bar). New core surface area is
    small but ships in default builds:

    • native/proto/src/proto/operator.proto (one OpStruct variant + DeltaScan messages)
    • The dispatcher arm in native/core/src/execution/planner.rs:1512-1527 (the actual body lives in contrib/delta/native/src/core_glue.rs; see "Why the dispatcher file lives in contrib but compiles in core" above)
    • spark/.../comet/rules/DeltaIntegration.scala (whole file — reflection bridge)
    • The new arm in CometScanRule.transformV1Scan and the new case in CometExecRule.transform
    • CometExecRDD + CometExecIterator + CometNativeScanExec diffs (per-partition file paths, InputFileBlockHolder hook)
    • ShimSparkErrorConverter.wrapNativeParquetError
    • spark/.../comet/serde/arrays.scala (CreateArray decline — references the upstream issue)
    • spark/.../comet/serde/QueryPlanSerde.scala + predicates.scala: CometAnd/CometOr now serialize a BALANCED And/Or tree (createBalancedBinaryExpr/flattenAssociative) instead of a left-deep one, so a deep boolean predicate doesn't overflow protobuf's 100-level recursion limit when the plan is re-parsed (findShuffleScanIndices). Affects ALL Comet queries; Comet's And/Or is vectorized (non-short-circuiting) so rebalancing is semantically identical
    • spark/.../comet/CometExecIterator.scala: isFileReadError also wraps object-store read failures ("Requested range was invalid", object-not-found) into FAILED_READ_FILE.NO_HINT, not just Parquet error: (matches Spark on corrupted/truncated files). Affects all native scans
    • spark/.../comet/rules/CometScanRule.scala — V1 native-scan filesystem-scheme allowlist (declines schemes object_store can't read); honors spark.hadoop.fs.comet.libhdfs.schemes so HDFS/custom-libhdfs scans are NOT declined
    • common/.../comet/util/Utils.scala + comet/vector/NativeUtil.scala — materialize Spark ConstantColumnVector to Arrow FieldVector (incl. TimestampNTZType); previously a hard error
    • spark/.../sql/comet/operators.scala: CometScanWithPlanData gains optional dynamicPruningFilters/withDynamicPruningFilters hooks (default no-op) so a scan can have its DPP rewrite installed in place; base scans unaffected
  2. Contrib Scala (~30 minutes, contrib bar):

    • DeltaScanRule.scala — entry point, gates documented under "Coverage" above
    • CometDeltaNativeScan.scala — split serde, kernel-rs call, task prune/split/pack, column-mapping fixup, synthetic-column detection + suffix reorder, CM-id field-ID translator, S3A credential chain resolution
    • CometDeltaNativeScanExec.scala — exec wrapper, DPP partition pruning, metric reporting, per-partition file paths plumbed to InputFileBlockHolder
    • DeltaPlanDataInjector.scala, DeltaInputFileBlockHolder.scala — small
    • DeltaReflection.scala — reflection bridge into Delta internals (incl. refreshedSnapshotFiles for snapshot staleness)
    • RowTrackingAugmentedFileIndex.scala — small
    • CometDeltaCoverageSuite.scala — the accelerator-coverage matrix
  3. Contrib Rust (~30 minutes, contrib bar):

    • contrib/delta/native/src/engine.rs — kernel-rs engine + cache
    • contrib/delta/native/src/scan.rs: plan_delta_scan, DV row-index resolution, extract_row_tracking_for_selected (reads fileConstantValues from raw RecordBatch)
    • contrib/delta/native/src/synthetic_columns.rs: DeltaSyntheticColumnsExec (emits row_index Int64 + is_row_deleted Int8 + row_id + row_commit_version + Spark _metadata.* + row-tracking synthetics; per-batch row offset counter; DV-walk for is_row_deleted)
    • contrib/delta/native/src/dv_filter.rs: DeltaDvFilterExec (chained after synthetic emission when DV+synthetics both needed)
    • contrib/delta/native/src/planner.rs: build_delta_partitioned_files, SessionTimezone, ColumnMappingFilterRewriter
    • contrib/delta/native/src/core_glue.rs — the in-core dispatcher body (homed here, compiled into core via #[path])
    • contrib/delta/native/src/jni.rs: planDeltaScan JNI entry
  4. Build / regression infra (~5 minutes):

    • spark/pom.xml -Pcontrib-delta profile
    • native/core/Cargo.toml contrib-delta feature
    • contrib/delta/native/Cargo.toml (standalone, not in workspace — intentional to avoid arrow-57 / arrow-58 cross-contamination)
    • dev/verify-contrib-delta-gate.sh — build-gate enforcement
    • contrib/delta/dev/run-regression.sh + dev/diffs/delta/4.1.0.diff
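
The balanced And/Or serialization listed under core touchpoints can be illustrated with a toy expression tree. This sketch only demonstrates the depth argument; `Expr`, `balanced_and`, `left_deep_and` and `depth` are hypothetical and unrelated to the Scala helpers createBalancedBinaryExpr/flattenAssociative named above.

```rust
#[derive(Debug, PartialEq)]
enum Expr {
    Leaf(u32),
    And(Box<Expr>, Box<Expr>),
}

/// Builds a balanced And tree, keeping the leaves in their original order.
fn balanced_and(mut leaves: Vec<Expr>) -> Option<Expr> {
    match leaves.len() {
        0 => None,
        1 => leaves.pop(),
        n => {
            // Split into two non-empty halves and recurse on each.
            let right = leaves.split_off(n / 2);
            let l = balanced_and(leaves)?;
            let r = balanced_and(right)?;
            Some(Expr::And(Box::new(l), Box::new(r)))
        }
    }
}

/// Builds the left-deep shape: ((a AND b) AND c) AND d ...
fn left_deep_and(leaves: Vec<Expr>) -> Option<Expr> {
    leaves
        .into_iter()
        .reduce(|acc, e| Expr::And(Box::new(acc), Box::new(e)))
}

/// Nesting depth, which is what a recursive message parser has to descend.
fn depth(e: &Expr) -> usize {
    match e {
        Expr::Leaf(_) => 1,
        Expr::And(l, r) => 1 + depth(l).max(depth(r)),
    }
}
```

For 200 conjuncts the left-deep tree nests 200 levels deep, past a 100-level recursion limit, while the balanced tree nests 9 levels. Since every conjunct is evaluated regardless of shape in a non-short-circuiting evaluator, regrouping an associative operator leaves the result unchanged.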

git log --oneline main..HEAD is also a useful walk — commits are labeled by
phase (P7a..P7z) and each commit message documents the specific concern it
addresses. Two prior comprehensive reviews are reflected in commits 43768c1c
(first review) and 2d13a147 (review of the gate-unblock work).

Follow-ups (not in this PR)

  • Variant type native support — arrow-rs has parquet-variant crates but Comet hasn't integrated them; would unblock CometScanTypeChecker.isVariantStruct for all native scans
  • String collation native support in expression evaluators
  • ProjectionExec column-mapping rename pushdown into ParquetSource's schema adapter (perf item from in-PR sweep)
  • Engine cache TTL / credential-rotation eviction (fine for validation; would block long-lived production drivers using STS)
  • Filter-rewriter linear field lookup → name→index HashMap (perf audit item; per-filter not per-batch)
  • Extract a ContribPlannerCtx trait in a small shared crate so the core_glue.rs body can compile in the contrib crate proper (eliminates the #[path] indirection at the cost of a new crate). Tracked as a separate task.

Test plan

  • Default builds (no -Pcontrib-delta): mvn -pl spark -am test-compile green
  • -Pcontrib-delta builds green (Maven + Cargo)
  • dev/verify-contrib-delta-gate.sh passes all 6 build-gate checks
  • Contrib Scala test suite: 49/49 pass across CometDeltaFeaturesSuite / CometDeltaNativeSuite / CometDeltaColumnMappingSuite / CometDeltaCoverageSuite
  • Contrib Rust unit tests pass
  • Two comprehensive code reviews completed; both rounds of findings addressed
  • Targeted retest of every cluster surfaced during validation, all pass:
    • DescribeDeltaHistorySuite "replaceWhere on data column" — 8/8
    • DeltaTableHadoopOptionsSuite "dropFeatureSupport - with filesystem options" — 1/1
    • SnapshotManagementSuite "should not recover when the current checkpoint is broken..." — 2/2
    • DeltaColumnMappingSuite "physical data and partition schema" + "write/merge df to table" (CM-id + CM-name) — 2/2
  • Engine-cache fix verified end-to-end (no more pthread_create EAGAIN)
  • Full Delta 4.1 regression with all gate-unblock commits in place
  • CI: default + -Pcontrib-delta build paths exercised + dev/verify-contrib-delta-gate.sh wired

Upstream issue

apache/datafusion#22366
filed for make_array element-type strictness. The CometCreateArray
decline in this PR is a caller-side workaround until upstream relaxes.

Reviewer orientation: delta-only vs inherited

This PR is stacked on the 8 extracted core PRs (#4523 through #4536), so most of the non-Delta surface is reviewed there. The delta-only code to review here is:

  • Everything under contrib/delta/** — the kernel-planning JNI bridge, the native DV-sweep / synthetic-columns / DV-reader execs, the Scala CometDeltaNativeScan / DeltaScanRule / DeltaReflection, the regression harness, and the contrib test suites.
  • A handful of delta-only core hooks: rules/DeltaIntegration.scala, the isDeltaScanMarker arm in CometExecRule.scala, CometDeltaScanMarker.scala, DeltaPlanDataInjector.scala, CometPlanAdaptiveDynamicPruningFilters.scala, the delta remainders in CometScanRule.scala / operators.scala, and the native delta arm in native/core/src/execution/planner/delta_scan.rs (plus jni_api.rs, operator.proto, operator_registry.rs, and Cargo/pom wiring).

Everything else (native/spark-expr, native/shuffle, the non-Delta serde, get_struct_field.rs, etc.) is inherited from the extracted PRs and is reviewed in those.

The highest-risk delta-only area is the column-mapping physicalisation in CometDeltaNativeScan.scala (logical→physical nested names and delta.columnMapping.id→parquet.field.id), historically the source of nested-type data-corruption bugs.

Known limitation: useMetadataRowIndex=false slow path

The base DeletionVectorsSuite runs with useMetadataRowIndex=false. In that mode the contrib intentionally declines the native DV read and falls back to a vanilla Delta scan. This is correct but slower: a deliberate correctness-over-speed choice (the native row-index materialisation that mode needs isn't supported), not a defect. On very large tables it is a real perf cliff, and it is what times out the base 2B-row DV stress test under parallel-test load.

🤖 Generated with Claude Code

schenksj added a commit to schenksj/datafusion-comet that referenced this pull request May 22, 2026
…e hardening)

Addresses the 8 findings from the independent code review (see PR apache#4366
comments). 49/49 contrib tests still pass on BOTH Spark 4.1 + Delta 4.1.0
and Spark 3.5 + Delta 3.3.2 after these changes.

Critical:

  1. native/shuffle/src/spark_unsafe/unsafe_object.rs: replace
     `from_utf8_unchecked` with `from_utf8_lossy` returning `Cow<'_, str>`.
     The previous version constructed a `&str` from arbitrary bytes
     (Spark's binary-cast-to-string case, e.g. Delta's Z-Order
     `interleave_bits(...).cast(StringType)`) -- the Rust reference defines
     that as UB even when the bytes only get copied downstream, because
     downstream Arrow ops internally use `str::from_utf8_unchecked` on the
     StringArray buffer and would propagate the UB. `from_utf8_lossy` is
     well-defined: zero-cost borrow for valid UTF-8, allocates a String
     with U+FFFD replacements for invalid bytes (only fires on the
     binary-cast case, which Spark never displays as text anyway). All
     call sites pass to `StringBuilder::append_value` which takes
     `AsRef<str>`; `Cow<str>`'s `AsRef<str>` impl makes them work
     transparently. No call-site changes.

  2. DeltaIntegration.scala: narrow the `case _: Exception => None`
     swallow in `transformV1IfDelta` to ONLY catch true reflection
     binding failures (`NoSuchMethodException`/`NoSuchFieldException`/
     `IllegalAccessException`) and invocation errors
     (`IllegalAccessException`/`IllegalArgumentException`). An
     `InvocationTargetException` -- the contrib's transform actually
     threw -- now logs a warning and declines instead of silently falling
     back to vanilla. Without this, kernel-rs IO errors, CCE on a Delta
     version bump, NPE in the CM-id translator etc. would silently
     decline and the user would never know. Same narrowing applied to
     `scanHandler` and `DeltaPlanDataInjector` lookup (operators.scala).

Should-fix:

  3. CometExecRDD.compute: don't set InputFileBlockHolder when a
     partition has multiple files. Previous code took
     `partition.filePaths.head` always, which would silently report the
     first file's path for every row when a contrib accidentally batched
     multiple files in one partition. (Tried `require(length == 1)`
     first; that's too strict because partitioned reads legitimately have
     multi-file partitions but don't query `input_file_name()`. Skipping
     the hook on multi-file partitions preserves correctness for
     `input_file_name()` callers -- which MUST be one-task-per-partition
     anyway -- without spuriously failing legitimate partitioned
     reads.)

  4. engine.rs: LRU-bound the engine cache at MAX_CACHE_ENTRIES=32. The
     cache key included `DeltaStorageConfig` which contains
     `aws_session_token`; long-running drivers with rotating STS/IRSA
     credentials would grow one entry per rotation and LEAK one
     `TokioBackgroundExecutor` thread per stale entry. With LRU eviction,
     `Arc<DeltaEngine>` drops on eviction, `DefaultEngine` drops its
     `TokioBackgroundExecutor`, the OS thread joins, thread count
     stabilizes. Test `get_or_create_engine_evicts_lru_when_full`
     verifies the bound + eviction order.

Nits:

  5. planner.rs: error message for the "DeltaScan in default build" case
     now mentions BOTH `-Pcontrib-delta` (Maven) and `--features
     contrib-delta` (Cargo) -- previously mentioned only the Cargo flag.

  6. dev/verify-contrib-delta-gate.sh: also assert the contrib-enabled
     libcomet has >0 Delta-related external symbols. Without this, a
     future Rust toolchain change that mangles symbol names differently
     would silently turn the default-build symbol check into a no-op
     while still passing -- the gate would lie about being enforced.
     Asserting both "default has 0" AND "contrib has >0" catches grep
     pattern drift.

Build infrastructure:

  7. pom.xml + spark/pom.xml: move `<delta.version>` default to the
     parent POM's top-level properties. Per-Spark-profile `delta.version`
     overrides cleanly (spark-3.5 -> 3.3.2, spark-4.1 -> 4.1.0), and
     spotless-style invocations without a Spark profile still resolve
     the property. The previous arrangement (default in `contrib-delta`
     profile) had Spark-profile overrides silently lose to the
     contrib-delta default because of POM profile-document-order property
     precedence.

  8. Make `PlanDataInjector` and `DeltaIntegration` extend
     `org.apache.spark.internal.Logging` so the new `logWarning` calls
     compile.
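
The LRU bound from finding 4 can be sketched roughly as below. This is a minimal illustration, not the contrib's engine.rs: `BoundedCache`, its methods, and the key/value types are hypothetical stand-ins, and a linear scan is used because the stated bound (32 entries) is small.

```rust
use std::collections::VecDeque;
use std::sync::Arc;

/// Hypothetical bounded cache; NOT the actual engine.rs type.
pub struct BoundedCache<K, V> {
    capacity: usize,
    // Front = least recently used, back = most recently used.
    entries: VecDeque<(K, Arc<V>)>,
}

impl<K: PartialEq, V> BoundedCache<K, V> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { capacity, entries: VecDeque::new() }
    }

    /// Returns the cached value for `key`, building it with `make` on a miss.
    /// Evicts the least recently used entry when the cache is already full.
    pub fn get_or_create(&mut self, key: K, make: impl FnOnce() -> V) -> Arc<V> {
        if let Some(pos) = self.entries.iter().position(|(k, _)| *k == key) {
            // Hit: move the entry to the most-recently-used end.
            let entry = self.entries.remove(pos).expect("position is in range");
            let value = Arc::clone(&entry.1);
            self.entries.push_back(entry);
            return value;
        }
        if self.entries.len() == self.capacity {
            // Dropping the evicted Arc releases the value once no caller holds it.
            self.entries.pop_front();
        }
        let value = Arc::new(make());
        self.entries.push_back((key, Arc::clone(&value)));
        value
    }

    pub fn len(&self) -> usize {
        self.entries.len()
    }

    pub fn contains(&self, key: &K) -> bool {
        self.entries.iter().any(|(k, _)| k == key)
    }
}
```

Because values are handed out as `Arc`, an evicted entry is only freed after the last in-flight user drops its clone, which matches the drop chain the finding describes.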

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
schenksj and others added 26 commits May 29, 2026 20:07
A field of a NULL struct must be NULL (Spark semantics). Arrow stores a
StructArray's child arrays with their own validity, INDEPENDENT of the parent
struct's null buffer, so the raw child value at a row where the struct itself is
null can be non-null (e.g. parquet files where a logically-null struct column
still carries a populated child buffer). GetStructField.evaluate returned the
child column verbatim, so isnotnull(struct.field) wrongly evaluated TRUE for a
null struct.

Fix: union the parent struct's null mask into the extracted child (null where the
struct is null OR the child is null). Adds a standalone unit test that fails
without the fix and passes with it.
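
The null-mask union described above can be illustrated without Arrow. In this sketch a `&[bool]` stands in for the parent struct's validity and `Option<i64>` values stand in for the child array; `project_field` here is a hypothetical helper, not the function in get_struct_field.rs.

```rust
/// Projects a child column out of a struct column.
/// `parent_valid[i] == true` means the struct itself is non-null at row i.
/// A row of the result is null when the struct is null OR the child is null.
pub fn project_field(parent_valid: &[bool], child: &[Option<i64>]) -> Vec<Option<i64>> {
    assert_eq!(parent_valid.len(), child.len(), "row counts must match");
    parent_valid
        .iter()
        .zip(child)
        // The child buffer may hold a value even where the struct is null,
        // so the parent's validity has to win.
        .map(|(&struct_is_valid, &value)| if struct_is_valid { value } else { None })
        .collect()
}
```

Returning the child verbatim would keep a populated value at a row whose struct is null, which is the `isnotnull(struct.field)` mismatch this commit fixes.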

Closes apache#4432

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Spark's UnsafeRow.getUTF8String performs no UTF-8 validation, and
cast(BinaryType -> StringType) is a zero-copy reinterpret, so a StringType
column can legitimately hold arbitrary non-UTF-8 bytes. get_string decoded with
from_utf8(..).unwrap(), which panics on such rows even though Spark treats them
as opaque.

Use from_utf8_lossy (returning Cow<str>): a zero-cost borrow for valid UTF-8 and
a String with U+FFFD replacements otherwise -- defined behavior, no UB. Avoids
from_utf8_unchecked, which would construct a &str from arbitrary bytes (UB) and
propagate into downstream Arrow ops. Adds a standalone unit test that panics
without the fix and passes with it.
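
A small sketch of the borrow-or-allocate behavior relied on here. `get_string_lossy` and `append_value` are hypothetical stand-ins (the latter for any consumer that accepts `AsRef<str>`); only `String::from_utf8_lossy` and `Cow` are real standard-library items.

```rust
use std::borrow::Cow;

/// Decodes without panicking: borrows when the bytes are valid UTF-8,
/// allocates a String with U+FFFD replacements otherwise.
pub fn get_string_lossy(bytes: &[u8]) -> Cow<'_, str> {
    String::from_utf8_lossy(bytes)
}

/// Hypothetical consumer that accepts anything viewable as a `&str`.
/// Returns the number of UTF-8 bytes it would append.
pub fn append_value(value: impl AsRef<str>) -> usize {
    value.as_ref().len()
}

/// True when no allocation happened.
pub fn is_borrowed(s: &Cow<'_, str>) -> bool {
    matches!(s, Cow::Borrowed(_))
}
```

Since `Cow<str>` implements `AsRef<str>`, callers of such a consumer do not need to distinguish the borrowed case from the owned one.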

Closes apache#4521

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…chemes

Comet's native readers go through object_store, which only understands a fixed set
of URL schemes. A custom Hadoop FileSystem (e.g. registered via
spark.hadoop.fs.<scheme>.impl) crashes the native reader at execution with
"Generic URL error: Unable to recognise URL", with no graceful recovery. Decline
such scans at planning time so Spark's Hadoop-FS-aware reader handles them.

Whether object_store recognizes a scheme is answered by the native layer itself
(NativeBase.isObjectStoreSchemeSupported, backed by object_store's
ObjectStoreScheme::parse -- the same path prepare_object_store_with_configs uses)
rather than a hardcoded list, so the planner can't drift from object_store's actual
support. The user's libhdfs scheme config (spark.hadoop.fs.comet.libhdfs.schemes) is
unioned in on the JVM side; results are cached per scheme; if native can't be
consulted the scheme is assumed supported rather than over-restricting.

Adds CometScanSchemeFallbackSuite, which asserts a `fake://` scan falls back to
Spark; it fails without the gate (Comet claims the scan) and passes with it.

Closes apache#4520

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
A left-deep chain of N associative boolean operands serializes to a proto
nested N levels deep. With N > protobuf's default recursion limit (100), the
message overflows when the serialized plan is re-parsed -- on the JVM via
Operator.parseFrom (findShuffleScanIndices / explain) and in the Rust prost
decoder -- failing an otherwise-supported query.

Comet evaluates AND/OR vectorially (both sides always evaluated, no row-level
short-circuit), so the chains are fully associative. Flatten each chain and
rebuild it as a balanced O(log n) tree before serialization; this is
semantically identical and only changes the proto's shape.

Adds QueryPlanSerde.flattenAssociative + createBalancedBinaryExpr and routes
CometAnd / CometOr through them.
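
The flatten-and-rebalance idea can be sketched on a toy expression type. The real change lives in Scala (QueryPlanSerde); the `Expr` enum and function names below are hypothetical and only illustrate the shape transformation. The flatten step uses an explicit stack so that a deep chain cannot overflow the call stack.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Leaf(u32),
    And(Box<Expr>, Box<Expr>),
}

/// Collects the operands of a nested AND chain in left-to-right order.
pub fn flatten_and(root: Expr) -> Vec<Expr> {
    let mut operands = Vec::new();
    let mut stack = vec![root];
    while let Some(expr) = stack.pop() {
        match expr {
            // Push right first so the left operand is popped (visited) first.
            Expr::And(left, right) => {
                stack.push(*right);
                stack.push(*left);
            }
            leaf => operands.push(leaf),
        }
    }
    operands
}

/// Rebuilds the operands as a balanced tree, keeping their order.
/// Recursion depth here is O(log n), not O(n).
pub fn build_balanced(mut operands: Vec<Expr>) -> Expr {
    assert!(!operands.is_empty(), "need at least one operand");
    if operands.len() == 1 {
        return operands.pop().expect("length checked above");
    }
    let right = operands.split_off(operands.len() / 2);
    Expr::And(
        Box::new(build_balanced(operands)),
        Box::new(build_balanced(right)),
    )
}

/// Nesting depth, counting a leaf as 1.
pub fn depth(expr: &Expr) -> usize {
    match expr {
        Expr::Leaf(_) => 1,
        Expr::And(l, r) => 1 + depth(l).max(depth(r)),
    }
}
```

For a 200-operand left-deep chain the nesting drops from 200 levels to fewer than ten, well under the recursion limit mentioned above, while the operand order is unchanged.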

Closes apache#4526

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Spark wraps file-source partition columns and other per-batch constants in
ConstantColumnVector. When such a batch reaches Comet's serialization path
(Utils.getBatchFieldVectors, used by broadcast/shuffle) or FFI export path
(NativeUtil.exportBatch), it was rejected with "Comet execution only takes
Arrow Arrays".

Materialize the constant into a fresh Arrow FieldVector (the constant repeated
numRows times) inline. The materializer reuses the existing per-type
ArrowFieldWriters, so it covers every type -- scalars, decimal, timestamps, and
complex struct/array/map -- and stays in sync with Spark's type handling.

Adds ConstantColumnVectors.materialize (arrow package) +
Utils.materializeConstantColumnVector, with new match arms in
getBatchFieldVectors and exportBatch.

Closes apache#4527

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
DataFusion's make_array asserts strict element-type equality in
MutableArrayData and panics on a mismatch. Spark's CreateArray coerces element
types with `sameType`, which ignores nullability, so children that share a
surface type but differ only in a nested struct field's nullability get no
unifying cast (e.g. array(struct(a not null), struct(a nullable))). Native
execution then panics: "Arrays with inconsistent types passed to
MutableArrayData".

DataFusion tolerates container nullability differences (ArrayType.containsNull /
MapType.valueContainsNull are coerced), so decline only the cases that actually
panic: children that still differ after normalizing container nullability while
keeping struct field nullability significant. Those fall back to Spark's
evaluator.

Closes apache#4528

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
PlanDataInjector.injectPlanData walked every operator in the tree against every
registered injector (`for (injector <- injectors if injector.canInject(op))`)
-- N operators x M injectors canInject calls -- even though most operators in
any tree are non-scan and match no injector.

Add `opStructCase` to the PlanDataInjector trait and key a
Map[OpStructCase, PlanDataInjector]. Look up by op.getOpStructCase (O(1)) then a
single canInject confirm; non-scan operators skip the iteration entirely. Pure
performance change -- no behavior difference.
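
The keyed lookup can be sketched as follows. The actual change is in Scala; this Rust version uses hypothetical `OpCase`, `Op`, `Injector`, and `Registry` types purely to show the "look up by case, then confirm once" pattern.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OpCase {
    NativeScan,
    DeltaScan,
    Filter,
    Project,
}

/// Hypothetical operator: only the fields needed for the lookup.
pub struct Op {
    pub case: OpCase,
    pub has_plan_data: bool,
}

pub struct Injector {
    pub name: &'static str,
    pub can_inject: fn(&Op) -> bool,
}

/// Example confirm check used by the injectors in this sketch.
pub fn requires_plan_data(op: &Op) -> bool {
    op.has_plan_data
}

pub struct Registry {
    by_case: HashMap<OpCase, Injector>,
}

impl Registry {
    pub fn new(injectors: Vec<(OpCase, Injector)>) -> Self {
        Self { by_case: injectors.into_iter().collect() }
    }

    /// One hash lookup by operator case, then a single confirm call.
    /// Operators whose case has no injector never reach `can_inject`.
    pub fn find(&self, op: &Op) -> Option<&Injector> {
        self.by_case
            .get(&op.case)
            .filter(|injector| (injector.can_inject)(op))
    }
}
```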

Closes apache#4530

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
When Comet's native DataFusion scan hits a corrupt footer, corrupt page/column
data, a truncated/empty file, or a deleted file, it rethrew the raw native
message instead of Spark's FAILED_READ_FILE. The native path does not go through
Spark's FileScanRDD, so the offending path was usually missing too.

Classify these failures by TYPED DataFusionError variant in the native error
path (ParquetError / ObjectStore / ArrowError-wrapping-ParquetError / IoError,
unwrapping Context/Shared) rather than by matching error-message prose -- the
strings come from three upstream crates (DataFusion, arrow-rs, object_store) and
drift across version bumps with no compile-time signal. The match arms are
checked by the compiler.

- native: new SparkError::CannotReadFile { file_path, message } variant; a typed
  try_classify_file_read_error in the JNI bridge converts a file-read
  DataFusionError into it, replacing the previous "not found"/"No such file"
  string match. file_path is taken from object_store::Error::NotFound when
  available. Deliberately does NOT match object_store Generic errors (also used
  for non-file config errors that must surface as-is).
- JVM: the structured error crosses JNI as the existing CometQueryExecutionException
  JSON payload; SparkErrorConverter decodes "CannotReadFile" and, when the native
  error carried no path, fills it from the per-task file list threaded from
  CometNativeScanExec via CometExecRDD. The shims wrap it via
  QueryExecutionErrors.cannotReadFilesError. No JVM-side message matching.

Closes apache#4529

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
check-suites.py requires every *Suite.scala to appear in both
pr_build_linux.yml and pr_build_macos.yml. Add the new
PlanDataInjectorSuite alongside its sibling org.apache.spark.sql.comet
suites.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
check-suites.py requires every *Suite.scala to appear in both
pr_build_linux.yml and pr_build_macos.yml. Add the new
CometScanSchemeFallbackSuite alongside its sibling
org.apache.comet.rules suites.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…la 2.12

SQLTestUtils.withSQLConf returns Unit on Spark 3.5 but a value on Spark 4.x, so
assigning its block result to `val sparkPlan: SparkPlan` failed to compile under
the spark-3.5 profile (type mismatch: found Unit, required SparkPlan). Capture
the plan via a var assigned inside the block, which is cross-version-safe.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address review feedback: import java.lang.Boolean (as JBoolean),
java.net.URI, java.util.Locale and java.util.concurrent.ConcurrentHashMap
rather than referencing them with fully-qualified class names in the
newly-added scheme-gating code.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…R WHERE

Address review feedback on the deep-chain rebalancing PR:

- flattenAssociative now uses an explicit work stack and an accumulating
  buffer instead of recursion. The chains that trigger this are left-deep
  and O(n) deep, so the prior recursive walk could itself overflow the JVM
  stack and the `++` accumulation was O(n^2).

- The recursion-limit test now mixes a nullable column into the chains so
  the rebalanced tree is exercised under SQL three-valued logic, and adds
  a deep OR in a WHERE clause -- a common trigger that, unlike a top-level
  AND, Spark does not split and so stays deeply nested.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…che#4521)

Address review feedback: add a Spark-level regression test demonstrating
the bug. cast(binary -> string) is a zero-copy reinterpret in Spark, so a
StringType column can hold arbitrary non-UTF-8 bytes. The test disables
Comet's Cast so those raw bytes reach Comet's columnar (JVM) shuffle inside
a JVM UnsafeRow, exercising the native row->Arrow get_string path that used
to panic via from_utf8(..).unwrap() and now decodes lossily.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…rename)

The unsupported-scheme fallback still called withInfo, the old name of
withFallbackReason (renamed in apache#4508). It was the only remaining old-name
call in the file and broke compilation after merging main; rename it to
match the rest of CometScanRule.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… nullable

GetStructField::nullable() reported only the extracted field's own nullability,
ignoring whether the parent struct can be null. A field of a null struct is null
(Spark semantics, enforced here by project_field unioning the parent null mask),
so a NON-nullable field of a NULLABLE struct must itself be reported nullable.

Reporting the field's own flag under-declares: the projected column then carries
the parent's nulls while claiming non-nullable, and Arrow RecordBatch validation
rejects it downstream with "Column '...' is declared as non-nullable but contains
null values" (e.g. once the column reaches a shuffle read-back or a projection over
a final aggregate). This is the companion to the value-side null-mask propagation
in this PR -- the value is now correct, this makes the declared nullability match.
Mirrors Spark's GetStructField.nullable = child.nullable || field.nullable.

Surfaced by Delta's action frame: each log row is exactly one action type, so the
action columns (add, remove, ...) are nullable structs whose inner fields are
declared NON-nullable by Delta's typed SingleAction schema (e.g. add.size). The
non-AddFile rows leave add null, so add.size carries nulls while declared
non-nullable, crashing Comet's native shuffle during OPTIMIZE / commit.
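
The declared-nullability rule and the failure it prevents can be sketched as below. Both functions are hypothetical illustrations: `validate_column` only imitates the kind of check quoted in the error message above and is not Arrow's implementation.

```rust
/// Declared nullability of `struct.field`: nullable if either the parent
/// struct or the field itself can be null.
pub fn field_access_nullable(parent_nullable: bool, field_nullable: bool) -> bool {
    parent_nullable || field_nullable
}

/// Imitation of a batch validation step: a column declared non-nullable
/// must not contain nulls.
pub fn validate_column(
    name: &str,
    declared_nullable: bool,
    values: &[Option<i64>],
) -> Result<(), String> {
    if !declared_nullable && values.iter().any(Option::is_none) {
        return Err(format!(
            "Column '{name}' is declared as non-nullable but contains null values"
        ));
    }
    Ok(())
}
```

With a nullable `add` struct and a non-nullable `size` field, declaring the projection with the field's own flag fails validation on any non-AddFile row, while the combined flag passes.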

Tests:
- Rust unit tests for the nullability matrix (nullable/non-nullable parent x field).
- A Spark repro in CometExpressionSuite that builds that exact shape with an explicit
  in-memory schema (a Parquet round-trip would mark every field nullable, and a
  CreateNamedStruct would be declined), shuffles it, and projects the non-nullable
  inner field. It fails with the above error before this fix and passes after.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…Error), not CannotReadFile

A genuinely-missing file (object_store NotFound) is distinct from a corrupt/truncated one:
Spark surfaces it via `readCurrentFileNotFoundError` ("It is possible the underlying files
have been updated."), not `cannotReadFilesError` (FAILED_READ_FILE). `try_classify_file_read_error`
mapped every per-file read failure -- including NotFound -- to `SparkError::CannotReadFile`, so a
file removed between planning and execution produced the wrong Spark error.

Classify object_store NotFound as `SparkError::FileNotFound` instead. The NotFound may arrive
directly (`DataFusionError::ObjectStore`) or wrapped by the parquet reader as
`ParquetError::External(..)` / `ArrowError::ParquetError`, so a `source_chain_has_object_store_not_found`
helper walks the typed source chain (never message text). Corrupt/truncated reads stay
CannotReadFile -> FAILED_READ_FILE. The JVM shim already maps the `FileNotFound` errorType to
`readCurrentFileNotFoundError`, so no shim change is needed.
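
Walking a typed source chain, rather than matching message text, can be sketched with the standard `Error::source` and `downcast_ref` APIs. The error types below (`NotFound`, `Corrupt`, `ReadError`) and `classify` are hypothetical stand-ins, not the object_store, parquet, or Comet types.

```rust
use std::error::Error;
use std::fmt;

/// Stand-in for a storage-layer "object not found" error.
#[derive(Debug)]
pub struct NotFound {
    pub path: String,
}
impl fmt::Display for NotFound {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "object not found: {}", self.path)
    }
}
impl Error for NotFound {}

/// Stand-in for a corrupt-data error.
#[derive(Debug)]
pub struct Corrupt;
impl fmt::Display for Corrupt {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "corrupt footer")
    }
}
impl Error for Corrupt {}

/// Stand-in for a reader error that wraps an underlying cause.
#[derive(Debug)]
pub struct ReadError {
    pub source: Box<dyn Error + 'static>,
}
impl fmt::Display for ReadError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "read failed")
    }
}
impl Error for ReadError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(self.source.as_ref())
    }
}

#[derive(Debug, PartialEq)]
pub enum Classified {
    FileNotFound { path: String },
    CannotReadFile,
}

/// Follows `source()` links and checks each link's concrete type.
pub fn classify(err: &(dyn Error + 'static)) -> Classified {
    let mut current: Option<&(dyn Error + 'static)> = Some(err);
    while let Some(e) = current {
        if let Some(not_found) = e.downcast_ref::<NotFound>() {
            return Classified::FileNotFound { path: not_found.path.clone() };
        }
        current = e.source();
    }
    Classified::CannotReadFile
}
```

The type check is resolved by the compiler, so a reworded upstream message cannot silently change the classification.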

Surfaced by Delta's CDC-after-VACUUM read: `DeltaVacuumSuite` "vacuum for cdc - update/merge" and
"... - delete tombstones" vacuum the `_change_data` files and assert the subsequent read throws
`readCurrentFileNotFoundError`; with the native scan these failed because Comet returned the
cannotReadFilesError message. Both pass with this fix (verified locally).

Tests:
- Rust unit tests for the classifier: object_store NotFound (direct and ParquetError::External-wrapped)
  -> FileNotFound; corrupt ParquetError stays CannotReadFile.
- Spark `CometExecSuite` "native parquet read of a missing file surfaces readCurrentFileNotFoundError"
  (red before, green after): reads a file deleted between planning and execution.
- Made the existing FAILED_READ_FILE corrupt-file assertion spark-version-stable (assert
  "Encountered error while reading file" -- present on both 3.5 and 4.x; only 4.x prepends the
  [FAILED_READ_FILE.NO_HINT] class tag), so the test passes under -Pspark-3.5 as well.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ons, IoError scoping

Addresses @andygrove's review on apache#4536:

- spark-3.4 shim: add the `CannotReadFile` case (it only existed in the 3.5 and 4.x shims),
  so a corrupt/truncated read is wrapped via `cannotReadFilesError` (FAILED_READ_FILE) on
  Spark 3.4 too. (The `FileNotFound` case was already present on 3.4.)
- SparkErrorConverterSuite: assert on the version-stable message ("Encountered error while
  reading file ...") instead of the `FAILED_READ_FILE` literal, which only Spark 4.x prepends
  to getMessage as the error-class tag (3.4/3.5 render only the message). Fixes the two failing
  tests on 3.4/3.5; same version-stable style already applied to the CometExecSuite e2e test.
- native classifier: stop treating a bare `DataFusionError::IoError` as a file read. Scans
  surface read failures as a typed ParquetError/ObjectStore error; a bare IoError can also come
  from non-scan paths (spill, shuffle), which must not be relabelled FAILED_READ_FILE with a
  per-task path attached. Test updated accordingly.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ow path

Follow-up to @andygrove's review on apache#4536:

- (point 3, wording) parquet-rs reports a bad magic / unreadable footer as
  "Invalid Parquet file. Corrupt footer", whereas Spark's reader -- and Spark's
  `ParquetQuerySuite` ("ignoreCorruptFiles", "ignoreMissingFiles using parquet") --
  phrase it as "<file> is not a Parquet file". `cannot_read_file_message` now appends
  Spark's phrasing for the magic/footer case so the FAILED_READ_FILE cause carries it.
  The outer `cannotReadFilesError` wrapper ("Encountered error while reading file …")
  is unchanged, so this composes with Spark's tests and does not disturb the Delta
  shims that match Comet's outer message. Other read failures keep their native message.
  (On behavior: the native scan already declines and falls back to Spark when
  `spark.sql.files.ignoreCorruptFiles`/`ignoreMissingFiles` is enabled --
  CometNativeScan.scala -- so the skip semantics are preserved; no behavior gap.)

- (point 5, tidy) `try_classify_file_read_error` is no longer evaluated twice
  (`.is_some()` guard + `.unwrap()`): the DataFusion arm is a single `if let Some(..)`,
  and the generic fallback is extracted to `throw_generic_exception`.

Tests: classifier unit tests for the magic/footer wording (added) vs other parquet
errors (unchanged native message).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Replace `String::from_utf8_lossy` in `get_string` with `decode_utf8_spark_lossy`,
which mirrors `sun.nio.cs.UTF_8.Decoder` (action REPLACE) byte-for-byte so a Comet
columnar shuffle of arbitrary bytes renders identically to a Spark JVM shuffle.

`from_utf8_lossy` follows the Unicode "maximal subpart" rule and can emit more than
one U+FFFD per ill-formed multi-byte unit; the JDK collapses certain units (notably
surrogate-range three-byte sequences `ED A0..BF ..`, e.g. CESU-8 / modified-UTF-8
supplementary chars) into a single U+FFFD. Valid UTF-8 still returns a zero-cost
borrow via the fast path.

Tests use JDK-17 `new String(bytes, UTF_8)` output as the oracle: a 7-case
replacement-granularity table (incl. the `ED A0 80` -> single U+FFFD parity case),
zero-copy borrow for valid UTF-8, and valid multibyte chars preserved around an
invalid byte.
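
The granularity difference is easy to observe on the standard-library side. This sketch only counts the replacements `String::from_utf8_lossy` emits; it does not reimplement the JDK decoder, and `std_lossy_replacements` is a hypothetical helper name.

```rust
/// Counts the U+FFFD characters produced by the standard lossy decoder.
pub fn std_lossy_replacements(bytes: &[u8]) -> usize {
    String::from_utf8_lossy(bytes)
        .chars()
        .filter(|c| *c == '\u{FFFD}')
        .count()
}
```

For the surrogate-range sequence `ED A0 80` the standard decoder yields three replacement characters, one per byte, whereas the commit above reports a single U+FFFD from the JDK. That gap is what the custom decoder closes.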

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address review feedback on apache#4525. When `spark.hadoop.fs.comet.libhdfs.schemes`
is unset, the scheme gate now defaults `libhdfsSchemes` to `Set("hdfs")` rather
than the empty set, mirroring the native default: `is_hdfs_scheme`
(parquet_support.rs) treats `hdfs` as natively readable when the config is unset,
and `create_hdfs_object_store` is in the default build (`default = ["hdfs-opendal"]`).

Previously a plain `hdfs://` V1 scan was declined and silently fell back to Spark
in the default HDFS configuration even though native could read it. `s3a`/`file`
are unaffected (object_store recognizes them via `parse_url`); an explicit config
value still takes over verbatim.

Test: add `native scan claims hdfs:// when libhdfs.schemes is unset` to
CometScanSchemeFallbackSuite, alongside the existing `fake://` decline case. It
backs the `hdfs` scheme with a local FS (FakeHdfsSchemeFileSystem) so an `hdfs://`
path is exercised without a live cluster, then asserts CometScanRule claims the
scan. Verified RED (fails with `Set.empty`: scan falls back to Spark) -> GREEN
(passes with `Set("hdfs")`) on Spark 3.5.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
schenksj and others added 26 commits June 7, 2026 22:12
…DC fallback (apache#76 Stage C)

A probe routing the residual (legacy) Delta reads to a Spark fallback to test whether
DeltaSyntheticColumnsExec could be deleted broke 4 CDC tests (readChangeFeed engagement,
_change_type/_commit_version reads, the column-count repros). CDC's inverted-DV / _change_data
semantics aren't expressible via kernel's scan, so those reads genuinely need the native
DeltaSyntheticColumnsExec path. The exec is therefore retained as a narrow fallback for the
CDC family + the rare DML declines, rather than deleted -- regular and DML reads no longer
use it (they synthesize in-worker via kernel). Comment documents the intent at the residual
branch.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…hanges

The previous commit's claim that CDC "isn't expressible via kernel's scan" was wrong: it was
inferred from Comet's current code (which declines CDC and uses the bespoke exec), not from
kernel. delta-kernel-rs has a first-class change-data-feed API,
`delta_kernel::table_changes::TableChanges`, that emits _change_type / _commit_version /
_commit_timestamp and reuses the same per-file read + transform primitives as the regular scan.
The exec is retained for CDC only because that TableChanges read path isn't wired yet (tracked
as the unblocker for deleting the exec), NOT because kernel lacks the capability.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…e#76/apache#84, Rust foundation)

Add the executor-side Change Data Feed read using delta-kernel-rs's TableChanges API, the
kernel-native way to read change data (corrects the earlier mistaken "kernel can't do CDC").

- read_cdf_via_kernel: TableChanges::try_new(url, start, Option<end>) -> into_scan_builder().
  build().execute() -> Arrow batches with data + _change_type / _commit_version /
  _commit_timestamp (column mapping / partitions / DVs resolved by kernel). Single-task: kernel's
  CDF per-file API (scan_metadata / CdfScanFile / get_cdf_transform_expr) is pub(crate), so only
  the all-in-one execute() is usable -- one partition reads the whole bounded version range.
- DeltaKernelScanExec gains `cdf: Option<(start, end)>`; read_all routes to read_all_cdf, which
  reuses the by-name assembler to place the CDF columns in scan.output order.
- proto: DeltaScanCommon.cdf_read / cdf_start_version / cdf_end_version. core maps them and
  bypasses the kernel data-column schema requirement (CDF ships no per-file tasks/schemas).

Additive + dormant: the Scala side doesn't set cdf_read yet, so existing reads are unchanged
(contrib Rust tests 109/0). Remaining (apache#84): Scala CDF detection/routing -- intercept
DeltaCDFRelation (path readChangeFeed) + CdcAddFileIndex (streaming), extract start/end versions
+ the CDF output schema -- then CometDeltaCdcSuite, then retire DeltaSyntheticColumnsExec.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…parity (apache#76)

DeltaKernelScanExec streamed batches with no BaselineMetrics, so its
output_rows/numOutputRows SQLMetric was always 0 even though data flowed
correctly -- silently breaking Spark UI row counts, streaming numInputRows,
and any metric-reading test. Wire BaselineMetrics.record_output per batch in
execute(), mirroring core ScanExec. (Real product fix, not a test workaround.)

This surfaced via Delta's StatsCollectionSuite "gather stats", whose
recordsScanned helper also broke on Comet's plan shapes:
  - it located the scan via a pre-execution `.find`, but under AQE Comet's
    scan only materializes during execution;
  - Delta's own suites DISABLE AQE, so the marker->native conversion (which
    runs in AQE query-stage prep) never fires and the plan keeps a
    CometDeltaScanMarker delegating to the wrapped FileSourceScanExec.
Patch recordsScanned in 4.1.0.diff to materialize once, strip AQE, and read
numOutputRows off whichever scan shape is present (native scan / marker ->
originalScan / vanilla). Result: StatsCollectionSuite 67/67.

Also ignore DeltaColumnMappingSuite "explicit id matching" with a full
rationale (kernel's id->name fallback diverges on a manual field-id repoint;
nil real-user impact; upstream kernel fix) + document it as known-limitation
A7.

Guard: CometDeltaStatsSkippingReproSuite proves data-skipping parity (9
unfiltered, 1 after id=1) in BOTH AQE-on (native scan) and AQE-off (marker)
shapes; red->green against the metric fix.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Groundwork for native Change Data Feed reads. A readChangeFeed read is a
RowDataSourceScanExec over CDCReader$DeltaCDFRelation (a CatalystScan
BaseRelation), NOT a FileSourceScanExec/HadoopFsRelation -- which is why Comet
doesn't currently intercept it and CDC declines.

Add DeltaReflection.isCdfRelation / extractCdfTableRoot / extractCdfVersions to
pull the table root and inclusive [start, end] version range a native kernel
TableChanges read needs. Guarded by CometDeltaCdfReflectionReproSuite, which
also pins the plan-shape assumption (RowDataSourceScanExec over DeltaCDFRelation).

Note for the follow-up interception (recorded on apache#84): the marker->native
conversion is AQE-query-stage-prep-gated, and CDF reads aren't AQE-wrapped, so
the existing CometDeltaScanMarker path won't convert CDF -- the interception
must emit CometNativeExec(CometDeltaNativeScanExec) directly.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…pache#84)

readChangeFeed reads previously declined to vanilla Spark: a CDF read is a
RowDataSourceScanExec over DeltaCDFRelation (a CatalystScan), a scan family
Comet never intercepted. Wire it natively end-to-end:

- DeltaIntegration (core): isCdfRelation (pure class-name check) + transformCdf
  reflection bridge to the contrib.
- CometExecRule (core): intercept RowDataSourceScanExec(DeltaCDFRelation) and
  replace it with the native exec. Runs in preColumnarTransitions + query-stage
  prep, so it fires on simple non-AQE CDF plans too.
- CometDeltaNativeScan.convertCdf (contrib): build a DeltaScanCommon with
  kernel_read + cdf_read + the [start, end] version range and the full CDF
  output schema (data + _change_type/_commit_version/_commit_timestamp), gated
  by COMET_DELTA_NATIVE_ENABLED. Clamps a requested endingVersion that exceeds
  the table's latest (kernel's TableChanges errors where Delta clamps).
- CometDeltaCdfScanExec (new contrib exec): single-partition CometLeafExec that
  runs the cdf DeltaScan op; the native DeltaKernelScanExec reconstructs
  delta-kernel TableChanges(start, end) and calls execute(). No per-file tasks,
  DPP, or encryption coupling.
- DeltaReflection.extractCdfLatestVersion: the relation's pinned snapshot
  version, used for the end-version clamp.

The native read path (read_cdf_via_kernel / read_all_cdf, proto cdf fields,
planner wiring) already existed from dc60b71; this commit supplies the Scala
interception that reaches it.

Tests (verified under -Dspark.version=4.1.1; full contrib package 152/0):
CometDeltaCdcSuite flipped from "asserts decline" to "asserts native engagement
+ matches vanilla" (3/3); CometDeltaScanConfAuditSuite GAP CDF flipped to assert
engagement; CometDeltaCdfReflectionReproSuite adds an engagement+correctness
test. This unblocks deleting DeltaSyntheticColumnsExec (apache#82) -- CDC no longer
needs it.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…nthesis only (apache#82)

With CDC now reading natively (apache#84), nothing legitimately needs the legacy stacked
DeltaSyntheticColumnsExec. Close its two remaining routes in CometDeltaNativeScan.convert:

- The `case None` subset residual (was the CDC-family + rare DML declines) now declines to
  vanilla Spark via withFallbackReason instead of buildTaskListFromAddFiles. CDC-family reads
  no longer reach here (they're intercepted upstream as CometDeltaCdfScanExec); the rare DML
  declines (CM-id materialised row_commit_version; OPTIMIZE file-not-found race) correctly fall
  back to Spark.
- Remove the `spark.comet.delta.synthesizeInWorker.enabled` escape hatch: synthesizeInWorker is
  now `!isSubsetFileIndex` (regular reads always synthesize in-worker; matched DML rewrites too;
  everything else declines). So synthesizeInWorker is always true at proto-build time and the
  native side never constructs the exec.

Deletes the now-dead buildTaskListFromAddFiles + its partition-name map. The native
DeltaSyntheticColumnsExec is now dead code (removed in a follow-up). Verified: full contrib
package 152/0 under -Dspark.version=4.1.1.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The exec is now dead code: with CDC reading natively (apache#84) and the
synthesizeInWorker escape hatch removed (b604ccf), the driver never sets
synthesize_in_worker=false for a native read, so core's planner never
constructs DeltaSyntheticColumnsExec. Remove it:

- Delete contrib/delta/native/src/synthetic_columns.rs (the exec).
- native/core planner (delta_scan.rs): drop the need_synthetics branch that
  built the exec; DeltaKernelScanExec is the only scan exec now and always
  applies the DV itself (apply_dv = true). In-worker synthesis (data +
  partitions + row_index/row_id/is_row_deleted/row_commit_version/_metadata.*,
  by name) is the sole native synthesis path.
- lib.rs: drop the module + its doc reference.

Verified: full contrib package 152/0 under -Dspark.version=4.1.1 with the exec
deleted. Residual cosmetic cleanup left as follow-up (the now-always-true
apply_dv field + the set-but-ignored proto emit flags + stale comments).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
 follow-up)

The spark.comet.delta.synthesizeInWorker.enabled escape hatch was removed in
b604ccf (in-worker synthesis is now the only native path); drop the now-unused
config entry.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
apache#76 follow-up)

The spark-4.1 profile (and top-level default) pinned spark.version=4.1.2, but
delta-spark 4.1.0 references org.apache.spark.sql.catalyst.plans.logical.IgnoreCachedData,
which Spark removed in 4.1.2. So `-Pspark-4.1,contrib-delta` builds/tests failed at
runtime with NoClassDefFoundError (the contrib-delta CI matrix already labelled the cell
4.1.1, but the build resolved the pom's 4.1.2). Comet itself works on either patch; the
constraint is delta-spark. Pin to 4.1.1 in both spots so CI and local agree and the contrib
suites run. Verified: full contrib package 152/0 with no -Dspark.version override.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…che#77)

Core's native/core/.../delta_scan.rs held ~250 lines of Delta-specific scan planning
(kernel schema selection, KernelScanFile mapping, storage-config/S3-bucket resolution,
final_output_indices reorder, the kernel_read gate). Move all of it into
comet_contrib_delta::planner::plan_delta_scan, so core stays free of Delta planning logic
(cleaner for upstreaming apache#4366 -- reviewers see core untouched by Delta).

Core's delta_scan.rs is now a thin shim: it computes the requested + partition Arrow schemas
(core owns the proto->arrow `to_arrow_datatype` converter, used across the planner, and the
contrib crate can't depend on core -- that would cycle), calls the contrib planner, and wraps
the returned ExecutionPlan in a SparkPlan. No behaviour change.

Verified: full contrib package 152/0 under Spark 4.1.1.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…can (#2)

A readChangeFeed scan was a single native TableChanges read over the whole
version range on one task. Split the inclusive [start, end] range into up to
spark.comet.delta.cdf.maxPartitions (default 8) contiguous sub-ranges, one per
Spark partition, each running an independent native TableChanges read of its
slice.
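
One way to picture the range split is the sketch below. It is written in Rust for illustration only (the actual split lives on the Scala side), the name `split_version_range` is hypothetical, and the even-as-possible balancing is an assumption: the commit only states that the sub-ranges are contiguous and capped by `spark.comet.delta.cdf.maxPartitions`.

```rust
/// Split the inclusive version range [start, end] into at most `max_parts`
/// contiguous, non-overlapping inclusive sub-ranges that cover it exactly.
///
/// Hypothetical helper; the balancing policy (earlier slices take the
/// remainder) is an assumption, not taken from the commit.
fn split_version_range(start: u64, end: u64, max_parts: usize) -> Vec<(u64, u64)> {
    if end < start || max_parts == 0 {
        return Vec::new();
    }
    let total = end - start + 1;
    // Never create more slices than there are versions.
    let parts = total.min(max_parts as u64);
    let base = total / parts;
    let extra = total % parts;

    let mut out = Vec::with_capacity(parts as usize);
    let mut lo = start;
    for i in 0..parts {
        // The first `extra` slices are one version longer.
        let len = base + if i < extra { 1 } else { 0 };
        let hi = lo + len - 1;
        out.push((lo, hi));
        lo = hi + 1;
    }
    out
}
```

For example, splitting versions 0 through 9 with a cap of 3 yields `(0, 3)`, `(4, 6)`, `(7, 9)`; a two-version range with the default cap of 8 yields two single-version slices.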

The split keeps CometDeltaCdfScanExec as a SINGLE CometNativeExec emitting N
partitions: the shared DeltaScanCommon carries the full range + schema, while
each per-partition DeltaScan carries only its sub-range in a minimal common that
DeltaPlanDataInjector splices over the shared range at execution. This avoids a
CometUnionExec wrapper (a CometExec, not a CometNativeExec), which would make a
downstream native shuffle / aggregation ineligible (e.g. orderBy over CDF). No
proto or native changes: cdf_start/end_version already ride on DeltaScanCommon
and plan_delta_scan already reads them; each Spark partition runs an independent
single-DataFusion-partition native plan fed its injected sub-range.

Verified: CometDeltaCdfReflectionReproSuite 3/3 (incl. new split test),
CometDeltaCdcSuite 3/3 (incl. orderBy-over-CDF), CometDeltaNativeSuite 19/0
(regular task injection unaffected).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- Build gate: use the ./mvnw wrapper instead of bare `mvn`. The CI gate job
  runs in a bare amd64/rust container with no system Maven on PATH, so `mvn`
  produced no dependency output and tripped the gate's anti-vacuous guard.
  ./mvnw self-provisions the pinned Maven regardless of the image. Fixes the
  Build-gate verification CI failure.
- Docs: prettier-format the 12 contrib/delta/docs/*.md files plus
  docs/source/user-guide/latest/delta.md. The persistent warning on
  08-known-limitations.md was a bare `object_store` opening a runaway markdown
  emphasis span that corrupted two lines and broke prettier idempotency;
  backticked it. Fixes the Preflight markdown CI failure.
- DeltaPlanDataInjector: document that the CDF range-override branch is
  intentionally not idempotent-guarded (injectPlanData walks each operator
  once per partition).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Three CI fixes surfaced by PR #2's first green-attempt run:

- Build gate: add `-am` to the gate's `dependency:list` calls. In a fresh CI
  checkout `-pl spark` alone can't resolve spark's sibling reactor modules
  (comet-common, shims), so mvnw exited non-zero with no output and tripped the
  anti-vacuous guard. The matching test job uses `-pl spark -am`; mirror it.
  (Completes the earlier ./mvnw switch.)
- Smoke regression: replace `rsync -a` with `cp -a` in run-regression.sh. The
  bare amd64/rust smoke container has no rsync on PATH, so it exited 127 and
  failed every smoke cell right after BUILD SUCCESS. cp -a is coreutils-only and
  copies the same contents with attributes preserved.
- check-suites: register org.apache.comet.CometParquetPercentPathSuite (a core
  literal-% filename read repro this branch added) in pr_build_linux.yml so the
  Preflight missing-suites check passes.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
apache#86)

The delta-contrib/spark-3.5.8 CI cell failed 8/146 tests, all
SparkFileNotFoundException on bare deletion-vector UUIDs. Root cause: under
`Utils.isTesting`, Delta prepends `spark.databricks.delta.testOnly.dvFileNamePrefix`
(default `test%dv%prefix-` on Delta 3.3.2, empty on 4.x) to every DV file it
writes -> `<prefix>deletion_vector_<uuid>.bin`. delta-kernel-rs has no knowledge
of that JVM-only conf, so for "u" (UUID-relative) storage it resolves the
un-prefixed name and the executor-side DV read fails. It passed on 4.x only
because the effective prefix is empty there.

Fix: ship the conf to native and splice it into kernel's own resolved DV path
for "u" descriptors, reading through an absolute-path ("p") descriptor.

- DeltaReflection.dvFileNamePrefix reads the value via the `ConfigEntry`
  (DeltaSQLConf.TEST_DV_NAME_PREFIX) -- the prefix is the entry's testing-gated
  DEFAULT, not a session setting, so getConfString(key, "") would always miss it.
  Reflection (no compile dep on delta-spark); empty when the entry is absent.
- New DeltaScanCommon.dv_file_name_prefix proto field carries it to the executor.
- dv_reader::read_dv_indexes splices the prefix in front of the
  `deletion_vector_<uuid>.bin` filename (only for "u" storage; "p"/"i" untouched)
  via kernel's absolute_path, operating on the decoded fs path so the literal `%`
  re-encodes correctly. Empty prefix (production) is a no-op.
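
The splice itself amounts to inserting the prefix in front of the final path segment. The sketch below shows only that string step on an already-decoded filesystem path; it is an assumption-laden illustration, not `dv_reader::read_dv_indexes`: the name `splice_dv_prefix` is hypothetical, and the real code goes through kernel's `absolute_path` and an absolute-path ("p") descriptor.

```rust
/// Insert `prefix` in front of the file name of a resolved DV path.
///
/// Hypothetical helper operating on a decoded fs path. Only "u"
/// (UUID-relative) storage is rewritten; "p" and "i" pass through, and an
/// empty prefix (the production case) is a no-op.
fn splice_dv_prefix(storage_type: &str, resolved_path: &str, prefix: &str) -> String {
    if storage_type != "u" || prefix.is_empty() {
        return resolved_path.to_string();
    }
    match resolved_path.rfind('/') {
        // Keep the directory part (including the slash), then prefix + name.
        Some(idx) => format!(
            "{}{}{}",
            &resolved_path[..=idx],
            prefix,
            &resolved_path[idx + 1..]
        ),
        // No directory component: prefix the whole name.
        None => format!("{prefix}{resolved_path}"),
    }
}
```

Working on the decoded path matters because the Delta 3.3.2 default prefix contains literal `%` characters, which must be percent-encoded once when the path is turned back into a URL rather than being mistaken for escapes.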

Validated: full contrib.delta package 153/0 on BOTH spark-3.5/delta-3.3.2 (was
9/1 on the repro suite, 8 failures across the package) and spark-4.1/delta-4.1.0
(no regression). Plus 3 splice unit tests. The existing
CometDeltaFilterPushdownAuditSuite "DV + range filter" test is the red->green
guard on the 3.5 matrix cell.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…pache#85)

The design docs had drifted from the kernel-read refactor (apache#76/apache#77/apache#80/apache#81/apache#82/
apache#84/#2/apache#78/apache#86). Audited all 13 docs against current code and corrected:

- Removed the deleted ParquetSource + DV-sweep + DeltaSyntheticColumnsExec read
  stack as the "current" path everywhere; it is now kernel-read only (apache#50/apache#82),
  with DeltaKernelScanExec doing in-worker synthesis. The old stack is kept only
  as clearly-labeled history / rejected alternatives.
- delta_scan.rs is a ~72-line shim delegating to comet_contrib_delta::planner
  (apache#77); column-mapping physicalisation dropped, kernel ships the schemas (apache#76).
- CDF (readChangeFeed) is kernel-native via TableChanges -> CometDeltaCdfScanExec,
  split multi-partition (apache#84/#2) -- corrected docs that called it unsupported,
  declined, or a synthetic-columns fallback.
- 08-known-limitations.md: removed all of Part B (B1-B9 were development-time
  regressions, all now fixed + guarded) and A3 (path-based CDF now engages
  native, apache#84); kept only genuine current limitations (A1 DPP residual, A2e
  credential residual, A4 VARIANT, A5 decline gates, A6 INT96 kernel gap, A7
  CM-id repoint). 466 -> 230 lines.
- Fixed config keys, build/module layout, JNI symbols, file paths, CI workflow
  references, and supported-feature lists (added CDF, _metadata, INT96) across
  the build / README / user-guide docs.

Every claim verified against code; markdown passes prettier.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
`spark.comet.delta.kernelRead.enabled` was a transitional toggle from when the
legacy ParquetSource read path co-existed with the kernel-read path. The legacy
path is gone (apache#50) and kernel-read is the only read path, so the toggle was
dead: setting it false didn't fall back to anything, it made the native planner
error ("the legacy read path was removed; kernel-read is required").

Removed end to end:
- DeltaScanCommon.kernel_read proto field -> `reserved 25` / `reserved
  "kernel_read"`; the native planner's `if !kernel_read { error }` gate deleted.
- DeltaConf.COMET_DELTA_KERNEL_READ_ENABLED config entry; the two
  setKernelRead(...) call sites dropped (CDF + regular).
- assertKernelReadEngaged no longer checks the (gone) proto field -- a
  CometDeltaNativeScanExec in the plan is the engagement signal now; the 7
  withSQLConf(...kernelRead -> "true") test wrappers removed.
- Doc references (user-guide tuning table, fallback off-switches, planning /
  native-execution proto notes). Also refreshed a couple of stale code comments
  that still described the deleted ParquetSource + DeltaSyntheticColumnsExec
  stack.

Verified: native rebuilds clean (proto regen), CometDeltaNativeSuite 19/0 on
spark-4.1/delta-4.1.0; docs pass prettier.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Three CI failures from the latest run, all environmental:

- Build-gate: the gate's `./mvnw dependency:list` failed instantly with no
  output in the bare amd64/rust container, tripping the anti-vacuous guard. Cause:
  the git-commit-id Maven plugin reads git metadata, which errors on the
  container's "dubious ownership" checkout. The passing delta-contrib job already
  uses `-Dmaven.gitcommitid.skip`; mirror it on all the gate's maven calls. Also
  capture + dump mvnw stderr on the anti-vacuous failure so future breakage is
  diagnosable instead of silent.
- Smoke regression: after the rsync->cp fix, the copy failed because its source
  `$HOME/.m2/repository/org/apache/datafusion` didn't exist -- CI's Maven local
  repo isn't $HOME/.m2. Pin `-Dmaven.repo.local=$M2_REPO` on the install and read
  the copy from the same $M2_REPO so they stay in lockstep; error clearly if the
  artifacts still aren't there.
- Preflight (check-missing-suites): CometParquetPercentPathSuite was registered
  in pr_build_linux.yml but not pr_build_macos.yml; the check requires both.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The PR Build Lint jobs (Rust `cargo fmt --check`, syntactic scalafix) were
path-skipped on this branch and only started running once these commits touched
core/workflow files, so they surfaced pre-existing formatting drift in the
never-linted contrib + a couple of core files.

- cargo fmt --all under the CI-matching stable rustfmt (1.9.0 / rustc 1.96.0,
  commit ac68faa20 -- the exact build CI uses): contrib/delta/native/*.rs plus
  native/jni-bridge/src/errors.rs and native/shuffle/src/spark_unsafe/unsafe_object.rs
  (CI flagged exactly these eight).
- scalafix --syntactic: drop two no-op `s"..."` interpolators in
  DeltaIntegration.scala and CometScanSchemeFallbackSuite.scala.

No behavior change; `cargo fmt --check` and `scalafix --check` are both clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…resh)

The build-gate's Maven `dependency:list -pl spark -am` fails in a fresh CI repo:
`-am` adds the sibling reactor modules (comet-common, shims) to the reactor for
listing but does NOT build/install them, so resolving spark's dependencies dies
with "Could not find artifact ...comet-common... (absent)" and no output --
which then tripped the anti-vacuous guard. (Reproduced locally with an empty
-Dmaven.repo.local.)

Switch the dependency checks to `help:effective-pom`, which only merges POM
models (no artifact resolution) and so works without building the reactor.
Extract the ACTIVE top-level <dependencies> (after </dependencyManagement>,
before the <profiles> listing) -- what dependency:list would have shown -- and:
  - default: assert zero io.delta and (anti-vacuous) spark present;
  - per Spark profile: read delta-spark's interpolated <version> and assert the
    pinning (4.1->4.1.x, 3.5->3.x, 4.0->4.0.x). (help:evaluate was unreliable
    here -- it returns the root delta.version, not the per-profile value.)

Verified end to end with a fresh local repo: default spark=6/io.delta=0;
delta-spark 4.1.0 / 3.3.2 / 4.0.0.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…pom errors

The effective-pom gate still failed fast in CI (8s, no output) though it works
on a fresh local repo. The likely container-only cause is git "dubious
ownership": the checkout is owned by a different user than the job runs as, so a
Maven git-metadata read (e.g. the git-commit-id plugin running as a build
extension, which -Dmaven.gitcommitid.skip doesn't fully suppress) errors
instantly. Mark the tree safe up front.

Also stop swallowing the effective-pom output: capture it and dump the tail on
the anti-vacuous failure, so if something else is wrong the next run shows it
instead of "produced no spark deps".

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
With the Maven + compiled-class gates now passing, the gate reached its final
layer and failed: it stat'd `target/debug/libcomet.dylib`, but the cdylib is
`libcomet.so` on Linux (CI) -- `.dylib` is macOS only. `nm -gU` is likewise a
macOS-ism.

Detect whichever lib the build produced (`comet_lib`), size it with the
platform's `stat` flag, and grep the full `nm` symbol table (portable on both)
for Delta symbols. Error clearly if the build produced no lib at all.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… exit 2)

The dylib gate reached `LIB_DEFAULT="$(comet_lib)"` and the script died with exit
2 even though the build succeeded and the lib existed: `ls libcomet.so
libcomet.dylib` returns non-zero when one path is absent (always -- only one
extension exists per platform), and under `set -euo pipefail` that propagates
out of the command substitution. Replace the `ls | head` with a loop that echoes
the first existing lib and always returns 0.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…rence-audit

The macOS PR Build (which runs apache-rat:check) was path-skipped on this branch
and only started running this round, surfacing a pre-existing gap:
contrib/delta/docs/11-kernel-read-coherence-audit.md never had the ASF license
header (every sibling doc does). RAT failed all four macos Spark-4.0 test shards
with "Too many files with unapproved license: 1". Add the standard markdown
header; `apache-rat:check` is clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…met resolves

smoke/delta-4.0.0 and 4.1.0 pass; only 3.3.2 fails. The Comet install lands in
the script step's $HOME/.m2, but SBT runs with a different HOME in the CI
container (/root/.m2), so SBT's mavenLocal resolver can't see Comet -- the
`local-comet` resolver in build/sbt-config/repositories (added by each diff,
pointing at the isolated publish dir) is the bridge. The "Published: 3 modules" output
confirmed comet-spark IS in /tmp/comet-published-3.5, yet SBT's "not found" list
never mentions local-comet -- i.e. Delta 3.3.2's older build/sbt launcher
doesn't honour the repositories file. Delta 4.x's does, which is why those cells
pass.

Pass `-Dsbt.override.build.repos=true -Dsbt.repository.config=build/sbt-config/repositories`
on the SBT command, scoped to 3.3.2 so the working 4.x cells are untouched.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…D_FILE

A WHERE/filter predicate pushed into the parquet row filter that throws during
evaluation (e.g. an ANSI divide-by-zero -- now reachable by default since Scala
UDF codegen dispatch landed, apache#4514) is returned by DataFusion's row filter as an
`ArrowError` (`ComputeError`), which the parquet reader then wraps as
`ParquetError::External(<arrow error>)`. `try_classify_file_read_error` matched
the blanket `ParquetError` arm and mislabelled it `CannotReadFile`/FAILED_READ_FILE,
masking the real error (e.g. DIVIDE_BY_ZERO).

This regressed `SQLQueryTestSuite` udf/postgreSQL/udf-select_having.sql query apache#20:
expected `SparkArithmeticException` DIVIDE_BY_ZERO, got `cannotReadFilesError`. The
apache#4514/apache#4517 harness workaround collapses `CometNativeException`(DivideByZero) to
[DIVIDE_BY_ZERO]; the FAILED_READ_FILE relabel broke that surface.

A predicate-evaluation failure is not a file-read failure. Tell the two apart by
TYPE, not message text (DataFusion builds the message via `{:?}`): a genuine
corrupt/truncated/missing-file error is `ParquetError::General`/`EOF`/
`External(io|object_store)` and never wraps an `ArrowError`, whereas the row-filter
predicate failure is `ParquetError::External(<ArrowError>)`. Carve that one case out
so the underlying error surfaces through the normal native-exception path. Genuine
read failures (including `External(io)`) are unaffected. Adds red-green unit tests
for both the predicate-eval and the External(io) cases.
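
The type-based distinction can be sketched with stand-in types. Everything below is a self-contained model under stated assumptions: `ArrowError` and `ParquetError` here are simplified local stand-ins for the arrow and parquet crate types (only the variants named above are modelled), and `is_file_read_error` is a hypothetical name, not `try_classify_file_read_error` itself.

```rust
use std::error::Error;
use std::fmt;

/// Stand-in for the arrow crate's error type (assumption: simplified).
#[derive(Debug)]
struct ArrowError(String);

impl fmt::Display for ArrowError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Compute error: {}", self.0)
    }
}

impl Error for ArrowError {}

/// Stand-in for the parquet crate's error type (assumption: simplified).
#[derive(Debug)]
enum ParquetError {
    General(String),
    EOF(String),
    External(Box<dyn Error + Send + Sync>),
}

/// Returns true when the error should be reported as a file-read failure.
///
/// The check is on the wrapped error's TYPE, not its message: an
/// `External` wrapping an `ArrowError` is a row-filter predicate failure
/// and is carved out; everything else stays a read failure.
fn is_file_read_error(err: &ParquetError) -> bool {
    match err {
        ParquetError::External(inner) => inner.downcast_ref::<ArrowError>().is_none(),
        ParquetError::General(_) | ParquetError::EOF(_) => true,
    }
}
```

Under this model an `External` wrapping an `std::io::Error` still classifies as a read failure, while an `External` wrapping an `ArrowError` does not, so the underlying error (for example a divide-by-zero) can surface unchanged.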

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@schenksj schenksj changed the title feat(contrib): Native Delta Lake scan via delta-kernel-rs (Iceberg-style contrib) [Tracking] feat(contrib): Native Delta Lake scan via delta-kernel-rs (Iceberg-style contrib) Jun 10, 2026
@schenksj

Contributor Author

@andygrove FYI: the plan for review chunks is now in the description of this PR. I will keep this PR in place so reviewers can reference the "full" work product if there are questions about the current status, features, test coverage, etc. as the feature is feathered in.

schenksj added a commit to schenksj/datafusion-comet that referenced this pull request Jun 10, 2026