Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 35 additions & 33 deletions src/iceberg/manifest/manifest_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,39 +389,40 @@ Result<std::vector<ManifestFile>> ParseManifestList(ArrowSchema* arrow_schema,
}

Status ParsePartitionValues(ArrowArrayView* view, int64_t row_idx,
const std::shared_ptr<PrimitiveType>& field_type,
std::vector<ManifestEntry>& manifest_entries) {
auto& partition = manifest_entries[row_idx].data_file->partition;
if (view->storage_type == ArrowType::NANOARROW_TYPE_NA ||
ArrowArrayViewIsNull(view, row_idx)) {
partition.AddValue(Literal::Null(field_type));
return {};
}
switch (view->storage_type) {
case ArrowType::NANOARROW_TYPE_BOOL: {
auto value = ArrowArrayViewGetUIntUnsafe(view, row_idx);
manifest_entries[row_idx].data_file->partition.AddValue(
Literal::Boolean(value != 0));
} break;
case ArrowType::NANOARROW_TYPE_INT32: {
auto value = ArrowArrayViewGetIntUnsafe(view, row_idx);
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Int(value));
} break;
case ArrowType::NANOARROW_TYPE_INT64: {
auto value = ArrowArrayViewGetIntUnsafe(view, row_idx);
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Long(value));
} break;
case ArrowType::NANOARROW_TYPE_FLOAT: {
auto value = ArrowArrayViewGetDoubleUnsafe(view, row_idx);
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Float(value));
} break;
case ArrowType::NANOARROW_TYPE_DOUBLE: {
auto value = ArrowArrayViewGetDoubleUnsafe(view, row_idx);
manifest_entries[row_idx].data_file->partition.AddValue(Literal::Double(value));
} break;
case ArrowType::NANOARROW_TYPE_BOOL:
partition.AddValue(
Literal::Boolean(ArrowArrayViewGetUIntUnsafe(view, row_idx) != 0));
break;
case ArrowType::NANOARROW_TYPE_INT32:
partition.AddValue(Literal::Int(ArrowArrayViewGetIntUnsafe(view, row_idx)));
break;
case ArrowType::NANOARROW_TYPE_INT64:
partition.AddValue(Literal::Long(ArrowArrayViewGetIntUnsafe(view, row_idx)));
break;
case ArrowType::NANOARROW_TYPE_FLOAT:
partition.AddValue(Literal::Float(ArrowArrayViewGetDoubleUnsafe(view, row_idx)));
break;
case ArrowType::NANOARROW_TYPE_DOUBLE:
partition.AddValue(Literal::Double(ArrowArrayViewGetDoubleUnsafe(view, row_idx)));
break;
case ArrowType::NANOARROW_TYPE_STRING: {
auto value = ArrowArrayViewGetStringUnsafe(view, row_idx);
manifest_entries[row_idx].data_file->partition.AddValue(
Literal::String(std::string(value.data, value.size_bytes)));
auto str_value = ArrowArrayViewGetStringUnsafe(view, row_idx);
partition.AddValue(
Literal::String(std::string(str_value.data, str_value.size_bytes)));
} break;
case ArrowType::NANOARROW_TYPE_BINARY: {
auto buffer = ArrowArrayViewGetBytesUnsafe(view, row_idx);
manifest_entries[row_idx].data_file->partition.AddValue(
Literal::Binary(std::vector<uint8_t>(buffer.data.as_char,
buffer.data.as_char + buffer.size_bytes)));
auto buf_value = ArrowArrayViewGetBytesUnsafe(view, row_idx);
partition.AddValue(Literal::Binary(std::vector<uint8_t>(
buf_value.data.as_char, buf_value.data.as_char + buf_value.size_bytes)));
} break;
default:
return InvalidManifest("Unsupported type {} for partition values",
Expand Down Expand Up @@ -473,14 +474,15 @@ Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
case DataFile::kPartitionFieldId: {
ICEBERG_RETURN_UNEXPECTED(
AssertViewType(field_view, ArrowType::NANOARROW_TYPE_STRUCT, field_name));
const auto& partition_type =
internal::checked_cast<const StructType&>(*field->get().type());
for (int64_t part_idx = 0; part_idx < field_view->n_children; part_idx++) {
auto part_view = field_view->children[part_idx];
auto part_field_type = internal::checked_pointer_cast<PrimitiveType>(
partition_type.fields()[part_idx].type());
for (int64_t row_idx = 0; row_idx < part_view->length; row_idx++) {
if (ArrowArrayViewIsNull(part_view, row_idx)) {
break;
}
ICEBERG_RETURN_UNEXPECTED(
ParsePartitionValues(part_view, row_idx, manifest_entries));
ICEBERG_RETURN_UNEXPECTED(ParsePartitionValues(
part_view, row_idx, part_field_type, manifest_entries));
}
}
} break;
Expand Down
75 changes: 75 additions & 0 deletions src/iceberg/test/manifest_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,81 @@ TEST_P(TestManifestReader, TestManifestReaderWithPartitionMetadata) {
EXPECT_EQ(read_entry.data_file->partition.values()[0], Literal::Int(0));
}

TEST_P(TestManifestReader, NullPartitionValuePreservedPositionallyAcrossRows) {
auto version = GetParam();

// Two partition fields, with several entries. The first entry has a null in
// partition field 0. The null must be preserved as a typed null in place so
// that (a) the tuple keeps one value per partition field (arity matches the
// spec), (b) the non-null value for field 1 stays in position 1 instead of
// shifting into position 0, and (c) parsing of the remaining rows is not
// aborted. This mirrors Iceberg Java's PartitionData, a fixed-size positional
// array that stores null in place.
auto multi_schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(/*field_id=*/3, "id", int32()),
SchemaField::MakeRequired(/*field_id=*/4, "data", string())});

