Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/iceberg/avro/avro_schema_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,18 @@ Status ToAvroNodeVisitor::Visit(const UnknownType&, ::avro::NodePtr* node) {
return {};
}

Status ToAvroNodeVisitor::Visit(const VariantType&, ::avro::NodePtr*) {
return NotSupported("Writing Iceberg variant type to Avro is not supported");
}

Status ToAvroNodeVisitor::Visit(const GeometryType&, ::avro::NodePtr*) {
return NotSupported("Writing Iceberg geometry type to Avro is not supported");
}

Status ToAvroNodeVisitor::Visit(const GeographyType&, ::avro::NodePtr*) {
return NotSupported("Writing Iceberg geography type to Avro is not supported");
}

Status ToAvroNodeVisitor::Visit(const StructType& type, ::avro::NodePtr* node) {
*node = std::make_shared<::avro::NodeRecord>();

Expand Down Expand Up @@ -631,6 +643,11 @@ Status ValidateAvroSchemaEvolution(const Type& expected_type,
break;
case TypeId::kUnknown:
return {};
case TypeId::kVariant:
case TypeId::kGeometry:
case TypeId::kGeography:
return NotSupported("Reading Iceberg type {} from Avro is not supported",
expected_type);
default:
break;
}
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/avro/avro_schema_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class ToAvroNodeVisitor {
Status Visit(const FixedType& type, ::avro::NodePtr* node);
Status Visit(const BinaryType& type, ::avro::NodePtr* node);
Status Visit(const UnknownType&, ::avro::NodePtr*);
Status Visit(const VariantType&, ::avro::NodePtr*);
Status Visit(const GeometryType&, ::avro::NodePtr*);
Status Visit(const GeographyType&, ::avro::NodePtr*);
Status Visit(const StructType& type, ::avro::NodePtr* node);
Status Visit(const ListType& type, ::avro::NodePtr* node);
Status Visit(const MapType& type, ::avro::NodePtr* node);
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/delete_file_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Status EqualityDeleteFile::ConvertBoundsIfNeeded() const {
}

const auto& schema_field = field.value().get();
if (schema_field.type()->is_nested()) {
if (!schema_field.type()->is_primitive()) {
continue;
}

Expand Down Expand Up @@ -103,7 +103,7 @@ Result<bool> CanContainEqDeletesForFile(const DataFile& data_file,
}

const auto& field = found_field.value().get();
if (field.type()->is_nested()) {
if (!field.type()->is_primitive()) {
continue;
}

Expand Down
89 changes: 67 additions & 22 deletions src/iceberg/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,12 @@ nlohmann::json ToJson(const Type& type) {
return "uuid";
case TypeId::kUnknown:
return "unknown";
case TypeId::kVariant:
return "variant";
case TypeId::kGeometry:
return type.ToString();
case TypeId::kGeography:
return type.ToString();
}
std::unreachable();
}
Expand Down Expand Up @@ -486,58 +492,97 @@ Result<std::unique_ptr<Type>> MapTypeFromJson(const nlohmann::json& json) {
Result<std::unique_ptr<Type>> TypeFromJson(const nlohmann::json& json) {
if (json.is_string()) {
std::string type_str = json.get<std::string>();
if (type_str == "boolean") {
std::string lower_type_str = StringUtils::ToLower(type_str);
if (lower_type_str == "boolean") {
return std::make_unique<BooleanType>();
} else if (type_str == "int") {
} else if (lower_type_str == "int") {
return std::make_unique<IntType>();
} else if (type_str == "long") {
} else if (lower_type_str == "long") {
return std::make_unique<LongType>();
} else if (type_str == "float") {
} else if (lower_type_str == "float") {
return std::make_unique<FloatType>();
} else if (type_str == "double") {
} else if (lower_type_str == "double") {
return std::make_unique<DoubleType>();
} else if (type_str == "date") {
} else if (lower_type_str == "date") {
return std::make_unique<DateType>();
} else if (type_str == "time") {
} else if (lower_type_str == "time") {
return std::make_unique<TimeType>();
} else if (type_str == "timestamp") {
} else if (lower_type_str == "timestamp") {
return std::make_unique<TimestampType>();
} else if (type_str == "timestamptz") {
} else if (lower_type_str == "timestamptz") {
return std::make_unique<TimestampTzType>();
} else if (type_str == "timestamp_ns") {
} else if (lower_type_str == "timestamp_ns") {
return std::make_unique<TimestampNsType>();
} else if (type_str == "timestamptz_ns") {
} else if (lower_type_str == "timestamptz_ns") {
return std::make_unique<TimestampTzNsType>();
} else if (type_str == "string") {
} else if (lower_type_str == "string") {
return std::make_unique<StringType>();
} else if (type_str == "binary") {
} else if (lower_type_str == "binary") {
return std::make_unique<BinaryType>();
} else if (type_str == "uuid") {
} else if (lower_type_str == "uuid") {
return std::make_unique<UuidType>();
} else if (type_str == "unknown") {
} else if (lower_type_str == "unknown") {
return std::make_unique<UnknownType>();
} else if (type_str.starts_with("fixed")) {
std::regex fixed_regex(R"(fixed\[\s*(\d+)\s*\])");
} else if (lower_type_str == "variant") {
return std::make_unique<VariantType>();
} else if (lower_type_str.starts_with("fixed")) {
static const std::regex fixed_regex(R"(fixed\[\s*(\d+)\s*\])");
std::smatch match;
if (std::regex_match(type_str, match, fixed_regex)) {
if (std::regex_match(lower_type_str, match, fixed_regex)) {
ICEBERG_ASSIGN_OR_RAISE(auto length,
StringUtils::ParseNumber<int32_t>(match[1].str()));
return std::make_unique<FixedType>(length);
}
return JsonParseError("Invalid fixed type: {}", type_str);
} else if (type_str.starts_with("decimal")) {
std::regex decimal_regex(R"(decimal\(\s*(\d+)\s*,\s*(\d+)\s*\))");
} else if (lower_type_str.starts_with("decimal")) {
static const std::regex decimal_regex(R"(decimal\(\s*(\d+)\s*,\s*(\d+)\s*\))");
std::smatch match;
if (std::regex_match(type_str, match, decimal_regex)) {
if (std::regex_match(lower_type_str, match, decimal_regex)) {
ICEBERG_ASSIGN_OR_RAISE(auto precision,
StringUtils::ParseNumber<int32_t>(match[1].str()));
ICEBERG_ASSIGN_OR_RAISE(auto scale,
StringUtils::ParseNumber<int32_t>(match[2].str()));
return std::make_unique<DecimalType>(precision, scale);
}
return JsonParseError("Invalid decimal type: {}", type_str);
} else if (lower_type_str.starts_with("geometry")) {
static const std::regex geometry_regex(R"(geometry\s*(?:\(\s*([^)]*?)\s*\))?)",
std::regex_constants::icase);
std::smatch match;
if (std::regex_match(type_str, match, geometry_regex)) {
if (match[1].matched) {
auto crs = match[1].str();
if (crs.empty()) {
return JsonParseError("Invalid geometry type: {}", type_str);
}
return std::make_unique<GeometryType>(std::move(crs));
}
return std::make_unique<GeometryType>();
}
return JsonParseError("Invalid geometry type: {}", type_str);
} else if (lower_type_str.starts_with("geography")) {
static const std::regex geography_regex(
R"(geography\s*(?:\(\s*([^,]*?)\s*(?:,\s*(\w*)\s*)?\))?)",
std::regex_constants::icase);
std::smatch match;
if (std::regex_match(type_str, match, geography_regex)) {
auto crs = match[1].str();
if (match[1].matched && crs.empty()) {
return JsonParseError("Invalid geography type: {}", type_str);
}
if (match[2].matched) {
ICEBERG_ASSIGN_OR_RAISE(auto algorithm,
EdgeAlgorithmFromString(match[2].str()));
return std::make_unique<GeographyType>(std::move(crs), algorithm);
}
if (match[1].matched) {
return std::make_unique<GeographyType>(std::move(crs));
}
return std::make_unique<GeographyType>();
}
return JsonParseError("Invalid geography type: {}", type_str);
} else {
return JsonParseError("Unknown primitive type: {}", type_str);
return JsonParseError("Cannot parse type string: {}", type_str);
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/iceberg/metrics_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ Result<std::unordered_set<int32_t>> MetricsConfig::LimitFieldIds(const Schema& s
Status Visit(const Type& type) {
if (type.is_nested()) {
return VisitNested(internal::checked_cast<const NestedType&>(type));
} else if (type.type_id() == TypeId::kVariant) {
return {};
} else {
return VisitPrimitive(internal::checked_cast<const PrimitiveType&>(type));
}
Expand All @@ -207,8 +209,7 @@ Result<std::unordered_set<int32_t>> MetricsConfig::LimitFieldIds(const Schema& s
if (!ShouldContinue()) {
break;
}
// TODO(zhuo.wang): variant type should also be handled here
if (field.type()->is_primitive()) {
if (!field.type()->is_nested()) {
ids_.insert(field.field_id());
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/parquet/parquet_metrics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,10 @@ class CollectMetricsVisitor {

Status VisitMap(const MapType& /*type*/, const std::string& /*prefix*/) { return {}; }

Status VisitVariant(const VariantType& /*type*/, const std::string& /*prefix*/) {
return {};
}

Status VisitPrimitive(const PrimitiveType& /*type*/, const std::string& /*prefix*/) {
return {};
}
Expand Down
5 changes: 5 additions & 0 deletions src/iceberg/parquet/parquet_schema_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ Status ValidateParquetSchemaEvolution(
break;
case TypeId::kUnknown:
return {};
case TypeId::kVariant:
case TypeId::kGeometry:
case TypeId::kGeography:
return NotSupported("Reading Iceberg type {} from Parquet is not supported",
expected_type);
case TypeId::kStruct:
if (arrow_type->id() == ::arrow::Type::STRUCT) {
return {};
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/parquet/parquet_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ class FieldMetricsCollector {

Status VisitMap(const MapType& /*type*/, const ::arrow::Array& /*array*/) { return {}; }

Status VisitVariant(const VariantType& /*type*/, const ::arrow::Array& /*array*/) {
return {};
}

Status VisitPrimitive(const PrimitiveType& type, const ::arrow::Array& array) {
switch (type.type_id()) {
case TypeId::kFloat:
Expand Down
Loading
Loading