diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs index 4f76b6eb5cd..eb2e4a61a2f 100644 --- a/cmd/ethrex/initializers.rs +++ b/cmd/ethrex/initializers.rs @@ -542,7 +542,8 @@ pub async fn init_l1( let local_node_record = get_local_node_record(&datadir, &local_p2p_node, &signer); - let peer_table = PeerTableServer::spawn(opts.target_peers, store.clone()); + let peer_table = + PeerTableServer::spawn(local_p2p_node.node_id(), opts.target_peers, store.clone()); // TODO: Check every module starts properly. let tracker = TaskTracker::new(); diff --git a/cmd/ethrex/l2/initializers.rs b/cmd/ethrex/l2/initializers.rs index 668c1d9d55d..44b3ac8eaec 100644 --- a/cmd/ethrex/l2/initializers.rs +++ b/cmd/ethrex/l2/initializers.rs @@ -250,7 +250,11 @@ pub async fn init_l2( if !opts.sequencer_opts.based { blockchain.set_synced(); } - let peer_table = PeerTableServer::spawn(opts.node_opts.target_peers, store.clone()); + let peer_table = PeerTableServer::spawn( + local_p2p_node.node_id(), + opts.node_opts.target_peers, + store.clone(), + ); let p2p_context = P2PContext::new( local_p2p_node.clone(), network_config, diff --git a/crates/blockchain/metrics/p2p.rs b/crates/blockchain/metrics/p2p.rs index 14244ad7d3a..4ab303038b3 100644 --- a/crates/blockchain/metrics/p2p.rs +++ b/crates/blockchain/metrics/p2p.rs @@ -98,7 +98,7 @@ impl MetricsP2P { kademlia_insert_contact_duration: Histogram::with_opts( HistogramOpts::new( "ethrex_kademlia_insert_contact_duration_seconds", - "Duration of peer table contact insertion operations", + "Duration of Kademlia insert_contact operations", ) .buckets(vec![ 0.000_001, 0.000_005, 0.000_01, 0.000_05, 0.000_1, 0.000_5, 0.001, 0.01, @@ -108,7 +108,7 @@ impl MetricsP2P { kademlia_iter_contacts_duration: Histogram::with_opts( HistogramOpts::new( "ethrex_kademlia_iter_contacts_duration_seconds", - "Duration of peer table full-scan operations", + "Duration of Kademlia iter_contacts full-scan operations", ) .buckets(vec![ 0.000_01, 
0.000_05, 0.000_1, 0.000_5, 0.001, 0.005, 0.01, 0.05, 0.1, diff --git a/crates/networking/p2p/discovery/discv4_handlers.rs b/crates/networking/p2p/discovery/discv4_handlers.rs index 7999ce5a070..57fb78f6d99 100644 --- a/crates/networking/p2p/discovery/discv4_handlers.rs +++ b/crates/networking/p2p/discovery/discv4_handlers.rs @@ -1,19 +1,24 @@ use crate::{ backend, + discovery::lookup::{IterativeLookup, LOOKUP_ALPHA, LOOKUP_BUCKET_SIZE}, discv4::{ messages::{ - ENRRequestMessage, ENRResponseMessage, Message, NeighborsMessage, PingMessage, - PongMessage, + ENRRequestMessage, ENRResponseMessage, FindNodeMessage, Message, NeighborsMessage, + PingMessage, PongMessage, }, server::{Discv4Message, EXPIRATION_SECONDS}, }, metrics::METRICS, peer_table::{Contact, ContactValidation, DiscoveryProtocol, PeerTableServerProtocol as _}, types::{Endpoint, Node, NodeRecord}, - utils::{get_msg_expiration_from_seconds, is_msg_expired, node_id}, + utils::{ + get_msg_expiration_from_seconds, is_msg_expired, node_id, public_key_from_signing_key, + }, }; use bytes::{Bytes, BytesMut}; use ethrex_common::{H256, H512, types::ForkId}; +use rand::rngs::OsRng; +use secp256k1::SecretKey; use std::time::Duration; use tracing::{debug, error, trace}; @@ -21,6 +26,8 @@ use super::server::{DiscoveryServer, DiscoveryServerError}; /// Discv4 revalidation interval. const REVALIDATION_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60); // 12 hours +/// Peer count threshold below which we use bootstrap-mode settings. 
+const BOOTSTRAP_THRESHOLD: usize = 30; impl DiscoveryServer { pub(crate) async fn discv4_process_message( @@ -120,35 +127,145 @@ impl DiscoveryServer { } pub(crate) async fn discv4_lookup(&mut self) -> Result<(), DiscoveryServerError> { + if self.discv4.is_none() { + return Ok(()); + } + + // Remove finished lookups + self.discv4 + .as_mut() + .expect("discv4 state must exist") + .active_lookups + .retain(|(l, _)| !l.is_finished()); + + let peer_count = self.peer_table.peer_count().await.unwrap_or(0); + + // Above bootstrap threshold, don't chain back-to-back — let the timer + // fall back to the slow interval between lookups to avoid excessive + // FindNode traffic at steady-state. + if peer_count >= BOOTSTRAP_THRESHOLD + && self + .discv4 + .as_ref() + .expect("discv4 state must exist") + .active_lookups + .is_empty() + { + return Ok(()); + } + + // Start a new lookup if none active + if self + .discv4 + .as_ref() + .expect("discv4 state must exist") + .active_lookups + .is_empty() + { + // Generate random target + let random_priv_key = SecretKey::new(&mut OsRng); + let random_pub_key = public_key_from_signing_key(&random_priv_key); + let target_id = node_id(&random_pub_key); + + // Seed with closest known nodes from the connection pool + let seed = self + .peer_table + .get_closest_from_pool(target_id, LOOKUP_BUCKET_SIZE) + .await?; + if !seed.is_empty() { + trace!( + protocol = "discv4", + seeds = seed.len(), + "Starting new iterative lookup" + ); + let lookup = IterativeLookup::new(target_id, seed); + + // Sign one FindNode message for this target + let expiration = get_msg_expiration_from_seconds(EXPIRATION_SECONDS); + let msg = Message::FindNode(FindNodeMessage::new(random_pub_key, expiration)); + let mut buf = BytesMut::new(); + msg.encode_with_header(&mut buf, &self.signer); + + let discv4 = self.discv4.as_mut().expect("discv4 state must exist"); + discv4.active_lookups.push((lookup, buf)); + } else { + trace!( + protocol = "discv4", + "No seeds for 
lookup, connection pool empty" + ); + } + } + + self.advance_v4_lookup().await + } + + async fn advance_v4_lookup(&mut self) -> Result<(), DiscoveryServerError> { let discv4 = match &mut self.discv4 { Some(s) => s, None => return Ok(()), }; - if let Some(contact) = self - .peer_table - .get_contact_for_lookup(DiscoveryProtocol::Discv4) - .await? - { - if let Err(e) = self - .udp_socket - .send_to(&discv4.find_node_message, &contact.node.udp_addr()) - .await - { - error!(protocol = "discv4", sending = "FindNode", addr = ?&contact.node.udp_addr(), err=?e, "Error sending message"); - self.peer_table.set_disposable(contact.node.node_id())?; + + if discv4.active_lookups.is_empty() { + return Ok(()); + } + + let peer_count = self.peer_table.peer_count().await.unwrap_or(0); + + // Collect queries from all active lookups + let mut queries: Vec<(usize, H256, Node, BytesMut)> = Vec::new(); + for (idx, (lookup, message)) in discv4.active_lookups.iter_mut().enumerate() { + for (node_id, node) in lookup.next_to_query(LOOKUP_ALPHA) { + queries.push((idx, node_id, node, message.clone())); + } + } + + if !queries.is_empty() { + trace!( + protocol = "discv4", + count = queries.len(), + "Advancing lookups, querying nodes" + ); + } + + for (idx, node_id, node, message) in queries { + // Pre-bond: ping the node before querying it so it accepts our + // FindNode. Only during bootstrap, and only nodes we haven't + // already pinged. 
+ if peer_count < BOOTSTRAP_THRESHOLD { + let already_pinged = self + .discv4 + .as_ref() + .map(|s| s.pinged_nodes.contains(&node_id)) + .unwrap_or(true); + if !already_pinged { + if let Some(discv4) = &mut self.discv4 { + discv4.pinged_nodes.insert(node_id); + } + let _ = self.discv4_send_ping(&node).await; + } + } + + if let Err(e) = self.udp_socket.send_to(&message, &node.udp_addr()).await { + error!(protocol = "discv4", sending = "FindNode", addr = ?node.udp_addr(), err=?e, "Error sending message"); + self.peer_table.set_disposable(node_id)?; METRICS.record_new_discarded_node(); + if let Some(discv4) = &mut self.discv4 + && let Some((lookup, _)) = discv4.active_lookups.get_mut(idx) + { + lookup.record_timeout(); + } } else { #[cfg(feature = "metrics")] { use ethrex_metrics::p2p::METRICS_P2P; METRICS_P2P.inc_discv4_outgoing("FindNode"); } - discv4 + self.discv4 + .as_mut() + .expect("discv4 state must exist") .pending_find_node - .insert(contact.node.node_id(), std::time::Instant::now()); + .insert(node_id, std::time::Instant::now()); } - self.peer_table - .increment_find_node_sent(contact.node.node_id())?; } Ok(()) } @@ -251,6 +368,9 @@ impl DiscoveryServer { .unwrap_or(false) { self.discv4_send_ping(&node).await?; + if let Some(discv4) = &mut self.discv4 { + discv4.pinged_nodes.insert(node.node_id()); + } } else { let node_id = node_id(&sender_public_key); let stored_enr_seq = self @@ -343,11 +463,23 @@ impl DiscoveryServer { } let nodes = neighbors_message.nodes; - self.peer_table.new_contacts( - nodes, - self.local_node.node_id(), - DiscoveryProtocol::Discv4, - )?; + self.peer_table + .new_contacts(nodes.clone(), DiscoveryProtocol::Discv4)?; + + // Feed results into ALL active lookups and advance them + if let Some(discv4) = &mut self.discv4 { + let entries: Vec<(H256, Node)> = + nodes.iter().map(|n| (n.node_id(), n.clone())).collect(); + for (lookup, _) in &mut discv4.active_lookups { + lookup.feed_results(entries.clone()); + } + // Record response on 
first active lookup (we don't track which triggered it) + if let Some((lookup, _)) = discv4.active_lookups.first_mut() { + lookup.record_response(); + } + } + self.advance_v4_lookup().await?; + Ok(()) } diff --git a/crates/networking/p2p/discovery/discv5_handlers.rs b/crates/networking/p2p/discovery/discv5_handlers.rs index 72f81246a9f..48e98f797dc 100644 --- a/crates/networking/p2p/discovery/discv5_handlers.rs +++ b/crates/networking/p2p/discovery/discv5_handlers.rs @@ -1,4 +1,5 @@ use crate::{ + discovery::lookup::{IterativeLookup, LOOKUP_ALPHA, LOOKUP_BUCKET_SIZE}, discv5::{ messages::{ DISTANCES_PER_FIND_NODE_MSG, FindNodeMessage, Handshake, HandshakeAuthdata, Message, @@ -30,6 +31,8 @@ use super::server::{DiscoveryServer, DiscoveryServerError}; /// Maximum number of ENRs per NODES message (limited by UDP packet size). const MAX_ENRS_PER_MESSAGE: usize = 3; +/// Peer count threshold below which we use bootstrap-mode settings. +const BOOTSTRAP_THRESHOLD: usize = 30; /// Nodes not validated within this interval are candidates for revalidation. const REVALIDATION_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60); // 12 hours /// Minimum interval between WHOAREYOU packets to the same IP address. @@ -238,8 +241,7 @@ impl DiscoveryServer { } if let Some(record) = &authdata.record { - self.peer_table - .new_contact_records(vec![record.clone()], self.local_node.node_id())?; + self.peer_table.new_contact_records(vec![record.clone()])?; } let session = derive_session_keys( @@ -278,38 +280,113 @@ impl DiscoveryServer { } pub(crate) async fn discv5_lookup(&mut self) -> Result<(), DiscoveryServerError> { - if let Some(contact) = self - .peer_table - .get_contact_for_lookup(DiscoveryProtocol::Discv5) - .await? 
+ if self.discv5.is_none() { + return Ok(()); + } + + // Remove finished lookups + self.discv5 + .as_mut() + .expect("discv5 state must exist") + .active_lookups + .retain(|l| !l.is_finished()); + + let peer_count = self.peer_table.peer_count().await.unwrap_or(0); + + // Above bootstrap threshold, don't chain back-to-back — let the timer + // fall back to the slow interval between lookups to avoid excessive + // FindNode traffic at steady-state. + if peer_count >= BOOTSTRAP_THRESHOLD + && self + .discv5 + .as_ref() + .expect("discv5 state must exist") + .active_lookups + .is_empty() { - let find_node_msg = self.discv5_get_random_find_node_message(&contact.node); - if let Err(e) = self - .discv5_send_ordinary(find_node_msg, &contact.node) - .await - { - error!(protocol = "discv5", sending = "FindNode", addr = ?&contact.node.udp_addr(), err=?e, "Error sending message"); - self.peer_table.set_disposable(contact.node.node_id())?; - METRICS.record_new_discarded_node(); + return Ok(()); + } + + // Start a new lookup if none active + if self + .discv5 + .as_ref() + .expect("discv5 state must exist") + .active_lookups + .is_empty() + { + let mut rng = OsRng; + let target_id: H256 = rng.r#gen(); + + // Seed with closest known nodes from the connection pool + let seed = self + .peer_table + .get_closest_from_pool(target_id, LOOKUP_BUCKET_SIZE) + .await?; + if !seed.is_empty() { + trace!( + protocol = "discv5", + seeds = seed.len(), + "Starting new iterative lookup" + ); + let lookup = IterativeLookup::new(target_id, seed); + let discv5 = self.discv5.as_mut().expect("discv5 state must exist"); + discv5.active_lookups.push(lookup); + } else { + trace!( + protocol = "discv5", + "No seeds for lookup, connection pool empty" + ); } + } + + self.advance_v5_lookup().await + } + + async fn advance_v5_lookup(&mut self) -> Result<(), DiscoveryServerError> { + let discv5 = match &mut self.discv5 { + Some(s) => s, + None => return Ok(()), + }; - self.peer_table - 
.increment_find_node_sent(contact.node.node_id())?; + if discv5.active_lookups.is_empty() { + return Ok(()); + } + + // Collect queries from all active lookups + let mut queries: Vec<(usize, H256, H256, Node)> = Vec::new(); + for (idx, lookup) in discv5.active_lookups.iter_mut().enumerate() { + let target = lookup.target; + for (node_id, node) in lookup.next_to_query(LOOKUP_ALPHA) { + queries.push((idx, target, node_id, node)); + } + } + + for (idx, target, node_id, node) in queries { + let find_node_msg = self.discv5_build_find_node_for_target(target, &node); + if let Err(e) = self.discv5_send_ordinary(find_node_msg, &node).await { + error!(protocol = "discv5", sending = "FindNode", addr = ?node.udp_addr(), err=?e, "Error sending message"); + self.peer_table.set_disposable(node_id)?; + METRICS.record_new_discarded_node(); + if let Some(discv5) = &mut self.discv5 + && let Some(lookup) = discv5.active_lookups.get_mut(idx) + { + lookup.record_timeout(); + } + } } Ok(()) } - fn discv5_get_random_find_node_message(&self, node: &Node) -> Message { - let mut rng = OsRng; - let target = rng.r#gen(); - let distance = distance(&target, &node.node_id()) as u8; + fn discv5_build_find_node_for_target(&self, target: H256, node: &Node) -> Message { + let center_distance = distance(&target, &node.node_id()) as u8; let mut distances = Vec::new(); - distances.push(distance as u32); + distances.push(center_distance as u32); for i in 0..DISTANCES_PER_FIND_NODE_MSG / 2 { - if let Some(d) = distance.checked_add(i + 1) { + if let Some(d) = center_distance.checked_add(i + 1) { distances.push(d as u32) } - if let Some(d) = distance.checked_sub(i + 1) { + if let Some(d) = center_distance.checked_sub(i + 1) { distances.push(d as u32) } } @@ -417,10 +494,7 @@ impl DiscoveryServer { let mut nodes = self .peer_table - .get_nodes_at_distances( - self.local_node.node_id(), - find_node_message.distances.clone(), - ) + .get_nodes_at_distances(find_node_message.distances.clone()) .await?; if 
find_node_message.distances.contains(&0) { nodes.push(self.local_node_record.clone()); @@ -469,7 +543,25 @@ impl DiscoveryServer { nodes_message: NodesMessage, ) -> Result<(), DiscoveryServerError> { self.peer_table - .new_contact_records(nodes_message.nodes, self.local_node.node_id())?; + .new_contact_records(nodes_message.nodes.clone())?; + + // Feed results into ALL active lookups and advance them + if let Some(discv5) = &mut self.discv5 { + let entries: Vec<(H256, Node)> = nodes_message + .nodes + .iter() + .filter_map(|r| Node::from_enr(r).ok().map(|n| (n.node_id(), n))) + .collect(); + for lookup in &mut discv5.active_lookups { + lookup.feed_results(entries.clone()); + } + // Record response on first active lookup (we don't track which triggered it) + if let Some(lookup) = discv5.active_lookups.first_mut() { + lookup.record_response(); + } + } + self.advance_v5_lookup().await?; + Ok(()) } diff --git a/crates/networking/p2p/discovery/lookup.rs b/crates/networking/p2p/discovery/lookup.rs new file mode 100644 index 00000000000..a23324f866d --- /dev/null +++ b/crates/networking/p2p/discovery/lookup.rs @@ -0,0 +1,204 @@ +use crate::peer_table::xor_distance; +use crate::types::Node; +use ethrex_common::H256; +use rustc_hash::FxHashSet; +use std::time::{Duration, Instant}; + +/// Number of concurrent queries per iteration round. +pub const LOOKUP_ALPHA: usize = 3; +/// Maximum entries in the result set (Kademlia k parameter). +pub const LOOKUP_BUCKET_SIZE: usize = 16; +/// Maximum duration before a lookup is considered timed out. +pub const LOOKUP_TIMEOUT: Duration = Duration::from_secs(20); + +/// A single entry in the iterative lookup result set. +#[derive(Debug, Clone)] +pub struct LookupEntry { + pub node_id: H256, + pub node: Node, + pub distance: H256, + pub queried: bool, +} + +/// Iterative convergence lookup (geth-style). 
+/// +/// Generates a random target, seeds with closest known nodes, queries alpha=3 +/// closest not-yet-asked nodes, feeds responses back, and iterates until +/// convergence (no more unqueried entries closer than what we have, or timeout). +#[derive(Debug)] +pub struct IterativeLookup { + pub target: H256, + result: Vec, + seen: FxHashSet, + queries_in_flight: usize, + started_at: Instant, +} + +impl IterativeLookup { + /// Create a new iterative lookup seeded with the given nodes. + pub fn new(target: H256, seed_nodes: Vec<(H256, Node)>) -> Self { + let mut seen = FxHashSet::default(); + let mut result: Vec = Vec::with_capacity(LOOKUP_BUCKET_SIZE); + + for (node_id, node) in seed_nodes { + if seen.insert(node_id) { + let distance = xor_distance(&target, &node_id); + result.push(LookupEntry { + node_id, + node, + distance, + queried: false, + }); + } + } + + // Sort by distance (ascending) and truncate to bucket size + result.sort_by(|a, b| a.distance.cmp(&b.distance)); + result.truncate(LOOKUP_BUCKET_SIZE); + + Self { + target, + result, + seen, + queries_in_flight: 0, + started_at: Instant::now(), + } + } + + /// Returns up to `count` closest unqueried entries, marks them as queried, + /// and increments the in-flight counter. + pub fn next_to_query(&mut self, count: usize) -> Vec<(H256, Node)> { + let mut out = Vec::with_capacity(count); + for entry in &mut self.result { + if out.len() >= count { + break; + } + if !entry.queried { + entry.queried = true; + self.queries_in_flight += 1; + out.push((entry.node_id, entry.node.clone())); + } + } + out + } + + /// Feed response nodes into the lookup. Inserts new nodes if they are + /// closer than the farthest entry (or the result set is not full yet). + /// Deduplicates via the `seen` set. 
+ pub fn feed_results(&mut self, nodes: Vec<(H256, Node)>) { + for (node_id, node) in nodes { + if !self.seen.insert(node_id) { + continue; + } + let distance = xor_distance(&self.target, &node_id); + + if self.result.len() < LOOKUP_BUCKET_SIZE { + self.result.push(LookupEntry { + node_id, + node, + distance, + queried: false, + }); + } else if let Some(farthest) = self.result.last() + && distance < farthest.distance + { + // Replace the farthest entry + let last_idx = self.result.len() - 1; + self.result[last_idx] = LookupEntry { + node_id, + node, + distance, + queried: false, + }; + } else { + continue; + } + + // Re-sort after insertion + self.result.sort_by(|a, b| a.distance.cmp(&b.distance)); + } + } + + /// Record that a response was received (decrements in-flight counter). + pub fn record_response(&mut self) { + self.queries_in_flight = self.queries_in_flight.saturating_sub(1); + } + + /// Record that a query timed out (same as record_response). + pub fn record_timeout(&mut self) { + self.queries_in_flight = self.queries_in_flight.saturating_sub(1); + } + + /// Returns true if the lookup has converged: + /// - All entries in the result set have been queried (don't wait for + /// stragglers — late responses still get processed via handle_neighbors + /// and feed into the connection pool / next lookup), OR + /// - The lookup has timed out. 
+ pub fn is_finished(&self) -> bool { + if self.started_at.elapsed() >= LOOKUP_TIMEOUT { + return true; + } + !self.result.iter().any(|e| !e.queried) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ethrex_common::H512; + use std::net::{IpAddr, Ipv4Addr}; + + fn make_node(seed: u8) -> (H256, Node) { + let pk = H512::from_low_u64_be(seed as u64 + 1); + let node = Node::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, seed)), 30303, 30303, pk); + (node.node_id(), node) + } + + #[test] + fn new_sorts_by_distance_and_truncates() { + let target = H256::zero(); + let seeds: Vec<_> = (1..=20).map(|i| make_node(i)).collect(); + let lookup = IterativeLookup::new(target, seeds); + + assert!(lookup.result.len() <= LOOKUP_BUCKET_SIZE); + for w in lookup.result.windows(2) { + assert!(w[0].distance <= w[1].distance); + } + } + + #[test] + fn next_to_query_returns_alpha_entries() { + let target = H256::zero(); + let seeds: Vec<_> = (1..=10).map(|i| make_node(i)).collect(); + let mut lookup = IterativeLookup::new(target, seeds); + + let batch = lookup.next_to_query(LOOKUP_ALPHA); + assert_eq!(batch.len(), LOOKUP_ALPHA); + assert_eq!(lookup.queries_in_flight, LOOKUP_ALPHA); + } + + #[test] + fn feed_results_deduplicates() { + let target = H256::zero(); + let seeds: Vec<_> = (1..=3).map(|i| make_node(i)).collect(); + let mut lookup = IterativeLookup::new(target, seeds.clone()); + + let initial_len = lookup.result.len(); + // Feed the same nodes again + lookup.feed_results(seeds); + assert_eq!(lookup.result.len(), initial_len); + } + + #[test] + fn is_finished_when_all_queried() { + let target = H256::zero(); + let seeds: Vec<_> = (1..=2).map(|i| make_node(i)).collect(); + let mut lookup = IterativeLookup::new(target, seeds); + + assert!(!lookup.is_finished()); + + let _ = lookup.next_to_query(10); + // Finished once all entries are queried (don't wait for in-flight) + assert!(lookup.is_finished()); + } +} diff --git a/crates/networking/p2p/discovery/mod.rs 
b/crates/networking/p2p/discovery/mod.rs index 345d66f0d89..a95c374ee9c 100644 --- a/crates/networking/p2p/discovery/mod.rs +++ b/crates/networking/p2p/discovery/mod.rs @@ -13,6 +13,7 @@ pub mod codec; mod discv4_handlers; mod discv5_handlers; +pub mod lookup; pub mod server; pub use server::{DiscoveryServer, DiscoveryServerError, is_discv4_packet}; diff --git a/crates/networking/p2p/discovery/server.rs b/crates/networking/p2p/discovery/server.rs index 9348bcf6c32..d5ff2ca7f08 100644 --- a/crates/networking/p2p/discovery/server.rs +++ b/crates/networking/p2p/discovery/server.rs @@ -39,7 +39,11 @@ const DISCV4_MIN_PACKET_SIZE: usize = 98; // Shared constants const REVALIDATION_CHECK_INTERVAL: Duration = Duration::from_secs(1); const PRUNE_INTERVAL: Duration = Duration::from_secs(5); -const CHANGE_FIND_NODE_MESSAGE_INTERVAL: Duration = Duration::from_secs(5); +/// Faster tick rate used while an iterative lookup is active to drive convergence. +const ACTIVE_LOOKUP_TICK: Duration = Duration::from_millis(50); +/// Slower tick during bootstrap to give ping/pong bonds time to establish +/// between lookup ticks, reducing wasted FindNode queries. 
+const BOOTSTRAP_LOOKUP_TICK: Duration = Duration::from_millis(200); #[derive(Debug, Error)] pub enum DiscoveryServerError { @@ -73,7 +77,6 @@ pub trait DiscoveryServerProtocol: Send + Sync { fn lookup_v4(&self) -> Result<(), ActorError>; fn lookup_v5(&self) -> Result<(), ActorError>; fn enr_lookup(&self) -> Result<(), ActorError>; - fn change_find_node_message(&self) -> Result<(), ActorError>; fn prune(&self) -> Result<(), ActorError>; fn shutdown(&self) -> Result<(), ActorError>; } @@ -127,12 +130,8 @@ impl DiscoveryServer { count = bootnodes.len(), "Adding bootnodes" ); - peer_table.new_contacts( - bootnodes.clone(), - local_node.node_id(), - DiscoveryProtocol::Discv4, - )?; - Some(Discv4State::new(&signer)) + peer_table.new_contacts(bootnodes.clone(), DiscoveryProtocol::Discv4)?; + Some(Discv4State::default()) } else { None }; @@ -143,11 +142,7 @@ impl DiscoveryServer { count = bootnodes.len(), "Adding bootnodes" ); - peer_table.new_contacts( - bootnodes.clone(), - local_node.node_id(), - DiscoveryProtocol::Discv5, - )?; + peer_table.new_contacts(bootnodes.clone(), DiscoveryProtocol::Discv5)?; Some(Discv5State::default()) } else { None @@ -170,6 +165,12 @@ impl DiscoveryServer { for bootnode in &bootnodes { server.discv4_send_ping(bootnode).await?; } + // Record bootnodes as already pinged + if let Some(discv4) = &mut server.discv4 { + for bootnode in &bootnodes { + discv4.pinged_nodes.insert(bootnode.node_id()); + } + } } server.start(); @@ -209,11 +210,6 @@ impl DiscoveryServer { ctx.clone(), discovery_server_protocol::RevalidateV4, ); - send_interval( - CHANGE_FIND_NODE_MESSAGE_INTERVAL, - ctx.clone(), - discovery_server_protocol::ChangeFindNodeMessage, - ); let _ = ctx.send(discovery_server_protocol::LookupV4); let _ = ctx.send(discovery_server_protocol::EnrLookup); } @@ -286,7 +282,20 @@ impl DiscoveryServer { let _ = self.discv4_lookup().await.inspect_err( |e| error!(protocol = "discv4", err=?e, "Error performing Discovery lookup"), ); - let interval = 
self.get_lookup_interval().await; + let interval = if self + .discv4 + .as_ref() + .is_some_and(|s| !s.active_lookups.is_empty()) + { + let peer_count = self.peer_table.peer_count().await.unwrap_or(0); + if peer_count < 30 { + BOOTSTRAP_LOOKUP_TICK + } else { + ACTIVE_LOOKUP_TICK + } + } else { + self.get_lookup_interval().await + }; send_after(interval, ctx.clone(), discovery_server_protocol::LookupV4); } @@ -300,7 +309,20 @@ impl DiscoveryServer { let _ = self.discv5_lookup().await.inspect_err( |e| error!(protocol = "discv5", err=?e, "Error performing Discovery lookup"), ); - let interval = self.get_lookup_interval().await; + let interval = if self + .discv5 + .as_ref() + .is_some_and(|s| !s.active_lookups.is_empty()) + { + let peer_count = self.peer_table.peer_count().await.unwrap_or(0); + if peer_count < 30 { + BOOTSTRAP_LOOKUP_TICK + } else { + ACTIVE_LOOKUP_TICK + } + } else { + self.get_lookup_interval().await + }; send_after(interval, ctx.clone(), discovery_server_protocol::LookupV5); } @@ -318,17 +340,6 @@ impl DiscoveryServer { send_after(interval, ctx.clone(), discovery_server_protocol::EnrLookup); } - #[send_handler] - async fn handle_change_find_node_message( - &mut self, - _msg: discovery_server_protocol::ChangeFindNodeMessage, - _ctx: &Context, - ) { - if let Some(discv4) = &mut self.discv4 { - discv4.find_node_message = Discv4State::random_message(&self.signer); - } - } - #[send_handler] async fn handle_prune(&mut self, _msg: discovery_server_protocol::Prune, _ctx: &Context) { trace!(received = "Prune"); @@ -406,6 +417,9 @@ impl DiscoveryServer { discv4 .pending_find_node .retain(|_, sent_at| sent_at.elapsed() < expiration); + if discv4.pinged_nodes.len() > 10_000 { + discv4.pinged_nodes.clear(); + } } let winning_ip = self .discv5 diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index b5a0502536e..bfce4b6a365 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs 
@@ -1,46 +1,26 @@ use crate::{ - discv4::messages::{FindNodeMessage, Message, Packet}, - utils::{get_msg_expiration_from_seconds, node_id, public_key_from_signing_key}, + discovery::lookup::IterativeLookup, + discv4::messages::{Message, Packet}, + utils::node_id, }; use bytes::BytesMut; use ethrex_common::{H256, H512}; -use rand::rngs::OsRng; -use secp256k1::SecretKey; +use rustc_hash::FxHashSet; use std::{collections::HashMap, net::SocketAddr, time::Instant}; pub const EXPIRATION_SECONDS: u64 = 20; /// Discv4-specific state held within the unified DiscoveryServer. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct Discv4State { - /// The last `FindNode` message sent, cached due to message - /// signatures being expensive. - pub find_node_message: BytesMut, /// Tracks pending FindNode requests by node_id -> sent_at. /// Used to reject unsolicited Neighbors responses. pub pending_find_node: HashMap, -} - -impl Discv4State { - pub fn new(signer: &SecretKey) -> Self { - Self { - find_node_message: Self::random_message(signer), - pending_find_node: HashMap::new(), - } - } - - /// Generate a FindNodeMessage with a random key. - /// We send the same message on discovery lookup. - /// Changed every CHANGE_FIND_NODE_MESSAGE_INTERVAL. - pub fn random_message(signer: &SecretKey) -> BytesMut { - let expiration: u64 = get_msg_expiration_from_seconds(EXPIRATION_SECONDS); - let random_priv_key = SecretKey::new(&mut OsRng); - let random_pub_key = public_key_from_signing_key(&random_priv_key); - let msg = Message::FindNode(FindNodeMessage::new(random_pub_key, expiration)); - let mut buf = BytesMut::new(); - msg.encode_with_header(&mut buf, signer); - buf - } + /// Currently active iterative lookups, each with its cached signed FindNode message. + pub active_lookups: Vec<(IterativeLookup, BytesMut)>, + /// Tracks nodes we have already pinged so we don't re-ping (which would + /// invalidate an already-bonded contact via `record_ping_sent`). 
+ pub pinged_nodes: FxHashSet, } #[derive(Debug, Clone)] diff --git a/crates/networking/p2p/discv5/server.rs b/crates/networking/p2p/discv5/server.rs index 41dfc3c44ae..a184d7f8c79 100644 --- a/crates/networking/p2p/discv5/server.rs +++ b/crates/networking/p2p/discv5/server.rs @@ -1,3 +1,4 @@ +use crate::discovery::lookup::IterativeLookup; use crate::discv5::messages::Message; use crate::{ discv5::messages::Packet, @@ -46,6 +47,8 @@ pub struct Discv5State { pub ip_vote_period_start: Option, /// Whether the first (fast) voting round has completed. pub first_ip_vote_round_completed: bool, + /// Currently active iterative lookups. + pub active_lookups: Vec, } impl Default for Discv5State { @@ -64,6 +67,7 @@ impl Default for Discv5State { ip_votes: Default::default(), ip_vote_period_start: None, first_ip_vote_round_completed: false, + active_lookups: Vec::new(), } } } diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index c3d57af4603..f7fda6fd955 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -425,6 +425,7 @@ impl PeerHandler { debug!( "[SYNCING] Received empty headers from peer {peer_id}, trying another" ); + let _ = self.peer_table.set_disposable(peer_id); return Ok(None); } if are_block_headers_chained(&block_headers, &order) { @@ -438,7 +439,7 @@ impl PeerHandler { self.peer_table.record_failure(peer_id)?; return Ok(None); } - // Timeouted + // Timeout or invalid response - mark peer as disposable warn!( "[SYNCING] Didn't receive block headers from peer, penalizing peer {peer_id}..." ); @@ -523,6 +524,7 @@ impl PeerHandler { "[SYNCING] Didn't receive block bodies from peer, penalizing peer {peer_id}..." 
); self.peer_table.record_failure(peer_id)?; + let _ = self.peer_table.set_disposable(peer_id); Ok(None) } } diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs index be38d656c9e..fc273b5519e 100644 --- a/crates/networking/p2p/peer_table.rs +++ b/crates/networking/p2p/peer_table.rs @@ -17,9 +17,9 @@ use crate::{ utils::distance, }; use bytes::Bytes; -use ethrex_common::H256; +use ethrex_common::{H256, U256}; use ethrex_storage::Store; -use indexmap::{IndexMap, map::Entry}; +use indexmap::IndexMap; use rand::distributions::WeightedIndex; use rand::prelude::Distribution; use rand::seq::{IteratorRandom, SliceRandom}; @@ -39,8 +39,6 @@ const MAX_SCORE: i64 = 50; const MIN_SCORE: i64 = -50; /// Score assigned to peers who are acting maliciously (e.g., returning a node with wrong hash) const MIN_SCORE_CRITICAL: i64 = MIN_SCORE * 3; -/// Maximum amount of FindNode messages sent to a single node. -const MAX_FIND_NODE_PER_PEER: u64 = 20; /// Score weight for the load balancing function. const SCORE_WEIGHT: i64 = 1; /// Weight for amount of requests being handled by the peer for the load balancing function. @@ -49,13 +47,124 @@ const REQUESTS_WEIGHT: i64 = 1; const MAX_CONCURRENT_REQUESTS_PER_PEER: i64 = 100; /// The target number of RLPx connections to reach. pub const TARGET_PEERS: usize = 100; -/// The target number of contacts to maintain in peer_table. -const TARGET_CONTACTS: usize = 100_000; /// Maximum number of ENRs to return in a FindNode response (discv4 compatible). pub(crate) const MAX_NODES_IN_NEIGHBORS_PACKET: usize = 16; /// Maximum number of ENRs to return in a discv5 FindNode response. const MAX_ENRS_PER_FINDNODE_RESPONSE: usize = 16; +/// Number of k-buckets in the Kademlia routing table (one per bit of the 256-bit node ID). +const NUMBER_OF_BUCKETS: usize = 256; +/// Maximum number of contacts per k-bucket (Kademlia k parameter). 
+pub const MAX_NODES_PER_BUCKET: usize = 16; +/// Maximum number of replacement entries per k-bucket. +const MAX_REPLACEMENTS_PER_BUCKET: usize = 10; +/// Maximum number of entries in the flat connection candidate pool. +/// This pool is separate from the k-bucket routing table and retains +/// more contacts for RLPx connection initiation than the k-bucket +/// structure allows (256 × 16 = 4,096 vs this larger capacity). +/// 10K matches what Reth and Nethermind use for their candidate pools. +const MAX_CONNECTION_POOL_SIZE: usize = 10_000; + +/// A single k-bucket in the Kademlia routing table. +/// Each bucket stores contacts at a specific XOR distance range from the local node. +#[derive(Debug, Clone, Default)] +pub struct KBucket { + pub(crate) contacts: Vec<(H256, Contact)>, + pub(crate) replacements: Vec<(H256, Contact)>, +} + +impl KBucket { + /// Find a contact by node ID in the main list. + fn get(&self, node_id: &H256) -> Option<&Contact> { + self.contacts + .iter() + .find(|(id, _)| id == node_id) + .map(|(_, c)| c) + } + + /// Find a contact by node ID in either the main or replacement list. + fn get_any(&self, node_id: &H256) -> Option<&Contact> { + self.get(node_id).or_else(|| { + self.replacements + .iter() + .find(|(id, _)| id == node_id) + .map(|(_, c)| c) + }) + } + + /// Find a mutable reference to a contact by node ID (main or replacement list). + fn get_mut(&mut self, node_id: &H256) -> Option<&mut Contact> { + if let Some((_, c)) = self.contacts.iter_mut().find(|(id, _)| id == node_id) { + return Some(c); + } + self.replacements + .iter_mut() + .find(|(id, _)| id == node_id) + .map(|(_, c)| c) + } + + /// Check if a contact exists in this bucket (main or replacement list). + fn contains(&self, node_id: &H256) -> bool { + self.contacts.iter().any(|(id, _)| id == node_id) + || self.replacements.iter().any(|(id, _)| id == node_id) + } + + /// Insert a contact into the bucket. Returns true if inserted into main list. 
+ /// If the bucket is full, the contact is added to the replacement list instead. + fn insert(&mut self, node_id: H256, contact: Contact) -> bool { + if self.contacts.len() < MAX_NODES_PER_BUCKET { + self.contacts.push((node_id, contact)); + true + } else { + self.insert_replacement(node_id, contact); + false + } + } + + /// Add a contact to the replacement list, evicting the oldest if full. + fn insert_replacement(&mut self, node_id: H256, contact: Contact) { + if self.replacements.len() >= MAX_REPLACEMENTS_PER_BUCKET { + self.replacements.remove(0); + } + self.replacements.push((node_id, contact)); + } + + /// Remove a contact from the main list and promote a replacement if available. + /// Returns the promoted replacement's node ID, if any. + fn remove_and_promote(&mut self, node_id: &H256) -> Option { + let idx = self.contacts.iter().position(|(id, _)| id == node_id)?; + self.contacts.remove(idx); + if !self.replacements.is_empty() { + let (replacement_id, replacement) = self.replacements.remove(0); + self.contacts.push((replacement_id, replacement)); + Some(replacement_id) + } else { + None + } + } +} + +/// Computes the bucket index for a node relative to the local node. +/// Uses XOR distance: bucket = floor(log2(XOR(local, remote))), i.e. the +/// position of the highest set bit minus 1. +/// Returns None for the local node itself (XOR = 0). +fn bucket_index(local_node_id: &H256, node_id: &H256) -> Option { + let xor = *local_node_id ^ *node_id; + let dist = U256::from_big_endian(xor.as_bytes()); + if dist.is_zero() { + None + } else { + Some(dist.bits() - 1) + } +} + +/// Computes the raw XOR distance between two node IDs. +/// Used for comparing relative closeness: a is closer to target than b +/// iff xor_distance(target, a) < xor_distance(target, b). +pub(crate) fn xor_distance(a: &H256, b: &H256) -> H256 { + *a ^ *b +} + /// Identifies which discovery protocol was used to find a contact. 
/// This allows protocol-specific lookups to only query compatible contacts. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -90,7 +199,6 @@ pub struct Contact { /// None if no request was sent yet or it was already acknowledged. pub enr_request_hash: Option, - pub n_find_node_sent: u64, /// ENR associated with this contact, if it was provided by the peer. pub record: Option, /// This contact failed to respond our Ping. @@ -148,7 +256,6 @@ impl Contact { validation_timestamp: None, ping_id: None, enr_request_hash: None, - n_find_node_sent: 0, record: None, disposable: false, knows_us: true, @@ -281,17 +388,9 @@ impl Drop for RequestPermit { #[protocol] pub trait PeerTableServerProtocol: Send + Sync { // Send (cast) methods - fn new_contacts( - &self, - nodes: Vec, - local_node_id: H256, - protocol: DiscoveryProtocol, - ) -> Result<(), ActorError>; - fn new_contact_records( - &self, - node_records: Vec, - local_node_id: H256, - ) -> Result<(), ActorError>; + fn new_contacts(&self, nodes: Vec, protocol: DiscoveryProtocol) + -> Result<(), ActorError>; + fn new_contact_records(&self, node_records: Vec) -> Result<(), ActorError>; fn new_connected_peer( &self, node: Node, @@ -316,7 +415,6 @@ pub trait PeerTableServerProtocol: Send + Sync { record: NodeRecord, ) -> Result<(), ActorError>; fn set_disposable(&self, node_id: H256) -> Result<(), ActorError>; - fn increment_find_node_sent(&self, node_id: H256) -> Result<(), ActorError>; fn mark_knows_us(&self, node_id: H256) -> Result<(), ActorError>; fn prune_table(&self) -> Result<(), ActorError>; fn shutdown(&self) -> Result<(), ActorError>; @@ -328,9 +426,8 @@ pub trait PeerTableServerProtocol: Send + Sync { fn target_peers_reached(&self) -> Response; fn target_peers_completion(&self) -> Response; fn get_contact_to_initiate(&self) -> Response>>; - fn get_contact_for_lookup(&self, protocol: DiscoveryProtocol) - -> Response>>; fn get_contact_for_enr_lookup(&self) -> Response>>; + fn get_closest_from_pool(&self, target: 
H256, count: usize) -> Response>; fn get_contact(&self, node_id: H256) -> Response>>; fn get_contact_to_revalidate( &self, fn insert_if_new(&self, node: Node, protocol: DiscoveryProtocol) -> Response; fn validate_contact(&self, node_id: H256, sender_ip: IpAddr) -> Response; fn get_closest_nodes(&self, node_id: H256) -> Response>; - fn get_nodes_at_distances( - &self, - local_node_id: H256, - distances: Vec, - ) -> Response>; + fn get_nodes_at_distances(&self, distances: Vec) -> Response>; fn get_peers_data(&self) -> Response>; fn get_random_peer( &self, @@ -377,32 +470,39 @@ #[derive(Debug)] pub struct PeerTableServer { - contacts: IndexMap, + local_node_id: H256, + buckets: Vec, peers: IndexMap, already_tried_peers: FxHashSet, - discarded_contacts: FxHashSet, target_peers: usize, store: Store, /// Standalone session store, independent of contacts. /// Allows sessions to be stored even before the contact's ENR is known/parseable. sessions: FxHashMap, + /// Flat pool of discovered contacts for RLPx connection initiation. + /// Decoupled from the k-bucket routing table so that connection initiation + /// has access to a much larger candidate pool than the k-bucket structure + /// allows (k-buckets: 256 × 16 = 4,096 max; this pool: up to 10,000). + /// K-buckets are still used for all Kademlia protocol operations.
+ connection_pool: IndexMap, } #[actor(protocol = PeerTableServerProtocol)] impl PeerTableServer { - pub fn spawn(target_peers: usize, store: Store) -> PeerTable { - PeerTableServer::new(target_peers, store).start() + pub fn spawn(local_node_id: H256, target_peers: usize, store: Store) -> PeerTable { + PeerTableServer::new(local_node_id, target_peers, store).start() } - pub(crate) fn new(target_peers: usize, store: Store) -> Self { + pub(crate) fn new(local_node_id: H256, target_peers: usize, store: Store) -> Self { Self { - contacts: Default::default(), + local_node_id, + buckets: vec![KBucket::default(); NUMBER_OF_BUCKETS], peers: Default::default(), already_tried_peers: Default::default(), - discarded_contacts: Default::default(), target_peers, store, sessions: Default::default(), + connection_pool: IndexMap::with_capacity(MAX_CONNECTION_POOL_SIZE), } } @@ -423,8 +523,7 @@ impl PeerTableServer { msg: peer_table_server_protocol::NewContacts, _ctx: &Context, ) { - self.do_new_contacts(msg.nodes, msg.local_node_id, msg.protocol) - .await; + self.do_new_contacts(msg.nodes, msg.protocol).await; } #[send_handler] @@ -433,8 +532,7 @@ impl PeerTableServer { msg: peer_table_server_protocol::NewContactRecords, _ctx: &Context, ) { - self.do_new_contact_records(msg.node_records, msg.local_node_id) - .await; + self.do_new_contact_records(msg.node_records).await; } #[send_handler] @@ -457,7 +555,7 @@ impl PeerTableServer { // Store in the standalone sessions map (always succeeds, no contact required). self.sessions.insert(msg.node_id, msg.session.clone()); // Also update the contact's cached session if the contact exists. 
- if let Some(contact) = self.contacts.get_mut(&msg.node_id) { + if let Some(contact) = self.get_contact_mut(&msg.node_id) { contact.session = Some(msg.session); } } @@ -498,9 +596,9 @@ impl PeerTableServer { msg: peer_table_server_protocol::SetUnwanted, _ctx: &Context, ) { - self.contacts - .entry(msg.node_id) - .and_modify(|contact| contact.unwanted = true); + if let Some(contact) = self.get_contact_mut(&msg.node_id) { + contact.unwanted = true; + } } #[send_handler] @@ -509,9 +607,9 @@ impl PeerTableServer { msg: peer_table_server_protocol::SetIsForkIdValid, _ctx: &Context, ) { - self.contacts - .entry(msg.node_id) - .and_modify(|contact| contact.is_fork_id_valid = Some(msg.valid)); + if let Some(contact) = self.get_contact_mut(&msg.node_id) { + contact.is_fork_id_valid = Some(msg.valid); + } } #[send_handler] @@ -558,9 +656,9 @@ impl PeerTableServer { msg: peer_table_server_protocol::RecordPingSent, _ctx: &Context, ) { - self.contacts - .entry(msg.node_id) - .and_modify(|contact| contact.record_ping_sent(msg.ping_id)); + if let Some(contact) = self.get_contact_mut(&msg.node_id) { + contact.record_ping_sent(msg.ping_id); + } } #[send_handler] @@ -569,16 +667,15 @@ impl PeerTableServer { msg: peer_table_server_protocol::RecordPongReceived, _ctx: &Context, ) { - self.contacts.entry(msg.node_id).and_modify(|contact| { - if contact + if let Some(contact) = self.get_contact_mut(&msg.node_id) + && contact .ping_id .as_ref() .map(|value| *value == msg.ping_id) .unwrap_or(false) - { - contact.ping_id = None - } - }); + { + contact.ping_id = None; + } } #[send_handler] @@ -587,9 +684,9 @@ impl PeerTableServer { msg: peer_table_server_protocol::RecordEnrRequestSent, _ctx: &Context, ) { - self.contacts - .entry(msg.node_id) - .and_modify(|contact| contact.record_enr_request_sent(msg.request_hash)); + if let Some(contact) = self.get_contact_mut(&msg.node_id) { + contact.record_enr_request_sent(msg.request_hash); + } } #[send_handler] @@ -598,9 +695,9 @@ impl PeerTableServer 
{ msg: peer_table_server_protocol::RecordEnrResponseReceived, _ctx: &Context, ) { - self.contacts.entry(msg.node_id).and_modify(|contact| { + if let Some(contact) = self.get_contact_mut(&msg.node_id) { contact.record_enr_response_received(msg.request_hash, msg.record); - }); + } } #[send_handler] @@ -609,20 +706,9 @@ impl PeerTableServer { msg: peer_table_server_protocol::SetDisposable, _ctx: &Context, ) { - self.contacts - .entry(msg.node_id) - .and_modify(|contact| contact.disposable = true); - } - - #[send_handler] - async fn handle_increment_find_node_sent( - &mut self, - msg: peer_table_server_protocol::IncrementFindNodeSent, - _ctx: &Context, - ) { - self.contacts - .entry(msg.node_id) - .and_modify(|contact| contact.n_find_node_sent += 1); + if let Some(contact) = self.get_contact_mut(&msg.node_id) { + contact.disposable = true; + } } #[send_handler] @@ -631,9 +717,9 @@ impl PeerTableServer { msg: peer_table_server_protocol::MarkKnowsUs, _ctx: &Context, ) { - self.contacts - .entry(msg.node_id) - .and_modify(|c| c.knows_us = true); + if let Some(contact) = self.get_contact_mut(&msg.node_id) { + contact.knows_us = true; + } } #[send_handler] @@ -680,7 +766,7 @@ impl PeerTableServer { _msg: peer_table_server_protocol::TargetReached, _ctx: &Context, ) -> bool { - self.contacts.len() >= TARGET_CONTACTS && self.peers.len() >= self.target_peers + self.peers.len() >= self.target_peers } #[request_handler] @@ -711,12 +797,12 @@ impl PeerTableServer { } #[request_handler] - async fn handle_get_contact_for_lookup( + async fn handle_get_closest_from_pool( &mut self, - msg: peer_table_server_protocol::GetContactForLookup, + msg: peer_table_server_protocol::GetClosestFromPool, _ctx: &Context, - ) -> Option> { - self.do_get_contact_for_lookup(msg.protocol).map(Box::new) + ) -> Vec<(H256, Node)> { + self.do_get_closest_from_pool(msg.target, msg.count) } #[request_handler] @@ -734,7 +820,7 @@ impl PeerTableServer { msg: peer_table_server_protocol::GetContact, _ctx: 
&Context, ) -> Option> { - self.contacts.get(&msg.node_id).cloned().map(Box::new) + self.get_contact(&msg.node_id).cloned().map(Box::new) } #[request_handler] @@ -858,14 +944,19 @@ impl PeerTableServer { msg: peer_table_server_protocol::InsertIfNew, _ctx: &Context, ) -> bool { - match self.contacts.entry(msg.node.node_id()) { - Entry::Occupied(_) => false, - Entry::Vacant(entry) => { - METRICS.record_new_discovery().await; - entry.insert(Contact::new(msg.node, msg.protocol)); - true - } + let node_id = msg.node.node_id(); + // Always add to the connection pool + self.insert_to_connection_pool(node_id, msg.node.clone()); + if self.contact_exists(&node_id) { + return false; } + let contact = Contact::new(msg.node, msg.protocol); + // Return true for any genuinely new node, even if it overflows to the + // replacement list. This ensures the caller sends a reciprocal ping + // which establishes the bond needed for FindNode validation. + self.insert_contact(node_id, contact); + METRICS.record_new_discovery().await; + true } #[request_handler] @@ -892,7 +983,7 @@ impl PeerTableServer { msg: peer_table_server_protocol::GetNodesAtDistances, _ctx: &Context, ) -> Vec { - self.do_get_nodes_at_distances(msg.local_node_id, &msg.distances) + self.do_get_nodes_at_distances(&msg.distances) } #[request_handler] @@ -928,7 +1019,7 @@ impl PeerTableServer { self.sessions .get(&msg.node_id) .cloned() - .or_else(|| self.contacts.get(&msg.node_id)?.session.clone()) + .or_else(|| self.get_contact(&msg.node_id)?.session.clone()) } #[request_handler] @@ -963,13 +1054,104 @@ impl PeerTableServer { // === Private helper methods === - // Weighting function used to select best peer + // --- K-bucket accessors --- + + /// Get the bucket index for a node ID, or None if it's the local node. + fn bucket_for(&self, node_id: &H256) -> Option { + bucket_index(&self.local_node_id, node_id) + } + + /// Look up a contact by node ID in main or replacement list (O(K) within the bucket). 
+ fn get_contact(&self, node_id: &H256) -> Option<&Contact> { + let idx = self.bucket_for(node_id)?; + self.buckets[idx].get_any(node_id) + } + + /// Look up a mutable reference to a contact by node ID. + fn get_contact_mut(&mut self, node_id: &H256) -> Option<&mut Contact> { + let idx = self.bucket_for(node_id)?; + self.buckets[idx].get_mut(node_id) + } + + /// Check if a contact exists in any bucket (main or replacement list). + fn contact_exists(&self, node_id: &H256) -> bool { + let Some(idx) = self.bucket_for(node_id) else { + return false; + }; + self.buckets[idx].contains(node_id) + } + + /// Insert a contact into the appropriate k-bucket. Returns true if inserted + /// into the main list, false if the node went to the replacement list or is + /// the local node. + fn insert_contact(&mut self, node_id: H256, contact: Contact) -> bool { + #[cfg(feature = "metrics")] + let start = std::time::Instant::now(); + + let Some(idx) = self.bucket_for(&node_id) else { + return false; + }; + let result = self.buckets[idx].insert(node_id, contact); + + #[cfg(feature = "metrics")] + { + use ethrex_metrics::p2p::METRICS_P2P; + METRICS_P2P.observe_insert_contact_duration(start.elapsed().as_secs_f64()); + } + + result + } + + /// Insert a node into the flat connection pool for RLPx initiation. + /// Evicts the oldest entry when the pool is at capacity. + fn insert_to_connection_pool(&mut self, node_id: H256, node: Node) { + if self.connection_pool.contains_key(&node_id) { + return; + } + if self.connection_pool.len() >= MAX_CONNECTION_POOL_SIZE { + self.connection_pool.shift_remove_index(0); + } + self.connection_pool.insert(node_id, node); + } + + /// Look up a contact by node ID in either the main or replacement list. + fn get_contact_or_replacement(&self, node_id: &H256) -> Option<&Contact> { + let idx = self.bucket_for(node_id)?; + self.buckets[idx].get_any(node_id) + } + + /// Look up a mutable reference in either the main or replacement list. 
+ fn get_contact_or_replacement_mut(&mut self, node_id: &H256) -> Option<&mut Contact> { + let idx = self.bucket_for(node_id)?; + let bucket = &mut self.buckets[idx]; + // Search main list first, then replacement list. + // Done inline to avoid borrow-checker issues with or_else closures. + if let Some(pos) = bucket.contacts.iter().position(|(id, _)| id == node_id) { + return Some(&mut bucket.contacts[pos].1); + } + if let Some(pos) = bucket.replacements.iter().position(|(id, _)| id == node_id) { + return Some(&mut bucket.replacements[pos].1); + } + None + } + + /// Iterate over all contacts across all buckets (main and replacement lists). + fn iter_contacts(&self) -> impl Iterator { + self.buckets.iter().flat_map(|bucket| { + bucket + .contacts + .iter() + .chain(bucket.replacements.iter()) + .map(|(id, c)| (id, c)) + }) + } + + // --- Peer selection --- + fn weight_peer(&self, score: &i64, requests: &i64) -> i64 { score * SCORE_WEIGHT - requests * REQUESTS_WEIGHT } - // Returns if the peer has room for more connections given the current score - // and amount of inflight requests fn can_try_more_requests(&self, score: &i64, requests: &i64) -> bool { let score_ratio = (score - MIN_SCORE) as f64 / (MAX_SCORE - MIN_SCORE) as f64; let max_requests = (MAX_CONCURRENT_REQUESTS_PER_PEER as f64 * score_ratio).max(1.0); @@ -1041,62 +1223,106 @@ impl PeerTableServer { .collect() } + // --- Contact operations --- + + /// Prune disposable contacts from both main and replacement lists. + /// When a main contact is removed, a replacement is automatically promoted. + /// Pruned contacts remain in the connection pool so they can be retried + /// later — the RLPx handshake will reject them if they're truly bad. 
fn prune(&mut self) { - let disposable_contacts = self - .contacts - .iter() - .filter_map(|(c_id, c)| c.disposable.then_some(*c_id)) - .collect::>(); + for bucket in &mut self.buckets { + // Collect disposable contacts from main list + let main_disposable: Vec = bucket + .contacts + .iter() + .filter(|(_, c)| c.disposable) + .map(|(id, _)| *id) + .collect(); + + // Remove from main list and promote replacements + for node_id in main_disposable { + bucket.remove_and_promote(&node_id); + } - for contact_to_discard_id in disposable_contacts { - self.contacts.swap_remove(&contact_to_discard_id); - self.discarded_contacts.insert(contact_to_discard_id); + // Remove disposable contacts from replacement list + // (these don't get promoted, just removed) + bucket.replacements.retain(|(_, c)| !c.disposable); } } fn do_get_contact_to_initiate(&mut self) -> Option { - for contact in self.contacts.values() { - let node_id = contact.node.node_id(); - if !self.peers.contains_key(&node_id) - && !self.already_tried_peers.contains(&node_id) - && contact.knows_us - && !contact.unwanted - && contact.is_fork_id_valid != Some(false) + // Draw from the flat connection pool using O(1) random index probing. + // Pick a random start index and scan forward (wrapping) until we find + // an eligible candidate or complete a full loop. 
+ let pool_len = self.connection_pool.len(); + if pool_len == 0 { + return None; + } + + let start = rand::random::() % pool_len; + for offset in 0..pool_len { + let idx = (start + offset) % pool_len; + let Some((node_id, node)) = self.connection_pool.get_index(idx) else { + continue; + }; + let node_id = *node_id; + + if self.peers.contains_key(&node_id) + || self.already_tried_peers.contains(&node_id) + || self + .get_contact_or_replacement(&node_id) + .map(|c| !c.knows_us || c.unwanted || c.is_fork_id_valid == Some(false)) + .unwrap_or(false) { - self.already_tried_peers.insert(node_id); - return Some(contact.clone()); + continue; } + + let node = node.clone(); + self.already_tried_peers.insert(node_id); + let contact = self + .get_contact_or_replacement(&node_id) + .cloned() + .unwrap_or_else(|| Contact::new(node, DiscoveryProtocol::Discv4)); + return Some(contact); } + + // Exhausted all candidates — reset tried set for next cycle. tracing::trace!("Resetting list of tried peers."); self.already_tried_peers.clear(); None } - fn do_get_contact_for_lookup(&self, protocol: DiscoveryProtocol) -> Option { - self.contacts - .values() - .filter(|c| { - c.supports_protocol(protocol) - && c.n_find_node_sent < MAX_FIND_NODE_PER_PEER - && !c.disposable - }) - .collect::>() - .choose(&mut rand::rngs::OsRng) - .cloned() - .cloned() + /// Get the `count` closest nodes from the connection pool, sorted by XOR distance to `target`. 
+ fn do_get_closest_from_pool(&self, target: H256, count: usize) -> Vec<(H256, Node)> { + let mut nodes: Vec<(H256, Node, H256)> = Vec::with_capacity(count); + + for (node_id, node) in &self.connection_pool { + let dist = xor_distance(&target, node_id); + if nodes.len() < count { + nodes.push((*node_id, node.clone(), dist)); + } else if let Some((farthest_idx, _)) = + nodes.iter().enumerate().max_by_key(|(_, (_, _, d))| *d) + && dist < nodes[farthest_idx].2 + { + nodes[farthest_idx] = (*node_id, node.clone(), dist); + } + } + + nodes.sort_by(|a, b| a.2.cmp(&b.2)); + nodes.into_iter().map(|(id, node, _)| (id, node)).collect() } /// Get contact for ENR lookup (discv4 only) fn do_get_contact_for_enr_lookup(&mut self) -> Option { - self.contacts - .values() - .filter(|c| { + self.iter_contacts() + .filter(|(_, c)| { c.is_discv4 && c.was_validated() && !c.has_pending_enr_request() && c.record.is_none() && !c.disposable }) + .map(|(_, c)| c) .collect::>() .choose(&mut rand::rngs::OsRng) .cloned() @@ -1108,19 +1334,19 @@ impl PeerTableServer { revalidation_interval: Duration, protocol: DiscoveryProtocol, ) -> Option> { - self.contacts - .values() - .filter(|c| { + self.iter_contacts() + .filter(|(_, c)| { c.supports_protocol(protocol) && Self::is_validation_needed(c, revalidation_interval) }) + .map(|(_, c)| c) .choose(&mut rand::rngs::OsRng) .cloned() .map(Box::new) } fn do_validate_contact(&self, node_id: H256, sender_ip: IpAddr) -> ContactValidation { - let Some(contact) = self.contacts.get(&node_id) else { + let Some(contact) = self.get_contact(&node_id) else { return ContactValidation::UnknownContact; }; if !contact.was_validated() { @@ -1135,24 +1361,22 @@ impl PeerTableServer { ContactValidation::Valid(Box::new(contact.clone())) } - /// Get closest nodes for discv4 (returns Vec) + /// Get closest nodes using raw XOR distance for accurate ordering. 
fn do_get_closest_nodes(&self, node_id: H256) -> Vec { #[cfg(feature = "metrics")] let scan_start = std::time::Instant::now(); - let mut nodes: Vec<(Node, usize)> = vec![]; + let mut nodes: Vec<(Node, H256)> = vec![]; - for (contact_id, contact) in &self.contacts { - let dist = Self::distance(&node_id, contact_id); + for (contact_id, contact) in self.iter_contacts() { + let dist = xor_distance(&node_id, contact_id); if nodes.len() < MAX_NODES_IN_NEIGHBORS_PACKET { nodes.push((contact.node.clone(), dist)); - } else { - for (i, (_, d)) in &mut nodes.iter().enumerate() { - if dist < *d { - nodes[i] = (contact.node.clone(), dist); - break; - } - } + } else if let Some((farthest_idx, _)) = + nodes.iter().enumerate().max_by_key(|(_, (_, d))| *d) + && dist < nodes[farthest_idx].1 + { + nodes[farthest_idx] = (contact.node.clone(), dist); } } @@ -1169,11 +1393,10 @@ impl PeerTableServer { /// Uses the discv5 spec log-distance: `floor(log2(XOR))` for non-zero XOR. /// Distance 0 is reserved for the local node itself (handled by the caller), /// so contacts start at distance >= 1. 
- fn do_get_nodes_at_distances(&self, local_node_id: H256, distances: &[u32]) -> Vec { - self.contacts - .iter() + fn do_get_nodes_at_distances(&self, distances: &[u32]) -> Vec { + self.iter_contacts() .filter_map(|(contact_id, contact)| { - let dist = distance(&local_node_id, contact_id) as u32; + let dist = distance(&self.local_node_id, contact_id) as u32; if distances.contains(&dist) { contact.record.clone() } else { @@ -1184,74 +1407,69 @@ impl PeerTableServer { .collect() } - async fn do_new_contacts( - &mut self, - nodes: Vec, - local_node_id: H256, - protocol: DiscoveryProtocol, - ) { + async fn do_new_contacts(&mut self, nodes: Vec, protocol: DiscoveryProtocol) { for node in nodes { let node_id = node.node_id(); - if self.discarded_contacts.contains(&node_id) || node_id == local_node_id { + if node_id == self.local_node_id { continue; } #[cfg(feature = "metrics")] let insert_start = std::time::Instant::now(); - let is_new = match self.contacts.entry(node_id) { - Entry::Vacant(vacant_entry) => { - vacant_entry.insert(Contact::new(node, protocol)); - true - } - Entry::Occupied(mut occupied_entry) => { - // Contact already exists, just add the protocol - occupied_entry.get_mut().add_protocol(protocol); - false + // Always add to the connection pool (regardless of k-bucket capacity) + self.insert_to_connection_pool(node_id, node.clone()); + + if self.contact_exists(&node_id) { + // Contact already exists (main or replacement list), update protocol + if let Some(contact) = self.get_contact_or_replacement_mut(&node_id) { + contact.add_protocol(protocol); } - }; + } else { + let contact = Contact::new(node, protocol); + self.insert_contact(node_id, contact); + METRICS.record_new_discovery().await; + } #[cfg(feature = "metrics")] { use ethrex_metrics::p2p::METRICS_P2P; METRICS_P2P.observe_insert_contact_duration(insert_start.elapsed().as_secs_f64()); } - - if is_new { - METRICS.record_new_discovery().await; - } } } - async fn do_new_contact_records(&mut self, 
node_records: Vec, local_node_id: H256) { + async fn do_new_contact_records(&mut self, node_records: Vec) { for node_record in node_records { if !node_record.verify_signature() { continue; } if let Ok(node) = Node::from_enr(&node_record) { let node_id = node.node_id(); - if self.discarded_contacts.contains(&node_id) || node_id == local_node_id { + if node_id == self.local_node_id { continue; } - match self.contacts.entry(node_id) { - Entry::Vacant(vacant_entry) => { - let is_fork_id_valid = - Self::evaluate_fork_id(&node_record, &self.store).await; - let mut contact = Contact::new(node, DiscoveryProtocol::Discv5); - contact.is_fork_id_valid = is_fork_id_valid; - contact.record = Some(node_record); - vacant_entry.insert(contact); - METRICS.record_new_discovery().await; - } - Entry::Occupied(mut occupied_entry) => { - let should_update = match occupied_entry.get().record.as_ref() { + + // Always add to the connection pool (regardless of k-bucket capacity) + self.insert_to_connection_pool(node_id, node.clone()); + + if self.contact_exists(&node_id) { + // Check if we need to evaluate fork_id before taking + // the mutable borrow. 
+ let should_update = self + .get_contact_or_replacement(&node_id) + .map(|c| match c.record.as_ref() { None => true, Some(r) => node_record.seq > r.seq, - }; - let contact = occupied_entry.get_mut(); + }) + .unwrap_or(false); + let is_fork_id_valid = if should_update { + Self::evaluate_fork_id(&node_record, &self.store).await + } else { + None + }; + if let Some(contact) = self.get_contact_or_replacement_mut(&node_id) { contact.add_protocol(DiscoveryProtocol::Discv5); if should_update { - let is_fork_id_valid = - Self::evaluate_fork_id(&node_record, &self.store).await; if contact.node.ip != node.ip || contact.node.udp_port != node.udp_port { contact.validation_timestamp = None; @@ -1262,6 +1480,13 @@ impl PeerTableServer { contact.is_fork_id_valid = is_fork_id_valid; } } + } else { + let is_fork_id_valid = Self::evaluate_fork_id(&node_record, &self.store).await; + let mut contact = Contact::new(node, DiscoveryProtocol::Discv5); + contact.is_fork_id_valid = is_fork_id_valid; + contact.record = Some(node_record); + self.insert_contact(node_id, contact); + METRICS.record_new_discovery().await; } } } @@ -1319,10 +1544,6 @@ impl PeerTableServer { Some((peers[idx].0, peers[idx].1.clone())) } - fn distance(node_id_1: &H256, node_id_2: &H256) -> usize { - distance(node_id_1, node_id_2) - } - fn is_validation_needed(contact: &Contact, revalidation_interval: Duration) -> bool { if contact.disposable { return false; @@ -1348,3 +1569,195 @@ impl PeerTableServer { } pub type PeerTable = ActorRef; + +#[cfg(test)] +mod tests { + use super::*; + use ethrex_common::H512; + use std::net::Ipv4Addr; + + /// Helper: build a dummy contact with a unique node derived from `seed`. 
+ fn dummy_contact(seed: u8) -> (H256, Contact) { + let pk = H512::from_low_u64_be(seed as u64 + 1); + let node = Node::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, seed)), 30303, 30303, pk); + let node_id = node.node_id(); + let contact = Contact::new(node, DiscoveryProtocol::Discv4); + (node_id, contact) + } + + // --- KBucket::insert --- + + #[test] + fn insert_into_empty_bucket() { + let mut bucket = KBucket::default(); + let (id, contact) = dummy_contact(1); + assert!(bucket.insert(id, contact)); + assert_eq!(bucket.contacts.len(), 1); + assert!(bucket.replacements.is_empty()); + } + + #[test] + fn insert_fills_bucket_then_goes_to_replacements() { + let mut bucket = KBucket::default(); + + // Fill the main list to capacity. + for i in 0..MAX_NODES_PER_BUCKET as u8 { + let (id, contact) = dummy_contact(i); + assert!(bucket.insert(id, contact), "contact {i} should go to main"); + } + assert_eq!(bucket.contacts.len(), MAX_NODES_PER_BUCKET); + + // The next insert should go to the replacement list. + let (id, contact) = dummy_contact(200); + assert!(!bucket.insert(id, contact)); + assert_eq!(bucket.contacts.len(), MAX_NODES_PER_BUCKET); + assert_eq!(bucket.replacements.len(), 1); + } + + // --- KBucket::contains --- + + #[test] + fn contains_checks_main_and_replacement() { + let mut bucket = KBucket::default(); + + let (id_main, contact_main) = dummy_contact(1); + bucket.insert(id_main, contact_main); + assert!(bucket.contains(&id_main)); + + // Fill bucket so next goes to replacement. 
+ for i in 2..=(MAX_NODES_PER_BUCKET as u8) { + let (id, c) = dummy_contact(i); + bucket.insert(id, c); + } + let (id_repl, contact_repl) = dummy_contact(100); + bucket.insert(id_repl, contact_repl); + + assert!(bucket.contains(&id_repl)); + assert!(!bucket.contains(&H256::zero())); + } + + // --- KBucket::get / get_any --- + + #[test] + fn get_returns_main_list_only() { + let mut bucket = KBucket::default(); + let (id, contact) = dummy_contact(1); + bucket.insert(id, contact); + assert!(bucket.get(&id).is_some()); + assert!(bucket.get(&H256::zero()).is_none()); + } + + #[test] + fn get_any_returns_from_replacement() { + let mut bucket = KBucket::default(); + // Fill main list. + for i in 0..MAX_NODES_PER_BUCKET as u8 { + let (id, c) = dummy_contact(i); + bucket.insert(id, c); + } + // Insert into replacements. + let (id_repl, c_repl) = dummy_contact(200); + bucket.insert(id_repl, c_repl); + + assert!(bucket.get(&id_repl).is_none()); // not in main + assert!(bucket.get_any(&id_repl).is_some()); // found via replacement + } + + // --- KBucket::remove_and_promote --- + + #[test] + fn remove_and_promote_with_replacement() { + let mut bucket = KBucket::default(); + + // Fill main list. + let mut main_ids = Vec::new(); + for i in 0..MAX_NODES_PER_BUCKET as u8 { + let (id, c) = dummy_contact(i); + main_ids.push(id); + bucket.insert(id, c); + } + + // Add a replacement. + let (repl_id, repl_contact) = dummy_contact(200); + bucket.insert(repl_id, repl_contact); + + // Remove a main contact — the replacement should be promoted. 
+ let promoted = bucket.remove_and_promote(&main_ids[0]); + assert_eq!(promoted, Some(repl_id)); + assert_eq!(bucket.contacts.len(), MAX_NODES_PER_BUCKET); + assert!(bucket.replacements.is_empty()); + assert!(!bucket.contains(&main_ids[0])); + assert!(bucket.contains(&repl_id)); + } + + #[test] + fn remove_and_promote_without_replacement() { + let mut bucket = KBucket::default(); + let (id, c) = dummy_contact(1); + bucket.insert(id, c); + + let promoted = bucket.remove_and_promote(&id); + assert!(promoted.is_none()); + assert!(bucket.contacts.is_empty()); + } + + #[test] + fn remove_nonexistent_returns_none() { + let mut bucket = KBucket::default(); + assert!(bucket.remove_and_promote(&H256::zero()).is_none()); + } + + // --- Replacement eviction --- + + #[test] + fn replacement_list_evicts_oldest_when_full() { + let mut bucket = KBucket::default(); + // Fill main list. + for i in 0..MAX_NODES_PER_BUCKET as u8 { + let (id, c) = dummy_contact(i); + bucket.insert(id, c); + } + + // Fill replacement list beyond capacity. + let mut repl_ids = Vec::new(); + for i in 0..(MAX_REPLACEMENTS_PER_BUCKET + 2) as u8 { + let seed = 100 + i; + let (id, c) = dummy_contact(seed); + repl_ids.push(id); + bucket.insert(id, c); + } + + assert_eq!(bucket.replacements.len(), MAX_REPLACEMENTS_PER_BUCKET); + // The oldest two should have been evicted. + assert!(!bucket.contains(&repl_ids[0])); + assert!(!bucket.contains(&repl_ids[1])); + // The most recent ones should still be there. 
+ assert!(bucket.contains(repl_ids.last().unwrap())); + } + + // --- bucket_index --- + + #[test] + fn bucket_index_self_is_none() { + let id = H256::random(); + assert_eq!(bucket_index(&id, &id), None); + } + + #[test] + fn bucket_index_minimal_distance() { + let local = H256::zero(); + // XOR distance = 1 → highest bit is bit 0 → bucket 0 + let mut remote = H256::zero(); + remote.0[31] = 1; + assert_eq!(bucket_index(&local, &remote), Some(0)); + } + + #[test] + fn bucket_index_maximal_distance() { + let local = H256::zero(); + // XOR distance has highest bit at position 255 → bucket 255 + let mut remote = H256::zero(); + remote.0[0] = 0x80; + assert_eq!(bucket_index(&local, &remote), Some(255)); + } +} diff --git a/crates/networking/p2p/sync/snap_sync.rs b/crates/networking/p2p/sync/snap_sync.rs index b3b7721d1fa..3eebd3c2001 100644 --- a/crates/networking/p2p/sync/snap_sync.rs +++ b/crates/networking/p2p/sync/snap_sync.rs @@ -136,6 +136,9 @@ pub async fn sync_cycle_snap( let mut attempts = 0; loop { + // Prune dead/unresponsive peers periodically to allow replacements to be promoted + let _ = peers.peer_table.prune_table(); + debug!("Requesting Block Headers from {current_head}"); let Some(mut block_headers) = peers diff --git a/crates/networking/rpc/test_utils.rs b/crates/networking/rpc/test_utils.rs index 7eef68de47b..b780ba901e0 100644 --- a/crates/networking/rpc/test_utils.rs +++ b/crates/networking/rpc/test_utils.rs @@ -331,7 +331,7 @@ pub async fn dummy_sync_manager() -> SyncManager { /// Creates a dummy PeerHandler for tests where interacting with peers is not needed /// This should only be used in tests as it won't be able to interact with the node's connected peers pub async fn dummy_peer_handler(store: Store) -> PeerHandler { - let peer_table = PeerTableServer::spawn(TARGET_PEERS, store); + let peer_table = PeerTableServer::spawn(H256::random(), TARGET_PEERS, store); PeerHandler::new(peer_table.clone(), dummy_actor(peer_table).await) } diff --git 
a/metrics/provisioning/grafana/dashboards/common_dashboards/p2p_packets.json b/metrics/provisioning/grafana/dashboards/common_dashboards/p2p_packets.json index d7592afa210..8d6db6bdd26 100644 --- a/metrics/provisioning/grafana/dashboards/common_dashboards/p2p_packets.json +++ b/metrics/provisioning/grafana/dashboards/common_dashboards/p2p_packets.json @@ -619,6 +619,167 @@ } ] } + , + { + "title": "Kademlia Table", + "type": "row", + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 63 }, + "id": 103, + "collapsed": false + }, + { + "title": "insert_contact Duration (p50 / p99 / max)", + "description": "Duration of Kademlia insert_contact operations", + "type": "timeseries", + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "gridPos": { "h": 10, "w": 12, "x": 0, "y": 64 }, + "id": 30, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisLabel": "", + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "showPoints": "never", + "spanNulls": false, + "stacking": { "group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "unit": "s" + }, + "overrides": [] + }, + "options": { + "legend": { "calcs": ["mean", "max", "lastNotNull"], "displayMode": "table", "placement": "bottom" }, + "tooltip": { "mode": "multi", "sort": "desc" } + }, + "targets": [ + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "editorMode": "code", + "expr": "histogram_quantile(0.50, rate(ethrex_kademlia_insert_contact_duration_seconds_bucket{instance=~\"$instance(:\\\\d+)?$\"}[$__rate_interval]))", + "legendFormat": "p50", + "refId": "A" + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "editorMode": "code", + "expr": "histogram_quantile(0.99, 
rate(ethrex_kademlia_insert_contact_duration_seconds_bucket{instance=~\"$instance(:\\\\d+)?$\"}[$__rate_interval]))", + "legendFormat": "p99", + "refId": "B" + } + ] + }, + { + "title": "iter_contacts Full-Scan Duration (p50 / p99 / max)", + "description": "Duration of full Kademlia table scans (e.g. get_closest_nodes)", + "type": "timeseries", + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "gridPos": { "h": 10, "w": 12, "x": 12, "y": 64 }, + "id": 31, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisLabel": "", + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "showPoints": "never", + "spanNulls": false, + "stacking": { "group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "unit": "s" + }, + "overrides": [] + }, + "options": { + "legend": { "calcs": ["mean", "max", "lastNotNull"], "displayMode": "table", "placement": "bottom" }, + "tooltip": { "mode": "multi", "sort": "desc" } + }, + "targets": [ + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "editorMode": "code", + "expr": "histogram_quantile(0.50, rate(ethrex_kademlia_iter_contacts_duration_seconds_bucket{instance=~\"$instance(:\\\\d+)?$\"}[$__rate_interval]))", + "legendFormat": "p50", + "refId": "A" + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "editorMode": "code", + "expr": "histogram_quantile(0.99, rate(ethrex_kademlia_iter_contacts_duration_seconds_bucket{instance=~\"$instance(:\\\\d+)?$\"}[$__rate_interval]))", + "legendFormat": "p99", + "refId": "B" + } + ] + }, + { + "title": "Kademlia Operations Rate", + "description": "Rate of insert_contact and iter_contacts operations per second", + "type": "timeseries", + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "gridPos": { "h": 10, 
"w": 24, "x": 0, "y": 74 }, + "id": 32, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisLabel": "ops/s", + "drawStyle": "line", + "fillOpacity": 20, + "gradientMode": "scheme", + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "showPoints": "never", + "spanNulls": false, + "stacking": { "group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "unit": "ops" + }, + "overrides": [ + { "matcher": { "id": "byName", "options": "insert_contact" }, "properties": [{ "id": "color", "value": { "fixedColor": "green", "mode": "fixed" } }] }, + { "matcher": { "id": "byName", "options": "iter_contacts" }, "properties": [{ "id": "color", "value": { "fixedColor": "blue", "mode": "fixed" } }] } + ] + }, + "options": { + "legend": { "calcs": ["mean", "max", "lastNotNull"], "displayMode": "table", "placement": "bottom" }, + "tooltip": { "mode": "multi", "sort": "desc" } + }, + "targets": [ + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "editorMode": "code", + "expr": "rate(ethrex_kademlia_insert_contact_duration_seconds_count{instance=~\"$instance(:\\\\d+)?$\"}[$__rate_interval])", + "legendFormat": "insert_contact", + "refId": "A" + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "editorMode": "code", + "expr": "rate(ethrex_kademlia_iter_contacts_duration_seconds_count{instance=~\"$instance(:\\\\d+)?$\"}[$__rate_interval])", + "legendFormat": "iter_contacts", + "refId": "B" + } + ] + } ], "schemaVersion": 39, "tags": ["ethrex", "p2p"], diff --git a/test/tests/p2p/discovery/discv5_server_tests.rs b/test/tests/p2p/discovery/discv5_server_tests.rs index f8d0f72643b..25c380ce332 100644 --- a/test/tests/p2p/discovery/discv5_server_tests.rs +++ b/test/tests/p2p/discovery/discv5_server_tests.rs @@ -24,6 +24,7 @@ async fn test_server(peer_table: Option) -> DiscoveryServer { let 
local_node_record = NodeRecord::from_node(&local_node, 1, &signer).unwrap(); let peer_table = peer_table.unwrap_or_else(|| { PeerTableServer::spawn( + local_node.node_id(), 10, Store::new("", EngineType::InMemory).expect("Failed to create store"), ) @@ -175,13 +176,12 @@ async fn test_enr_update_request_on_pong() { let remote_node_id = remote_node.node_id(); let peer_table = PeerTableServer::spawn( + local_node.node_id(), 10, Store::new("", EngineType::InMemory).expect("Failed to create store"), ); - peer_table - .new_contact_records(vec![remote_record], local_node.node_id()) - .unwrap(); + peer_table.new_contact_records(vec![remote_record]).unwrap(); let session = Session { outbound_key: [0u8; 16],