Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 42 additions & 17 deletions bson/src/main/org/bson/BsonBinaryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,26 @@ public void pipe(final BsonReader reader) {
pipeDocument(reader, null);
}

/**
* Pipes a raw BSON document from the given byte array to this writer, writing the bytes directly to the
* output without intermediate object allocation.
*
* @param bytes the byte array containing the BSON document
* @param offset the offset into the byte array
* @param length the length of the BSON document
* @since 5.8
*/
public void pipe(final byte[] bytes, final int offset, final int length) {
checkMinDocumentSize(length);
if (getState() == State.VALUE) {
bsonOutput.writeByte(BsonType.DOCUMENT.getValue());
writeCurrentName();
}
int pipedDocumentStartPosition = bsonOutput.getPosition();
bsonOutput.writeBytes(bytes, offset, length);
completePipeDocument(pipedDocumentStartPosition);
Comment thread
vbabanin marked this conversation as resolved.
Comment thread
vbabanin marked this conversation as resolved.
}

@Override
public void pipe(final BsonReader reader, final List<BsonElement> extraElements) {
notNull("reader", reader);
Expand All @@ -350,14 +370,10 @@ private void pipeDocument(final BsonReader reader, final List<BsonElement> extra
}
BsonInput bsonInput = binaryReader.getBsonInput();
int size = bsonInput.readInt32();
if (size < 5) {
throw new BsonSerializationException("Document size must be at least 5");
}
checkMinDocumentSize(size);
int pipedDocumentStartPosition = bsonOutput.getPosition();
bsonOutput.writeInt32(size);
byte[] bytes = new byte[size - 4];
bsonInput.readBytes(bytes);
bsonOutput.writeBytes(bytes);
Comment on lines -358 to -360
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This avoids an extra temporary byte[] allocation by reading directly into the target buffer, reducing both allocation pressure and byte-copy/processing overhead.

bsonInput.pipe(bsonOutput, size - 4);

binaryReader.setState(AbstractBsonReader.State.TYPE);

Expand All @@ -371,24 +387,27 @@ private void pipeDocument(final BsonReader reader, final List<BsonElement> extra
setContext(getContext().getParentContext());
}

if (getContext() == null) {
setState(State.DONE);
} else {
if (getContext().getContextType() == BsonContextType.JAVASCRIPT_WITH_SCOPE) {
backpatchSize(); // size of the JavaScript with scope value
setContext(getContext().getParentContext());
}
setState(getNextState());
}

validateSize(bsonOutput.getPosition() - pipedDocumentStartPosition);
completePipeDocument(pipedDocumentStartPosition);
} else if (extraElements != null) {
super.pipe(reader, extraElements);
} else {
super.pipe(reader);
}
}

/**
 * Finishes a pipe operation after the document bytes have been written: moves the writer to its next
 * state and validates the size of what was piped.
 *
 * @param pipedDocumentStartPosition the output position recorded just before the document bytes were written
 */
private void completePipeDocument(final int pipedDocumentStartPosition) {
    if (getContext() != null) {
        boolean insideJavaScriptWithScope =
                getContext().getContextType() == BsonContextType.JAVASCRIPT_WITH_SCOPE;
        if (insideJavaScriptWithScope) {
            backpatchSize(); // size of the JavaScript with scope value
            setContext(getContext().getParentContext());
        }
        setState(getNextState());
    } else {
        // No enclosing context: the piped document was the top-level value.
        setState(State.DONE);
    }

    int pipedLength = bsonOutput.getPosition() - pipedDocumentStartPosition;
    validateSize(pipedLength);
}

/**
* Sets a maximum size for documents from this point.
*
Expand Down Expand Up @@ -426,6 +445,12 @@ public void reset() {
mark = null;
}

/**
 * Rejects document sizes that are too small to be a BSON document. The smallest possible document is the
 * empty one: a 4-byte int32 size prefix followed by a single terminating zero byte, i.e. 5 bytes.
 *
 * @param size the document size in bytes
 * @throws BsonSerializationException if {@code size} is less than 5
 */
private static void checkMinDocumentSize(final int size) {
    boolean tooSmall = size < 5;
    if (tooSmall) {
        throw new BsonSerializationException("Document size must be at least 5");
    }
}

