Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 29 additions & 19 deletions crutest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2517,7 +2517,8 @@ async fn replace_before_active(
// generation numbers.
fill_workload(volume, di, true).await?;
let ds_total = targets.len() - 1;
let mut old_ds = 0;
let mut old_ds_a = 0;
let mut old_ds_b = 1;
let mut new_ds = targets.len() - 1;
for c in 1.. {
info!(log, "[{c}] Touch every extent");
Expand All @@ -2533,41 +2534,44 @@ async fn replace_before_active(
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
}

// Stop a downstairs, wait for dsc to confirm it is stopped.
dsc_client.dsc_stop(old_ds).await.unwrap();
loop {
let res = dsc_client.dsc_get_ds_state(old_ds).await.unwrap();
let state = res.into_inner();
if state == DownstairsState::Exit {
break;
// Stop two downstairs, wait for dsc to confirm they are stopped.
for old_ds in [old_ds_a, old_ds_b] {
dsc_client.dsc_stop(old_ds).await.unwrap();
loop {
let res = dsc_client.dsc_get_ds_state(old_ds).await.unwrap();
let state = res.into_inner();
if state == DownstairsState::Exit {
break;
}
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
}
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
}

info!(log, "[{c}] Request the upstairs activate");
// Spawn a task to re-activate, this will not finish till all three
// downstairs respond.
// Spawn a task to re-activate, this will not finish until 2-3
// downstairs respond (and we have disabled all but 1)
gen += 1;
let gc = volume.clone();
let handle =
tokio::spawn(async move { gc.activate_with_gen(gen).await });

// Give the activation request time to percolate in the upstairs.
// Give the activation request time to percolate in the upstairs; it
// shouldn't get anywhere because we don't have enough downstairs
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
let is_active = volume.query_is_active().await.unwrap();
info!(log, "[{c}] activate should now be waiting {:?}", is_active);
assert!(!is_active);

info!(
log,
"[{c}] Replacing DS {old_ds}:{} with {new_ds}:{}",
targets[old_ds as usize],
"[{c}] Replacing DS {old_ds_a}:{} with {new_ds}:{}",
targets[old_ds_a as usize],
targets[new_ds],
);
match volume
.replace_downstairs(
Uuid::new_v4(),
targets[old_ds as usize],
targets[old_ds_a as usize],
targets[new_ds],
)
.await
Expand All @@ -2578,6 +2582,9 @@ async fn replace_before_active(
}
}

// At this point, we've got two Downstairs (one of which was provided
// initially, and one of which has just been replaced), so activation
// should happen!
info!(log, "[{c}] Wait for activation after replacement");
loop {
let is_active = volume.query_is_active().await.unwrap();
Expand All @@ -2602,8 +2609,10 @@ async fn replace_before_active(
}

// Start up the old downstairs so it is ready for the next loop.
let res = dsc_client.dsc_start(old_ds).await;
info!(log, "[{c}] Replay: started {old_ds}, returned:{:?}", res);
for old_ds in [old_ds_a, old_ds_b] {
Comment thread
mkeeter marked this conversation as resolved.
Outdated
let res = dsc_client.dsc_start(old_ds).await;
info!(log, "[{c}] Replay: started {old_ds}, returned:{:?}", res);
}

// Wait for all IO to finish before we continue
loop {
Expand All @@ -2622,12 +2631,13 @@ async fn replace_before_active(
tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
}

old_ds = (old_ds + 1) % (ds_total as u32 + 1);
old_ds_a = (old_ds_a + 1) % (ds_total as u32 + 1);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... Will this test pass if you do 2 region sets?
I think this logic can give you one downstairs in one region set and a second in a different region set, which is not the behavior we want here. Also, if we don't have multiple-region-set testing turned on by default, I'm going to go enable it everywhere.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question! I notice a CI failure in test-up-2region-encrypted, so I suspect that you're correct 😄

Copy link
Copy Markdown
Contributor Author

@mkeeter mkeeter Jun 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fixed in d2d168e bdcfc08.

It's somewhat subtle: the unused downstairs is always right before the downstairs we're about to replace, but the second downstairs in that set could either be before or after (and shifts over time). That commit tracks which downstairs is in which region set, so that we can always disable 2 in the same region set (and prevent activation, which is the whole point).

old_ds_b = (old_ds_b + 1) % (ds_total as u32 + 1);
new_ds = (new_ds + 1) % (ds_total + 1);

match wtq {
WhenToQuit::Count { count } => {
if c > count {
if c >= count {
break;
}
}
Expand Down
171 changes: 171 additions & 0 deletions integration_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,24 @@ mod test {
Ok(())
}

/// Stops the downstairs task, returning its `(port, rport)` tuple.
///
/// `port` is taken from the downstairs' `address()` and `rport` from its
/// `repair_address()`; both are read before the task is stopped.
///
/// # Panics
///
/// Panics if `self.downstairs` holds nothing to take (the `unwrap` below).
///
/// # Errors
///
/// Propagates any error returned while stopping the downstairs.
pub async fn stop(&mut self) -> Result<(u16, u16)> {
    let running = self.downstairs.take().unwrap();
    // Capture both ports up front: `stop()` consumes the handle.
    let ports = (running.address().port(), running.repair_address().port());
    running.stop().await?;
    Ok(ports)
}

/// Reboots by delegating to `reboot_read_write_with_ports`, passing `0` for
/// both `port` and `rport`.
// NOTE(review): `0` presumably means "no specific port requested" — confirm
// against how `DownstairsClientSettings` interprets these fields.
pub async fn reboot_read_write(&mut self) -> Result<()> {
self.reboot_read_write_with_ports(0, 0).await
}

pub async fn reboot_read_write_with_ports(
&mut self,
port: u16,
rport: u16,
) -> Result<()> {
let downstairs =
Downstairs::new_builder(self.tempdir.path(), false)
.set_logger(csl())
Expand All @@ -134,6 +151,8 @@ mod test {
downstairs,
DownstairsClientSettings {
address: self.address,
port,
rport,
..DownstairsClientSettings::default()
},
)
Expand Down Expand Up @@ -5839,4 +5858,156 @@ mod test {
// Make sure everything worked
volume.activate().await.unwrap();
}

/// Stops downstairs 1, activates a guest against the remaining set, writes
/// two blocks, and then deactivates.
#[tokio::test]
async fn connect_two_ds_then_deactivate() {
    const BLOCK_SIZE: usize = 512;

    // Spin off three downstairs and capture the upstairs options, then take
    // downstairs 1 offline before the guest is created.
    let mut tds = TestDownstairsSet::small(false).await.unwrap();
    let upstairs_opts = tds.opts();
    tds.downstairs1.stop().await.unwrap();
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

    let (guest, guest_io) = Guest::new(None);
    let _join_handle =
        up_main(upstairs_opts, 1, None, guest_io, None).unwrap();
    guest.activate().await.unwrap();

    // Write two blocks of 0x55 starting at block 0.
    let payload = BytesMut::from(vec![0x55; BLOCK_SIZE * 2].as_slice());
    let write_result = guest.write(BlockIndex(0), payload).await;
    assert!(write_result.is_ok());

    guest.deactivate().await.unwrap();
}

/// Activates with downstairs 1 stopped, writes two blocks, restarts
/// downstairs 1 on its original ports, and finally checks through a
/// read-only activation targeting downstairs 1 that the data reads back.
#[tokio::test]
async fn connect_two_ds_then_another() {
    const BLOCK_SIZE: usize = 512;

    // Spin off three downstairs and capture the upstairs options, then stop
    // downstairs 1 while remembering the ports it was using.
    let mut tds = TestDownstairsSet::small(false).await.unwrap();
    let rw_opts = tds.opts();
    let (ds1_port, ds1_rport) = tds.downstairs1.stop().await.unwrap();
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

    let (rw_guest, rw_io) = Guest::new(None);
    let _rw_join_handle = up_main(rw_opts, 1, None, rw_io, None).unwrap();
    rw_guest.activate().await.unwrap();

    // Write two blocks of 0x55 starting at block 0.
    let payload = BytesMut::from(vec![0x55; BLOCK_SIZE * 2].as_slice());
    let write_result = rw_guest.write(BlockIndex(0), payload).await;
    assert!(write_result.is_ok());

    // Bring downstairs 1 back on the same ports; it should use live-repair
    // to join the quorum.
    //
    // The sleep is long because there is a 10-second reconnect delay.
    tds.downstairs1
        .reboot_read_write_with_ports(ds1_port, ds1_rport)
        .await
        .unwrap();
    tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;
    rw_guest.deactivate().await.unwrap();

    // Come back up read-only with just downstairs 1 running, so a single
    // downstairs is enough to connect and the read below has to be served
    // by downstairs 1.
    tds.downstairs1.reboot_read_only().await.unwrap();
    tds.downstairs2.stop().await.unwrap();
    tds.downstairs3.stop().await.unwrap();
    tds.crucible_opts.read_only = true;
    tds.crucible_opts.target[0] = tds.downstairs1.address();
    let ro_opts = tds.opts();
    let (ro_guest, ro_io) = Guest::new(None);
    let _ro_join_handle = up_main(ro_opts, 1, None, ro_io, None).unwrap();
    ro_guest.activate().await.unwrap();

    // Confirm the earlier write is visible.
    let mut readback = Buffer::new(2, BLOCK_SIZE);
    ro_guest.read(BlockIndex(0), &mut readback).await.unwrap();

    assert_eq!(readback.to_vec(), vec![0x55; BLOCK_SIZE * 2]);
}

/// Writes while downstairs 1 is stopped, deactivates, then swaps which
/// downstairs is missing (restart 1, stop 2) and checks that a fresh
/// activation can still read the written data.
#[tokio::test]
async fn min_quorum_live_repair() {
    const BLOCK_SIZE: usize = 512;

    // Spin off three downstairs, build our Crucible struct.
    let mut tds = TestDownstairsSet::small(false).await.unwrap();

    // Downstairs 1 goes away before the guest exists, so it takes no part
    // in the first activation; keep its ports for the later restart.
    let (ds1_port, ds1_rport) = tds.downstairs1.stop().await.unwrap();

    // Bring up the guest and write two blocks of 0x55; only downstairs 2
    // and 3 are running to receive them.
    let (guest, guest_io) = Guest::new(None);
    let upstairs_opts = tds.opts();
    let _join_handle =
        up_main(upstairs_opts, 1, None, guest_io, None).unwrap();
    guest.activate().await.unwrap();
    let payload = BytesMut::from(vec![0x55; BLOCK_SIZE * 2].as_slice());
    let write_result = guest.write(BlockIndex(0), payload).await;
    assert!(write_result.is_ok());

    // Deactivate the guest, all without downstairs 1 participating.
    guest.deactivate().await.unwrap();

    // The data now lives on downstairs 2 and 3. Restart downstairs 1 on its
    // old ports and stop downstairs 2, so the next activation runs with a
    // downstairs that missed the write and min-quorum has to reconcile.
    tds.downstairs1
        .reboot_read_write_with_ports(ds1_port, ds1_rport)
        .await
        .unwrap();
    tds.downstairs2.stop().await.unwrap();
    guest.activate_with_gen(2).await.unwrap();

    // The earlier write must still be readable.
    let mut readback = Buffer::new(2, BLOCK_SIZE);
    guest.read(BlockIndex(0), &mut readback).await.unwrap();

    assert_eq!(readback.to_vec(), vec![0x55; BLOCK_SIZE * 2]);
}

/// Starts an activation with downstairs 1 stopped, stops downstairs 2 while
/// that activation is still pending, then restarts downstairs 1 and expects
/// the pending activation to complete successfully.
#[tokio::test]
async fn min_quorum_cancel() {
    // Spin off three downstairs, build our Crucible struct.
    let mut tds = TestDownstairsSet::small(false).await.unwrap();

    // Downstairs 1 goes away before the guest exists, so it takes no part
    // in the initial negotiation; keep its ports for the later restart.
    let (ds1_port, ds1_rport) = tds.downstairs1.stop().await.unwrap();

    // Bring up the guest and run the activation in a background task so
    // this test can keep manipulating the downstairs while it is pending.
    let (guest, guest_io) = Guest::new(None);
    let upstairs_opts = tds.opts();
    let _join_handle =
        up_main(upstairs_opts, 1, None, guest_io, None).unwrap();
    let activation = tokio::spawn(async move { guest.activate().await });

    // Get into our min-quorum wait, which is 500 ms
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;

    // Take downstairs 2 away as well.
    tds.downstairs2.stop().await.unwrap();

    // Wait for the min-quorum timer to go off; it shouldn't panic!
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;

    // Restart downstairs 1; we're now eligible for min-quorum negotiation
    // again.
    tds.downstairs1
        .reboot_read_write_with_ports(ds1_port, ds1_rport)
        .await
        .unwrap();

    // First unwrap: the spawned task did not panic. Second: activation
    // itself succeeded.
    let activation_result = activation.await.unwrap();
    activation_result.unwrap()
}
}
Loading