Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,14 +498,14 @@ impl RocksCacheStore {
f: F,
) -> Result<R, CubeError>
where
F: for<'a> FnOnce(DbTableRef<'a>, &'a mut BatchPipe) -> Result<R, CubeError>
F: for<'a> FnOnce(DbTableRef<'a>, &mut BatchPipe<'a>) -> Result<R, CubeError>
+ Send
+ Sync
+ 'static,
R: Send + Sync + 'static,
{
self.store
.write_operation_impl::<F, R>(&self.rw_loop_queue_cf, op_name, f)
.write_operation_impl::<F, R, ()>(&self.rw_loop_queue_cf, op_name, f, ())
.await
}

Expand Down
144 changes: 119 additions & 25 deletions rust/cubestore/cubestore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use cuberockstore::rocksdb::{BlockBasedOptions, Cache, Env, MergeOperands, Optio
use log::info;
use serde::{Deserialize, Serialize};
use std::hash::Hash;
use std::sync::Mutex;
use std::{env, io::Cursor, sync::Arc};

use crate::config::injection::DIService;
Expand Down Expand Up @@ -1355,8 +1356,10 @@ impl RocksStoreDetails for RocksMetaStoreDetails {
}
}

#[derive(Clone)]
pub struct RocksMetaStore {
store: Arc<RocksStore>,
cached_tables: Arc<Mutex<Option<Arc<Vec<TablePath>>>>>,
disk_space_cache: Arc<RwLock<Option<(HashMap<String, u64>, SystemTime)>>>,
upload_loop: Arc<WorkerLoop>,
}
Expand All @@ -1379,13 +1382,14 @@ impl RocksMetaStore {
fn new_from_store(store: Arc<RocksStore>) -> Arc<Self> {
Arc::new(Self {
store,
cached_tables: Arc::new(Mutex::new(None)),
disk_space_cache: Arc::new(RwLock::new(None)),
upload_loop: Arc::new(WorkerLoop::new("Metastore upload")),
})
}

pub fn reset_cached_tables(&self) {
*self.store.cached_tables.lock().unwrap() = None;
*self.cached_tables.lock().unwrap() = None;
}

pub async fn load_from_dump(
Expand Down Expand Up @@ -1512,19 +1516,24 @@ impl RocksMetaStore {
#[inline(always)]
pub async fn write_operation<F, R>(&self, op_name: &'static str, f: F) -> Result<R, CubeError>
where
F: for<'a> FnOnce(DbTableRef<'a>, &'a mut BatchPipe) -> Result<R, CubeError>
F: for<'a> FnOnce(
DbTableRef<'a>,
&mut BatchPipe<'a, RocksMetaStore>,
) -> Result<R, CubeError>
+ Send
+ Sync
+ 'static,
R: Send + Sync + 'static,
{
self.store.write_operation(op_name, f).await
self.store
.write_operation_impl(&self.store.rw_loop_default_cf, op_name, f, self.clone())
.await
Comment on lines +1528 to +1530
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor observation: self.clone() here clones the entire RocksMetaStore on every write operation. Since all fields are Arc-wrapped this is just reference count bumps (cheap), so it's fine. But it's worth noting that adding #[derive(Clone)] to RocksMetaStore widens the API surface — if any non-Arc field is added in the future, this clone could become expensive. A comment or a targeted clone_for_callback() method might make the intent clearer.

Also, accessing self.store.rw_loop_default_cf directly (instead of going through a method) requires the field to be pub(crate) — that visibility change is visible in rocks_store.rs. This is fine but creates a tighter coupling between the two modules.

Copy link
Copy Markdown
Member Author

@ovr ovr Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@claude how will it affect performance? As I can see, this struct contains 4 fields and all of them are under Arc(s); it should clone 4 Arc(s)?

Copy link
Copy Markdown
Contributor

@claude claude bot Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude finished @ovr's task in 34s —— View job


Performance impact of self.clone() on RocksMetaStore

  • Read struct definition
  • Analyze clone cost
  • Post answer

Yes, you're right. RocksMetaStore has exactly 4 fields, all Arc-wrapped:

pub struct RocksMetaStore {
    store: Arc<RocksStore>,
    cached_tables: Arc<Mutex<Option<Arc<Vec<TablePath>>>>>,
    disk_space_cache: Arc<RwLock<Option<(HashMap<String, u64>, SystemTime)>>>,
    upload_loop: Arc<WorkerLoop>,
}

So self.clone() is 4 atomic reference count increments — no heap allocations, no deep copies. Each Arc::clone is a single fetch_add (or equivalent) on the refcount's AtomicUsize, typically ~1–3 ns on modern hardware.

This happens once per write_operation call, well before the actual RocksDB commit (batch_write_rows). RocksDB write + fsync will dominate by 3–6 orders of magnitude (~milliseconds vs. nanoseconds), so the clone cost is immeasurable in practice. No performance concern here.

The only theoretical concern with frequent Arc::clone across threads is cache-line contention on the refcount, but since this is on the single RW-loop thread, that doesn't apply either.


}

fn drop_table_impl(
table_id: u64,
db_ref: DbTableRef,
batch_pipe: &mut BatchPipe,
batch_pipe: &mut BatchPipe<'_, RocksMetaStore>,
) -> Result<IdRow<Table>, CubeError> {
let tables_table = TableRocksTable::new(db_ref.clone());
let indexes_table = IndexRocksTable::new(db_ref.clone());
Expand Down Expand Up @@ -1555,7 +1564,7 @@ impl RocksMetaStore {

impl RocksMetaStore {
fn add_index(
batch_pipe: &mut BatchPipe,
batch_pipe: &mut BatchPipe<'_, RocksMetaStore>,
rocks_index: &IndexRocksTable,
rocks_partition: &PartitionRocksTable,
table_cols: &Vec<Column>,
Expand Down Expand Up @@ -1586,7 +1595,7 @@ impl RocksMetaStore {
}
}
fn add_regular_index(
batch_pipe: &mut BatchPipe,
batch_pipe: &mut BatchPipe<'_, RocksMetaStore>,
rocks_index: &IndexRocksTable,
rocks_partition: &PartitionRocksTable,
table_cols: &Vec<Column>,
Expand Down Expand Up @@ -1734,7 +1743,7 @@ impl RocksMetaStore {
}

fn add_aggregate_index(
batch_pipe: &mut BatchPipe,
batch_pipe: &mut BatchPipe<'_, RocksMetaStore>,
rocks_index: &IndexRocksTable,
rocks_partition: &PartitionRocksTable,
table_cols: &Vec<Column>,
Expand Down Expand Up @@ -1850,7 +1859,7 @@ impl RocksMetaStore {
// Must be run under write_operation(). Returns activated row count.
fn activate_chunks_impl(
db_ref: DbTableRef,
batch_pipe: &mut BatchPipe,
batch_pipe: &mut BatchPipe<'_, RocksMetaStore>,
uploaded_chunk_ids: &[(u64, Option<u64>)],
replay_handle_id: Option<u64>,
) -> Result<(u64, HashMap</*partition_id*/ u64, /*rows*/ u64>), CubeError> {
Expand Down Expand Up @@ -1907,7 +1916,9 @@ impl MetaStore for RocksMetaStore {
if_not_exists: bool,
) -> Result<IdRow<Schema>, CubeError> {
self.write_operation("create_schema", move |db_ref, batch_pipe| {
batch_pipe.invalidate_tables_cache();
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
});
Comment thread
ovr marked this conversation as resolved.
let table = SchemaRocksTable::new(db_ref.clone());
if if_not_exists {
let rows = table.get_rows_by_index(&schema_name, &SchemaRocksIndex::Name)?;
Expand Down Expand Up @@ -1968,7 +1979,9 @@ impl MetaStore for RocksMetaStore {
new_schema_name: String,
) -> Result<IdRow<Schema>, CubeError> {
self.write_operation("rename_schema", move |db_ref, batch_pipe| {
batch_pipe.invalidate_tables_cache();
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
});
let table = SchemaRocksTable::new(db_ref.clone());
let existing_keys =
table.get_row_ids_by_index(&old_schema_name, &SchemaRocksIndex::Name)?;
Expand All @@ -1992,7 +2005,9 @@ impl MetaStore for RocksMetaStore {
new_schema_name: String,
) -> Result<IdRow<Schema>, CubeError> {
self.write_operation("rename_schema_by_id", move |db_ref, batch_pipe| {
batch_pipe.invalidate_tables_cache();
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
});
let table = SchemaRocksTable::new(db_ref.clone());

let old_schema = table.get_row(schema_id)?.unwrap();
Expand All @@ -2008,7 +2023,9 @@ impl MetaStore for RocksMetaStore {
#[tracing::instrument(level = "trace", skip(self))]
async fn delete_schema(&self, schema_name: String) -> Result<(), CubeError> {
self.write_operation("delete_schema", move |db_ref, batch_pipe| {
batch_pipe.invalidate_tables_cache();
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
});
let table = SchemaRocksTable::new(db_ref.clone());
let existing_keys =
table.get_row_ids_by_index(&schema_name, &SchemaRocksIndex::Name)?;
Expand All @@ -2035,7 +2052,9 @@ impl MetaStore for RocksMetaStore {
#[tracing::instrument(level = "trace", skip(self))]
async fn delete_schema_by_id(&self, schema_id: u64) -> Result<(), CubeError> {
self.write_operation("delete_schema_by_id", move |db_ref, batch_pipe| {
batch_pipe.invalidate_tables_cache();
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
});
let tables = TableRocksTable::new(db_ref.clone()).all_rows()?;
if tables
.into_iter()
Expand Down Expand Up @@ -2100,7 +2119,9 @@ impl MetaStore for RocksMetaStore {
extension: Option<String>,
) -> Result<IdRow<Table>, CubeError> {
self.write_operation("create_table", move |db_ref, batch_pipe| {
batch_pipe.invalidate_tables_cache();
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
});
if drop_if_exists {
if let Ok(exists_table) = get_table_impl(db_ref.clone(), schema_name.clone(), table_name.clone()) {
RocksMetaStore::drop_table_impl(exists_table.get_id(), db_ref.clone(), batch_pipe)?;
Expand Down Expand Up @@ -2295,7 +2316,9 @@ impl MetaStore for RocksMetaStore {
#[tracing::instrument(level = "trace", skip(self))]
async fn table_ready(&self, id: u64, is_ready: bool) -> Result<IdRow<Table>, CubeError> {
self.write_operation("table_ready", move |db_ref, batch_pipe| {
batch_pipe.invalidate_tables_cache();
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
});
let rocks_table = TableRocksTable::new(db_ref.clone());
Ok(rocks_table.update_with_fn(id, |r| r.update_is_ready(is_ready), batch_pipe)?)
})
Expand All @@ -2305,7 +2328,9 @@ impl MetaStore for RocksMetaStore {
#[tracing::instrument(level = "trace", skip(self))]
async fn seal_table(&self, id: u64) -> Result<IdRow<Table>, CubeError> {
self.write_operation("seal_table", move |db_ref, batch_pipe| {
batch_pipe.invalidate_tables_cache();
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
});
let rocks_table = TableRocksTable::new(db_ref.clone());
Ok(rocks_table.update_with_fn(id, |r| r.update_sealed(true), batch_pipe)?)
})
Expand Down Expand Up @@ -2334,13 +2359,16 @@ impl MetaStore for RocksMetaStore {
self.write_operation(
"update_location_download_size",
move |db_ref, batch_pipe| {
batch_pipe.invalidate_tables_cache();
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
});

let rocks_table = TableRocksTable::new(db_ref.clone());
Ok(rocks_table.update_with_res_fn(
rocks_table.update_with_res_fn(
id,
|r| r.update_location_download_size(&location, download_size),
batch_pipe,
)?)
)
},
)
.await
Expand Down Expand Up @@ -2393,14 +2421,14 @@ impl MetaStore for RocksMetaStore {
})
.await
} else {
let cache = self.store.cached_tables.clone();
let cache = self.cached_tables.clone();

if let Some(t) = cube_ext::spawn_blocking(move || cache.lock().unwrap().clone()).await?
{
return Ok(t);
}

let cache = self.store.cached_tables.clone();
let cache = self.cached_tables.clone();
// Can't do read_operation_out_of_queue as we need to update cache on the same thread where it's dropped
self.read_operation("get_tables_with_path", move |db_ref| {
let cached_tables = { cache.lock().unwrap().clone() };
Expand Down Expand Up @@ -2465,7 +2493,9 @@ impl MetaStore for RocksMetaStore {
#[tracing::instrument(level = "trace", skip(self))]
async fn drop_table(&self, table_id: u64) -> Result<IdRow<Table>, CubeError> {
self.write_operation("drop_table", move |db_ref, batch_pipe| {
batch_pipe.invalidate_tables_cache();
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
});
RocksMetaStore::drop_table_impl(table_id, db_ref, batch_pipe)
})
.await
Expand Down Expand Up @@ -4819,7 +4849,7 @@ fn get_default_index_impl(db_ref: DbTableRef, table_id: u64) -> Result<IdRow<Ind
/// must take great care to avoid inconsistencies caused by this.
fn swap_active_partitions_impl(
db_ref: DbTableRef,
batch_pipe: &mut BatchPipe,
batch_pipe: &mut BatchPipe<'_, RocksMetaStore>,
current_active: &[(IdRow<Partition>, Vec<IdRow<Chunk>>)],
new_active: &[(IdRow<Partition>, u64)],
mut update_new_partition_stats: impl FnMut(/*index*/ usize, &Partition) -> Partition,
Expand Down Expand Up @@ -4982,6 +5012,70 @@ mod tests {
use std::time::Duration;
use std::{env, fs};

#[tokio::test]
async fn test_post_commit_callback_on_success() {
    use std::sync::atomic::{AtomicBool, Ordering};

    // Set up an isolated metastore backed by two throwaway directories
    // (local RocksDB path + fake "remote" FS), wiping any leftovers first.
    let config = Config::test("test_post_commit_callback_on_success");
    let local_dir = env::current_dir()
        .unwrap()
        .join("test_post_commit_callback_on_success-local");
    let remote_dir = env::current_dir()
        .unwrap()
        .join("test_post_commit_callback_on_success-remote");
    let _ = fs::remove_dir_all(&local_dir);
    let _ = fs::remove_dir_all(&remote_dir);
    let remote_fs = LocalDirRemoteFs::new(Some(remote_dir.clone()), local_dir.clone());

    let meta_store = RocksMetaStore::new(
        local_dir.join("metastore").as_path(),
        BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()),
        config.config_obj(),
    )
    .unwrap();

    // Case 1: a successful write must run the registered post-commit callback.
    let fired = Arc::new(AtomicBool::new(false));
    {
        let fired = fired.clone();
        meta_store
            .write_operation("test_success", move |_db_ref, batch_pipe| {
                batch_pipe.set_post_commit_callback(move |_metastore| {
                    fired.store(true, Ordering::SeqCst);
                });
                Ok(())
            })
            .await
            .unwrap();
    }
    assert!(
        fired.load(Ordering::SeqCst),
        "post-commit callback should fire on successful write"
    );

    // Case 2: when the write closure returns Err, the callback must be skipped.
    let fired = Arc::new(AtomicBool::new(false));
    let outcome: Result<(), _> = {
        let fired = fired.clone();
        meta_store
            .write_operation("test_failure", move |_db_ref, batch_pipe| {
                batch_pipe.set_post_commit_callback(move |_metastore| {
                    fired.store(true, Ordering::SeqCst);
                });
                Err(CubeError::user("intentional error".to_string()))
            })
            .await
    };
    assert!(outcome.is_err());
    assert!(
        !fired.load(Ordering::SeqCst),
        "post-commit callback should NOT fire when write fails"
    );

    // Best-effort cleanup of the test directories.
    let _ = fs::remove_dir_all(local_dir);
    let _ = fs::remove_dir_all(remote_dir);
}

#[test]
fn macro_test() {
let s = Schema {
Expand Down Expand Up @@ -7136,7 +7230,7 @@ impl RocksMetaStore {
deactivate_ids: Vec<u64>,
uploaded_ids_and_sizes: Vec<(u64, Option<u64>)>,
db_ref: DbTableRef,
batch_pipe: &mut BatchPipe,
batch_pipe: &mut BatchPipe<'_, RocksMetaStore>,
check_rows: bool,
new_replay_handle_id: Option<u64>,
) -> Result<(), CubeError> {
Expand Down Expand Up @@ -7236,7 +7330,7 @@ impl RocksMetaStore {
impl RocksMetaStore {
fn drop_index(
db: DbTableRef,
pipe: &mut BatchPipe,
pipe: &mut BatchPipe<'_, RocksMetaStore>,
index_id: u64,
update_multi_partitions: bool,
) -> Result<(), CubeError> {
Expand Down
Loading
Loading