diff --git a/crates/symbolicator-native/src/symbolication/attachments.rs b/crates/symbolicator-native/src/symbolication/attachments.rs index e206e4106..8a29e79b7 100644 --- a/crates/symbolicator-native/src/symbolication/attachments.rs +++ b/crates/symbolicator-native/src/symbolication/attachments.rs @@ -1,17 +1,16 @@ use std::fs::File; -use std::pin::pin; use futures::TryStreamExt; -use symbolicator_service::download::DownloadService; +use symbolicator_service::caching::CacheError; +use symbolicator_service::download::{self, DownloadService}; use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter}; -use tokio_util::io::StreamReader; use crate::interface::AttachmentFile; pub async fn download_attachment( download_svc: &DownloadService, file: AttachmentFile, -) -> anyhow::Result { +) -> Result { let (storage_url, storage_token) = match file { AttachmentFile::Local(file) => return Ok(file), AttachmentFile::Remote { @@ -25,26 +24,32 @@ pub async fn download_attachment( // download files in multiple chunks concurrently, but I don’t think our `objecstore` server currently // supports range requests, and those would also mess with streaming decompression. // Not to mention that using the `DownloadService` is not that straight forward. - let mut request = download_svc.trusted_client.get(storage_url); - if let Some(token) = storage_token { - request = request.bearer_auth(token); - } - let stream = request - .send() - .await? - .error_for_status()? - .bytes_stream() - .map_err(std::io::Error::other); - let mut reader = pin!(StreamReader::new(stream)); - - let file = tempfile::tempfile()?; - let mut writer = BufWriter::new(tokio::fs::File::from_std(file)); - tokio::io::copy(&mut reader, &mut writer).await?; - writer.flush().await?; - let mut file = writer.into_inner(); - file.sync_data().await?; - - file.rewind().await?; - - Ok(file.into_std().await) + download::retry(|| async { + let mut request = download_svc.trusted_client.get(&storage_url); + if let Some(token) = storage_token.as_ref() { + request = request.bearer_auth(token); + } + let response = request.send().await?; + if !response.status().is_success() { + return Err( + download::GenericErrorHandler::handle_response(&storage_url, response).await, + ); + } + + let mut stream = response.bytes_stream(); + + let file = tempfile::tempfile()?; + let mut writer = BufWriter::new(tokio::fs::File::from_std(file)); + while let Some(chunk) = stream.try_next().await? { + writer.write_all(&chunk).await?; + } + writer.flush().await?; + let mut file = writer.into_inner(); + file.sync_data().await?; + + file.rewind().await?; + + Ok(file.into_std().await) + }) + .await } diff --git a/crates/symbolicator-service/src/download/mod.rs b/crates/symbolicator-service/src/download/mod.rs index 8d33f7620..e73838c18 100644 --- a/crates/symbolicator-service/src/download/mod.rs +++ b/crates/symbolicator-service/src/download/mod.rs @@ -633,14 +633,15 @@ async fn do_download_reqwest( /// /// This error handler uses the HTTP status code to infer the [`CacheError`], /// this works for any HTTP request, but does not consider API specific responses. -struct GenericErrorHandler; +pub struct GenericErrorHandler; -impl ErrorHandler for GenericErrorHandler { - async fn handle(&self, source: &str, response: SymResponse<'_>) -> CacheError { +impl GenericErrorHandler { + /// Converts an unsuccessful HTTP response to a [`CacheError`]. + pub async fn handle_response(source: &str, response: reqwest::Response) -> CacheError { let status = response.status(); debug_assert!(!status.is_success()); - if let Ok(details) = response.response.text().await { + if let Ok(details) = response.text().await { ::sentry::configure_scope(|scope| { scope.set_extra( "reqwest_response_body", @@ -677,6 +678,12 @@ impl ErrorHandler for GenericErrorHandler { } } +impl ErrorHandler for GenericErrorHandler { + async fn handle(&self, source: &str, response: SymResponse<'_>) -> CacheError { + Self::handle_response(source, response.response).await + } +} + /// A HTTP request Symbolicator wants to make. struct SymRequest<'a> { source_name: &'a str,