Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion crates/consensus-db/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,16 +475,28 @@ impl Db {
blocks.insert(height, block_bytes)?;
}

// `insert_certificate` records its own write bytes and write time, so
// stop this function's block-write timer before calling it. Otherwise
// the certificate insert would be double-counted in `write_time`: once
// by the outer span here and once by `insert_certificate` internally.
let block_write_time = start.elapsed();

self.insert_certificate(
&tx,
decided_block.certificate,
CommitCertificateType::Minimal,
Some(proposer),
)?;

// The single commit flushes both the block and the certificate. Attribute
// its cost to the block write (counted once) rather than dropping it.
let commit_start = Instant::now();
tx.commit()?;

self.update_write_metrics(write_bytes, start.elapsed());
self.update_write_metrics(
write_bytes,
block_write_time.saturating_add(commit_start.elapsed()),
);

Ok(())
}
Expand Down
10 changes: 9 additions & 1 deletion crates/malachite-app/src/handlers/get_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,15 @@ pub async fn handle(
if round.as_i64() == 0 {
if let Some(monitor) = &mut state.proposal_monitor {
debug_assert_eq!(monitor.height, height, "proposal monitor height mismatch");
monitor.record_proposal(proposed_value.value.id());
let new_value = proposed_value.value.id();
if !monitor.record_proposal(new_value) {
warn!(
%height,
first_value = ?monitor.value_id,
%new_value,
"Equivocating proposal at round 0",
);
}
} else {
warn!(%height, %round, "No proposal monitor present");
}
Expand Down
10 changes: 9 additions & 1 deletion crates/malachite-app/src/handlers/received_proposal_part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,15 @@ fn record_proposal_in_monitor(state: &mut State, proposed_value: &ProposedValue<
return;
}

monitor.record_proposal(proposed_value.value.id());
let new_value = proposed_value.value.id();
if !monitor.record_proposal(new_value) {
warn!(
height = %proposed_value.height,
first_value = ?monitor.value_id,
%new_value,
"Equivocating proposal at round 0",
);
}
}

struct HandlerContext<'a, 'b> {
Expand Down
106 changes: 104 additions & 2 deletions crates/malachite-app/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ enum StreamInsertResult {
Incomplete(Option<Height>),
ExceededMaxMessages,
ExceededMaxChunkSize(usize),
/// A `Fin` declared a sequence outside `[0, MAX_MESSAGES_PER_STREAM - 1]`,
/// so the stream could never complete within the per-stream message cap.
FinSequenceOutOfRange(Sequence),
Complete(Vec<ProposalPart>),
}

Expand Down Expand Up @@ -275,13 +278,31 @@ impl StreamState {

// This is the `Fin` message.
if msg.is_fin() {
// `expected_messages` is taken directly from the `Fin` sequence,
// which is an unauthenticated wire field the sending peer fully
// controls. A stream can hold at most `MAX_MESSAGES_PER_STREAM`
// parts (sequences `0..MAX_MESSAGES_PER_STREAM`), so the only
// sequences a stream can ever satisfy are `[0, MAX_MESSAGES_PER_STREAM - 1]`.
// Reject anything larger up front:
// * `msg.sequence as usize + 1` would otherwise overflow for
// sequences near `u64::MAX` — a panic in debug / overflow-checked
// builds (a single gossip message could crash the node) and a
// silent wrap in release, and
// * even a non-overflowing but out-of-range sequence makes
// `expected_messages` exceed the message cap, so the stream
// could never complete and would squat a per-peer slot until
// the age sweep clears it.
if msg.sequence >= MAX_MESSAGES_PER_STREAM as Sequence {
return StreamInsertResult::FinSequenceOutOfRange(msg.sequence);
}

self.fin_received = true;

// If we have received the fin message, we can determine when we will be done.
// We are done if we have already received all messages from 0 to fin.sequence,
// included. That is to say, if we have received `fin.sequence + 1` messages.
// Sequence is a u64 protocol field; on 64-bit targets usize == u64.
// The +1 cannot overflow because MAX_MESSAGES_PER_STREAM << u64::MAX.
// Bounded by the check above: `msg.sequence < MAX_MESSAGES_PER_STREAM`,
// so `msg.sequence + 1 <= MAX_MESSAGES_PER_STREAM` and cannot overflow.
#[allow(clippy::cast_possible_truncation, clippy::arithmetic_side_effects)]
{
self.expected_messages = msg.sequence as usize + 1;
Expand Down Expand Up @@ -516,6 +537,19 @@ impl PartStreamsMap {
return InsertResult::Pending;
}

StreamInsertResult::FinSequenceOutOfRange(sequence) => {
warn!(
%peer_id,
%stream_id,
sequence,
max = MAX_MESSAGES_PER_STREAM,
"Stream Fin declared an out-of-range sequence, evicting"
);

self.evict(&key);
return InsertResult::Pending;
}

StreamInsertResult::Complete(parts) => {
self.streams.remove(&key);
parts
Expand Down Expand Up @@ -1993,6 +2027,74 @@ mod tests {
);
}

#[test]
fn test_fin_with_out_of_range_sequence_is_rejected_without_overflow() {
// Regression for the unvalidated `Fin` sequence: a lone `Fin` whose
// sequence is `u64::MAX` must not overflow `expected_messages =
// sequence + 1` (a panic in overflow-checked builds, a silent wrap in
// release). It must be evicted immediately rather than wedging a
// per-peer slot until the age sweep.
let peer = PeerId::random();
let stream = new_stream_id(Height::new(1), Round::new(0), 0);
let mut map = PartStreamsMap::new(Height::new(1), NUM_VALIDATORS);

let msg = make_fin_message(&stream, u64::MAX);
assert!(matches!(map.insert(peer, msg), InsertResult::Pending));
assert_eq!(
map.streams.len(),
0,
"out-of-range Fin must not leave a lingering stream"
);
assert!(
map.evicted.peek(&(peer, stream)).is_some(),
"out-of-range Fin stream should be marked evicted so retries are dropped"
);

// A `Fin` exactly at the cap is also out of range: a stream holds at
// most MAX_MESSAGES_PER_STREAM parts, so the largest satisfiable
// sequence is MAX_MESSAGES_PER_STREAM - 1.
let stream2 = new_stream_id(Height::new(1), Round::new(0), 1);
let msg = make_fin_message(&stream2, MAX_MESSAGES_PER_STREAM as u64);
assert!(matches!(map.insert(peer, msg), InsertResult::Pending));
assert_eq!(map.streams.len(), 0);
}

#[test]
fn test_fin_at_max_in_range_sequence_completes() {
// Boundary guard for the fix above: the largest *valid* Fin sequence is
// MAX_MESSAGES_PER_STREAM - 1, a full stream of MAX_MESSAGES_PER_STREAM
// parts. The new bound must not over-reject it — it must still complete.
let peer = PeerId::random();
let stream = new_stream_id(Height::new(1), Round::new(0), 0);
let mut map = PartStreamsMap::new(Height::new(1), NUM_VALIDATORS);

let last = (MAX_MESSAGES_PER_STREAM - 1) as u64;

// Fill every sequence 0..=last (MAX_MESSAGES_PER_STREAM messages):
// seq 0 : Init
// seq 1..last-1: data parts
// seq last-1 : the proposal's Fin signature part
// seq last : the stream-terminator Fin (completes the stream)
assert!(map
.must_insert(peer, make_message(&stream, 0, make_init_part()))
.is_none());
for seq in 1..(last - 1) {
assert!(map
.must_insert(peer, make_message(&stream, seq, make_data_part(seq as u8)))
.is_none());
}
assert!(map
.must_insert(peer, make_message(&stream, last - 1, make_fin_part()))
.is_none());

let result = map.must_insert(peer, make_fin_message(&stream, last));
assert!(
result.is_some(),
"a full-length stream (Fin sequence = MAX_MESSAGES_PER_STREAM - 1) must complete"
);
assert!(map.streams.is_empty());
}

// --- Property-Based Tests ---

proptest! {
Expand Down
33 changes: 32 additions & 1 deletion crates/spammer/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,16 @@ impl PartitionMode {
(n + d / 2) / d
}

if round_div(num_accounts, 1 << (num_generators - 1)) == 0 {
// The smallest (first) bucket is `num_accounts` halved `num_generators - 1`
// times, i.e. divided by `1 << (num_generators - 1)`. Once that shift
// amount reaches the platform word size the shift itself overflows
// (panic in debug, silent wrap in release) before the guard can fire, so
// check the shift width first: a divisor of `2^(num_generators - 1)` that
// does not fit in a `usize` already exceeds `num_accounts`, so the
// smallest bucket necessarily rounds to 0. `||` short-circuits, so the
// `1 << shift` below is only evaluated when `shift < usize::BITS`.
let shift = num_generators - 1;
if shift >= usize::BITS as usize || round_div(num_accounts, 1 << shift) == 0 {
eyre::bail!("too many generators: it would result in a bucket with size 0");
}

Expand Down Expand Up @@ -187,4 +196,26 @@ mod tests {
}
Ok(())
}

#[tokio::test]
async fn partition_accounts_exponential_rejects_too_many_generators() -> Result<()> {
// `num_generators - 1 >= usize::BITS` makes the `1 << (num_generators - 1)`
// bucket-count guard overflow its shift (panic in debug, garbage
// partition in release). It must instead return the graceful
// "too many generators" error, like the in-range `(100, 9)` case does.
let shift_overflow_cases = [
(130, 65), // shift == usize::BITS exactly (the overflowing boundary)
(1000, 100), // shift well past the word size
(200, 64), // shift == usize::BITS - 1: valid shift, bucket still rounds to 0
];
for (num_accounts, num_generators) in shift_overflow_cases {
let result =
PartitionMode::Exponential.partition_accounts(num_accounts, num_generators);
assert!(
result.is_err(),
"expected graceful error for ({num_accounts}, {num_generators}), got {result:?}"
);
}
Ok(())
}
}
12 changes: 6 additions & 6 deletions crates/spammer/src/latency/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,6 @@ impl BlockStream {
&mut ws_client,
*next_expected_height,
height - 1,
notification.received_at,
block_sender,
)
.await?;
Expand Down Expand Up @@ -351,7 +350,6 @@ impl BlockStream {
ws_client: &mut WsClient,
start_height: u64,
end_height: u64,
received_at: u64,
block_sender: &Sender<BlockEvent>,
) -> Result<()> {
if start_height > end_height {
Expand All @@ -365,10 +363,12 @@ impl BlockStream {
let hex_height = format!("0x{height:x}");
match Self::fetch_block(ws_client, ETH_GET_BLOCK_BY_NUMBER, &hex_height).await {
FetchResult::Ok(block) => {
// Send the block to the tracker.
// The timestamp for catch-up blocks is the
// arrival time of the first notification.
// TODO: should we use the blocks' timestamps?
// Timestamp each catch-up block at the moment it is actually
// fetched, not with a single shared reconnect timestamp.
// Sharing one `received_at` across the whole gap collapses
// every missed block onto the same instant, flattening the
// observed latency distribution during gap scans.
let received_at = timestamp_now();
if block_sender
.send(BlockEvent { block, received_at })
.await
Expand Down
18 changes: 18 additions & 0 deletions crates/types/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,24 @@ macro_rules! impl_versioned_codec {
return Err($crate::codec::error::CodecError::EmptyBytes);
}

// Fast path: a leading version byte unambiguously marks a
// versioned message, so decode it directly and skip the
// always-failing legacy protobuf attempt below. This is sound
// because the version prefix (0x01) can never be the first byte
// of a valid protobuf message: a protobuf field tag is
// `(field_number << 3) | wire_type` with `field_number >= 1`, so
// the smallest possible leading byte is 0x08. All current
// NetCodec/WalCodec messages carry this prefix, so this is the
// common case.
if bytes.first() == Some(&($version_val as u8)) {
bytes.advance(1);
return malachitebft_codec::Codec::decode(
&$crate::codec::proto::ProtobufCodec,
bytes,
)
.map_err($crate::codec::error::CodecError::Protobuf);
}

// TODO: Phase 3: Remove after all nodes are upgraded to use versioning
if let Ok(msg) = malachitebft_codec::Codec::decode(
&$crate::codec::proto::ProtobufCodec,
Expand Down
Loading