std::shared_ptr<PartitionSpec> multi_spec;
ICEBERG_UNWRAP_OR_FAIL(
multi_spec,
PartitionSpec::Make(
/*spec_id=*/0, {PartitionField(/*source_id=*/3, /*field_id=*/1000, "id_part",
Transform::Identity()),
PartitionField(/*source_id=*/4, /*field_id=*/1001,
"data_bucket", Transform::Bucket(16))}));

const std::string manifest_path = MakeManifestPath();
auto writer_result = ManifestWriter::MakeWriter(
version, /*snapshot_id=*/1000L, manifest_path, file_io_, multi_spec, multi_schema,
ManifestContent::kData, /*first_row_id=*/0L);
ASSERT_THAT(writer_result, IsOk());
auto writer = std::move(writer_result.value());

// First entry: partition field 0 (identity int) is null, field 1 (bucket) is 9.
auto file_null =
MakeDataFile("/path/to/data-null.parquet",
PartitionValues({Literal::Null(int32()), Literal::Int(9)}));
auto file_b = MakeDataFile("/path/to/data-b.parquet",
PartitionValues({Literal::Int(1), Literal::Int(5)}));
auto file_c = MakeDataFile("/path/to/data-c.parquet",
PartitionValues({Literal::Int(2), Literal::Int(7)}));
ASSERT_THAT(
writer->WriteEntry(MakeEntry(ManifestStatus::kAdded, 1000L, std::move(file_null))),
IsOk());
ASSERT_THAT(
writer->WriteEntry(MakeEntry(ManifestStatus::kAdded, 1000L, std::move(file_b))),
IsOk());
ASSERT_THAT(
writer->WriteEntry(MakeEntry(ManifestStatus::kAdded, 1000L, std::move(file_c))),
IsOk());
ASSERT_THAT(writer->Close(), IsOk());
ICEBERG_UNWRAP_OR_FAIL(auto manifest, writer->ToManifestFile());

ICEBERG_UNWRAP_OR_FAIL(
auto reader, ManifestReader::Make(manifest, file_io_, multi_schema, multi_spec));
ICEBERG_UNWRAP_OR_FAIL(auto read_entries, reader->Entries());

ASSERT_EQ(read_entries.size(), 3U);

// The null-partition row keeps both fields: a null in position 0 and the
// non-null value in position 1 (no shifting).
EXPECT_EQ(read_entries[0].data_file->file_path, "/path/to/data-null.parquet");
ASSERT_EQ(read_entries[0].data_file->partition.num_fields(), 2);
EXPECT_TRUE(read_entries[0].data_file->partition.values()[0].IsNull());
EXPECT_EQ(read_entries[0].data_file->partition.values()[1], Literal::Int(9));

// Rows after the null one must still have their partition values parsed.
EXPECT_EQ(read_entries[1].data_file->file_path, "/path/to/data-b.parquet");
ASSERT_EQ(read_entries[1].data_file->partition.num_fields(), 2);
EXPECT_EQ(read_entries[1].data_file->partition.values()[0], Literal::Int(1));
EXPECT_EQ(read_entries[1].data_file->partition.values()[1], Literal::Int(5));

EXPECT_EQ(read_entries[2].data_file->file_path, "/path/to/data-c.parquet");
ASSERT_EQ(read_entries[2].data_file->partition.num_fields(), 2);
EXPECT_EQ(read_entries[2].data_file->partition.values()[0], Literal::Int(2));
EXPECT_EQ(read_entries[2].data_file->partition.values()[1], Literal::Int(7));
}

TEST_P(TestManifestReader, ReadsEntriesWhenPartitionSourceFieldIsMissing) {
auto version = GetParam();
auto file = MakeDataFile("/path/to/historical-data.parquet",
Expand Down
Loading