Skip to content
58 changes: 41 additions & 17 deletions bson/src/main/org/bson/BsonBinaryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,25 @@ public void pipe(final BsonReader reader) {
pipeDocument(reader, null);
}

/**
* Pipes an encoded BSON document from the given byte array to this writer.
*
* @param bytes the byte array containing the encoded BSON document
* @param offset the offset into the byte array
* @param length the length of the encoded BSON document
* @since 5.8
*/
public void pipe(final byte[] bytes, final int offset, final int length) {
checkMinDocumentSize(length);
if (getState() == State.VALUE) {
bsonOutput.writeByte(BsonType.DOCUMENT.getValue());
writeCurrentName();
}
int pipedDocumentStartPosition = bsonOutput.getPosition();
bsonOutput.writeBytes(bytes, offset, length);
completePipeDocument(pipedDocumentStartPosition);
Comment thread
vbabanin marked this conversation as resolved.
Comment thread
vbabanin marked this conversation as resolved.
}

@Override
public void pipe(final BsonReader reader, final List<BsonElement> extraElements) {
notNull("reader", reader);
Expand All @@ -350,14 +369,10 @@ private void pipeDocument(final BsonReader reader, final List<BsonElement> extra
}
BsonInput bsonInput = binaryReader.getBsonInput();
int size = bsonInput.readInt32();
if (size < 5) {
throw new BsonSerializationException("Document size must be at least 5");
}
checkMinDocumentSize(size);
int pipedDocumentStartPosition = bsonOutput.getPosition();
bsonOutput.writeInt32(size);
byte[] bytes = new byte[size - 4];
bsonInput.readBytes(bytes);
bsonOutput.writeBytes(bytes);
Comment thread
rozza marked this conversation as resolved.
bsonInput.pipe(bsonOutput, size - 4);

binaryReader.setState(AbstractBsonReader.State.TYPE);

Expand All @@ -371,24 +386,27 @@ private void pipeDocument(final BsonReader reader, final List<BsonElement> extra
setContext(getContext().getParentContext());
}

if (getContext() == null) {
setState(State.DONE);
} else {
if (getContext().getContextType() == BsonContextType.JAVASCRIPT_WITH_SCOPE) {
backpatchSize(); // size of the JavaScript with scope value
setContext(getContext().getParentContext());
}
setState(getNextState());
}

validateSize(bsonOutput.getPosition() - pipedDocumentStartPosition);
completePipeDocument(pipedDocumentStartPosition);
} else if (extraElements != null) {
super.pipe(reader, extraElements);
} else {
super.pipe(reader);
}
}

private void completePipeDocument(final int pipedDocumentStartPosition) {
if (getContext() == null) {
setState(State.DONE);
} else {
if (getContext().getContextType() == BsonContextType.JAVASCRIPT_WITH_SCOPE) {
backpatchSize(); // size of the JavaScript with scope value
setContext(getContext().getParentContext());
}
setState(getNextState());
}
validateSize(bsonOutput.getPosition() - pipedDocumentStartPosition);
}

/**
* Sets a maximum size for documents from this point.
*
Expand Down Expand Up @@ -426,6 +444,12 @@ public void reset() {
mark = null;
}

private static void checkMinDocumentSize(final int size) {
if (size < 5) {
throw new BsonSerializationException("Document size must be at least 5");
}
}

private void writeCurrentName() {
if (getContext().getContextType() == BsonContextType.ARRAY) {
int index = getContext().index++;
Expand Down
1 change: 0 additions & 1 deletion bson/src/main/org/bson/BsonWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -356,5 +356,4 @@ public interface BsonWriter {
* @param reader The source.
*/
void pipe(BsonReader reader);

}
36 changes: 35 additions & 1 deletion bson/src/main/org/bson/RawBsonDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import static org.bson.assertions.Assertions.notNull;

/**
* An immutable BSON document that is represented using only the raw bytes.
* A BSON document that is represented using only the raw bytes.
*
* @since 3.0
*/
Expand Down Expand Up @@ -144,6 +144,40 @@ public ByteBuf getByteBuffer() {
return new ByteBufNIO(buffer);
}

/**
* Returns the byte array backing this document. The returned array may be larger than the BSON document itself;
* only the range from {@link #getByteOffset()} to {@code getByteOffset() + }{@link #getByteLength()} contains
* valid document bytes. Changes to the returned array will be reflected in this document.
Comment thread
rozza marked this conversation as resolved.
*
* @return the backing byte array
* @since 5.8
* @see #getByteOffset()
* @see #getByteLength()
*/
public byte[] getBackingArray() {
return bytes;
}
Comment thread
vbabanin marked this conversation as resolved.
Comment thread
vbabanin marked this conversation as resolved.

