Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ pub struct Dataset {
/// Object store parameters used when opening this dataset.
/// These are used when creating object stores for additional base paths.
pub(crate) store_params: Option<Box<ObjectStoreParams>>,
/// Optional runtime-only object store parameters keyed by base path URI.
pub(crate) base_store_params: Option<Arc<HashMap<String, ObjectStoreParams>>>,
}

impl std::fmt::Debug for Dataset {
Expand All @@ -186,6 +188,7 @@ impl std::fmt::Debug for Dataset {
.field("base", &self.base)
.field("version", &self.manifest.version)
.field("cache_num_items", &self.session.approx_num_items())
.field("base_store_params", &self.base_store_params.is_some())
.finish()
}
}
Expand Down Expand Up @@ -577,6 +580,7 @@ impl Dataset {
self.commit_handler.clone(),
self.file_reader_options.clone(),
self.store_params.as_deref().cloned(),
self.base_store_params.clone(),
)
}

Expand Down Expand Up @@ -695,6 +699,7 @@ impl Dataset {
commit_handler: Arc<dyn CommitHandler>,
file_reader_options: Option<FileReaderOptions>,
store_params: Option<ObjectStoreParams>,
base_store_params: Option<Arc<HashMap<String, ObjectStoreParams>>>,
) -> Result<Self> {
let refs = Refs::new(
object_store.clone(),
Expand Down Expand Up @@ -722,6 +727,7 @@ impl Dataset {
index_cache,
file_reader_options,
store_params: store_params.map(Box::new),
base_store_params,
})
}

Expand Down Expand Up @@ -1631,6 +1637,23 @@ impl Dataset {
cloned
}

fn store_params_for_base(
    &self,
    base_path: Option<&lance_table::format::BasePath>,
) -> ObjectStoreParams {
    // Runtime per-base overrides are precomputed by the builder and keyed by
    // the exact `BasePath.path`, so resolving them is a single map lookup.
    if let (Some(base), Some(overrides)) = (base_path, self.base_store_params.as_ref()) {
        if let Some(params) = overrides.get(&base.path) {
            return params.clone();
        }
    }
    // No override for this base: fall back to the dataset-level store params,
    // or the defaults when none were supplied at open time.
    self.store_params
        .as_deref()
        .cloned()
        .unwrap_or_default()
}

/// Returns the initial storage options used when opening this dataset, if any.
///
/// This returns the static initial options without triggering any refresh.
Expand Down Expand Up @@ -1739,11 +1762,12 @@ impl Dataset {
let base_path = self.manifest.base_paths.get(&base_id).ok_or_else(|| {
Error::invalid_input(format!("Dataset base path with ID {} not found", base_id))
})?;
let store_params = self.store_params_for_base(Some(base_path));

let (store, _) = ObjectStore::from_uri_and_params(
self.session.store_registry(),
&base_path.path,
&self.store_params.as_deref().cloned().unwrap_or_default(),
&store_params,
)
.await?;

Expand Down Expand Up @@ -2564,6 +2588,7 @@ pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<'
dataset.commit_handler.clone(),
dataset.file_reader_options.clone(),
dataset.store_params.as_deref().cloned(),
dataset.base_store_params.clone(),
)?;
let loaded =
Arc::new(dataset_version.read_transaction().await?.ok_or_else(|| {
Expand Down Expand Up @@ -2595,6 +2620,7 @@ pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<'
dataset.commit_handler.clone(),
dataset.file_reader_options.clone(),
dataset.store_params.as_deref().cloned(),
dataset.base_store_params.clone(),
)
} else {
// If we didn't get the latest manifest, we can still return the dataset
Expand Down
93 changes: 72 additions & 21 deletions rust/lance/src/dataset/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub struct DatasetBuilder {
file_reader_options: Option<FileReaderOptions>,
/// Storage options that override user-provided options (e.g., from namespace client)
storage_options_override: Option<HashMap<String, String>>,
/// Runtime-only per-base storage options keyed by base path URI.
base_storage_options_overrides: HashMap<String, HashMap<String, String>>,
}

impl std::fmt::Debug for DatasetBuilder {
Expand All @@ -68,6 +70,10 @@ impl std::fmt::Debug for DatasetBuilder {
"storage_options_override",
&self.storage_options_override.is_some(),
)
.field(
"base_storage_options_overrides",
&!self.base_storage_options_overrides.is_empty(),
)
.finish()
}
}
Expand All @@ -86,6 +92,7 @@ impl DatasetBuilder {
manifest: None,
file_reader_options: None,
storage_options_override: None,
base_storage_options_overrides: HashMap::new(),
}
}

