Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 37 additions & 12 deletions src/iceberg/table_scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,24 +239,17 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::CaseSensitive(
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::IncludeColumnStats() {
context_.return_column_stats = true;
context_.columns_to_keep_stats.clear();
requested_column_stats_.reset();
return *this;
}

template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::IncludeColumnStats(
const std::vector<std::string>& requested_columns) {
context_.return_column_stats = true;
context_.columns_to_keep_stats.clear();
context_.columns_to_keep_stats.reserve(requested_columns.size());

ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema_ref, ResolveSnapshotSchema());
const auto& schema = schema_ref.get();
for (const auto& column_name : requested_columns) {
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto field, schema->FindFieldByName(column_name));
if (field.has_value()) {
context_.columns_to_keep_stats.insert(field.value().get().field_id());
}
}
requested_column_stats_ = requested_columns;
ICEBERG_BUILDER_RETURN_IF_ERROR(ResolveColumnStatsSelection());

return *this;
}
Expand Down Expand Up @@ -295,14 +288,15 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseSnapshot(int64_t snap
context_.snapshot_id.value());
ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(snapshot_id));
context_.snapshot_id = snapshot_id;
InvalidateSnapshotSchema();
return *this;
}

template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseRef(const std::string& ref) {
if (ref == SnapshotRef::kMainBranch) {
snapshot_schema_ = nullptr;
context_.snapshot_id.reset();
InvalidateSnapshotSchema();
return *this;
}

Expand All @@ -315,6 +309,7 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseRef(const std::string
const int64_t snapshot_id = iter->second->snapshot_id;
ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(snapshot_id));
context_.snapshot_id = snapshot_id;
InvalidateSnapshotSchema();

return *this;
}
Expand All @@ -339,6 +334,7 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
}
this->context_.from_snapshot_id = from_snapshot_id;
this->context_.from_snapshot_id_inclusive = inclusive;
InvalidateSnapshotSchema();
return *this;
}

Expand All @@ -361,6 +357,7 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(int64_t to_sn
{
ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(to_snapshot_id));
context_.to_snapshot_id = to_snapshot_id;
InvalidateSnapshotSchema();
return *this;
}

Expand All @@ -387,9 +384,36 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseBranch(
ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kBranch,
"Ref {} is not a branch", branch);
context_.branch = branch;
InvalidateSnapshotSchema();
return *this;
}

template <typename ScanType>
void TableScanBuilder<ScanType>::InvalidateSnapshotSchema() {
snapshot_schema_ = nullptr;
}

template <typename ScanType>
Status TableScanBuilder<ScanType>::ResolveColumnStatsSelection() {
if (!requested_column_stats_.has_value()) {
return {};
}

context_.columns_to_keep_stats.clear();
context_.columns_to_keep_stats.reserve(requested_column_stats_->size());

ICEBERG_ASSIGN_OR_RAISE(auto schema_ref, ResolveSnapshotSchema());
const auto& schema = schema_ref.get();
for (const auto& column_name : *requested_column_stats_) {
ICEBERG_ASSIGN_OR_RAISE(auto field, schema->FindFieldByName(column_name));
if (field.has_value()) {
context_.columns_to_keep_stats.insert(field.value().get().field_id());
}
}

return {};
}

