Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 36 additions & 31 deletions crates/symbolicator-native/src/symbolication/attachments.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
use std::fs::File;
use std::pin::pin;

use futures::TryStreamExt;
use symbolicator_service::download::DownloadService;
use symbolicator_service::caching::CacheError;
use symbolicator_service::download::{self, DownloadService};
use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter};
use tokio_util::io::StreamReader;

use crate::interface::AttachmentFile;

pub async fn download_attachment(
download_svc: &DownloadService,
file: AttachmentFile,
) -> anyhow::Result<File> {
) -> Result<File, CacheError> {
let (storage_url, storage_token) = match file {
AttachmentFile::Local(file) => return Ok(file),
AttachmentFile::Remote {
Expand All @@ -20,31 +19,37 @@ pub async fn download_attachment(
} => (storage_url, storage_token),
};

// TODO: maybe its worth using the actual `DownloadService` instead of straight going to the `trusted_client`.
// Doing so would in theory allow us to have retries and error report, as well as being able to
// download files in multiple chunks concurrently, but I don’t think our `objecstore` server currently
// supports range requests, and those would also mess with streaming decompression.
// Not to mention that using the `DownloadService` is not that straight forward.
let mut request = download_svc.trusted_client.get(storage_url);
if let Some(token) = storage_token {
request = request.bearer_auth(token);
}
let stream = request
.send()
.await?
.error_for_status()?
.bytes_stream()
.map_err(std::io::Error::other);
let mut reader = pin!(StreamReader::new(stream));

let file = tempfile::tempfile()?;
let mut writer = BufWriter::new(tokio::fs::File::from_std(file));
tokio::io::copy(&mut reader, &mut writer).await?;
writer.flush().await?;
let mut file = writer.into_inner();
file.sync_data().await?;

file.rewind().await?;

Ok(file.into_std().await)
download::retry(|| async {
// TODO: maybe its worth using the actual `DownloadService` instead of straight going to the `trusted_client`.
// Doing so would in theory allow us to have retries and error report, as well as being able to
// download files in multiple chunks concurrently, but I don’t think our `objecstore` server currently
// supports range requests, and those would also mess with streaming decompression.
// Not to mention that using the `DownloadService` is not that straight forward.
Comment thread
Dav1dde marked this conversation as resolved.
Outdated
let mut request = download_svc.trusted_client.get(&storage_url);
if let Some(token) = storage_token.as_ref() {
request = request.bearer_auth(token);
}
let response = request.send().await?;
if !response.status().is_success() {
return Err(
download::GenericErrorHandler::handle_response(&storage_url, response).await,
);
}

let mut stream = response.bytes_stream();

let file = tempfile::tempfile()?;
let mut writer = BufWriter::new(tokio::fs::File::from_std(file));
while let Some(chunk) = stream.try_next().await? {
writer.write_all(&chunk).await?;
}
writer.flush().await?;
let mut file = writer.into_inner();
file.sync_data().await?;

file.rewind().await?;

Ok(file.into_std().await)
})
.await
}
15 changes: 11 additions & 4 deletions crates/symbolicator-service/src/download/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,14 +633,15 @@ async fn do_download_reqwest(
///
/// This error handler uses the HTTP status code to infer the [`CacheError`],
/// this works for any HTTP request, but does not consider API specific responses.
struct GenericErrorHandler;
pub struct GenericErrorHandler;

impl ErrorHandler for GenericErrorHandler {
async fn handle(&self, source: &str, response: SymResponse<'_>) -> CacheError {
impl GenericErrorHandler {
/// Converts an unsuccessful HTTP response to a [`CacheError`].
pub async fn handle_response(source: &str, response: reqwest::Response) -> CacheError {
let status = response.status();
debug_assert!(!status.is_success());

if let Ok(details) = response.response.text().await {
if let Ok(details) = response.text().await {
::sentry::configure_scope(|scope| {
scope.set_extra(
"reqwest_response_body",
Expand Down Expand Up @@ -677,6 +678,12 @@ impl ErrorHandler for GenericErrorHandler {
}
}

impl ErrorHandler for GenericErrorHandler {
async fn handle(&self, source: &str, response: SymResponse<'_>) -> CacheError {
Self::handle_response(source, response.response).await
}
}

/// A HTTP request Symbolicator wants to make.
struct SymRequest<'a> {
source_name: &'a str,
Expand Down
Loading