private void writeCurrentName() {
if (getContext().getContextType() == BsonContextType.ARRAY) {
int index = getContext().index++;
Expand Down
1 change: 0 additions & 1 deletion bson/src/main/org/bson/BsonWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -356,5 +356,4 @@ public interface BsonWriter {
* @param reader The source.
*/
void pipe(BsonReader reader);

}
30 changes: 30 additions & 0 deletions bson/src/main/org/bson/RawBsonDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,36 @@ public ByteBuf getByteBuffer() {
return new ByteBufNIO(buffer);
}

/**
 * Returns the byte array that backs this document, without copying it. Modifications made to the returned
 * array are visible through this document.
 *
 * <p>The document occupies only the range that starts at {@link #getByteOffset()} and spans
 * {@link #getByteLength()} bytes; the array may contain other bytes outside that range.</p>
 *
 * @return the backing byte array
 * @since 5.8
 */
public byte[] getBackingArray() {
    return this.bytes;
}
Comment thread
vbabanin marked this conversation as resolved.

/**
 * Returns the index within the {@linkplain #getBackingArray() backing byte array} of the first byte of
 * this document.
 *
 * @return the offset of this document in the backing array
 * @since 5.8
 */
public int getByteOffset() {
    return this.offset;
}

/**
 * Returns the number of bytes this document occupies in the {@linkplain #getBackingArray() backing byte array},
 * counted from {@link #getByteOffset()}.
 *
 * @return the length of this document in bytes
 * @since 5.8
 */
public int getByteLength() {
    return this.length;
}

/**
* Decode this into a document.
*
Expand Down
13 changes: 11 additions & 2 deletions bson/src/main/org/bson/codecs/RawBsonDocumentCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,17 @@ public RawBsonDocumentCodec() {

@Override
public void encode(final BsonWriter writer, final RawBsonDocument value, final EncoderContext encoderContext) {
try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(value.getByteBuffer()))) {
writer.pipe(reader);
if (writer instanceof BsonBinaryWriter) {
// Fast path. The pipe method should ideally exist on BsonWriter, but adding it as
// abstract would be a breaking change, and adding it as a default method would force
// BsonWriter to depend on BsonBinaryReader/ByteBufferBsonInput, violating the
// interface's abstraction.
// TODO JAVA-6211 move pipe(byte[], int, int) to BsonWriter to remove this instanceof.
((BsonBinaryWriter) writer).pipe(value.getBackingArray(), value.getByteOffset(), value.getByteLength());
} else {
Comment on lines 42 to +50
try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(value.getByteBuffer()))) {
writer.pipe(reader);
}
}
}

Expand Down
9 changes: 9 additions & 0 deletions bson/src/main/org/bson/io/BsonInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ public interface BsonInput extends Closeable {
*/
boolean hasRemaining();

/**
* Pipes the specified number of bytes from {@linkplain BsonInput this} input to the given {@linkplain BsonOutput output}.
*
* @param output the output to pipe to
* @param numBytes the number of bytes to pipe
* @since 5.8
*/
void pipe(BsonOutput output, int numBytes);
Comment thread
vbabanin marked this conversation as resolved.
Outdated
Comment thread
rozza marked this conversation as resolved.
Outdated

@Override
void close();
}
18 changes: 18 additions & 0 deletions bson/src/main/org/bson/io/ByteBufferBsonInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,24 @@ public boolean hasRemaining() {
return buffer.hasRemaining();
}

@Override
public void pipe(final BsonOutput output, final int numBytes) {
ensureOpen();
Comment thread
vbabanin marked this conversation as resolved.
ensureAvailable(numBytes);

if (buffer.isBackedByArray()) {
int position = buffer.position();
int arrayOffset = buffer.arrayOffset();
output.writeBytes(buffer.array(), arrayOffset + position, numBytes);
buffer.position(position + numBytes);
} else {
// Fallback: use temporary buffer for non-array-backed buffers
byte[] temp = new byte[numBytes];
buffer.get(temp);
output.writeBytes(temp);
}
Comment thread
vbabanin marked this conversation as resolved.
}

@Override
public void close() {
buffer.release();
Expand Down
27 changes: 27 additions & 0 deletions bson/src/test/unit/org/bson/BsonBinaryWriterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,34 @@ public void testPipeOfDocumentWithInvalidSize() {
// expected
}
}
}

@Test
public void testPipeOfRawBytes() {
    // Encode a small source document, {"a": true}, to obtain raw BSON bytes.
    BasicOutputBuffer encodedSource = new BasicOutputBuffer();
    try (BsonBinaryWriter writer = new BsonBinaryWriter(encodedSource)) {
        writer.writeStartDocument();
        writer.writeBoolean("a", true);
        writer.writeEndDocument();
    }
    byte[] expected = encodedSource.toByteArray();

    // Pipe the raw bytes into a fresh writer as a top-level document.
    BasicOutputBuffer piped = new BasicOutputBuffer();
    try (BsonBinaryWriter writer = new BsonBinaryWriter(piped)) {
        writer.pipe(expected, 0, expected.length);
    }

    // The piped output must be byte-for-byte identical to the source document.
    assertArrayEquals(expected, piped.toByteArray());
}

@Test
public void testPipeOfRawBytesWithInvalidSize() {
byte[] bytes = {4, 0, 0, 0}; // minimum document size is 5
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw the validation for a size of 5; just curious, why is it 5?


BasicOutputBuffer newBuffer = new BasicOutputBuffer();
try (BsonBinaryWriter newWriter = new BsonBinaryWriter(newBuffer)) {
assertThrows(BsonSerializationException.class, () -> newWriter.pipe(bytes, 0, bytes.length));
}
}

// CHECKSTYLE:OFF
Expand Down
17 changes: 17 additions & 0 deletions bson/src/test/unit/org/bson/RawBsonDocumentSpecification.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,23 @@ class RawBsonDocumentSpecification extends Specification {
rawDocument << createRawDocumentVariants()
}


def 'getBackingArray, getByteOffset and getByteLength should expose the document range'() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be mistaken, but I have seen a lot of PRs that also remove Groovy spec classes; can we move this test case to a Java test instead?

expect:
rawDocument.getByteOffset() == expectedOffset
rawDocument.getByteLength() == expectedLength
Arrays.copyOfRange(
rawDocument.getBackingArray(),
rawDocument.getByteOffset(),
rawDocument.getByteOffset() + rawDocument.getByteLength()) == getBytesFromDocument()

where:
rawDocument | expectedOffset | expectedLength
createRawDocumenFromDocument() | 0 | 66
createRawDocumentFromByteArray() | 0 | 66
createRawDocumentFromByteArrayOffsetLength()| 1 | 66
}

def 'parse should through if parameter is invalid'() {
when:
RawBsonDocument.parse(null)
Expand Down