From b48d08ebd12f126ca99f86a1e5d26b49fccf4df1 Mon Sep 17 00:00:00 2001 From: Divjot Arora Date: Thu, 11 Jun 2026 07:48:04 +0000 Subject: [PATCH] Add new sort order for int96 timestamps --- .../parquet/column/ParquetProperties.java | 21 ++ .../apache/parquet/schema/ColumnOrder.java | 17 +- .../parquet/schema/PrimitiveComparator.java | 33 ++ .../apache/parquet/schema/PrimitiveType.java | 20 +- .../schema/TestPrimitiveComparator.java | 59 +++ .../apache/parquet/ParquetReadOptions.java | 5 + .../converter/ParquetMetadataConverter.java | 35 +- .../hadoop/InternalParquetRecordWriter.java | 2 +- .../parquet/hadoop/ParquetFileWriter.java | 35 +- .../parquet/hadoop/ParquetInputFormat.java | 6 + .../parquet/hadoop/ParquetOutputFormat.java | 12 +- .../apache/parquet/hadoop/ParquetWriter.java | 12 + .../TestInt96TimestampStatistics.java | 340 ++++++++++++++++++ 13 files changed, 586 insertions(+), 11 deletions(-) create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatistics.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index f29214b458..47a9585d13 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -66,6 +66,7 @@ public class ParquetProperties { public static final int DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER = 5; public static final boolean DEFAULT_STATISTICS_ENABLED = true; public static final boolean DEFAULT_SIZE_STATISTICS_ENABLED = true; + public static final boolean DEFAULT_INT96_TIMESTAMP_STATISTICS_ENABLED = false; public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true; @@ -120,6 +121,7 @@ public static WriterVersion fromString(String name) { private final int statisticsTruncateLength; private final boolean statisticsEnabled; private final boolean sizeStatisticsEnabled; + private final boolean int96TimestampStatisticsEnabled; // The expected NDV (number of distinct values) for each columns private final ColumnProperty bloomFilterNDVs; @@ -154,6 +156,7 @@ private ParquetProperties(Builder builder) { this.statisticsTruncateLength = builder.statisticsTruncateLength; this.statisticsEnabled = builder.statisticsEnabled; this.sizeStatisticsEnabled = builder.sizeStatisticsEnabled; + this.int96TimestampStatisticsEnabled = builder.int96TimestampStatisticsEnabled; this.bloomFilterNDVs = builder.bloomFilterNDVs.build(); this.bloomFilterFPPs = builder.bloomFilterFPPs.build(); this.bloomFilterEnabled = builder.bloomFilterEnabled.build(); @@ -370,6 +373,10 @@ public boolean getSizeStatisticsEnabled(ColumnDescriptor column) { return sizeStatisticsEnabled; } + public boolean getInt96TimestampStatisticsEnabled() { + return int96TimestampStatisticsEnabled; + } + @Override public String toString() { return "Parquet page size to " + getPageSizeThreshold() + '\n' @@ -406,6 +413,7 @@ public static class Builder { private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH; private boolean statisticsEnabled = DEFAULT_STATISTICS_ENABLED; private boolean sizeStatisticsEnabled = DEFAULT_SIZE_STATISTICS_ENABLED; + private boolean int96TimestampStatisticsEnabled = DEFAULT_INT96_TIMESTAMP_STATISTICS_ENABLED; private final ColumnProperty.Builder bloomFilterNDVs; private final ColumnProperty.Builder bloomFilterFPPs; private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES; @@ -756,6 +764,19 @@ public Builder withSizeStatisticsEnabled(String columnPath, boolean enabled) { return this; } + /** + * Sets whether min/max statistics are collected and written for INT96 columns using the + * chronological INT96_TIMESTAMP_ORDER column order (disabled by default). When enabled, INT96 + * columns are tagged with INT96_TIMESTAMP_ORDER in the file footer. + * + * @param enabled whether to collect and write INT96 timestamp statistics + * @return this builder for method chaining + */ + public Builder withInt96TimestampStatisticsEnabled(boolean enabled) { + this.int96TimestampStatisticsEnabled = enabled; + return this; + } + public ParquetProperties build() { ParquetProperties properties = new ParquetProperties(this); // we pass a constructed but uninitialized factory to ParquetProperties above as currently diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java b/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java index 94a1275569..5ce5005c6a 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java @@ -36,11 +36,18 @@ public enum ColumnOrderName { /** * Type defined order meaning that the comparison order of the elements are based on its type. */ - TYPE_DEFINED_ORDER + TYPE_DEFINED_ORDER, + /** + * Chronological order for INT96 timestamps: values are compared by the Julian day (the last 4 + * bytes, as a little-endian signed int32), then by the nanoseconds within the day (the first 8 + * bytes, as a little-endian signed int64). Only supported for the INT96 physical type. + */ + INT96_TIMESTAMP_ORDER } private static final ColumnOrder UNDEFINED_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.UNDEFINED); private static final ColumnOrder TYPE_DEFINED_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.TYPE_DEFINED_ORDER); + private static final ColumnOrder INT96_TIMESTAMP_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.INT96_TIMESTAMP_ORDER); /** * @return a {@link ColumnOrder} instance representing an undefined order @@ -58,6 +65,14 @@ public static ColumnOrder typeDefined() { return TYPE_DEFINED_COLUMN_ORDER; } + /** + * @return a {@link ColumnOrder} instance representing the chronological order of INT96 timestamps + * @see ColumnOrderName#INT96_TIMESTAMP_ORDER + */ + public static ColumnOrder int96TimestampOrder() { + return INT96_TIMESTAMP_COLUMN_ORDER; + } + private final ColumnOrderName columnOrderName; private ColumnOrder(ColumnOrderName columnOrderName) { diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java index 50c4acd4c9..74176b4ff5 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Comparator; import org.apache.parquet.io.api.Binary; @@ -293,4 +294,36 @@ public String toString() { return "BINARY_AS_FLOAT16_COMPARATOR"; } }; + + /* + * Comparator for two timestamps encoded as INT96 (12-byte little-endian) binary. + * Layout: first 8 bytes = nanoseconds within the day, last 4 bytes = Julian day. + * + * Two-level comparison, matching the INT96 timestamp sort order: + * 1. Compare the last 4 bytes (Julian day) as a signed little-endian int32. + * 2. If equal, compare the first 8 bytes (nanos) as a signed little-endian int64. + */ + static final PrimitiveComparator BINARY_AS_INT96_TIMESTAMP_COMPARATOR = new BinaryComparator() { + @Override + int compareBinary(Binary b1, Binary b2) { + if (b1.length() != 12 || b2.length() != 12) { + throw new IllegalArgumentException( + "INT96 binary length must be 12, got " + b1.length() + " and " + b2.length()); + } + + ByteBuffer bb1 = b1.toByteBuffer().slice(); + ByteBuffer bb2 = b2.toByteBuffer().slice(); + bb1.order(ByteOrder.LITTLE_ENDIAN); + bb2.order(ByteOrder.LITTLE_ENDIAN); + + int result = Integer.compare(bb1.getInt(8), bb2.getInt(8)); + if (result != 0) return result; + return Long.compare(bb1.getLong(0), bb2.getLong(0)); + } + + @Override + public String toString() { + return "BINARY_AS_INT96_TIMESTAMP_COMPARATOR"; + } + }; } diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java index 944cfb58eb..ca5d8d6eb6 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java @@ -622,9 +622,15 @@ public PrimitiveType( private ColumnOrder requireValidColumnOrder(ColumnOrder columnOrder) { if (primitive == PrimitiveTypeName.INT96) { Preconditions.checkArgument( - columnOrder.getColumnOrderName() == ColumnOrderName.UNDEFINED, + columnOrder.getColumnOrderName() == ColumnOrderName.UNDEFINED + || columnOrder.getColumnOrderName() == ColumnOrderName.INT96_TIMESTAMP_ORDER, "The column order %s is not supported by INT96", columnOrder); + } else { + Preconditions.checkArgument( + columnOrder.getColumnOrderName() != ColumnOrderName.INT96_TIMESTAMP_ORDER, + "The column order %s is only supported by INT96", + columnOrder); } if (getLogicalTypeAnnotation() != null) { Preconditions.checkArgument( @@ -655,6 +661,15 @@ public PrimitiveType withLogicalTypeAnnotation(LogicalTypeAnnotation logicalType return new PrimitiveType(getRepetition(), primitive, length, getName(), logicalType, getId()); } + /** + * @param columnOrder the column order + * @return a new PrimitiveType with the same fields and the given column order + */ + public PrimitiveType withColumnOrder(ColumnOrder columnOrder) { + return new PrimitiveType( + getRepetition(), primitive, length, getName(), getLogicalTypeAnnotation(), getId(), columnOrder); + } + /** * @return the primitive type */ @@ -869,6 +884,9 @@ protected Type union(Type toMerge, boolean strict) { */ @SuppressWarnings("unchecked") public PrimitiveComparator comparator() { + if (columnOrder.getColumnOrderName() == ColumnOrderName.INT96_TIMESTAMP_ORDER) { + return (PrimitiveComparator) PrimitiveComparator.BINARY_AS_INT96_TIMESTAMP_COMPARATOR; + } return (PrimitiveComparator) getPrimitiveTypeName().comparator(getLogicalTypeAnnotation()); } diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java index 8fb53aca0f..d099cec01d 100644 --- a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java +++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java @@ -19,6 +19,7 @@ package org.apache.parquet.schema; import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_FLOAT16_COMPARATOR; +import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_INT96_TIMESTAMP_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.BOOLEAN_COMPARATOR; import static org.apache.parquet.schema.PrimitiveComparator.DOUBLE_COMPARATOR; @@ -33,8 +34,12 @@ import java.math.BigInteger; import java.nio.ByteBuffer; +import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.function.Function; +import org.apache.parquet.example.data.simple.NanoTime; import org.apache.parquet.io.api.Binary; import org.junit.Test; @@ -297,6 +302,60 @@ public void testBinaryAsSignedIntegerComparatorWithEquals() { } } + private static Binary int96(int julianDay, long nanosOfDay) { + return new NanoTime(julianDay, nanosOfDay).toBinary(); + } + + private static Binary timestampToInt96(String timestamp) { + LocalDateTime dt = LocalDateTime.parse(timestamp); + int julianDay = (int) (dt.toLocalDate().toEpochDay() + 2440588); + return new NanoTime(julianDay, dt.toLocalTime().toNanoOfDay()).toBinary(); + } + + @Test + public void testInt96TimestampComparator() { + Binary[] valuesInAscendingOrder = { + int96(Integer.MIN_VALUE, 0), // most negative julian day + int96(-1, 86_399_999_999_999L), // negative julian days sort before day 0 + int96(0, 0), // start of the julian period + int96(0, 86_399_999_999_999L), // same day, later time of day + timestampToInt96("1968-05-23T00:00:00.000000123"), // pre-epoch but positive julian day + timestampToInt96("2020-01-01T12:00:00"), + timestampToInt96("2020-02-01T11:00:00"), // later day even though earlier time of day + timestampToInt96("2020-02-01T11:00:00.000000001"), // nanos tie-break + int96(Integer.MAX_VALUE, 86_399_999_999_999L) + }; + + // The same value in different Binary representations must compare identically; the offset + // variant guards against absolute reads not being relative to the value's start + List> representations = List.of( + b -> b, + b -> Binary.fromReusedByteArray(b.getBytes()), + b -> Binary.fromConstantByteArray(b.getBytes()), + b -> { + byte[] bytes = b.getBytes(); + byte[] padded = new byte[bytes.length + 20]; + Arrays.fill(padded, (byte) 0xAA); + System.arraycopy(bytes, 0, padded, 10, bytes.length); + return Binary.fromReusedByteArray(padded, 10, bytes.length); + }); + + for (int i = 0; i < valuesInAscendingOrder.length; ++i) { + for (int j = 0; j < valuesInAscendingOrder.length; ++j) { + for (Function fi : representations) { + for (Function fj : representations) { + Binary bi = fi.apply(valuesInAscendingOrder[i]); + Binary bj = fj.apply(valuesInAscendingOrder[j]); + assertEquals( + "comparing value " + i + " to value " + j, + Integer.signum(Integer.compare(i, j)), + Integer.signum(BINARY_AS_INT96_TIMESTAMP_COMPARATOR.compare(bi, bj))); + } + } + } + } + } + @Test public void testFloat16Comparator() { Binary[] valuesInAscendingOrder = { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java index 895d0670fa..cdeadddc9c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java @@ -25,6 +25,7 @@ import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_DEFAULT; import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.INT96_TIMESTAMP_STATISTICS_READING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; @@ -291,6 +292,10 @@ public Builder(ParquetConfiguration conf) { if (badRecordThresh != null) { set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh); } + String readInt96TimestampStats = conf.get(INT96_TIMESTAMP_STATISTICS_READING_ENABLED); + if (readInt96TimestampStats != null) { + set(INT96_TIMESTAMP_STATISTICS_READING_ENABLED, readInt96TimestampStats); + } } public Builder useSignedStringMinMax(boolean useSignedStringMinMax) { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 3597898c30..c976dfc7f5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -86,6 +86,7 @@ import org.apache.parquet.format.GeographyType; import org.apache.parquet.format.GeometryType; import org.apache.parquet.format.GeospatialStatistics; +import org.apache.parquet.format.Int96TimestampOrder; import org.apache.parquet.format.IntType; import org.apache.parquet.format.KeyValue; import org.apache.parquet.format.LogicalType; @@ -111,6 +112,7 @@ import org.apache.parquet.format.Uncompressed; import org.apache.parquet.format.VariantType; import org.apache.parquet.format.XxHash; +import org.apache.parquet.hadoop.ParquetInputFormat; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ColumnPath; @@ -143,6 +145,7 @@ public class ParquetMetadataConverter { private static final TypeDefinedOrder TYPE_DEFINED_ORDER = new TypeDefinedOrder(); + private static final Int96TimestampOrder INT96_TIMESTAMP_ORDER = new Int96TimestampOrder(); public static final MetadataFilter NO_FILTER = new NoFilter(); public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter(); public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k @@ -278,11 +281,16 @@ public FileMetaData toParquetMetadata( private List getColumnOrders(MessageType schema) { List columnOrders = new ArrayList<>(); - // Currently, only TypeDefinedOrder is supported, so we create a column order for each columns with - // TypeDefinedOrder even if some types (e.g. INT96) have undefined column orders. - for (int i = 0, n = schema.getPaths().size(); i < n; ++i) { + // Columns with the INT96_TIMESTAMP_ORDER column order are tagged as such; all other columns are + // tagged with TypeDefinedOrder even if some types have undefined column orders. + for (String[] path : schema.getPaths()) { ColumnOrder columnOrder = new ColumnOrder(); - columnOrder.setTYPE_ORDER(TYPE_DEFINED_ORDER); + if (schema.getType(path).asPrimitiveType().columnOrder().getColumnOrderName() == + ColumnOrderName.INT96_TIMESTAMP_ORDER) { + columnOrder.setINT96_TIMESTAMP_ORDER(INT96_TIMESTAMP_ORDER); + } else { + columnOrder.setTYPE_ORDER(TYPE_DEFINED_ORDER); + } columnOrders.add(columnOrder); } return columnOrders; @@ -891,7 +899,9 @@ private static byte[] tuncateMax(BinaryTruncator truncator, int truncateLength, } private static boolean isMinMaxStatsSupported(PrimitiveType type) { - return type.columnOrder().getColumnOrderName() == ColumnOrderName.TYPE_DEFINED_ORDER; + ColumnOrderName name = type.columnOrder().getColumnOrderName(); + return name == ColumnOrderName.TYPE_DEFINED_ORDER + || name == ColumnOrderName.INT96_TIMESTAMP_ORDER; } /** @@ -2034,6 +2044,11 @@ private void buildChildren( || schemaElement.converted_type == ConvertedType.INTERVAL)) { columnOrder = org.apache.parquet.schema.ColumnOrder.undefined(); } + // INT96_TIMESTAMP_ORDER is only valid for INT96 columns; ignore it anywhere else + if (columnOrder.getColumnOrderName() == ColumnOrderName.INT96_TIMESTAMP_ORDER + && schemaElement.type != Type.INT96) { + columnOrder = org.apache.parquet.schema.ColumnOrder.undefined(); + } primitiveBuilder.columnOrder(columnOrder); } childBuilder = primitiveBuilder; @@ -2086,14 +2101,22 @@ Repetition fromParquetRepetition(FieldRepetitionType repetition) { return Repetition.valueOf(repetition.name()); } - private static org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(ColumnOrder columnOrder) { + private org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(ColumnOrder columnOrder) { if (columnOrder.isSetTYPE_ORDER()) { return org.apache.parquet.schema.ColumnOrder.typeDefined(); } + if (columnOrder.isSetINT96_TIMESTAMP_ORDER() && readInt96TimestampStatisticsEnabled()) { + return org.apache.parquet.schema.ColumnOrder.int96TimestampOrder(); + } // The column order is not yet supported by this API return org.apache.parquet.schema.ColumnOrder.undefined(); } + private boolean readInt96TimestampStatisticsEnabled() { + return options == null + || options.isEnabled(ParquetInputFormat.INT96_TIMESTAMP_STATISTICS_READING_ENABLED, true); + } + @Deprecated public void writeDataPageHeader( int uncompressedSize, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index dd51d1ef09..facd888f76 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -89,7 +89,7 @@ public InternalParquetRecordWriter( ParquetProperties props) { this.parquetFileWriter = parquetFileWriter; this.writeSupport = Objects.requireNonNull(writeSupport, "writeSupport cannot be null"); - this.schema = schema; + this.schema = ParquetFileWriter.applyInt96TimestampOrder(schema, props); this.extraMetaData = extraMetaData; this.rowGroupSizeThreshold = rowGroupSize; this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit(); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 82f4577b83..dd158ba011 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -92,8 +92,12 @@ import org.apache.parquet.io.ParquetEncodingException; import org.apache.parquet.io.PositionOutputStream; import org.apache.parquet.io.SeekableInputStream; +import org.apache.parquet.schema.ColumnOrder; +import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; import org.apache.parquet.schema.TypeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -435,7 +439,7 @@ public ParquetFileWriter( throws IOException { this( file, - schema, + applyInt96TimestampOrder(schema, props), mode, rowGroupSize, maxPaddingSize, @@ -447,6 +451,35 @@ public ParquetFileWriter( props.getAllocator()); } + /** + * Returns the schema with INT96 columns tagged with the INT96_TIMESTAMP_ORDER column order if + * INT96 timestamp statistics are enabled, so that statistics are accumulated with the + * chronological comparator and the proper column order is written to the footer. + */ + static MessageType applyInt96TimestampOrder(MessageType schema, ParquetProperties props) { + if (!props.getInt96TimestampStatisticsEnabled()) { + return schema; + } + return new MessageType(schema.getName(), applyInt96TimestampOrder(schema.getFields())); + } + + private static List applyInt96TimestampOrder(List fields) { + List result = new ArrayList<>(fields.size()); + for (Type field : fields) { + if (field.isPrimitive()) { + PrimitiveType primitive = field.asPrimitiveType(); + if (primitive.getPrimitiveTypeName() == PrimitiveTypeName.INT96) { + field = primitive.withColumnOrder(ColumnOrder.int96TimestampOrder()); + } + result.add(field); + } else { + GroupType group = field.asGroupType(); + result.add(group.withNewFields(applyInt96TimestampOrder(group.getFields()))); + } + } + return result; + } + @Deprecated public ParquetFileWriter( OutputFile file, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java index 8e05d49bd3..95f9796a4e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java @@ -122,6 +122,12 @@ public class ParquetInputFormat extends FileInputFormat { */ public static final String STATS_FILTERING_ENABLED = "parquet.filter.stats.enabled"; + /** + * key to configure whether INT96 min/max statistics written with the INT96_TIMESTAMP_ORDER + * column order are read (enabled by default) + */ + public static final String INT96_TIMESTAMP_STATISTICS_READING_ENABLED = "parquet.int96.timestamp.statistics.read.enabled"; + /** * key to configure whether row group dictionary filtering is enabled */ diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index 868ae634c1..ba0a6a5924 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -163,6 +163,7 @@ public static enum JobSummaryLevel { public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled"; public static final String STATISTICS_ENABLED = "parquet.column.statistics.enabled"; public static final String SIZE_STATISTICS_ENABLED = "parquet.size.statistics.enabled"; + public static final String INT96_TIMESTAMP_STATISTICS_ENABLED = "parquet.int96.timestamp.statistics.enabled"; public static JobSummaryLevel getJobSummaryLevel(Configuration conf) { String level = conf.get(JOB_SUMMARY_LEVEL); @@ -432,6 +433,14 @@ public static boolean getStatisticsEnabled(Configuration conf, String columnPath return conf.getBoolean(STATISTICS_ENABLED, ParquetProperties.DEFAULT_STATISTICS_ENABLED); } + public static void setInt96TimestampStatisticsEnabled(JobContext jobContext, boolean enabled) { + getConfiguration(jobContext).setBoolean(INT96_TIMESTAMP_STATISTICS_ENABLED, enabled); + } + + public static boolean getInt96TimestampStatisticsEnabled(Configuration conf) { + return conf.getBoolean(INT96_TIMESTAMP_STATISTICS_ENABLED, ParquetProperties.DEFAULT_INT96_TIMESTAMP_STATISTICS_ENABLED); + } + public static void setSizeStatisticsEnabled(Configuration conf, boolean enabled) { conf.setBoolean(SIZE_STATISTICS_ENABLED, enabled); } @@ -526,7 +535,8 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp .withRowGroupRowCountLimit(getBlockRowCountLimit(conf)) .withPageRowCountLimit(getPageRowCountLimit(conf)) .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf)) - .withStatisticsEnabled(getStatisticsEnabled(conf)); + .withStatisticsEnabled(getStatisticsEnabled(conf)) + .withInt96TimestampStatisticsEnabled(getInt96TimestampStatisticsEnabled(conf)); new ColumnConfigParser() .withColumnConfig( ENABLE_DICTIONARY, key -> conf.getBoolean(key, false), propsBuilder::withDictionaryEncoding) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 8eb5f7f17b..afa06065c4 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -952,6 +952,18 @@ public SELF withStatisticsEnabled(boolean enabled) { return self(); } + /** + * Sets whether min/max statistics are collected and written for INT96 columns using the + * chronological INT96_TIMESTAMP_ORDER column order (disabled by default). + * + * @param enabled whether to collect and write INT96 timestamp statistics + * @return this builder for method chaining + */ + public SELF withInt96TimestampStatisticsEnabled(boolean enabled) { + encodingPropsBuilder.withInt96TimestampStatisticsEnabled(enabled); + return self(); + } + /** * Sets the size statistics enabled/disabled for the specified column. All column size statistics are enabled by default. * diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatistics.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatistics.java new file mode 100644 index 0000000000..8345304e80 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatistics.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.statistics; + +import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.file.Files; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.format.ColumnChunk; +import org.apache.parquet.format.FileMetaData; +import org.apache.parquet.format.RowGroup; +import org.apache.parquet.format.Statistics; +import org.apache.parquet.format.Type; +import org.apache.parquet.format.Util; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.ColumnOrder; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Tests for INT96 timestamp statistics support (INT96_TIMESTAMP_ORDER). + */ +public class TestInt96TimestampStatistics { + + private static final MessageType SCHEMA = + parseMessageType("message test { required int96 ts; required int64 id; } "); + + // Chronologically: EARLY < SAME_DAY_EARLY < LATE_IN_DAY < NEXT_DAY. + // Byte-wise lexicographic comparison would order these incorrectly (nanos bytes come first), + // so these values detect a reader/writer using the wrong order. + private static final Binary EARLY = int96(2440000, 123L); // 1968-05-23 00:00:00.000000123 + private static final Binary SAME_DAY_EARLY = int96(2440588, 1_000L); // 1970-01-01 00:00:00.000001 + private static final Binary LATE_IN_DAY = int96(2440588, 86_399_999_999_999L); // 1970-01-01 23:59:59.999... + private static final Binary NEXT_DAY = int96(2440589, 0L); // 1970-01-02 00:00:00 + + private static final List VALUES = List.of(LATE_IN_DAY, NEXT_DAY, EARLY, SAME_DAY_EARLY); + private static final Binary EXPECTED_MIN = EARLY; + private static final Binary EXPECTED_MAX = NEXT_DAY; + + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + + private static Binary int96(int julianDay, long nanosOfDay) { + return Binary.fromConstantByteArray(ByteBuffer.allocate(12) + .order(ByteOrder.LITTLE_ENDIAN) + .putLong(nanosOfDay) + .putInt(julianDay) + .array()); + } + + private File writeFile(boolean int96StatsEnabled) throws IOException { + File file = new File(tmp.getRoot(), "int96_" + int96StatsEnabled + ".parquet"); + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(SCHEMA, conf); + SimpleGroupFactory factory = new SimpleGroupFactory(SCHEMA); + ParquetWriter writer = ExampleParquetWriter.builder(new Path(file.getAbsolutePath())) + .withConf(conf) + .withInt96TimestampStatisticsEnabled(int96StatsEnabled) + .build(); + try { + for (int i = 0; i < VALUES.size(); i++) { + writer.write(factory.newGroup().append("ts", VALUES.get(i)).append("id", (long) i)); + } + } finally { + writer.close(); + } + return file; + } + + private static ParquetMetadata readFooter(File file, Configuration conf) throws IOException { + Path path = new Path(file.getAbsolutePath()); + try (ParquetFileReader reader = ParquetFileReader.open( + HadoopInputFile.fromPath(path, conf), + HadoopReadOptions.builder(conf, path).build())) { + return reader.getFooter(); + } + } + + private static FileMetaData readRawFooter(File file) throws IOException { + byte[] bytes = Files.readAllBytes(file.toPath()); + int footerLen = ByteBuffer.wrap(bytes, bytes.length - 8, 4) + .order(ByteOrder.LITTLE_ENDIAN) + .getInt(); + int footerStart = bytes.length - 8 - footerLen; + return Util.readFileMetaData(new ByteArrayInputStream(bytes, footerStart, footerLen)); + } + + /** Rewrites the footer of src into a new file, keeping all data pages byte-identical. */ + private File rewriteFooter(File src, FileMetaData footer, String name) + throws IOException { + byte[] bytes = Files.readAllBytes(src.toPath()); + int footerLen = ByteBuffer.wrap(bytes, bytes.length - 8, 4) + .order(ByteOrder.LITTLE_ENDIAN) + .getInt(); + int footerStart = bytes.length - 8 - footerLen; + File dst = new File(tmp.getRoot(), name); + try (FileOutputStream out = new FileOutputStream(dst)) { + out.write(bytes, 0, footerStart); + ByteArrayOutputStream serialized = new ByteArrayOutputStream(); + Util.writeFileMetaData(footer, serialized); + out.write(serialized.toByteArray()); + out.write(ByteBuffer.allocate(4) + .order(ByteOrder.LITTLE_ENDIAN) + .putInt(serialized.size()) + .array()); + out.write(ParquetFileWriter.MAGIC); + } + return dst; + } + + /** + * Injects min/max statistics for the INT96 column into the raw footer while keeping TYPE_ORDER + * as the column order. This simulates a legacy writer that emitted INT96 stats before + * INT96_TIMESTAMP_ORDER existed. + */ + private static void injectInt96Stats(FileMetaData footer) { + // No need to touch column_orders: the writer already stamps TYPE_ORDER for every column when + // INT96 timestamp statistics are disabled + for (RowGroup rowGroup : footer.getRow_groups()) { + rowGroup.getColumns().stream() + .map(c -> c.getMeta_data()) + .filter(md -> md.getType() == Type.INT96) + .forEach(md -> { + Statistics stats = new Statistics(); + stats.setMin_value(EXPECTED_MIN.getBytes()); + stats.setMax_value(EXPECTED_MAX.getBytes()); + stats.setNull_count(0); + md.setStatistics(stats); + }); + } + } + + private static ColumnChunkMetaData getColumn(ParquetMetadata footer, String name) { + BlockMetaData block = footer.getBlocks().get(0); + return block.getColumns().stream() + .filter(c -> c.getPath().toDotString().equals(name)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("no column " + name)); + } + + private static void assertStatsIgnored(ColumnChunkMetaData column) { + assertTrue(column.getStatistics() == null || !column.getStatistics().hasNonNullValue()); + } + + private static void assertStatsUsable(ColumnChunkMetaData column) { + assertTrue(column.getStatistics() != null && column.getStatistics().hasNonNullValue()); + assertArrayEquals(EXPECTED_MIN.getBytes(), column.getStatistics().getMinBytes()); + assertArrayEquals(EXPECTED_MAX.getBytes(), column.getStatistics().getMaxBytes()); + } + + @Test + public void testWriterOmitsInt96StatsByDefault() throws IOException { + File file = writeFile(false); + FileMetaData rawFooter = readRawFooter(file); + for (RowGroup rowGroup : rawFooter.getRow_groups()) { + for (ColumnChunk chunk : rowGroup.getColumns()) { + if (chunk.getMeta_data().getType() == Type.INT96) { + Statistics stats = chunk.getMeta_data().getStatistics(); + assertTrue(stats == null || (!stats.isSetMin_value() && !stats.isSetMax_value())); + } + } + } + + // Without the new order, the column order written for INT96 stays TYPE_ORDER + assertTrue(rawFooter.getColumn_orders().get(0).isSetTYPE_ORDER()); + assertStatsIgnored(getColumn(readFooter(file, new Configuration()), "ts")); + + // Without the new order, column index must not be written for the INT96 column. + Configuration conf = new Configuration(); + Path path = new Path(file.getAbsolutePath()); + try ( + ParquetFileReader reader = ParquetFileReader.open( + HadoopInputFile.fromPath(path, conf), HadoopReadOptions.builder(conf, path).build() + ) + ) { + assertNotNull(reader.readColumnIndex(getColumn(reader.getFooter(), "id"))); + assertNull(reader.readColumnIndex(getColumn(reader.getFooter(), "ts"))); + } + } + + @Test + public void testWriterEmitsInt96StatsAndColumnOrderWhenEnabled() throws IOException { + File file = writeFile(true); + FileMetaData rawFooter = readRawFooter(file); + // schema[0] is the message root; column_orders are indexed by leaf order: ts=0, id=1 + assertTrue(rawFooter.getColumn_orders().get(0).isSetINT96_TIMESTAMP_ORDER()); + assertTrue(rawFooter.getColumn_orders().get(1).isSetTYPE_ORDER()); + + for (RowGroup rowGroup : rawFooter.getRow_groups()) { + for (ColumnChunk chunk : rowGroup.getColumns()) { + if (chunk.getMeta_data().getType() == Type.INT96) { + Statistics stats = chunk.getMeta_data().getStatistics(); + assertTrue(stats != null && stats.isSetMin_value()); + assertArrayEquals(EXPECTED_MIN.getBytes(), stats.getMin_value()); + assertArrayEquals(EXPECTED_MAX.getBytes(), stats.getMax_value()); + } + } + } + + // Column index should be present for both columns. + Configuration conf = new Configuration(); + Path path = new Path(file.getAbsolutePath()); + try ( + ParquetFileReader reader = ParquetFileReader.open( + HadoopInputFile.fromPath(path, conf), HadoopReadOptions.builder(conf, path).build() + ) + ) { + assertNotNull(reader.readColumnIndex(getColumn(reader.getFooter(), "id"))); + + ColumnIndex columnIndex = reader.readColumnIndex(getColumn(reader.getFooter(), "ts")); + assertNotNull(columnIndex); + assertArrayEquals(EXPECTED_MIN.getBytes(), toArray(columnIndex.getMinValues().get(0))); + assertArrayEquals(EXPECTED_MAX.getBytes(), + toArray(columnIndex.getMaxValues().get(columnIndex.getMaxValues().size() - 1))); + } + } + + @Test + public void testStatsAccumulationUsesChronologicalOrder() throws IOException { + // Values are written in non-chronological order; the writer must still produce the + // chronological min/max, not first/last or byte-wise extremes. + File file = writeFile(true); + ParquetMetadata footer = readFooter(file, new Configuration()); + byte[] minBytes = getColumn(footer, "ts").getStatistics().getMinBytes(); + assertFalse(Binary.fromConstantByteArray(minBytes).equals(NEXT_DAY)); + assertArrayEquals(EXPECTED_MIN.getBytes(), minBytes); + assertArrayEquals(EXPECTED_MAX.getBytes(), + getColumn(footer, "ts").getStatistics().getMaxBytes()); + } + + @Test + public void testReaderReadsStatsWrittenWithInt96TimestampOrder() throws IOException { + File file = writeFile(true); + ParquetMetadata footer = readFooter(file, new Configuration()); + + PrimitiveType ts = footer.getFileMetaData().getSchema().getType("ts").asPrimitiveType(); + assertEquals(ColumnOrder.int96TimestampOrder(), ts.columnOrder()); + assertEquals("BINARY_AS_INT96_TIMESTAMP_COMPARATOR", ts.comparator().toString()); + + assertStatsUsable(getColumn(footer, "ts")); + } + + @Test + public void testReaderReadsStatsWrittenWithInt96TimestampOrderWhenDisabled() throws IOException { + File file = writeFile(true); + + Configuration conf = new Configuration(); + conf.setBoolean(ParquetInputFormat.INT96_TIMESTAMP_STATISTICS_READING_ENABLED, false); + ParquetMetadata footer = readFooter(file, conf); + + assertEquals(ColumnOrder.undefined(), + footer.getFileMetaData().getSchema().getType("ts").asPrimitiveType().columnOrder()); + assertStatsIgnored(getColumn(footer, "ts")); + } + + @Test + public void testReaderIgnoresInt96StatsWithTypeDefinedOrder() throws IOException { + // Legacy layout: stats present, but column order is TYPE_ORDER so they are ignored. + File file = writeFile(false); + FileMetaData rawFooter = readRawFooter(file); + injectInt96Stats(rawFooter); + File legacy = rewriteFooter(file, rawFooter, "legacy.parquet"); + + // Validate the data is still intact after rewriting the footer. + try ( + ParquetReader reader = ParquetReader.builder( + new GroupReadSupport(), new Path(legacy.getAbsolutePath()) + ).build() + ) { + for (int i = 0; i < VALUES.size(); i++) { + Group group = reader.read(); + assertEquals(VALUES.get(i), group.getInt96("ts", 0)); + assertEquals(i, group.getLong("id", 0)); + } + } + + ParquetMetadata footer = readFooter(legacy, new Configuration()); + assertEquals(ColumnOrder.undefined(), + footer.getFileMetaData().getSchema().getType("ts").asPrimitiveType().columnOrder()); + assertStatsIgnored(getColumn(footer, "ts")); + // The non-INT96 sibling column is unaffected. + assertTrue(getColumn(footer, "id").getStatistics().hasNonNullValue()); + } + + private static byte[] toArray(ByteBuffer buffer) { + byte[] bytes = new byte[buffer.remaining()]; + buffer.duplicate().get(bytes); + return bytes; + } +}