Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 80 additions & 22 deletions crates/lib/docs_rs_storage/src/archive_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,8 @@ impl Cache {
// https://docs.rs/moka/0.12.14/moka/future/struct.Cache.html#concurrent-calls-on-the-same-key
// So we don't need any locking here to prevent multiple downloads for the same
// missing archive index.
self.manager
if let Err(arc_err) = self
.manager
.try_get_with_by_ref(&local_index_path, async {
// NOTE: benign race with the eviction listener.
//
Expand Down Expand Up @@ -512,27 +513,27 @@ impl Cache {
Ok::<_, anyhow::Error>(Arc::new(entry))
})
.await
.map_err(|arc_err: Arc<anyhow::Error>| {
// We can't convert this Arc<Error> into the inner error type.
// See https://github.com/moka-rs/moka/issues/497
// But since some callers are specifically checking
// ::is<PathNotFoundError> to differentiate other errors from
// the "not found" case, we want to preserve that information
// if it was the cause of the error.
//
// This mean all error types that we later want to use with ::is<> or
// ::downcast<> have to be mentioned here.
//
// While we could also migrate to a custom enum error type, this would
// only be really nice when the whole storage lib uses is. Otherwise
// we'll end up with some hardcoded conversions again.
// So I can leave it as-is for now.
if arc_err.is::<PathNotFoundError>() {
anyhow!(PathNotFoundError)
} else {
anyhow!(arc_err)
}
})?;
{
// We can't convert this Arc<Error> into the inner error type.
// See https://github.com/moka-rs/moka/issues/497
// But since some callers are specifically checking
// ::is<PathNotFoundError> to differentiate other errors from
// the "not found" case, we want to preserve that information
// if it was the cause of the error.
//
// This mean all error types that we later want to use with ::is<> or
// ::downcast<> have to be mentioned here.
//
// While we could also migrate to a custom enum error type, this would
// only be really nice when the whole storage lib uses is. Otherwise
// we'll end up with some hardcoded conversions again.
// So I can leave it as-is for now.
if arc_err.is::<PathNotFoundError>() {
return Ok(None);
} else {
return Err(anyhow!(arc_err));
}
}

// Final attempt: if this still fails, bubble the error.
find_in_file(local_index_path, path_in_archive).await
Expand Down Expand Up @@ -977,6 +978,44 @@ mod tests {
}
}

struct NotFoundDownloader {
remote_index_path: String,
fetch_count: std::sync::Mutex<usize>,
}

impl NotFoundDownloader {
fn new(remote_index_path: String) -> Self {
Self {
remote_index_path,
fetch_count: std::sync::Mutex::new(0),
}
}

fn fetch_count(&self) -> usize {
*self.fetch_count.lock().unwrap()
}
}

impl Downloader for NotFoundDownloader {
fn fetch_archive_index<'a>(
&'a self,
remote_index_path: &'a str,
) -> Pin<Box<dyn Future<Output = Result<StreamingBlob>> + Send + 'a>> {
Box::pin(async move {
if remote_index_path != self.remote_index_path {
bail!(
"unexpected remote index path: expected {}, got {remote_index_path}",
self.remote_index_path
);
}

let mut fetch_count = self.fetch_count.lock().unwrap();
*fetch_count += 1;
Err(PathNotFoundError.into())
})
}
}

async fn create_index_bytes(file_count: u32) -> Result<Vec<u8>> {
let tf = create_test_archive(file_count).await?;
let tempfile = tempfile::NamedTempFile::new()?.into_temp_path();
Expand Down Expand Up @@ -1169,6 +1208,25 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn find_remote_index_not_found_returns_none_without_retries() -> Result<()> {
let cache = test_cache().await?;
const LATEST_BUILD_ID: Option<BuildId> = Some(BuildId(7));
const ARCHIVE_NAME: &str = "missing-index.zip";
const FILE_IN_ARCHIVE: &str = "testfile0";

let remote_index_path = format!("{ARCHIVE_NAME}.{ARCHIVE_INDEX_FILE_EXTENSION}");
let downloader = NotFoundDownloader::new(remote_index_path);

let result = cache
.find(ARCHIVE_NAME, LATEST_BUILD_ID, FILE_IN_ARCHIVE, &downloader)
.await?;
assert!(result.is_none());
assert_eq!(downloader.fetch_count(), 1);

Ok(())
}

#[tokio::test]
async fn find_retries_once_then_errors() -> Result<()> {
let cache = test_cache().await?;
Expand Down
10 changes: 3 additions & 7 deletions crates/lib/docs_rs_storage/src/storage/non_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,11 @@ impl AsyncStorage {
latest_build_id: Option<BuildId>,
path: &str,
) -> Result<bool> {
match self
Ok(self
.archive_index_cache
.find(archive_path, latest_build_id, path, self)
.await
{
Ok(file_info) => Ok(file_info.is_some()),
Err(err) if err.downcast_ref::<PathNotFoundError>().is_some() => Ok(false),
Err(err) => Err(err),
}
.await?
.is_some())
}

/// get, decompress and materialize an object from store
Expand Down
Loading