Skip to content

Commit 43b3fd5

Browse files
authored
refactor(cubestore): Replace invalidate_tables_cache flag with generi… (#10683)
Introduce a generic `set_post_commit_callback` on `BatchPipe<S>` that fires after a successful RocksDB commit on the RW-loop thread. This replaces the hard-coded `invalidate_tables_cache` boolean flag, which was a hack that also ran before the actual commit. - `BatchPipe` is now generic over a store type `S` (defaults to `()`), allowing callbacks to receive a typed store reference - `RocksMetaStore::write_operation` passes `&RocksMetaStore` to callbacks, giving them direct access to `cached_tables` and other metastore state - Moved `cached_tables` from `RocksStore` to `RocksMetaStore` where it belongs - Cache invalidation now only happens when the commit actually succeeds - `RocksTable` methods are generic over `S` so they work with any `BatchPipe<S>`
1 parent 1fab8f1 commit 43b3fd5

File tree

5 files changed

+486
-616
lines changed

5 files changed

+486
-616
lines changed

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -498,14 +498,14 @@ impl RocksCacheStore {
498498
f: F,
499499
) -> Result<R, CubeError>
500500
where
501-
F: for<'a> FnOnce(DbTableRef<'a>, &'a mut BatchPipe) -> Result<R, CubeError>
501+
F: for<'a> FnOnce(DbTableRef<'a>, &mut BatchPipe<'a>) -> Result<R, CubeError>
502502
+ Send
503503
+ Sync
504504
+ 'static,
505505
R: Send + Sync + 'static,
506506
{
507507
self.store
508-
.write_operation_impl::<F, R>(&self.rw_loop_queue_cf, op_name, f)
508+
.write_operation_impl::<F, R, ()>(&self.rw_loop_queue_cf, op_name, f, ())
509509
.await
510510
}
511511

rust/cubestore/cubestore/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,12 @@ impl From<ParserError> for CubeError {
214214
}
215215
}
216216

217+
impl From<regex::Error> for CubeError {
218+
fn from(v: regex::Error) -> Self {
219+
CubeError::from_error(v)
220+
}
221+
}
222+
217223
impl From<ParquetError> for CubeError {
218224
fn from(v: ParquetError) -> Self {
219225
CubeError::from_error(v.to_string())

0 commit comments

Comments
 (0)