From f65ce500f9138502d7b3eecfd1b55860b13f0582 Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Mon, 25 May 2026 23:48:44 +0400 Subject: [PATCH 1/8] fix(malachite): tighten injected validation --- ethexe/common/src/malachite.rs | 2 +- ethexe/malachite/core/src/streaming.rs | 40 +++- ethexe/malachite/service/src/externalities.rs | 176 +++++++++++++++--- 3 files changed, 180 insertions(+), 38 deletions(-) diff --git a/ethexe/common/src/malachite.rs b/ethexe/common/src/malachite.rs index 0e7e50a112f..ea21122522b 100644 --- a/ethexe/common/src/malachite.rs +++ b/ethexe/common/src/malachite.rs @@ -42,7 +42,7 @@ pub enum Transaction { Injected(SignedInjectedTransaction), } -/// Placeholder; shape firms up once executor plumbing lands. +/// Reserved per-MB limits for scheduled task progress. #[derive(Clone, Debug, Default, PartialEq, Eq, Encode, Decode, TypeInfo)] #[cfg_attr(feature = "std", derive(Serialize, Deserialize))] pub struct ProgressTasksLimits {} diff --git a/ethexe/malachite/core/src/streaming.rs b/ethexe/malachite/core/src/streaming.rs index 7f9d3c37778..3ffed0fe753 100644 --- a/ethexe/malachite/core/src/streaming.rs +++ b/ethexe/malachite/core/src/streaming.rs @@ -196,15 +196,17 @@ impl PartStreamsMap { msg: StreamMessage, ) -> Option { let stream_id = msg.stream_id.clone(); - let state = self - .streams - .entry((peer_id, stream_id.clone())) - .or_default(); - if !state.seen_sequences.insert(msg.sequence) { - return None; - } - let result = state.insert(msg); - if state.is_done() { + let result = { + let state = self + .streams + .entry((peer_id, stream_id.clone())) + .or_default(); + if !state.seen_sequences.insert(msg.sequence) { + return None; + } + state.insert(msg) + }; + if result.is_some() { self.streams.remove(&(peer_id, stream_id)); } result @@ -273,6 +275,26 @@ mod tests { assert_eq!(done.data_block_bytes(), Some(&b"hello"[..])); } + #[test] + fn completed_stream_releases_slot() { + let mut map = PartStreamsMap::new(); + let p = peer_id(1); + let s = sid(11); + + assert!(map.insert(p, msg(s.clone(), 0, init_part(11))).is_none()); + assert!( + map.insert(p, msg(s.clone(), 1, data_part(b"done"))) + .is_none() + ); + assert!(map.insert(p, fin_msg(s.clone(), 2)).is_some()); + + assert!( + !map.streams.contains_key(&(p, s)), + "completed stream must be removed from PartStreamsMap" + ); + assert_eq!(map.streams.len(), 0); + } + #[test] fn complete_out_of_order_assembles() { let mut map = PartStreamsMap::new(); diff --git a/ethexe/malachite/service/src/externalities.rs b/ethexe/malachite/service/src/externalities.rs index 9a02cd66fb3..39ecf7147cb 100644 --- a/ethexe/malachite/service/src/externalities.rs +++ b/ethexe/malachite/service/src/externalities.rs @@ -55,7 +55,7 @@ use ethexe_malachite_core::{Block, Externalities}; use gprimitives::H256; use parity_scale_codec::Encode; use std::{ - collections::VecDeque, + collections::{HashSet, VecDeque}, sync::{Arc, Mutex, RwLock}, }; use tokio::sync::{Notify, mpsc}; @@ -113,6 +113,12 @@ pub(crate) struct PendingEvent { pub prerequisite: H256, } +struct ValidatedMbShape<'a> { + advance: Option, + injected: Vec<&'a SignedInjectedTransaction>, + process_queues_limits: &'a ProcessQueuesLimits, +} + #[async_trait] impl Externalities for EthexeExternalities { async fn process_mb_proposal(&self, mb_hash: H256, mb: Block) -> Result<()> { @@ -294,7 +300,7 @@ impl Externalities for EthexeExternalities { // MB → the touched-set seed is empty. let mut touched = match advance { Some(advanced_eb) => eb_touched_programs(&self.db, parent_advanced, advanced_eb)?, - None => std::collections::HashSet::new(), + None => HashSet::new(), }; let initial_touched_count = touched.len(); if initial_touched_count > MAX_TOUCHED_PROGRAMS_PER_MB as usize { @@ -384,7 +390,9 @@ impl Externalities for EthexeExternalities { None }; - while let Some(Transaction::Injected(_)) = next { + let mut injected = Vec::new(); + while let Some(Transaction::Injected(signed)) = next { + injected.push(signed); next = iter.next(); } @@ -398,14 +406,23 @@ impl Externalities for EthexeExternalities { // `ProgressTasksLimits` is empty today; when fields are added, // bound them here. - let Some(Transaction::ProcessQueues { limits: pq_limits }) = iter.next() else { + let Some(Transaction::ProcessQueues { + limits: process_queues_limits, + }) = iter.next() + else { warn!("validate: MB shape violation — expected `ProcessQueues` bookend"); return Ok(false); }; - if pq_limits.gas_allowance > crate::MalachiteConfig::DEFAULT_GAS_ALLOWANCE { + let mb = ValidatedMbShape { + advance, + injected, + process_queues_limits, + }; + + if mb.process_queues_limits.gas_allowance > crate::MalachiteConfig::DEFAULT_GAS_ALLOWANCE { warn!( - allowance = pq_limits.gas_allowance, + allowance = mb.process_queues_limits.gas_allowance, cap = crate::MalachiteConfig::DEFAULT_GAS_ALLOWANCE, "validate: ProcessQueues.gas_allowance exceeds protocol cap" ); @@ -417,6 +434,37 @@ impl Externalities for EthexeExternalities { return Ok(false); } + let mut encoded_size = 0usize; + let mut seen_injected_hashes = HashSet::with_capacity(mb.injected.len()); + for signed in &mb.injected { + let tx_size = signed.encoded_size(); + let Some(next_size) = encoded_size.checked_add(tx_size) else { + warn!( + current_size = encoded_size, + tx_size, "validate: injected tx encoded size overflows usize — rejecting MB", + ); + return Ok(false); + }; + if next_size > MAX_INJECTED_TRANSACTIONS_SIZE_PER_MB { + warn!( + encoded_size = next_size, + cap = MAX_INJECTED_TRANSACTIONS_SIZE_PER_MB, + "validate: injected txs exceed per-MB encoded size cap — rejecting MB", + ); + return Ok(false); + } + encoded_size = next_size; + + let tx_hash = signed.data().to_hash(); + if !seen_injected_hashes.insert(tx_hash) { + warn!( + %tx_hash, + "validate: duplicate injected tx within MB — rejecting MB", + ); + return Ok(false); + } + } + // (2) Quarantine + parent-link — single synchronous check. // // Validators never wait for local sync here. The proposer's @@ -434,7 +482,7 @@ impl Externalities for EthexeExternalities { // TODO: #5479 emit `malachite_validate_abstain_total{reason=...}` at // each early-return below so operators can tune // `post_quarantine_delay` from observability rather than logs. - if let Some(advance) = advance { + if let Some(advance) = mb.advance { let parent_advanced = if parent_hash.is_zero() { H256::zero() } else { @@ -503,10 +551,7 @@ impl Externalities for EthexeExternalities { // No local chain head yet. If the MB carries no injected // txs we can still accept it; otherwise we must abstain // since the checker has no anchor to walk from. - let has_injected = payload - .iter() - .any(|tx| matches!(tx, Transaction::Injected(_))); - if has_injected { + if !mb.injected.is_empty() { warn!("validate: MB carries injected txs but no local chain head — abstaining"); return Ok(false); } @@ -521,10 +566,7 @@ impl Externalities for EthexeExternalities { // Propagating the error upward is the right call: it indicates // local DB corruption, not a peer-side issue. let checker = TxValidityChecker::new_for_mb(self.db.clone(), chain_head, parent_hash)?; - for tx in payload.iter() { - let Transaction::Injected(signed) = tx else { - continue; - }; + for signed in &mb.injected { // `?` inside `check_tx_validity` only fires on local DB // inconsistency (a `latest_states` entry whose `state_hash` // is absent from CAS). Every malicious-tx-data path returns @@ -554,10 +596,11 @@ impl Externalities for EthexeExternalities { // `Transaction::Injected` destination on top of the EB-touched // seed and reject if the union exceeds `limit`. // - // NOTE: there is no per-MB size cap on the validator side - // (master parity). We rely on the Malachite engine's 1 MiB - // hard cap on the encoded `Block` payload — anything larger - // never reaches `validate_block_above` in the first place. + // The encoded-size and within-MB duplicate guards above match + // producer-side selection. This participant-side cap keeps a + // malicious proposer from forcing oversized injected batches + // through the executor even when the outer proposal fits + // Malachite's larger block payload limit. let parent_advanced = if parent_hash.is_zero() { H256::zero() } else { @@ -571,15 +614,13 @@ impl Externalities for EthexeExternalities { // `advance` and its ancestors, so any failure here is a local // DB / sync race — not a proposer-controlled condition. Same // reasoning as the other two `?`s in this function. - let mut touched = match advance { + let mut touched = match mb.advance { Some(advanced_eb) => eb_touched_programs(&self.db, parent_advanced, advanced_eb)?, - None => std::collections::HashSet::new(), + None => HashSet::new(), }; let limit = touched.len().max(MAX_TOUCHED_PROGRAMS_PER_MB as usize); - for tx in payload.iter() { - if let Transaction::Injected(signed) = tx { - touched.insert(signed.data().destination); - } + for signed in &mb.injected { + touched.insert(signed.data().destination); } if touched.len() > limit { warn!( @@ -1604,6 +1645,87 @@ mod tests { ); } + #[tokio::test] + async fn validate_rejects_mb_exceeding_injected_size_cap() { + use ethexe_common::{ + injected::{InjectedTransaction, MAX_INJECTED_TX_PAYLOAD_SIZE}, + mock::{BlockChain, Mock}, + }; + use gprimitives::ActorId; + + let db = Database::memory(); + let chain = BlockChain::mock(2u32).setup(&db); + let head = chain.blocks[2].to_simple(); + let dests: Vec = (0..2u64).map(ActorId::from).collect(); + let parent_mb = setup_mb_with_destinations(&db, chain.mb_hash_at(1), &dests); + + let (ext, _rx) = make_externalities(db); + *ext.chain_head.write().unwrap() = Some(head); + + let pk = ethexe_common::PrivateKey::random(); + let mut transactions = Vec::new(); + for (i, dest) in dests.iter().enumerate() { + let tx = ethexe_common::SignedMessage::create( + pk.clone(), + InjectedTransaction { + destination: *dest, + payload: vec![0u8; MAX_INJECTED_TX_PAYLOAD_SIZE].try_into().unwrap(), + value: 0, + reference_block: chain.blocks[1].hash, + salt: vec![i as u8; 32].try_into().unwrap(), + }, + ) + .unwrap(); + transactions.push(Transaction::Injected(tx)); + } + transactions.push(Transaction::ProgressTasks { + limits: ProgressTasksLimits::default(), + }); + transactions.push(Transaction::ProcessQueues { + limits: ProcessQueuesLimits::default(), + }); + + assert!( + !ext.validate_block_above(parent_mb, Transactions::new(transactions)) + .await + .unwrap(), + "MB whose cumulative injected encoded size exceeds the cap must be rejected" + ); + } + + #[tokio::test] + async fn validate_rejects_within_mb_duplicate_injected_tx() { + use ethexe_common::mock::{BlockChain, Mock}; + use gprimitives::ActorId; + + let db = Database::memory(); + let chain = BlockChain::mock(2u32).setup(&db); + let head = chain.blocks[2].to_simple(); + let dest = ActorId::from([1; 32]); + let parent_mb = setup_mb_with_destinations(&db, chain.mb_hash_at(1), &[dest]); + + let (ext, _rx) = make_externalities(db); + *ext.chain_head.write().unwrap() = Some(head); + + let pk = ethexe_common::PrivateKey::random(); + let tx = signed_injected_tx(&pk, dest, chain.blocks[1].hash, 7); + let payload = Transactions::new(vec![ + Transaction::Injected(tx.clone()), + Transaction::Injected(tx), + Transaction::ProgressTasks { + limits: ProgressTasksLimits::default(), + }, + Transaction::ProcessQueues { + limits: ProcessQueuesLimits::default(), + }, + ]); + + assert!( + !ext.validate_block_above(parent_mb, payload).await.unwrap(), + "MB carrying the same injected tx twice must be rejected" + ); + } + // ------------------------------------------------------------------ // Shape & ordering checks on `validate_block_above`. // @@ -1878,9 +2000,7 @@ mod tests { assert!( !ext.validate_block_above(parent_mb, payload).await.unwrap(), - "MB whose AdvanceTillEthereumBlock regresses parent.last_advanced_eb \ - must be rejected — currently passes because validate_block_above \ - skips the strict-descendant check the producer enforces", + "MB whose AdvanceTillEthereumBlock regresses parent.last_advanced_eb must be rejected", ); } From 8bf33d4b56a1a4c10f68b11b3188a12acb9bd3b1 Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Tue, 26 May 2026 00:04:18 +0400 Subject: [PATCH 2/8] fix(ethexe): update contracts hash helpers --- ethexe/contracts/lib/forge-std | 2 +- ethexe/contracts/lib/frost-secp256k1-evm | 2 +- ethexe/contracts/src/Mirror.sol | 8 ++++---- ethexe/contracts/src/Router.sol | 10 +++++----- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/ethexe/contracts/lib/forge-std b/ethexe/contracts/lib/forge-std index 8987040ede9..ff47d4052a6 160000 --- a/ethexe/contracts/lib/forge-std +++ b/ethexe/contracts/lib/forge-std @@ -1 +1 @@ -Subproject commit 8987040ede9553cea20c95ad40d0455930f9c8e0 +Subproject commit ff47d4052a6018d9e5419e5cf013b16ff8006aae diff --git a/ethexe/contracts/lib/frost-secp256k1-evm b/ethexe/contracts/lib/frost-secp256k1-evm index d9473aa5e35..c7badcc70df 160000 --- a/ethexe/contracts/lib/frost-secp256k1-evm +++ b/ethexe/contracts/lib/frost-secp256k1-evm @@ -1 +1 @@ -Subproject commit d9473aa5e35a59c522fab81b12c388bbb23b16d1 +Subproject commit c7badcc70df1eda47a051f6fc46ea3f226ac86b1 diff --git a/ethexe/contracts/src/Mirror.sol b/ethexe/contracts/src/Mirror.sol index f303407a4fc..50a15439a75 100644 --- a/ethexe/contracts/src/Mirror.sol +++ b/ethexe/contracts/src/Mirror.sol @@ -529,7 +529,7 @@ contract Mirror is IMirror { /** * @dev Store the message hash in memory at messagesHashes[offset : offset+32]. */ - Memory.writeWordAsBytes32(messagesHashesMemPtr, offset, messageHash); + Memory.writeWord(messagesHashesMemPtr, offset, uint256(messageHash)); unchecked { offset += 32; } @@ -544,7 +544,7 @@ contract Mirror is IMirror { } } - return Hashes.efficientKeccak256AsBytes32(messagesHashesMemPtr, 0, messagesHashesSize); + return bytes32(Hashes.efficientKeccak256(messagesHashesMemPtr, 0, messagesHashesSize)); } /** @@ -939,7 +939,7 @@ contract Mirror is IMirror { for (uint256 i = 0; i < claimsLen; i++) { Gear.ValueClaim calldata claim = _claims[i]; bytes32 claimHash = Gear.valueClaimHash(claim.messageId, claim.destination, claim.value); - Memory.writeWordAsBytes32(claimsHashesMemPtr, offset, claimHash); + Memory.writeWord(claimsHashesMemPtr, offset, uint256(claimHash)); unchecked { offset += 32; } @@ -952,7 +952,7 @@ contract Mirror is IMirror { } } - return Hashes.efficientKeccak256AsBytes32(claimsHashesMemPtr, 0, claimsHashesSize); + return bytes32(Hashes.efficientKeccak256(claimsHashesMemPtr, 0, claimsHashesSize)); } // TODO (breathx): allow zero inheritor in `Router`. diff --git a/ethexe/contracts/src/Router.sol b/ethexe/contracts/src/Router.sol index ffca0795918..11c53db5f69 100644 --- a/ethexe/contracts/src/Router.sol +++ b/ethexe/contracts/src/Router.sol @@ -874,7 +874,7 @@ contract Router is // Check for duplicate isn't necessary, because `Clones.cloneDeterministic` // reverts execution in case of address is already taken. - bytes32 salt = Hashes.efficientKeccak256AsBytes32(_codeId, _salt); + bytes32 salt = bytes32(Hashes.efficientKeccak256(uint256(_codeId), uint256(_salt))); address actorId = _isSmall ? ClonesSmall.cloneDeterministic(address(this), salt) : Clones.cloneDeterministic(address(this), salt); @@ -933,13 +933,13 @@ contract Router is emit CodeGotValidated(_commitment.id, _commitment.valid); bytes32 codeCommitmentHash = Gear.codeCommitmentHash(_commitment.id, _commitment.valid); - Memory.writeWordAsBytes32(codeCommitmentsPtr, offset, codeCommitmentHash); + Memory.writeWord(codeCommitmentsPtr, offset, uint256(codeCommitmentHash)); unchecked { offset += 32; } } - return Hashes.efficientKeccak256AsBytes32(codeCommitmentsPtr, 0, codeCommitmentsHashSize); + return bytes32(Hashes.efficientKeccak256(codeCommitmentsPtr, 0, codeCommitmentsHashSize)); } // TODO #4609 @@ -1045,13 +1045,13 @@ contract Router is } bytes32 transitionHash = IMirror(transition.actorId).performStateTransition{value: value}(transition); - Memory.writeWordAsBytes32(transitionsHashesMemPtr, offset, transitionHash); + Memory.writeWord(transitionsHashesMemPtr, offset, uint256(transitionHash)); unchecked { offset += 32; } } - return Hashes.efficientKeccak256AsBytes32(transitionsHashesMemPtr, 0, transitionsHashSize); + return bytes32(Hashes.efficientKeccak256(transitionsHashesMemPtr, 0, transitionsHashSize)); } function _resetValidators( From fb2cd18a82c7276e6004d7e512885832799db43d Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Tue, 26 May 2026 00:11:21 +0400 Subject: [PATCH 3/8] fix(malachite): bound proposal streams --- ethexe/malachite/core/src/streaming.rs | 116 ++++++++++++++++++------- 1 file changed, 86 insertions(+), 30 deletions(-) diff --git a/ethexe/malachite/core/src/streaming.rs b/ethexe/malachite/core/src/streaming.rs index 3ffed0fe753..c2abac06f5b 100644 --- a/ethexe/malachite/core/src/streaming.rs +++ b/ethexe/malachite/core/src/streaming.rs @@ -26,6 +26,10 @@ use crate::{ types::Address, }; +const MAX_STREAM_MESSAGES: u64 = 16; +const MAX_STREAMS_PER_PEER: usize = 64; +const MAX_STREAMS_TOTAL: usize = 1024; + /// Min-heap wrapper that orders `StreamMessage`s by ascending sequence. struct MinSeq(StreamMessage); @@ -172,13 +176,10 @@ impl ProposalParts { } } -// TODO: #5473 `PartStreamsMap` has no per-peer cap, no total cap, and no -// eviction for streams that never receive a valid `Fin`. Pinned by the -// (ignored) regression test -// `streaming::tests::part_streams_map_grows_unbounded_under_fin_sequence_attack`. #[derive(Default)] pub struct PartStreamsMap { streams: BTreeMap<(PeerId, StreamId), StreamState>, + peer_streams: BTreeMap, } impl PartStreamsMap { @@ -196,21 +197,48 @@ impl PartStreamsMap { msg: StreamMessage, ) -> Option { let stream_id = msg.stream_id.clone(); + let key = (peer_id, stream_id.clone()); + if msg.sequence >= MAX_STREAM_MESSAGES { + self.remove_stream(&key); + return None; + } + if !self.streams.contains_key(&key) && !self.can_open_stream(peer_id) { + return None; + } + let result = { - let state = self - .streams - .entry((peer_id, stream_id.clone())) - .or_default(); + let state = self.streams.entry(key.clone()).or_insert_with(|| { + *self.peer_streams.entry(peer_id).or_default() += 1; + StreamState::default() + }); if !state.seen_sequences.insert(msg.sequence) { return None; } state.insert(msg) }; if result.is_some() { - self.streams.remove(&(peer_id, stream_id)); + self.remove_stream(&key); } result } + + fn can_open_stream(&self, peer_id: PeerId) -> bool { + self.streams.len() < MAX_STREAMS_TOTAL + && self.peer_streams.get(&peer_id).copied().unwrap_or_default() < MAX_STREAMS_PER_PEER + } + + fn remove_stream(&mut self, key: &(PeerId, StreamId)) { + if self.streams.remove(key).is_none() { + return; + } + + if let Some(count) = self.peer_streams.get_mut(&key.0) { + *count -= 1; + if *count == 0 { + self.peer_streams.remove(&key.0); + } + } + } } #[cfg(test)] @@ -335,43 +363,71 @@ mod tests { assert!(map.insert(p, fin_msg(s2.clone(), 2)).is_none()); } - /// REPRODUCES: a single peer can grow `PartStreamsMap` without - /// bound by either (a) opening fresh `stream_id`s and never sending - /// `Fin`, or (b) sending a `Fin` with a `sequence` far above any - /// part it actually delivers so the `total_messages == buffer.len()` - /// gate is unreachable. #[test] - #[ignore = "tracks issue #5473 in streaming.rs: unbounded PartStreamsMap"] - fn part_streams_map_grows_unbounded_under_fin_sequence_attack() { + fn part_streams_map_bounds_single_peer_flood() { let mut map = PartStreamsMap::new(); let p = peer_id(1); - // Attack A: a peer opens many streams and never finalises. - // 100 distinct stream_ids, each with Init + Data but no Fin. for stream_idx in 0..100u64 { let s = sid(0xA000_0000 + stream_idx); assert!(map.insert(p, msg(s.clone(), 0, init_part(1))).is_none()); assert!(map.insert(p, msg(s, 1, data_part(b"x"))).is_none()); } - // Attack B: cheaper still — one message per stream, Fin with a - // far-future sequence. `total_messages` becomes - // `u64::MAX as usize + 1` (wraps to 0 in release, panics in - // debug), but the `is_done` gate `buffer.len() == total_messages` - // is unreachable for any sane traffic. 100 more streams. for stream_idx in 0..100u64 { let s = sid(0xB000_0000 + stream_idx); assert!(map.insert(p, fin_msg(s, u64::MAX / 2)).is_none()); } - // Desired behaviour: a single peer cannot hold > a bounded - // number of in-flight stream slots. The exact cap is up to the - // fix, but it must be much smaller than the 200 we just pushed. + assert_eq!(map.streams.len(), MAX_STREAMS_PER_PEER); + assert_eq!(map.peer_streams.get(&p), Some(&MAX_STREAMS_PER_PEER)); + } + + #[test] + fn malformed_far_future_fin_evicts_stream() { + let mut map = PartStreamsMap::new(); + let p = peer_id(1); + let s = sid(30); + + assert!(map.insert(p, msg(s.clone(), 0, init_part(30))).is_none()); + assert!(map.streams.contains_key(&(p, s.clone()))); + assert!( + map.insert(p, fin_msg(s.clone(), MAX_STREAM_MESSAGES)) + .is_none() + ); + + assert!(!map.streams.contains_key(&(p, s))); + assert!(!map.peer_streams.contains_key(&p)); + } + + #[test] + fn cap_does_not_block_existing_stream_completion() { + let mut map = PartStreamsMap::new(); + let p = peer_id(1); + let first = sid(40); + + assert!( + map.insert(p, msg(first.clone(), 0, init_part(40))) + .is_none() + ); + for stream_idx in 1..MAX_STREAMS_PER_PEER as u64 { + let s = sid(40 + stream_idx); + assert!(map.insert(p, msg(s, 0, init_part(40))).is_none()); + } + assert_eq!(map.streams.len(), MAX_STREAMS_PER_PEER); + + let over_cap = sid(10_000); assert!( - map.streams.len() < 200, - "PartStreamsMap grew to {} entries under a single-peer flood — \ - needs per-peer cap + GC for never-finalised / bogus-Fin streams", - map.streams.len(), + map.insert(p, msg(over_cap.clone(), 0, init_part(40))) + .is_none() + ); + assert!(!map.streams.contains_key(&(p, over_cap))); + + assert!( + map.insert(p, msg(first.clone(), 1, data_part(b"ok"))) + .is_none() ); + assert!(map.insert(p, fin_msg(first.clone(), 2)).is_some()); + assert!(!map.streams.contains_key(&(p, first))); } } From 2e568b21f06abfdae2f27323a5c728536e924b81 Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Tue, 26 May 2026 00:19:46 +0400 Subject: [PATCH 4/8] fix(ethexe): keep contract libs pinned to master --- ethexe/contracts/lib/forge-std | 2 +- ethexe/contracts/lib/frost-secp256k1-evm | 2 +- ethexe/contracts/src/Mirror.sol | 8 ++++---- ethexe/contracts/src/Router.sol | 10 +++++----- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/ethexe/contracts/lib/forge-std b/ethexe/contracts/lib/forge-std index ff47d4052a6..8987040ede9 160000 --- a/ethexe/contracts/lib/forge-std +++ b/ethexe/contracts/lib/forge-std @@ -1 +1 @@ -Subproject commit ff47d4052a6018d9e5419e5cf013b16ff8006aae +Subproject commit 8987040ede9553cea20c95ad40d0455930f9c8e0 diff --git a/ethexe/contracts/lib/frost-secp256k1-evm b/ethexe/contracts/lib/frost-secp256k1-evm index c7badcc70df..d9473aa5e35 160000 --- a/ethexe/contracts/lib/frost-secp256k1-evm +++ b/ethexe/contracts/lib/frost-secp256k1-evm @@ -1 +1 @@ -Subproject commit c7badcc70df1eda47a051f6fc46ea3f226ac86b1 +Subproject commit d9473aa5e35a59c522fab81b12c388bbb23b16d1 diff --git a/ethexe/contracts/src/Mirror.sol b/ethexe/contracts/src/Mirror.sol index 50a15439a75..f303407a4fc 100644 --- a/ethexe/contracts/src/Mirror.sol +++ b/ethexe/contracts/src/Mirror.sol @@ -529,7 +529,7 @@ contract Mirror is IMirror { /** * @dev Store the message hash in memory at messagesHashes[offset : offset+32]. */ - Memory.writeWord(messagesHashesMemPtr, offset, uint256(messageHash)); + Memory.writeWordAsBytes32(messagesHashesMemPtr, offset, messageHash); unchecked { offset += 32; } @@ -544,7 +544,7 @@ contract Mirror is IMirror { } } - return bytes32(Hashes.efficientKeccak256(messagesHashesMemPtr, 0, messagesHashesSize)); + return Hashes.efficientKeccak256AsBytes32(messagesHashesMemPtr, 0, messagesHashesSize); } /** @@ -939,7 +939,7 @@ contract Mirror is IMirror { for (uint256 i = 0; i < claimsLen; i++) { Gear.ValueClaim calldata claim = _claims[i]; bytes32 claimHash = Gear.valueClaimHash(claim.messageId, claim.destination, claim.value); - Memory.writeWord(claimsHashesMemPtr, offset, uint256(claimHash)); + Memory.writeWordAsBytes32(claimsHashesMemPtr, offset, claimHash); unchecked { offset += 32; } @@ -952,7 +952,7 @@ contract Mirror is IMirror { } } - return bytes32(Hashes.efficientKeccak256(claimsHashesMemPtr, 0, claimsHashesSize)); + return Hashes.efficientKeccak256AsBytes32(claimsHashesMemPtr, 0, claimsHashesSize); } // TODO (breathx): allow zero inheritor in `Router`. diff --git a/ethexe/contracts/src/Router.sol b/ethexe/contracts/src/Router.sol index 11c53db5f69..ffca0795918 100644 --- a/ethexe/contracts/src/Router.sol +++ b/ethexe/contracts/src/Router.sol @@ -874,7 +874,7 @@ contract Router is // Check for duplicate isn't necessary, because `Clones.cloneDeterministic` // reverts execution in case of address is already taken. - bytes32 salt = bytes32(Hashes.efficientKeccak256(uint256(_codeId), uint256(_salt))); + bytes32 salt = Hashes.efficientKeccak256AsBytes32(_codeId, _salt); address actorId = _isSmall ? ClonesSmall.cloneDeterministic(address(this), salt) : Clones.cloneDeterministic(address(this), salt); @@ -933,13 +933,13 @@ contract Router is emit CodeGotValidated(_commitment.id, _commitment.valid); bytes32 codeCommitmentHash = Gear.codeCommitmentHash(_commitment.id, _commitment.valid); - Memory.writeWord(codeCommitmentsPtr, offset, uint256(codeCommitmentHash)); + Memory.writeWordAsBytes32(codeCommitmentsPtr, offset, codeCommitmentHash); unchecked { offset += 32; } } - return bytes32(Hashes.efficientKeccak256(codeCommitmentsPtr, 0, codeCommitmentsHashSize)); + return Hashes.efficientKeccak256AsBytes32(codeCommitmentsPtr, 0, codeCommitmentsHashSize); } // TODO #4609 @@ -1045,13 +1045,13 @@ contract Router is } bytes32 transitionHash = IMirror(transition.actorId).performStateTransition{value: value}(transition); - Memory.writeWord(transitionsHashesMemPtr, offset, uint256(transitionHash)); + Memory.writeWordAsBytes32(transitionsHashesMemPtr, offset, transitionHash); unchecked { offset += 32; } } - return bytes32(Hashes.efficientKeccak256(transitionsHashesMemPtr, 0, transitionsHashSize)); + return Hashes.efficientKeccak256AsBytes32(transitionsHashesMemPtr, 0, transitionsHashSize); } function _resetValidators( From 5a33539b5cdbc1ef136a544236349db3f5017d07 Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Tue, 26 May 2026 11:15:41 +0400 Subject: [PATCH 5/8] fix(malachite): avoid type-info doc churn --- ethexe/common/src/malachite.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethexe/common/src/malachite.rs b/ethexe/common/src/malachite.rs index ea21122522b..0e7e50a112f 100644 --- a/ethexe/common/src/malachite.rs +++ b/ethexe/common/src/malachite.rs @@ -42,7 +42,7 @@ pub enum Transaction { Injected(SignedInjectedTransaction), } -/// Reserved per-MB limits for scheduled task progress. +/// Placeholder; shape firms up once executor plumbing lands. #[derive(Clone, Debug, Default, PartialEq, Eq, Encode, Decode, TypeInfo)] #[cfg_attr(feature = "std", derive(Serialize, Deserialize))] pub struct ProgressTasksLimits {} From 108d006c2e282afbc1e7e164540679829c05803b Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Tue, 26 May 2026 11:54:43 +0400 Subject: [PATCH 6/8] fix(malachite): refresh db type metadata hash --- ethexe/common/src/db.rs | 2 +- ethexe/common/src/malachite.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ethexe/common/src/db.rs b/ethexe/common/src/db.rs index cb50920e126..b6cff813b7f 100644 --- a/ethexe/common/src/db.rs +++ b/ethexe/common/src/db.rs @@ -265,7 +265,7 @@ mod tests { #[test] fn ensure_types_unchanged() { const EXPECTED_TYPE_INFO_HASH: &str = - "d43d8ab319fb6d934231dba55950c9825e28c6ecf603e8076a90e0cab3855671"; + "bc1ef49ee0cb886020f6cb0d88f5259df238917ee8d49f8df2404db0da7417fa"; let types = [ meta_type::(), diff --git a/ethexe/common/src/malachite.rs b/ethexe/common/src/malachite.rs index 0e7e50a112f..ea21122522b 100644 --- a/ethexe/common/src/malachite.rs +++ b/ethexe/common/src/malachite.rs @@ -42,7 +42,7 @@ pub enum Transaction { Injected(SignedInjectedTransaction), } -/// Placeholder; shape firms up once executor plumbing lands. +/// Reserved per-MB limits for scheduled task progress. #[derive(Clone, Debug, Default, PartialEq, Eq, Encode, Decode, TypeInfo)] #[cfg_attr(feature = "std", derive(Serialize, Deserialize))] pub struct ProgressTasksLimits {} From a645b4529e41bab365859ac82927bc77bda7be12 Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Tue, 26 May 2026 17:03:39 +0400 Subject: [PATCH 7/8] refactor(malachite): simplify bounded fix state --- ethexe/malachite/core/src/streaming.rs | 14 ++-- ethexe/malachite/service/src/externalities.rs | 80 +++++++------------ 2 files changed, 34 insertions(+), 60 deletions(-) diff --git a/ethexe/malachite/core/src/streaming.rs b/ethexe/malachite/core/src/streaming.rs index c2abac06f5b..883d9843881 100644 --- a/ethexe/malachite/core/src/streaming.rs +++ b/ethexe/malachite/core/src/streaming.rs @@ -87,13 +87,15 @@ struct StreamState { buffer: MinHeap, init_info: Option, seen_sequences: HashSet, - total_messages: usize, - fin_received: bool, + total_messages: Option, } impl StreamState { fn is_done(&self) -> bool { - self.init_info.is_some() && self.fin_received && self.buffer.len() == self.total_messages + self.init_info.is_some() + && self + .total_messages + .is_some_and(|total| self.buffer.len() == total) } fn insert(&mut self, msg: StreamMessage) -> Option { @@ -101,8 +103,7 @@ impl StreamState { self.init_info = msg.content.as_data().and_then(|p| p.as_init()).cloned(); } if msg.is_fin() { - self.fin_received = true; - self.total_messages = msg.sequence as usize + 1; + self.total_messages = Some(msg.sequence as usize + 1); } self.buffer.push(msg); if self.is_done() { @@ -196,8 +197,7 @@ impl PartStreamsMap { peer_id: PeerId, msg: StreamMessage, ) -> Option { - let stream_id = msg.stream_id.clone(); - let key = (peer_id, stream_id.clone()); + let key = (peer_id, msg.stream_id.clone()); if msg.sequence >= MAX_STREAM_MESSAGES { self.remove_stream(&key); return None; diff --git a/ethexe/malachite/service/src/externalities.rs b/ethexe/malachite/service/src/externalities.rs index 39ecf7147cb..dc793b71ace 100644 --- a/ethexe/malachite/service/src/externalities.rs +++ b/ethexe/malachite/service/src/externalities.rs @@ -113,12 +113,6 @@ pub(crate) struct PendingEvent { pub prerequisite: H256, } -struct ValidatedMbShape<'a> { - advance: Option, - injected: Vec<&'a SignedInjectedTransaction>, - process_queues_limits: &'a ProcessQueuesLimits, -} - #[async_trait] impl Externalities for EthexeExternalities { async fn process_mb_proposal(&self, mb_hash: H256, mb: Block) -> Result<()> { @@ -128,11 +122,7 @@ impl Externalities for EthexeExternalities { // Propagate `last_advanced_eb` forward — the latest // `AdvanceTillEthereumBlock` in this MB wins; otherwise we // inherit the parent's value (zero if pre-genesis). - let parent_advanced = if parent.is_zero() { - H256::zero() - } else { - self.db.mb_meta(parent).last_advanced_eb - }; + let parent_advanced = self.parent_last_advanced_eb(parent); let last_advanced = payload .iter() .rev() @@ -145,7 +135,7 @@ impl Externalities for EthexeExternalities { // CAS-store transactions first so the contract — "if // CompactMb exists, transactions are reachable" — holds // unconditionally. - let transactions_hash = self.db.set_transactions(payload.clone()); + let transactions_hash = self.db.set_transactions(payload); self.db.set_mb_compact_block( mb_hash, CompactMb { @@ -233,11 +223,7 @@ impl Externalities for EthexeExternalities { // `parent_hash` is the consensus envelope hash of the parent // (zero for genesis). Use it directly to seed the producer's // `last_advanced_eb` lookup. - let parent_advanced = if parent_mb_hash.is_zero() { - H256::zero() - } else { - self.db.mb_meta(parent_mb_hash).last_advanced_eb - }; + let parent_advanced = self.parent_last_advanced_eb(parent_mb_hash); let (advance, injected) = self.wait_for_proposable_content(parent_advanced).await; @@ -414,15 +400,9 @@ impl Externalities for EthexeExternalities { return Ok(false); }; - let mb = ValidatedMbShape { - advance, - injected, - process_queues_limits, - }; - - if mb.process_queues_limits.gas_allowance > crate::MalachiteConfig::DEFAULT_GAS_ALLOWANCE { + if process_queues_limits.gas_allowance > crate::MalachiteConfig::DEFAULT_GAS_ALLOWANCE { warn!( - allowance = mb.process_queues_limits.gas_allowance, + allowance = process_queues_limits.gas_allowance, cap = crate::MalachiteConfig::DEFAULT_GAS_ALLOWANCE, "validate: ProcessQueues.gas_allowance exceeds protocol cap" ); @@ -435,8 +415,8 @@ impl Externalities for EthexeExternalities { } let mut encoded_size = 0usize; - let mut seen_injected_hashes = HashSet::with_capacity(mb.injected.len()); - for signed in &mb.injected { + let mut seen_injected_hashes = HashSet::with_capacity(injected.len()); + for signed in &injected { let tx_size = signed.encoded_size(); let Some(next_size) = encoded_size.checked_add(tx_size) else { warn!( @@ -482,12 +462,8 @@ impl Externalities for EthexeExternalities { // TODO: #5479 emit `malachite_validate_abstain_total{reason=...}` at // each early-return below so operators can tune // `post_quarantine_delay` from observability rather than logs. - if let Some(advance) = mb.advance { - let parent_advanced = if parent_hash.is_zero() { - H256::zero() - } else { - self.db.mb_meta(parent_hash).last_advanced_eb - }; + if let Some(advance) = advance { + let parent_advanced = self.parent_last_advanced_eb(parent_hash); let start_block_hash = self.db.globals().start_block_hash; let Some(chain_head) = *self.chain_head.read().expect("chain_head poisoned") else { @@ -551,7 +527,7 @@ impl Externalities for EthexeExternalities { // No local chain head yet. If the MB carries no injected // txs we can still accept it; otherwise we must abstain // since the checker has no anchor to walk from. - if !mb.injected.is_empty() { + if !injected.is_empty() { warn!("validate: MB carries injected txs but no local chain head — abstaining"); return Ok(false); } @@ -566,7 +542,7 @@ impl Externalities for EthexeExternalities { // Propagating the error upward is the right call: it indicates // local DB corruption, not a peer-side issue. let checker = TxValidityChecker::new_for_mb(self.db.clone(), chain_head, parent_hash)?; - for signed in &mb.injected { + for signed in &injected { // `?` inside `check_tx_validity` only fires on local DB // inconsistency (a `latest_states` entry whose `state_hash` // is absent from CAS). Every malicious-tx-data path returns @@ -585,27 +561,17 @@ impl Externalities for EthexeExternalities { } } - // (4) Touched-programs cap (master's #6). Only enforced on - // the validator side — the proposer in `build_block_above` - // already shapes the MB to stay within the cap; this check - // is the participant's guard against a malicious proposer. - // - // Per master: `limit = max(initial_touched.len(), MAX_*)` — - // the proposer can't *avoid* programs already touched by EB - // events, so those set the floor for the cap. We add every - // `Transaction::Injected` destination on top of the EB-touched - // seed and reject if the union exceeds `limit`. + // (4) Touched-programs cap. The EB-touched set is the durable + // floor: the proposer can't avoid programs already touched by + // EB events, and injected destinations must not grow the union + // beyond max(floor, MAX_TOUCHED_PROGRAMS_PER_MB). // // The encoded-size and within-MB duplicate guards above match // producer-side selection. This participant-side cap keeps a // malicious proposer from forcing oversized injected batches // through the executor even when the outer proposal fits // Malachite's larger block payload limit. - let parent_advanced = if parent_hash.is_zero() { - H256::zero() - } else { - self.db.mb_meta(parent_hash).last_advanced_eb - }; + let parent_advanced = self.parent_last_advanced_eb(parent_hash); // `?` here only fires on local DB issues: missing // `mb_program_states` for `latest_computed_mb_hash`, missing // `block_header` on a canonical ancestor of `advance`, or @@ -614,12 +580,12 @@ impl Externalities for EthexeExternalities { // `advance` and its ancestors, so any failure here is a local // DB / sync race — not a proposer-controlled condition. Same // reasoning as the other two `?`s in this function. - let mut touched = match mb.advance { + let mut touched = match advance { Some(advanced_eb) => eb_touched_programs(&self.db, parent_advanced, advanced_eb)?, None => HashSet::new(), }; let limit = touched.len().max(MAX_TOUCHED_PROGRAMS_PER_MB as usize); - for signed in &mb.injected { + for signed in &injected { touched.insert(signed.data().destination); } if touched.len() > limit { @@ -635,6 +601,14 @@ impl Externalities for EthexeExternalities { } impl EthexeExternalities { + fn parent_last_advanced_eb(&self, parent_hash: H256) -> H256 { + if parent_hash.is_zero() { + H256::zero() + } else { + self.db.mb_meta(parent_hash).last_advanced_eb + } + } + /// True iff `prerequisite.is_zero()` (no prerequisite — genesis /// or pre-advance) or the prerequisite Eth block has been fully /// **prepared** locally. @@ -1600,7 +1574,7 @@ mod tests { let mempool = Arc::new(crate::InjectedTxMempool::new(db.clone())); let _ = mempool.set_chain_head(head); let pk = ethexe_common::PrivateKey::random(); - // Each tx carries the maximum-size payload; the pool is loaded + // Each tx carries a half-max payload; the pool is loaded // with enough of them that two fit but three don't. for (i, dest) in dests.iter().enumerate().take(3) { let tx = ethexe_common::SignedMessage::create( From d55ea36e14bcd3d3bc8b083524398522580a2f43 Mon Sep 17 00:00:00 2001 From: Vadim Smirnov Date: Fri, 29 May 2026 15:21:13 +0400 Subject: [PATCH 8/8] fix(malachite): evict stale proposal streams --- ethexe/malachite/core/src/streaming.rs | 194 ++++++++++++++++++++++--- 1 file changed, 171 insertions(+), 23 deletions(-) diff --git a/ethexe/malachite/core/src/streaming.rs b/ethexe/malachite/core/src/streaming.rs index 883d9843881..b310fd492c0 100644 --- a/ethexe/malachite/core/src/streaming.rs +++ b/ethexe/malachite/core/src/streaming.rs @@ -11,7 +11,7 @@ use std::{ cmp::Ordering, - collections::{BTreeMap, BinaryHeap, HashSet}, + collections::{BTreeMap, BTreeSet, BinaryHeap, HashSet}, }; use parity_scale_codec::{Decode, Encode, Error as CodecError, Input, Output}; @@ -30,6 +30,8 @@ const MAX_STREAM_MESSAGES: u64 = 16; const MAX_STREAMS_PER_PEER: usize = 64; const MAX_STREAMS_TOTAL: usize = 1024; +type StreamKey = (PeerId, StreamId); + /// Min-heap wrapper that orders `StreamMessage`s by ascending sequence. struct MinSeq(StreamMessage); @@ -179,8 +181,11 @@ impl ProposalParts { #[derive(Default)] pub struct PartStreamsMap { - streams: BTreeMap<(PeerId, StreamId), StreamState>, + streams: BTreeMap, peer_streams: BTreeMap, + recencies: BTreeMap, + recency_order: BTreeSet<(u64, PeerId, StreamId)>, + next_recency: u64, } impl PartStreamsMap { @@ -188,6 +193,11 @@ impl PartStreamsMap { Self::default() } + #[cfg(test)] + pub fn len(&self) -> usize { + self.streams.len() + } + /// Insert a part. Returns `Some(parts)` once the stream is /// complete (all parts seen + Fin received). Subsequent calls for /// the same `(peer, stream)` after completion return `None` — the @@ -202,8 +212,8 @@ impl PartStreamsMap { self.remove_stream(&key); return None; } - if !self.streams.contains_key(&key) && !self.can_open_stream(peer_id) { - return None; + if !self.streams.contains_key(&key) { + self.make_room_for_new_stream(peer_id); } let result = { @@ -218,20 +228,59 @@ impl PartStreamsMap { }; if result.is_some() { self.remove_stream(&key); + } else { + self.touch_stream(&key); } result } - fn can_open_stream(&self, peer_id: PeerId) -> bool { - self.streams.len() < MAX_STREAMS_TOTAL - && self.peer_streams.get(&peer_id).copied().unwrap_or_default() < MAX_STREAMS_PER_PEER + fn make_room_for_new_stream(&mut self, peer_id: PeerId) { + if self + .peer_streams + .get(&peer_id) + .is_some_and(|count| *count >= MAX_STREAMS_PER_PEER) + && let Some(key) = self.oldest_stream_for_peer(peer_id) + { + self.remove_stream(&key); + } + + if self.streams.len() >= MAX_STREAMS_TOTAL + && let Some((_, peer_id, stream_id)) = self.recency_order.iter().next().cloned() + { + self.remove_stream(&(peer_id, stream_id)); + } } - fn remove_stream(&mut self, key: &(PeerId, StreamId)) { + fn oldest_stream_for_peer(&self, peer_id: PeerId) -> Option { + self.recency_order + .iter() + .find(|(_, candidate_peer, _)| *candidate_peer == peer_id) + .map(|(_, peer_id, stream_id)| (*peer_id, stream_id.clone())) + } + + fn touch_stream(&mut self, key: &StreamKey) { + let recency = self.next_recency; + self.next_recency = self + .next_recency + .checked_add(1) + .expect("stream recency counter overflowed"); + + if let Some(old_recency) = self.recencies.insert(key.clone(), recency) { + self.recency_order + .remove(&(old_recency, key.0, key.1.clone())); + } + self.recency_order.insert((recency, key.0, key.1.clone())); + } + + fn remove_stream(&mut self, key: &StreamKey) { if self.streams.remove(key).is_none() { return; } + if let Some(recency) = self.recencies.remove(key) { + self.recency_order.remove(&(recency, key.0, key.1.clone())); + } + if let Some(count) = self.peer_streams.get_mut(&key.0) { *count -= 1; if *count == 0 { @@ -286,6 +335,22 @@ mod tests { StreamMessage::new(stream_id, seq, StreamContent::Fin) } + fn fill_global_cap(map: &mut PartStreamsMap, start_stream: u64) { + let mut stream = start_stream; + for peer_byte in 2..=250 { + for _ in 0..MAX_STREAMS_PER_PEER { + if map.len() == MAX_STREAMS_TOTAL { + return; + } + let p = peer_id(peer_byte); + let s = sid(stream); + assert!(map.insert(p, msg(s, 0, init_part(stream))).is_none()); + stream += 1; + } + } + assert_eq!(map.len(), MAX_STREAMS_TOTAL); + } + #[test] fn complete_in_order_assembles() { let mut map = PartStreamsMap::new(); @@ -321,6 +386,8 @@ mod tests { "completed stream must be removed from PartStreamsMap" ); assert_eq!(map.streams.len(), 0); + assert!(map.recencies.is_empty()); + assert!(map.recency_order.is_empty()); } #[test] @@ -364,32 +431,109 @@ mod tests { } #[test] - fn part_streams_map_bounds_single_peer_flood() { + fn per_peer_cap_evicts_oldest_and_accepts_new_stream() { let mut map = PartStreamsMap::new(); let p = peer_id(1); - for stream_idx in 0..100u64 { + let first = sid(0xA000_0000); + for stream_idx in 0..MAX_STREAMS_PER_PEER as u64 { let s = sid(0xA000_0000 + stream_idx); assert!(map.insert(p, msg(s.clone(), 0, init_part(1))).is_none()); - assert!(map.insert(p, msg(s, 1, data_part(b"x"))).is_none()); - } - - for stream_idx in 0..100u64 { - let s = sid(0xB000_0000 + stream_idx); - assert!(map.insert(p, fin_msg(s, u64::MAX / 2)).is_none()); } assert_eq!(map.streams.len(), MAX_STREAMS_PER_PEER); assert_eq!(map.peer_streams.get(&p), Some(&MAX_STREAMS_PER_PEER)); + + let newest = sid(0xB000_0000); + assert!( + map.insert(p, msg(newest.clone(), 0, init_part(1))) + .is_none() + ); + + assert_eq!(map.streams.len(), MAX_STREAMS_PER_PEER); + assert!(!map.streams.contains_key(&(p, first))); + assert!(map.streams.contains_key(&(p, newest.clone()))); + + assert!( + map.insert(p, msg(newest.clone(), 1, data_part(b"new"))) + .is_none() + ); + assert!(map.insert(p, fin_msg(newest.clone(), 2)).is_some()); + assert!(!map.streams.contains_key(&(p, newest))); + assert_eq!(map.peer_streams.get(&p), Some(&(MAX_STREAMS_PER_PEER - 1))); } #[test] - fn malformed_far_future_fin_evicts_stream() { + fn global_cap_evicts_oldest_and_stays_bounded() { + let mut map = PartStreamsMap::new(); + let first_peer = peer_id(1); + let first_stream = sid(10); + + assert!( + map.insert(first_peer, msg(first_stream.clone(), 0, init_part(10))) + .is_none() + ); + fill_global_cap(&mut map, 1_000); + assert_eq!(map.len(), MAX_STREAMS_TOTAL); + + let new_peer = peer_id(251); + let new_stream = sid(20_000); + assert!( + map.insert(new_peer, msg(new_stream.clone(), 0, init_part(20_000))) + .is_none() + ); + + assert_eq!(map.len(), MAX_STREAMS_TOTAL); + assert!(!map.streams.contains_key(&(first_peer, first_stream))); + assert!(map.streams.contains_key(&(new_peer, new_stream))); + } + + #[test] + fn valid_parts_refresh_existing_stream_recency() { + let mut map = PartStreamsMap::new(); + let p = peer_id(1); + let refreshed = sid(50); + let stale = sid(51); + + assert!( + map.insert(p, msg(refreshed.clone(), 0, init_part(50))) + .is_none() + ); + assert!( + map.insert(p, msg(stale.clone(), 0, init_part(51))) + .is_none() + ); + assert!( + map.insert(p, msg(refreshed.clone(), 1, data_part(b"fresh"))) + .is_none() + ); + + fill_global_cap(&mut map, 2_000); + + let new_peer = peer_id(251); + let new_stream = sid(30_000); + assert!( + map.insert(new_peer, msg(new_stream.clone(), 0, init_part(30_000))) + .is_none() + ); + + assert!(map.streams.contains_key(&(p, refreshed))); + assert!(!map.streams.contains_key(&(p, stale))); + assert!(map.streams.contains_key(&(new_peer, new_stream))); + } + + #[test] + fn malformed_far_future_fin_evicts_only_its_stream() { let mut map = PartStreamsMap::new(); let p = peer_id(1); let s = sid(30); + let other = sid(31); assert!(map.insert(p, msg(s.clone(), 0, init_part(30))).is_none()); + assert!( + map.insert(p, msg(other.clone(), 0, init_part(31))) + .is_none() + ); assert!(map.streams.contains_key(&(p, s.clone()))); assert!( map.insert(p, fin_msg(s.clone(), MAX_STREAM_MESSAGES)) @@ -397,11 +541,13 @@ mod tests { ); assert!(!map.streams.contains_key(&(p, s))); - assert!(!map.peer_streams.contains_key(&p)); + assert!(map.streams.contains_key(&(p, other.clone()))); + assert_eq!(map.peer_streams.get(&p), Some(&1)); + assert!(map.recencies.contains_key(&(p, other))); } #[test] - fn cap_does_not_block_existing_stream_completion() { + fn completed_stream_releases_slot_after_cap_eviction() { let mut map = PartStreamsMap::new(); let p = peer_id(1); let first = sid(40); @@ -421,13 +567,15 @@ mod tests { map.insert(p, msg(over_cap.clone(), 0, init_part(40))) .is_none() ); - assert!(!map.streams.contains_key(&(p, over_cap))); + assert!(!map.streams.contains_key(&(p, first.clone()))); + assert!(map.streams.contains_key(&(p, over_cap.clone()))); assert!( - map.insert(p, msg(first.clone(), 1, data_part(b"ok"))) + map.insert(p, msg(over_cap.clone(), 1, data_part(b"ok"))) .is_none() ); - assert!(map.insert(p, fin_msg(first.clone(), 2)).is_some()); - assert!(!map.streams.contains_key(&(p, first))); + assert!(map.insert(p, fin_msg(over_cap.clone(), 2)).is_some()); + assert!(!map.streams.contains_key(&(p, over_cap))); + assert_eq!(map.streams.len(), MAX_STREAMS_PER_PEER - 1); } }