From d106d33c30443889658a05bddf6e7467857362e5 Mon Sep 17 00:00:00 2001 From: Drazen Urch Date: Mon, 4 May 2026 22:24:48 +0200 Subject: [PATCH] fix: preserve response stream chunk remainders --- s3/src/request/request_trait.rs | 165 ++++++++++++++++++-------------- 1 file changed, 92 insertions(+), 73 deletions(-) diff --git a/s3/src/request/request_trait.rs b/s3/src/request/request_trait.rs index 5686cc8dda..217e9b6cc8 100644 --- a/s3/src/request/request_trait.rs +++ b/s3/src/request/request_trait.rs @@ -21,11 +21,10 @@ use http::header::{ }; use std::fmt::Write as _; -#[cfg(feature = "with-async-std")] -use async_std::stream::Stream; - -#[cfg(feature = "with-tokio")] -use tokio_stream::Stream; +#[cfg(any(feature = "with-tokio", feature = "with-async-std"))] +use futures_util::Stream; +#[cfg(any(feature = "with-tokio", feature = "with-async-std"))] +use futures_util::stream::{self, StreamExt}; #[derive(Debug)] @@ -126,28 +125,39 @@ impl tokio::io::AsyncRead for ResponseDataStream { cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> std::task::Poll> { - // Poll the stream for the next chunk of bytes - match Stream::poll_next(self.bytes.as_mut(), cx) { - std::task::Poll::Ready(Some(Ok(chunk))) => { - // Write as much of the chunk as fits in the buffer - let amt = std::cmp::min(chunk.len(), buf.remaining()); - buf.put_slice(&chunk[..amt]); - - // AIDEV-NOTE: Bytes that don't fit in the buffer are discarded from this chunk. - // This is expected AsyncRead behavior - consumers should use appropriately sized - // buffers or wrap in BufReader for efficiency with small reads. - - std::task::Poll::Ready(Ok(())) - } - std::task::Poll::Ready(Some(Err(error))) => { - // Convert S3Error to io::Error - std::task::Poll::Ready(Err(std::io::Error::other(error))) - } - std::task::Poll::Ready(None) => { - // Stream is exhausted, signal EOF by returning Ok(()) with no bytes written - std::task::Poll::Ready(Ok(())) + if buf.remaining() == 0 { + return std::task::Poll::Ready(Ok(())); + } + + loop { + match Stream::poll_next(self.bytes.as_mut(), cx) { + std::task::Poll::Ready(Some(Ok(chunk))) => { + if chunk.is_empty() { + continue; + } + + let amt = std::cmp::min(chunk.len(), buf.remaining()); + buf.put_slice(&chunk[..amt]); + + if amt < chunk.len() { + let remainder = chunk.slice(amt..); + let previous_stream = + std::mem::replace(&mut self.bytes, Box::pin(stream::empty())); + self.bytes = Box::pin( + stream::once(async move { Ok(remainder) }).chain(previous_stream), + ); + } + + return std::task::Poll::Ready(Ok(())); + } + std::task::Poll::Ready(Some(Err(error))) => { + return std::task::Poll::Ready(Err(std::io::Error::other(error))); + } + std::task::Poll::Ready(None) => { + return std::task::Poll::Ready(Ok(())); + } + std::task::Poll::Pending => return std::task::Poll::Pending, } - std::task::Poll::Pending => std::task::Poll::Pending, } } } @@ -159,28 +169,39 @@ impl async_std::io::Read for ResponseDataStream { cx: &mut std::task::Context<'_>, buf: &mut [u8], ) -> std::task::Poll> { - // Poll the stream for the next chunk of bytes - match Stream::poll_next(self.bytes.as_mut(), cx) { - std::task::Poll::Ready(Some(Ok(chunk))) => { - // Write as much of the chunk as fits in the buffer - let amt = std::cmp::min(chunk.len(), buf.len()); - buf[..amt].copy_from_slice(&chunk[..amt]); - - // AIDEV-NOTE: Bytes that don't fit in the buffer are discarded from this chunk. - // This is expected AsyncRead behavior - consumers should use appropriately sized - // buffers or wrap in BufReader for efficiency with small reads. - - std::task::Poll::Ready(Ok(amt)) - } - std::task::Poll::Ready(Some(Err(error))) => { - // Convert S3Error to io::Error - std::task::Poll::Ready(Err(std::io::Error::other(error))) - } - std::task::Poll::Ready(None) => { - // Stream is exhausted, signal EOF by returning 0 bytes read - std::task::Poll::Ready(Ok(0)) + if buf.is_empty() { + return std::task::Poll::Ready(Ok(0)); + } + + loop { + match Stream::poll_next(self.bytes.as_mut(), cx) { + std::task::Poll::Ready(Some(Ok(chunk))) => { + if chunk.is_empty() { + continue; + } + + let amt = std::cmp::min(chunk.len(), buf.len()); + buf[..amt].copy_from_slice(&chunk[..amt]); + + if amt < chunk.len() { + let remainder = chunk.slice(amt..); + let previous_stream = + std::mem::replace(&mut self.bytes, Box::pin(stream::empty())); + self.bytes = Box::pin( + stream::once(async move { Ok(remainder) }).chain(previous_stream), + ); + } + + return std::task::Poll::Ready(Ok(amt)); + } + std::task::Poll::Ready(Some(Err(error))) => { + return std::task::Poll::Ready(Err(std::io::Error::other(error))); + } + std::task::Poll::Ready(None) => { + return std::task::Poll::Ready(Ok(0)); + } + std::task::Poll::Pending => return std::task::Poll::Pending, } - std::task::Poll::Pending => std::task::Poll::Pending, } } } @@ -875,11 +896,9 @@ mod tests { } #[tokio::test] - async fn test_async_read_with_small_buffer() { - // Create a stream with a large chunk - let chunks = vec![Ok(Bytes::from( - "This is a much longer string that won't fit in a small buffer", - ))]; + async fn test_async_read_with_small_buffer_preserves_large_chunk() { + let expected = b"This is a much longer string that won't fit in a small buffer"; + let chunks = vec![Ok(Bytes::copy_from_slice(expected))]; let stream = stream::iter(chunks); let data_stream: DataStream = Box::pin(stream); @@ -889,17 +908,18 @@ mod tests { status_code: 200, }; - // Read with a small buffer - demonstrates that excess bytes are discarded per chunk + let mut output = Vec::new(); let mut buffer = [0u8; 10]; - let n = response_stream.read(&mut buffer).await.unwrap(); - // We should only get the first 10 bytes - assert_eq!(n, 10); - assert_eq!(&buffer[..n], b"This is a "); + loop { + let n = response_stream.read(&mut buffer).await.unwrap(); + if n == 0 { + break; + } + output.extend_from_slice(&buffer[..n]); + } - // Next read should get 0 bytes (EOF) because the chunk was consumed - let n = response_stream.read(&mut buffer).await.unwrap(); - assert_eq!(n, 0); + assert_eq!(output, expected); } #[tokio::test] @@ -992,11 +1012,9 @@ mod async_std_tests { } #[async_std::test] - async fn test_async_read_with_small_buffer() { - // Create a stream with a large chunk - let chunks = vec![Ok(Bytes::from( - "This is a much longer string that won't fit in a small buffer", - ))]; + async fn test_async_read_with_small_buffer_preserves_large_chunk() { + let expected = b"This is a much longer string that won't fit in a small buffer"; + let chunks = vec![Ok(Bytes::copy_from_slice(expected))]; let stream = stream::iter(chunks); let data_stream: DataStream = Box::pin(stream); @@ -1006,17 +1024,18 @@ mod async_std_tests { status_code: 200, }; - // Read with a small buffer - demonstrates that excess bytes are discarded per chunk + let mut output = Vec::new(); let mut buffer = [0u8; 10]; - let n = response_stream.read(&mut buffer).await.unwrap(); - // We should only get the first 10 bytes - assert_eq!(n, 10); - assert_eq!(&buffer[..n], b"This is a "); + loop { + let n = response_stream.read(&mut buffer).await.unwrap(); + if n == 0 { + break; + } + output.extend_from_slice(&buffer[..n]); + } - // Next read should get 0 bytes (EOF) because the chunk was consumed - let n = response_stream.read(&mut buffer).await.unwrap(); - assert_eq!(n, 0); + assert_eq!(output, expected); } #[async_std::test]