Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class ParquetProperties {
public static final int DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER = 5;
public static final boolean DEFAULT_STATISTICS_ENABLED = true;
public static final boolean DEFAULT_SIZE_STATISTICS_ENABLED = true;
public static final boolean DEFAULT_INT96_TIMESTAMP_STATISTICS_ENABLED = false;

public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;

Expand Down Expand Up @@ -120,6 +121,7 @@ public static WriterVersion fromString(String name) {
private final int statisticsTruncateLength;
private final boolean statisticsEnabled;
private final boolean sizeStatisticsEnabled;
private final boolean int96TimestampStatisticsEnabled;

// The expected NDV (number of distinct values) for each column
private final ColumnProperty<Long> bloomFilterNDVs;
Expand Down Expand Up @@ -154,6 +156,7 @@ private ParquetProperties(Builder builder) {
this.statisticsTruncateLength = builder.statisticsTruncateLength;
this.statisticsEnabled = builder.statisticsEnabled;
this.sizeStatisticsEnabled = builder.sizeStatisticsEnabled;
this.int96TimestampStatisticsEnabled = builder.int96TimestampStatisticsEnabled;
this.bloomFilterNDVs = builder.bloomFilterNDVs.build();
this.bloomFilterFPPs = builder.bloomFilterFPPs.build();
this.bloomFilterEnabled = builder.bloomFilterEnabled.build();
Expand Down Expand Up @@ -370,6 +373,10 @@ public boolean getSizeStatisticsEnabled(ColumnDescriptor column) {
return sizeStatisticsEnabled;
}

public boolean getInt96TimestampStatisticsEnabled() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why make this an option? Since we don't use int96 stats otherwise, I think it would be perfectly fine to keep it simple and just produce the new stats all the time.

return int96TimestampStatisticsEnabled;
}

@Override
public String toString() {
return "Parquet page size to " + getPageSizeThreshold() + '\n'
Expand Down Expand Up @@ -406,6 +413,7 @@ public static class Builder {
private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH;
private boolean statisticsEnabled = DEFAULT_STATISTICS_ENABLED;
private boolean sizeStatisticsEnabled = DEFAULT_SIZE_STATISTICS_ENABLED;
private boolean int96TimestampStatisticsEnabled = DEFAULT_INT96_TIMESTAMP_STATISTICS_ENABLED;
private final ColumnProperty.Builder<Long> bloomFilterNDVs;
private final ColumnProperty.Builder<Double> bloomFilterFPPs;
private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES;
Expand Down Expand Up @@ -756,6 +764,19 @@ public Builder withSizeStatisticsEnabled(String columnPath, boolean enabled) {
return this;
}

/**
* Sets whether min/max statistics are collected and written for INT96 columns using the
* chronological {@code INT96_TIMESTAMP_ORDER} column order. When enabled, INT96 columns are
* tagged with {@code INT96_TIMESTAMP_ORDER} in the file footer.
*
* <p>If this method is never called, the builder uses
* {@link ParquetProperties#DEFAULT_INT96_TIMESTAMP_STATISTICS_ENABLED}, i.e. the feature is
* disabled.
*
* @param enabled whether to collect and write INT96 timestamp statistics
* @return this builder for method chaining
*/
public Builder withInt96TimestampStatisticsEnabled(boolean enabled) {
this.int96TimestampStatisticsEnabled = enabled;
return this;
}

public ParquetProperties build() {
ParquetProperties properties = new ParquetProperties(this);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,18 @@ public enum ColumnOrderName {
/**
* Type defined order meaning that the comparison order of the elements are based on its type.
*/
TYPE_DEFINED_ORDER
TYPE_DEFINED_ORDER,
/**
* Chronological order for INT96 timestamps: values are compared by the Julian day (the last 4
* bytes, as a little-endian signed int32), then by the nanoseconds within the day (the first 8
* bytes, as a little-endian signed int64). Only supported for the INT96 physical type.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that this Javadoc is the right place for documenting the format and how to compare. This is part of the spec so the spec needs to be clear and this needs to state what the order means.

*/
INT96_TIMESTAMP_ORDER
}

private static final ColumnOrder UNDEFINED_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.UNDEFINED);
private static final ColumnOrder TYPE_DEFINED_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.TYPE_DEFINED_ORDER);
private static final ColumnOrder INT96_TIMESTAMP_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.INT96_TIMESTAMP_ORDER);

/**
* @return a {@link ColumnOrder} instance representing an undefined order
Expand All @@ -58,6 +65,14 @@ public static ColumnOrder typeDefined() {
return TYPE_DEFINED_COLUMN_ORDER;
}

/**
* Returns the shared, pre-built {@link ColumnOrder} whose name is
* {@link ColumnOrderName#INT96_TIMESTAMP_ORDER}; no new instance is created per call.
*
* @return a {@link ColumnOrder} instance representing the chronological order of INT96 timestamps
* @see ColumnOrderName#INT96_TIMESTAMP_ORDER
*/
public static ColumnOrder int96TimestampOrder() {
return INT96_TIMESTAMP_COLUMN_ORDER;
}

private final ColumnOrderName columnOrderName;

private ColumnOrder(ColumnOrderName columnOrderName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Comparator;
import org.apache.parquet.io.api.Binary;

Expand Down Expand Up @@ -293,4 +294,36 @@ public String toString() {
return "BINARY_AS_FLOAT16_COMPARATOR";
}
};

/*

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you intend for this to be Javadoc?

* Comparator for two timestamps encoded as INT96 (12-byte little-endian) binary.
* Layout: first 8 bytes = nanoseconds within the day, last 4 bytes = Julian day.
*
* Two-level comparison, matching the INT96 timestamp sort order:
* 1. Compare the last 4 bytes (Julian day) as a signed little-endian int32.
* 2. If equal, compare the first 8 bytes (nanos) as a signed little-endian int64.
*/
static final PrimitiveComparator<Binary> BINARY_AS_INT96_TIMESTAMP_COMPARATOR = new BinaryComparator() {
  @Override
  int compareBinary(Binary b1, Binary b2) {
    final int leftLength = b1.length();
    final int rightLength = b2.length();
    // Both operands must be exactly 12 bytes (96 bits); anything else is not an INT96 value.
    if (!(leftLength == 12 && rightLength == 12)) {
      throw new IllegalArgumentException(
          "INT96 binary length must be 12, got " + leftLength + " and " + rightLength);
    }

    // slice() re-bases each buffer so that index 0 is the first byte of the value; the
    // absolute reads below therefore work no matter where the value sits in its backing store.
    final ByteBuffer left = b1.toByteBuffer().slice().order(ByteOrder.LITTLE_ENDIAN);
    final ByteBuffer right = b2.toByteBuffer().slice().order(ByteOrder.LITTLE_ENDIAN);

    // Primary key: bytes 8..11 read as a signed little-endian int32 (the Julian day).
    final int byJulianDay = Integer.compare(left.getInt(8), right.getInt(8));
    // Tie-break: bytes 0..7 read as a signed little-endian int64 (nanoseconds within the day).
    return byJulianDay != 0 ? byJulianDay : Long.compare(left.getLong(0), right.getLong(0));
  }

  @Override
  public String toString() {
    return "BINARY_AS_INT96_TIMESTAMP_COMPARATOR";
  }
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -622,9 +622,15 @@ public PrimitiveType(
private ColumnOrder requireValidColumnOrder(ColumnOrder columnOrder) {
if (primitive == PrimitiveTypeName.INT96) {
Preconditions.checkArgument(
columnOrder.getColumnOrderName() == ColumnOrderName.UNDEFINED,
columnOrder.getColumnOrderName() == ColumnOrderName.UNDEFINED
|| columnOrder.getColumnOrderName() == ColumnOrderName.INT96_TIMESTAMP_ORDER,
"The column order %s is not supported by INT96",
columnOrder);
} else {
Preconditions.checkArgument(
columnOrder.getColumnOrderName() != ColumnOrderName.INT96_TIMESTAMP_ORDER,
"The column order %s is only supported by INT96",
columnOrder);
}
if (getLogicalTypeAnnotation() != null) {
Preconditions.checkArgument(
Expand Down Expand Up @@ -655,6 +661,15 @@ public PrimitiveType withLogicalTypeAnnotation(LogicalTypeAnnotation logicalType
return new PrimitiveType(getRepetition(), primitive, length, getName(), logicalType, getId());
}

/**
* @param columnOrder the column order
* @return a new PrimitiveType with the same fields and the given column order
*/
public PrimitiveType withColumnOrder(ColumnOrder columnOrder) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we always produce INT96 stats using the timestamp order, would we need this addition to the API? I think we could always produce a PrimitiveType with the right order when constructing types. We would just need to make sure that deserialization correctly distinguishes between unordered and timestamp order.

return new PrimitiveType(
getRepetition(), primitive, length, getName(), getLogicalTypeAnnotation(), getId(), columnOrder);
}

/**
* @return the primitive type
*/
Expand Down Expand Up @@ -869,6 +884,9 @@ protected Type union(Type toMerge, boolean strict) {
*/
@SuppressWarnings("unchecked")
public <T> PrimitiveComparator<T> comparator() {
  final PrimitiveComparator<?> selected;
  if (columnOrder.getColumnOrderName() != ColumnOrderName.INT96_TIMESTAMP_ORDER) {
    // Usual case: the physical type together with the logical annotation picks the comparator.
    selected = getPrimitiveTypeName().comparator(getLogicalTypeAnnotation());
  } else {
    // The INT96 timestamp column order overrides the type-derived comparator.
    selected = PrimitiveComparator.BINARY_AS_INT96_TIMESTAMP_COMPARATOR;
  }
  // Unchecked by design: the caller chooses T to match the column's value type.
  return (PrimitiveComparator<T>) selected;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.parquet.schema;

import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_FLOAT16_COMPARATOR;
import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_INT96_TIMESTAMP_COMPARATOR;
import static org.apache.parquet.schema.PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR;
import static org.apache.parquet.schema.PrimitiveComparator.BOOLEAN_COMPARATOR;
import static org.apache.parquet.schema.PrimitiveComparator.DOUBLE_COMPARATOR;
Expand All @@ -33,8 +34,12 @@

import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import org.apache.parquet.example.data.simple.NanoTime;
import org.apache.parquet.io.api.Binary;
import org.junit.Test;

Expand Down Expand Up @@ -297,6 +302,60 @@ public void testBinaryAsSignedIntegerComparatorWithEquals() {
}
}

// Test helper: wraps the given Julian day and nanoseconds-of-day in a NanoTime and returns its
// Binary form, so tests can build INT96 values directly from the two raw components.
private static Binary int96(int julianDay, long nanosOfDay) {
return new NanoTime(julianDay, nanosOfDay).toBinary();
}

// Test helper: parses an ISO-8601 local date-time (e.g. "2020-01-01T12:00:00") and converts it
// to the Binary form of a NanoTime made of (Julian day, nanoseconds within the day).
private static Binary timestampToInt96(String timestamp) {
  // Added to an epoch-day count (days since 1970-01-01) to obtain the Julian day number.
  final long julianDayOfUnixEpoch = 2440588;

  final LocalDateTime parsed = LocalDateTime.parse(timestamp);
  final long epochDay = parsed.toLocalDate().toEpochDay();
  final long nanosOfDay = parsed.toLocalTime().toNanoOfDay();

  return new NanoTime((int) (epochDay + julianDayOfUnixEpoch), nanosOfDay).toBinary();
}

// Verifies that BINARY_AS_INT96_TIMESTAMP_COMPARATOR orders values by Julian day first and by
// nanoseconds-of-day second. Every pair (i, j) taken from a hand-sorted array is compared, and
// the sign of the comparator result must equal the sign of Integer.compare(i, j). Each pair is
// additionally checked across several Binary representations of the same bytes.
@Test
public void testInt96TimestampComparator() {
// Fixture: strictly ascending in the expected order, so array index doubles as expected rank
Binary[] valuesInAscendingOrder = {
int96(Integer.MIN_VALUE, 0), // most negative julian day
int96(-1, 86_399_999_999_999L), // negative julian days sort before day 0
int96(0, 0), // start of the julian period
int96(0, 86_399_999_999_999L), // same day, later time of day
timestampToInt96("1968-05-23T00:00:00.000000123"), // pre-epoch but positive julian day
timestampToInt96("2020-01-01T12:00:00"),
timestampToInt96("2020-02-01T11:00:00"), // later day even though earlier time of day
timestampToInt96("2020-02-01T11:00:00.000000001"), // nanos tie-break
int96(Integer.MAX_VALUE, 86_399_999_999_999L)
};

// The same value in different Binary representations must compare identically; the offset
// variant guards against absolute reads not being relative to the value's start
List<Function<Binary, Binary>> representations = List.of(
b -> b,
b -> Binary.fromReusedByteArray(b.getBytes()),
b -> Binary.fromConstantByteArray(b.getBytes()),
b -> {
// Embed the value at offset 10 of a larger array filled with 0xAA bytes
byte[] bytes = b.getBytes();
byte[] padded = new byte[bytes.length + 20];
Arrays.fill(padded, (byte) 0xAA);
System.arraycopy(bytes, 0, padded, 10, bytes.length);
return Binary.fromReusedByteArray(padded, 10, bytes.length);
});

// Full cross product: every value pair, in every combination of representations
for (int i = 0; i < valuesInAscendingOrder.length; ++i) {
for (int j = 0; j < valuesInAscendingOrder.length; ++j) {
for (Function<Binary, Binary> fi : representations) {
for (Function<Binary, Binary> fj : representations) {
Binary bi = fi.apply(valuesInAscendingOrder[i]);
Binary bj = fj.apply(valuesInAscendingOrder[j]);
// Only the sign matters; the comparator may return any magnitude
assertEquals(
"comparing value " + i + " to value " + j,
Integer.signum(Integer.compare(i, j)),
Integer.signum(BINARY_AS_INT96_TIMESTAMP_COMPARATOR.compare(bi, bj)));
}
}
}
}
}

@Test
public void testFloat16Comparator() {
Binary[] valuesInAscendingOrder = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_DEFAULT;
import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.INT96_TIMESTAMP_STATISTICS_READING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
Expand Down Expand Up @@ -291,6 +292,10 @@ public Builder(ParquetConfiguration conf) {
if (badRecordThresh != null) {
set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
}
String readInt96TimestampStats = conf.get(INT96_TIMESTAMP_STATISTICS_READING_ENABLED);
if (readInt96TimestampStats != null) {
set(INT96_TIMESTAMP_STATISTICS_READING_ENABLED, readInt96TimestampStats);
}
}

public Builder useSignedStringMinMax(boolean useSignedStringMinMax) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.parquet.format.GeographyType;
import org.apache.parquet.format.GeometryType;
import org.apache.parquet.format.GeospatialStatistics;
import org.apache.parquet.format.Int96TimestampOrder;
import org.apache.parquet.format.IntType;
import org.apache.parquet.format.KeyValue;
import org.apache.parquet.format.LogicalType;
Expand All @@ -111,6 +112,7 @@
import org.apache.parquet.format.Uncompressed;
import org.apache.parquet.format.VariantType;
import org.apache.parquet.format.XxHash;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
Expand Down Expand Up @@ -143,6 +145,7 @@
public class ParquetMetadataConverter {

private static final TypeDefinedOrder TYPE_DEFINED_ORDER = new TypeDefinedOrder();
private static final Int96TimestampOrder INT96_TIMESTAMP_ORDER = new Int96TimestampOrder();
public static final MetadataFilter NO_FILTER = new NoFilter();
public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k
Expand Down Expand Up @@ -278,11 +281,16 @@ public FileMetaData toParquetMetadata(

private List<ColumnOrder> getColumnOrders(MessageType schema) {
List<ColumnOrder> columnOrders = new ArrayList<>();
// Currently, only TypeDefinedOrder is supported, so we create a column order for each columns with
// TypeDefinedOrder even if some types (e.g. INT96) have undefined column orders.
for (int i = 0, n = schema.getPaths().size(); i < n; ++i) {
// Columns with the INT96_TIMESTAMP_ORDER column order are tagged as such; all other columns are
// tagged with TypeDefinedOrder even if some types have undefined column orders.
for (String[] path : schema.getPaths()) {
ColumnOrder columnOrder = new ColumnOrder();
columnOrder.setTYPE_ORDER(TYPE_DEFINED_ORDER);
if (schema.getType(path).asPrimitiveType().columnOrder().getColumnOrderName() ==
ColumnOrderName.INT96_TIMESTAMP_ORDER) {
columnOrder.setINT96_TIMESTAMP_ORDER(INT96_TIMESTAMP_ORDER);
} else {
columnOrder.setTYPE_ORDER(TYPE_DEFINED_ORDER);
}
columnOrders.add(columnOrder);
}
return columnOrders;
Expand Down Expand Up @@ -891,7 +899,9 @@ private static byte[] tuncateMax(BinaryTruncator truncator, int truncateLength,
}

private static boolean isMinMaxStatsSupported(PrimitiveType type) {
return type.columnOrder().getColumnOrderName() == ColumnOrderName.TYPE_DEFINED_ORDER;
ColumnOrderName name = type.columnOrder().getColumnOrderName();
return name == ColumnOrderName.TYPE_DEFINED_ORDER
|| name == ColumnOrderName.INT96_TIMESTAMP_ORDER;
}

/**
Expand Down Expand Up @@ -2034,6 +2044,11 @@ private void buildChildren(
|| schemaElement.converted_type == ConvertedType.INTERVAL)) {
columnOrder = org.apache.parquet.schema.ColumnOrder.undefined();
}
// INT96_TIMESTAMP_ORDER is only valid for INT96 columns; ignore it anywhere else
if (columnOrder.getColumnOrderName() == ColumnOrderName.INT96_TIMESTAMP_ORDER
&& schemaElement.type != Type.INT96) {
columnOrder = org.apache.parquet.schema.ColumnOrder.undefined();
}
primitiveBuilder.columnOrder(columnOrder);
}
childBuilder = primitiveBuilder;
Expand Down Expand Up @@ -2086,14 +2101,22 @@ Repetition fromParquetRepetition(FieldRepetitionType repetition) {
return Repetition.valueOf(repetition.name());
}

private static org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(ColumnOrder columnOrder) {
private org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(ColumnOrder columnOrder) {
if (columnOrder.isSetTYPE_ORDER()) {
return org.apache.parquet.schema.ColumnOrder.typeDefined();
}
if (columnOrder.isSetINT96_TIMESTAMP_ORDER() && readInt96TimestampStatisticsEnabled()) {
return org.apache.parquet.schema.ColumnOrder.int96TimestampOrder();
}
// The column order is not yet supported by this API
return org.apache.parquet.schema.ColumnOrder.undefined();
}

private boolean readInt96TimestampStatisticsEnabled() {
return options == null
|| options.isEnabled(ParquetInputFormat.INT96_TIMESTAMP_STATISTICS_READING_ENABLED, true);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we have an option for reading INT96 stats? Wouldn't this always be true if we know the int96 stats are using the timestamp order?

}

@Deprecated
public void writeDataPageHeader(
int uncompressedSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public InternalParquetRecordWriter(
ParquetProperties props) {
this.parquetFileWriter = parquetFileWriter;
this.writeSupport = Objects.requireNonNull(writeSupport, "writeSupport cannot be null");
this.schema = schema;
this.schema = ParquetFileWriter.applyInt96TimestampOrder(schema, props);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is the right place to fixup order. It would be better to always use the new order for INT96 when constructing schemas. When converting from file schemas, we would need to detect timestamp vs unordered, but anything going through the write path should automatically use timestamp order because the write path should always produce it.

this.extraMetaData = extraMetaData;
this.rowGroupSizeThreshold = rowGroupSize;
this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit();
Expand Down
Loading