Expand Down Expand Up @@ -448,6 +455,20 @@ impl DatasetBuilder {
self
}

/// Set runtime-only storage options for a specific registered base path.
///
/// These options are not persisted in the manifest. They are applied whenever
/// the dataset resolves an object store for the given `BasePath.path`.
pub fn with_base_storage_options(
    mut self,
    base_path: impl AsRef<str>,
    storage_options: HashMap<String, String>,
) -> Self {
    // Later calls for the same base path replace earlier ones.
    let key = base_path.as_ref().to_owned();
    self.base_storage_options_overrides
        .insert(key, storage_options);
    self
}

/// Set options based on [ReadParams].
pub fn with_read_params(mut self, read_params: ReadParams) -> Self {
self = self
Expand Down Expand Up @@ -566,30 +587,41 @@ impl DatasetBuilder {
}
}

// Runtime per-base overrides are supplied as storage options, but the dataset
// ultimately resolves object stores from ObjectStoreParams. Normalize once in
// the builder so reads only need to look up the prepared params by base path.
fn merge_store_params_with_storage_options(
    params: &ObjectStoreParams,
    override_options: &HashMap<String, String>,
) -> ObjectStoreParams {
    // Nothing to overlay: hand back an unmodified copy.
    if override_options.is_empty() {
        return params.clone();
    }

    let mut merged = params.clone();

    // Overlay the overrides on top of any existing options; on key
    // collisions the override value wins.
    let mut options = merged.storage_options().cloned().unwrap_or_default();
    for (key, value) in override_options {
        options.insert(key.clone(), value.clone());
    }

    // Preserve a dynamic provider when the original accessor had one so that
    // refreshable credentials keep working after the merge; otherwise the
    // merged options become a static snapshot.
    let provider = merged
        .storage_options_accessor
        .as_ref()
        .and_then(|accessor| accessor.provider().cloned());
    let accessor = match provider {
        Some(provider) => StorageOptionsAccessor::with_initial_and_provider(options, provider),
        None => StorageOptionsAccessor::with_static_options(options),
    };
    merged.storage_options_accessor = Some(Arc::new(accessor));
    merged
}

