Skip to content

Commit 678a124

Browse files
committed
refactor(cubestore): Replace invalidate_tables_cache flag with generic post-commit callback API
Introduce a generic `set_post_commit_callback` on `BatchPipe<S>` that fires after a successful RocksDB commit on the RW-loop thread. This replaces the hard-coded `invalidate_tables_cache` boolean flag, which was a hack that also ran before the actual commit.

- `BatchPipe` is now generic over a store type `S` (defaults to `()`), allowing callbacks to receive a typed store reference
- `RocksMetaStore::write_operation` passes `&RocksMetaStore` to callbacks, giving them direct access to `cached_tables` and other metastore state
- Moved `cached_tables` from `RocksStore` to `RocksMetaStore`, where it belongs
- Cache invalidation now only happens when the commit actually succeeds
- `RocksTable` methods are generic over `S` so they work with any `BatchPipe<S>`
1 parent 34b1329 commit 678a124

File tree

4 files changed

+128
-77
lines changed

4 files changed

+128
-77
lines changed

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -498,14 +498,14 @@ impl RocksCacheStore {
498498
f: F,
499499
) -> Result<R, CubeError>
500500
where
501-
F: for<'a> FnOnce(DbTableRef<'a>, &'a mut BatchPipe) -> Result<R, CubeError>
501+
F: for<'a> FnOnce(DbTableRef<'a>, &mut BatchPipe<'a>) -> Result<R, CubeError>
502502
+ Send
503503
+ Sync
504504
+ 'static,
505505
R: Send + Sync + 'static,
506506
{
507507
self.store
508-
.write_operation_impl::<F, R>(&self.rw_loop_queue_cf, op_name, f)
508+
.write_operation_impl::<F, R, ()>(&self.rw_loop_queue_cf, op_name, f, None)
509509
.await
510510
}
511511

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 68 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ use cuberockstore::rocksdb::{BlockBasedOptions, Cache, Env, MergeOperands, Optio
2626
use log::info;
2727
use serde::{Deserialize, Serialize};
2828
use std::hash::Hash;
29+
use std::sync::Mutex;
2930
use std::{env, io::Cursor, sync::Arc};
3031

3132
use crate::config::injection::DIService;
@@ -1355,8 +1356,10 @@ impl RocksStoreDetails for RocksMetaStoreDetails {
13551356
}
13561357
}
13571358

