diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index f1fa5fe4..79da78ff 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -41,8 +41,8 @@ use crate::{ }, req_resp::{ BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, - MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status, - fetch_block_from_peer, + MAX_COMPRESSED_PAYLOAD_SIZE, MAX_REQUEST_BLOCKS, Request, STATUS_PROTOCOL_V1, build_status, + fetch_block_from_peer, request_blocks_by_range_from_peer, }, swarm_adapter::SwarmHandle, }; @@ -59,12 +59,22 @@ const MAX_FETCH_RETRIES: u32 = 10; const INITIAL_BACKOFF_MS: u64 = 5; const BACKOFF_MULTIPLIER: u64 = 2; const PEER_REDIAL_INTERVAL_SECS: u64 = 12; +const MAX_SYNC_RANGE: u64 = MAX_REQUEST_BLOCKS * 64; // 65,536 slots (~3 days) pub(crate) struct PendingRequest { pub(crate) attempts: u32, pub(crate) failed_peers: HashSet, } +pub(crate) enum PendingRequestKind { + Root(H256), + Range { + start_slot: u64, + end_slot: u64, + total_end_slot: u64, + }, +} + // --- Swarm construction --- /// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining identify, Gossipsub @@ -300,8 +310,9 @@ impl P2P { block_topic: built.block_topic, aggregation_topic: built.aggregation_topic, connected_peers: HashSet::new(), - pending_requests: HashMap::new(), - request_id_map: HashMap::new(), + pending_root_requests: HashMap::new(), + outbound_requests: HashMap::new(), + pending_range_requests: HashSet::new(), bootnode_addrs: built.bootnode_addrs, node_names, }; @@ -336,8 +347,9 @@ pub struct P2PServer { pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic, pub(crate) connected_peers: HashSet, - pub(crate) pending_requests: HashMap, - pub(crate) request_id_map: HashMap, + pub(crate) pending_root_requests: HashMap, + pub(crate) outbound_requests: HashMap, + pub(crate) pending_range_requests: HashSet<(u64, u64)>, bootnode_addrs: HashMap, node_names: HashMap, } @@ -359,6 +371,13 @@ pub(crate) trait P2PProtocol: 
Send + Sync {
     fn retry_block_fetch(&self, root: H256) -> Result<(), ActorError>;
     #[allow(dead_code)] // invoked via send_after, not called directly
     fn retry_peer_redial(&self, peer_id: PeerId) -> Result<(), ActorError>;
+    #[allow(dead_code)] // invoked via send_after, not called directly
+    fn retry_range_sync(
+        &self,
+        start_slot: u64,
+        end_slot: u64,
+        peer_id: PeerId,
+    ) -> Result<(), ActorError>;
 }
 
 #[actor(protocol = P2PProtocol)]
@@ -371,7 +390,7 @@ impl P2PServer {
     ) {
         let root = msg.root;
         // Check if still pending (might have succeeded during backoff)
-        if !self.pending_requests.contains_key(&root) {
+        if !self.pending_root_requests.contains_key(&root) {
             trace!(%root, "Block fetch completed during backoff, skipping retry");
             return;
         }
@@ -380,7 +399,7 @@ impl P2PServer {
 
         if !fetch_block_from_peer(self, root).await {
             tracing::error!(%root, "Failed to retry block fetch, giving up");
-            self.pending_requests.remove(&root);
+            self.pending_root_requests.remove(&root);
         }
     }
 
@@ -403,6 +422,37 @@ impl P2PServer {
             self.swarm_handle.dial(addr.clone());
         }
     }
+
+    /// Handles a delayed `RetryRangeSync` message, which the request/response failure
+    /// path schedules via `send_after` when a BlocksByRange request fails, by asking
+    /// the same peer for `start_slot..=end_slot` again.
+    /// NOTE(review): no retry cap for range requests is visible here; confirm one exists.
+    #[send_handler]
+    async fn handle_retry_range_sync(
+        &mut self,
+        msg: p2p_protocol::RetryRangeSync,
+        _ctx: &Context,
+    ) {
+        let start_slot = msg.start_slot;
+        let end_slot = msg.end_slot;
+        let peer = msg.peer_id;
+
+        // The failure path removes the failed batch from `pending_range_requests`
+        // before scheduling this retry, so an entry that is present now means another
+        // request for this exact range is already in flight and the retry is redundant.
+        let already_in_flight = self
+            .pending_range_requests
+            .contains(&(start_slot, end_slot));
+
+        if already_in_flight {
+            tracing::trace!(%peer, start_slot, end_slot, "Skipping retry, range already in flight");
+            return;
+        }
+
+        info!(%peer, start_slot, end_slot, "Retrying BlocksByRange sync");
+
+        request_blocks_by_range_from_peer(self, peer, start_slot, end_slot).await;
+    }
 }
 
 // --- Manual Handler impls for network-api messages ---
