Skip to content

Commit b237591

Browse files
authored
Merge branch 'main' into feat/dataframe-sort-repartition
2 parents 209b076 + 7f49541 commit b237591

37 files changed

Lines changed: 4956 additions & 65 deletions

core/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ under the License.
5757
<groupId>com.google.protobuf</groupId>
5858
<artifactId>protobuf-java</artifactId>
5959
</dependency>
60+
<dependency>
61+
<groupId>org.apache.avro</groupId>
62+
<artifactId>avro</artifactId>
63+
<scope>test</scope>
64+
</dependency>
6065
</dependencies>
6166

6267
<build>
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.datafusion;
21+
22+
import org.apache.arrow.vector.types.pojo.Schema;
23+
import org.apache.datafusion.protobuf.AvroReadOptionsProto;
24+
25+
/**
26+
* Configuration knobs for Avro sources passed to {@link SessionContext#registerAvro(String, String,
27+
* AvroReadOptions)} and {@link SessionContext#readAvro(String, AvroReadOptions)}.
28+
*
29+
* <p>Mirrors the subset of DataFusion's {@code AvroReadOptions} that maps onto the Java surface
30+
* today: {@code fileExtension} (default {@code ".avro"}) and an explicit Arrow {@code schema} that
31+
* bypasses on-read schema inference. {@code tablePartitionCols} is intentionally deferred -- no
32+
* other Java reader exposes Hive-style partitioning yet.
33+
*
34+
* <p>Avro carries its own per-block compression (snappy, deflate, bzip2, xz, zstandard) inside the
35+
* object container itself, negotiated when the file is written, so unlike CSV / NDJSON there is no
36+
* {@code FileCompressionType} setter.
37+
*/
38+
public final class AvroReadOptions {
39+
40+
private String fileExtension = ".avro";
41+
private Schema schema;
42+
43+
public AvroReadOptions fileExtension(String ext) {
44+
this.fileExtension = ext;
45+
return this;
46+
}
47+
48+
public AvroReadOptions schema(Schema schema) {
49+
this.schema = schema;
50+
return this;
51+
}
52+
53+
byte[] toBytes() {
54+
return AvroReadOptionsProto.newBuilder().setFileExtension(fileExtension).build().toByteArray();
55+
}
56+
57+
Schema schema() {
58+
return schema;
59+
}
60+
}

core/src/main/java/org/apache/datafusion/DataFrame.java

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,17 @@
1919

2020
package org.apache.datafusion;
2121

22+
import java.io.ByteArrayInputStream;
23+
import java.io.IOException;
24+
import java.nio.channels.Channels;
25+
2226
import org.apache.arrow.c.ArrowArrayStream;
2327
import org.apache.arrow.c.Data;
2428
import org.apache.arrow.memory.BufferAllocator;
2529
import org.apache.arrow.vector.ipc.ArrowReader;
30+
import org.apache.arrow.vector.ipc.ReadChannel;
31+
import org.apache.arrow.vector.ipc.message.MessageSerializer;
32+
import org.apache.arrow.vector.types.pojo.Schema;
2633

