Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 184 additions & 0 deletions rust/lance/src/dataset/mem_wal/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,4 +606,188 @@ mod tests {
let result = manifest_store.write(&manifest2).await;
assert!(result.is_err());
}

// ========================================================================
// Fencing tests
// ========================================================================

#[tokio::test]
async fn test_claim_epoch_new_shard() {
let (store, base_path, _temp_dir) = create_local_store().await;
let shard_id = Uuid::new_v4();
let manifest_store = ShardManifestStore::new(store, &base_path, shard_id, 2);

let (epoch, manifest) = manifest_store.claim_epoch(0).await.unwrap();
assert_eq!(epoch, 1);
assert_eq!(manifest.version, 1);
assert_eq!(manifest.writer_epoch, 1);
}

#[tokio::test]
async fn test_claim_epoch_sequential_fences_previous() {
let (store, base_path, _temp_dir) = create_local_store().await;
let shard_id = Uuid::new_v4();
let manifest_store = ShardManifestStore::new(store, &base_path, shard_id, 2);

// Writer A claims
let (epoch_a, _) = manifest_store.claim_epoch(0).await.unwrap();
assert_eq!(epoch_a, 1);

// Writer B claims the same shard — should get epoch 2
let (epoch_b, manifest_b) = manifest_store.claim_epoch(0).await.unwrap();
assert_eq!(epoch_b, 2);
assert_eq!(manifest_b.version, 2);
assert_eq!(manifest_b.writer_epoch, 2);

// Writer A's epoch (1) is now fenced by epoch 2
let result = manifest_store.check_fenced(epoch_a).await;
assert!(result.is_err(), "Writer A should be fenced");
assert!(
result.unwrap_err().to_string().contains("Writer fenced"),
"Error should mention fencing"
);

// Writer B's epoch (2) is still valid
manifest_store.check_fenced(epoch_b).await.unwrap();
}

#[tokio::test]
async fn test_claim_epoch_concurrent_same_version_one_wins() {
let (store, base_path, _temp_dir) = create_local_store().await;
let shard_id = Uuid::new_v4();

// Two manifest stores pointing at the same shard (simulates two processes)
let store_a = ShardManifestStore::new(store.clone(), &base_path, shard_id, 2);
let store_b = ShardManifestStore::new(store, &base_path, shard_id, 2);

// Writer A claims first
let (epoch_a, _) = store_a.claim_epoch(0).await.unwrap();
assert_eq!(epoch_a, 1);

// Writer B also sees empty shard and tries version 1 — but A already wrote it.
// However, claim_epoch reads the latest first, so B will see version 1 and
// try version 2. This tests the sequential (non-concurrent) path.
let (epoch_b, _) = store_b.claim_epoch(0).await.unwrap();
assert_eq!(epoch_b, 2);
}

#[tokio::test]
async fn test_check_fenced_no_manifest_passes() {
let (store, base_path, _temp_dir) = create_local_store().await;
let shard_id = Uuid::new_v4();
let manifest_store = ShardManifestStore::new(store, &base_path, shard_id, 2);

// No manifest exists — check_fenced should pass (no one to fence against)
manifest_store.check_fenced(1).await.unwrap();
}

#[tokio::test]
async fn test_check_fenced_equal_epoch_passes() {
let (store, base_path, _temp_dir) = create_local_store().await;
let shard_id = Uuid::new_v4();
let manifest_store = ShardManifestStore::new(store, &base_path, shard_id, 2);

let manifest = create_test_manifest(shard_id, 1, 5);
manifest_store.write(&manifest).await.unwrap();

// Same epoch should pass
manifest_store.check_fenced(5).await.unwrap();
}

#[tokio::test]
async fn test_check_fenced_higher_local_epoch_passes() {
let (store, base_path, _temp_dir) = create_local_store().await;
let shard_id = Uuid::new_v4();
let manifest_store = ShardManifestStore::new(store, &base_path, shard_id, 2);

let manifest = create_test_manifest(shard_id, 1, 3);
manifest_store.write(&manifest).await.unwrap();

// Local epoch higher than stored — should pass
manifest_store.check_fenced(5).await.unwrap();
}

#[tokio::test]
async fn test_check_fenced_lower_local_epoch_fails() {
let (store, base_path, _temp_dir) = create_local_store().await;
let shard_id = Uuid::new_v4();
let manifest_store = ShardManifestStore::new(store, &base_path, shard_id, 2);

let manifest = create_test_manifest(shard_id, 1, 5);
manifest_store.write(&manifest).await.unwrap();

// Local epoch lower than stored — should fail
let result = manifest_store.check_fenced(3).await;
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("Writer fenced"));
assert!(err_msg.contains("local epoch 3"));
assert!(err_msg.contains("stored epoch 5"));
}

#[tokio::test]
async fn test_commit_update_rejects_fenced_epoch() {
let (store, base_path, _temp_dir) = create_local_store().await;
let shard_id = Uuid::new_v4();
let manifest_store = ShardManifestStore::new(store, &base_path, shard_id, 2);

// Write initial manifest with epoch 5
let manifest = create_test_manifest(shard_id, 1, 5);
manifest_store.write(&manifest).await.unwrap();

// Try to commit_update with a fenced epoch (3 < 5)
let result = manifest_store
.commit_update(3, |m| ShardManifest {
version: m.version + 1,
writer_epoch: 3,
..m.clone()
})
.await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Writer fenced"));
}

#[tokio::test]
async fn test_commit_update_succeeds_with_current_epoch() {
let (store, base_path, _temp_dir) = create_local_store().await;
let shard_id = Uuid::new_v4();
let manifest_store = ShardManifestStore::new(store, &base_path, shard_id, 2);

// Claim epoch
let (epoch, _) = manifest_store.claim_epoch(0).await.unwrap();

// commit_update with the same epoch should succeed
let updated = manifest_store
.commit_update(epoch, |m| ShardManifest {
version: m.version + 1,
writer_epoch: epoch,
current_generation: m.current_generation + 1,
..m.clone()
})
.await
.unwrap();
assert_eq!(updated.version, 2);
assert_eq!(updated.current_generation, 2);
}

#[tokio::test]
async fn test_three_writers_only_last_survives() {
let (store, base_path, _temp_dir) = create_local_store().await;
let shard_id = Uuid::new_v4();
let manifest_store = ShardManifestStore::new(store, &base_path, shard_id, 2);

let (epoch_a, _) = manifest_store.claim_epoch(0).await.unwrap();
let (epoch_b, _) = manifest_store.claim_epoch(0).await.unwrap();
let (epoch_c, _) = manifest_store.claim_epoch(0).await.unwrap();

assert_eq!(epoch_a, 1);
assert_eq!(epoch_b, 2);
assert_eq!(epoch_c, 3);

// A and B are fenced
assert!(manifest_store.check_fenced(epoch_a).await.is_err());
assert!(manifest_store.check_fenced(epoch_b).await.is_err());
// C is still valid
manifest_store.check_fenced(epoch_c).await.unwrap();
}
}
Loading
Loading