Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ public static LargeListVector empty(String name, BufferAllocator allocator) {
/** The maximum index that is actually set. */
private int lastSet;

/**
* Temporary offset buffer used only for serialization of a never-allocated vector (see {@link
* #getFieldBuffers()}). Owned by this vector so it is released in {@link #clear()} rather than
* leaked.
*/
private ArrowBuf serializationOffsetBuffer;

/**
* Constructs a new instance.
*
Expand Down Expand Up @@ -134,6 +141,7 @@ public LargeListVector(Field field, BufferAllocator allocator, CallBack callBack
BitVectorHelper.getValidityBufferSizeFromCount(INITIAL_VALUE_ALLOCATION);
this.lastSet = -1;
this.offsetBuffer = allocator.getEmpty();
this.serializationOffsetBuffer = allocator.getEmpty();
this.vector = vector == null ? DEFAULT_DATA_VECTOR : vector;
this.valueCount = 0;
}
Expand Down Expand Up @@ -277,8 +285,26 @@ public List<ArrowBuf> getFieldBuffers() {
// Returns exactly two buffers, in order: the validity buffer, then an offset buffer.
List<ArrowBuf> result = new ArrayList<>(2);
setReaderAndWriterIndex();
result.add(validityBuffer);

// A never-allocated vector has an empty (capacity 0) offset buffer, yet setReaderAndWriterIndex
// marks OFFSET_WIDTH bytes as written so that serializers still emit offset[0] = 0 (an empty
// offset buffer would crash IPC readers in other libraries). Serializers read `writerIndex`
// bytes, so we must hand them a properly sized buffer. Mirror exportCDataBuffers() by
// substituting a temporary buffer instead of mutating this.offsetBuffer, which validation and
// subsequent writes still rely on being empty. The temporary is owned by this vector and
// released in clear()/close(), so it is not leaked.
//
// The offset buffer must be added exactly once: either the temporary or the vector's own
// buffer, never both, so that index 1 of the result is always the buffer to serialize.
if (offsetBuffer.capacity() == 0 && offsetBuffer.writerIndex() > 0) {
  final long size = offsetBuffer.writerIndex();
  // Reuse the temporary from an earlier call when it is already large enough. Releasing and
  // reallocating on every call would invalidate the buffer handed out by a previous call that
  // the caller may still be reading from.
  if (serializationOffsetBuffer.capacity() < size) {
    // Allocate directly rather than via allocateOffsetBuffer(), which would overwrite
    // offsetAllocationSizeInBytes and shrink a later allocateNew()'s offset buffer.
    serializationOffsetBuffer = releaseBuffer(serializationOffsetBuffer);
    serializationOffsetBuffer = allocator.buffer(size);
  }
  serializationOffsetBuffer.readerIndex(0);
  serializationOffsetBuffer.setZero(0, serializationOffsetBuffer.capacity());
  serializationOffsetBuffer.writerIndex(size);
  result.add(serializationOffsetBuffer);
} else {
  result.add(offsetBuffer);
}
return result;
}

Expand Down Expand Up @@ -805,6 +831,7 @@ public void clear() {
// NOTE(review): only the tail of clear() is visible in this hunk; earlier statements of the
// method, if any, are not shown here.
valueCount = 0;
super.clear();
validityBuffer = releaseBuffer(validityBuffer);
// Release the temporary offset buffer that getFieldBuffers() may have allocated for serializing
// a never-allocated vector; this vector owns it, so it must be freed here to avoid a leak.
serializationOffsetBuffer = releaseBuffer(serializationOffsetBuffer);
// Back to the constructor's initial state: no index has been set.
lastSet = -1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ public static ListVector empty(String name, BufferAllocator allocator) {
/** The maximum index that is actually set. */
protected int lastSet;

/**
* Temporary offset buffer used only for serialization of a never-allocated vector (see {@link
* #getFieldBuffers()}). Owned by this vector so it is released in {@link #clear()} rather than
* leaked.
*/
private ArrowBuf serializationOffsetBuffer;

/**
* Constructs a new instance.
*
Expand Down Expand Up @@ -110,6 +117,7 @@ public ListVector(Field field, BufferAllocator allocator, CallBack callBack) {
this.validityAllocationSizeInBytes =
BitVectorHelper.getValidityBufferSizeFromCount(INITIAL_VALUE_ALLOCATION);
this.lastSet = -1;
this.serializationOffsetBuffer = allocator.getEmpty();
}

@Override
Expand Down Expand Up @@ -235,8 +243,26 @@ public List<ArrowBuf> getFieldBuffers() {
// Returns exactly two buffers, in order: the validity buffer, then an offset buffer.
List<ArrowBuf> result = new ArrayList<>(2);
setReaderAndWriterIndex();
result.add(validityBuffer);

// A never-allocated vector has an empty (capacity 0) offset buffer, yet setReaderAndWriterIndex
// marks OFFSET_WIDTH bytes as written so that serializers still emit offset[0] = 0 (an empty
// offset buffer would crash IPC readers in other libraries). Serializers read `writerIndex`
// bytes, so we must hand them a properly sized buffer. Mirror exportCDataBuffers() by
// substituting a temporary buffer instead of mutating this.offsetBuffer, which validation and
// subsequent writes still rely on being empty. The temporary is owned by this vector and
// released in clear()/close(), so it is not leaked.
//
// The offset buffer must be added exactly once: either the temporary or the vector's own
// buffer, never both, so that index 1 of the result is always the buffer to serialize.
if (offsetBuffer.capacity() == 0 && offsetBuffer.writerIndex() > 0) {
  final long size = offsetBuffer.writerIndex();
  // Reuse the temporary from an earlier call when it is already large enough. Releasing and
  // reallocating on every call would invalidate the buffer handed out by a previous call that
  // the caller may still be reading from.
  if (serializationOffsetBuffer.capacity() < size) {
    // Allocate directly rather than via allocateOffsetBuffer(), which would overwrite
    // offsetAllocationSizeInBytes and shrink a later allocateNew()'s offset buffer.
    serializationOffsetBuffer = releaseBuffer(serializationOffsetBuffer);
    serializationOffsetBuffer = allocator.buffer(size);
  }
  serializationOffsetBuffer.readerIndex(0);
  serializationOffsetBuffer.setZero(0, serializationOffsetBuffer.capacity());
  serializationOffsetBuffer.writerIndex(size);
  result.add(serializationOffsetBuffer);
} else {
  result.add(offsetBuffer);
}
return result;
}

Expand Down Expand Up @@ -652,6 +678,7 @@ public MinorType getMinorType() {
public void clear() {
// Clears the state handled by the superclass first, then releases the buffers this class owns
// and resets its bookkeeping.
super.clear();
validityBuffer = releaseBuffer(validityBuffer);
// Release the temporary offset buffer that getFieldBuffers() may have allocated for serializing
// a never-allocated vector; this vector owns it, so it must be freed here to avoid a leak.
serializationOffsetBuffer = releaseBuffer(serializationOffsetBuffer);
// Back to the constructor's initial state: no index has been set.
lastSet = -1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,51 @@ public void testEmptyLargeListOffsetBuffer() {
}
}

@Test
public void testEmptyLargeListOffsetBufferWithoutAllocate() {
  // Regression test for the Arrow 19 IOOBE. A LargeListVector that was never allocated must,
  // after setValueCount(0), still expose an offset buffer that serializers can read: the
  // buffer returned by getFieldBuffers() holds offset[0] = 0, while the vector's own offset
  // buffer keeps its capacity of 0.
  try (LargeListVector vector = LargeListVector.empty("list", allocator)) {
    vector.addOrGetVector(FieldType.nullable(MinorType.INT.getType()));
    // Deliberately no allocateNew(): the offset buffer starts out with capacity 0.
    vector.setValueCount(0);

    final ArrowBuf serializedOffsets = vector.getFieldBuffers().get(1);
    final String tooShortMessage =
        "Offset buffer should have at least "
            + LargeListVector.OFFSET_WIDTH
            + " bytes for offset[0]";
    assertTrue(
        serializedOffsets.readableBytes() >= LargeListVector.OFFSET_WIDTH, tooShortMessage);

    final long firstOffset = serializedOffsets.getLong(0);
    assertEquals(0L, firstOffset);

    // The vector's own offset buffer must stay untouched so that later writes still work.
    final long ownOffsetCapacity = vector.getOffsetBuffer().capacity();
    assertEquals(0, ownOffsetCapacity);
  }
}

@Test
public void testEmptyLargeListGetBuffersWithoutAllocate() {
  // Covers both IPC serialization entry points: getBuffers(false) and getFieldBuffers(). The
  // latter is the path on which the original Netty IOOBE was raised.
  try (LargeListVector vector = LargeListVector.empty("list", allocator)) {
    vector.addOrGetVector(FieldType.nullable(MinorType.INT.getType()));
    vector.setValueCount(0);

    // With valueCount == 0 getBufferSize() is 0, so getBuffers yields an empty array; it must
    // not blow up on the never-allocated offset buffer.
    final ArrowBuf[] rawBuffers = vector.getBuffers(false);
    assertEquals(0, rawBuffers.length);

    // Serializers must receive a readable offset buffer that contains offset[0] = 0.
    final ArrowBuf serializedOffsets = vector.getFieldBuffers().get(1);
    final String unreadableMessage =
        "Offset buffer should be readable for >= " + LargeListVector.OFFSET_WIDTH + " bytes";
    assertTrue(
        serializedOffsets.readableBytes() >= LargeListVector.OFFSET_WIDTH, unreadableMessage);

    final long firstOffset = serializedOffsets.getLong(0);
    assertEquals(0L, firstOffset);
  }
}

private void writeIntValues(UnionLargeListWriter writer, int[] values) {
writer.startList();
for (int v : values) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1399,6 +1399,54 @@ public void testEmptyListOffsetBuffer() {
}
}

@Test
public void testEmptyListOffsetBufferWithoutAllocate() {
  // Regression test for the Arrow 19 IOOBE. A ListVector that was never allocated must, after
  // setValueCount(0), still expose an offset buffer that serializers can read: the buffer
  // returned by getFieldBuffers() holds offset[0] = 0, while the vector's own offset buffer
  // keeps its capacity of 0.
  try (ListVector vector = ListVector.empty("list", allocator)) {
    vector.addOrGetVector(FieldType.nullable(MinorType.INT.getType()));
    // Deliberately no allocateNew(): the offset buffer starts out with capacity 0.
    vector.setValueCount(0);

    final ArrowBuf serializedOffsets = vector.getFieldBuffers().get(1);
    final String tooShortMessage =
        "Offset buffer should have at least "
            + BaseRepeatedValueVector.OFFSET_WIDTH
            + " bytes for offset[0]";
    assertTrue(
        serializedOffsets.readableBytes() >= BaseRepeatedValueVector.OFFSET_WIDTH,
        tooShortMessage);

    final int firstOffset = serializedOffsets.getInt(0);
    assertEquals(0, firstOffset);

    // The vector's own offset buffer must stay untouched so that later writes still work.
    final long ownOffsetCapacity = vector.getOffsetBuffer().capacity();
    assertEquals(0, ownOffsetCapacity);
  }
}

@Test
public void testEmptyListGetBuffersWithoutAllocate() {
  // Covers both IPC serialization entry points: getBuffers(false) and getFieldBuffers(). The
  // latter is the path on which the original Netty IOOBE was raised, via
  // VectorUnloader -> NettyArrowBuf.unwrapBuffer().
  try (ListVector vector = ListVector.empty("list", allocator)) {
    vector.addOrGetVector(FieldType.nullable(MinorType.INT.getType()));
    vector.setValueCount(0);

    // With valueCount == 0 getBufferSize() is 0, so getBuffers yields an empty array; it must
    // not blow up on the never-allocated offset buffer.
    final ArrowBuf[] rawBuffers = vector.getBuffers(false);
    assertEquals(0, rawBuffers.length);

    // Serializers must receive a readable offset buffer that contains offset[0] = 0.
    final ArrowBuf serializedOffsets = vector.getFieldBuffers().get(1);
    final String unreadableMessage =
        "Offset buffer should be readable for >= "
            + BaseRepeatedValueVector.OFFSET_WIDTH
            + " bytes";
    assertTrue(
        serializedOffsets.readableBytes() >= BaseRepeatedValueVector.OFFSET_WIDTH,
        unreadableMessage);

    final int firstOffset = serializedOffsets.getInt(0);
    assertEquals(0, firstOffset);
  }
}

private void writeIntValues(UnionListWriter writer, int[] values) {
writer.startList();
for (int v : values) {
Expand Down
Loading