@@ -436,7 +486,7 @@ impl Handler for P2PServer {
     async fn handle(&mut self, msg: FetchBlock, _ctx: &Context) {
         let root = msg.root;
         // Deduplicate - if already pending, ignore
-        if self.pending_requests.contains_key(&root) {
+        if
self.pending_root_requests.contains_key(&root) { trace!(%root, "Block fetch already in progress, ignoring duplicate"); return; } diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 31743316..b424fb66 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -12,13 +12,13 @@ use ethlambda_types::primitives::HashTreeRoot as _; use ethlambda_types::{block::SignedBlock, primitives::H256}; use super::{ - BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, BlocksByRootRequest, MAX_REQUEST_BLOCKS, - Request, Response, ResponsePayload, Status, + BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, + BlocksByRootRequest, MAX_REQUEST_BLOCKS, Request, Response, ResponsePayload, Status, messages::{ResponseCode, error_message}, }; use crate::{ - BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest, - p2p_protocol, req_resp::RequestedBlockRoots, + BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, MAX_SYNC_RANGE, P2PServer, + PendingRequest, PendingRequestKind, p2p_protocol, req_resp::RequestedBlockRoots, }; pub async fn handle_req_resp_message( @@ -62,12 +62,40 @@ pub async fn handle_req_resp_message( Response::Success { payload } => match payload { ResponsePayload::Status(status) => { info!(kind = "status_response", peer_count, "P2P message received"); - handle_status_response(status, peer).await; + handle_status_response(server, status, peer).await; } ResponsePayload::Blocks(blocks) => { info!(kind = "blocks_response", peer_count, "P2P message received"); - handle_blocks_by_root_response(server, blocks, peer, request_id, ctx) - .await; + + match server.outbound_requests.remove(&request_id) { + Some(PendingRequestKind::Range { + start_slot, + end_slot, + total_end_slot, + }) => { + server + .pending_range_requests + .remove(&(start_slot, end_slot)); + handle_blocks_by_range_response( + server, + blocks, + peer, + 
start_slot, + end_slot, + total_end_slot, + ) + .await; + } + Some(PendingRequestKind::Root(root)) => { + handle_blocks_by_root_response( + server, blocks, peer, request_id, root, ctx, + ) + .await; + } + None => { + warn!(%peer, ?request_id, "Received blocks response for unknown request_id"); + } + } } }, Response::Error { code, message } => { @@ -86,8 +114,29 @@ pub async fn handle_req_resp_message( warn!(%peer, ?request_id, %error, "Outbound request failed"); // Check if this was a block fetch request - if let Some(root) = server.request_id_map.remove(&request_id) { - handle_fetch_failure(server, root, peer, ctx).await; + match server.outbound_requests.remove(&request_id) { + Some(PendingRequestKind::Root(root)) => { + handle_fetch_failure(server, root, peer, ctx).await; + } + Some(PendingRequestKind::Range { + start_slot, + end_slot, + total_end_slot, + }) => { + server + .pending_range_requests + .remove(&(start_slot, end_slot)); + send_after( + Duration::from_millis(500), + ctx.clone(), + p2p_protocol::RetryRangeSync { + peer_id: peer, + start_slot, + end_slot: total_end_slot, // retry the full remaining range + }, + ); + } + None => {} } } request_response::Event::InboundFailure { @@ -118,8 +167,20 @@ async fn handle_status_request( server.swarm_handle.send_response(channel, response); } -async fn handle_status_response(status: Status, peer: PeerId) { +async fn handle_status_response(server: &mut P2PServer, status: Status, peer: PeerId) { info!(finalized_slot=%status.finalized.slot, head_slot=%status.head.slot, "Received status response from peer {peer}"); + + let our_head_slot = server.store.head_slot(); + if status.head.slot <= our_head_slot { + return; + } + let gap = status.head.slot - our_head_slot; + let start_slot = our_head_slot.saturating_add(1); + let total_end_slot = start_slot + .saturating_add(gap.min(MAX_SYNC_RANGE)) + .saturating_sub(1); + request_blocks_by_range_from_peer(server, peer, start_slot, total_end_slot).await; + info!(%peer, 
start_slot, gap, "Long-range sync: using BlocksByRange"); } async fn handle_blocks_by_root_request( @@ -226,18 +287,16 @@ async fn handle_blocks_by_root_response( blocks: Vec, peer: PeerId, request_id: request_response::OutboundRequestId, + requested_root: H256, ctx: &Context, ) { info!(%peer, count = blocks.len(), "Received BlocksByRoot response"); - // Look up which root was requested for this specific request - let Some(requested_root) = server.request_id_map.remove(&request_id) else { - warn!(%peer, ?request_id, "Received response for unknown request_id"); - return; - }; - if blocks.is_empty() { - server.request_id_map.insert(request_id, requested_root); + // Re-insert so failure handling can find it + server + .outbound_requests + .insert(request_id, PendingRequestKind::Root(requested_root)); warn!(%peer, "Received empty BlocksByRoot response"); handle_fetch_failure(server, requested_root, peer, ctx).await; return; @@ -258,7 +317,7 @@ async fn handle_blocks_by_root_response( } // Clean up tracking for this root - server.pending_requests.remove(&root); + server.pending_root_requests.remove(&root); if let Some(ref blockchain) = server.blockchain { let _ = blockchain @@ -268,6 +327,54 @@ async fn handle_blocks_by_root_response( } } +async fn handle_blocks_by_range_response( + server: &mut P2PServer, + blocks: Vec, + peer: PeerId, + start_slot: u64, + end_slot: u64, + total_end_slot: u64, +) { + server + .pending_range_requests + .remove(&(start_slot, end_slot)); + + info!(%peer, count = blocks.len(), "Received BlocksByRange response"); + + if blocks.is_empty() { + warn!(%peer, start_slot, end_slot, "Received empty BlocksByRange response"); + return; + } + + let Some(ref blockchain) = server.blockchain else { + warn!(%peer, "No blockchain handler available"); + return; + }; + + for block in blocks { + let slot = block.message.slot; + + if slot < start_slot || slot > end_slot { + warn!(%peer, %slot, start_slot, end_slot, "Received block outside requested range"); + 
continue; + } + + let block_root = block.message.hash_tree_root(); + if let Err(err) = blockchain.new_block(block) { + error!( + %err, %slot, %peer, + block_root = %ethlambda_types::ShortRoot(&block_root.0), + "Failed to forward range-fetched block to blockchain" + ); + } + } + + // Chain the next batch if there are more slots to fetch + if end_slot < total_end_slot { + request_blocks_by_range_from_peer(server, peer, end_slot + 1, total_end_slot).await; + } +} + /// Build a Status message from the current Store state. pub fn build_status(store: &Store) -> Status { let finalized = store.latest_finalized(); @@ -294,7 +401,10 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { } // Exclude peers that already returned empty responses for this root - let failed = server.pending_requests.get(&root).map(|p| &p.failed_peers); + let failed = server + .pending_root_requests + .get(&root) + .map(|p| &p.failed_peers); let pool: Vec<_> = if failed.is_none_or(|f| f.is_empty()) { server.connected_peers.iter().copied().collect() } else { @@ -312,7 +422,7 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { // retries start a fresh round of elimination. 
let pool = if pool.is_empty() { warn!(%root, "All peers failed for this block, retrying with full peer set"); - if let Some(pending) = server.pending_requests.get_mut(&root) { + if let Some(pending) = server.pending_root_requests.get_mut(&root) { pending.failed_peers.clear(); } server.connected_peers.iter().copied().collect() @@ -353,7 +463,7 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { // Track the request if not already tracked (new request) server - .pending_requests + .pending_root_requests .entry(root) .or_insert(PendingRequest { attempts: 1, @@ -361,7 +471,97 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { }); // Map request_id to root for failure handling - server.request_id_map.insert(request_id, root); + server + .outbound_requests + .insert(request_id, PendingRequestKind::Root(root)); + + true +} + +pub async fn request_blocks_by_range_from_peer( + server: &mut P2PServer, + peer: PeerId, + start_slot: u64, + total_end_slot: u64, +) -> bool { + if start_slot > total_end_slot { + return true; + } + + // Trim effective_start forward past any in-flight coverage, handling + // non-contiguous gaps by looping until no range covers the current position. 
+ let mut effective_start = start_slot; + loop { + let covered = server + .pending_range_requests + .iter() + .find(|&&(s, e)| s <= effective_start && effective_start <= e) + .copied(); + match covered { + Some((_, covered_end)) => effective_start = covered_end + 1, + None => break, + } + if effective_start > total_end_slot { + info!( + %peer, + start_slot, + total_end_slot, + "BlocksByRange fully covered by in-flight requests, skipping" + ); + return true; + } + } + + // Send only one batch — response handler chains the next one + let count = (total_end_slot - effective_start + 1).min(MAX_REQUEST_BLOCKS); + let batch_end = effective_start.saturating_add(count).saturating_sub(1); + + server + .pending_range_requests + .insert((effective_start, batch_end)); + + let request = BlocksByRangeRequest { + start_slot: effective_start, + count, + }; + + info!( + %peer, + start_slot = effective_start, + count, + total_end_slot, + "Sending BlocksByRange request (single batch)" + ); + + let Some(request_id) = server + .swarm_handle + .send_request( + peer, + Request::BlocksByRange(request), + libp2p::StreamProtocol::new(BLOCKS_BY_RANGE_PROTOCOL_V1), + ) + .await + else { + warn!( + %peer, + start_slot = effective_start, + count, + "Failed to send BlocksByRange request" + ); + server + .pending_range_requests + .remove(&(effective_start, batch_end)); + return false; + }; + + server.outbound_requests.insert( + request_id, + PendingRequestKind::Range { + start_slot: effective_start, + end_slot: batch_end, + total_end_slot, + }, + ); true } @@ -372,7 +572,7 @@ async fn handle_fetch_failure( peer: PeerId, ctx: &Context, ) { - let Some(pending) = server.pending_requests.get_mut(&root) else { + let Some(pending) = server.pending_root_requests.get_mut(&root) else { return; }; @@ -381,7 +581,7 @@ async fn handle_fetch_failure( if pending.attempts >= MAX_FETCH_RETRIES { error!(%root, %peer, attempts=%pending.attempts, "Block fetch failed after max retries, giving up"); - 
server.pending_requests.remove(&root); + server.pending_root_requests.remove(&root); return; } diff --git a/crates/net/p2p/src/req_resp/mod.rs b/crates/net/p2p/src/req_resp/mod.rs index 11acb79f..26240ec5 100644 --- a/crates/net/p2p/src/req_resp/mod.rs +++ b/crates/net/p2p/src/req_resp/mod.rs @@ -5,7 +5,9 @@ mod messages; pub use codec::Codec; pub use encoding::{MAX_COMPRESSED_PAYLOAD_SIZE, MAX_PAYLOAD_SIZE}; -pub use handlers::{build_status, fetch_block_from_peer, handle_req_resp_message}; +pub use handlers::{ + build_status, fetch_block_from_peer, handle_req_resp_message, request_blocks_by_range_from_peer, +}; pub use messages::{ BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, BlocksByRootRequest, MAX_REQUEST_BLOCKS, Request, RequestedBlockRoots, Response,