From 80b8d13949ff7d36e3b29f5148bcce2dd03c8a38 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sun, 14 Jun 2026 22:20:25 +0800 Subject: [PATCH 1/4] feat(update): add OverwriteFiles for overwrite snapshot commits Summary: Add a production OverwriteFiles builder that brings iceberg-cpp to semantic parity with Java's BaseOverwriteFiles. It supports explicit file replacement (DeleteFile + AddFile) and range-based replacement (OverwriteByRowFilter + AddFile) with the same family of pre-commit concurrency validations. The builder is a thin subclass of MergingSnapshotUpdate and reuses the existing commit kernel (Apply/summary/retry/cleanup) unchanged. Changes: - New OverwriteFiles class (src/iceberg/update/overwrite_files.{h,cc}) and Table::NewOverwrite() / Transaction::NewOverwrite() entry points. - Builder surface: AddFile, DeleteFile, bulk DeleteFiles, OverwriteByRowFilter, ValidateFromSnapshot, ConflictDetectionFilter, ValidateNoConflictingData, ValidateNoConflictingDeletes, ValidateAddedFilesMatchOverwriteFilter, WithCaseSensitivity. - Validate(): conflict-filter resolution, concurrent add/delete conflict checks, and strict added-file range validation (projection + StrictMetricsEvaluator). - Tests (overwrite_files_test.cc, 45 cases) and CMake/meson wiring. Behavior alignment with Java: - operation() returns append/delete/overwrite from builder content. - Conflict-filter resolution mirrors BaseOverwriteFiles (explicit -> row filter -> AlwaysTrue); replaced-file delete checks honor ConflictDetectionFilter. - Strict added-file validation uses a single DataSpec(), rejecting multi-spec and empty added-file sets. - Deviations: public WithCaseSensitivity (vs caseSensitive) to avoid a protected-name clash; ValidateFromSnapshot rejects negative ids early. --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/meson.build | 1 + src/iceberg/table.cc | 7 + src/iceberg/table.h | 3 + src/iceberg/test/CMakeLists.txt | 3 +- src/iceberg/test/meson.build | 4 + src/iceberg/test/overwrite_files_test.cc | 1052 ++++++++++++++++++++++ src/iceberg/transaction.cc | 8 + src/iceberg/transaction.h | 3 + src/iceberg/type_fwd.h | 1 + src/iceberg/update/meson.build | 1 + src/iceberg/update/overwrite_files.cc | 352 ++++++++ src/iceberg/update/overwrite_files.h | 152 ++++ 13 files changed, 1587 insertions(+), 1 deletion(-) create mode 100644 src/iceberg/test/overwrite_files_test.cc create mode 100644 src/iceberg/update/overwrite_files.cc create mode 100644 src/iceberg/update/overwrite_files.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 0fe418d48..f710d0027 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -97,6 +97,7 @@ set(ICEBERG_SOURCES update/expire_snapshots.cc update/fast_append.cc update/merging_snapshot_update.cc + update/overwrite_files.cc update/pending_update.cc update/set_snapshot.cc update/snapshot_manager.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 8c4b49e9b..a8d4ced3e 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -119,6 +119,7 @@ iceberg_sources = files( 'update/expire_snapshots.cc', 'update/fast_append.cc', 'update/merging_snapshot_update.cc', + 'update/overwrite_files.cc', 'update/pending_update.cc', 'update/set_snapshot.cc', 'update/snapshot_manager.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 1255871c3..64597468e 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -33,6 +33,7 @@ #include "iceberg/transaction.h" #include "iceberg/update/expire_snapshots.h" #include "iceberg/update/fast_append.h" +#include "iceberg/update/overwrite_files.h" #include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_manager.h" #include "iceberg/update/update_location.h" @@ -217,6 +218,12 @@ Result> Table::NewFastAppend() { return FastAppend::Make(name().name, std::move(ctx)); } +Result> Table::NewOverwrite() { + ICEBERG_ASSIGN_OR_RAISE( + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return OverwriteFiles::Make(name().name, std::move(ctx)); +} + Result> Table::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE( auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 8d8849f37..54c40a29c 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -176,6 +176,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// \brief Create a new FastAppend to append data files and commit the changes. virtual Result> NewFastAppend(); + /// \brief Create a new OverwriteFiles to overwrite data files and commit the changes. + virtual Result> NewOverwrite(); + /// \brief Create a new SnapshotManager to manage snapshots and snapshot references. virtual Result> NewSnapshotManager(); diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 0e8f03150..ac14656a1 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -232,7 +232,8 @@ if(ICEBERG_BUILD_BUNDLE) update_properties_test.cc update_schema_test.cc update_sort_order_test.cc - update_statistics_test.cc) + update_statistics_test.cc + overwrite_files_test.cc) add_iceberg_test(data_test USE_BUNDLE diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index 03d9e1f6c..c20973eea 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -120,6 +120,10 @@ iceberg_tests = { ), 'use_data': true, }, + 'overwrite_files_test': { + 'sources': files('overwrite_files_test.cc'), + 'use_data': true, + }, } if get_option('rest').enabled() diff --git a/src/iceberg/test/overwrite_files_test.cc b/src/iceberg/test/overwrite_files_test.cc new file mode 100644 index 000000000..d003c6ee5 --- /dev/null +++ b/src/iceberg/test/overwrite_files_test.cc @@ -0,0 +1,1052 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/overwrite_files.h" + +#include +#include +#include +#include + +#include +#include + +#include "iceberg/avro/avro_register.h" +#include "iceberg/constants.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/partition_field.h" +#include "iceberg/partition_spec.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/transaction.h" +#include "iceberg/transform.h" +#include "iceberg/update/fast_append.h" +#include "iceberg/util/data_file_set.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +// ===================================================================================== +// Test harness for OverwriteFiles. +// +// Modeled on src/iceberg/test/merging_snapshot_update_test.cc (same fixture style, +// same in-memory mock FileIO / catalog setup, same DataFile / commit helpers). Unlike +// that file, OverwriteFiles is the production class with a private constructor, so the +// tests drive it exclusively through its public builder surface (AddFile / DeleteFile / +// OverwriteByRowFilter / ... / operation() / Validate() / Commit()) and observe its +// behavior through the public API: operation() classification, the committed snapshot +// summary, and the public Validate(...) entry point that the commit kernel invokes. +// +// The base table (TableMetadataV2ValidMinimal.json) has schema {x: long (id 1), +// y: long (id 2), z: long (id 3)} and a single partition spec (spec id 0) that +// partitions by identity(x). +// ===================================================================================== +class OverwriteFilesTest : public UpdateTestBase { + protected: + static void SetUpTestSuite() { avro::RegisterAll(); } + + std::string MetadataResource() const override { + return "TableMetadataV2ValidMinimal.json"; + } + + void SetUp() override { + UpdateTestBase::SetUp(); + + ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec()); + ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema()); + + file_a_ = MakeDataFile("/data/file_a.parquet", /*partition_x=*/1L); + file_b_ = MakeDataFile("/data/file_b.parquet", /*partition_x=*/2L); + } + + // A plain data file in spec 0 (identity(x)) with the given partition value for x. + std::shared_ptr MakeDataFile(const std::string& path, int64_t partition_x, + int64_t record_count = 100) { + auto f = std::make_shared(); + f->content = DataFile::Content::kData; + f->file_path = table_location_ + path; + f->file_format = FileFormatType::kParquet; + f->partition = PartitionValues(std::vector{Literal::Long(partition_x)}); + f->file_size_in_bytes = 1024; + f->record_count = record_count; + f->partition_spec_id = spec_->spec_id(); + return f; + } + + // A data file carrying column metrics for column y (field id 2): lower/upper bounds + // plus value/null counts, so the StrictMetricsEvaluator can reason about it. + std::shared_ptr MakeDataFileWithYBounds(const std::string& path, + int64_t partition_x, int64_t y_lower, + int64_t y_upper) { + auto f = MakeDataFile(path, partition_x); + f->lower_bounds = {{2, Literal::Long(y_lower).Serialize().value()}}; + f->upper_bounds = {{2, Literal::Long(y_upper).Serialize().value()}}; + f->value_counts = {{2, f->record_count}}; + f->null_value_counts = {{2, 0}}; + return f; + } + + std::shared_ptr MakeDeleteFile(const std::string& path, int64_t partition_x) { + auto f = MakeDataFile(path, partition_x); + f->content = DataFile::Content::kPositionDeletes; + return f; + } + + // An equality delete file in spec 0 (partition x = partition_x). + std::shared_ptr MakeEqualityDeleteFile(const std::string& path, + int64_t partition_x) { + auto f = MakeDeleteFile(path, partition_x); + f->content = DataFile::Content::kEqualityDeletes; + f->equality_ids = {1}; + return f; + } + + // Write a delete manifest containing the given delete files, with the snapshot id and + // sequence number assigned on each entry (so the manifest list writer does not need to + // inherit them). + Result WriteDeleteManifest( + const std::string& path, const std::vector>& files, + int64_t snapshot_id, int64_t sequence_number) { + ICEBERG_ASSIGN_OR_RAISE( + auto writer, + ManifestWriter::MakeWriter(/*format_version=*/2, snapshot_id, path, file_io_, + spec_, schema_, ManifestContent::kDeletes)); + for (const auto& f : files) { + ManifestEntry entry; + entry.status = ManifestStatus::kAdded; + entry.snapshot_id = snapshot_id; + entry.sequence_number = sequence_number; + entry.data_file = f; + ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + return writer->ToManifestFile(); + } + + // Build a synthetic snapshot from the given manifests (mirrors the merging-update test + // harness). Used to inject a concurrent commit between read and validate. + Result> MakeSyntheticSnapshot( + std::string operation, int64_t snapshot_id, + std::optional parent_snapshot_id, int64_t sequence_number, + const std::vector& manifests) { + auto manifest_list_path = table_location_ + "/metadata/manifest-list-" + + std::to_string(snapshot_id) + ".avro"; + ICEBERG_ASSIGN_OR_RAISE( + auto writer, + ManifestListWriter::MakeWriter(table_->metadata()->format_version, snapshot_id, + parent_snapshot_id, manifest_list_path, file_io_, + sequence_number)); + ICEBERG_RETURN_UNEXPECTED(writer->AddAll(manifests)); + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + + ICEBERG_ASSIGN_OR_RAISE( + auto snapshot, + Snapshot::Make(sequence_number, snapshot_id, parent_snapshot_id, TimePointMs{}, + std::move(operation), {}, table_->metadata()->current_schema_id, + manifest_list_path)); + return std::shared_ptr(std::move(snapshot)); + } + + // Inject a concurrent snapshot that adds an equality delete file covering partition + // `partition_x`, layered on top of `parent`. Returns the metadata containing the new + // snapshot (as current) plus the new snapshot itself, so a subsequent Validate(...) can + // scan the range (parent, new] for new deletes. + struct ConcurrentDelete { + std::shared_ptr metadata; + std::shared_ptr snapshot; + }; + ConcurrentDelete InjectConcurrentEqualityDelete(const std::shared_ptr& parent, + const std::string& delete_path, + int64_t partition_x) { + ConcurrentDelete out; + auto del_file = MakeEqualityDeleteFile(delete_path, partition_x); + const int64_t new_snapshot_id = parent->snapshot_id + 1000; + const int64_t new_sequence_number = parent->sequence_number + 1; + auto manifest = + WriteDeleteManifest(table_location_ + "/metadata/concurrent-del-manifest.avro", + {del_file}, new_snapshot_id, new_sequence_number); + EXPECT_TRUE(manifest.has_value()); + auto snap = MakeSyntheticSnapshot(DataOperation::kOverwrite, new_snapshot_id, + parent->snapshot_id, new_sequence_number, + {manifest.value()}); + if (!snap.has_value()) { + ADD_FAILURE() << "MakeSyntheticSnapshot failed: " << snap.error().message; + } + EXPECT_TRUE(snap.has_value()); + out.snapshot = snap.value(); + out.metadata = std::make_shared(*table_->metadata()); + out.metadata->snapshots.push_back(out.snapshot); + out.metadata->current_snapshot_id = out.snapshot->snapshot_id; + out.metadata->last_sequence_number = out.snapshot->sequence_number; + return out; + } + + Result> NewOverwrite() { + return table_->NewOverwrite(); + } + + // Commit file_a_ with FastAppend and refresh the table; returns its snapshot id. + int64_t CommitFileA() { + auto fa = table_->NewFastAppend(); + EXPECT_TRUE(fa.has_value()); + fa.value()->AppendFile(file_a_); + EXPECT_THAT(fa.value()->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + auto snap = table_->current_snapshot(); + EXPECT_TRUE(snap.has_value()); + return snap.value()->snapshot_id; + } + + // Append a single file via FastAppend and refresh; returns the new snapshot. + std::shared_ptr CommitFastAppend(const std::shared_ptr& file) { + auto fa = table_->NewFastAppend(); + EXPECT_TRUE(fa.has_value()); + fa.value()->AppendFile(file); + EXPECT_THAT(fa.value()->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + auto snap = table_->current_snapshot(); + EXPECT_TRUE(snap.has_value()); + return snap.value(); + } + + std::shared_ptr spec_; + std::shared_ptr schema_; + std::shared_ptr file_a_; + std::shared_ptr file_b_; +}; + +// ===================================================================================== +// 9.2 Entry-point and builder-method tests +// ===================================================================================== + +// Req 1.1: Table::NewOverwrite() returns a valid builder for a standalone operation. +TEST_F(OverwriteFilesTest, TableNewOverwriteReturnsBuilder) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + ASSERT_NE(op, nullptr); +} + +// Req 1.2, 1.3: Transaction::NewOverwrite() returns a valid builder registered with the +// transaction (commit deferred until the transaction commits). +TEST_F(OverwriteFilesTest, TransactionNewOverwriteReturnsBuilder) { + ICEBERG_UNWRAP_OR_FAIL(auto txn, Transaction::Make(table_, TransactionKind::kUpdate)); + ICEBERG_UNWRAP_OR_FAIL(auto op, txn->NewOverwrite()); + ASSERT_NE(op, nullptr); + + (*op).OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))).AddFile(file_a_); + + // Within a transaction, the builder is staged by its own Commit() before the + // transaction is committed. + EXPECT_THAT(op->Commit(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto committed, txn->Commit()); + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kOverwrite); +} + +// Builder methods all return *this and chain (Req 2.1-2.3, 3.1, 4.1-4.2, 6.1, 6.3, 6.4). +TEST_F(OverwriteFilesTest, BuilderMethodsReturnSelfAndChain) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + auto* self = op.get(); + + EXPECT_EQ(&op->AddFile(file_a_), self); + EXPECT_EQ(&op->DeleteFile(file_b_), self); + EXPECT_EQ(&op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))), self); + EXPECT_EQ(&op->ValidateFromSnapshot(42), self); + EXPECT_EQ(&op->ConflictDetectionFilter(Expressions::Equal("x", Literal::Long(1L))), + self); + EXPECT_EQ(&op->ValidateNoConflictingData(), self); + EXPECT_EQ(&op->ValidateNoConflictingDeletes(), self); + EXPECT_EQ(&op->ValidateAddedFilesMatchOverwriteFilter(), self); + EXPECT_EQ(&op->WithCaseSensitivity(false), self); + + // A single fluent chain compiles and returns the same instance. + OverwriteFiles& chained = + (*op) + .AddFile(MakeDataFile("/data/chain.parquet", 1L)) + .OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))) + .ValidateNoConflictingData(); + EXPECT_EQ(&chained, self); +} + +// ===================================================================================== +// 9.3 operation() truth-table tests (Req 5.1-5.4) +// ===================================================================================== + +TEST_F(OverwriteFilesTest, OperationAddOnlyIsAppend) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->AddFile(file_a_); + EXPECT_EQ(op->operation(), DataOperation::kAppend); +} + +TEST_F(OverwriteFilesTest, OperationDeleteOnlyIsDelete) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(file_a_); + EXPECT_EQ(op->operation(), DataOperation::kDelete); +} + +TEST_F(OverwriteFilesTest, OperationAddAndDeleteIsOverwrite) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(file_a_); + op->AddFile(file_b_); + EXPECT_EQ(op->operation(), DataOperation::kOverwrite); +} + +TEST_F(OverwriteFilesTest, OperationPureRowFilterAndAddIsOverwrite) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(file_b_); + EXPECT_EQ(op->operation(), DataOperation::kOverwrite); +} + +TEST_F(OverwriteFilesTest, OperationNeitherIsOverwrite) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + EXPECT_EQ(op->operation(), DataOperation::kOverwrite); +} + +// ===================================================================================== +// 9.4 Commit-path and snapshot-control tests (Req 11.2-11.5) +// ===================================================================================== + +// DeleteFile + AddFile commits as overwrite and the recorded operation matches +// operation() (Req 11.4). +TEST_F(OverwriteFilesTest, CommitDeleteAndAddIsOverwrite) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(file_a_); + op->AddFile(file_b_); + const std::string expected_operation = op->operation(); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), expected_operation); + EXPECT_EQ(expected_operation, DataOperation::kOverwrite); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); +} + +// OverwriteByRowFilter + AddFile commits as overwrite. +TEST_F(OverwriteFilesTest, CommitRowFilterAndAddIsOverwrite) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/new_x1.parquet", 1L)); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kOverwrite); +} + +// An empty overwrite (no adds, no deletes) commits and records the overwrite operation. +TEST_F(OverwriteFilesTest, CommitEmptyOverwrite) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + EXPECT_EQ(op->operation(), DataOperation::kOverwrite); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kOverwrite); +} + +// Duplicate AddFile / DeleteFile are deduplicated by the underlying set types (Req 3.7). +TEST_F(OverwriteFilesTest, CommitDeduplicatesDuplicateAddAndDelete) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + auto add = MakeDataFile("/data/dup_add.parquet", 1L); + op->DeleteFile(file_a_); + op->DeleteFile(file_a_); // duplicate delete + op->AddFile(add); + op->AddFile(add); // duplicate add + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); +} + +// StageOnly commits the snapshot without advancing the target branch (Req 11.2). +TEST_F(OverwriteFilesTest, CommitStageOnlyDoesNotAdvanceCurrentSnapshot) { + const int64_t base_snapshot_id = CommitFileA(); + const size_t base_snapshot_count = table_->metadata()->snapshots.size(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->StageOnly(); + op->AddFile(file_b_); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + // The staged snapshot is recorded but the main branch still points at file_a's + // snapshot. + ICEBERG_UNWRAP_OR_FAIL(auto current, table_->current_snapshot()); + EXPECT_EQ(current->snapshot_id, base_snapshot_id); + EXPECT_GT(table_->metadata()->snapshots.size(), base_snapshot_count); +} + +// SetTargetBranch commits to a named branch (Req 11.2). +TEST_F(OverwriteFilesTest, CommitToTargetBranch) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->SetTargetBranch("audit"); + op->AddFile(file_b_); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + EXPECT_TRUE(table_->metadata()->refs.contains("audit")); +} + +// A custom Set(property, value) is carried into the committed snapshot summary +// (Req 11.2). +TEST_F(OverwriteFilesTest, CommitCustomSummaryProperty) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->Set("custom-prop", "custom-value"); + op->AddFile(file_b_); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at("custom-prop"), "custom-value"); +} + +// ===================================================================================== +// 9.5 Bulk DeleteFiles tests (Req 3.3-3.7) +// ===================================================================================== + +// A DataFileSet + DeleteFileSet forwards data files to DeleteDataFile and delete files +// to DeleteDeleteFile; the committed snapshot reflects the data-file removal. (The +// delete file is forwarded to DeleteDeleteFile; with no matching committed delete file +// present its removal is a harmless no-op, mirroring the inherited missing-delete +// behavior.) +TEST_F(OverwriteFilesTest, BulkDeleteFilesRemovesDataAndDeleteFiles) { + // Seed the table with a data file. + { + ICEBERG_UNWRAP_OR_FAIL(auto seed, NewOverwrite()); + seed->AddFile(file_a_); + EXPECT_THAT(seed->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + auto del_file = MakeDeleteFile("/delete/del_a.parquet", 1L); + + // Build the bulk delete: remove file_a (data) plus del_file (delete file). + DataFileSet data_files; + data_files.insert(file_a_); + DeleteFileSet delete_files; + delete_files.insert(del_file); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFiles(data_files, delete_files); + op->AddFile(file_b_); + // Both a data file deletion and an add => overwrite. + EXPECT_EQ(op->operation(), DataOperation::kOverwrite); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1"); +} + +// Empty sets are a no-op: with no adds or deletes the builder is a bare overwrite. +TEST_F(OverwriteFilesTest, BulkDeleteFilesEmptySetsAreNoOp) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFiles(DataFileSet{}, DeleteFileSet{}); + EXPECT_EQ(op->operation(), DataOperation::kOverwrite); // neither adds nor deletes +} + +// A DataFileSet portion of DeleteFiles records data files such that DeletesDataFiles +// becomes true (observed via operation() == delete), equivalent to repeated DeleteFile. +TEST_F(OverwriteFilesTest, BulkDeleteFilesDataPortionMarksDelete) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + DataFileSet data_files; + data_files.insert(file_a_); + data_files.insert(file_b_); + op->DeleteFiles(data_files, DeleteFileSet{}); + EXPECT_EQ(op->operation(), DataOperation::kDelete); +} + +// DeleteFiles is equivalent to repeated DeleteFile for the data-file portion: both +// classify as delete and (after committing against a seeded table) remove the files. +TEST_F(OverwriteFilesTest, BulkDeleteFilesEquivalentToRepeatedDeleteFile) { + // Seed file_a and file_b. + { + ICEBERG_UNWRAP_OR_FAIL(auto seed, NewOverwrite()); + seed->AddFile(file_a_); + seed->AddFile(file_b_); + EXPECT_THAT(seed->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + DataFileSet data_files; + data_files.insert(file_a_); + data_files.insert(file_b_); + op->DeleteFiles(data_files, DeleteFileSet{}); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "2"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kTotalDataFiles), "0"); +} + +// ===================================================================================== +// 9.6 Concurrency-validation tests (Req 8.2, 8.3, 9.2-9.5; Properties 6, 7) +// ===================================================================================== + +// ValidateNoConflictingData: a competing FastAppend that added a data file matching the +// resolved conflict-detection filter makes the overwrite commit fail (Req 8.3). +TEST_F(OverwriteFilesTest, ValidateNoConflictingDataDetectsConflictingAdd) { + const int64_t first_id = CommitFileA(); + // Competing append of file_b (partition x=2) between read and commit. + CommitFastAppend(file_b_); + + // The overwrite targets the x=2 range, so the concurrent add of file_b conflicts. + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(2L))); + op->AddFile(MakeDataFile("/data/replacement_x2.parquet", 2L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingData(); + EXPECT_THAT(op->Commit(), IsError(ErrorKind::kValidationFailed)); +} + +// A non-conflicting concurrent change still commits, and the recorded operation matches +// operation() (Req 11.4, 11.5; Property 6). +TEST_F(OverwriteFilesTest, ValidateNoConflictingDataAllowsNonConflictingChange) { + const int64_t first_id = CommitFileA(); + // Competing append of file_b in partition x=2. + CommitFastAppend(file_b_); + + // The overwrite targets the x=1 range; the concurrent x=2 add does not conflict. + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/replacement_x1.parquet", 1L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingData(); + op->ValidateNoConflictingDeletes(); + const std::string expected_operation = op->operation(); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), expected_operation); +} + +// ValidateNoConflictingDeletes: a competing snapshot that deleted a data file in the +// overwrite range makes the commit fail (Req 9.2-9.4). +TEST_F(OverwriteFilesTest, ValidateNoConflictingDeletesDetectsConflictingDelete) { + const int64_t first_id = CommitFileA(); + + // Competing overwrite removes file_a (partition x=1) between read and commit. + { + ICEBERG_UNWRAP_OR_FAIL(auto competing, NewOverwrite()); + competing->DeleteFile(file_a_); + EXPECT_THAT(competing->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/replacement_after_delete.parquet", 1L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingDeletes(); + EXPECT_THAT(op->Commit(), IsError(ErrorKind::kValidationFailed)); +} + +// ValidateNoConflictingDeletes allows a non-conflicting concurrent append to commit. +TEST_F(OverwriteFilesTest, ValidateNoConflictingDeletesAllowsNonConflictingChange) { + const int64_t first_id = CommitFileA(); + // Competing append in partition x=2 (no deletes in the x=1 range). + CommitFastAppend(file_b_); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/replacement_no_conflict.parquet", 1L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingDeletes(); + EXPECT_THAT(op->Commit(), IsOk()); +} + +// ===================================================================================== +// Fix #1: the explicit replaced-files delete branch (Path B) honors the +// ConflictDetectionFilter. +// +// Path B fires when explicit data files were registered for replacement +// (deleted_data_files_ non-empty) under ValidateNoConflictingDeletes(). It now calls the +// STATIC data_filter overload of ValidateNoNewDeletesForDataFiles, passing the raw +// conflict_detection_filter_ (which may be nullptr = "no filter, consider all delete +// files"). These tests inject a concurrent equality-delete file that +// covers the replaced data file (file_a, partition x=1) and assert that: +// * with NO conflict filter, the concurrent delete is a conflict => FAIL; +// * with a conflict filter that does NOT cover x=1 (here x=2), the delete is filtered +// out of the conflict scope => SUCCEED. +// +// To exercise Path B in isolation, the builder uses explicit DeleteFile (no +// OverwriteByRowFilter), so RowFilter() stays AlwaysFalse and Path A is skipped. +// ===================================================================================== + +// No conflict filter => the concurrent delete on the replaced file is detected (Path B +// passes nullptr through, so all delete files are considered). +TEST_F(OverwriteFilesTest, PathBExplicitDeletesDetectsConcurrentDeleteWithoutFilter) { + CommitFileA(); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + + // Concurrent commit adds an equality delete covering file_a (partition x=1). + auto concurrent = InjectConcurrentEqualityDelete( + first_snapshot, "/delete/concurrent_x1.parquet", /*partition_x=*/1L); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(file_a_); // explicit replacement => deleted_data_files_ non-empty + op->AddFile(MakeDataFile("/data/rewrite_x1.parquet", 1L)); + op->ValidateFromSnapshot(first_snapshot->snapshot_id); + op->ValidateNoConflictingDeletes(); + // No ConflictDetectionFilter => Path B considers all concurrent deletes => conflict. + EXPECT_THAT(op->Validate(*concurrent.metadata, concurrent.snapshot), + IsError(ErrorKind::kValidationFailed)); +} + +// A conflict filter that does not cover the replaced file's partition narrows the scope, +// so the concurrent delete is filtered out and validation succeeds. +TEST_F(OverwriteFilesTest, PathBExplicitDeletesConflictFilterNarrowsScope) { + CommitFileA(); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + + // Same concurrent equality delete covering file_a (partition x=1). + auto concurrent = InjectConcurrentEqualityDelete( + first_snapshot, "/delete/concurrent_x1.parquet", /*partition_x=*/1L); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(file_a_); + op->AddFile(MakeDataFile("/data/rewrite_x1.parquet", 1L)); + op->ValidateFromSnapshot(first_snapshot->snapshot_id); + // Conflict filter targets x=2, which does NOT cover the x=1 delete => filtered out. + op->ConflictDetectionFilter(Expressions::Equal("x", Literal::Long(2L))); + op->ValidateNoConflictingDeletes(); + EXPECT_THAT(op->Validate(*concurrent.metadata, concurrent.snapshot), IsOk()); +} + +// ===================================================================================== +// 9.7 Strict added-file range validation tests (Req 10.1-10.6; Properties 9, 10) +// +// These exercise OverwriteFiles::Validate(...) directly (the same entry point the commit +// kernel invokes), which is sufficient and deterministic: the strict-range branch does +// not depend on concurrent snapshots. +// ===================================================================================== + +// Strict partition projection proves containment directly (Req 10.3). +TEST_F(OverwriteFilesTest, StrictRangeAcceptedByStrictProjection) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/in_partition.parquet", 1L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), IsOk()); +} + +// Strict partition projection is insufficient (filter on a non-partition column) but the +// StrictMetricsEvaluator proves containment from the file's bounds (Req 10.3). +TEST_F(OverwriteFilesTest, StrictRangeAcceptedByStrictMetrics) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("y", Literal::Long(5L))); + // y bounds [5, 5] => every row has y == 5, fully contained in the filter. + op->AddFile(MakeDataFileWithYBounds("/data/y_eq_5.parquet", 1L, 5L, 5L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), IsOk()); +} + +// Neither the strict projection nor the metrics can prove containment => fail (Req 10.5). +TEST_F(OverwriteFilesTest, StrictRangeRejectedWhenNotProvable) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("y", Literal::Long(5L))); + // y bounds [1, 10] => not all rows are y == 5. + op->AddFile(MakeDataFileWithYBounds("/data/y_range.parquet", 1L, 1L, 10L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), + IsError(ErrorKind::kValidationFailed)); +} + +// A file whose partition falls outside the inclusive projection is rejected (Req 10.4). +TEST_F(OverwriteFilesTest, StrictRangeRejectsFileOutsidePartitionRange) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/wrong_partition.parquet", /*partition_x=*/2L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), + IsError(ErrorKind::kValidationFailed)); +} + +// ValidateAddedFilesMatchOverwriteFilter without a row filter fails (Req 10.1, 10.2; +// Property 10). +TEST_F(OverwriteFilesTest, StrictRangeRequiresRowFilter) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->AddFile(MakeDataFile("/data/no_filter.parquet", 1L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), + IsError(ErrorKind::kValidationFailed)); +} + +// Fix #2: added files belonging to MORE THAN ONE partition spec are rejected, since the +// validation resolves a single spec via DataSpec() (which requires exactly one spec among +// added files). DataSpec() fails fast with a multi-spec error. +TEST_F(OverwriteFilesTest, StrictRangeRejectsMultiplePartitionSpecs) { + // Add a second partition spec (id 1) to the table metadata BEFORE creating the builder, + // so the producer's base metadata can resolve both specs when files are staged. + ICEBERG_UNWRAP_OR_FAIL( + auto spec1, PartitionSpec::Make(*schema_, /*spec_id=*/1, + {PartitionField(/*source_id=*/1, /*field_id=*/1001, + "x_v1", Transform::Identity())}, + /*allow_missing_fields=*/false)); + table_->metadata()->partition_specs.push_back( + std::shared_ptr(std::move(spec1))); + // Confirm both specs resolve. + ASSERT_THAT(table_->metadata()->PartitionSpecById(0), IsOk()); + ASSERT_THAT(table_->metadata()->PartitionSpecById(1), IsOk()); + + auto file_spec0 = MakeDataFile("/data/spec0_x1.parquet", 1L); // partition_spec_id 0 + auto file_spec1 = MakeDataFile("/data/spec1_x1.parquet", 1L); + file_spec1->partition_spec_id = 1; + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(file_spec0); + op->AddFile(file_spec1); + op->ValidateAddedFilesMatchOverwriteFilter(); + // DataSpec() rejects the two distinct specs with an InvalidArgument error. + EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), + IsError(ErrorKind::kInvalidArgument)); +} + +// Fix #3: enabling the strict added-file range validation with a row filter set but NO +// added data files (e.g. a pure overwrite-by-filter with no AddFile) fails, because +// DataSpec() rejects an empty added-files set. +TEST_F(OverwriteFilesTest, StrictRangeRejectsEmptyAddedFiles) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + // Row filter is set (precondition satisfied) but no AddFile was called. + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->ValidateAddedFilesMatchOverwriteFilter(); + // DataSpec() raises InvalidArgument because no data file was added. + EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), + IsError(ErrorKind::kInvalidArgument)); +} + +// ===================================================================================== +// 9.8 Case-sensitivity and null-rejection tests (Req 6.4, 12.1-12.4; Property 4) +// ===================================================================================== + +// Case-insensitive binding accepts a differently-cased column name where the +// case-sensitive (default) binding rejects it. Observed through the strict-range +// validation, which binds the row filter using the configured case sensitivity. +TEST_F(OverwriteFilesTest, CaseSensitivityAffectsFilterBinding) { + // Case-sensitive (default): the filter references "X" which does not match column "x". + { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("X", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/cs.parquet", 1L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + // Binding "X" against schema {x, y, z} fails case-sensitively. + auto result = op->Validate(*table_->metadata(), /*snapshot=*/nullptr); + EXPECT_FALSE(result.has_value()); + } + // Case-insensitive: "X" binds to column "x" and the in-range file validates. + { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->WithCaseSensitivity(false); + op->OverwriteByRowFilter(Expressions::Equal("X", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/ci.parquet", 1L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), IsOk()); + } +} + +// Null arguments to the pointer-taking builder methods surface at Commit() without +// crashing (Req 12.1, 12.2; Property 4). The ErrorCollector aggregates deferred builder +// errors and surfaces them as a kValidationFailed error at Commit() that preserves the +// underlying invalid-argument message. +TEST_F(OverwriteFilesTest, NullAddFileRejectedAtCommit) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->AddFile(nullptr); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid data file: null")); +} + +TEST_F(OverwriteFilesTest, NullDeleteFileRejectedAtCommit) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(nullptr); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid data file: null")); +} + +TEST_F(OverwriteFilesTest, NullOverwriteByRowFilterRejectedAtCommit) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(nullptr); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid row filter expression: null")); +} + +TEST_F(OverwriteFilesTest, NullConflictDetectionFilterRejectedAtCommit) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->ConflictDetectionFilter(nullptr); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid conflict detection filter: null")); +} + +// The builder chain continues without crashing after a null argument is recorded. +TEST_F(OverwriteFilesTest, NullArgumentDoesNotCrashBuilderChain) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + (*op).AddFile(nullptr).AddFile(file_a_).OverwriteByRowFilter( + Expressions::Equal("x", Literal::Long(1L))); + // The recorded error still surfaces at commit. + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid data file: null")); +} + +// A null element cannot enter a DataFileSet / DeleteFileSet (insert() rejects nullptr), +// so DeleteFiles({null...}, {null...}) is a no-op rather than an error. The deferred +// null-element rejection inside DeleteFiles is therefore a defensive guard that is +// unreachable through the public set API; this test documents that observable behavior. +TEST_F(OverwriteFilesTest, DeleteFilesNullElementsCannotEnterSets) { + DataFileSet data_files; + data_files.insert(std::shared_ptr{nullptr}); + DeleteFileSet delete_files; + delete_files.insert(std::shared_ptr{nullptr}); + EXPECT_TRUE(data_files.empty()); + EXPECT_TRUE(delete_files.empty()); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFiles(data_files, delete_files); + // No data files were deleted => still a bare overwrite, and commit succeeds. + EXPECT_EQ(op->operation(), DataOperation::kOverwrite); + EXPECT_THAT(op->Commit(), IsOk()); +} + +// ValidateFromSnapshot accepts a non-negative id (0 is in the generated id range +// [0, INT64_MAX]) and rejects a negative id (including kInvalidSnapshotId == -1) as a +// deferred error surfaced at Commit(). Req 6.1. +TEST_F(OverwriteFilesTest, ValidateFromSnapshotRejectsNegativeSnapshotId) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->AddFile(file_a_).ValidateFromSnapshot(-1); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid snapshot id")); +} + +TEST_F(OverwriteFilesTest, ValidateFromSnapshotAcceptsZeroSnapshotId) { + // 0 is a legal generated snapshot id (the generator masks with int64_t::max()), so it + // must not be rejected at the builder stage. With no concurrency validation enabled, + // the starting id is merely recorded and the commit succeeds. + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->AddFile(file_a_).ValidateFromSnapshot(0); + EXPECT_THAT(op->Commit(), IsOk()); +} + +// ===================================================================================== +// Property-style tests (parameterized via loops; no PBT library is available). +// ===================================================================================== + +// Property 2 (builder forwarding) + Property 3 (delete dual-tracking), Task 2.4. +// +// AddedDataFiles() / deleted_data_files_ are not publicly observable, so the properties +// are checked indirectly through operation(): for any non-null file, AddFile-only yields +// `append` (the file was added and the row filter is untouched), while DeleteFile-only +// and DeleteFiles-only (data portion) yield `delete` (the file was registered for +// deletion and DeletesDataFiles() became true). Validates Req 2.1, 2.2, 3.1-3.5. +TEST_F(OverwriteFilesTest, PropertyBuilderForwardingAndDualTracking) { + const std::vector> files = { + MakeDataFile("/data/p0.parquet", 1L), + MakeDataFile("/data/p1.parquet", 2L), + MakeDataFile("/data/p2.parquet", 3L), + MakeDataFile("/data/p3.parquet", 7L), + }; + + for (const auto& file : files) { + { + // AddFile preserves "no deletes" => append, proving the row filter is unchanged. + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->AddFile(file); + EXPECT_EQ(op->operation(), DataOperation::kAppend) << file->file_path; + } + { + // DeleteFile dual-tracks => DeletesDataFiles() true, no adds => delete. + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(file); + EXPECT_EQ(op->operation(), DataOperation::kDelete) << file->file_path; + } + { + // Bulk DeleteFiles data portion behaves like repeated DeleteFile. + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + DataFileSet data_files; + data_files.insert(file); + op->DeleteFiles(data_files, DeleteFileSet{}); + EXPECT_EQ(op->operation(), DataOperation::kDelete) << file->file_path; + } + } +} + +// Property 4 (null rejection), Task 2.5: every pointer-taking builder mutator records a +// deferred error that surfaces as an InvalidArgument-class error at Commit() without +// crashing. Validates Req 12.1, 12.2, 12.3, 12.4. +TEST_F(OverwriteFilesTest, PropertyNullArgumentRejection) { + using Mutator = std::function; + const std::vector mutators = { + [](OverwriteFiles& op) { op.AddFile(nullptr); }, + [](OverwriteFiles& op) { op.DeleteFile(nullptr); }, + [](OverwriteFiles& op) { op.OverwriteByRowFilter(nullptr); }, + [](OverwriteFiles& op) { op.ConflictDetectionFilter(nullptr); }, + }; + + for (const auto& mutate : mutators) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + mutate(*op); + // Deferred builder errors are aggregated and surfaced as kValidationFailed at commit. + EXPECT_THAT(op->Commit(), IsError(ErrorKind::kValidationFailed)); + } +} + +// Property 1 (operation() reflects content), Task 3.2: exhaustive truth table over +// {adds, deletes (explicit or via row filter)}. Validates Req 5.1-5.4. +TEST_F(OverwriteFilesTest, PropertyOperationTruthTable) { + struct Case { + bool add; + bool delete_file; + bool row_filter; + std::string expected; + }; + const std::vector cases = { + {/*add=*/true, /*delete_file=*/false, /*row_filter=*/false, DataOperation::kAppend}, + {false, true, false, DataOperation::kDelete}, + {false, false, true, DataOperation::kDelete}, // row filter counts as a delete + {true, true, false, DataOperation::kOverwrite}, + {true, false, true, DataOperation::kOverwrite}, + {false, true, true, DataOperation::kDelete}, // deletes only (no adds) => delete + {false, false, false, DataOperation::kOverwrite}, // neither + }; + + int index = 0; + for (const auto& c : cases) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + if (c.add) { + op->AddFile(MakeDataFile("/data/tt_add" + std::to_string(index) + ".parquet", 1L)); + } + if (c.delete_file) { + op->DeleteFile( + MakeDataFile("/data/tt_del" + std::to_string(index) + ".parquet", 1L)); + } + if (c.row_filter) { + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + } + EXPECT_EQ(op->operation(), c.expected) << "case index " << index; + ++index; + } +} + +// Property 5 (conflict filter resolution), Task 4.4. +// +// DataConflictDetectionFilter() is private, so its three resolution outcomes are observed +// indirectly through ValidateNoConflictingData against a competing concurrent add of +// file_b (partition x=2): +// * explicit filter set -> the explicit filter is used (overrides the row filter); +// * row filter only -> the row filter is used; +// * file-replacement present -> AlwaysTrue (any concurrent add conflicts). +// Validates Req 7.1, 7.2, 7.3. +TEST_F(OverwriteFilesTest, PropertyConflictFilterResolution) { + // Resolution case 2 (row filter only): row filter x=1 does NOT match the concurrent + // x=2 add -> no conflict. + { + const int64_t first_id = CommitFileA(); + CommitFastAppend(file_b_); + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/r2_ok.parquet", 1L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingData(); + EXPECT_THAT(op->Validate(*table_->metadata(), table_->current_snapshot().value()), + IsOk()); + } + // Resolution case 2 (row filter only): row filter x=2 DOES match -> conflict, proving + // the row filter (not AlwaysFalse / AlwaysTrue blindly) is what was used. + { + SetUp(); // fresh table + const int64_t first_id = CommitFileA(); + CommitFastAppend(file_b_); + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(2L))); + op->AddFile(MakeDataFile("/data/r2_conflict.parquet", 2L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingData(); + EXPECT_THAT(op->Validate(*table_->metadata(), table_->current_snapshot().value()), + IsError(ErrorKind::kValidationFailed)); + } + // Resolution case 1 (explicit filter overrides row filter): row filter x=1 would NOT + // conflict, but the explicit conflict filter x=2 does -> conflict. + { + SetUp(); + const int64_t first_id = CommitFileA(); + CommitFastAppend(file_b_); + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->ConflictDetectionFilter(Expressions::Equal("x", Literal::Long(2L))); + op->AddFile(MakeDataFile("/data/r1.parquet", 1L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingData(); + EXPECT_THAT(op->Validate(*table_->metadata(), table_->current_snapshot().value()), + IsError(ErrorKind::kValidationFailed)); + } + // Resolution case 3 (file replacement -> AlwaysTrue): with an explicit DeleteFile and + // no explicit conflict filter, ANY concurrent add conflicts (here file_b in x=2, even + // though the row filter is x=1). + { + SetUp(); + const int64_t first_id = CommitFileA(); + CommitFastAppend(file_b_); + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->DeleteFile(file_a_); // makes deleted_data_files_ non-empty => AlwaysTrue + op->AddFile(MakeDataFile("/data/r3.parquet", 1L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingData(); + EXPECT_THAT(op->Validate(*table_->metadata(), table_->current_snapshot().value()), + IsError(ErrorKind::kValidationFailed)); + } +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 049b0f49d..c28c7b554 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -34,6 +34,7 @@ #include "iceberg/table_update.h" #include "iceberg/update/expire_snapshots.h" #include "iceberg/update/fast_append.h" +#include "iceberg/update/overwrite_files.h" #include "iceberg/update/pending_update.h" #include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_manager.h" @@ -478,6 +479,13 @@ Result> Transaction::NewFastAppend() { return fast_append; } +Result> Transaction::NewOverwrite() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr overwrite, + OverwriteFiles::Make(ctx_->table->name().name, ctx_)); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(overwrite)); + return overwrite; +} + Result> Transaction::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_statistics, UpdateStatistics::Make(ctx_)); diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 60fe935f3..af6897567 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -106,6 +106,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewFastAppend(); + /// \brief Create a new OverwriteFiles to overwrite data files and commit the changes. + Result> NewOverwrite(); + /// \brief Create a new SnapshotManager to manage snapshots. Result> NewSnapshotManager(); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 745c63acb..ff89d71db 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -223,6 +223,7 @@ class TransactionContext; /// \brief Update family. class ExpireSnapshots; class FastAppend; +class OverwriteFiles; class PendingUpdate; class SetSnapshot; class SnapshotManager; diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 6405f603f..b8c06465f 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -20,6 +20,7 @@ install_headers( 'expire_snapshots.h', 'fast_append.h', 'merging_snapshot_update.h', + 'overwrite_files.h', 'pending_update.h', 'set_snapshot.h', 'snapshot_manager.h', diff --git a/src/iceberg/update/overwrite_files.cc b/src/iceberg/update/overwrite_files.cc new file mode 100644 index 000000000..165d43005 --- /dev/null +++ b/src/iceberg/update/overwrite_files.cc @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/overwrite_files.h" + +#include + +#include "iceberg/expression/binder.h" +#include "iceberg/expression/evaluator.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/expression/projections.h" +#include "iceberg/expression/strict_metrics_evaluator.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/type.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> OverwriteFiles::Make( + std::string table_name, std::shared_ptr ctx) { + ICEBERG_PRECHECK(!table_name.empty(), "Table name cannot be empty"); + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create OverwriteFiles without a context"); + return std::shared_ptr( + new OverwriteFiles(std::move(table_name), std::move(ctx))); +} + +OverwriteFiles::OverwriteFiles(std::string table_name, + std::shared_ptr ctx) + : MergingSnapshotUpdate(std::move(table_name), std::move(ctx)) {} + +OverwriteFiles::~OverwriteFiles() = default; + +OverwriteFiles& OverwriteFiles::AddFile(const std::shared_ptr& file) { + ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); + // Forward to the inherited staging path. Any error (e.g. missing partition spec + // ID) is captured by the ErrorCollector and surfaced at Commit() rather than + // dropped. RowFilter() and deleted_data_files_ are intentionally left unchanged. + ICEBERG_BUILDER_RETURN_IF_ERROR(AddDataFile(file)); + return *this; +} + +OverwriteFiles& OverwriteFiles::DeleteFile(const std::shared_ptr& file) { + ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); + // Dual-track: record the file for delete-conflict validation AND register it for + // removal via the inherited pipeline. + deleted_data_files_.insert(file); + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDataFile(file)); + return *this; +} + +OverwriteFiles& OverwriteFiles::DeleteFiles(const DataFileSet& data_files_to_delete, + const DeleteFileSet& delete_files_to_delete) { + // Bulk equivalent of repeated DeleteFile(...) plus explicit delete-file removal. Empty + // sets are no-ops; the set types handle deduplication of repeated entries. + for (const auto& file : data_files_to_delete) { + ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); + // Dual-track: record for delete-conflict validation AND register for removal. + deleted_data_files_.insert(file); + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDataFile(file)); + } + for (const auto& file : delete_files_to_delete) { + ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid delete file: null"); + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDeleteFile(file)); + } + return *this; +} + +OverwriteFiles& OverwriteFiles::OverwriteByRowFilter(std::shared_ptr expr) { + ICEBERG_BUILDER_CHECK(expr != nullptr, "Invalid row filter expression: null"); + // Forward to the inherited filter path: this sets RowFilter() and forwards the + // expression to both the data and delete filter managers. Any error is captured by the + // ErrorCollector and surfaced at Commit(). + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteByRowFilter(std::move(expr))); + return *this; +} + +OverwriteFiles& OverwriteFiles::ValidateFromSnapshot(int64_t snapshot_id) { + // Snapshot ids are non-negative: SnapshotUtil::GenerateSnapshotId() masks with + // int64_t::max() and so yields a value in [0, INT64_MAX], while kInvalidSnapshotId is + // -1. A negative id therefore can never identify a real snapshot and is rejected via + // the deferred ErrorCollector (surfaced at Commit()); 0 is accepted because it is in + // the generator's range. A negative id is always a misuse, so we keep an early, + // clearer guard against the obviously-invalid sentinel/negative values. + ICEBERG_BUILDER_CHECK(snapshot_id >= 0, "Invalid snapshot id: {}", snapshot_id); + starting_snapshot_id_ = snapshot_id; + return *this; +} + +OverwriteFiles& OverwriteFiles::ConflictDetectionFilter( + std::shared_ptr expr) { + ICEBERG_BUILDER_CHECK(expr != nullptr, "Invalid conflict detection filter: null"); + conflict_detection_filter_ = std::move(expr); + return *this; +} + +OverwriteFiles& OverwriteFiles::WithCaseSensitivity(bool case_sensitive) { + // Forward to the protected MergingSnapshotUpdate::CaseSensitive(bool) setter (which + // returns void); the public name differs to avoid the public/protected name clash and + // keep a fluent return type. + CaseSensitive(case_sensitive); + return *this; +} + +OverwriteFiles& OverwriteFiles::ValidateNoConflictingData() { + // Enable concurrent-data validation and require every explicitly-deleted old file to + // be hit during manifest filtering. + validate_no_conflicting_data_ = true; + FailMissingDeletePaths(); + return *this; +} + +OverwriteFiles& OverwriteFiles::ValidateNoConflictingDeletes() { + // Enable concurrent-delete validation and require every explicitly-deleted old file to + // be hit during manifest filtering. + validate_no_conflicting_deletes_ = true; + FailMissingDeletePaths(); + return *this; +} + +OverwriteFiles& OverwriteFiles::ValidateAddedFilesMatchOverwriteFilter() { + // Enable strict validation that every added data file is fully contained in the + // overwrite range. The precondition (a row filter must be set) is enforced in + // Validate(...). + validate_added_files_match_overwrite_filter_ = true; + return *this; +} + +// Classify the snapshot operation from the current builder content. DeletesDataFiles() +// is true for both an explicit DeleteFile(...) and a pure OverwriteByRowFilter(...) (the +// latter via ManifestFilterManager::ContainsDeletes()), so OverwriteByRowFilter + AddFile +// correctly classifies as overwrite without any RowFilter()-based fallback. +std::string OverwriteFiles::operation() { + if (DeletesDataFiles() && !AddsDataFiles()) { + return DataOperation::kDelete; + } + if (AddsDataFiles() && !DeletesDataFiles()) { + return DataOperation::kAppend; + } + return DataOperation::kOverwrite; +} + +// Select the conflict-detection filter. Precedence: (1) the explicitly-set +// conflict_detection_filter_; otherwise +// (2) the row filter when it is set and not AlwaysFalse and no explicit data files were +// registered for deletion (the pure overwriteByRowFilter case, where the row filter +// already describes the conflicting range); otherwise (3) the conservative AlwaysTrue, +// under which any newly-added data file counts as a conflict (file-replacement or mixed +// mode). RowFilter() defaults to the AlwaysFalse singleton when unset, so comparing +// against the Expressions::AlwaysFalse() singleton by pointer identity (consistent with +// how the filter managers track delete_expr_) correctly treats the unset case as "no +// row filter". +std::shared_ptr OverwriteFiles::DataConflictDetectionFilter() const { + if (conflict_detection_filter_ != nullptr) { + return conflict_detection_filter_; + } + if (RowFilter() != nullptr && RowFilter() != Expressions::AlwaysFalse() && + deleted_data_files_.empty()) { + return RowFilter(); + } + return Expressions::AlwaysTrue(); +} + +// Run the enabled overwrite-specific concurrency checks before commit. The branches are +// sequential and independent. The first failing check aborts the commit; on success we +// return OK and let the inherited Apply(...) build the snapshot. +Status OverwriteFiles::Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) { + // 0. Strict added-file range precondition: when + // ValidateAddedFilesMatchOverwriteFilter() + // is enabled, a row filter is required to define the overwrite range. RowFilter() + // defaults to the AlwaysFalse singleton when unset, so an unset or AlwaysFalse + // filter means there is no range to validate against and the commit must fail. + if (validate_added_files_match_overwrite_filter_) { + if (RowFilter() == nullptr || RowFilter() == Expressions::AlwaysFalse()) { + return ValidationFailed( + "Cannot validate added files match overwrite filter: row filter is not set"); + } + ICEBERG_RETURN_UNEXPECTED( + ValidateAddedFilesMatchOverwriteFilterImpl(current_metadata)); + } + + // 1. Concurrent newly-added data files: fail if any snapshot after the starting point + // added a data file matching the resolved conflict-detection filter. + if (validate_no_conflicting_data_) { + ICEBERG_RETURN_UNEXPECTED(ValidateAddedDataFiles( + current_metadata, starting_snapshot_id_, DataConflictDetectionFilter(), snapshot, + ctx_->table->io(), IsCaseSensitive())); + } + + // 2. Concurrent deletes: the two sub-checks are independent and both may fire. + if (validate_no_conflicting_deletes_) { + // Path A: a row filter is in play (set and not the AlwaysFalse singleton). Fail if a + // new delete file matches the range, or if a data file in the range was concurrently + // removed. Prefer the explicit conflict_detection_filter_ when set, else the row + // filter that describes the overwrite range. + if (RowFilter() != nullptr && RowFilter() != Expressions::AlwaysFalse()) { + auto delete_filter = conflict_detection_filter_ != nullptr + ? conflict_detection_filter_ + : RowFilter(); + ICEBERG_RETURN_UNEXPECTED( + ValidateNoNewDeleteFiles(current_metadata, starting_snapshot_id_, delete_filter, + snapshot, ctx_->table->io(), IsCaseSensitive())); + ICEBERG_RETURN_UNEXPECTED( + ValidateDeletedDataFiles(current_metadata, starting_snapshot_id_, delete_filter, + snapshot, ctx_->table->io(), IsCaseSensitive())); + } + + // Path B: explicit old data files were registered for replacement. Fail if a new + // delete file matching the conflict-detection filter covers any of them. Use the + // STATIC data_filter overload, passing the raw conflict_detection_filter_ (which may + // be nullptr — a nullptr filter means no filter, so all delete files are considered). + // The 3rd argument is an Expression, so this overload is unambiguous and needs no + // member-function-pointer cast. + if (!deleted_data_files_.empty()) { + ICEBERG_RETURN_UNEXPECTED(ValidateNoNewDeletesForDataFiles( + current_metadata, starting_snapshot_id_, conflict_detection_filter_, + deleted_data_files_, snapshot, ctx_->table->io(), IsCaseSensitive())); + } + } + + return {}; +} + +// Every added data file must be fully contained in the overwrite range defined by +// RowFilter(). The validation resolves a single partition spec via DataSpec() and then, +// for each added file, an inclusive partition projection provides a fast negative (reject +// when the file's partition cannot possibly fall in range), a strict partition projection +// provides a fast positive (accept when the whole partition is guaranteed in range), and +// a file-level StrictMetricsEvaluator provides the fallback proof. Metrics that are +// missing or insufficient yield a non-matching result and therefore fail validation +// (conservative: never silently accept an unprovable file). +// +// DataSpec() returns InvalidArgument when zero data files were added (empty-added-files +// is rejected) and when more than one partition spec is represented among the added files +// (multi-spec is rejected). DataSpec() reads the spec from the producer's base metadata; +// the projections use the table schema from metadata.Schema(). +// +// Implementation notes: +// * `Evaluator::Make` takes a `Schema`, and the partition type is obtained via +// `PartitionSpec::PartitionType(schema)` (which needs the table schema and returns a +// `StructType`). We therefore wrap the partition `StructType`'s fields in a `Schema` +// and bind the projected expression against it. +// * `StrictMetricsEvaluator::Make` binds its expression internally, and `Binder::Bind` +// rejects an already-bound predicate. So the bound filter is used only for the +// projections (`ProjectionEvaluator::Project` requires a bound expression), while the +// UNBOUND `RowFilter()` is handed to `StrictMetricsEvaluator::Make`. +Status OverwriteFiles::ValidateAddedFilesMatchOverwriteFilterImpl( + const TableMetadata& metadata) { + ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); + + // Single spec for all added files. This rejects both the empty-added-files case and + // the multi-spec case via InvalidArgument. + ICEBERG_ASSIGN_OR_RAISE(auto spec, DataSpec()); + + // Bind the row filter once for projection (Project requires a bound expression); build + // the strict metrics evaluator once over the UNBOUND row filter (it binds internally). + ICEBERG_ASSIGN_OR_RAISE(auto bound_filter, + Binder::Bind(*schema, RowFilter(), IsCaseSensitive())); + ICEBERG_ASSIGN_OR_RAISE( + auto strict_metrics_evaluator, + StrictMetricsEvaluator::Make(RowFilter(), schema, IsCaseSensitive())); + + // Build ONE inclusive and ONE strict partition-value evaluator from the single spec. + // The Evaluators bind on construction and do not retain the partition schema, so the + // local schema below may safely go out of scope. + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec->PartitionType(*schema)); + auto partition_fields = partition_type->fields(); + Schema partition_schema( + std::vector(partition_fields.begin(), partition_fields.end())); + + // Inclusive projection (fast negative): project the bound filter into the partition + // space and build an evaluator over the spec's partition values. + auto inclusive_projection = Projections::Inclusive(*spec, *schema, IsCaseSensitive()); + ICEBERG_ASSIGN_OR_RAISE(auto inclusive_expr, + inclusive_projection->Project(bound_filter)); + ICEBERG_ASSIGN_OR_RAISE( + auto inclusive_evaluator, + Evaluator::Make(partition_schema, inclusive_expr, IsCaseSensitive())); + + // Strict projection (fast positive). + auto strict_projection = Projections::Strict(*spec, *schema, IsCaseSensitive()); + ICEBERG_ASSIGN_OR_RAISE(auto strict_expr, strict_projection->Project(bound_filter)); + ICEBERG_ASSIGN_OR_RAISE( + auto strict_evaluator, + Evaluator::Make(partition_schema, strict_expr, IsCaseSensitive())); + + for (const auto& file : AddedDataFiles()) { + if (file == nullptr) { + return ValidationFailed( + "Cannot validate added files match overwrite filter: null data file"); + } + + // Step 1: inclusive projection — fast negative. If the file's partition cannot match + // the (inclusively projected) filter, the file is definitively outside the range. + ICEBERG_ASSIGN_OR_RAISE(bool inclusive_match, + inclusive_evaluator->Evaluate(file->partition)); + if (!inclusive_match) { + return ValidationFailed( + "Cannot commit file {}: added file does not match overwrite filter (outside " + "the overwrite range)", + file->file_path); + } + + // Step 2: strict projection — fast positive. If the whole partition is guaranteed in + // range, the file is accepted without inspecting metrics. + ICEBERG_ASSIGN_OR_RAISE(bool strict_match, + strict_evaluator->Evaluate(file->partition)); + if (strict_match) { + continue; + } + + // Step 3: file-level proof via strict metrics. A false result — including the case + // where metrics are missing or insufficient — fails validation conservatively rather + // than silently accepting a file that cannot be proved fully in range. + ICEBERG_ASSIGN_OR_RAISE(bool metrics_match, + strict_metrics_evaluator->Evaluate(*file)); + if (!metrics_match) { + return ValidationFailed( + "Cannot commit file {}: added file is not fully contained in the overwrite " + "filter", + file->file_path); + } + } + + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/update/overwrite_files.h b/src/iceberg/update/overwrite_files.h new file mode 100644 index 000000000..59f25914d --- /dev/null +++ b/src/iceberg/update/overwrite_files.h @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/update/overwrite_files.h + +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/merging_snapshot_update.h" +#include "iceberg/util/data_file_set.h" + +namespace iceberg { + +/// \brief Overwrite data files in a table. +/// +/// OverwriteFiles expresses a logical overwrite transaction: either explicit file +/// replacement (`DeleteFile(old)` + `AddFile(new)`) or range-based replacement +/// (`OverwriteByRowFilter(expr)` + `AddFile(new...)`). It is a concrete subclass of +/// MergingSnapshotUpdate that adds only the overwrite-specific builder surface and the +/// overwrite-specific `Validate(...)` logic, reusing the inherited filter → write → +/// merge pipeline, summary building, commit retry, and cleanup unchanged. +class ICEBERG_EXPORT OverwriteFiles : public MergingSnapshotUpdate { + public: + /// \brief Create a new OverwriteFiles instance. + /// + /// Mirrors FastAppend::Make: validates inputs and returns a heap instance. + /// + /// \param table_name The name of the table + /// \param ctx The transaction context to use for this update + /// \return A Result containing the OverwriteFiles instance or an error + static Result> Make( + std::string table_name, std::shared_ptr ctx); + + ~OverwriteFiles() override; + + // --- Overwrite builder surface (chained, ErrorCollector-deferred) --- + + /// \brief Add a new data file to the overwrite. + /// + /// \param file The data file to add + /// \return Reference to this for method chaining + OverwriteFiles& AddFile(const std::shared_ptr& file); + + /// \brief Delete an existing data file as part of the overwrite. + /// + /// \param file The data file to delete + /// \return Reference to this for method chaining + OverwriteFiles& DeleteFile(const std::shared_ptr& file); + + /// \brief Bulk equivalent of repeated DeleteFile(...) plus explicit delete-file + /// removal. + /// + /// \param data_files_to_delete The data files to delete + /// \param delete_files_to_delete The delete files to remove + /// \return Reference to this for method chaining + OverwriteFiles& DeleteFiles(const DataFileSet& data_files_to_delete, + const DeleteFileSet& delete_files_to_delete); + + /// \brief Overwrite all rows matching the given expression. + /// + /// \param expr The row filter expression defining the overwrite range + /// \return Reference to this for method chaining + OverwriteFiles& OverwriteByRowFilter(std::shared_ptr expr); + + // --- Concurrency / correctness controls --- + + /// \brief Set the lower bound snapshot id for concurrency scans. + /// + /// \param snapshot_id The starting snapshot id + /// \return Reference to this for method chaining + OverwriteFiles& ValidateFromSnapshot(int64_t snapshot_id); + + /// \brief Set an explicit conflict-detection filter. + /// + /// \param expr The conflict-detection filter expression + /// \return Reference to this for method chaining + OverwriteFiles& ConflictDetectionFilter(std::shared_ptr expr); + + /// \brief Enable validation that no concurrent commit added conflicting data files. + /// + /// \return Reference to this for method chaining + OverwriteFiles& ValidateNoConflictingData(); + + /// \brief Enable validation that no concurrent commit added conflicting deletes. + /// + /// \return Reference to this for method chaining + OverwriteFiles& ValidateNoConflictingDeletes(); + + /// \brief Enable strict validation that every added file is fully within the + /// overwrite range. + /// + /// \return Reference to this for method chaining + OverwriteFiles& ValidateAddedFilesMatchOverwriteFilter(); + + /// \brief Set case sensitivity for binding, projection, and metrics evaluation. + /// + /// Forwards to the protected MergingSnapshotUpdate::CaseSensitive(bool); the public + /// name differs to avoid the public/protected name clash and keep a fluent return. + /// + /// \param case_sensitive Whether matching should be case-sensitive + /// \return Reference to this for method chaining + OverwriteFiles& WithCaseSensitivity(bool case_sensitive); + + // --- SnapshotUpdate / MergingSnapshotUpdate overrides --- + + std::string operation() override; + + Status Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) override; + + private: + explicit OverwriteFiles(std::string table_name, + std::shared_ptr ctx); + + /// \brief Select the conflict-detection filter from the configured state. + std::shared_ptr DataConflictDetectionFilter() const; + + /// \brief Verify every added data file is fully contained in the row filter. + Status ValidateAddedFilesMatchOverwriteFilterImpl(const TableMetadata& metadata); + + std::optional starting_snapshot_id_; + std::shared_ptr conflict_detection_filter_; + DataFileSet deleted_data_files_; + bool validate_no_conflicting_data_ = false; + bool validate_no_conflicting_deletes_ = false; + bool validate_added_files_match_overwrite_filter_ = false; +}; + +} // namespace iceberg From 628ebe3b83c8cde04d343909f518ad7a44dd0d33 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Mon, 15 Jun 2026 11:29:25 +0800 Subject: [PATCH 2/4] fix(update): validate DeleteFiles content types and fix Meson build --- src/iceberg/test/meson.build | 4 --- src/iceberg/test/overwrite_files_test.cc | 42 ++++++++++++++++++++++++ src/iceberg/update/overwrite_files.cc | 12 +++++++ 3 files changed, 54 insertions(+), 4 deletions(-) diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index c20973eea..03d9e1f6c 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -120,10 +120,6 @@ iceberg_tests = { ), 'use_data': true, }, - 'overwrite_files_test': { - 'sources': files('overwrite_files_test.cc'), - 'use_data': true, - }, } if get_option('rest').enabled() diff --git a/src/iceberg/test/overwrite_files_test.cc b/src/iceberg/test/overwrite_files_test.cc index d003c6ee5..7c74cc4cd 100644 --- a/src/iceberg/test/overwrite_files_test.cc +++ b/src/iceberg/test/overwrite_files_test.cc @@ -526,6 +526,48 @@ TEST_F(OverwriteFilesTest, BulkDeleteFilesEquivalentToRepeatedDeleteFile) { EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kTotalDataFiles), "0"); } +// Content validation: because both sets hold std::shared_ptr, DeleteFiles +// guards content so a data file cannot be passed as a delete file (or vice versa). A +// delete file (position/equality) in the data-file set is rejected; the error surfaces +// at Commit(). +TEST_F(OverwriteFilesTest, BulkDeleteFilesRejectsDeleteFileInDataSet) { + auto del_file = + MakeDeleteFile("/delete/del_a.parquet", 1L); // content = positionDeletes + DataFileSet data_files; + data_files.insert(del_file); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFiles(data_files, DeleteFileSet{}); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("has delete-file content")); +} + +// A data file (content kData) in the delete-file set is rejected. +TEST_F(OverwriteFilesTest, BulkDeleteFilesRejectsDataFileInDeleteSet) { + DeleteFileSet delete_files; + delete_files.insert(file_a_); // content = kData + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFiles(DataFileSet{}, delete_files); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("has data-file content")); +} + +// An equality delete file is a valid delete file (content != kData) and is accepted in +// the delete-file set. +TEST_F(OverwriteFilesTest, BulkDeleteFilesAcceptsEqualityDeleteInDeleteSet) { + auto eq_delete = MakeEqualityDeleteFile("/delete/eq_a.parquet", 1L); + DeleteFileSet delete_files; + delete_files.insert(eq_delete); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFiles(DataFileSet{}, delete_files); + op->AddFile(file_b_); + EXPECT_THAT(op->Commit(), IsOk()); +} + // ===================================================================================== // 9.6 Concurrency-validation tests (Req 8.2, 8.3, 9.2-9.5; Properties 6, 7) // ===================================================================================== diff --git a/src/iceberg/update/overwrite_files.cc b/src/iceberg/update/overwrite_files.cc index 165d43005..8283c4529 100644 --- a/src/iceberg/update/overwrite_files.cc +++ b/src/iceberg/update/overwrite_files.cc @@ -76,14 +76,26 @@ OverwriteFiles& OverwriteFiles::DeleteFiles(const DataFileSet& data_files_to_del const DeleteFileSet& delete_files_to_delete) { // Bulk equivalent of repeated DeleteFile(...) plus explicit delete-file removal. Empty // sets are no-ops; the set types handle deduplication of repeated entries. + // + // Because both sets hold std::shared_ptr, content is validated here so a data + // file cannot be registered as a delete file (or vice versa): a data file must have + // content kData, and a delete file must be a position/equality delete (content != + // kData). This restores the type safety the C++ shared DataFile representation cannot + // enforce at compile time. for (const auto& file : data_files_to_delete) { ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); + ICEBERG_BUILDER_CHECK(file->content == DataFile::Content::kData, + "Invalid data file to delete: {} has delete-file content", + file->file_path); // Dual-track: record for delete-conflict validation AND register for removal. deleted_data_files_.insert(file); ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDataFile(file)); } for (const auto& file : delete_files_to_delete) { ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid delete file: null"); + ICEBERG_BUILDER_CHECK(file->content != DataFile::Content::kData, + "Invalid delete file to delete: {} has data-file content", + file->file_path); ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDeleteFile(file)); } return *this; From dbf276548b374b6109077d256c916da0c9590a2c Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sat, 20 Jun 2026 08:08:59 +0800 Subject: [PATCH 3/4] fix(update): address overwrite files lint and comment style --- src/iceberg/test/overwrite_files_test.cc | 224 +++++------------------ src/iceberg/update/overwrite_files.cc | 114 +----------- src/iceberg/update/overwrite_files.h | 21 +-- 3 files changed, 55 insertions(+), 304 deletions(-) diff --git a/src/iceberg/test/overwrite_files_test.cc b/src/iceberg/test/overwrite_files_test.cc index 7c74cc4cd..9b4c6c0c4 100644 --- a/src/iceberg/test/overwrite_files_test.cc +++ b/src/iceberg/test/overwrite_files_test.cc @@ -50,21 +50,11 @@ namespace iceberg { -// ===================================================================================== // Test harness for OverwriteFiles. // -// Modeled on src/iceberg/test/merging_snapshot_update_test.cc (same fixture style, -// same in-memory mock FileIO / catalog setup, same DataFile / commit helpers). Unlike -// that file, OverwriteFiles is the production class with a private constructor, so the -// tests drive it exclusively through its public builder surface (AddFile / DeleteFile / -// OverwriteByRowFilter / ... / operation() / Validate() / Commit()) and observe its -// behavior through the public API: operation() classification, the committed snapshot -// summary, and the public Validate(...) entry point that the commit kernel invokes. -// // The base table (TableMetadataV2ValidMinimal.json) has schema {x: long (id 1), // y: long (id 2), z: long (id 3)} and a single partition spec (spec id 0) that // partitions by identity(x). -// ===================================================================================== class OverwriteFilesTest : public UpdateTestBase { protected: static void SetUpTestSuite() { avro::RegisterAll(); } @@ -239,18 +229,11 @@ class OverwriteFilesTest : public UpdateTestBase { std::shared_ptr file_b_; }; -// ===================================================================================== -// 9.2 Entry-point and builder-method tests -// ===================================================================================== - -// Req 1.1: Table::NewOverwrite() returns a valid builder for a standalone operation. TEST_F(OverwriteFilesTest, TableNewOverwriteReturnsBuilder) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); ASSERT_NE(op, nullptr); } -// Req 1.2, 1.3: Transaction::NewOverwrite() returns a valid builder registered with the -// transaction (commit deferred until the transaction commits). TEST_F(OverwriteFilesTest, TransactionNewOverwriteReturnsBuilder) { ICEBERG_UNWRAP_OR_FAIL(auto txn, Transaction::Make(table_, TransactionKind::kUpdate)); ICEBERG_UNWRAP_OR_FAIL(auto op, txn->NewOverwrite()); @@ -258,8 +241,6 @@ TEST_F(OverwriteFilesTest, TransactionNewOverwriteReturnsBuilder) { (*op).OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))).AddFile(file_a_); - // Within a transaction, the builder is staged by its own Commit() before the - // transaction is committed. EXPECT_THAT(op->Commit(), IsOk()); ICEBERG_UNWRAP_OR_FAIL(auto committed, txn->Commit()); EXPECT_THAT(table_->Refresh(), IsOk()); @@ -268,7 +249,6 @@ TEST_F(OverwriteFilesTest, TransactionNewOverwriteReturnsBuilder) { DataOperation::kOverwrite); } -// Builder methods all return *this and chain (Req 2.1-2.3, 3.1, 4.1-4.2, 6.1, 6.3, 6.4). TEST_F(OverwriteFilesTest, BuilderMethodsReturnSelfAndChain) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); auto* self = op.get(); @@ -284,7 +264,6 @@ TEST_F(OverwriteFilesTest, BuilderMethodsReturnSelfAndChain) { EXPECT_EQ(&op->ValidateAddedFilesMatchOverwriteFilter(), self); EXPECT_EQ(&op->WithCaseSensitivity(false), self); - // A single fluent chain compiles and returns the same instance. OverwriteFiles& chained = (*op) .AddFile(MakeDataFile("/data/chain.parquet", 1L)) @@ -293,10 +272,6 @@ TEST_F(OverwriteFilesTest, BuilderMethodsReturnSelfAndChain) { EXPECT_EQ(&chained, self); } -// ===================================================================================== -// 9.3 operation() truth-table tests (Req 5.1-5.4) -// ===================================================================================== - TEST_F(OverwriteFilesTest, OperationAddOnlyIsAppend) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->AddFile(file_a_); @@ -328,12 +303,6 @@ TEST_F(OverwriteFilesTest, OperationNeitherIsOverwrite) { EXPECT_EQ(op->operation(), DataOperation::kOverwrite); } -// ===================================================================================== -// 9.4 Commit-path and snapshot-control tests (Req 11.2-11.5) -// ===================================================================================== - -// DeleteFile + AddFile commits as overwrite and the recorded operation matches -// operation() (Req 11.4). TEST_F(OverwriteFilesTest, CommitDeleteAndAddIsOverwrite) { CommitFileA(); @@ -351,7 +320,6 @@ TEST_F(OverwriteFilesTest, CommitDeleteAndAddIsOverwrite) { EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); } -// OverwriteByRowFilter + AddFile commits as overwrite. TEST_F(OverwriteFilesTest, CommitRowFilterAndAddIsOverwrite) { CommitFileA(); @@ -366,7 +334,6 @@ TEST_F(OverwriteFilesTest, CommitRowFilterAndAddIsOverwrite) { DataOperation::kOverwrite); } -// An empty overwrite (no adds, no deletes) commits and records the overwrite operation. TEST_F(OverwriteFilesTest, CommitEmptyOverwrite) { CommitFileA(); @@ -380,7 +347,6 @@ TEST_F(OverwriteFilesTest, CommitEmptyOverwrite) { DataOperation::kOverwrite); } -// Duplicate AddFile / DeleteFile are deduplicated by the underlying set types (Req 3.7). TEST_F(OverwriteFilesTest, CommitDeduplicatesDuplicateAddAndDelete) { CommitFileA(); @@ -398,7 +364,6 @@ TEST_F(OverwriteFilesTest, CommitDeduplicatesDuplicateAddAndDelete) { EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); } -// StageOnly commits the snapshot without advancing the target branch (Req 11.2). TEST_F(OverwriteFilesTest, CommitStageOnlyDoesNotAdvanceCurrentSnapshot) { const int64_t base_snapshot_id = CommitFileA(); const size_t base_snapshot_count = table_->metadata()->snapshots.size(); @@ -416,7 +381,6 @@ TEST_F(OverwriteFilesTest, CommitStageOnlyDoesNotAdvanceCurrentSnapshot) { EXPECT_GT(table_->metadata()->snapshots.size(), base_snapshot_count); } -// SetTargetBranch commits to a named branch (Req 11.2). TEST_F(OverwriteFilesTest, CommitToTargetBranch) { CommitFileA(); @@ -429,8 +393,6 @@ TEST_F(OverwriteFilesTest, CommitToTargetBranch) { EXPECT_TRUE(table_->metadata()->refs.contains("audit")); } -// A custom Set(property, value) is carried into the committed snapshot summary -// (Req 11.2). TEST_F(OverwriteFilesTest, CommitCustomSummaryProperty) { CommitFileA(); @@ -444,17 +406,12 @@ TEST_F(OverwriteFilesTest, CommitCustomSummaryProperty) { EXPECT_EQ(snapshot->summary.at("custom-prop"), "custom-value"); } -// ===================================================================================== -// 9.5 Bulk DeleteFiles tests (Req 3.3-3.7) -// ===================================================================================== - // A DataFileSet + DeleteFileSet forwards data files to DeleteDataFile and delete files // to DeleteDeleteFile; the committed snapshot reflects the data-file removal. (The // delete file is forwarded to DeleteDeleteFile; with no matching committed delete file // present its removal is a harmless no-op, mirroring the inherited missing-delete // behavior.) TEST_F(OverwriteFilesTest, BulkDeleteFilesRemovesDataAndDeleteFiles) { - // Seed the table with a data file. { ICEBERG_UNWRAP_OR_FAIL(auto seed, NewOverwrite()); seed->AddFile(file_a_); @@ -464,7 +421,6 @@ TEST_F(OverwriteFilesTest, BulkDeleteFilesRemovesDataAndDeleteFiles) { auto del_file = MakeDeleteFile("/delete/del_a.parquet", 1L); - // Build the bulk delete: remove file_a (data) plus del_file (delete file). DataFileSet data_files; data_files.insert(file_a_); DeleteFileSet delete_files; @@ -473,7 +429,6 @@ TEST_F(OverwriteFilesTest, BulkDeleteFilesRemovesDataAndDeleteFiles) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->DeleteFiles(data_files, delete_files); op->AddFile(file_b_); - // Both a data file deletion and an add => overwrite. EXPECT_EQ(op->operation(), DataOperation::kOverwrite); EXPECT_THAT(op->Commit(), IsOk()); @@ -483,15 +438,12 @@ TEST_F(OverwriteFilesTest, BulkDeleteFilesRemovesDataAndDeleteFiles) { EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1"); } -// Empty sets are a no-op: with no adds or deletes the builder is a bare overwrite. TEST_F(OverwriteFilesTest, BulkDeleteFilesEmptySetsAreNoOp) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->DeleteFiles(DataFileSet{}, DeleteFileSet{}); EXPECT_EQ(op->operation(), DataOperation::kOverwrite); // neither adds nor deletes } -// A DataFileSet portion of DeleteFiles records data files such that DeletesDataFiles -// becomes true (observed via operation() == delete), equivalent to repeated DeleteFile. TEST_F(OverwriteFilesTest, BulkDeleteFilesDataPortionMarksDelete) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); DataFileSet data_files; @@ -501,10 +453,7 @@ TEST_F(OverwriteFilesTest, BulkDeleteFilesDataPortionMarksDelete) { EXPECT_EQ(op->operation(), DataOperation::kDelete); } -// DeleteFiles is equivalent to repeated DeleteFile for the data-file portion: both -// classify as delete and (after committing against a seeded table) remove the files. TEST_F(OverwriteFilesTest, BulkDeleteFilesEquivalentToRepeatedDeleteFile) { - // Seed file_a and file_b. { ICEBERG_UNWRAP_OR_FAIL(auto seed, NewOverwrite()); seed->AddFile(file_a_); @@ -526,10 +475,7 @@ TEST_F(OverwriteFilesTest, BulkDeleteFilesEquivalentToRepeatedDeleteFile) { EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kTotalDataFiles), "0"); } -// Content validation: because both sets hold std::shared_ptr, DeleteFiles -// guards content so a data file cannot be passed as a delete file (or vice versa). A -// delete file (position/equality) in the data-file set is rejected; the error surfaces -// at Commit(). +// DeleteFiles validates content because both input sets store DataFile pointers. TEST_F(OverwriteFilesTest, BulkDeleteFilesRejectsDeleteFileInDataSet) { auto del_file = MakeDeleteFile("/delete/del_a.parquet", 1L); // content = positionDeletes @@ -543,7 +489,6 @@ TEST_F(OverwriteFilesTest, BulkDeleteFilesRejectsDeleteFileInDataSet) { EXPECT_THAT(result, HasErrorMessage("has delete-file content")); } -// A data file (content kData) in the delete-file set is rejected. TEST_F(OverwriteFilesTest, BulkDeleteFilesRejectsDataFileInDeleteSet) { DeleteFileSet delete_files; delete_files.insert(file_a_); // content = kData @@ -555,8 +500,6 @@ TEST_F(OverwriteFilesTest, BulkDeleteFilesRejectsDataFileInDeleteSet) { EXPECT_THAT(result, HasErrorMessage("has data-file content")); } -// An equality delete file is a valid delete file (content != kData) and is accepted in -// the delete-file set. TEST_F(OverwriteFilesTest, BulkDeleteFilesAcceptsEqualityDeleteInDeleteSet) { auto eq_delete = MakeEqualityDeleteFile("/delete/eq_a.parquet", 1L); DeleteFileSet delete_files; @@ -568,18 +511,12 @@ TEST_F(OverwriteFilesTest, BulkDeleteFilesAcceptsEqualityDeleteInDeleteSet) { EXPECT_THAT(op->Commit(), IsOk()); } -// ===================================================================================== -// 9.6 Concurrency-validation tests (Req 8.2, 8.3, 9.2-9.5; Properties 6, 7) -// ===================================================================================== - // ValidateNoConflictingData: a competing FastAppend that added a data file matching the -// resolved conflict-detection filter makes the overwrite commit fail (Req 8.3). +// resolved conflict-detection filter makes the overwrite commit fail. TEST_F(OverwriteFilesTest, ValidateNoConflictingDataDetectsConflictingAdd) { const int64_t first_id = CommitFileA(); - // Competing append of file_b (partition x=2) between read and commit. CommitFastAppend(file_b_); - // The overwrite targets the x=2 range, so the concurrent add of file_b conflicts. ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(2L))); op->AddFile(MakeDataFile("/data/replacement_x2.parquet", 2L)); @@ -588,14 +525,10 @@ TEST_F(OverwriteFilesTest, ValidateNoConflictingDataDetectsConflictingAdd) { EXPECT_THAT(op->Commit(), IsError(ErrorKind::kValidationFailed)); } -// A non-conflicting concurrent change still commits, and the recorded operation matches -// operation() (Req 11.4, 11.5; Property 6). TEST_F(OverwriteFilesTest, ValidateNoConflictingDataAllowsNonConflictingChange) { const int64_t first_id = CommitFileA(); - // Competing append of file_b in partition x=2. CommitFastAppend(file_b_); - // The overwrite targets the x=1 range; the concurrent x=2 add does not conflict. ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); op->AddFile(MakeDataFile("/data/replacement_x1.parquet", 1L)); @@ -611,11 +544,10 @@ TEST_F(OverwriteFilesTest, ValidateNoConflictingDataAllowsNonConflictingChange) } // ValidateNoConflictingDeletes: a competing snapshot that deleted a data file in the -// overwrite range makes the commit fail (Req 9.2-9.4). +// overwrite range makes the commit fail. TEST_F(OverwriteFilesTest, ValidateNoConflictingDeletesDetectsConflictingDelete) { const int64_t first_id = CommitFileA(); - // Competing overwrite removes file_a (partition x=1) between read and commit. { ICEBERG_UNWRAP_OR_FAIL(auto competing, NewOverwrite()); competing->DeleteFile(file_a_); @@ -645,40 +577,20 @@ TEST_F(OverwriteFilesTest, ValidateNoConflictingDeletesAllowsNonConflictingChang EXPECT_THAT(op->Commit(), IsOk()); } -// ===================================================================================== -// Fix #1: the explicit replaced-files delete branch (Path B) honors the -// ConflictDetectionFilter. -// -// Path B fires when explicit data files were registered for replacement -// (deleted_data_files_ non-empty) under ValidateNoConflictingDeletes(). It now calls the -// STATIC data_filter overload of ValidateNoNewDeletesForDataFiles, passing the raw -// conflict_detection_filter_ (which may be nullptr = "no filter, consider all delete -// files"). These tests inject a concurrent equality-delete file that -// covers the replaced data file (file_a, partition x=1) and assert that: -// * with NO conflict filter, the concurrent delete is a conflict => FAIL; -// * with a conflict filter that does NOT cover x=1 (here x=2), the delete is filtered -// out of the conflict scope => SUCCEED. -// -// To exercise Path B in isolation, the builder uses explicit DeleteFile (no -// OverwriteByRowFilter), so RowFilter() stays AlwaysFalse and Path A is skipped. -// ===================================================================================== - -// No conflict filter => the concurrent delete on the replaced file is detected (Path B -// passes nullptr through, so all delete files are considered). +// Explicit replaced-file validation applies ConflictDetectionFilter to concurrent delete +// files that cover replaced data files. TEST_F(OverwriteFilesTest, PathBExplicitDeletesDetectsConcurrentDeleteWithoutFilter) { CommitFileA(); ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); - // Concurrent commit adds an equality delete covering file_a (partition x=1). auto concurrent = InjectConcurrentEqualityDelete( first_snapshot, "/delete/concurrent_x1.parquet", /*partition_x=*/1L); ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); - op->DeleteFile(file_a_); // explicit replacement => deleted_data_files_ non-empty + op->DeleteFile(file_a_); op->AddFile(MakeDataFile("/data/rewrite_x1.parquet", 1L)); op->ValidateFromSnapshot(first_snapshot->snapshot_id); op->ValidateNoConflictingDeletes(); - // No ConflictDetectionFilter => Path B considers all concurrent deletes => conflict. EXPECT_THAT(op->Validate(*concurrent.metadata, concurrent.snapshot), IsError(ErrorKind::kValidationFailed)); } @@ -689,7 +601,6 @@ TEST_F(OverwriteFilesTest, PathBExplicitDeletesConflictFilterNarrowsScope) { CommitFileA(); ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); - // Same concurrent equality delete covering file_a (partition x=1). auto concurrent = InjectConcurrentEqualityDelete( first_snapshot, "/delete/concurrent_x1.parquet", /*partition_x=*/1L); @@ -697,21 +608,15 @@ TEST_F(OverwriteFilesTest, PathBExplicitDeletesConflictFilterNarrowsScope) { op->DeleteFile(file_a_); op->AddFile(MakeDataFile("/data/rewrite_x1.parquet", 1L)); op->ValidateFromSnapshot(first_snapshot->snapshot_id); - // Conflict filter targets x=2, which does NOT cover the x=1 delete => filtered out. op->ConflictDetectionFilter(Expressions::Equal("x", Literal::Long(2L))); op->ValidateNoConflictingDeletes(); EXPECT_THAT(op->Validate(*concurrent.metadata, concurrent.snapshot), IsOk()); } -// ===================================================================================== -// 9.7 Strict added-file range validation tests (Req 10.1-10.6; Properties 9, 10) -// // These exercise OverwriteFiles::Validate(...) directly (the same entry point the commit // kernel invokes), which is sufficient and deterministic: the strict-range branch does // not depend on concurrent snapshots. -// ===================================================================================== -// Strict partition projection proves containment directly (Req 10.3). TEST_F(OverwriteFilesTest, StrictRangeAcceptedByStrictProjection) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); @@ -721,28 +626,27 @@ TEST_F(OverwriteFilesTest, StrictRangeAcceptedByStrictProjection) { } // Strict partition projection is insufficient (filter on a non-partition column) but the -// StrictMetricsEvaluator proves containment from the file's bounds (Req 10.3). +// StrictMetricsEvaluator proves containment from the file's bounds. TEST_F(OverwriteFilesTest, StrictRangeAcceptedByStrictMetrics) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->OverwriteByRowFilter(Expressions::Equal("y", Literal::Long(5L))); - // y bounds [5, 5] => every row has y == 5, fully contained in the filter. + // y bounds [5, 5] prove every row has y == 5. op->AddFile(MakeDataFileWithYBounds("/data/y_eq_5.parquet", 1L, 5L, 5L)); op->ValidateAddedFilesMatchOverwriteFilter(); EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), IsOk()); } -// Neither the strict projection nor the metrics can prove containment => fail (Req 10.5). +// Neither the strict projection nor the metrics can prove containment. TEST_F(OverwriteFilesTest, StrictRangeRejectedWhenNotProvable) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->OverwriteByRowFilter(Expressions::Equal("y", Literal::Long(5L))); - // y bounds [1, 10] => not all rows are y == 5. + // y bounds [1, 10] do not prove every row has y == 5. op->AddFile(MakeDataFileWithYBounds("/data/y_range.parquet", 1L, 1L, 10L)); op->ValidateAddedFilesMatchOverwriteFilter(); EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), IsError(ErrorKind::kValidationFailed)); } -// A file whose partition falls outside the inclusive projection is rejected (Req 10.4). TEST_F(OverwriteFilesTest, StrictRangeRejectsFileOutsidePartitionRange) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); @@ -752,8 +656,6 @@ TEST_F(OverwriteFilesTest, StrictRangeRejectsFileOutsidePartitionRange) { IsError(ErrorKind::kValidationFailed)); } -// ValidateAddedFilesMatchOverwriteFilter without a row filter fails (Req 10.1, 10.2; -// Property 10). TEST_F(OverwriteFilesTest, StrictRangeRequiresRowFilter) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->AddFile(MakeDataFile("/data/no_filter.parquet", 1L)); @@ -762,11 +664,9 @@ TEST_F(OverwriteFilesTest, StrictRangeRequiresRowFilter) { IsError(ErrorKind::kValidationFailed)); } -// Fix #2: added files belonging to MORE THAN ONE partition spec are rejected, since the -// validation resolves a single spec via DataSpec() (which requires exactly one spec among -// added files). DataSpec() fails fast with a multi-spec error. +// ValidateAddedFilesMatchOverwriteFilter resolves one data spec for all added files. TEST_F(OverwriteFilesTest, StrictRangeRejectsMultiplePartitionSpecs) { - // Add a second partition spec (id 1) to the table metadata BEFORE creating the builder, + // Add a second partition spec before creating the builder, // so the producer's base metadata can resolve both specs when files are staged. ICEBERG_UNWRAP_OR_FAIL( auto spec1, PartitionSpec::Make(*schema_, /*spec_id=*/1, @@ -775,7 +675,6 @@ TEST_F(OverwriteFilesTest, StrictRangeRejectsMultiplePartitionSpecs) { /*allow_missing_fields=*/false)); table_->metadata()->partition_specs.push_back( std::shared_ptr(std::move(spec1))); - // Confirm both specs resolve. ASSERT_THAT(table_->metadata()->PartitionSpecById(0), IsOk()); ASSERT_THAT(table_->metadata()->PartitionSpecById(1), IsOk()); @@ -788,43 +687,30 @@ TEST_F(OverwriteFilesTest, StrictRangeRejectsMultiplePartitionSpecs) { op->AddFile(file_spec0); op->AddFile(file_spec1); op->ValidateAddedFilesMatchOverwriteFilter(); - // DataSpec() rejects the two distinct specs with an InvalidArgument error. EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), IsError(ErrorKind::kInvalidArgument)); } -// Fix #3: enabling the strict added-file range validation with a row filter set but NO -// added data files (e.g. a pure overwrite-by-filter with no AddFile) fails, because -// DataSpec() rejects an empty added-files set. TEST_F(OverwriteFilesTest, StrictRangeRejectsEmptyAddedFiles) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); - // Row filter is set (precondition satisfied) but no AddFile was called. op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); op->ValidateAddedFilesMatchOverwriteFilter(); - // DataSpec() raises InvalidArgument because no data file was added. EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), IsError(ErrorKind::kInvalidArgument)); } -// ===================================================================================== -// 9.8 Case-sensitivity and null-rejection tests (Req 6.4, 12.1-12.4; Property 4) -// ===================================================================================== - // Case-insensitive binding accepts a differently-cased column name where the // case-sensitive (default) binding rejects it. Observed through the strict-range // validation, which binds the row filter using the configured case sensitivity. TEST_F(OverwriteFilesTest, CaseSensitivityAffectsFilterBinding) { - // Case-sensitive (default): the filter references "X" which does not match column "x". { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->OverwriteByRowFilter(Expressions::Equal("X", Literal::Long(1L))); op->AddFile(MakeDataFile("/data/cs.parquet", 1L)); op->ValidateAddedFilesMatchOverwriteFilter(); - // Binding "X" against schema {x, y, z} fails case-sensitively. auto result = op->Validate(*table_->metadata(), /*snapshot=*/nullptr); EXPECT_FALSE(result.has_value()); } - // Case-insensitive: "X" binds to column "x" and the in-range file validates. { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->WithCaseSensitivity(false); @@ -836,9 +722,7 @@ TEST_F(OverwriteFilesTest, CaseSensitivityAffectsFilterBinding) { } // Null arguments to the pointer-taking builder methods surface at Commit() without -// crashing (Req 12.1, 12.2; Property 4). The ErrorCollector aggregates deferred builder -// errors and surfaces them as a kValidationFailed error at Commit() that preserves the -// underlying invalid-argument message. +// crashing. TEST_F(OverwriteFilesTest, NullAddFileRejectedAtCommit) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->AddFile(nullptr); @@ -871,21 +755,17 @@ TEST_F(OverwriteFilesTest, NullConflictDetectionFilterRejectedAtCommit) { EXPECT_THAT(result, HasErrorMessage("Invalid conflict detection filter: null")); } -// The builder chain continues without crashing after a null argument is recorded. TEST_F(OverwriteFilesTest, NullArgumentDoesNotCrashBuilderChain) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); (*op).AddFile(nullptr).AddFile(file_a_).OverwriteByRowFilter( Expressions::Equal("x", Literal::Long(1L))); - // The recorded error still surfaces at commit. auto result = op->Commit(); EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); EXPECT_THAT(result, HasErrorMessage("Invalid data file: null")); } // A null element cannot enter a DataFileSet / DeleteFileSet (insert() rejects nullptr), -// so DeleteFiles({null...}, {null...}) is a no-op rather than an error. The deferred -// null-element rejection inside DeleteFiles is therefore a defensive guard that is -// unreachable through the public set API; this test documents that observable behavior. +// so DeleteFiles({null...}, {null...}) is a no-op rather than an error. TEST_F(OverwriteFilesTest, DeleteFilesNullElementsCannotEnterSets) { DataFileSet data_files; data_files.insert(std::shared_ptr{nullptr}); @@ -896,14 +776,13 @@ TEST_F(OverwriteFilesTest, DeleteFilesNullElementsCannotEnterSets) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->DeleteFiles(data_files, delete_files); - // No data files were deleted => still a bare overwrite, and commit succeeds. EXPECT_EQ(op->operation(), DataOperation::kOverwrite); EXPECT_THAT(op->Commit(), IsOk()); } // ValidateFromSnapshot accepts a non-negative id (0 is in the generated id range // [0, INT64_MAX]) and rejects a negative id (including kInvalidSnapshotId == -1) as a -// deferred error surfaced at Commit(). Req 6.1. +// deferred error surfaced at Commit(). TEST_F(OverwriteFilesTest, ValidateFromSnapshotRejectsNegativeSnapshotId) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->AddFile(file_a_).ValidateFromSnapshot(-1); @@ -913,25 +792,16 @@ TEST_F(OverwriteFilesTest, ValidateFromSnapshotRejectsNegativeSnapshotId) { } TEST_F(OverwriteFilesTest, ValidateFromSnapshotAcceptsZeroSnapshotId) { - // 0 is a legal generated snapshot id (the generator masks with int64_t::max()), so it - // must not be rejected at the builder stage. With no concurrency validation enabled, - // the starting id is merely recorded and the commit succeeds. ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->AddFile(file_a_).ValidateFromSnapshot(0); EXPECT_THAT(op->Commit(), IsOk()); } -// ===================================================================================== -// Property-style tests (parameterized via loops; no PBT library is available). -// ===================================================================================== - -// Property 2 (builder forwarding) + Property 3 (delete dual-tracking), Task 2.4. -// // AddedDataFiles() / deleted_data_files_ are not publicly observable, so the properties // are checked indirectly through operation(): for any non-null file, AddFile-only yields // `append` (the file was added and the row filter is untouched), while DeleteFile-only // and DeleteFiles-only (data portion) yield `delete` (the file was registered for -// deletion and DeletesDataFiles() became true). Validates Req 2.1, 2.2, 3.1-3.5. +// deletion and DeletesDataFiles() became true). TEST_F(OverwriteFilesTest, PropertyBuilderForwardingAndDualTracking) { const std::vector> files = { MakeDataFile("/data/p0.parquet", 1L), @@ -942,19 +812,16 @@ TEST_F(OverwriteFilesTest, PropertyBuilderForwardingAndDualTracking) { for (const auto& file : files) { { - // AddFile preserves "no deletes" => append, proving the row filter is unchanged. ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->AddFile(file); EXPECT_EQ(op->operation(), DataOperation::kAppend) << file->file_path; } { - // DeleteFile dual-tracks => DeletesDataFiles() true, no adds => delete. ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->DeleteFile(file); EXPECT_EQ(op->operation(), DataOperation::kDelete) << file->file_path; } { - // Bulk DeleteFiles data portion behaves like repeated DeleteFile. ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); DataFileSet data_files; data_files.insert(file); @@ -964,9 +831,6 @@ TEST_F(OverwriteFilesTest, PropertyBuilderForwardingAndDualTracking) { } } -// Property 4 (null rejection), Task 2.5: every pointer-taking builder mutator records a -// deferred error that surfaces as an InvalidArgument-class error at Commit() without -// crashing. Validates Req 12.1, 12.2, 12.3, 12.4. TEST_F(OverwriteFilesTest, PropertyNullArgumentRejection) { using Mutator = std::function; const std::vector mutators = { @@ -979,13 +843,10 @@ TEST_F(OverwriteFilesTest, PropertyNullArgumentRejection) { for (const auto& mutate : mutators) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); mutate(*op); - // Deferred builder errors are aggregated and surfaced as kValidationFailed at commit. EXPECT_THAT(op->Commit(), IsError(ErrorKind::kValidationFailed)); } } -// Property 1 (operation() reflects content), Task 3.2: exhaustive truth table over -// {adds, deletes (explicit or via row filter)}. Validates Req 5.1-5.4. TEST_F(OverwriteFilesTest, PropertyOperationTruthTable) { struct Case { bool add; @@ -994,13 +855,34 @@ TEST_F(OverwriteFilesTest, PropertyOperationTruthTable) { std::string expected; }; const std::vector cases = { - {/*add=*/true, /*delete_file=*/false, /*row_filter=*/false, DataOperation::kAppend}, - {false, true, false, DataOperation::kDelete}, - {false, false, true, DataOperation::kDelete}, // row filter counts as a delete - {true, true, false, DataOperation::kOverwrite}, - {true, false, true, DataOperation::kOverwrite}, - {false, true, true, DataOperation::kDelete}, // deletes only (no adds) => delete - {false, false, false, DataOperation::kOverwrite}, // neither + {.add = true, + .delete_file = false, + .row_filter = false, + .expected = DataOperation::kAppend}, + {.add = false, + .delete_file = true, + .row_filter = false, + .expected = DataOperation::kDelete}, + {.add = false, + .delete_file = false, + .row_filter = true, + .expected = DataOperation::kDelete}, // row filter counts as a delete + {.add = true, + .delete_file = true, + .row_filter = false, + .expected = DataOperation::kOverwrite}, + {.add = true, + .delete_file = false, + .row_filter = true, + .expected = DataOperation::kOverwrite}, + {.add = false, + .delete_file = true, + .row_filter = true, + .expected = DataOperation::kDelete}, // deletes only + {.add = false, + .delete_file = false, + .row_filter = false, + .expected = DataOperation::kOverwrite}, // neither }; int index = 0; @@ -1021,18 +903,11 @@ TEST_F(OverwriteFilesTest, PropertyOperationTruthTable) { } } -// Property 5 (conflict filter resolution), Task 4.4. -// // DataConflictDetectionFilter() is private, so its three resolution outcomes are observed // indirectly through ValidateNoConflictingData against a competing concurrent add of // file_b (partition x=2): -// * explicit filter set -> the explicit filter is used (overrides the row filter); -// * row filter only -> the row filter is used; -// * file-replacement present -> AlwaysTrue (any concurrent add conflicts). -// Validates Req 7.1, 7.2, 7.3. +// explicit filter, row filter only, and explicit file replacement. TEST_F(OverwriteFilesTest, PropertyConflictFilterResolution) { - // Resolution case 2 (row filter only): row filter x=1 does NOT match the concurrent - // x=2 add -> no conflict. { const int64_t first_id = CommitFileA(); CommitFastAppend(file_b_); @@ -1044,8 +919,6 @@ TEST_F(OverwriteFilesTest, PropertyConflictFilterResolution) { EXPECT_THAT(op->Validate(*table_->metadata(), table_->current_snapshot().value()), IsOk()); } - // Resolution case 2 (row filter only): row filter x=2 DOES match -> conflict, proving - // the row filter (not AlwaysFalse / AlwaysTrue blindly) is what was used. { SetUp(); // fresh table const int64_t first_id = CommitFileA(); @@ -1058,8 +931,6 @@ TEST_F(OverwriteFilesTest, PropertyConflictFilterResolution) { EXPECT_THAT(op->Validate(*table_->metadata(), table_->current_snapshot().value()), IsError(ErrorKind::kValidationFailed)); } - // Resolution case 1 (explicit filter overrides row filter): row filter x=1 would NOT - // conflict, but the explicit conflict filter x=2 does -> conflict. { SetUp(); const int64_t first_id = CommitFileA(); @@ -1073,16 +944,13 @@ TEST_F(OverwriteFilesTest, PropertyConflictFilterResolution) { EXPECT_THAT(op->Validate(*table_->metadata(), table_->current_snapshot().value()), IsError(ErrorKind::kValidationFailed)); } - // Resolution case 3 (file replacement -> AlwaysTrue): with an explicit DeleteFile and - // no explicit conflict filter, ANY concurrent add conflicts (here file_b in x=2, even - // though the row filter is x=1). { SetUp(); const int64_t first_id = CommitFileA(); CommitFastAppend(file_b_); ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); - op->DeleteFile(file_a_); // makes deleted_data_files_ non-empty => AlwaysTrue + op->DeleteFile(file_a_); op->AddFile(MakeDataFile("/data/r3.parquet", 1L)); op->ValidateFromSnapshot(first_id); op->ValidateNoConflictingData(); diff --git a/src/iceberg/update/overwrite_files.cc b/src/iceberg/update/overwrite_files.cc index 8283c4529..4e5dbfeef 100644 --- a/src/iceberg/update/overwrite_files.cc +++ b/src/iceberg/update/overwrite_files.cc @@ -56,17 +56,12 @@ OverwriteFiles::~OverwriteFiles() = default; OverwriteFiles& OverwriteFiles::AddFile(const std::shared_ptr& file) { ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); - // Forward to the inherited staging path. Any error (e.g. missing partition spec - // ID) is captured by the ErrorCollector and surfaced at Commit() rather than - // dropped. RowFilter() and deleted_data_files_ are intentionally left unchanged. ICEBERG_BUILDER_RETURN_IF_ERROR(AddDataFile(file)); return *this; } OverwriteFiles& OverwriteFiles::DeleteFile(const std::shared_ptr& file) { ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); - // Dual-track: record the file for delete-conflict validation AND register it for - // removal via the inherited pipeline. deleted_data_files_.insert(file); ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDataFile(file)); return *this; @@ -74,20 +69,13 @@ OverwriteFiles& OverwriteFiles::DeleteFile(const std::shared_ptr& file OverwriteFiles& OverwriteFiles::DeleteFiles(const DataFileSet& data_files_to_delete, const DeleteFileSet& delete_files_to_delete) { - // Bulk equivalent of repeated DeleteFile(...) plus explicit delete-file removal. Empty - // sets are no-ops; the set types handle deduplication of repeated entries. - // - // Because both sets hold std::shared_ptr, content is validated here so a data - // file cannot be registered as a delete file (or vice versa): a data file must have - // content kData, and a delete file must be a position/equality delete (content != - // kData). This restores the type safety the C++ shared DataFile representation cannot - // enforce at compile time. + // Both sets use DataFile pointers, so validate content before forwarding to the + // data-file and delete-file removal paths. for (const auto& file : data_files_to_delete) { ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); ICEBERG_BUILDER_CHECK(file->content == DataFile::Content::kData, "Invalid data file to delete: {} has delete-file content", file->file_path); - // Dual-track: record for delete-conflict validation AND register for removal. deleted_data_files_.insert(file); ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDataFile(file)); } @@ -103,20 +91,11 @@ OverwriteFiles& OverwriteFiles::DeleteFiles(const DataFileSet& data_files_to_del OverwriteFiles& OverwriteFiles::OverwriteByRowFilter(std::shared_ptr expr) { ICEBERG_BUILDER_CHECK(expr != nullptr, "Invalid row filter expression: null"); - // Forward to the inherited filter path: this sets RowFilter() and forwards the - // expression to both the data and delete filter managers. Any error is captured by the - // ErrorCollector and surfaced at Commit(). ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteByRowFilter(std::move(expr))); return *this; } OverwriteFiles& OverwriteFiles::ValidateFromSnapshot(int64_t snapshot_id) { - // Snapshot ids are non-negative: SnapshotUtil::GenerateSnapshotId() masks with - // int64_t::max() and so yields a value in [0, INT64_MAX], while kInvalidSnapshotId is - // -1. A negative id therefore can never identify a real snapshot and is rejected via - // the deferred ErrorCollector (surfaced at Commit()); 0 is accepted because it is in - // the generator's range. A negative id is always a misuse, so we keep an early, - // clearer guard against the obviously-invalid sentinel/negative values. ICEBERG_BUILDER_CHECK(snapshot_id >= 0, "Invalid snapshot id: {}", snapshot_id); starting_snapshot_id_ = snapshot_id; return *this; @@ -130,41 +109,27 @@ OverwriteFiles& OverwriteFiles::ConflictDetectionFilter( } OverwriteFiles& OverwriteFiles::WithCaseSensitivity(bool case_sensitive) { - // Forward to the protected MergingSnapshotUpdate::CaseSensitive(bool) setter (which - // returns void); the public name differs to avoid the public/protected name clash and - // keep a fluent return type. CaseSensitive(case_sensitive); return *this; } OverwriteFiles& OverwriteFiles::ValidateNoConflictingData() { - // Enable concurrent-data validation and require every explicitly-deleted old file to - // be hit during manifest filtering. validate_no_conflicting_data_ = true; FailMissingDeletePaths(); return *this; } OverwriteFiles& OverwriteFiles::ValidateNoConflictingDeletes() { - // Enable concurrent-delete validation and require every explicitly-deleted old file to - // be hit during manifest filtering. validate_no_conflicting_deletes_ = true; FailMissingDeletePaths(); return *this; } OverwriteFiles& OverwriteFiles::ValidateAddedFilesMatchOverwriteFilter() { - // Enable strict validation that every added data file is fully contained in the - // overwrite range. The precondition (a row filter must be set) is enforced in - // Validate(...). validate_added_files_match_overwrite_filter_ = true; return *this; } -// Classify the snapshot operation from the current builder content. DeletesDataFiles() -// is true for both an explicit DeleteFile(...) and a pure OverwriteByRowFilter(...) (the -// latter via ManifestFilterManager::ContainsDeletes()), so OverwriteByRowFilter + AddFile -// correctly classifies as overwrite without any RowFilter()-based fallback. std::string OverwriteFiles::operation() { if (DeletesDataFiles() && !AddsDataFiles()) { return DataOperation::kDelete; @@ -175,16 +140,8 @@ std::string OverwriteFiles::operation() { return DataOperation::kOverwrite; } -// Select the conflict-detection filter. Precedence: (1) the explicitly-set -// conflict_detection_filter_; otherwise -// (2) the row filter when it is set and not AlwaysFalse and no explicit data files were -// registered for deletion (the pure overwriteByRowFilter case, where the row filter -// already describes the conflicting range); otherwise (3) the conservative AlwaysTrue, -// under which any newly-added data file counts as a conflict (file-replacement or mixed -// mode). RowFilter() defaults to the AlwaysFalse singleton when unset, so comparing -// against the Expressions::AlwaysFalse() singleton by pointer identity (consistent with -// how the filter managers track delete_expr_) correctly treats the unset case as "no -// row filter". +// Pure row-filter overwrites use the row filter as the data conflict filter. Explicit +// file replacement is more conservative unless the caller supplies a narrower filter. std::shared_ptr OverwriteFiles::DataConflictDetectionFilter() const { if (conflict_detection_filter_ != nullptr) { return conflict_detection_filter_; @@ -196,16 +153,8 @@ std::shared_ptr OverwriteFiles::DataConflictDetectionFilter() const return Expressions::AlwaysTrue(); } -// Run the enabled overwrite-specific concurrency checks before commit. The branches are -// sequential and independent. The first failing check aborts the commit; on success we -// return OK and let the inherited Apply(...) build the snapshot. Status OverwriteFiles::Validate(const TableMetadata& current_metadata, const std::shared_ptr& snapshot) { - // 0. Strict added-file range precondition: when - // ValidateAddedFilesMatchOverwriteFilter() - // is enabled, a row filter is required to define the overwrite range. RowFilter() - // defaults to the AlwaysFalse singleton when unset, so an unset or AlwaysFalse - // filter means there is no range to validate against and the commit must fail. if (validate_added_files_match_overwrite_filter_) { if (RowFilter() == nullptr || RowFilter() == Expressions::AlwaysFalse()) { return ValidationFailed( @@ -215,20 +164,13 @@ Status OverwriteFiles::Validate(const TableMetadata& current_metadata, ValidateAddedFilesMatchOverwriteFilterImpl(current_metadata)); } - // 1. Concurrent newly-added data files: fail if any snapshot after the starting point - // added a data file matching the resolved conflict-detection filter. if (validate_no_conflicting_data_) { ICEBERG_RETURN_UNEXPECTED(ValidateAddedDataFiles( current_metadata, starting_snapshot_id_, DataConflictDetectionFilter(), snapshot, ctx_->table->io(), IsCaseSensitive())); } - // 2. Concurrent deletes: the two sub-checks are independent and both may fire. if (validate_no_conflicting_deletes_) { - // Path A: a row filter is in play (set and not the AlwaysFalse singleton). Fail if a - // new delete file matches the range, or if a data file in the range was concurrently - // removed. Prefer the explicit conflict_detection_filter_ when set, else the row - // filter that describes the overwrite range. if (RowFilter() != nullptr && RowFilter() != Expressions::AlwaysFalse()) { auto delete_filter = conflict_detection_filter_ != nullptr ? conflict_detection_filter_ @@ -241,12 +183,6 @@ Status OverwriteFiles::Validate(const TableMetadata& current_metadata, snapshot, ctx_->table->io(), IsCaseSensitive())); } - // Path B: explicit old data files were registered for replacement. Fail if a new - // delete file matching the conflict-detection filter covers any of them. Use the - // STATIC data_filter overload, passing the raw conflict_detection_filter_ (which may - // be nullptr — a nullptr filter means no filter, so all delete files are considered). - // The 3rd argument is an Expression, so this overload is unambiguous and needs no - // member-function-pointer cast. if (!deleted_data_files_.empty()) { ICEBERG_RETURN_UNEXPECTED(ValidateNoNewDeletesForDataFiles( current_metadata, starting_snapshot_id_, conflict_detection_filter_, @@ -258,54 +194,26 @@ Status OverwriteFiles::Validate(const TableMetadata& current_metadata, } // Every added data file must be fully contained in the overwrite range defined by -// RowFilter(). The validation resolves a single partition spec via DataSpec() and then, -// for each added file, an inclusive partition projection provides a fast negative (reject -// when the file's partition cannot possibly fall in range), a strict partition projection -// provides a fast positive (accept when the whole partition is guaranteed in range), and -// a file-level StrictMetricsEvaluator provides the fallback proof. Metrics that are -// missing or insufficient yield a non-matching result and therefore fail validation -// (conservative: never silently accept an unprovable file). -// -// DataSpec() returns InvalidArgument when zero data files were added (empty-added-files -// is rejected) and when more than one partition spec is represented among the added files -// (multi-spec is rejected). DataSpec() reads the spec from the producer's base metadata; -// the projections use the table schema from metadata.Schema(). -// -// Implementation notes: -// * `Evaluator::Make` takes a `Schema`, and the partition type is obtained via -// `PartitionSpec::PartitionType(schema)` (which needs the table schema and returns a -// `StructType`). We therefore wrap the partition `StructType`'s fields in a `Schema` -// and bind the projected expression against it. -// * `StrictMetricsEvaluator::Make` binds its expression internally, and `Binder::Bind` -// rejects an already-bound predicate. So the bound filter is used only for the -// projections (`ProjectionEvaluator::Project` requires a bound expression), while the -// UNBOUND `RowFilter()` is handed to `StrictMetricsEvaluator::Make`. +// RowFilter(). Partition projection handles whole-partition proofs; strict metrics are +// used only when the partition alone is not enough. Status OverwriteFiles::ValidateAddedFilesMatchOverwriteFilterImpl( const TableMetadata& metadata) { ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); - // Single spec for all added files. This rejects both the empty-added-files case and - // the multi-spec case via InvalidArgument. ICEBERG_ASSIGN_OR_RAISE(auto spec, DataSpec()); - // Bind the row filter once for projection (Project requires a bound expression); build - // the strict metrics evaluator once over the UNBOUND row filter (it binds internally). + // Project requires a bound expression; StrictMetricsEvaluator binds internally. ICEBERG_ASSIGN_OR_RAISE(auto bound_filter, Binder::Bind(*schema, RowFilter(), IsCaseSensitive())); ICEBERG_ASSIGN_OR_RAISE( auto strict_metrics_evaluator, StrictMetricsEvaluator::Make(RowFilter(), schema, IsCaseSensitive())); - // Build ONE inclusive and ONE strict partition-value evaluator from the single spec. - // The Evaluators bind on construction and do not retain the partition schema, so the - // local schema below may safely go out of scope. ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec->PartitionType(*schema)); auto partition_fields = partition_type->fields(); Schema partition_schema( std::vector(partition_fields.begin(), partition_fields.end())); - // Inclusive projection (fast negative): project the bound filter into the partition - // space and build an evaluator over the spec's partition values. auto inclusive_projection = Projections::Inclusive(*spec, *schema, IsCaseSensitive()); ICEBERG_ASSIGN_OR_RAISE(auto inclusive_expr, inclusive_projection->Project(bound_filter)); @@ -313,7 +221,6 @@ Status OverwriteFiles::ValidateAddedFilesMatchOverwriteFilterImpl( auto inclusive_evaluator, Evaluator::Make(partition_schema, inclusive_expr, IsCaseSensitive())); - // Strict projection (fast positive). auto strict_projection = Projections::Strict(*spec, *schema, IsCaseSensitive()); ICEBERG_ASSIGN_OR_RAISE(auto strict_expr, strict_projection->Project(bound_filter)); ICEBERG_ASSIGN_OR_RAISE( @@ -326,8 +233,6 @@ Status OverwriteFiles::ValidateAddedFilesMatchOverwriteFilterImpl( "Cannot validate added files match overwrite filter: null data file"); } - // Step 1: inclusive projection — fast negative. If the file's partition cannot match - // the (inclusively projected) filter, the file is definitively outside the range. ICEBERG_ASSIGN_OR_RAISE(bool inclusive_match, inclusive_evaluator->Evaluate(file->partition)); if (!inclusive_match) { @@ -337,17 +242,12 @@ Status OverwriteFiles::ValidateAddedFilesMatchOverwriteFilterImpl( file->file_path); } - // Step 2: strict projection — fast positive. If the whole partition is guaranteed in - // range, the file is accepted without inspecting metrics. ICEBERG_ASSIGN_OR_RAISE(bool strict_match, strict_evaluator->Evaluate(file->partition)); if (strict_match) { continue; } - // Step 3: file-level proof via strict metrics. A false result — including the case - // where metrics are missing or insufficient — fails validation conservatively rather - // than silently accepting a file that cannot be proved fully in range. ICEBERG_ASSIGN_OR_RAISE(bool metrics_match, strict_metrics_evaluator->Evaluate(*file)); if (!metrics_match) { diff --git a/src/iceberg/update/overwrite_files.h b/src/iceberg/update/overwrite_files.h index 59f25914d..1c1979108 100644 --- a/src/iceberg/update/overwrite_files.h +++ b/src/iceberg/update/overwrite_files.h @@ -36,18 +36,11 @@ namespace iceberg { /// \brief Overwrite data files in a table. /// -/// OverwriteFiles expresses a logical overwrite transaction: either explicit file -/// replacement (`DeleteFile(old)` + `AddFile(new)`) or range-based replacement -/// (`OverwriteByRowFilter(expr)` + `AddFile(new...)`). It is a concrete subclass of -/// MergingSnapshotUpdate that adds only the overwrite-specific builder surface and the -/// overwrite-specific `Validate(...)` logic, reusing the inherited filter → write → -/// merge pipeline, summary building, commit retry, and cleanup unchanged. +/// Supports explicit file replacement and range-based replacement by row filter. class ICEBERG_EXPORT OverwriteFiles : public MergingSnapshotUpdate { public: /// \brief Create a new OverwriteFiles instance. /// - /// Mirrors FastAppend::Make: validates inputs and returns a heap instance. - /// /// \param table_name The name of the table /// \param ctx The transaction context to use for this update /// \return A Result containing the OverwriteFiles instance or an error @@ -56,8 +49,6 @@ class ICEBERG_EXPORT OverwriteFiles : public MergingSnapshotUpdate { ~OverwriteFiles() override; - // --- Overwrite builder surface (chained, ErrorCollector-deferred) --- - /// \brief Add a new data file to the overwrite. /// /// \param file The data file to add @@ -70,8 +61,7 @@ class ICEBERG_EXPORT OverwriteFiles : public MergingSnapshotUpdate { /// \return Reference to this for method chaining OverwriteFiles& DeleteFile(const std::shared_ptr& file); - /// \brief Bulk equivalent of repeated DeleteFile(...) plus explicit delete-file - /// removal. + /// \brief Remove data files and delete files as part of the overwrite. /// /// \param data_files_to_delete The data files to delete /// \param delete_files_to_delete The delete files to remove @@ -85,8 +75,6 @@ class ICEBERG_EXPORT OverwriteFiles : public MergingSnapshotUpdate { /// \return Reference to this for method chaining OverwriteFiles& OverwriteByRowFilter(std::shared_ptr expr); - // --- Concurrency / correctness controls --- - /// \brief Set the lower bound snapshot id for concurrency scans. /// /// \param snapshot_id The starting snapshot id @@ -117,15 +105,10 @@ class ICEBERG_EXPORT OverwriteFiles : public MergingSnapshotUpdate { /// \brief Set case sensitivity for binding, projection, and metrics evaluation. /// - /// Forwards to the protected MergingSnapshotUpdate::CaseSensitive(bool); the public - /// name differs to avoid the public/protected name clash and keep a fluent return. - /// /// \param case_sensitive Whether matching should be case-sensitive /// \return Reference to this for method chaining OverwriteFiles& WithCaseSensitivity(bool case_sensitive); - // --- SnapshotUpdate / MergingSnapshotUpdate overrides --- - std::string operation() override; Status Validate(const TableMetadata& current_metadata, From 14f6538cde7323395661ccebbd2ca537918151c6 Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Sat, 20 Jun 2026 08:37:15 +0800 Subject: [PATCH 4/4] fix(update): Fix OverwriteFiles data/delete content validation for direct AddFile/DeleteFile paths and trim overly verbose test comments. --- src/iceberg/test/overwrite_files_test.cc | 100 ++++++++--------------- src/iceberg/update/overwrite_files.cc | 6 ++ 2 files changed, 42 insertions(+), 64 deletions(-) diff --git a/src/iceberg/test/overwrite_files_test.cc b/src/iceberg/test/overwrite_files_test.cc index 9b4c6c0c4..bbdc06d13 100644 --- a/src/iceberg/test/overwrite_files_test.cc +++ b/src/iceberg/test/overwrite_files_test.cc @@ -50,11 +50,8 @@ namespace iceberg { -// Test harness for OverwriteFiles. -// // The base table (TableMetadataV2ValidMinimal.json) has schema {x: long (id 1), -// y: long (id 2), z: long (id 3)} and a single partition spec (spec id 0) that -// partitions by identity(x). +// y: long (id 2), z: long (id 3)} and partitions by identity(x). class OverwriteFilesTest : public UpdateTestBase { protected: static void SetUpTestSuite() { avro::RegisterAll(); } @@ -73,7 +70,6 @@ class OverwriteFilesTest : public UpdateTestBase { file_b_ = MakeDataFile("/data/file_b.parquet", /*partition_x=*/2L); } - // A plain data file in spec 0 (identity(x)) with the given partition value for x. std::shared_ptr MakeDataFile(const std::string& path, int64_t partition_x, int64_t record_count = 100) { auto f = std::make_shared(); @@ -87,8 +83,7 @@ class OverwriteFilesTest : public UpdateTestBase { return f; } - // A data file carrying column metrics for column y (field id 2): lower/upper bounds - // plus value/null counts, so the StrictMetricsEvaluator can reason about it. + // Add y metrics so StrictMetricsEvaluator can prove row-filter containment. std::shared_ptr MakeDataFileWithYBounds(const std::string& path, int64_t partition_x, int64_t y_lower, int64_t y_upper) { @@ -106,7 +101,6 @@ class OverwriteFilesTest : public UpdateTestBase { return f; } - // An equality delete file in spec 0 (partition x = partition_x). std::shared_ptr MakeEqualityDeleteFile(const std::string& path, int64_t partition_x) { auto f = MakeDeleteFile(path, partition_x); @@ -115,9 +109,7 @@ class OverwriteFilesTest : public UpdateTestBase { return f; } - // Write a delete manifest containing the given delete files, with the snapshot id and - // sequence number assigned on each entry (so the manifest list writer does not need to - // inherit them). + // Entries carry assigned snapshot and sequence numbers. Result WriteDeleteManifest( const std::string& path, const std::vector>& files, int64_t snapshot_id, int64_t sequence_number) { @@ -137,8 +129,6 @@ class OverwriteFilesTest : public UpdateTestBase { return writer->ToManifestFile(); } - // Build a synthetic snapshot from the given manifests (mirrors the merging-update test - // harness). Used to inject a concurrent commit between read and validate. Result> MakeSyntheticSnapshot( std::string operation, int64_t snapshot_id, std::optional parent_snapshot_id, int64_t sequence_number, @@ -161,10 +151,7 @@ class OverwriteFilesTest : public UpdateTestBase { return std::shared_ptr(std::move(snapshot)); } - // Inject a concurrent snapshot that adds an equality delete file covering partition - // `partition_x`, layered on top of `parent`. Returns the metadata containing the new - // snapshot (as current) plus the new snapshot itself, so a subsequent Validate(...) can - // scan the range (parent, new] for new deletes. + // Build metadata with a synthetic concurrent equality delete after `parent`. struct ConcurrentDelete { std::shared_ptr metadata; std::shared_ptr snapshot; @@ -199,7 +186,6 @@ class OverwriteFilesTest : public UpdateTestBase { return table_->NewOverwrite(); } - // Commit file_a_ with FastAppend and refresh the table; returns its snapshot id. int64_t CommitFileA() { auto fa = table_->NewFastAppend(); EXPECT_TRUE(fa.has_value()); @@ -211,7 +197,6 @@ class OverwriteFilesTest : public UpdateTestBase { return snap.value()->snapshot_id; } - // Append a single file via FastAppend and refresh; returns the new snapshot. std::shared_ptr CommitFastAppend(const std::shared_ptr& file) { auto fa = table_->NewFastAppend(); EXPECT_TRUE(fa.has_value()); @@ -406,11 +391,7 @@ TEST_F(OverwriteFilesTest, CommitCustomSummaryProperty) { EXPECT_EQ(snapshot->summary.at("custom-prop"), "custom-value"); } -// A DataFileSet + DeleteFileSet forwards data files to DeleteDataFile and delete files -// to DeleteDeleteFile; the committed snapshot reflects the data-file removal. (The -// delete file is forwarded to DeleteDeleteFile; with no matching committed delete file -// present its removal is a harmless no-op, mirroring the inherited missing-delete -// behavior.) +// With no matching committed delete file, deleting `del_file` is a harmless no-op. TEST_F(OverwriteFilesTest, BulkDeleteFilesRemovesDataAndDeleteFiles) { { ICEBERG_UNWRAP_OR_FAIL(auto seed, NewOverwrite()); @@ -475,7 +456,30 @@ TEST_F(OverwriteFilesTest, BulkDeleteFilesEquivalentToRepeatedDeleteFile) { EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kTotalDataFiles), "0"); } -// DeleteFiles validates content because both input sets store DataFile pointers. +// OverwriteFiles validates content because the C++ API stores data and delete files in +// DataFile pointers, while Java uses separate DataFile/DeleteFile types. +TEST_F(OverwriteFilesTest, AddFileRejectsDeleteFileContent) { + auto del_file = MakeDeleteFile("/delete/del_as_data.parquet", 1L); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->AddFile(del_file); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid data file to add")); + EXPECT_THAT(result, HasErrorMessage("has delete-file content")); +} + +TEST_F(OverwriteFilesTest, DeleteFileRejectsDeleteFileContent) { + auto del_file = MakeDeleteFile("/delete/del_as_delete.parquet", 1L); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(del_file); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid data file to delete")); + EXPECT_THAT(result, HasErrorMessage("has delete-file content")); +} + TEST_F(OverwriteFilesTest, BulkDeleteFilesRejectsDeleteFileInDataSet) { auto del_file = MakeDeleteFile("/delete/del_a.parquet", 1L); // content = positionDeletes @@ -511,8 +515,6 @@ TEST_F(OverwriteFilesTest, BulkDeleteFilesAcceptsEqualityDeleteInDeleteSet) { EXPECT_THAT(op->Commit(), IsOk()); } -// ValidateNoConflictingData: a competing FastAppend that added a data file matching the -// resolved conflict-detection filter makes the overwrite commit fail. TEST_F(OverwriteFilesTest, ValidateNoConflictingDataDetectsConflictingAdd) { const int64_t first_id = CommitFileA(); CommitFastAppend(file_b_); @@ -543,8 +545,6 @@ TEST_F(OverwriteFilesTest, ValidateNoConflictingDataAllowsNonConflictingChange) EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), expected_operation); } -// ValidateNoConflictingDeletes: a competing snapshot that deleted a data file in the -// overwrite range makes the commit fail. TEST_F(OverwriteFilesTest, ValidateNoConflictingDeletesDetectsConflictingDelete) { const int64_t first_id = CommitFileA(); @@ -563,10 +563,8 @@ TEST_F(OverwriteFilesTest, ValidateNoConflictingDeletesDetectsConflictingDelete) EXPECT_THAT(op->Commit(), IsError(ErrorKind::kValidationFailed)); } -// ValidateNoConflictingDeletes allows a non-conflicting concurrent append to commit. TEST_F(OverwriteFilesTest, ValidateNoConflictingDeletesAllowsNonConflictingChange) { const int64_t first_id = CommitFileA(); - // Competing append in partition x=2 (no deletes in the x=1 range). CommitFastAppend(file_b_); ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); @@ -577,8 +575,7 @@ TEST_F(OverwriteFilesTest, ValidateNoConflictingDeletesAllowsNonConflictingChang EXPECT_THAT(op->Commit(), IsOk()); } -// Explicit replaced-file validation applies ConflictDetectionFilter to concurrent delete -// files that cover replaced data files. +// Explicit replaced-file validation checks concurrent deletes covering replaced files. TEST_F(OverwriteFilesTest, PathBExplicitDeletesDetectsConcurrentDeleteWithoutFilter) { CommitFileA(); ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); @@ -595,8 +592,7 @@ TEST_F(OverwriteFilesTest, PathBExplicitDeletesDetectsConcurrentDeleteWithoutFil IsError(ErrorKind::kValidationFailed)); } -// A conflict filter that does not cover the replaced file's partition narrows the scope, -// so the concurrent delete is filtered out and validation succeeds. +// A narrower conflict filter can exclude the concurrent delete. TEST_F(OverwriteFilesTest, PathBExplicitDeletesConflictFilterNarrowsScope) { CommitFileA(); ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); @@ -613,10 +609,6 @@ TEST_F(OverwriteFilesTest, PathBExplicitDeletesConflictFilterNarrowsScope) { EXPECT_THAT(op->Validate(*concurrent.metadata, concurrent.snapshot), IsOk()); } -// These exercise OverwriteFiles::Validate(...) directly (the same entry point the commit -// kernel invokes), which is sufficient and deterministic: the strict-range branch does -// not depend on concurrent snapshots. - TEST_F(OverwriteFilesTest, StrictRangeAcceptedByStrictProjection) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); @@ -625,8 +617,6 @@ TEST_F(OverwriteFilesTest, StrictRangeAcceptedByStrictProjection) { EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), IsOk()); } -// Strict partition projection is insufficient (filter on a non-partition column) but the -// StrictMetricsEvaluator proves containment from the file's bounds. TEST_F(OverwriteFilesTest, StrictRangeAcceptedByStrictMetrics) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->OverwriteByRowFilter(Expressions::Equal("y", Literal::Long(5L))); @@ -636,7 +626,6 @@ TEST_F(OverwriteFilesTest, StrictRangeAcceptedByStrictMetrics) { EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), IsOk()); } -// Neither the strict projection nor the metrics can prove containment. TEST_F(OverwriteFilesTest, StrictRangeRejectedWhenNotProvable) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->OverwriteByRowFilter(Expressions::Equal("y", Literal::Long(5L))); @@ -664,10 +653,8 @@ TEST_F(OverwriteFilesTest, StrictRangeRequiresRowFilter) { IsError(ErrorKind::kValidationFailed)); } -// ValidateAddedFilesMatchOverwriteFilter resolves one data spec for all added files. TEST_F(OverwriteFilesTest, StrictRangeRejectsMultiplePartitionSpecs) { - // Add a second partition spec before creating the builder, - // so the producer's base metadata can resolve both specs when files are staged. + // Add the second spec before creating the builder so staged files can resolve it. ICEBERG_UNWRAP_OR_FAIL( auto spec1, PartitionSpec::Make(*schema_, /*spec_id=*/1, {PartitionField(/*source_id=*/1, /*field_id=*/1001, @@ -699,9 +686,7 @@ TEST_F(OverwriteFilesTest, StrictRangeRejectsEmptyAddedFiles) { IsError(ErrorKind::kInvalidArgument)); } -// Case-insensitive binding accepts a differently-cased column name where the -// case-sensitive (default) binding rejects it. Observed through the strict-range -// validation, which binds the row filter using the configured case sensitivity. +// Strict-range validation binds the row filter with the configured case sensitivity. TEST_F(OverwriteFilesTest, CaseSensitivityAffectsFilterBinding) { { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); @@ -721,8 +706,6 @@ TEST_F(OverwriteFilesTest, CaseSensitivityAffectsFilterBinding) { } } -// Null arguments to the pointer-taking builder methods surface at Commit() without -// crashing. TEST_F(OverwriteFilesTest, NullAddFileRejectedAtCommit) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->AddFile(nullptr); @@ -764,8 +747,7 @@ TEST_F(OverwriteFilesTest, NullArgumentDoesNotCrashBuilderChain) { EXPECT_THAT(result, HasErrorMessage("Invalid data file: null")); } -// A null element cannot enter a DataFileSet / DeleteFileSet (insert() rejects nullptr), -// so DeleteFiles({null...}, {null...}) is a no-op rather than an error. +// DataFileSet/DeleteFileSet reject nullptr on insert. TEST_F(OverwriteFilesTest, DeleteFilesNullElementsCannotEnterSets) { DataFileSet data_files; data_files.insert(std::shared_ptr{nullptr}); @@ -780,9 +762,7 @@ TEST_F(OverwriteFilesTest, DeleteFilesNullElementsCannotEnterSets) { EXPECT_THAT(op->Commit(), IsOk()); } -// ValidateFromSnapshot accepts a non-negative id (0 is in the generated id range -// [0, INT64_MAX]) and rejects a negative id (including kInvalidSnapshotId == -1) as a -// deferred error surfaced at Commit(). +// Snapshot id 0 is valid; negative ids are rejected as builder errors. TEST_F(OverwriteFilesTest, ValidateFromSnapshotRejectsNegativeSnapshotId) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); op->AddFile(file_a_).ValidateFromSnapshot(-1); @@ -797,11 +777,6 @@ TEST_F(OverwriteFilesTest, ValidateFromSnapshotAcceptsZeroSnapshotId) { EXPECT_THAT(op->Commit(), IsOk()); } -// AddedDataFiles() / deleted_data_files_ are not publicly observable, so the properties -// are checked indirectly through operation(): for any non-null file, AddFile-only yields -// `append` (the file was added and the row filter is untouched), while DeleteFile-only -// and DeleteFiles-only (data portion) yield `delete` (the file was registered for -// deletion and DeletesDataFiles() became true). TEST_F(OverwriteFilesTest, PropertyBuilderForwardingAndDualTracking) { const std::vector> files = { MakeDataFile("/data/p0.parquet", 1L), @@ -903,10 +878,7 @@ TEST_F(OverwriteFilesTest, PropertyOperationTruthTable) { } } -// DataConflictDetectionFilter() is private, so its three resolution outcomes are observed -// indirectly through ValidateNoConflictingData against a competing concurrent add of -// file_b (partition x=2): -// explicit filter, row filter only, and explicit file replacement. +// Exercise explicit filter, row-filter-only, and explicit file replacement resolution. TEST_F(OverwriteFilesTest, PropertyConflictFilterResolution) { { const int64_t first_id = CommitFileA(); diff --git a/src/iceberg/update/overwrite_files.cc b/src/iceberg/update/overwrite_files.cc index 4e5dbfeef..8b493babb 100644 --- a/src/iceberg/update/overwrite_files.cc +++ b/src/iceberg/update/overwrite_files.cc @@ -56,12 +56,18 @@ OverwriteFiles::~OverwriteFiles() = default; OverwriteFiles& OverwriteFiles::AddFile(const std::shared_ptr& file) { ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); + ICEBERG_BUILDER_CHECK(file->content == DataFile::Content::kData, + "Invalid data file to add: {} has delete-file content", + file->file_path); ICEBERG_BUILDER_RETURN_IF_ERROR(AddDataFile(file)); return *this; } OverwriteFiles& OverwriteFiles::DeleteFile(const std::shared_ptr& file) { ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); + ICEBERG_BUILDER_CHECK(file->content == DataFile::Content::kData, + "Invalid data file to delete: {} has delete-file content", + file->file_path); deleted_data_files_.insert(file); ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDataFile(file)); return *this;