/**
* Returns the offset into the {@linkplain #getBackingArray() backing byte array} where this document starts.
*
* @return the offset
* @since 5.8
*/
public int getByteOffset() {
return offset;
}

/**
* Returns the length of this document within the {@linkplain #getBackingArray() backing byte array}.
*
* @return the length
* @since 5.8
*/
public int getByteLength() {
return length;
}

/**
* Decode this into a document.
*
Expand Down
13 changes: 11 additions & 2 deletions bson/src/main/org/bson/codecs/RawBsonDocumentCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,17 @@ public RawBsonDocumentCodec() {

@Override
public void encode(final BsonWriter writer, final RawBsonDocument value, final EncoderContext encoderContext) {
try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(value.getByteBuffer()))) {
writer.pipe(reader);
if (writer instanceof BsonBinaryWriter) {
// Fast path. The pipe method should ideally exist on BsonWriter, but adding it as
// abstract would be a breaking change, and adding it as a default method would force
// BsonWriter to depend on BsonBinaryReader/ByteBufferBsonInput, violating the
// interface's abstraction.
// TODO JAVA-6211 move pipe(byte[], int, int) to BsonWriter to remove this instanceof.
((BsonBinaryWriter) writer).pipe(value.getBackingArray(), value.getByteOffset(), value.getByteLength());
} else {
Comment thread
vbabanin marked this conversation as resolved.
try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(value.getByteBuffer()))) {
writer.pipe(reader);
}
}
}

Expand Down
13 changes: 13 additions & 0 deletions bson/src/main/org/bson/io/BsonInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,19 @@ public interface BsonInput extends Closeable {
*/
boolean hasRemaining();

/**
* Pipes the specified number of bytes from {@linkplain BsonInput this} input to the given {@linkplain BsonOutput output}.
*
* @param output the output to pipe to
* @param numBytes the number of bytes to pipe
* @since 5.8
*/
default void pipe(BsonOutput output, int numBytes) {
byte[] bytes = new byte[numBytes];
readBytes(bytes);
output.writeBytes(bytes);
}

@Override
void close();
}
18 changes: 18 additions & 0 deletions bson/src/main/org/bson/io/ByteBufferBsonInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,24 @@ public boolean hasRemaining() {
return buffer.hasRemaining();
}

@Override
public void pipe(final BsonOutput output, final int numBytes) {
ensureOpen();
Comment thread
vbabanin marked this conversation as resolved.
ensureAvailable(numBytes);

if (buffer.isBackedByArray()) {
int position = buffer.position();
int arrayOffset = buffer.arrayOffset();
output.writeBytes(buffer.array(), arrayOffset + position, numBytes);
buffer.position(position + numBytes);
} else {
// Fallback: use temporary buffer for non-array-backed buffers
byte[] temp = new byte[numBytes];
buffer.get(temp);
output.writeBytes(temp);
Comment thread
rozza marked this conversation as resolved.
}
Comment thread
vbabanin marked this conversation as resolved.
}

@Override
public void close() {
buffer.release();
Expand Down
27 changes: 27 additions & 0 deletions bson/src/test/unit/org/bson/BsonBinaryWriterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,34 @@ public void testPipeOfDocumentWithInvalidSize() {
// expected
}
}
}

@Test
public void testPipeOfRawBytes() {
BasicOutputBuffer sourceBuffer = new BasicOutputBuffer();
try (BsonBinaryWriter sourceWriter = new BsonBinaryWriter(sourceBuffer)) {
sourceWriter.writeStartDocument();
sourceWriter.writeBoolean("a", true);
sourceWriter.writeEndDocument();
}
byte[] documentBytes = sourceBuffer.toByteArray();

BasicOutputBuffer destBuffer = new BasicOutputBuffer();
try (BsonBinaryWriter destWriter = new BsonBinaryWriter(destBuffer)) {
destWriter.pipe(documentBytes, 0, documentBytes.length);
}

assertArrayEquals(documentBytes, destBuffer.toByteArray());
}

