diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 6881d34fb..03cca779d 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -239,6 +239,8 @@ TableScanBuilder& TableScanBuilder::CaseSensitive( template TableScanBuilder& TableScanBuilder::IncludeColumnStats() { context_.return_column_stats = true; + context_.columns_to_keep_stats.clear(); + requested_column_stats_.reset(); return *this; } @@ -246,17 +248,8 @@ template TableScanBuilder& TableScanBuilder::IncludeColumnStats( const std::vector& requested_columns) { context_.return_column_stats = true; - context_.columns_to_keep_stats.clear(); - context_.columns_to_keep_stats.reserve(requested_columns.size()); - - ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema_ref, ResolveSnapshotSchema()); - const auto& schema = schema_ref.get(); - for (const auto& column_name : requested_columns) { - ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto field, schema->FindFieldByName(column_name)); - if (field.has_value()) { - context_.columns_to_keep_stats.insert(field.value().get().field_id()); - } - } + requested_column_stats_ = requested_columns; + ICEBERG_BUILDER_RETURN_IF_ERROR(ResolveColumnStatsSelection()); return *this; } @@ -295,14 +288,15 @@ TableScanBuilder& TableScanBuilder::UseSnapshot(int64_t snap context_.snapshot_id.value()); ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(snapshot_id)); context_.snapshot_id = snapshot_id; + InvalidateSnapshotSchema(); return *this; } template TableScanBuilder& TableScanBuilder::UseRef(const std::string& ref) { if (ref == SnapshotRef::kMainBranch) { - snapshot_schema_ = nullptr; context_.snapshot_id.reset(); + InvalidateSnapshotSchema(); return *this; } @@ -315,6 +309,7 @@ TableScanBuilder& TableScanBuilder::UseRef(const std::string const int64_t snapshot_id = iter->second->snapshot_id; ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(snapshot_id)); context_.snapshot_id = snapshot_id; + InvalidateSnapshotSchema(); return *this; } @@ -339,6 +334,7 @@ TableScanBuilder& TableScanBuilder::FromSnapshot( } this->context_.from_snapshot_id = from_snapshot_id; this->context_.from_snapshot_id_inclusive = inclusive; + InvalidateSnapshotSchema(); return *this; } @@ -361,6 +357,7 @@ TableScanBuilder& TableScanBuilder::ToSnapshot(int64_t to_sn { ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(to_snapshot_id)); context_.to_snapshot_id = to_snapshot_id; + InvalidateSnapshotSchema(); return *this; } @@ -387,9 +384,36 @@ TableScanBuilder& TableScanBuilder::UseBranch( ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kBranch, "Ref {} is not a branch", branch); context_.branch = branch; + InvalidateSnapshotSchema(); return *this; } +template +void TableScanBuilder::InvalidateSnapshotSchema() { + snapshot_schema_ = nullptr; +} + +template +Status TableScanBuilder::ResolveColumnStatsSelection() { + if (!requested_column_stats_.has_value()) { + return {}; + } + + context_.columns_to_keep_stats.clear(); + context_.columns_to_keep_stats.reserve(requested_column_stats_->size()); + + ICEBERG_ASSIGN_OR_RAISE(auto schema_ref, ResolveSnapshotSchema()); + const auto& schema = schema_ref.get(); + for (const auto& column_name : *requested_column_stats_) { + ICEBERG_ASSIGN_OR_RAISE(auto field, schema->FindFieldByName(column_name)); + if (field.has_value()) { + context_.columns_to_keep_stats.insert(field.value().get().field_id()); + } + } + + return {}; +} + template Result>> TableScanBuilder::ResolveSnapshotSchema() { @@ -410,6 +434,7 @@ TableScanBuilder::ResolveSnapshotSchema() { template Result> TableScanBuilder::Build() { ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + ICEBERG_RETURN_UNEXPECTED(ResolveColumnStatsSelection()); ICEBERG_RETURN_UNEXPECTED(context_.Validate()); ICEBERG_ASSIGN_OR_RAISE(auto schema, ResolveSnapshotSchema()); diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 64fb3ffd1..9b5f63f59 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -384,11 +384,14 @@ class ICEBERG_TEMPLATE_CLASS_EXPORT TableScanBuilder : public ErrorCollector { // Return the schema bound to the specified snapshot. Result>> ResolveSnapshotSchema(); + void InvalidateSnapshotSchema(); + Status ResolveColumnStatsSelection(); std::shared_ptr metadata_; std::shared_ptr io_; internal::TableScanContext context_; std::shared_ptr snapshot_schema_; + std::optional> requested_column_stats_; }; /// \brief Represents a configured scan operation on a table. diff --git a/src/iceberg/test/table_scan_test.cc b/src/iceberg/test/table_scan_test.cc index 11905a870..a8fbea50f 100644 --- a/src/iceberg/test/table_scan_test.cc +++ b/src/iceberg/test/table_scan_test.cc @@ -230,6 +230,60 @@ TEST_P(TableScanTest, UseRefPreservesInt64SnapshotIds) { EXPECT_EQ(snapshot->snapshot_id, kLargeSnapshotId); } +TEST_P(TableScanTest, IncludeColumnStatsUsesFinalSnapshotSchema) { + constexpr int64_t kBaseSnapshotId = 1000L; + constexpr int64_t kEvolvedSnapshotId = 2000L; + constexpr int32_t kBaseIdFieldId = 1; + constexpr int32_t kEvolvedIdFieldId = 10; + constexpr int32_t kEvolvedDataFieldId = 11; + constexpr int32_t kEvolvedSchemaId = 1; + + auto evolved_schema = std::make_shared( + std::vector{ + SchemaField::MakeRequired(kEvolvedIdFieldId, "id", int32()), + SchemaField::MakeRequired(kEvolvedDataFieldId, "data", string())}, + kEvolvedSchemaId); + table_metadata_->schemas.push_back(evolved_schema); + table_metadata_->last_column_id = kEvolvedDataFieldId; + table_metadata_->snapshots.push_back(std::make_shared( + Snapshot{.snapshot_id = kEvolvedSnapshotId, + .parent_snapshot_id = kBaseSnapshotId, + .sequence_number = 2L, + .timestamp_ms = TimePointMsFromUnixMs(1609459201000L), + .manifest_list = "/tmp/metadata/snap-2000-2-manifest-list.avro", + .schema_id = evolved_schema->schema_id()})); + table_metadata_->refs["evolved-branch"] = std::make_shared( + SnapshotRef{.snapshot_id = kEvolvedSnapshotId, .retention = SnapshotRef::Branch{}}); + + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + DataTableScanBuilder::Make(table_metadata_, file_io_)); + builder->IncludeColumnStats({"id"}).UseSnapshot(kEvolvedSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto scan_schema, scan->schema()); + + EXPECT_EQ(scan_schema->schema_id(), evolved_schema->schema_id()); + const auto& stats_fields = scan->context().columns_to_keep_stats; + EXPECT_EQ(stats_fields.size(), 1); + EXPECT_TRUE(stats_fields.contains(kEvolvedIdFieldId)); + EXPECT_FALSE(stats_fields.contains(kBaseIdFieldId)); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, + DataTableScanBuilder::Make(table_metadata_, file_io_)); + builder->IncludeColumnStats({"id"}).UseRef("evolved-branch"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto scan_schema, scan->schema()); + + EXPECT_EQ(scan_schema->schema_id(), evolved_schema->schema_id()); + const auto& stats_fields = scan->context().columns_to_keep_stats; + EXPECT_EQ(stats_fields.size(), 1); + EXPECT_TRUE(stats_fields.contains(kEvolvedIdFieldId)); + EXPECT_FALSE(stats_fields.contains(kBaseIdFieldId)); + } +} + TEST_P(TableScanTest, TableScanBuilderValidationErrors) { // Test negative min rows ICEBERG_UNWRAP_OR_FAIL(auto builder,