diff --git a/bson/src/main/org/bson/BsonBinaryWriter.java b/bson/src/main/org/bson/BsonBinaryWriter.java index 20e73d97d44..e57d061ddc4 100644 --- a/bson/src/main/org/bson/BsonBinaryWriter.java +++ b/bson/src/main/org/bson/BsonBinaryWriter.java @@ -334,6 +334,26 @@ public void pipe(final BsonReader reader) { pipeDocument(reader, null); } + /** + * Pipes a raw BSON document from the given byte array to this writer, writing the bytes directly to the + * output without intermediate object allocation. + * + * @param bytes the byte array containing the BSON document + * @param offset the offset into the byte array + * @param length the length of the BSON document + * @since 5.8 + */ + public void pipe(final byte[] bytes, final int offset, final int length) { + checkMinDocumentSize(length); + if (getState() == State.VALUE) { + bsonOutput.writeByte(BsonType.DOCUMENT.getValue()); + writeCurrentName(); + } + int pipedDocumentStartPosition = bsonOutput.getPosition(); + bsonOutput.writeBytes(bytes, offset, length); + completePipeDocument(pipedDocumentStartPosition); + } + @Override public void pipe(final BsonReader reader, final List extraElements) { notNull("reader", reader); @@ -350,14 +370,10 @@ private void pipeDocument(final BsonReader reader, final List extra } BsonInput bsonInput = binaryReader.getBsonInput(); int size = bsonInput.readInt32(); - if (size < 5) { - throw new BsonSerializationException("Document size must be at least 5"); - } + checkMinDocumentSize(size); int pipedDocumentStartPosition = bsonOutput.getPosition(); bsonOutput.writeInt32(size); - byte[] bytes = new byte[size - 4]; - bsonInput.readBytes(bytes); - bsonOutput.writeBytes(bytes); + bsonInput.pipe(bsonOutput, size - 4); binaryReader.setState(AbstractBsonReader.State.TYPE); @@ -371,17 +387,7 @@ private void pipeDocument(final BsonReader reader, final List extra setContext(getContext().getParentContext()); } - if (getContext() == null) { - setState(State.DONE); - } else { - if (getContext().getContextType() == BsonContextType.JAVASCRIPT_WITH_SCOPE) { - backpatchSize(); // size of the JavaScript with scope value - setContext(getContext().getParentContext()); - } - setState(getNextState()); - } - - validateSize(bsonOutput.getPosition() - pipedDocumentStartPosition); + completePipeDocument(pipedDocumentStartPosition); } else if (extraElements != null) { super.pipe(reader, extraElements); } else { @@ -389,6 +395,19 @@ private void pipeDocument(final BsonReader reader, final List extra } } + private void completePipeDocument(final int pipedDocumentStartPosition) { + if (getContext() == null) { + setState(State.DONE); + } else { + if (getContext().getContextType() == BsonContextType.JAVASCRIPT_WITH_SCOPE) { + backpatchSize(); // size of the JavaScript with scope value + setContext(getContext().getParentContext()); + } + setState(getNextState()); + } + validateSize(bsonOutput.getPosition() - pipedDocumentStartPosition); + } + /** * Sets a maximum size for documents from this point. * @@ -426,6 +445,12 @@ public void reset() { mark = null; } + private static void checkMinDocumentSize(final int size) { + if (size < 5) { + throw new BsonSerializationException("Document size must be at least 5"); + } + } + private void writeCurrentName() { if (getContext().getContextType() == BsonContextType.ARRAY) { int index = getContext().index++; diff --git a/bson/src/main/org/bson/BsonWriter.java b/bson/src/main/org/bson/BsonWriter.java index c3da5dc6059..1d2e5055a3c 100644 --- a/bson/src/main/org/bson/BsonWriter.java +++ b/bson/src/main/org/bson/BsonWriter.java @@ -356,5 +356,4 @@ public interface BsonWriter { * @param reader The source. */ void pipe(BsonReader reader); - } diff --git a/bson/src/main/org/bson/RawBsonDocument.java b/bson/src/main/org/bson/RawBsonDocument.java index eb672bcef8d..6d4c5328810 100644 --- a/bson/src/main/org/bson/RawBsonDocument.java +++ b/bson/src/main/org/bson/RawBsonDocument.java @@ -144,6 +144,36 @@ public ByteBuf getByteBuffer() { return new ByteBufNIO(buffer); } + /** + * Returns the byte array backing this document. Changes to the returned array will be reflected in this document. + * + * @return the backing byte array + * @since 5.8 + */ + public byte[] getBackingArray() { + return bytes; + } + + /** + * Returns the offset into the {@linkplain #getBackingArray() backing byte array} where this document starts. + * + * @return the offset + * @since 5.8 + */ + public int getByteOffset() { + return offset; + } + + /** + * Returns the length of this document within the {@linkplain #getBackingArray() backing byte array}. + * + * @return the length + * @since 5.8 + */ + public int getByteLength() { + return length; + } + /** * Decode this into a document. * diff --git a/bson/src/main/org/bson/codecs/RawBsonDocumentCodec.java b/bson/src/main/org/bson/codecs/RawBsonDocumentCodec.java index 4d81b7f97aa..a0d5947429f 100644 --- a/bson/src/main/org/bson/codecs/RawBsonDocumentCodec.java +++ b/bson/src/main/org/bson/codecs/RawBsonDocumentCodec.java @@ -40,8 +40,17 @@ public RawBsonDocumentCodec() { @Override public void encode(final BsonWriter writer, final RawBsonDocument value, final EncoderContext encoderContext) { - try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(value.getByteBuffer()))) { - writer.pipe(reader); + if (writer instanceof BsonBinaryWriter) { + // Fast path. The pipe method should ideally exist on BsonWriter, but adding it as + // abstract would be a breaking change, and adding it as a default method would force + // BsonWriter to depend on BsonBinaryReader/ByteBufferBsonInput, violating the + // interface's abstraction. + // TODO JAVA-6211 move pipe(byte[], int, int) to BsonWriter to remove this instanceof. + ((BsonBinaryWriter) writer).pipe(value.getBackingArray(), value.getByteOffset(), value.getByteLength()); + } else { + try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(value.getByteBuffer()))) { + writer.pipe(reader); + } } } diff --git a/bson/src/main/org/bson/io/BsonInput.java b/bson/src/main/org/bson/io/BsonInput.java index 823355fe3ee..250cddab0e4 100644 --- a/bson/src/main/org/bson/io/BsonInput.java +++ b/bson/src/main/org/bson/io/BsonInput.java @@ -127,6 +127,19 @@ public interface BsonInput extends Closeable { */ boolean hasRemaining(); + /** + * Pipes the specified number of bytes from {@linkplain BsonInput this} input to the given {@linkplain BsonOutput output}. + * + * @param output the output to pipe to + * @param numBytes the number of bytes to pipe + * @since 5.8 + */ + default void pipe(BsonOutput output, int numBytes) { + byte[] bytes = new byte[numBytes]; + readBytes(bytes); + output.writeBytes(bytes); + } + @Override void close(); } diff --git a/bson/src/main/org/bson/io/ByteBufferBsonInput.java b/bson/src/main/org/bson/io/ByteBufferBsonInput.java index 2819bdcb091..1ab5ac9f5b3 100644 --- a/bson/src/main/org/bson/io/ByteBufferBsonInput.java +++ b/bson/src/main/org/bson/io/ByteBufferBsonInput.java @@ -275,6 +275,24 @@ public boolean hasRemaining() { return buffer.hasRemaining(); } + @Override + public void pipe(final BsonOutput output, final int numBytes) { + ensureOpen(); + ensureAvailable(numBytes); + + if (buffer.isBackedByArray()) { + int position = buffer.position(); + int arrayOffset = buffer.arrayOffset(); + output.writeBytes(buffer.array(), arrayOffset + position, numBytes); + buffer.position(position + numBytes); + } else { + // Fallback: use temporary buffer for non-array-backed buffers + byte[] temp = new byte[numBytes]; + buffer.get(temp); + output.writeBytes(temp); + } + } + @Override public void close() { buffer.release(); diff --git a/bson/src/test/unit/org/bson/BsonBinaryWriterTest.java b/bson/src/test/unit/org/bson/BsonBinaryWriterTest.java index 0b067fc816f..4f589a42263 100644 --- a/bson/src/test/unit/org/bson/BsonBinaryWriterTest.java +++ b/bson/src/test/unit/org/bson/BsonBinaryWriterTest.java @@ -802,7 +802,34 @@ public void testPipeOfDocumentWithInvalidSize() { // expected } } + } + + @Test + public void testPipeOfRawBytes() { + BasicOutputBuffer sourceBuffer = new BasicOutputBuffer(); + try (BsonBinaryWriter sourceWriter = new BsonBinaryWriter(sourceBuffer)) { + sourceWriter.writeStartDocument(); + sourceWriter.writeBoolean("a", true); + sourceWriter.writeEndDocument(); + } + byte[] documentBytes = sourceBuffer.toByteArray(); + BasicOutputBuffer destBuffer = new BasicOutputBuffer(); + try (BsonBinaryWriter destWriter = new BsonBinaryWriter(destBuffer)) { + destWriter.pipe(documentBytes, 0, documentBytes.length); + } + + assertArrayEquals(documentBytes, destBuffer.toByteArray()); + } + + @Test + public void testPipeOfRawBytesWithInvalidSize() { + byte[] bytes = {4, 0, 0, 0}; // minimum document size is 5 + + BasicOutputBuffer newBuffer = new BasicOutputBuffer(); + try (BsonBinaryWriter newWriter = new BsonBinaryWriter(newBuffer)) { + assertThrows(BsonSerializationException.class, () -> newWriter.pipe(bytes, 0, bytes.length)); + } } // CHECKSTYLE:OFF diff --git a/bson/src/test/unit/org/bson/RawBsonDocumentSpecification.groovy b/bson/src/test/unit/org/bson/RawBsonDocumentSpecification.groovy index a23ec06dedb..4a3748f2971 100644 --- a/bson/src/test/unit/org/bson/RawBsonDocumentSpecification.groovy +++ b/bson/src/test/unit/org/bson/RawBsonDocumentSpecification.groovy @@ -113,6 +113,23 @@ class RawBsonDocumentSpecification extends Specification { rawDocument << createRawDocumentVariants() } + + def 'getBackingArray, getByteOffset and getByteLength should expose the document range'() { + expect: + rawDocument.getByteOffset() == expectedOffset + rawDocument.getByteLength() == expectedLength + Arrays.copyOfRange( + rawDocument.getBackingArray(), + rawDocument.getByteOffset(), + rawDocument.getByteOffset() + rawDocument.getByteLength()) == getBytesFromDocument() + + where: + rawDocument | expectedOffset | expectedLength + createRawDocumenFromDocument() | 0 | 66 + createRawDocumentFromByteArray() | 0 | 66 + createRawDocumentFromByteArrayOffsetLength()| 1 | 66 + } + def 'parse should through if parameter is invalid'() { when: RawBsonDocument.parse(null) diff --git a/bson/src/test/unit/org/bson/io/BsonInputTest.java b/bson/src/test/unit/org/bson/io/BsonInputTest.java new file mode 100644 index 00000000000..dd73a62a060 --- /dev/null +++ b/bson/src/test/unit/org/bson/io/BsonInputTest.java @@ -0,0 +1,122 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.bson.io; + +import org.bson.ByteBufNIO; +import org.bson.types.ObjectId; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class BsonInputTest { + + @Test + void defaultPipeShouldCopyBytesFromInputToOutput() { + // given + byte[] inputBytes = {0x4a, 0x61, 0x76, 0x61, 0x21}; + + try (BsonInput bsonInput = new ForwardingBsonInput( + new ByteBufferBsonInput(new ByteBufNIO(ByteBuffer.wrap(inputBytes)))); + BasicOutputBuffer output = new BasicOutputBuffer()) { + // when + bsonInput.pipe(output, inputBytes.length); + + // then + assertEquals(inputBytes.length, bsonInput.getPosition()); + assertEquals(inputBytes.length, output.getPosition()); + assertArrayEquals(inputBytes, output.toByteArray()); + } + } + + @Test + void defaultPipeShouldCopyPartialBytesFromInputToOutput() { + // given + byte[] inputBytes = {0x4a, 0x61, 0x76, 0x61, 0x21}; + + try (BsonInput bsonInput = new ForwardingBsonInput( + new ByteBufferBsonInput(new ByteBufNIO(ByteBuffer.wrap(inputBytes)))); + BasicOutputBuffer output = new BasicOutputBuffer()) { + // when + bsonInput.pipe(output, 3); + + // then + assertEquals(3, bsonInput.getPosition()); + assertEquals(3, output.getPosition()); + assertArrayEquals(new byte[]{0x4a, 0x61, 0x76}, output.toByteArray()); + } + } + + /** + * Delegates all abstract methods but does NOT override pipe, + * so the default implementation is exercised. + */ + private static class ForwardingBsonInput implements BsonInput { + private final ByteBufferBsonInput delegate; + + ForwardingBsonInput(final ByteBufferBsonInput delegate) { + this.delegate = delegate; + } + + @Override + public int getPosition() { return delegate.getPosition(); } + + @Override + public byte readByte() { return delegate.readByte(); } + + @Override + public void readBytes(final byte[] bytes) { delegate.readBytes(bytes); } + + @Override + public void readBytes(final byte[] bytes, final int offset, final int length) { delegate.readBytes(bytes, offset, length); } + + @Override + public long readInt64() { return delegate.readInt64(); } + + @Override + public double readDouble() { return delegate.readDouble(); } + + @Override + public int readInt32() { return delegate.readInt32(); } + + @Override + public String readString() { return delegate.readString(); } + + @Override + public ObjectId readObjectId() { return delegate.readObjectId(); } + + @Override + public String readCString() { return delegate.readCString(); } + + @Override + public void skipCString() { delegate.skipCString(); } + + @Override + public void skip(final int numBytes) { delegate.skip(numBytes); } + + @Override + public BsonInputMark getMark(final int readLimit) { return delegate.getMark(readLimit); } + + @Override + public boolean hasRemaining() { return delegate.hasRemaining(); } + + @Override + public void close() { delegate.close(); } + } +} diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufferBsonInputTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufferBsonInputTest.java index b988f1cde1a..d1918f5f7bf 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufferBsonInputTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufferBsonInputTest.java @@ -22,6 +22,7 @@ import org.bson.BsonSerializationException; import org.bson.ByteBuf; import org.bson.ByteBufNIO; +import org.bson.io.BasicOutputBuffer; import org.bson.io.ByteBufferBsonInput; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.params.ParameterizedTest; @@ -45,6 +46,7 @@ import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; import static java.util.stream.IntStream.rangeClosed; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -710,6 +712,58 @@ void shouldReadSkipCStringWhenMultipleNullTerminatorPresentWithinBuffer(final Bu } + @ParameterizedTest(name = "should pipe bytes to output. BufferProvider={0}") + @MethodSource("bufferProviders") + void shouldPipeBytesToOutput(final BufferProvider bufferProvider) { + // given + byte[] input = {0x4a, 0x61, 0x76, 0x61, 0x21}; + ByteBuf buffer = allocateAndWriteToBuffer(bufferProvider, input); + + try (ByteBufferBsonInput bufferInput = new ByteBufferBsonInput(buffer); + BasicOutputBuffer bufferOutput = new BasicOutputBuffer()) { + // when + bufferInput.pipe(bufferOutput, input.length); + + // then + assertEquals(input.length, bufferInput.getPosition()); + assertEquals(input.length, bufferOutput.getPosition()); + assertArrayEquals(input, bufferOutput.toByteArray()); + } + } + + @ParameterizedTest(name = "should pipe partial bytes to output. BufferProvider={0}") + @MethodSource("bufferProviders") + void shouldPipePartialBytesToOutput(final BufferProvider bufferProvider) { + // given + byte[] input = {0x4a, 0x61, 0x76, 0x61, 0x21}; + ByteBuf buffer = allocateAndWriteToBuffer(bufferProvider, input); + + try (ByteBufferBsonInput bufferInput = new ByteBufferBsonInput(buffer); + BasicOutputBuffer output = new BasicOutputBuffer()) { + // when + bufferInput.pipe(output, 3); + + // then + assertEquals(3, bufferInput.getPosition()); + assertEquals(3, output.getPosition()); + assertArrayEquals(new byte[]{0x4a, 0x61, 0x76}, output.toByteArray()); + } + } + + @ParameterizedTest(name = "should throw when piping more bytes than available. BufferProvider={0}") + @MethodSource("bufferProviders") + void shouldThrowWhenPipingMoreBytesThanAvailable(final BufferProvider bufferProvider) { + // given + byte[] input = {0x4a, 0x61, 0x76}; + ByteBuf buffer = allocateAndWriteToBuffer(bufferProvider, input); + + try (ByteBufferBsonInput bufferInput = new ByteBufferBsonInput(buffer); + BasicOutputBuffer output = new BasicOutputBuffer()) { + // when & then + assertThrows(BsonSerializationException.class, () -> bufferInput.pipe(output, 10)); + } + } + private static ByteBuf allocateAndWriteToBuffer(final BufferProvider bufferProvider, final byte[] input) { ByteBuf buffer = bufferProvider.getBuffer(input.length); buffer.put(input, 0, input.length);