@Test
public void testPipeOfRawBytesWithInvalidSize() {
byte[] bytes = {4, 0, 0, 0}; // minimum document size is 5
Comment thread
rozza marked this conversation as resolved.

BasicOutputBuffer newBuffer = new BasicOutputBuffer();
try (BsonBinaryWriter newWriter = new BsonBinaryWriter(newBuffer)) {
assertThrows(BsonSerializationException.class, () -> newWriter.pipe(bytes, 0, bytes.length));
}
}

// CHECKSTYLE:OFF
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class RawBsonDocumentSpecification extends Specification {
rawDocument << createRawDocumentVariants()
}


def 'parse should through if parameter is invalid'() {
when:
RawBsonDocument.parse(null)
Expand Down
106 changes: 106 additions & 0 deletions bson/src/test/unit/org/bson/RawBsonDocumentTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.bson;

import org.bson.codecs.BsonDocumentCodec;
import org.bson.codecs.EncoderContext;
import org.bson.io.BasicOutputBuffer;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.api.Named;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.Arrays;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;

class RawBsonDocumentTest {

private static final BsonDocument DOCUMENT = new BsonDocument()
.append("a", new BsonInt32(1))
.append("b", new BsonInt32(2))
.append("c", new BsonDocument("x", BsonBoolean.TRUE))
.append("d", new BsonArray(Arrays.asList(
new BsonDocument("y", BsonBoolean.FALSE),
new BsonArray(Arrays.asList(new BsonInt32(1))))));

private static final byte[] DOCUMENT_BYTES = encodeDocument();

static Stream<Arguments> backingArrayAccessors() {
int documentLength = DOCUMENT_BYTES.length;

Stream.Builder<Arguments> builder = Stream.builder();
builder.add(Arguments.of(createFromDocument(), 0, documentLength));
builder.add(Arguments.of(createFromByteArray(), 0, documentLength));

for (int padding = 1; padding <= 2; padding++) {
builder.add(Arguments.of(createPaddedBefore(padding), padding, documentLength));
builder.add(Arguments.of(createPaddedAfter(padding), 0, documentLength));
builder.add(Arguments.of(createPaddedBoth(padding), padding, documentLength));
}

return builder.build();
}

@ParameterizedTest(name = "{0}, expectedOffset={1}, expectedLength={2}")
@MethodSource("backingArrayAccessors")
void shouldExposeBackingArrayOffsetAndLength(final RawBsonDocument rawDocument,
final int expectedOffset,
final int expectedLength) {
assertEquals(expectedOffset, rawDocument.getByteOffset());
assertEquals(expectedLength, rawDocument.getByteLength());
assertArrayEquals(DOCUMENT_BYTES,
Arrays.copyOfRange(
rawDocument.getBackingArray(),
rawDocument.getByteOffset(),
rawDocument.getByteOffset() + rawDocument.getByteLength()));
}

private static Named<RawBsonDocument> createFromDocument() {
return Named.of("from document", new RawBsonDocument(DOCUMENT, new BsonDocumentCodec()));
}

private static Named<RawBsonDocument> createFromByteArray() {
return Named.of("from byte array", new RawBsonDocument(DOCUMENT_BYTES));
}

private static Named<RawBsonDocument> createPaddedBefore(final int padding) {
byte[] padded = new byte[DOCUMENT_BYTES.length + padding];
System.arraycopy(DOCUMENT_BYTES, 0, padded, padding, DOCUMENT_BYTES.length);
return Named.of("padded before " + padding, new RawBsonDocument(padded, padding, DOCUMENT_BYTES.length));
}

private static Named<RawBsonDocument> createPaddedAfter(final int padding) {
byte[] padded = new byte[DOCUMENT_BYTES.length + padding];
System.arraycopy(DOCUMENT_BYTES, 0, padded, 0, DOCUMENT_BYTES.length);
return Named.of("padded after " + padding, new RawBsonDocument(padded, 0, DOCUMENT_BYTES.length));
}

private static Named<RawBsonDocument> createPaddedBoth(final int padding) {
byte[] padded = new byte[DOCUMENT_BYTES.length + padding * 2];
System.arraycopy(DOCUMENT_BYTES, 0, padded, padding, DOCUMENT_BYTES.length);
return Named.of("padded both " + padding, new RawBsonDocument(padded, padding, DOCUMENT_BYTES.length));
}

private static byte[] encodeDocument() {
BasicOutputBuffer buffer = new BasicOutputBuffer();
new BsonDocumentCodec().encode(new BsonBinaryWriter(buffer), DOCUMENT, EncoderContext.builder().build());
return Arrays.copyOf(buffer.getInternalBuffer(), buffer.getPosition());
}
}
Loading