-
Notifications
You must be signed in to change notification settings - Fork 1.5k
[GH-3609]: Add new sort order for int96 timestamps #3610
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,11 +36,18 @@ public enum ColumnOrderName { | |
| /** | ||
| * Type defined order meaning that the comparison order of the elements are based on its type. | ||
| */ | ||
| TYPE_DEFINED_ORDER | ||
| TYPE_DEFINED_ORDER, | ||
| /** | ||
| * Chronological order for INT96 timestamps: values are compared by the Julian day (the last 4 | ||
| * bytes, as a little-endian signed int32), then by the nanoseconds within the day (the first 8 | ||
| * bytes, as a little-endian signed int64). Only supported for the INT96 physical type. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think that this Javadoc is the right place for documenting the format and how to compare. This is part of the spec so the spec needs to be clear and this needs to state what the order means. |
||
| */ | ||
| INT96_TIMESTAMP_ORDER | ||
| } | ||
|
|
||
| private static final ColumnOrder UNDEFINED_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.UNDEFINED); | ||
| private static final ColumnOrder TYPE_DEFINED_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.TYPE_DEFINED_ORDER); | ||
| private static final ColumnOrder INT96_TIMESTAMP_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.INT96_TIMESTAMP_ORDER); | ||
|
|
||
| /** | ||
| * @return a {@link ColumnOrder} instance representing an undefined order | ||
|
|
@@ -58,6 +65,14 @@ public static ColumnOrder typeDefined() { | |
| return TYPE_DEFINED_COLUMN_ORDER; | ||
| } | ||
|
|
||
| /** | ||
| * @return a {@link ColumnOrder} instance representing the chronological order of INT96 timestamps | ||
| * @see ColumnOrderName#INT96_TIMESTAMP_ORDER | ||
| */ | ||
| public static ColumnOrder int96TimestampOrder() { | ||
| return INT96_TIMESTAMP_COLUMN_ORDER; | ||
| } | ||
|
|
||
| private final ColumnOrderName columnOrderName; | ||
|
|
||
| private ColumnOrder(ColumnOrderName columnOrderName) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |
|
|
||
| import java.io.Serializable; | ||
| import java.nio.ByteBuffer; | ||
| import java.nio.ByteOrder; | ||
| import java.util.Comparator; | ||
| import org.apache.parquet.io.api.Binary; | ||
|
|
||
|
|
@@ -293,4 +294,36 @@ public String toString() { | |
| return "BINARY_AS_FLOAT16_COMPARATOR"; | ||
| } | ||
| }; | ||
|
|
||
| /* | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you intend for this to be Javadoc? |
||
| * Comparator for two timestamps encoded as INT96 (12-byte little-endian) binary. | ||
| * Layout: first 8 bytes = nanoseconds within the day, last 4 bytes = Julian day. | ||
| * | ||
| * Two-level comparison, matching the INT96 timestamp sort order: | ||
| * 1. Compare the last 4 bytes (Julian day) as a signed little-endian int32. | ||
| * 2. If equal, compare the first 8 bytes (nanos) as a signed little-endian int64. | ||
| */ | ||
| static final PrimitiveComparator<Binary> BINARY_AS_INT96_TIMESTAMP_COMPARATOR = new BinaryComparator() { | ||
| @Override | ||
| int compareBinary(Binary b1, Binary b2) { | ||
| if (b1.length() != 12 || b2.length() != 12) { | ||
| throw new IllegalArgumentException( | ||
| "INT96 binary length must be 12, got " + b1.length() + " and " + b2.length()); | ||
| } | ||
|
|
||
| ByteBuffer bb1 = b1.toByteBuffer().slice(); | ||
| ByteBuffer bb2 = b2.toByteBuffer().slice(); | ||
| bb1.order(ByteOrder.LITTLE_ENDIAN); | ||
| bb2.order(ByteOrder.LITTLE_ENDIAN); | ||
|
|
||
| int result = Integer.compare(bb1.getInt(8), bb2.getInt(8)); | ||
| if (result != 0) return result; | ||
| return Long.compare(bb1.getLong(0), bb2.getLong(0)); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "BINARY_AS_INT96_TIMESTAMP_COMPARATOR"; | ||
| } | ||
| }; | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -622,9 +622,15 @@ public PrimitiveType( | |
| private ColumnOrder requireValidColumnOrder(ColumnOrder columnOrder) { | ||
| if (primitive == PrimitiveTypeName.INT96) { | ||
| Preconditions.checkArgument( | ||
| columnOrder.getColumnOrderName() == ColumnOrderName.UNDEFINED, | ||
| columnOrder.getColumnOrderName() == ColumnOrderName.UNDEFINED | ||
| || columnOrder.getColumnOrderName() == ColumnOrderName.INT96_TIMESTAMP_ORDER, | ||
| "The column order %s is not supported by INT96", | ||
| columnOrder); | ||
| } else { | ||
| Preconditions.checkArgument( | ||
| columnOrder.getColumnOrderName() != ColumnOrderName.INT96_TIMESTAMP_ORDER, | ||
| "The column order %s is only supported by INT96", | ||
| columnOrder); | ||
| } | ||
| if (getLogicalTypeAnnotation() != null) { | ||
| Preconditions.checkArgument( | ||
|
|
@@ -655,6 +661,15 @@ public PrimitiveType withLogicalTypeAnnotation(LogicalTypeAnnotation logicalType | |
| return new PrimitiveType(getRepetition(), primitive, length, getName(), logicalType, getId()); | ||
| } | ||
|
|
||
| /** | ||
| * @param columnOrder the column order | ||
| * @return a new PrimitiveType with the same fields and the given column order | ||
| */ | ||
| public PrimitiveType withColumnOrder(ColumnOrder columnOrder) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we always produce INT96 stats using the timestamp order, would we need this addition to the API? I think we could always produce a |
||
| return new PrimitiveType( | ||
| getRepetition(), primitive, length, getName(), getLogicalTypeAnnotation(), getId(), columnOrder); | ||
| } | ||
|
|
||
| /** | ||
| * @return the primitive type | ||
| */ | ||
|
|
@@ -869,6 +884,9 @@ protected Type union(Type toMerge, boolean strict) { | |
| */ | ||
| @SuppressWarnings("unchecked") | ||
| public <T> PrimitiveComparator<T> comparator() { | ||
| if (columnOrder.getColumnOrderName() == ColumnOrderName.INT96_TIMESTAMP_ORDER) { | ||
| return (PrimitiveComparator<T>) PrimitiveComparator.BINARY_AS_INT96_TIMESTAMP_COMPARATOR; | ||
| } | ||
| return (PrimitiveComparator<T>) getPrimitiveTypeName().comparator(getLogicalTypeAnnotation()); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -86,6 +86,7 @@ | |
| import org.apache.parquet.format.GeographyType; | ||
| import org.apache.parquet.format.GeometryType; | ||
| import org.apache.parquet.format.GeospatialStatistics; | ||
| import org.apache.parquet.format.Int96TimestampOrder; | ||
| import org.apache.parquet.format.IntType; | ||
| import org.apache.parquet.format.KeyValue; | ||
| import org.apache.parquet.format.LogicalType; | ||
|
|
@@ -111,6 +112,7 @@ | |
| import org.apache.parquet.format.Uncompressed; | ||
| import org.apache.parquet.format.VariantType; | ||
| import org.apache.parquet.format.XxHash; | ||
| import org.apache.parquet.hadoop.ParquetInputFormat; | ||
| import org.apache.parquet.hadoop.metadata.BlockMetaData; | ||
| import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; | ||
| import org.apache.parquet.hadoop.metadata.ColumnPath; | ||
|
|
@@ -143,6 +145,7 @@ | |
| public class ParquetMetadataConverter { | ||
|
|
||
| private static final TypeDefinedOrder TYPE_DEFINED_ORDER = new TypeDefinedOrder(); | ||
| private static final Int96TimestampOrder INT96_TIMESTAMP_ORDER = new Int96TimestampOrder(); | ||
| public static final MetadataFilter NO_FILTER = new NoFilter(); | ||
| public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter(); | ||
| public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k | ||
|
|
@@ -278,11 +281,16 @@ public FileMetaData toParquetMetadata( | |
|
|
||
| private List<ColumnOrder> getColumnOrders(MessageType schema) { | ||
| List<ColumnOrder> columnOrders = new ArrayList<>(); | ||
| // Currently, only TypeDefinedOrder is supported, so we create a column order for each columns with | ||
| // TypeDefinedOrder even if some types (e.g. INT96) have undefined column orders. | ||
| for (int i = 0, n = schema.getPaths().size(); i < n; ++i) { | ||
| // Columns with the INT96_TIMESTAMP_ORDER column order are tagged as such; all other columns are | ||
| // tagged with TypeDefinedOrder even if some types have undefined column orders. | ||
| for (String[] path : schema.getPaths()) { | ||
| ColumnOrder columnOrder = new ColumnOrder(); | ||
| columnOrder.setTYPE_ORDER(TYPE_DEFINED_ORDER); | ||
| if (schema.getType(path).asPrimitiveType().columnOrder().getColumnOrderName() == | ||
| ColumnOrderName.INT96_TIMESTAMP_ORDER) { | ||
| columnOrder.setINT96_TIMESTAMP_ORDER(INT96_TIMESTAMP_ORDER); | ||
| } else { | ||
| columnOrder.setTYPE_ORDER(TYPE_DEFINED_ORDER); | ||
| } | ||
| columnOrders.add(columnOrder); | ||
| } | ||
| return columnOrders; | ||
|
|
@@ -891,7 +899,9 @@ private static byte[] tuncateMax(BinaryTruncator truncator, int truncateLength, | |
| } | ||
|
|
||
| private static boolean isMinMaxStatsSupported(PrimitiveType type) { | ||
| return type.columnOrder().getColumnOrderName() == ColumnOrderName.TYPE_DEFINED_ORDER; | ||
| ColumnOrderName name = type.columnOrder().getColumnOrderName(); | ||
| return name == ColumnOrderName.TYPE_DEFINED_ORDER | ||
| || name == ColumnOrderName.INT96_TIMESTAMP_ORDER; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -2034,6 +2044,11 @@ private void buildChildren( | |
| || schemaElement.converted_type == ConvertedType.INTERVAL)) { | ||
| columnOrder = org.apache.parquet.schema.ColumnOrder.undefined(); | ||
| } | ||
| // INT96_TIMESTAMP_ORDER is only valid for INT96 columns; ignore it anywhere else | ||
| if (columnOrder.getColumnOrderName() == ColumnOrderName.INT96_TIMESTAMP_ORDER | ||
| && schemaElement.type != Type.INT96) { | ||
| columnOrder = org.apache.parquet.schema.ColumnOrder.undefined(); | ||
| } | ||
| primitiveBuilder.columnOrder(columnOrder); | ||
| } | ||
| childBuilder = primitiveBuilder; | ||
|
|
@@ -2086,14 +2101,22 @@ Repetition fromParquetRepetition(FieldRepetitionType repetition) { | |
| return Repetition.valueOf(repetition.name()); | ||
| } | ||
|
|
||
| private static org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(ColumnOrder columnOrder) { | ||
| private org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(ColumnOrder columnOrder) { | ||
| if (columnOrder.isSetTYPE_ORDER()) { | ||
| return org.apache.parquet.schema.ColumnOrder.typeDefined(); | ||
| } | ||
| if (columnOrder.isSetINT96_TIMESTAMP_ORDER() && readInt96TimestampStatisticsEnabled()) { | ||
| return org.apache.parquet.schema.ColumnOrder.int96TimestampOrder(); | ||
| } | ||
| // The column order is not yet supported by this API | ||
| return org.apache.parquet.schema.ColumnOrder.undefined(); | ||
| } | ||
|
|
||
| private boolean readInt96TimestampStatisticsEnabled() { | ||
| return options == null | ||
| || options.isEnabled(ParquetInputFormat.INT96_TIMESTAMP_STATISTICS_READING_ENABLED, true); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why would we have an option for reading INT96 stats? Wouldn't this always be true if we know the int96 stats are using the timestamp order? |
||
| } | ||
|
|
||
| @Deprecated | ||
| public void writeDataPageHeader( | ||
| int uncompressedSize, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -89,7 +89,7 @@ public InternalParquetRecordWriter( | |
| ParquetProperties props) { | ||
| this.parquetFileWriter = parquetFileWriter; | ||
| this.writeSupport = Objects.requireNonNull(writeSupport, "writeSupport cannot be null"); | ||
| this.schema = schema; | ||
| this.schema = ParquetFileWriter.applyInt96TimestampOrder(schema, props); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this is the right place to fixup order. It would be better to always use the new order for INT96 when constructing schemas. When converting from file schemas, we would need to detect timestamp vs unordered, but anything going through the write path should automatically use timestamp order because the write path should always produce it. |
||
| this.extraMetaData = extraMetaData; | ||
| this.rowGroupSizeThreshold = rowGroupSize; | ||
| this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why make this an option? Since we don't use int96 stats otherwise, I think it would be perfectly fine to keep it simple and just produce the new stats all the time.