Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions rust/cubestore/cubestore-sql-tests/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ pub fn sql_tests(prefix: &str) -> Vec<(&'static str, TestFn)> {
"queue_result_by_id_external_id_mismatch",
queue_result_by_id_external_id_mismatch,
),
t(
"queue_result_ack_multiple_with_external_id",
queue_result_ack_multiple_with_external_id,
),
t("limit_pushdown_group", limit_pushdown_group),
t("limit_pushdown_group_order", limit_pushdown_group_order),
t(
Expand Down Expand Up @@ -363,6 +367,7 @@ lazy_static::lazy_static! {
"queue_custom_orphaned",
"queue_result_by_external_id",
"queue_result_by_id_external_id_mismatch",
"queue_result_ack_multiple_with_external_id",
"queue_full_workflow_v1",
"queue_full_workflow_v2",
"queue_full_workflow_v2_with_external_id",
Expand Down Expand Up @@ -11314,6 +11319,109 @@ async fn queue_result_by_id_external_id_mismatch(
Ok(())
}

async fn queue_result_ack_multiple_with_external_id(
    service: Box<dyn SqlClient>,
) -> Result<(), CubeError> {
    // Table-driven variant: (queue suffix, payload, ack result, external_id).
    // Two items carry an EXTERNAL_ID, two do not; all four are ACKed and
    // QUEUE RESULT must report the matching result/id/external_id for each.
    let specs: [(&str, &str, &str, Option<&str>); 4] = [
        ("multi1", "payload1", "result:multi1", Some("ext-multi-1")),
        ("multi2", "payload2", "result:multi2", Some("ext-multi-2")),
        ("multi3", "payload3", "result:multi3", None),
        ("multi4", "payload4", "result:multi4", None),
    ];

    // Enqueue all four items, remembering the row id each QUEUE ADD returns.
    let mut ids = Vec::with_capacity(specs.len());
    for (suffix, payload, _, external_id) in &specs {
        let query = match external_id {
            Some(ext) => format!(
                r#"QUEUE ADD PRIORITY 1 EXTERNAL_ID '{}' "STANDALONE#queue:{}" "{}";"#,
                ext, suffix, payload
            ),
            None => format!(
                r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:{}" "{}";"#,
                suffix, payload
            ),
        };
        let add = service.exec_query(&query).await?;
        ids.push(assert_queue_add_and_get_id(&add)?);
    }

    // ACK every item; each ACK must report success.
    for (suffix, _, result, _) in &specs {
        let ack = service
            .exec_query(&format!(
                r#"QUEUE ACK "STANDALONE#queue:{}" "{}""#,
                suffix, result
            ))
            .await?;
        assert_eq!(
            ack.get_rows(),
            &vec![Row::new(vec![TableValue::Boolean(true)])]
        );
    }

    // Every ACKed item must expose its stored result together with its id
    // and — when one was supplied — its external_id.
    for ((suffix, _, result, external_id), id) in specs.iter().zip(ids.iter()) {
        let res = service
            .exec_query(&format!(r#"QUEUE RESULT "STANDALONE#queue:{}""#, suffix))
            .await?;
        assert_queue_result_columns(&res);
        assert_eq!(
            res.get_rows(),
            &vec![queue_result_row(result, id, *external_id)]
        );
    }

    Ok(())
}

async fn sys_cachestore_info(service: Box<dyn SqlClient>) -> Result<(), CubeError> {
service.migration_run_next_query();
service.exec_query("SYS CACHESTORE INFO").await?;
Expand Down
103 changes: 103 additions & 0 deletions rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2495,4 +2495,107 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_queue_add_none_external_id_after_rebuild() -> Result<(), CubeError> {
    init_test_logger().await;

    let (_, cachestore) = RocksCacheStore::prepare_test_cachestore(
        "test_queue_add_none_ext_rebuild",
        Config::test("test_queue_add_none_ext_rebuild"),
    );

    // Helper: build a QueueAddPayload with the defaults used throughout this
    // test, varying only path/value/external_id.
    let payload = |path: &str, value: &str, external_id: Option<&str>| QueueAddPayload {
        path: path.to_string(),
        value: value.to_string(),
        priority: 0,
        orphaned: None,
        process_id: None,
        exclusive: false,
        external_id: external_id.map(str::to_string),
    };

    // Add two queue items without external_id.
    cachestore
        .queue_add(payload("prefix:path1", "v1", None))
        .await?;
    cachestore
        .queue_add(payload("prefix:path2", "v2", None))
        .await?;

    // Add one item with a real external_id.
    cachestore
        .queue_add(payload("prefix:path_ext", "v_ext", Some("ext-real")))
        .await?;

    // Simulate migration: rebuild the ByExternalId index by re-running the
    // table migration. (Previously this closure also fetched
    // `QueueItemRocksTable::indexes()` into an unused binding; dropped to
    // avoid an unused-variable warning.)
    cachestore
        .read_operation_queue("test_rebuild_index", move |db_ref| {
            let queue_schema = QueueItemRocksTable::new(db_ref.clone());
            queue_schema.migrate()?;

            Ok(())
        })
        .await?;

    // After rebuild, adding another item without external_id should still
    // succeed — a None external_id must not collide in the unique index.
    let res = cachestore
        .queue_add(payload("prefix:path3", "v3", None))
        .await;
    assert!(
        res.is_ok(),
        "Insert with None external_id after index rebuild should succeed, got: {:?}",
        res.err()
    );

    // Uniqueness for real external_id values should still be enforced after rebuild.
    let res = cachestore
        .queue_add(payload("prefix:path_ext_dup", "v_ext_dup", Some("ext-real")))
        .await;
    assert!(
        res.is_err(),
        "Duplicate external_id should still fail after rebuild"
    );
    assert!(res
        .unwrap_err()
        .to_string()
        .contains("Unique constraint violation"));

    RocksCacheStore::cleanup_test_cachestore("test_queue_add_none_ext_rebuild");

    Ok(())
}
}
2 changes: 1 addition & 1 deletion rust/cubestore/cubestore/src/cachestore/queue_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub struct QueueItem {

impl RocksEntity for QueueItem {
fn version() -> u32 {
4
5
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct QueueItemPayload {

impl RocksEntity for QueueItemPayload {
fn version() -> u32 {
2
3
}
}

Expand Down
11 changes: 8 additions & 3 deletions rust/cubestore/cubestore/src/metastore/rocks_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
let serialized_row = ser.take_buffer();

for index in Self::indexes().iter() {
if index.is_unique() {
if index.is_unique() && index.should_index_row(&row) {
let hash = index.key_hash(&row);
let index_val = index.index_key_by(&row);
let existing_keys =
Expand Down Expand Up @@ -733,9 +733,12 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
log_shown = true;
}
let row = row?;
let index_row = self.index_key_val(row.get_row(), row.get_id(), index);
batch.put(index_row.key, index_row.val);
if index.should_index_row(row.get_row()) {
let index_row = self.index_key_val(row.get_row(), row.get_id(), index);
batch.put(index_row.key, index_row.val);
}
}

batch.put(
&RowKey::SecondaryIndexInfo {
index_id: Self::index_id(index.get_id()),
Expand All @@ -748,6 +751,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
.as_slice(),
);
self.db().write(batch)?;

if log_shown {
log::info!(
"Rebuilding metastore index {:?} for table {:?} complete ({:?})",
Expand All @@ -756,6 +760,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
time.elapsed()?
);
}

Ok(())
}

Expand Down
Loading