From b48d08ebd12f126ca99f86a1e5d26b49fccf4df1 Mon Sep 17 00:00:00 2001
From: Divjot Arora
Date: Thu, 11 Jun 2026 07:48:04 +0000
Subject: [PATCH] Add new sort order for int96 timestamps
---
.../parquet/column/ParquetProperties.java | 21 ++
.../apache/parquet/schema/ColumnOrder.java | 17 +-
.../parquet/schema/PrimitiveComparator.java | 33 ++
.../apache/parquet/schema/PrimitiveType.java | 20 +-
.../schema/TestPrimitiveComparator.java | 59 +++
.../apache/parquet/ParquetReadOptions.java | 5 +
.../converter/ParquetMetadataConverter.java | 35 +-
.../hadoop/InternalParquetRecordWriter.java | 2 +-
.../parquet/hadoop/ParquetFileWriter.java | 35 +-
.../parquet/hadoop/ParquetInputFormat.java | 6 +
.../parquet/hadoop/ParquetOutputFormat.java | 12 +-
.../apache/parquet/hadoop/ParquetWriter.java | 12 +
.../TestInt96TimestampStatistics.java | 340 ++++++++++++++++++
13 files changed, 586 insertions(+), 11 deletions(-)
create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatistics.java
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index f29214b458..47a9585d13 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -66,6 +66,7 @@ public class ParquetProperties {
public static final int DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER = 5;
public static final boolean DEFAULT_STATISTICS_ENABLED = true;
public static final boolean DEFAULT_SIZE_STATISTICS_ENABLED = true;
+ public static final boolean DEFAULT_INT96_TIMESTAMP_STATISTICS_ENABLED = false;
public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
@@ -120,6 +121,7 @@ public static WriterVersion fromString(String name) {
private final int statisticsTruncateLength;
private final boolean statisticsEnabled;
private final boolean sizeStatisticsEnabled;
+ private final boolean int96TimestampStatisticsEnabled;
// The expected NDV (number of distinct values) for each columns
private final ColumnProperty bloomFilterNDVs;
@@ -154,6 +156,7 @@ private ParquetProperties(Builder builder) {
this.statisticsTruncateLength = builder.statisticsTruncateLength;
this.statisticsEnabled = builder.statisticsEnabled;
this.sizeStatisticsEnabled = builder.sizeStatisticsEnabled;
+ this.int96TimestampStatisticsEnabled = builder.int96TimestampStatisticsEnabled;
this.bloomFilterNDVs = builder.bloomFilterNDVs.build();
this.bloomFilterFPPs = builder.bloomFilterFPPs.build();
this.bloomFilterEnabled = builder.bloomFilterEnabled.build();
@@ -370,6 +373,10 @@ public boolean getSizeStatisticsEnabled(ColumnDescriptor column) {
return sizeStatisticsEnabled;
}
+ public boolean getInt96TimestampStatisticsEnabled() {
+ return int96TimestampStatisticsEnabled;
+ }
+
@Override
public String toString() {
return "Parquet page size to " + getPageSizeThreshold() + '\n'
@@ -406,6 +413,7 @@ public static class Builder {
private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH;
private boolean statisticsEnabled = DEFAULT_STATISTICS_ENABLED;
private boolean sizeStatisticsEnabled = DEFAULT_SIZE_STATISTICS_ENABLED;
+ private boolean int96TimestampStatisticsEnabled = DEFAULT_INT96_TIMESTAMP_STATISTICS_ENABLED;
private final ColumnProperty.Builder bloomFilterNDVs;
private final ColumnProperty.Builder bloomFilterFPPs;
private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES;
@@ -756,6 +764,19 @@ public Builder withSizeStatisticsEnabled(String columnPath, boolean enabled) {
return this;
}
+ /**
+ * Sets whether min/max statistics are collected and written for INT96 columns using the
+ * chronological INT96_TIMESTAMP_ORDER column order (disabled by default). When enabled, INT96
+ * columns are tagged with INT96_TIMESTAMP_ORDER in the file footer.
+ *
+ * @param enabled whether to collect and write INT96 timestamp statistics
+ * @return this builder for method chaining
+ */
+ public Builder withInt96TimestampStatisticsEnabled(boolean enabled) {
+ this.int96TimestampStatisticsEnabled = enabled;
+ return this;
+ }
+
public ParquetProperties build() {
ParquetProperties properties = new ParquetProperties(this);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java b/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java
index 94a1275569..5ce5005c6a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java
@@ -36,11 +36,18 @@ public enum ColumnOrderName {
/**
* Type defined order meaning that the comparison order of the elements are based on its type.
*/
- TYPE_DEFINED_ORDER
+ TYPE_DEFINED_ORDER,
+ /**
+ * Chronological order for INT96 timestamps: values are compared by the Julian day (the last 4
+ * bytes, as a little-endian signed int32), then by the nanoseconds within the day (the first 8
+ * bytes, as a little-endian signed int64). Only supported for the INT96 physical type.
+ */
+ INT96_TIMESTAMP_ORDER
}
private static final ColumnOrder UNDEFINED_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.UNDEFINED);
private static final ColumnOrder TYPE_DEFINED_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.TYPE_DEFINED_ORDER);
+ private static final ColumnOrder INT96_TIMESTAMP_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.INT96_TIMESTAMP_ORDER);
/**
* @return a {@link ColumnOrder} instance representing an undefined order
@@ -58,6 +65,14 @@ public static ColumnOrder typeDefined() {
return TYPE_DEFINED_COLUMN_ORDER;
}
+ /**
+ * @return a {@link ColumnOrder} instance representing the chronological order of INT96 timestamps
+ * @see ColumnOrderName#INT96_TIMESTAMP_ORDER
+ */
+ public static ColumnOrder int96TimestampOrder() {
+ return INT96_TIMESTAMP_COLUMN_ORDER;
+ }
+
private final ColumnOrderName columnOrderName;
private ColumnOrder(ColumnOrderName columnOrderName) {
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java
index 50c4acd4c9..74176b4ff5 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java
@@ -20,6 +20,7 @@
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.Comparator;
import org.apache.parquet.io.api.Binary;
@@ -293,4 +294,36 @@ public String toString() {
return "BINARY_AS_FLOAT16_COMPARATOR";
}
};
+
+ /*
+ * Comparator for two timestamps encoded as INT96 (12-byte little-endian) binary.
+ * Layout: first 8 bytes = nanoseconds within the day, last 4 bytes = Julian day.
+ *
+ * Two-level comparison, matching the INT96 timestamp sort order:
+ * 1. Compare the last 4 bytes (Julian day) as a signed little-endian int32.
+ * 2. If equal, compare the first 8 bytes (nanos) as a signed little-endian int64.
+ */
+ static final PrimitiveComparator BINARY_AS_INT96_TIMESTAMP_COMPARATOR = new BinaryComparator() {
+ @Override
+ int compareBinary(Binary b1, Binary b2) {
+ if (b1.length() != 12 || b2.length() != 12) {
+ throw new IllegalArgumentException(
+ "INT96 binary length must be 12, got " + b1.length() + " and " + b2.length());
+ }
+
+ ByteBuffer bb1 = b1.toByteBuffer().slice();
+ ByteBuffer bb2 = b2.toByteBuffer().slice();
+ bb1.order(ByteOrder.LITTLE_ENDIAN);
+ bb2.order(ByteOrder.LITTLE_ENDIAN);
+
+ int result = Integer.compare(bb1.getInt(8), bb2.getInt(8));
+ if (result != 0) return result;
+ return Long.compare(bb1.getLong(0), bb2.getLong(0));
+ }
+
+ @Override
+ public String toString() {
+ return "BINARY_AS_INT96_TIMESTAMP_COMPARATOR";
+ }
+ };
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
index 944cfb58eb..ca5d8d6eb6 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
@@ -622,9 +622,15 @@ public PrimitiveType(
private ColumnOrder requireValidColumnOrder(ColumnOrder columnOrder) {
if (primitive == PrimitiveTypeName.INT96) {
Preconditions.checkArgument(
- columnOrder.getColumnOrderName() == ColumnOrderName.UNDEFINED,
+ columnOrder.getColumnOrderName() == ColumnOrderName.UNDEFINED
+ || columnOrder.getColumnOrderName() == ColumnOrderName.INT96_TIMESTAMP_ORDER,
"The column order %s is not supported by INT96",
columnOrder);
+ } else {
+ Preconditions.checkArgument(
+ columnOrder.getColumnOrderName() != ColumnOrderName.INT96_TIMESTAMP_ORDER,
+ "The column order %s is only supported by INT96",
+ columnOrder);
}
if (getLogicalTypeAnnotation() != null) {
Preconditions.checkArgument(
@@ -655,6 +661,15 @@ public PrimitiveType withLogicalTypeAnnotation(LogicalTypeAnnotation logicalType
return new PrimitiveType(getRepetition(), primitive, length, getName(), logicalType, getId());
}
+ /**
+ * @param columnOrder the column order
+ * @return a new PrimitiveType with the same fields and the given column order
+ */
+ public PrimitiveType withColumnOrder(ColumnOrder columnOrder) {
+ return new PrimitiveType(
+ getRepetition(), primitive, length, getName(), getLogicalTypeAnnotation(), getId(), columnOrder);
+ }
+
/**
* @return the primitive type
*/
@@ -869,6 +884,9 @@ protected Type union(Type toMerge, boolean strict) {
*/
@SuppressWarnings("unchecked")
public PrimitiveComparator comparator() {
+ if (columnOrder.getColumnOrderName() == ColumnOrderName.INT96_TIMESTAMP_ORDER) {
+ return (PrimitiveComparator) PrimitiveComparator.BINARY_AS_INT96_TIMESTAMP_COMPARATOR;
+ }
return (PrimitiveComparator) getPrimitiveTypeName().comparator(getLogicalTypeAnnotation());
}
diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java
index 8fb53aca0f..d099cec01d 100644
--- a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java
+++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java
@@ -19,6 +19,7 @@
package org.apache.parquet.schema;
import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_FLOAT16_COMPARATOR;
+import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_INT96_TIMESTAMP_COMPARATOR;
import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR;
import static org.apache.parquet.schema.PrimitiveComparator.BOOLEAN_COMPARATOR;
import static org.apache.parquet.schema.PrimitiveComparator.DOUBLE_COMPARATOR;
@@ -33,8 +34,12 @@
import java.math.BigInteger;
import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.function.Function;
+import org.apache.parquet.example.data.simple.NanoTime;
import org.apache.parquet.io.api.Binary;
import org.junit.Test;
@@ -297,6 +302,60 @@ public void testBinaryAsSignedIntegerComparatorWithEquals() {
}
}
+ private static Binary int96(int julianDay, long nanosOfDay) {
+ return new NanoTime(julianDay, nanosOfDay).toBinary();
+ }
+
+ private static Binary timestampToInt96(String timestamp) {
+ LocalDateTime dt = LocalDateTime.parse(timestamp);
+ int julianDay = (int) (dt.toLocalDate().toEpochDay() + 2440588);
+ return new NanoTime(julianDay, dt.toLocalTime().toNanoOfDay()).toBinary();
+ }
+
+ @Test
+ public void testInt96TimestampComparator() {
+ Binary[] valuesInAscendingOrder = {
+ int96(Integer.MIN_VALUE, 0), // most negative julian day
+ int96(-1, 86_399_999_999_999L), // negative julian days sort before day 0
+ int96(0, 0), // start of the julian period
+ int96(0, 86_399_999_999_999L), // same day, later time of day
+ timestampToInt96("1968-05-23T00:00:00.000000123"), // pre-epoch but positive julian day
+ timestampToInt96("2020-01-01T12:00:00"),
+ timestampToInt96("2020-02-01T11:00:00"), // later day even though earlier time of day
+ timestampToInt96("2020-02-01T11:00:00.000000001"), // nanos tie-break
+ int96(Integer.MAX_VALUE, 86_399_999_999_999L)
+ };
+
+ // The same value in different Binary representations must compare identically; the offset
+ // variant guards against absolute reads not being relative to the value's start
+ List> representations = List.of(
+ b -> b,
+ b -> Binary.fromReusedByteArray(b.getBytes()),
+ b -> Binary.fromConstantByteArray(b.getBytes()),
+ b -> {
+ byte[] bytes = b.getBytes();
+ byte[] padded = new byte[bytes.length + 20];
+ Arrays.fill(padded, (byte) 0xAA);
+ System.arraycopy(bytes, 0, padded, 10, bytes.length);
+ return Binary.fromReusedByteArray(padded, 10, bytes.length);
+ });
+
+ for (int i = 0; i < valuesInAscendingOrder.length; ++i) {
+ for (int j = 0; j < valuesInAscendingOrder.length; ++j) {
+ for (Function fi : representations) {
+ for (Function fj : representations) {
+ Binary bi = fi.apply(valuesInAscendingOrder[i]);
+ Binary bj = fj.apply(valuesInAscendingOrder[j]);
+ assertEquals(
+ "comparing value " + i + " to value " + j,
+ Integer.signum(Integer.compare(i, j)),
+ Integer.signum(BINARY_AS_INT96_TIMESTAMP_COMPARATOR.compare(bi, bj)));
+ }
+ }
+ }
+ }
+ }
+
@Test
public void testFloat16Comparator() {
Binary[] valuesInAscendingOrder = {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index 895d0670fa..cdeadddc9c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -25,6 +25,7 @@
import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_DEFAULT;
import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.INT96_TIMESTAMP_STATISTICS_READING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
@@ -291,6 +292,10 @@ public Builder(ParquetConfiguration conf) {
if (badRecordThresh != null) {
set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
}
+ String readInt96TimestampStats = conf.get(INT96_TIMESTAMP_STATISTICS_READING_ENABLED);
+ if (readInt96TimestampStats != null) {
+ set(INT96_TIMESTAMP_STATISTICS_READING_ENABLED, readInt96TimestampStats);
+ }
}
public Builder useSignedStringMinMax(boolean useSignedStringMinMax) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 3597898c30..c976dfc7f5 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -86,6 +86,7 @@
import org.apache.parquet.format.GeographyType;
import org.apache.parquet.format.GeometryType;
import org.apache.parquet.format.GeospatialStatistics;
+import org.apache.parquet.format.Int96TimestampOrder;
import org.apache.parquet.format.IntType;
import org.apache.parquet.format.KeyValue;
import org.apache.parquet.format.LogicalType;
@@ -111,6 +112,7 @@
import org.apache.parquet.format.Uncompressed;
import org.apache.parquet.format.VariantType;
import org.apache.parquet.format.XxHash;
+import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -143,6 +145,7 @@
public class ParquetMetadataConverter {
private static final TypeDefinedOrder TYPE_DEFINED_ORDER = new TypeDefinedOrder();
+ private static final Int96TimestampOrder INT96_TIMESTAMP_ORDER = new Int96TimestampOrder();
public static final MetadataFilter NO_FILTER = new NoFilter();
public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k
@@ -278,11 +281,16 @@ public FileMetaData toParquetMetadata(
private List getColumnOrders(MessageType schema) {
List columnOrders = new ArrayList<>();
- // Currently, only TypeDefinedOrder is supported, so we create a column order for each columns with
- // TypeDefinedOrder even if some types (e.g. INT96) have undefined column orders.
- for (int i = 0, n = schema.getPaths().size(); i < n; ++i) {
+ // Columns with the INT96_TIMESTAMP_ORDER column order are tagged as such; all other columns are
+ // tagged with TypeDefinedOrder even if some types have undefined column orders.
+ for (String[] path : schema.getPaths()) {
ColumnOrder columnOrder = new ColumnOrder();
- columnOrder.setTYPE_ORDER(TYPE_DEFINED_ORDER);
+ if (schema.getType(path).asPrimitiveType().columnOrder().getColumnOrderName() ==
+ ColumnOrderName.INT96_TIMESTAMP_ORDER) {
+ columnOrder.setINT96_TIMESTAMP_ORDER(INT96_TIMESTAMP_ORDER);
+ } else {
+ columnOrder.setTYPE_ORDER(TYPE_DEFINED_ORDER);
+ }
columnOrders.add(columnOrder);
}
return columnOrders;
@@ -891,7 +899,9 @@ private static byte[] tuncateMax(BinaryTruncator truncator, int truncateLength,
}
private static boolean isMinMaxStatsSupported(PrimitiveType type) {
- return type.columnOrder().getColumnOrderName() == ColumnOrderName.TYPE_DEFINED_ORDER;
+ ColumnOrderName name = type.columnOrder().getColumnOrderName();
+ return name == ColumnOrderName.TYPE_DEFINED_ORDER
+ || name == ColumnOrderName.INT96_TIMESTAMP_ORDER;
}
/**
@@ -2034,6 +2044,11 @@ private void buildChildren(
|| schemaElement.converted_type == ConvertedType.INTERVAL)) {
columnOrder = org.apache.parquet.schema.ColumnOrder.undefined();
}
+ // INT96_TIMESTAMP_ORDER is only valid for INT96 columns; ignore it anywhere else
+ if (columnOrder.getColumnOrderName() == ColumnOrderName.INT96_TIMESTAMP_ORDER
+ && schemaElement.type != Type.INT96) {
+ columnOrder = org.apache.parquet.schema.ColumnOrder.undefined();
+ }
primitiveBuilder.columnOrder(columnOrder);
}
childBuilder = primitiveBuilder;
@@ -2086,14 +2101,22 @@ Repetition fromParquetRepetition(FieldRepetitionType repetition) {
return Repetition.valueOf(repetition.name());
}
- private static org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(ColumnOrder columnOrder) {
+ private org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(ColumnOrder columnOrder) {
if (columnOrder.isSetTYPE_ORDER()) {
return org.apache.parquet.schema.ColumnOrder.typeDefined();
}
+ if (columnOrder.isSetINT96_TIMESTAMP_ORDER() && readInt96TimestampStatisticsEnabled()) {
+ return org.apache.parquet.schema.ColumnOrder.int96TimestampOrder();
+ }
// The column order is not yet supported by this API
return org.apache.parquet.schema.ColumnOrder.undefined();
}
+ private boolean readInt96TimestampStatisticsEnabled() {
+ return options == null
+ || options.isEnabled(ParquetInputFormat.INT96_TIMESTAMP_STATISTICS_READING_ENABLED, true);
+ }
+
@Deprecated
public void writeDataPageHeader(
int uncompressedSize,
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index dd51d1ef09..facd888f76 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -89,7 +89,7 @@ public InternalParquetRecordWriter(
ParquetProperties props) {
this.parquetFileWriter = parquetFileWriter;
this.writeSupport = Objects.requireNonNull(writeSupport, "writeSupport cannot be null");
- this.schema = schema;
+ this.schema = ParquetFileWriter.applyInt96TimestampOrder(schema, props);
this.extraMetaData = extraMetaData;
this.rowGroupSizeThreshold = rowGroupSize;
this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 82f4577b83..dd158ba011 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -92,8 +92,12 @@
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.io.PositionOutputStream;
import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.schema.ColumnOrder;
+import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.TypeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -435,7 +439,7 @@ public ParquetFileWriter(
throws IOException {
this(
file,
- schema,
+ applyInt96TimestampOrder(schema, props),
mode,
rowGroupSize,
maxPaddingSize,
@@ -447,6 +451,35 @@ public ParquetFileWriter(
props.getAllocator());
}
+ /**
+ * Returns the schema with INT96 columns tagged with the INT96_TIMESTAMP_ORDER column order if
+ * INT96 timestamp statistics are enabled, so that statistics are accumulated with the
+ * chronological comparator and the proper column order is written to the footer.
+ */
+ static MessageType applyInt96TimestampOrder(MessageType schema, ParquetProperties props) {
+ if (!props.getInt96TimestampStatisticsEnabled()) {
+ return schema;
+ }
+ return new MessageType(schema.getName(), applyInt96TimestampOrder(schema.getFields()));
+ }
+
+ private static List applyInt96TimestampOrder(List fields) {
+ List result = new ArrayList<>(fields.size());
+ for (Type field : fields) {
+ if (field.isPrimitive()) {
+ PrimitiveType primitive = field.asPrimitiveType();
+ if (primitive.getPrimitiveTypeName() == PrimitiveTypeName.INT96) {
+ field = primitive.withColumnOrder(ColumnOrder.int96TimestampOrder());
+ }
+ result.add(field);
+ } else {
+ GroupType group = field.asGroupType();
+ result.add(group.withNewFields(applyInt96TimestampOrder(group.getFields())));
+ }
+ }
+ return result;
+ }
+
@Deprecated
public ParquetFileWriter(
OutputFile file,
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index 8e05d49bd3..95f9796a4e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -122,6 +122,12 @@ public class ParquetInputFormat extends FileInputFormat {
*/
public static final String STATS_FILTERING_ENABLED = "parquet.filter.stats.enabled";
+ /**
+ * key to configure whether INT96 min/max statistics written with the INT96_TIMESTAMP_ORDER
+ * column order are read (enabled by default)
+ */
+ public static final String INT96_TIMESTAMP_STATISTICS_READING_ENABLED = "parquet.int96.timestamp.statistics.read.enabled";
+
/**
* key to configure whether row group dictionary filtering is enabled
*/
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 868ae634c1..ba0a6a5924 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -163,6 +163,7 @@ public static enum JobSummaryLevel {
public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
public static final String STATISTICS_ENABLED = "parquet.column.statistics.enabled";
public static final String SIZE_STATISTICS_ENABLED = "parquet.size.statistics.enabled";
+ public static final String INT96_TIMESTAMP_STATISTICS_ENABLED = "parquet.int96.timestamp.statistics.enabled";
public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
String level = conf.get(JOB_SUMMARY_LEVEL);
@@ -432,6 +433,14 @@ public static boolean getStatisticsEnabled(Configuration conf, String columnPath
return conf.getBoolean(STATISTICS_ENABLED, ParquetProperties.DEFAULT_STATISTICS_ENABLED);
}
+ public static void setInt96TimestampStatisticsEnabled(JobContext jobContext, boolean enabled) {
+ getConfiguration(jobContext).setBoolean(INT96_TIMESTAMP_STATISTICS_ENABLED, enabled);
+ }
+
+ public static boolean getInt96TimestampStatisticsEnabled(Configuration conf) {
+ return conf.getBoolean(INT96_TIMESTAMP_STATISTICS_ENABLED, ParquetProperties.DEFAULT_INT96_TIMESTAMP_STATISTICS_ENABLED);
+ }
+
public static void setSizeStatisticsEnabled(Configuration conf, boolean enabled) {
conf.setBoolean(SIZE_STATISTICS_ENABLED, enabled);
}
@@ -526,7 +535,8 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp
.withRowGroupRowCountLimit(getBlockRowCountLimit(conf))
.withPageRowCountLimit(getPageRowCountLimit(conf))
.withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
- .withStatisticsEnabled(getStatisticsEnabled(conf));
+ .withStatisticsEnabled(getStatisticsEnabled(conf))
+ .withInt96TimestampStatisticsEnabled(getInt96TimestampStatisticsEnabled(conf));
new ColumnConfigParser()
.withColumnConfig(
ENABLE_DICTIONARY, key -> conf.getBoolean(key, false), propsBuilder::withDictionaryEncoding)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 8eb5f7f17b..afa06065c4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -952,6 +952,18 @@ public SELF withStatisticsEnabled(boolean enabled) {
return self();
}
+ /**
+ * Sets whether min/max statistics are collected and written for INT96 columns using the
+ * chronological INT96_TIMESTAMP_ORDER column order (disabled by default).
+ *
+ * @param enabled whether to collect and write INT96 timestamp statistics
+ * @return this builder for method chaining
+ */
+ public SELF withInt96TimestampStatisticsEnabled(boolean enabled) {
+ encodingPropsBuilder.withInt96TimestampStatisticsEnabled(enabled);
+ return self();
+ }
+
/**
* Sets the size statistics enabled/disabled for the specified column. All column size statistics are enabled by default.
*
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatistics.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatistics.java
new file mode 100644
index 0000000000..8345304e80
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestInt96TimestampStatistics.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.statistics;
+
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Files;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.format.ColumnChunk;
+import org.apache.parquet.format.FileMetaData;
+import org.apache.parquet.format.RowGroup;
+import org.apache.parquet.format.Statistics;
+import org.apache.parquet.format.Type;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.ColumnOrder;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for INT96 timestamp statistics support (INT96_TIMESTAMP_ORDER).
+ */
+public class TestInt96TimestampStatistics {
+
+ private static final MessageType SCHEMA =
+ parseMessageType("message test { required int96 ts; required int64 id; } ");
+
+ // Chronologically: EARLY < SAME_DAY_EARLY < LATE_IN_DAY < NEXT_DAY.
+ // Byte-wise lexicographic comparison would order these incorrectly (nanos bytes come first),
+ // so these values detect a reader/writer using the wrong order.
+ private static final Binary EARLY = int96(2440000, 123L); // 1968-05-23 00:00:00.000000123
+ private static final Binary SAME_DAY_EARLY = int96(2440588, 1_000L); // 1970-01-01 00:00:00.000001
+ private static final Binary LATE_IN_DAY = int96(2440588, 86_399_999_999_999L); // 1970-01-01 23:59:59.999...
+ private static final Binary NEXT_DAY = int96(2440589, 0L); // 1970-01-02 00:00:00
+
+ private static final List VALUES = List.of(LATE_IN_DAY, NEXT_DAY, EARLY, SAME_DAY_EARLY);
+ private static final Binary EXPECTED_MIN = EARLY;
+ private static final Binary EXPECTED_MAX = NEXT_DAY;
+
+ @Rule
+ public TemporaryFolder tmp = new TemporaryFolder();
+
+ private static Binary int96(int julianDay, long nanosOfDay) {
+ return Binary.fromConstantByteArray(ByteBuffer.allocate(12)
+ .order(ByteOrder.LITTLE_ENDIAN)
+ .putLong(nanosOfDay)
+ .putInt(julianDay)
+ .array());
+ }
+
+ private File writeFile(boolean int96StatsEnabled) throws IOException {
+ File file = new File(tmp.getRoot(), "int96_" + int96StatsEnabled + ".parquet");
+ Configuration conf = new Configuration();
+ GroupWriteSupport.setSchema(SCHEMA, conf);
+ SimpleGroupFactory factory = new SimpleGroupFactory(SCHEMA);
+ ParquetWriter writer = ExampleParquetWriter.builder(new Path(file.getAbsolutePath()))
+ .withConf(conf)
+ .withInt96TimestampStatisticsEnabled(int96StatsEnabled)
+ .build();
+ try {
+ for (int i = 0; i < VALUES.size(); i++) {
+ writer.write(factory.newGroup().append("ts", VALUES.get(i)).append("id", (long) i));
+ }
+ } finally {
+ writer.close();
+ }
+ return file;
+ }
+
+ private static ParquetMetadata readFooter(File file, Configuration conf) throws IOException {
+ Path path = new Path(file.getAbsolutePath());
+ try (ParquetFileReader reader = ParquetFileReader.open(
+ HadoopInputFile.fromPath(path, conf),
+ HadoopReadOptions.builder(conf, path).build())) {
+ return reader.getFooter();
+ }
+ }
+
+ private static FileMetaData readRawFooter(File file) throws IOException {
+ byte[] bytes = Files.readAllBytes(file.toPath());
+ int footerLen = ByteBuffer.wrap(bytes, bytes.length - 8, 4)
+ .order(ByteOrder.LITTLE_ENDIAN)
+ .getInt();
+ int footerStart = bytes.length - 8 - footerLen;
+ return Util.readFileMetaData(new ByteArrayInputStream(bytes, footerStart, footerLen));
+ }
+
+ /** Rewrites the footer of src into a new file, keeping all data pages byte-identical. */
+ private File rewriteFooter(File src, FileMetaData footer, String name)
+ throws IOException {
+ byte[] bytes = Files.readAllBytes(src.toPath());
+ int footerLen = ByteBuffer.wrap(bytes, bytes.length - 8, 4)
+ .order(ByteOrder.LITTLE_ENDIAN)
+ .getInt();
+ int footerStart = bytes.length - 8 - footerLen;
+ File dst = new File(tmp.getRoot(), name);
+ try (FileOutputStream out = new FileOutputStream(dst)) {
+ out.write(bytes, 0, footerStart);
+ ByteArrayOutputStream serialized = new ByteArrayOutputStream();
+ Util.writeFileMetaData(footer, serialized);
+ out.write(serialized.toByteArray());
+ out.write(ByteBuffer.allocate(4)
+ .order(ByteOrder.LITTLE_ENDIAN)
+ .putInt(serialized.size())
+ .array());
+ out.write(ParquetFileWriter.MAGIC);
+ }
+ return dst;
+ }
+
+ /**
+ * Injects min/max statistics for the INT96 column into the raw footer while keeping TYPE_ORDER
+ * as the column order. This simulates a legacy writer that emitted INT96 stats before
+ * INT96_TIMESTAMP_ORDER existed.
+ */
+ private static void injectInt96Stats(FileMetaData footer) {
+ // No need to touch column_orders: the writer already stamps TYPE_ORDER for every column when
+ // INT96 timestamp statistics are disabled
+ for (RowGroup rowGroup : footer.getRow_groups()) {
+ rowGroup.getColumns().stream()
+ .map(c -> c.getMeta_data())
+ .filter(md -> md.getType() == Type.INT96)
+ .forEach(md -> {
+ Statistics stats = new Statistics();
+ stats.setMin_value(EXPECTED_MIN.getBytes());
+ stats.setMax_value(EXPECTED_MAX.getBytes());
+ stats.setNull_count(0);
+ md.setStatistics(stats);
+ });
+ }
+ }
+
+ private static ColumnChunkMetaData getColumn(ParquetMetadata footer, String name) {
+ BlockMetaData block = footer.getBlocks().get(0);
+ return block.getColumns().stream()
+ .filter(c -> c.getPath().toDotString().equals(name))
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("no column " + name));
+ }
+
+ private static void assertStatsIgnored(ColumnChunkMetaData column) {
+ assertTrue(column.getStatistics() == null || !column.getStatistics().hasNonNullValue());
+ }
+
+ private static void assertStatsUsable(ColumnChunkMetaData column) {
+ assertTrue(column.getStatistics() != null && column.getStatistics().hasNonNullValue());
+ assertArrayEquals(EXPECTED_MIN.getBytes(), column.getStatistics().getMinBytes());
+ assertArrayEquals(EXPECTED_MAX.getBytes(), column.getStatistics().getMaxBytes());
+ }
+
+ @Test
+ public void testWriterOmitsInt96StatsByDefault() throws IOException {
+ File file = writeFile(false);
+ FileMetaData rawFooter = readRawFooter(file);
+ for (RowGroup rowGroup : rawFooter.getRow_groups()) {
+ for (ColumnChunk chunk : rowGroup.getColumns()) {
+ if (chunk.getMeta_data().getType() == Type.INT96) {
+ Statistics stats = chunk.getMeta_data().getStatistics();
+ assertTrue(stats == null || (!stats.isSetMin_value() && !stats.isSetMax_value()));
+ }
+ }
+ }
+
+ // Without the new order, the column order written for INT96 stays TYPE_ORDER
+ assertTrue(rawFooter.getColumn_orders().get(0).isSetTYPE_ORDER());
+ assertStatsIgnored(getColumn(readFooter(file, new Configuration()), "ts"));
+
+ // Without the new order, column index must not be written for the INT96 column.
+ Configuration conf = new Configuration();
+ Path path = new Path(file.getAbsolutePath());
+ try (
+ ParquetFileReader reader = ParquetFileReader.open(
+ HadoopInputFile.fromPath(path, conf), HadoopReadOptions.builder(conf, path).build()
+ )
+ ) {
+ assertNotNull(reader.readColumnIndex(getColumn(reader.getFooter(), "id")));
+ assertNull(reader.readColumnIndex(getColumn(reader.getFooter(), "ts")));
+ }
+ }
+
+ @Test
+ public void testWriterEmitsInt96StatsAndColumnOrderWhenEnabled() throws IOException {
+ File file = writeFile(true);
+ FileMetaData rawFooter = readRawFooter(file);
+ // schema[0] is the message root; column_orders are indexed by leaf order: ts=0, id=1
+ assertTrue(rawFooter.getColumn_orders().get(0).isSetINT96_TIMESTAMP_ORDER());
+ assertTrue(rawFooter.getColumn_orders().get(1).isSetTYPE_ORDER());
+
+ for (RowGroup rowGroup : rawFooter.getRow_groups()) {
+ for (ColumnChunk chunk : rowGroup.getColumns()) {
+ if (chunk.getMeta_data().getType() == Type.INT96) {
+ Statistics stats = chunk.getMeta_data().getStatistics();
+ assertTrue(stats != null && stats.isSetMin_value());
+ assertArrayEquals(EXPECTED_MIN.getBytes(), stats.getMin_value());
+ assertArrayEquals(EXPECTED_MAX.getBytes(), stats.getMax_value());
+ }
+ }
+ }
+
+ // Column index should be present for both columns.
+ Configuration conf = new Configuration();
+ Path path = new Path(file.getAbsolutePath());
+ try (
+ ParquetFileReader reader = ParquetFileReader.open(
+ HadoopInputFile.fromPath(path, conf), HadoopReadOptions.builder(conf, path).build()
+ )
+ ) {
+ assertNotNull(reader.readColumnIndex(getColumn(reader.getFooter(), "id")));
+
+ ColumnIndex columnIndex = reader.readColumnIndex(getColumn(reader.getFooter(), "ts"));
+ assertNotNull(columnIndex);
+ assertArrayEquals(EXPECTED_MIN.getBytes(), toArray(columnIndex.getMinValues().get(0)));
+ assertArrayEquals(EXPECTED_MAX.getBytes(),
+ toArray(columnIndex.getMaxValues().get(columnIndex.getMaxValues().size() - 1)));
+ }
+ }
+
+ @Test
+ public void testStatsAccumulationUsesChronologicalOrder() throws IOException {
+ // Values are written in non-chronological order; the writer must still produce the
+ // chronological min/max, not first/last or byte-wise extremes.
+ File file = writeFile(true);
+ ParquetMetadata footer = readFooter(file, new Configuration());
+ byte[] minBytes = getColumn(footer, "ts").getStatistics().getMinBytes();
+ assertFalse(Binary.fromConstantByteArray(minBytes).equals(NEXT_DAY));
+ assertArrayEquals(EXPECTED_MIN.getBytes(), minBytes);
+ assertArrayEquals(EXPECTED_MAX.getBytes(),
+ getColumn(footer, "ts").getStatistics().getMaxBytes());
+ }
+
+ @Test
+ public void testReaderReadsStatsWrittenWithInt96TimestampOrder() throws IOException {
+ File file = writeFile(true);
+ ParquetMetadata footer = readFooter(file, new Configuration());
+
+ PrimitiveType ts = footer.getFileMetaData().getSchema().getType("ts").asPrimitiveType();
+ assertEquals(ColumnOrder.int96TimestampOrder(), ts.columnOrder());
+ assertEquals("BINARY_AS_INT96_TIMESTAMP_COMPARATOR", ts.comparator().toString());
+
+ assertStatsUsable(getColumn(footer, "ts"));
+ }
+
+ @Test
+ public void testReaderReadsStatsWrittenWithInt96TimestampOrderWhenDisabled() throws IOException {
+ File file = writeFile(true);
+
+ Configuration conf = new Configuration();
+ conf.setBoolean(ParquetInputFormat.INT96_TIMESTAMP_STATISTICS_READING_ENABLED, false);
+ ParquetMetadata footer = readFooter(file, conf);
+
+ assertEquals(ColumnOrder.undefined(),
+ footer.getFileMetaData().getSchema().getType("ts").asPrimitiveType().columnOrder());
+ assertStatsIgnored(getColumn(footer, "ts"));
+ }
+
+ @Test
+ public void testReaderIgnoresInt96StatsWithTypeDefinedOrder() throws IOException {
+ // Legacy layout: stats present, but column order is TYPE_ORDER so they are ignored.
+ File file = writeFile(false);
+ FileMetaData rawFooter = readRawFooter(file);
+ injectInt96Stats(rawFooter);
+ File legacy = rewriteFooter(file, rawFooter, "legacy.parquet");
+
+ // Validate the data is still intact after rewriting the footer.
+ try (
+ ParquetReader reader = ParquetReader.builder(
+ new GroupReadSupport(), new Path(legacy.getAbsolutePath())
+ ).build()
+ ) {
+ for (int i = 0; i < VALUES.size(); i++) {
+ Group group = reader.read();
+ assertEquals(VALUES.get(i), group.getInt96("ts", 0));
+ assertEquals(i, group.getLong("id", 0));
+ }
+ }
+
+ ParquetMetadata footer = readFooter(legacy, new Configuration());
+ assertEquals(ColumnOrder.undefined(),
+ footer.getFileMetaData().getSchema().getType("ts").asPrimitiveType().columnOrder());
+ assertStatsIgnored(getColumn(footer, "ts"));
+ // The non-INT96 sibling column is unaffected.
+ assertTrue(getColumn(footer, "id").getStatistics().hasNonNullValue());
+ }
+
+ private static byte[] toArray(ByteBuffer buffer) {
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.duplicate().get(bytes);
+ return bytes;
+ }
+}