1359+
#[derive(Clone)]
13581360
pub struct RocksMetaStore {
13591361
store: Arc<RocksStore>,
1362+
cached_tables: Arc<Mutex<Option<Arc<Vec<TablePath>>>>>,
13601363
disk_space_cache: Arc<RwLock<Option<(HashMap<String, u64>, SystemTime)>>>,
13611364
upload_loop: Arc<WorkerLoop>,
13621365
}
@@ -1379,13 +1382,14 @@ impl RocksMetaStore {
13791382
fn new_from_store(store: Arc<RocksStore>) -> Arc<Self> {
13801383
Arc::new(Self {
13811384
store,
1385+
cached_tables: Arc::new(Mutex::new(None)),
13821386
disk_space_cache: Arc::new(RwLock::new(None)),
13831387
upload_loop: Arc::new(WorkerLoop::new("Metastore upload")),
13841388
})
13851389
}
13861390

13871391
pub fn reset_cached_tables(&self) {
1388-
*self.store.cached_tables.lock().unwrap() = None;
1392+
*self.cached_tables.lock().unwrap() = None;
13891393
}
13901394

13911395
pub async fn load_from_dump(
@@ -1512,19 +1516,29 @@ impl RocksMetaStore {
15121516
#[inline(always)]
15131517
pub async fn write_operation<F, R>(&self, op_name: &'static str, f: F) -> Result<R, CubeError>
15141518
where
1515-
F: for<'a> FnOnce(DbTableRef<'a>, &'a mut BatchPipe) -> Result<R, CubeError>
1519+
F: for<'a> FnOnce(
1520+
DbTableRef<'a>,
1521+
&mut BatchPipe<'a, RocksMetaStore>,
1522+
) -> Result<R, CubeError>
15161523
+ Send
15171524
+ Sync
15181525
+ 'static,
15191526
R: Send + Sync + 'static,
15201527
{
1521-
self.store.write_operation(op_name, f).await
1528+
self.store
1529+
.write_operation_impl(
1530+
&self.store.rw_loop_default_cf,
1531+
op_name,
1532+
f,
1533+
Some(self.clone()),
1534+
)
1535+
.await
15221536
}
15231537

1524-
fn drop_table_impl(
1538+
fn drop_table_impl<S>(
15251539
table_id: u64,
15261540
db_ref: DbTableRef,
1527-
batch_pipe: &mut BatchPipe,
1541+
batch_pipe: &mut BatchPipe<'_, S>,
15281542
) -> Result<IdRow<Table>, CubeError> {
15291543
let tables_table = TableRocksTable::new(db_ref.clone());
15301544
let indexes_table = IndexRocksTable::new(db_ref.clone());
@@ -1554,8 +1568,8 @@ impl RocksMetaStore {
15541568
}
15551569

15561570
impl RocksMetaStore {
1557-
fn add_index(
1558-
batch_pipe: &mut BatchPipe,
1571+
fn add_index<S>(
1572+
batch_pipe: &mut BatchPipe<'_, S>,
15591573
rocks_index: &IndexRocksTable,
15601574
rocks_partition: &PartitionRocksTable,
15611575
table_cols: &Vec<Column>,
@@ -1585,8 +1599,8 @@ impl RocksMetaStore {
15851599
),
15861600
}
15871601
}
1588-
fn add_regular_index(
1589-
batch_pipe: &mut BatchPipe,
1602+
fn add_regular_index<S>(
1603+
batch_pipe: &mut BatchPipe<'_, S>,
15901604
rocks_index: &IndexRocksTable,
15911605
rocks_partition: &PartitionRocksTable,
15921606
table_cols: &Vec<Column>,
@@ -1733,8 +1747,8 @@ impl RocksMetaStore {
17331747
Ok(index_id)
17341748
}
17351749

1736-
fn add_aggregate_index(
1737-
batch_pipe: &mut BatchPipe,
1750+
fn add_aggregate_index<S>(
1751+
batch_pipe: &mut BatchPipe<'_, S>,
17381752
rocks_index: &IndexRocksTable,
17391753
rocks_partition: &PartitionRocksTable,
17401754
table_cols: &Vec<Column>,
@@ -1848,9 +1862,9 @@ impl RocksMetaStore {
18481862
}
18491863

18501864
// Must be run under write_operation(). Returns activated row count.
1851-
fn activate_chunks_impl(
1865+
fn activate_chunks_impl<S>(
18521866
db_ref: DbTableRef,
1853-
batch_pipe: &mut BatchPipe,
1867+
batch_pipe: &mut BatchPipe<'_, S>,
18541868
uploaded_chunk_ids: &[(u64, Option<u64>)],
18551869
replay_handle_id: Option<u64>,
18561870
) -> Result<(u64, HashMap</*partition_id*/ u64, /*rows*/ u64>), CubeError> {
@@ -1907,7 +1921,9 @@ impl MetaStore for RocksMetaStore {
19071921
if_not_exists: bool,
19081922
) -> Result<IdRow<Schema>, CubeError> {
19091923
self.write_operation("create_schema", move |db_ref, batch_pipe| {
1910-
batch_pipe.invalidate_tables_cache();
1924+
batch_pipe.set_post_commit_callback(|metastore| {
1925+
*metastore.cached_tables.lock().unwrap() = None;
1926+
});
19111927
let table = SchemaRocksTable::new(db_ref.clone());
19121928
if if_not_exists {
19131929
let rows = table.get_rows_by_index(&schema_name, &SchemaRocksIndex::Name)?;
@@ -1968,7 +1984,9 @@ impl MetaStore for RocksMetaStore {
19681984
new_schema_name: String,
19691985
) -> Result<IdRow<Schema>, CubeError> {
19701986
self.write_operation("rename_schema", move |db_ref, batch_pipe| {
1971-
batch_pipe.invalidate_tables_cache();
1987+
batch_pipe.set_post_commit_callback(|metastore| {
1988+
*metastore.cached_tables.lock().unwrap() = None;
1989+
});
19721990
let table = SchemaRocksTable::new(db_ref.clone());
19731991
let existing_keys =
19741992
table.get_row_ids_by_index(&old_schema_name, &SchemaRocksIndex::Name)?;
@@ -1992,7 +2010,9 @@ impl MetaStore for RocksMetaStore {
19922010
new_schema_name: String,
19932011
) -> Result<IdRow<Schema>, CubeError> {
19942012
self.write_operation("rename_schema_by_id", move |db_ref, batch_pipe| {
1995-
batch_pipe.invalidate_tables_cache();
2013+
batch_pipe.set_post_commit_callback(|metastore| {
2014+
*metastore.cached_tables.lock().unwrap() = None;
2015+
});
19962016
let table = SchemaRocksTable::new(db_ref.clone());
19972017

19982018
let old_schema = table.get_row(schema_id)?.unwrap();
@@ -2008,7 +2028,9 @@ impl MetaStore for RocksMetaStore {
20082028
#[tracing::instrument(level = "trace", skip(self))]
20092029
async fn delete_schema(&self, schema_name: String) -> Result<(), CubeError> {
20102030
self.write_operation("delete_schema", move |db_ref, batch_pipe| {
2011-
batch_pipe.invalidate_tables_cache();
2031+
batch_pipe.set_post_commit_callback(|metastore| {
2032+
*metastore.cached_tables.lock().unwrap() = None;
2033+
});
20122034
let table = SchemaRocksTable::new(db_ref.clone());
20132035
let existing_keys =
20142036
table.get_row_ids_by_index(&schema_name, &SchemaRocksIndex::Name)?;
@@ -2035,7 +2057,9 @@ impl MetaStore for RocksMetaStore {
20352057
#[tracing::instrument(level = "trace", skip(self))]
20362058
async fn delete_schema_by_id(&self, schema_id: u64) -> Result<(), CubeError> {
20372059
self.write_operation("delete_schema_by_id", move |db_ref, batch_pipe| {
2038-
batch_pipe.invalidate_tables_cache();
2060+
batch_pipe.set_post_commit_callback(|metastore| {
2061+
*metastore.cached_tables.lock().unwrap() = None;
2062+
});
20392063
let tables = TableRocksTable::new(db_ref.clone()).all_rows()?;
20402064
if tables
20412065
.into_iter()
@@ -2100,7 +2124,9 @@ impl MetaStore for RocksMetaStore {
21002124
extension: Option<String>,
21012125
) -> Result<IdRow<Table>, CubeError> {
21022126
self.write_operation("create_table", move |db_ref, batch_pipe| {
2103-
batch_pipe.invalidate_tables_cache();
2127+
batch_pipe.set_post_commit_callback(|metastore| {
2128+
*metastore.cached_tables.lock().unwrap() = None;
2129+
});
21042130
if drop_if_exists {
21052131
if let Ok(exists_table) = get_table_impl(db_ref.clone(), schema_name.clone(), table_name.clone()) {
21062132
RocksMetaStore::drop_table_impl(exists_table.get_id(), db_ref.clone(), batch_pipe)?;
@@ -2295,7 +2321,9 @@ impl MetaStore for RocksMetaStore {
22952321
#[tracing::instrument(level = "trace", skip(self))]
22962322
async fn table_ready(&self, id: u64, is_ready: bool) -> Result<IdRow<Table>, CubeError> {
22972323
self.write_operation("table_ready", move |db_ref, batch_pipe| {
2298-
batch_pipe.invalidate_tables_cache();
2324+
batch_pipe.set_post_commit_callback(|metastore| {
2325+
*metastore.cached_tables.lock().unwrap() = None;
2326+
});
22992327
let rocks_table = TableRocksTable::new(db_ref.clone());
23002328
Ok(rocks_table.update_with_fn(id, |r| r.update_is_ready(is_ready), batch_pipe)?)
23012329
})
@@ -2305,7 +2333,9 @@ impl MetaStore for RocksMetaStore {
23052333
#[tracing::instrument(level = "trace", skip(self))]
23062334
async fn seal_table(&self, id: u64) -> Result<IdRow<Table>, CubeError> {
23072335
self.write_operation("seal_table", move |db_ref, batch_pipe| {
2308-
batch_pipe.invalidate_tables_cache();
2336+
batch_pipe.set_post_commit_callback(|metastore| {
2337+
*metastore.cached_tables.lock().unwrap() = None;
2338+
});
23092339
let rocks_table = TableRocksTable::new(db_ref.clone());
23102340
Ok(rocks_table.update_with_fn(id, |r| r.update_sealed(true), batch_pipe)?)
23112341
})
@@ -2334,13 +2364,16 @@ impl MetaStore for RocksMetaStore {
23342364
self.write_operation(
23352365
"update_location_download_size",
23362366
move |db_ref, batch_pipe| {
2337-
batch_pipe.invalidate_tables_cache();
2367+
batch_pipe.set_post_commit_callback(|metastore| {
2368+
*metastore.cached_tables.lock().unwrap() = None;
2369+
});
2370+
23382371
let rocks_table = TableRocksTable::new(db_ref.clone());
2339-
Ok(rocks_table.update_with_res_fn(
2372+
rocks_table.update_with_res_fn(
23402373
id,
23412374
|r| r.update_location_download_size(&location, download_size),
23422375
batch_pipe,
2343-
)?)
2376+
)
23442377
},
23452378
)
23462379
.await
@@ -2393,14 +2426,14 @@ impl MetaStore for RocksMetaStore {
23932426
})
23942427
.await
23952428
} else {
2396-
let cache = self.store.cached_tables.clone();
2429+
let cache = self.cached_tables.clone();
23972430

23982431
if let Some(t) = cube_ext::spawn_blocking(move || cache.lock().unwrap().clone()).await?
23992432
{
24002433
return Ok(t);
24012434
}
24022435

2403-
let cache = self.store.cached_tables.clone();
2436+
let cache = self.cached_tables.clone();
24042437
// Can't do read_operation_out_of_queue as we need to update cache on the same thread where it's dropped
24052438
self.read_operation("get_tables_with_path", move |db_ref| {
24062439
let cached_tables = { cache.lock().unwrap().clone() };
@@ -2465,7 +2498,9 @@ impl MetaStore for RocksMetaStore {
24652498
#[tracing::instrument(level = "trace", skip(self))]
24662499
async fn drop_table(&self, table_id: u64) -> Result<IdRow<Table>, CubeError> {
24672500
self.write_operation("drop_table", move |db_ref, batch_pipe| {
2468-
batch_pipe.invalidate_tables_cache();
2501+
batch_pipe.set_post_commit_callback(|metastore| {
2502+
*metastore.cached_tables.lock().unwrap() = None;
2503+
});
24692504
RocksMetaStore::drop_table_impl(table_id, db_ref, batch_pipe)
24702505
})
24712506
.await
@@ -4817,9 +4852,9 @@ fn get_default_index_impl(db_ref: DbTableRef, table_id: u64) -> Result<IdRow<Ind
48174852
/// Note that [current_active] and [new_active] are snapshots at some older point in time. The
48184853
/// relevant partitions might be dropped or changed by the time this function runs. Implementation
48194854
/// must take great care to avoid inconsistencies caused by this.
4820-
fn swap_active_partitions_impl(
4855+
fn swap_active_partitions_impl<S>(
48214856
db_ref: DbTableRef,
4822-
batch_pipe: &mut BatchPipe,
4857+
batch_pipe: &mut BatchPipe<'_, S>,
48234858
current_active: &[(IdRow<Partition>, Vec<IdRow<Chunk>>)],
48244859
new_active: &[(IdRow<Partition>, u64)],
48254860
mut update_new_partition_stats: impl FnMut(/*index*/ usize, &Partition) -> Partition,
@@ -7132,11 +7167,11 @@ mod tests {
71327167
}
71337168

71347169
impl RocksMetaStore {
7135-
fn swap_chunks_impl(
7170+
fn swap_chunks_impl<S>(
71367171
deactivate_ids: Vec<u64>,
71377172
uploaded_ids_and_sizes: Vec<(u64, Option<u64>)>,
71387173
db_ref: DbTableRef,
7139-
batch_pipe: &mut BatchPipe,
7174+
batch_pipe: &mut BatchPipe<'_, S>,
71407175
check_rows: bool,
71417176
new_replay_handle_id: Option<u64>,
71427177
) -> Result<(), CubeError> {
@@ -7234,9 +7269,9 @@ impl RocksMetaStore {
72347269
}
72357270

72367271
impl RocksMetaStore {
7237-
fn drop_index(
7272+
fn drop_index<S>(
72387273
db: DbTableRef,
7239-
pipe: &mut BatchPipe,
7274+
pipe: &mut BatchPipe<'_, S>,
72407275
index_id: u64,
72417276
update_multi_partitions: bool,
72427277
) -> Result<(), CubeError> {

0 commit comments

Comments
 (0)