diff --git a/rust/cubestore/cubestore-sql-tests/src/tests.rs b/rust/cubestore/cubestore-sql-tests/src/tests.rs index 08d0576f6823f..e5c3389bf83f2 100644 --- a/rust/cubestore/cubestore-sql-tests/src/tests.rs +++ b/rust/cubestore/cubestore-sql-tests/src/tests.rs @@ -298,6 +298,10 @@ pub fn sql_tests(prefix: &str) -> Vec<(&'static str, TestFn)> { "queue_result_by_id_external_id_mismatch", queue_result_by_id_external_id_mismatch, ), + t( + "queue_result_ack_multiple_with_external_id", + queue_result_ack_multiple_with_external_id, + ), t("limit_pushdown_group", limit_pushdown_group), t("limit_pushdown_group_order", limit_pushdown_group_order), t( @@ -363,6 +367,7 @@ lazy_static::lazy_static! { "queue_custom_orphaned", "queue_result_by_external_id", "queue_result_by_id_external_id_mismatch", + "queue_result_ack_multiple_with_external_id", "queue_full_workflow_v1", "queue_full_workflow_v2", "queue_full_workflow_v2_with_external_id", @@ -11314,6 +11319,109 @@ async fn queue_result_by_id_external_id_mismatch( Ok(()) } +async fn queue_result_ack_multiple_with_external_id( + service: Box, +) -> Result<(), CubeError> { + // 2 items with external_id + let add1 = service + .exec_query( + r#"QUEUE ADD PRIORITY 1 EXTERNAL_ID 'ext-multi-1' "STANDALONE#queue:multi1" "payload1";"#, + ) + .await?; + let id1 = assert_queue_add_and_get_id(&add1)?; + + let add2 = service + .exec_query( + r#"QUEUE ADD PRIORITY 1 EXTERNAL_ID 'ext-multi-2' "STANDALONE#queue:multi2" "payload2";"#, + ) + .await?; + let id2 = assert_queue_add_and_get_id(&add2)?; + + // 2 items without external_id + let add3 = service + .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:multi3" "payload3";"#) + .await?; + let id3 = assert_queue_add_and_get_id(&add3)?; + + let add4 = service + .exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:multi4" "payload4";"#) + .await?; + let id4 = assert_queue_add_and_get_id(&add4)?; + + // ACK all 4 items + let ack1 = service + .exec_query(r#"QUEUE ACK "STANDALONE#queue:multi1" "result:multi1""#) + .await?; + assert_eq!( + ack1.get_rows(), + &vec![Row::new(vec![TableValue::Boolean(true)])] + ); + + let ack2 = service + .exec_query(r#"QUEUE ACK "STANDALONE#queue:multi2" "result:multi2""#) + .await?; + assert_eq!( + ack2.get_rows(), + &vec![Row::new(vec![TableValue::Boolean(true)])] + ); + + let ack3 = service + .exec_query(r#"QUEUE ACK "STANDALONE#queue:multi3" "result:multi3""#) + .await?; + assert_eq!( + ack3.get_rows(), + &vec![Row::new(vec![TableValue::Boolean(true)])] + ); + + let ack4 = service + .exec_query(r#"QUEUE ACK "STANDALONE#queue:multi4" "result:multi4""#) + .await?; + assert_eq!( + ack4.get_rows(), + &vec![Row::new(vec![TableValue::Boolean(true)])] + ); + + // Verify results for items with external_id + let result1 = service + .exec_query(r#"QUEUE RESULT "STANDALONE#queue:multi1""#) + .await?; + assert_queue_result_columns(&result1); + assert_eq!( + result1.get_rows(), + &vec![queue_result_row("result:multi1", &id1, Some("ext-multi-1"))] + ); + + let result2 = service + .exec_query(r#"QUEUE RESULT "STANDALONE#queue:multi2""#) + .await?; + assert_queue_result_columns(&result2); + assert_eq!( + result2.get_rows(), + &vec![queue_result_row("result:multi2", &id2, Some("ext-multi-2"))] + ); + + // Verify results for items without external_id + let result3 = service + .exec_query(r#"QUEUE RESULT "STANDALONE#queue:multi3""#) + .await?; + assert_queue_result_columns(&result3); + assert_eq!( + result3.get_rows(), + &vec![queue_result_row("result:multi3", &id3, None)] + ); + + let result4 = service + .exec_query(r#"QUEUE RESULT "STANDALONE#queue:multi4""#) + .await?; + assert_queue_result_columns(&result4); + assert_eq!( + result4.get_rows(), + &vec![queue_result_row("result:multi4", &id4, None)] + ); + + Ok(()) +} + async fn sys_cachestore_info(service: Box) -> Result<(), CubeError> { service.migration_run_next_query(); service.exec_query("SYS CACHESTORE INFO").await?; diff --git a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs index 322cd3a57930b..b2f3b3d10fc49 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs @@ -2495,4 +2495,110 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_queue_add_none_external_id_after_rebuild() -> Result<(), CubeError> { + init_test_logger().await; + + let (_, cachestore) = RocksCacheStore::prepare_test_cachestore( + "test_queue_add_none_ext_rebuild", + Config::test("test_queue_add_none_ext_rebuild"), + ); + + // Add two queue items without external_id + cachestore + .queue_add(QueueAddPayload { + path: "prefix:path1".to_string(), + value: "v1".to_string(), + priority: 0, + orphaned: None, + process_id: None, + exclusive: false, + external_id: None, + }) + .await?; + + cachestore + .queue_add(QueueAddPayload { + path: "prefix:path2".to_string(), + value: "v2".to_string(), + priority: 0, + orphaned: None, + process_id: None, + exclusive: false, + external_id: None, + }) + .await?; + + // Add one item with a real external_id + cachestore + .queue_add(QueueAddPayload { + path: "prefix:path_ext".to_string(), + value: "v_ext".to_string(), + priority: 0, + orphaned: None, + process_id: None, + exclusive: false, + external_id: Some("ext-real".to_string()), + }) + .await?; + + // Simulate migration: rebuild the ByExternalId index. + cachestore + .read_operation_queue("test_rebuild_index", move |db_ref| { + let queue_schema = QueueItemRocksTable::new(db_ref.clone()); + let indexes = QueueItemRocksTable::indexes(); + + // Force rebuild index manually, instead of relying on the automatic rebuild (it will ignore). + for index in indexes.iter() { + queue_schema.rebuild_index(index)?; + } + + Ok(()) + }) + .await?; + + // After rebuild, adding another item without external_id should still succeed + let res = cachestore + .queue_add(QueueAddPayload { + path: "prefix:path3".to_string(), + value: "v3".to_string(), + priority: 0, + orphaned: None, + process_id: None, + exclusive: false, + external_id: None, + }) + .await; + assert!( + res.is_ok(), + "Insert with None external_id after index rebuild should succeed, got: {:?}", + res.err() + ); + + // Uniqueness for real external_id values should still be enforced after rebuild + let res = cachestore + .queue_add(QueueAddPayload { + path: "prefix:path_ext_dup".to_string(), + value: "v_ext_dup".to_string(), + priority: 0, + orphaned: None, + process_id: None, + exclusive: false, + external_id: Some("ext-real".to_string()), + }) + .await; + assert!( + res.is_err(), + "Duplicate external_id should still fail after rebuild" + ); + assert!(res + .unwrap_err() + .to_string() + .contains("Unique constraint violation")); + + RocksCacheStore::cleanup_test_cachestore("test_queue_add_none_ext_rebuild"); + + Ok(()) + } } diff --git a/rust/cubestore/cubestore/src/cachestore/queue_item.rs b/rust/cubestore/cubestore/src/cachestore/queue_item.rs index e9d919c1e800e..bc4b6400bd31d 100644 --- a/rust/cubestore/cubestore/src/cachestore/queue_item.rs +++ b/rust/cubestore/cubestore/src/cachestore/queue_item.rs @@ -101,7 +101,7 @@ pub struct QueueItem { impl RocksEntity for QueueItem { fn version() -> u32 { - 4 + 5 } } diff --git a/rust/cubestore/cubestore/src/cachestore/queue_item_payload.rs b/rust/cubestore/cubestore/src/cachestore/queue_item_payload.rs index a4c6acc627840..760da862e8781 100644 --- a/rust/cubestore/cubestore/src/cachestore/queue_item_payload.rs +++ b/rust/cubestore/cubestore/src/cachestore/queue_item_payload.rs @@ -19,7 +19,7 @@ pub struct QueueItemPayload { impl RocksEntity for QueueItemPayload { fn version() -> u32 { - 2 + 3 } } diff --git a/rust/cubestore/cubestore/src/cachestore/queue_result.rs b/rust/cubestore/cubestore/src/cachestore/queue_result.rs index c759b7e54730c..d2eaa34aa065d 100644 --- a/rust/cubestore/cubestore/src/cachestore/queue_result.rs +++ b/rust/cubestore/cubestore/src/cachestore/queue_result.rs @@ -22,7 +22,7 @@ pub struct QueueResult { impl RocksEntity for QueueResult { fn version() -> u32 { - 3 + 4 } } diff --git a/rust/cubestore/cubestore/src/metastore/rocks_table.rs b/rust/cubestore/cubestore/src/metastore/rocks_table.rs index 280df9394705f..8b8bacacc55c4 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_table.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_table.rs @@ -509,7 +509,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { let serialized_row = ser.take_buffer(); for index in Self::indexes().iter() { - if index.is_unique() { + if index.is_unique() && index.should_index_row(&row) { let hash = index.key_hash(&row); let index_val = index.index_key_by(&row); let existing_keys = @@ -733,9 +733,12 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { log_shown = true; } let row = row?; - let index_row = self.index_key_val(row.get_row(), row.get_id(), index); - batch.put(index_row.key, index_row.val); + if index.should_index_row(row.get_row()) { + let index_row = self.index_key_val(row.get_row(), row.get_id(), index); + batch.put(index_row.key, index_row.val); + } } + batch.put( &RowKey::SecondaryIndexInfo { index_id: Self::index_id(index.get_id()), @@ -748,6 +751,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { .as_slice(), ); self.db().write(batch)?; + if log_shown { log::info!( "Rebuilding metastore index {:?} for table {:?} complete ({:?})", @@ -756,6 +760,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { time.elapsed()? ); } + Ok(()) }