Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 92 additions & 73 deletions s3/src/request/request_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ use http::header::{
};
use std::fmt::Write as _;

#[cfg(feature = "with-async-std")]
use async_std::stream::Stream;

#[cfg(feature = "with-tokio")]
use tokio_stream::Stream;
#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
use futures_util::Stream;
#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
use futures_util::stream::{self, StreamExt};

#[derive(Debug)]

Expand Down Expand Up @@ -126,28 +125,39 @@ impl tokio::io::AsyncRead for ResponseDataStream {
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
// Poll the stream for the next chunk of bytes
match Stream::poll_next(self.bytes.as_mut(), cx) {
std::task::Poll::Ready(Some(Ok(chunk))) => {
// Write as much of the chunk as fits in the buffer
let amt = std::cmp::min(chunk.len(), buf.remaining());
buf.put_slice(&chunk[..amt]);

// AIDEV-NOTE: Bytes that don't fit in the buffer are discarded from this chunk.
// This is expected AsyncRead behavior - consumers should use appropriately sized
// buffers or wrap in BufReader for efficiency with small reads.

std::task::Poll::Ready(Ok(()))
}
std::task::Poll::Ready(Some(Err(error))) => {
// Convert S3Error to io::Error
std::task::Poll::Ready(Err(std::io::Error::other(error)))
}
std::task::Poll::Ready(None) => {
// Stream is exhausted, signal EOF by returning Ok(()) with no bytes written
std::task::Poll::Ready(Ok(()))
if buf.remaining() == 0 {
return std::task::Poll::Ready(Ok(()));
}

loop {
match Stream::poll_next(self.bytes.as_mut(), cx) {
std::task::Poll::Ready(Some(Ok(chunk))) => {
if chunk.is_empty() {
continue;
}

let amt = std::cmp::min(chunk.len(), buf.remaining());
buf.put_slice(&chunk[..amt]);

if amt < chunk.len() {
let remainder = chunk.slice(amt..);
let previous_stream =
std::mem::replace(&mut self.bytes, Box::pin(stream::empty()));
self.bytes = Box::pin(
stream::once(async move { Ok(remainder) }).chain(previous_stream),
);
}

return std::task::Poll::Ready(Ok(()));
}
std::task::Poll::Ready(Some(Err(error))) => {
return std::task::Poll::Ready(Err(std::io::Error::other(error)));
}
std::task::Poll::Ready(None) => {
return std::task::Poll::Ready(Ok(()));
}
std::task::Poll::Pending => return std::task::Poll::Pending,
}
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
Expand All @@ -159,28 +169,39 @@ impl async_std::io::Read for ResponseDataStream {
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<std::io::Result<usize>> {
// Poll the stream for the next chunk of bytes
match Stream::poll_next(self.bytes.as_mut(), cx) {
std::task::Poll::Ready(Some(Ok(chunk))) => {
// Write as much of the chunk as fits in the buffer
let amt = std::cmp::min(chunk.len(), buf.len());
buf[..amt].copy_from_slice(&chunk[..amt]);

// AIDEV-NOTE: Bytes that don't fit in the buffer are discarded from this chunk.
// This is expected AsyncRead behavior - consumers should use appropriately sized
// buffers or wrap in BufReader for efficiency with small reads.

std::task::Poll::Ready(Ok(amt))
}
std::task::Poll::Ready(Some(Err(error))) => {
// Convert S3Error to io::Error
std::task::Poll::Ready(Err(std::io::Error::other(error)))
}
std::task::Poll::Ready(None) => {
// Stream is exhausted, signal EOF by returning 0 bytes read
std::task::Poll::Ready(Ok(0))
if buf.is_empty() {
return std::task::Poll::Ready(Ok(0));
}

loop {
match Stream::poll_next(self.bytes.as_mut(), cx) {
std::task::Poll::Ready(Some(Ok(chunk))) => {
if chunk.is_empty() {
continue;
}

let amt = std::cmp::min(chunk.len(), buf.len());
buf[..amt].copy_from_slice(&chunk[..amt]);

if amt < chunk.len() {
let remainder = chunk.slice(amt..);
let previous_stream =
std::mem::replace(&mut self.bytes, Box::pin(stream::empty()));
self.bytes = Box::pin(
stream::once(async move { Ok(remainder) }).chain(previous_stream),
);
}

return std::task::Poll::Ready(Ok(amt));
}
std::task::Poll::Ready(Some(Err(error))) => {
return std::task::Poll::Ready(Err(std::io::Error::other(error)));
}
std::task::Poll::Ready(None) => {
return std::task::Poll::Ready(Ok(0));
}
std::task::Poll::Pending => return std::task::Poll::Pending,
}
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
Expand Down Expand Up @@ -875,11 +896,9 @@ mod tests {
}

#[tokio::test]
async fn test_async_read_with_small_buffer() {
// Create a stream with a large chunk
let chunks = vec![Ok(Bytes::from(
"This is a much longer string that won't fit in a small buffer",
))];
async fn test_async_read_with_small_buffer_preserves_large_chunk() {
let expected = b"This is a much longer string that won't fit in a small buffer";
let chunks = vec![Ok(Bytes::copy_from_slice(expected))];

let stream = stream::iter(chunks);
let data_stream: DataStream = Box::pin(stream);
Expand All @@ -889,17 +908,18 @@ mod tests {
status_code: 200,
};

// Read with a small buffer - demonstrates that excess bytes are discarded per chunk
let mut output = Vec::new();
let mut buffer = [0u8; 10];
let n = response_stream.read(&mut buffer).await.unwrap();

// We should only get the first 10 bytes
assert_eq!(n, 10);
assert_eq!(&buffer[..n], b"This is a ");
loop {
let n = response_stream.read(&mut buffer).await.unwrap();
if n == 0 {
break;
}
output.extend_from_slice(&buffer[..n]);
}

// Next read should get 0 bytes (EOF) because the chunk was consumed
let n = response_stream.read(&mut buffer).await.unwrap();
assert_eq!(n, 0);
assert_eq!(output, expected);
}

#[tokio::test]
Expand Down Expand Up @@ -992,11 +1012,9 @@ mod async_std_tests {
}

#[async_std::test]
async fn test_async_read_with_small_buffer() {
// Create a stream with a large chunk
let chunks = vec![Ok(Bytes::from(
"This is a much longer string that won't fit in a small buffer",
))];
async fn test_async_read_with_small_buffer_preserves_large_chunk() {
let expected = b"This is a much longer string that won't fit in a small buffer";
let chunks = vec![Ok(Bytes::copy_from_slice(expected))];

let stream = stream::iter(chunks);
let data_stream: DataStream = Box::pin(stream);
Expand All @@ -1006,17 +1024,18 @@ mod async_std_tests {
status_code: 200,
};

// Read with a small buffer - demonstrates that excess bytes are discarded per chunk
let mut output = Vec::new();
let mut buffer = [0u8; 10];
let n = response_stream.read(&mut buffer).await.unwrap();

// We should only get the first 10 bytes
assert_eq!(n, 10);
assert_eq!(&buffer[..n], b"This is a ");
loop {
let n = response_stream.read(&mut buffer).await.unwrap();
if n == 0 {
break;
}
output.extend_from_slice(&buffer[..n]);
}

// Next read should get 0 bytes (EOF) because the chunk was consumed
let n = response_stream.read(&mut buffer).await.unwrap();
assert_eq!(n, 0);
assert_eq!(output, expected);
}

#[async_std::test]
Expand Down
Loading