Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 31 additions & 26 deletions crates/symbolicator-native/src/symbolication/attachments.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
use std::fs::File;
use std::pin::pin;

use futures::TryStreamExt;
use symbolicator_service::download::DownloadService;
use symbolicator_service::caching::CacheError;
use symbolicator_service::download::{self, DownloadService};
use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter};
use tokio_util::io::StreamReader;

use crate::interface::AttachmentFile;

pub async fn download_attachment(
download_svc: &DownloadService,
file: AttachmentFile,
) -> anyhow::Result<File> {
) -> Result<File, CacheError> {
let (storage_url, storage_token) = match file {
AttachmentFile::Local(file) => return Ok(file),
AttachmentFile::Remote {
Expand All @@ -25,26 +24,32 @@ pub async fn download_attachment(
// download files in multiple chunks concurrently, but I don’t think our `objecstore` server currently
// supports range requests, and those would also mess with streaming decompression.
// Not to mention that using the `DownloadService` is not that straight forward.
let mut request = download_svc.trusted_client.get(storage_url);
if let Some(token) = storage_token {
request = request.bearer_auth(token);
}
let stream = request
.send()
.await?
.error_for_status()?
.bytes_stream()
.map_err(std::io::Error::other);
let mut reader = pin!(StreamReader::new(stream));

let file = tempfile::tempfile()?;
let mut writer = BufWriter::new(tokio::fs::File::from_std(file));
tokio::io::copy(&mut reader, &mut writer).await?;
writer.flush().await?;
let mut file = writer.into_inner();
file.sync_data().await?;

file.rewind().await?;

Ok(file.into_std().await)
download::retry(|| async {
let mut request = download_svc.trusted_client.get(&storage_url);
if let Some(token) = storage_token.as_ref() {
request = request.bearer_auth(token);
}
let response = request.send().await?;
if !response.status().is_success() {
return Err(
download::GenericErrorHandler::handle_response(&storage_url, response).await,
);
}

let mut stream = response.bytes_stream();

let file = tempfile::tempfile()?;
let mut writer = BufWriter::new(tokio::fs::File::from_std(file));
while let Some(chunk) = stream.try_next().await? {
writer.write_all(&chunk).await?;
}
writer.flush().await?;
let mut file = writer.into_inner();
file.sync_data().await?;

file.rewind().await?;

Ok(file.into_std().await)
})
.await
}
15 changes: 11 additions & 4 deletions crates/symbolicator-service/src/download/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,14 +633,15 @@ async fn do_download_reqwest(
///
/// This error handler uses the HTTP status code to infer the [`CacheError`],
/// this works for any HTTP request, but does not consider API specific responses.
struct GenericErrorHandler;
pub struct GenericErrorHandler;

impl ErrorHandler for GenericErrorHandler {
async fn handle(&self, source: &str, response: SymResponse<'_>) -> CacheError {
impl GenericErrorHandler {
/// Converts an unsuccessful HTTP response to a [`CacheError`].
pub async fn handle_response(source: &str, response: reqwest::Response) -> CacheError {
let status = response.status();
debug_assert!(!status.is_success());

if let Ok(details) = response.response.text().await {
if let Ok(details) = response.text().await {
::sentry::configure_scope(|scope| {
scope.set_extra(
"reqwest_response_body",
Expand Down Expand Up @@ -677,6 +678,12 @@ impl ErrorHandler for GenericErrorHandler {
}
}

impl ErrorHandler for GenericErrorHandler {
async fn handle(&self, source: &str, response: SymResponse<'_>) -> CacheError {
Self::handle_response(source, response.response).await
}
}

/// A HTTP request Symbolicator wants to make.
struct SymRequest<'a> {
source_name: &'a str,
Expand Down
Loading