template <typename ScanType>
Result<std::reference_wrapper<const std::shared_ptr<Schema>>>
TableScanBuilder<ScanType>::ResolveSnapshotSchema() {
Expand All @@ -410,6 +434,7 @@ TableScanBuilder<ScanType>::ResolveSnapshotSchema() {
template <typename ScanType>
Result<std::unique_ptr<ScanType>> TableScanBuilder<ScanType>::Build() {
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
ICEBERG_RETURN_UNEXPECTED(ResolveColumnStatsSelection());
ICEBERG_RETURN_UNEXPECTED(context_.Validate());

ICEBERG_ASSIGN_OR_RAISE(auto schema, ResolveSnapshotSchema());
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/table_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,14 @@ class ICEBERG_TEMPLATE_CLASS_EXPORT TableScanBuilder : public ErrorCollector {

// Return the schema bound to the specified snapshot.
Result<std::reference_wrapper<const std::shared_ptr<Schema>>> ResolveSnapshotSchema();
void InvalidateSnapshotSchema();
Status ResolveColumnStatsSelection();

std::shared_ptr<TableMetadata> metadata_;
std::shared_ptr<FileIO> io_;
internal::TableScanContext context_;
std::shared_ptr<Schema> snapshot_schema_;
std::optional<std::vector<std::string>> requested_column_stats_;
};

/// \brief Represents a configured scan operation on a table.
Expand Down
54 changes: 54 additions & 0 deletions src/iceberg/test/table_scan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,60 @@ TEST_P(TableScanTest, UseRefPreservesInt64SnapshotIds) {
EXPECT_EQ(snapshot->snapshot_id, kLargeSnapshotId);
}

TEST_P(TableScanTest, IncludeColumnStatsUsesFinalSnapshotSchema) {
constexpr int64_t kBaseSnapshotId = 1000L;
constexpr int64_t kEvolvedSnapshotId = 2000L;
constexpr int32_t kBaseIdFieldId = 1;
constexpr int32_t kEvolvedIdFieldId = 10;
constexpr int32_t kEvolvedDataFieldId = 11;
constexpr int32_t kEvolvedSchemaId = 1;

auto evolved_schema = std::make_shared<Schema>(
std::vector<SchemaField>{
SchemaField::MakeRequired(kEvolvedIdFieldId, "id", int32()),
SchemaField::MakeRequired(kEvolvedDataFieldId, "data", string())},
kEvolvedSchemaId);
table_metadata_->schemas.push_back(evolved_schema);
table_metadata_->last_column_id = kEvolvedDataFieldId;
table_metadata_->snapshots.push_back(std::make_shared<Snapshot>(
Snapshot{.snapshot_id = kEvolvedSnapshotId,
.parent_snapshot_id = kBaseSnapshotId,
.sequence_number = 2L,
.timestamp_ms = TimePointMsFromUnixMs(1609459201000L),
.manifest_list = "/tmp/metadata/snap-2000-2-manifest-list.avro",
.schema_id = evolved_schema->schema_id()}));
table_metadata_->refs["evolved-branch"] = std::make_shared<SnapshotRef>(
SnapshotRef{.snapshot_id = kEvolvedSnapshotId, .retention = SnapshotRef::Branch{}});

{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
DataTableScanBuilder::Make(table_metadata_, file_io_));
builder->IncludeColumnStats({"id"}).UseSnapshot(kEvolvedSnapshotId);
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto scan_schema, scan->schema());

EXPECT_EQ(scan_schema->schema_id(), evolved_schema->schema_id());
const auto& stats_fields = scan->context().columns_to_keep_stats;
EXPECT_EQ(stats_fields.size(), 1);
EXPECT_TRUE(stats_fields.contains(kEvolvedIdFieldId));
EXPECT_FALSE(stats_fields.contains(kBaseIdFieldId));
}

{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
DataTableScanBuilder::Make(table_metadata_, file_io_));
builder->IncludeColumnStats({"id"}).UseRef("evolved-branch");
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto scan_schema, scan->schema());

EXPECT_EQ(scan_schema->schema_id(), evolved_schema->schema_id());
const auto& stats_fields = scan->context().columns_to_keep_stats;
EXPECT_EQ(stats_fields.size(), 1);
EXPECT_TRUE(stats_fields.contains(kEvolvedIdFieldId));
EXPECT_FALSE(stats_fields.contains(kBaseIdFieldId));
}
}

TEST_P(TableScanTest, TableScanBuilderValidationErrors) {
// Test negative min rows
ICEBERG_UNWRAP_OR_FAIL(auto builder,
Expand Down
Loading