2734
/**
2835
* A lazy representation of a query plan, mirroring the Rust DataFusion {@code DataFrame}. Created
@@ -106,6 +113,77 @@ public ArrowReader executeStream(BufferAllocator allocator) {
106113
}
107114
}
108115

116+
/**
117+
* Return the Arrow {@link Schema} of this DataFrame's output. Non-consuming: the receiver remains
118+
* usable and must still be closed independently. Schema inspection does not execute the plan.
119+
*
120+
* <p>The schema is transferred via Arrow IPC; no {@link BufferAllocator} is required because a
121+
* schema carries no buffer data.
122+
*/
123+
public Schema schema() {
124+
if (nativeHandle == 0) {
125+
throw new IllegalStateException("DataFrame is closed or already collected");
126+
}
127+
byte[] ipcBytes = schemaIpc(nativeHandle);
128+
try {
129+
return MessageSerializer.deserializeSchema(
130+
new ReadChannel(Channels.newChannel(new ByteArrayInputStream(ipcBytes))));
131+
} catch (IOException e) {
132+
throw new RuntimeException("Failed to deserialize IPC schema", e);
133+
}
134+
}
135+
136+
/**
137+
* Return a new DataFrame whose rows describe the plan that would execute this DataFrame.
138+
* Non-consuming: the receiver remains usable and must still be closed independently.
139+
*
140+
* <p>With {@code verbose=false} and {@code analyze=false} (the cheap, lazy variant), the result
141+
* contains the logical plan only. {@code verbose=true} adds optimised-plan and physical-plan
142+
* rows; {@code analyze=true} runs the plan and attaches per-operator metrics. Render via {@link
143+
* #show()} or {@link #collect(BufferAllocator)}.
144+
*/
145+
public DataFrame explain(boolean verbose, boolean analyze) {
146+
if (nativeHandle == 0) {
147+
throw new IllegalStateException("DataFrame is closed or already collected");
148+
}
149+
return new DataFrame(explainPlan(nativeHandle, verbose, analyze));
150+
}
151+
152+
/**
153+
* Materialise this DataFrame into an in-memory table and return a new DataFrame that scans it.
154+
* Non-consuming: the receiver remains usable and must still be closed independently.
155+
*
156+
* <p>Executes the plan eagerly: the entire result set is held in native memory until the returned
157+
* DataFrame is closed. Suitable for intermediate results that will be reused across multiple
158+
* downstream queries.
159+
*
160+
* @throws RuntimeException if execution fails.
161+
*/
162+
public DataFrame cache() {
163+
if (nativeHandle == 0) {
164+
throw new IllegalStateException("DataFrame is closed or already collected");
165+
}
166+
return new DataFrame(cachePlan(nativeHandle));
167+
}
168+
169+
/**
170+
* Compute summary statistics (count, null_count, mean, std, min, max, median) over this
171+
* DataFrame's columns and return them as a new DataFrame. Non-consuming: the receiver remains
172+
* usable and must still be closed independently.
173+
*
174+
* <p>Executes the plan: DataFusion runs seven aggregate sub-plans against this DataFrame to build
175+
* the summary table. Numeric columns receive every statistic; non-numeric columns receive {@code
176+
* count} / {@code null_count} / {@code min} / {@code max} where applicable.
177+
*
178+
* @throws RuntimeException if execution fails.
179+
*/
180+
public DataFrame describe() {
181+
if (nativeHandle == 0) {
182+
throw new IllegalStateException("DataFrame is closed or already collected");
183+
}
184+
return new DataFrame(describePlan(nativeHandle));
185+
}
186+
109187
/** Execute the plan and return the number of rows. */
110188
public long count() {
111189
if (nativeHandle == 0) {
@@ -399,6 +477,38 @@ public void writeCsv(String path, CsvWriteOptions options) {
399477
writeCsvWithOptions(nativeHandle, path, options.toBytes());
400478
}
401479

480+
/**
481+
* Materialize this DataFrame as newline-delimited JSON at {@code path}. The path is treated as a
482+
* directory unless overridden via {@link JsonWriteOptions#singleFileOutput(boolean)}. The
483+
* receiver remains usable and must still be closed independently.
484+
*
485+
* @throws RuntimeException if the write fails.
486+
*/
487+
public void writeJson(String path) {
488+
writeJson(path, new JsonWriteOptions());
489+
}
490+
491+
/**
492+
* Materialize this DataFrame as newline-delimited JSON at {@code path} with the supplied {@link
493+
* JsonWriteOptions}. The receiver remains usable and must still be closed independently.
494+
*
495+
* @throws IllegalArgumentException if {@code path} or {@code options} is {@code null}.
496+
* @throws RuntimeException if the write fails (path inaccessible, invalid compression spec,
497+
* etc.).
498+
*/
499+
public void writeJson(String path, JsonWriteOptions options) {
500+
if (nativeHandle == 0) {
501+
throw new IllegalStateException("DataFrame is closed or already collected");
502+
}
503+
if (path == null) {
504+
throw new IllegalArgumentException("writeJson path must be non-null");
505+
}
506+
if (options == null) {
507+
throw new IllegalArgumentException("writeJson options must be non-null");
508+
}
509+
writeJsonWithOptions(nativeHandle, path, options.toBytes());
510+
}
511+
402512
@Override
403513
public void close() {
404514
if (nativeHandle != 0) {
@@ -415,6 +525,14 @@ public void close() {
415525

416526
private static native long countRows(long handle);
417527

528+
/** Native call behind {@code schema()}; its result is decoded there as an Arrow IPC schema. */
private static native byte[] schemaIpc(long handle);
529+
530+
/** Native call behind {@code explain(boolean, boolean)}; the result is wrapped in a DataFrame. */
private static native long explainPlan(long handle, boolean verbose, boolean analyze);
531+
532+
/** Native call behind {@code cache()}; the result is wrapped in a new DataFrame. */
private static native long cachePlan(long handle);
533+
534+
/** Native call behind {@code describe()}; the result is wrapped in a new DataFrame. */
private static native long describePlan(long handle);
535+
418536
private static native void showDataFrame(long handle);
419537

420538
private static native void showDataFrameWithLimit(long handle, int limit);
@@ -450,4 +568,6 @@ private static native void writeParquetWithOptions(
450568
boolean singleFileOutputValue);
451569

452570
private static native void writeCsvWithOptions(long handle, String path, byte[] optionsBytes);
571+
572+
/**
 * Native call behind {@code writeJson(String, JsonWriteOptions)}; {@code optionsBytes} is the
 * output of {@code JsonWriteOptions.toBytes()}.
 */
private static native void writeJsonWithOptions(long handle, String path, byte[] optionsBytes);
453573
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.datafusion;
21+
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
25+
/**
26+
* Configuration knobs for writing JSON, passed to {@link DataFrame#writeJson(String,
27+
* JsonWriteOptions)}.
28+
*
29+
* <p>Mirrors a subset of DataFusion's {@code DataFrameWriteOptions} and the writer-side {@code
30+
* JsonOptions}. All setters return {@code this} for fluent chaining. Defaults: every field {@code
31+
* null} or empty (meaning the DataFusion default is used).
32+
*
33+
* <p>Path semantics: when {@link #singleFileOutput(boolean)} is {@code true}, the path passed to
34+
* {@code writeJson} is the literal output filename. When left unset (the default) and there are no
35+
* partition columns, the path is treated as a directory that DataFusion populates with one or more
36+
* part-files.
37+
*
38+
* <p>The output is always newline-delimited JSON (NDJSON). DataFusion's JSON writer does not emit
39+
* the bracketed array form, so there is no toggle for it here.
40+
*
41+
* <p>Compression reuses {@link FileCompressionType} -- the same codec set ({@code UNCOMPRESSED},
42+
* {@code GZIP}, {@code BZIP2}, {@code XZ}, {@code ZSTD}) the read side and the CSV writer accept.
43+
*/
44+
public final class JsonWriteOptions {
45+
46+
private Boolean singleFileOutput;
47+
private final List<String> partitionCols = new ArrayList<>();
48+
private FileCompressionType fileCompressionType;
49+
50+
/**
51+
* When {@code true}, write to a single file at the supplied path. When left unset (the default)
52+
* and no partition columns are configured, the path is treated as a directory and DataFusion
53+
* writes one or more part-files.
54+
*/
55+
public JsonWriteOptions singleFileOutput(boolean v) {
56+
this.singleFileOutput = v;
57+
return this;
58+
}
59+
60+
/**
61+
* Hive-style partition columns. Each column listed here is removed from the data rows and encoded
62+
* into the directory layout (one subdirectory per distinct value). Mutually exclusive with {@link
63+
* #singleFileOutput(boolean)} -- DataFusion rejects the combination at write time.
64+
*/
65+
public JsonWriteOptions partitionCols(String... cols) {
66+
this.partitionCols.clear();
67+
for (String c : cols) {
68+
this.partitionCols.add(c);
69+
}
70+
return this;
71+
}
72+
73+
/** Output compression codec. Defaults to uncompressed. */
74+
public JsonWriteOptions fileCompressionType(FileCompressionType t) {
75+
this.fileCompressionType = t;
76+
return this;
77+
}
78+
79+
byte[] toBytes() {
80+
org.apache.datafusion.protobuf.JsonWriteOptionsProto.Builder b =
81+
org.apache.datafusion.protobuf.JsonWriteOptionsProto.newBuilder();
82+
if (singleFileOutput != null) {
83+
b.setSingleFileOutput(singleFileOutput);
84+
}
85+
b.addAllPartitionCols(partitionCols);
86+
if (fileCompressionType != null) {
87+
b.setFileCompressionType(FileCompressionTypes.toProto(fileCompressionType));
88+
}
89+
return b.build().toByteArray();
90+
}
91+
}

0 commit comments

Comments
 (0)