async fn load_impl(mut self) -> Result<Dataset> {
// Apply storage_options_override to merge namespace client options with any existing accessor
if let Some(override_opts) = self.storage_options_override.take() {
// Get existing options and merge
let mut merged_opts = self.options.storage_options().cloned().unwrap_or_default();
// Override with namespace client storage options - they take precedence
merged_opts.extend(override_opts);

// Update accessor with merged options
if let Some(accessor) = &self.options.storage_options_accessor {
if let Some(provider) = accessor.provider().cloned() {
self.options.storage_options_accessor = Some(Arc::new(
StorageOptionsAccessor::with_initial_and_provider(merged_opts, provider),
));
} else {
self.options.storage_options_accessor = Some(Arc::new(
StorageOptionsAccessor::with_static_options(merged_opts),
));
}
} else {
self.options.storage_options_accessor = Some(Arc::new(
StorageOptionsAccessor::with_static_options(merged_opts),
));
}
self.options =
Self::merge_store_params_with_storage_options(&self.options, &override_opts);
}

let index_cache_backend = self.index_cache_backend.take();
Expand Down Expand Up @@ -617,6 +649,22 @@ impl DatasetBuilder {

let file_reader_options = self.file_reader_options.clone();
let store_params = self.options.clone();
let base_store_params = (!self.base_storage_options_overrides.is_empty()).then(|| {
Arc::new(
self.base_storage_options_overrides
.iter()
.map(|(base_path, storage_options)| {
(
base_path.clone(),
Self::merge_store_params_with_storage_options(
&store_params,
storage_options,
),
)
})
.collect::<HashMap<_, _>>(),
)
});
let (object_store, base_path, commit_handler) = self.build_object_store().await?;

// Two cases that need to check out after loading the manifest:
Expand Down Expand Up @@ -669,6 +717,7 @@ impl DatasetBuilder {
base_path,
commit_handler,
Some(store_params),
base_store_params,
)
.await?;

Expand Down Expand Up @@ -706,6 +755,7 @@ impl DatasetBuilder {
base_path: Path,
commit_handler: Arc<dyn CommitHandler>,
store_params: Option<ObjectStoreParams>,
base_store_params: Option<Arc<HashMap<String, ObjectStoreParams>>>,
) -> Result<Dataset> {
let (manifest, location) = if let Some(mut manifest) = manifest {
let location = commit_handler
Expand Down Expand Up @@ -768,6 +818,7 @@ impl DatasetBuilder {
commit_handler,
file_reader_options,
store_params,
base_store_params,
)
}
}
56 changes: 56 additions & 0 deletions rust/lance/src/dataset/tests/dataset_io.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::collections::HashMap;
use std::sync::Arc;
use std::vec;

Expand Down Expand Up @@ -35,6 +36,7 @@ use lance_datagen::{BatchCount, RowCount, array, gen_batch};
use lance_file::version::LanceFileVersion;
use lance_io::assert_io_eq;
use lance_table::feature_flags;
use lance_table::format::BasePath;

use crate::index::DatasetIndexExt;
use futures::TryStreamExt;
Expand Down Expand Up @@ -167,6 +169,60 @@ async fn test_with_object_store_enables_isolated_per_request_io_tracking() {
assert_eq!(tracker_b.incremental_stats().read_iops, 0);
}

// Verifies that per-base runtime storage options supplied via
// `DatasetBuilder::with_base_storage_options` are used when resolving the
// object store for each registered base path, instead of the dataset-level
// store params. Requires the "azure" feature so `az://` URIs resolve.
#[cfg(feature = "azure")]
#[tokio::test]
async fn test_object_store_for_base_uses_runtime_base_storage_options() {
// Create a small local dataset to act as the primary base.
let test_dir = TempStdDir::default();
create_file(&test_dir, WriteMode::Create, LanceFileVersion::Stable).await;
let uri = test_dir.to_str().unwrap();
let dataset = Arc::new(Dataset::open(uri).await.unwrap());

// Register two additional Azure base paths (same container, different
// prefixes) so each one can receive distinct runtime credentials.
let base_a = BasePath::new(
1,
"az://container/path-a".to_string(),
Some("base-a".to_string()),
true,
);
let base_b = BasePath::new(
2,
"az://container/path-b".to_string(),
Some("base-b".to_string()),
true,
);
dataset
.add_bases(vec![base_a.clone(), base_b.clone()], None)
.await
.unwrap();

// Reopen the dataset with a different account name per base path. The
// account key is a dummy base64 value ("test"); no real Azure calls are made.
let dataset = DatasetBuilder::from_uri(uri)
.with_base_storage_options(
&base_a.path,
HashMap::from([
("account_name".to_string(), "account-a".to_string()),
("account_key".to_string(), "dGVzdA==".to_string()),
]),
)
.with_base_storage_options(
&base_b.path,
HashMap::from([
("account_name".to_string(), "account-b".to_string()),
("account_key".to_string(), "dGVzdA==".to_string()),
]),
)
.load()
.await
.unwrap();

// Resolve base 1 twice and base 2 once to observe caching behavior.
let store_a = dataset.object_store_for_base(1).await.unwrap();
let store_a_again = dataset.object_store_for_base(1).await.unwrap();
let store_b = dataset.object_store_for_base(2).await.unwrap();

// Repeated lookups of the same base must return the same cached Arc,
// while different bases get distinct stores built from their own options.
assert!(Arc::ptr_eq(&store_a, &store_a_again));
assert!(!Arc::ptr_eq(&store_a, &store_b));
// The store prefix encodes scheme, container, and account name, proving each
// base picked up its own `account_name` override.
assert_eq!(store_a.store_prefix, "az$container@account-a");
assert_eq!(store_b.store_prefix, "az$container@account-b");
}

#[rstest]
#[lance_test_macros::test(tokio::test)]
async fn test_create_dataset(
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset/write/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ impl<'a> CommitBuilder<'a> {
metadata_cache,
file_reader_options: None,
store_params: self.store_params.clone().map(Box::new),
base_store_params: None,
})
}
}
Expand Down
Loading