Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/ethrex/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,8 @@ pub async fn init_l1(

let local_node_record = get_local_node_record(&datadir, &local_p2p_node, &signer);

let peer_table = PeerTableServer::spawn(opts.target_peers, store.clone());
let peer_table =
PeerTableServer::spawn(local_p2p_node.node_id(), opts.target_peers, store.clone());

// TODO: Check every module starts properly.
let tracker = TaskTracker::new();
Expand Down
6 changes: 5 additions & 1 deletion cmd/ethrex/l2/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,11 @@ pub async fn init_l2(
if !opts.sequencer_opts.based {
blockchain.set_synced();
}
let peer_table = PeerTableServer::spawn(opts.node_opts.target_peers, store.clone());
let peer_table = PeerTableServer::spawn(
local_p2p_node.node_id(),
opts.node_opts.target_peers,
store.clone(),
);
let p2p_context = P2PContext::new(
local_p2p_node.clone(),
network_config,
Expand Down
4 changes: 2 additions & 2 deletions crates/blockchain/metrics/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl MetricsP2P {
kademlia_insert_contact_duration: Histogram::with_opts(
HistogramOpts::new(
"ethrex_kademlia_insert_contact_duration_seconds",
"Duration of peer table contact insertion operations",
"Duration of Kademlia insert_contact operations",
)
.buckets(vec![
0.000_001, 0.000_005, 0.000_01, 0.000_05, 0.000_1, 0.000_5, 0.001, 0.01,
Expand All @@ -108,7 +108,7 @@ impl MetricsP2P {
kademlia_iter_contacts_duration: Histogram::with_opts(
HistogramOpts::new(
"ethrex_kademlia_iter_contacts_duration_seconds",
"Duration of peer table full-scan operations",
"Duration of Kademlia iter_contacts full-scan operations",
)
.buckets(vec![
0.000_01, 0.000_05, 0.000_1, 0.000_5, 0.001, 0.005, 0.01, 0.05, 0.1,
Expand Down
165 changes: 141 additions & 24 deletions crates/networking/p2p/discovery/discv4_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,32 @@ use crate::{
backend,
discv4::{
messages::{
ENRRequestMessage, ENRResponseMessage, Message, NeighborsMessage, PingMessage,
PongMessage,
ENRRequestMessage, ENRResponseMessage, FindNodeMessage, Message, NeighborsMessage,
PingMessage, PongMessage,
},
server::{Discv4Message, EXPIRATION_SECONDS},
},
discovery::lookup::{BOOTSTRAP_ALPHA, IterativeLookup, LOOKUP_ALPHA, LOOKUP_BUCKET_SIZE},
metrics::METRICS,
peer_table::{Contact, ContactValidation, DiscoveryProtocol, PeerTableServerProtocol as _},
types::{Endpoint, Node, NodeRecord},
utils::{get_msg_expiration_from_seconds, is_msg_expired, node_id},
utils::{get_msg_expiration_from_seconds, is_msg_expired, node_id, public_key_from_signing_key},
};
use bytes::{Bytes, BytesMut};
use ethrex_common::{H256, H512, types::ForkId};
use rand::rngs::OsRng;
use secp256k1::SecretKey;
use std::time::Duration;
use tracing::{debug, error, trace};

use super::server::{DiscoveryServer, DiscoveryServerError};

/// Discv4 revalidation interval.
const REVALIDATION_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60); // 12 hours
/// Maximum number of concurrent iterative lookups during bootstrap.
const MAX_CONCURRENT_LOOKUPS: usize = 3;
/// Peer count threshold below which we use aggressive bootstrap settings.
const BOOTSTRAP_THRESHOLD: usize = 30;

impl DiscoveryServer {
pub(crate) async fn discv4_process_message(
Expand Down Expand Up @@ -120,35 +127,110 @@ impl DiscoveryServer {
}

pub(crate) async fn discv4_lookup(&mut self) -> Result<(), DiscoveryServerError> {
if self.discv4.is_none() {
return Ok(());
}

// Remove finished lookups (geth-style back-to-back chaining)
self.discv4
.as_mut()
.expect("discv4 state must exist")
.active_lookups
.retain(|(l, _)| !l.is_finished());

// Determine max concurrent lookups based on peer count
let peer_count = self.peer_table.peer_count().await.unwrap_or(0);
let max_lookups = if peer_count < BOOTSTRAP_THRESHOLD {
MAX_CONCURRENT_LOOKUPS
} else {
1
};

// Start new lookups up to the limit
while self.discv4.as_ref().expect("discv4 state must exist").active_lookups.len() < max_lookups {
// Generate random target
let random_priv_key = SecretKey::new(&mut OsRng);
let random_pub_key = public_key_from_signing_key(&random_priv_key);
let target_id = node_id(&random_pub_key);

// Seed with closest known nodes from the connection pool
let seed = self
.peer_table
.get_closest_from_pool(target_id, LOOKUP_BUCKET_SIZE)
.await?;
if seed.is_empty() {
trace!(protocol = "discv4", "No seeds for lookup, connection pool empty");
break;
}

trace!(protocol = "discv4", seeds = seed.len(), "Starting new iterative lookup");
let lookup = IterativeLookup::new(target_id, seed);

// Sign one FindNode message for this target
let expiration = get_msg_expiration_from_seconds(EXPIRATION_SECONDS);
let msg = Message::FindNode(FindNodeMessage::new(random_pub_key, expiration));
let mut buf = BytesMut::new();
msg.encode_with_header(&mut buf, &self.signer);

let discv4 = self.discv4.as_mut().expect("discv4 state must exist");
discv4.active_lookups.push((lookup, buf));
}

self.advance_v4_lookup().await
}

async fn advance_v4_lookup(&mut self) -> Result<(), DiscoveryServerError> {
let discv4 = match &mut self.discv4 {
Some(s) => s,
None => return Ok(()),
};
if let Some(contact) = self
.peer_table
.get_contact_for_lookup(DiscoveryProtocol::Discv4)
.await?
{
if let Err(e) = self
.udp_socket
.send_to(&discv4.find_node_message, &contact.node.udp_addr())
.await
{
error!(protocol = "discv4", sending = "FindNode", addr = ?&contact.node.udp_addr(), err=?e, "Error sending message");
self.peer_table.set_disposable(contact.node.node_id())?;

if discv4.active_lookups.is_empty() {
return Ok(());
}

// Adaptive alpha: query aggressively during bootstrap
let peer_count = self.peer_table.peer_count().await.unwrap_or(0);
let alpha = if peer_count < BOOTSTRAP_THRESHOLD {
BOOTSTRAP_ALPHA
} else {
LOOKUP_ALPHA
};

// Collect all queries from all active lookups
let mut queries: Vec<(usize, H256, Node, BytesMut)> = Vec::new();
for (idx, (lookup, message)) in discv4.active_lookups.iter_mut().enumerate() {
for (node_id, node) in lookup.next_to_query(alpha) {
queries.push((idx, node_id, node, message.clone()));
}
}

if !queries.is_empty() {
trace!(protocol = "discv4", count = queries.len(), "Advancing lookups, querying nodes");
}

for (idx, node_id, node, message) in queries {
if let Err(e) = self.udp_socket.send_to(&message, &node.udp_addr()).await {
error!(protocol = "discv4", sending = "FindNode", addr = ?node.udp_addr(), err=?e, "Error sending message");
self.peer_table.set_disposable(node_id)?;
METRICS.record_new_discarded_node();
if let Some(discv4) = &mut self.discv4
&& let Some((lookup, _)) = discv4.active_lookups.get_mut(idx)
{
lookup.record_timeout();
}
} else {
#[cfg(feature = "metrics")]
{
use ethrex_metrics::p2p::METRICS_P2P;
METRICS_P2P.inc_discv4_outgoing("FindNode");
}
discv4
self.discv4
.as_mut()
.expect("discv4 state must exist")
.pending_find_node
.insert(contact.node.node_id(), std::time::Instant::now());
.insert(node_id, std::time::Instant::now());
}
self.peer_table
.increment_find_node_sent(contact.node.node_id())?;
}
Ok(())
}
Expand Down Expand Up @@ -251,6 +333,9 @@ impl DiscoveryServer {
.unwrap_or(false)
{
self.discv4_send_ping(&node).await?;
if let Some(discv4) = &mut self.discv4 {
discv4.pinged_nodes.insert(node.node_id());
}
} else {
let node_id = node_id(&sender_public_key);
let stored_enr_seq = self
Expand Down Expand Up @@ -343,11 +428,43 @@ impl DiscoveryServer {
}

let nodes = neighbors_message.nodes;
self.peer_table.new_contacts(
nodes,
self.local_node.node_id(),
DiscoveryProtocol::Discv4,
)?;
self.peer_table
.new_contacts(nodes.clone(), DiscoveryProtocol::Discv4)?;

// Pre-bond: ping new nodes so they accept our future FindNode queries
for node in &nodes {
let nid = node.node_id();
if nid == self.local_node.node_id() {
continue;
}
let already_pinged = self
.discv4
.as_ref()
.map(|s| s.pinged_nodes.contains(&nid))
.unwrap_or(true);
if already_pinged {
continue;
}
if let Some(discv4) = &mut self.discv4 {
discv4.pinged_nodes.insert(nid);
}
let _ = self.discv4_send_ping(node).await;
}

// Feed results into ALL active lookups and advance them
if let Some(discv4) = &mut self.discv4 {
let entries: Vec<(H256, Node)> =
nodes.iter().map(|n| (n.node_id(), n.clone())).collect();
for (lookup, _) in &mut discv4.active_lookups {
lookup.feed_results(entries.clone());
}
// Record response on first active lookup (we don't track which triggered it)
if let Some((lookup, _)) = discv4.active_lookups.first_mut() {
lookup.record_response();
}
}
self.advance_v4_lookup().await?;

Ok(())
}

Expand Down
Loading
Loading