diff --git a/src/iceberg/test/fast_append_test.cc b/src/iceberg/test/fast_append_test.cc index 98956ba7c..32224d117 100644 --- a/src/iceberg/test/fast_append_test.cc +++ b/src/iceberg/test/fast_append_test.cc @@ -20,13 +20,20 @@ #include "iceberg/update/fast_append.h" #include +#include +#include +#include #include #include #include "iceberg/avro/avro_register.h" +#include "iceberg/constants.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_writer.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" +#include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" #include "iceberg/test/matchers.h" #include "iceberg/test/test_resource.h" @@ -72,6 +79,23 @@ class FastAppendTest : public UpdateTestBase { return data_file; } + Result WriteManifest( + const std::string& path, const std::vector>& files) { + ICEBERG_ASSIGN_OR_RAISE( + auto writer, ManifestWriter::MakeWriter(table_->metadata()->format_version, + kInvalidSnapshotId, path, file_io_, spec_, + schema_, ManifestContent::kData)); + for (const auto& file : files) { + ManifestEntry entry; + entry.status = ManifestStatus::kAdded; + entry.snapshot_id = std::nullopt; + entry.data_file = file; + ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + return writer->ToManifestFile(); + } + std::shared_ptr spec_; std::shared_ptr schema_; std::shared_ptr file_a_; @@ -90,6 +114,9 @@ TEST_F(FastAppendTest, AppendDataFile) { EXPECT_EQ(snapshot->summary.at("added-data-files"), "1"); EXPECT_EQ(snapshot->summary.at("added-records"), "100"); EXPECT_EQ(snapshot->summary.at("added-files-size"), "1024"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kManifestsCreated), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kManifestsKept), "0"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kManifestsReplaced), "0"); } TEST_F(FastAppendTest, AppendMultipleDataFiles) { @@ -172,6 +199,40 @@ TEST_F(FastAppendTest, FinalizeIgnoresCleanupDeleteFailure) { IsOk()); } +TEST_F(FastAppendTest, RetryCopiesAppendManifestAgain) { + table_->metadata()->format_version = 1; + const auto path = table_location_ + "/metadata/input.avro"; + ICEBERG_UNWRAP_OR_FAIL(auto manifest, WriteManifest(path, {file_a_})); + + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + std::vector deleted_paths; + fast_append->DeleteWith([&](const std::string& deleted_path) { + deleted_paths.push_back(deleted_path); + return file_io_->DeleteFile(deleted_path); + }); + fast_append->AppendManifest(manifest); + + auto& update = static_cast(*fast_append); + // First Apply() copies the input manifest because v1 cannot inherit snapshot IDs. + ICEBERG_UNWRAP_OR_FAIL(auto first_apply, update.Apply()); + SnapshotCache first_cache(first_apply.snapshot.get()); + ICEBERG_UNWRAP_OR_FAIL(auto first_manifests, first_cache.Manifests(file_io_)); + ASSERT_EQ(first_manifests.size(), 1U); + const auto first_rewritten_path = first_manifests[0].manifest_path; + EXPECT_NE(first_rewritten_path, path); + + // Second Apply() simulates retry cleanup, then copies the original manifest again. + ICEBERG_UNWRAP_OR_FAIL(auto second_apply, update.Apply()); + EXPECT_THAT(deleted_paths, testing::Contains(first_rewritten_path)); + + SnapshotCache second_cache(second_apply.snapshot.get()); + ICEBERG_UNWRAP_OR_FAIL(auto second_manifests, second_cache.Manifests(file_io_)); + ASSERT_EQ(second_manifests.size(), 1U); + EXPECT_NE(second_manifests[0].manifest_path, path); + EXPECT_NE(second_manifests[0].manifest_path, first_rewritten_path); +} + TEST_F(FastAppendTest, AppendDuplicateFile) { std::shared_ptr fast_append; ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); diff --git a/src/iceberg/update/fast_append.cc b/src/iceberg/update/fast_append.cc index 24f8e744f..4167387c7 100644 --- a/src/iceberg/update/fast_append.cc +++ b/src/iceberg/update/fast_append.cc @@ -26,7 +26,7 @@ #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_util_internal.h" #include "iceberg/snapshot.h" -#include "iceberg/table.h" +#include "iceberg/table.h" // IWYU pragma: keep #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" #include "iceberg/transaction.h" @@ -58,7 +58,7 @@ FastAppend& FastAppend::AppendFile(const std::shared_ptr& file) { auto [iter, inserted] = data_files.insert(file); if (inserted) { has_new_files_ = true; - ICEBERG_BUILDER_RETURN_IF_ERROR(summary_.AddedFile(*spec, *file)); + ICEBERG_BUILDER_RETURN_IF_ERROR(added_data_files_summary_.AddedFile(*spec, *file)); } return *this; @@ -75,11 +75,13 @@ FastAppend& FastAppend::AppendManifest(const ManifestFile& manifest) { "Sequence number must be assigned during commit"); if (can_inherit_snapshot_id() && manifest.added_snapshot_id == kInvalidSnapshotId) { - summary_.AddedManifest(manifest); + appended_manifests_summary_.AddedManifest(manifest); append_manifests_.push_back(manifest); } else { // The manifest must be rewritten with this update's snapshot ID - ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto copied_manifest, CopyManifest(manifest)); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto copied_manifest, + CopyManifest(manifest, /*update_summary=*/true)); + append_manifests_to_copy_.push_back(manifest); rewritten_append_manifests_.push_back(std::move(copied_manifest)); } @@ -93,6 +95,16 @@ Result> FastAppend::Apply( std::vector manifests; ICEBERG_ASSIGN_OR_RAISE(auto new_written_manifests, WriteNewManifests()); + // A retry cleanup deletes copied append manifests and clears the rewritten + // list; rebuild them from the original appended manifests before re-applying. + if (rewritten_append_manifests_.empty() && !append_manifests_to_copy_.empty()) { + for (const auto& manifest : append_manifests_to_copy_) { + ICEBERG_ASSIGN_OR_RAISE(auto copied_manifest, + CopyManifest(manifest, /*update_summary=*/false)); + rewritten_append_manifests_.push_back(std::move(copied_manifest)); + } + } + manifests.reserve(new_written_manifests.size() + append_manifests_.size() + rewritten_append_manifests_.size()); if (!new_written_manifests.empty()) { @@ -122,15 +134,31 @@ Result> FastAppend::Apply( snapshot_manifests.end()); } + manifest_count_summary_ = + BuildManifestCountSummary(manifests, /*replaced_manifests_count=*/0); + return manifests; } std::unordered_map FastAppend::Summary() { + summary_.Clear(); summary_.SetPartitionSummaryLimit( base().properties.Get(TableProperties::kWritePartitionSummaryLimit)); + summary_.Merge(added_data_files_summary_); + summary_.Merge(appended_manifests_summary_); + for (const auto& [property, value] : custom_summary_properties_) { + summary_.Set(property, value); + } + summary_.Merge(manifest_count_summary_); return summary_.Build(); } +void FastAppend::SetSummaryProperty(const std::string& property, + const std::string& value) { + custom_summary_properties_[property] = value; + SnapshotUpdate::SetSummaryProperty(property, value); +} + Status FastAppend::CleanUncommitted(const std::unordered_set& committed) { // Clean up new manifests that were written but not committed if (!new_manifests_.empty()) { @@ -151,24 +179,26 @@ Status FastAppend::CleanUncommitted(const std::unordered_set& commi std::ignore = DeleteFile(manifest.manifest_path); } } + rewritten_append_manifests_.clear(); } return {}; } bool FastAppend::CleanupAfterCommit() const { - // Cleanup after committing is disabled for FastAppend unless there are - // rewritten_append_manifests_ because: - // 1.) Appended manifests are never rewritten + // Cleanup after committing is disabled for FastAppend unless append manifests + // were copied or need to be copied on retry because: + // 1.) Directly appended manifests are never rewritten // 2.) Manifests which are written out as part of AppendFile are already cleaned // up between commit attempts in WriteNewManifests - return !rewritten_append_manifests_.empty(); + return !rewritten_append_manifests_.empty() || !append_manifests_to_copy_.empty(); } Result> FastAppend::Spec(int32_t spec_id) { return base().PartitionSpecById(spec_id); } -Result FastAppend::CopyManifest(const ManifestFile& manifest) { +Result FastAppend::CopyManifest(const ManifestFile& manifest, + bool update_summary) { const TableMetadata& current = base(); ICEBERG_ASSIGN_OR_RAISE(auto schema, current.Schema()); ICEBERG_ASSIGN_OR_RAISE(auto spec, @@ -180,7 +210,8 @@ Result FastAppend::CopyManifest(const ManifestFile& manifest) { // Copy the manifest with the new snapshot ID. return CopyAppendManifest(manifest, ctx_->table->io(), schema, spec, snapshot_id, - new_manifest_path, current.format_version, &summary_); + new_manifest_path, current.format_version, + update_summary ? &appended_manifests_summary_ : nullptr); } Result> FastAppend::WriteNewManifests() { diff --git a/src/iceberg/update/fast_append.h b/src/iceberg/update/fast_append.h index a04786c88..07018989c 100644 --- a/src/iceberg/update/fast_append.h +++ b/src/iceberg/update/fast_append.h @@ -72,6 +72,7 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate { const TableMetadata& metadata_to_update, const std::shared_ptr& snapshot) override; std::unordered_map Summary() override; + void SetSummaryProperty(const std::string& property, const std::string& value) override; Status CleanUncommitted(const std::unordered_set& committed) override; bool CleanupAfterCommit() const override; @@ -84,8 +85,9 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate { /// \brief Copy a manifest file with a new snapshot ID. /// /// \param manifest The manifest to copy + /// \param update_summary Whether to add copied entries to the append summary /// \return The copied manifest file - Result CopyManifest(const ManifestFile& manifest); + Result CopyManifest(const ManifestFile& manifest, bool update_summary); /// \brief Write new manifests for the accumulated data files. /// @@ -95,9 +97,18 @@ class ICEBERG_EXPORT FastAppend : public SnapshotUpdate { private: std::string table_name_; std::unordered_map new_data_files_by_spec_; + // Stable input summaries for retry-safe summary_ rebuilds. + SnapshotSummaryBuilder added_data_files_summary_; + SnapshotSummaryBuilder appended_manifests_summary_; + // User-provided summary properties restored after summary_ rebuilds. + std::unordered_map custom_summary_properties_; std::vector append_manifests_; + // Original manifests kept to recreate copied manifests after retry cleanup. + std::vector append_manifests_to_copy_; std::vector rewritten_append_manifests_; std::vector new_manifests_; + // Manifest count summary from the latest Apply() result. + SnapshotSummaryBuilder manifest_count_summary_; bool has_new_files_{false}; }; diff --git a/src/iceberg/update/merging_snapshot_update.cc b/src/iceberg/update/merging_snapshot_update.cc index a0d882d14..77d3c41c0 100644 --- a/src/iceberg/update/merging_snapshot_update.cc +++ b/src/iceberg/update/merging_snapshot_update.cc @@ -944,26 +944,11 @@ Result> MergingSnapshotUpdate::Apply( result.insert(result.end(), std::make_move_iterator(merged_deletes.begin()), std::make_move_iterator(merged_deletes.end())); - // Manifest count summary: unassigned manifests count as neither created nor kept. - int32_t manifests_created = 0; - int32_t manifests_kept = 0; - for (const auto& m : result) { - if (m.added_snapshot_id == snapshot_id) { - ++manifests_created; - } else if (m.added_snapshot_id != kInvalidSnapshotId) { - ++manifests_kept; - } - } int32_t replaced_manifests_count = data_filter_manager_->ReplacedManifestsCount() + delete_filter_manager_->ReplacedManifestsCount() + data_merge_manager_->ReplacedManifestsCount() + delete_merge_manager_->ReplacedManifestsCount(); - summary_builder().Set(SnapshotSummaryFields::kManifestsCreated, - std::to_string(manifests_created)); - summary_builder().Set(SnapshotSummaryFields::kManifestsKept, - std::to_string(manifests_kept)); - summary_builder().Set(SnapshotSummaryFields::kManifestsReplaced, - std::to_string(replaced_manifests_count)); + summary_builder().Merge(BuildManifestCountSummary(result, replaced_manifests_count)); return result; } diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc index bb5376fa8..03f68cc94 100644 --- a/src/iceberg/update/snapshot_update.cc +++ b/src/iceberg/update/snapshot_update.cc @@ -424,6 +424,28 @@ std::string SnapshotUpdate::ManifestListPath() { return ctx_->MetadataFileLocation(filename); } +SnapshotSummaryBuilder SnapshotUpdate::BuildManifestCountSummary( + std::span manifests, int32_t replaced_manifests_count) { + SnapshotSummaryBuilder summary; + int32_t manifests_created = 0; + int32_t manifests_kept = 0; + int64_t snapshot_id = SnapshotId(); + for (const auto& manifest : manifests) { + if (manifest.added_snapshot_id == snapshot_id) { + ++manifests_created; + } else if (manifest.added_snapshot_id != kInvalidSnapshotId) { + ++manifests_kept; + } + } + + summary.Set(SnapshotSummaryFields::kManifestsCreated, + std::to_string(manifests_created)); + summary.Set(SnapshotSummaryFields::kManifestsKept, std::to_string(manifests_kept)); + summary.Set(SnapshotSummaryFields::kManifestsReplaced, + std::to_string(replaced_manifests_count)); + return summary; +} + std::string SnapshotUpdate::ManifestPath() { // Generate manifest path // Format: {metadata_location}/{uuid}-m{manifest_count}.avro diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index 03a74e788..7397034b3 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -216,6 +216,8 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { std::string ManifestPath(); std::string ManifestListPath(); SnapshotSummaryBuilder& summary_builder() { return summary_; } + SnapshotSummaryBuilder BuildManifestCountSummary( + std::span manifests, int32_t replaced_manifests_count); private: /// \brief Returns the snapshot summary from the implementation and updates totals.