Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
- Refactored the remote prover gRPC API implementation to use the new per-method trait implementations ([#1975](https://github.com/0xMiden/node/issues/1975)).
- Aligned `SyncNullifiers` list-limit validation in RPC and store with `nullifier_prefix` parameter semantics, extended `GetLimits` test coverage, and documented query parameter limits ([#1986](https://github.com/0xMiden/node/pull/1986)).
- Added a `replica` mode to the store, which streams blocks from an upstream master store ([#1987](https://github.com/0xMiden/node/pull/1987)).
- Added `StoreReplica` gRPC service with endpoints for streaming blocks and proofs ([#1987](https://github.com/0xMiden/node/pull/1987)).
- Added public and store `Rpc` streaming endpoints for replica block/proof synchronization ([#1987](https://github.com/0xMiden/node/pull/1987)).
- Replaced the network monitor's JavaScript dashboard with a server-rendered Maud + HTMX frontend ([#2024](https://github.com/0xMiden/node/pull/2024)).
- [BREAKING] Removed `CheckNullifiers` endpoint ([#2049](https://github.com/0xMiden/node/pull/2049)).
- Replaced blocking-in-async operations in the validator, remote prover, and ntx-builder with `spawn_blocking` to avoid starving the Tokio runtime ([#2041](https://github.com/0xMiden/node/pull/2041)).
Expand Down
8 changes: 4 additions & 4 deletions bin/node/src/commands/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ pub enum StoreCommand {

/// Starts the store in replica mode.
///
/// In this mode the store syncs blocks from an upstream store's `StoreReplica` gRPC service.
/// Only the `Rpc` and `StoreReplica` gRPC services are exposed — the `BlockProducer` and
/// `NtxBuilder` services are not started and no proof scheduler runs.
/// In this mode the store syncs blocks from an upstream store's `Rpc` gRPC service.
/// Only the `Rpc` gRPC service is exposed — the `BlockProducer` and `NtxBuilder` services are
/// not started and no proof scheduler runs.
StartReplica {
/// Socket address at which to serve the store's RPC API.
#[arg(long = "rpc.listen", env = ENV_RPC_LISTEN, value_name = "LISTEN")]
rpc_listen: SocketAddr,

/// gRPC URL of the upstream store's `StoreReplica` endpoint to sync blocks from.
/// gRPC URL of the upstream store's `Rpc` endpoint to sync blocks from.
#[arg(long = "upstream-store.url", env = ENV_UPSTREAM_URL, value_name = "URL")]
upstream_store_url: Url,

Expand Down
27 changes: 27 additions & 0 deletions crates/rpc/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ impl RpcService {

#[tonic::async_trait]
impl api_server::Api for RpcService {
type BlockSubscriptionStream = tonic::codec::Streaming<proto::rpc::BlockSubscriptionResponse>;
type ProofSubscriptionStream = tonic::codec::Streaming<proto::rpc::ProofSubscriptionResponse>;

// -- Nullifier endpoints -----------------------------------------------------------------

async fn sync_nullifiers(
Expand Down Expand Up @@ -305,6 +308,30 @@ impl api_server::Api for RpcService {
self.store.clone().sync_chain_mmr(request).await
}

async fn block_subscription(
&self,
request: Request<proto::rpc::BlockSubscriptionRequest>,
) -> Result<Response<Self::BlockSubscriptionStream>, Status> {
let request_ref = request.get_ref();
Span::current().set_attribute("block.from", request_ref.block_from);

debug!(target: COMPONENT, request = ?request_ref);

self.store.clone().block_subscription(request).await
}

async fn proof_subscription(
&self,
request: Request<proto::rpc::ProofSubscriptionRequest>,
) -> Result<Response<Self::ProofSubscriptionStream>, Status> {
let request_ref = request.get_ref();
Span::current().set_attribute("block.from", request_ref.block_from);

debug!(target: COMPONENT, request = ?request_ref);

self.store.clone().proof_subscription(request).await
}

// -- Note endpoints ----------------------------------------------------------------------

async fn sync_notes(
Expand Down
18 changes: 6 additions & 12 deletions crates/store/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ pub enum StoreMode {
max_concurrent_proofs: NonZeroUsize,
},

/// Receives blocks from an upstream store's `StoreReplica` gRPC service.
/// Receives blocks from an upstream store's `Rpc` gRPC service.
///
/// Only the `Rpc` and `StoreReplica` gRPC services are exposed. The `BlockProducer` and
/// `NtxBuilder` services are not started and no proof scheduler runs.
/// Only the `Rpc` gRPC service is exposed. The `BlockProducer` and `NtxBuilder` services are
/// not started and no proof scheduler runs.
Replica { upstream_url: Url },
}

Expand Down Expand Up @@ -310,7 +310,7 @@ impl Store {

/// Spawns the gRPC servers for block-producer mode.
///
/// Starts three listeners: Rpc+StoreReplica (shared), `NtxBuilder`, and `BlockProducer`.
/// Starts three listeners: `Rpc`, `NtxBuilder`, and `BlockProducer`.
fn spawn_block_producer_grpc_servers(
store_api: api::StoreApi,
block_producer_api: block_producer::BlockProducerApi,
Expand All @@ -322,8 +322,6 @@ impl Store {
let mut join_set = JoinSet::new();

let rpc_service = store::rpc_server::RpcServer::new(store_api.clone());
let replica_service =
store::store_replica_server::StoreReplicaServer::new(store_api.clone());
let ntx_builder_service = store::ntx_builder_server::NtxBuilderServer::new(store_api);
let block_producer_service =
store::block_producer_server::BlockProducerServer::new(block_producer_api);
Expand All @@ -343,7 +341,6 @@ impl Store {
join_set.spawn(
make_server()
.add_service(rpc_service)
.add_service(replica_service)
.add_service(reflection_service.clone())
.serve_with_incoming(TcpListenerStream::new(rpc_listener)),
);
Expand All @@ -368,17 +365,15 @@ impl Store {

/// Spawns the gRPC servers for replica mode.
///
/// Only the Rpc and `StoreReplica` services are exposed — no `BlockProducer`, `NtxBuilder`, or
/// proof scheduler.
/// Only the `Rpc` service is exposed — no `BlockProducer`, `NtxBuilder`, or proof scheduler.
fn spawn_replica_grpc_servers(
store_api: api::StoreApi,
grpc_options: GrpcOptionsInternal,
rpc_listener: TcpListener,
) -> anyhow::Result<JoinSet<Result<(), tonic::transport::Error>>> {
let mut join_set = JoinSet::new();

let rpc_service = store::rpc_server::RpcServer::new(store_api.clone());
let replica_service = store::store_replica_server::StoreReplicaServer::new(store_api);
let rpc_service = store::rpc_server::RpcServer::new(store_api);

let reflection_service = tonic_reflection::server::Builder::configure()
.register_file_descriptor_set(store_api_descriptor())
Expand All @@ -391,7 +386,6 @@ impl Store {
.layer(CatchPanicLayer::custom(catch_panic_layer_fn))
.layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
.add_service(rpc_service)
.add_service(replica_service)
.add_service(reflection_service)
.serve_with_incoming(TcpListenerStream::new(rpc_listener)),
);
Expand Down
142 changes: 52 additions & 90 deletions crates/store/src/server/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,34 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use miden_node_proto::generated::store::{
BlockProof,
BlockSubscriptionRequest,
ProofSubscriptionRequest,
SignedBlock,
store_replica_server,
};
use miden_node_proto::generated::rpc::{BlockSubscriptionResponse, ProofSubscriptionResponse};
use miden_node_utils::ErrorReport;
use miden_protocol::block::BlockNumber;
use pin_project::pin_project;
use tokio::sync::{OwnedSemaphorePermit, mpsc, watch};
use tokio_stream::Stream;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
use tonic::Status;

use crate::server::api::StoreApi;
use crate::state::{BlockCache, ProofCache, State};

// GUARDED STREAM
// ================================================================================================

/// Wraps a stream and holds a semaphore permit for its lifetime, releasing it on drop.
#[pin_project]
struct GuardedStream<S: Stream> {
pub(super) struct GuardedStream<S: Stream> {
#[pin]
inner: S,
_permit: OwnedSemaphorePermit,
}

impl<S: Stream> GuardedStream<S> {
pub(super) fn new(inner: S, permit: OwnedSemaphorePermit) -> Self {
Self { inner, _permit: permit }
}
}

impl<S: Stream> Stream for GuardedStream<S> {
type Item = S::Item;

Expand All @@ -39,87 +38,35 @@ impl<S: Stream> Stream for GuardedStream<S> {
}
}

// STORE REPLICA API
// RPC SUBSCRIPTION API
// ================================================================================================

#[tonic::async_trait]
impl store_replica_server::StoreReplica for StoreApi {
type BlockSubscriptionStream = Pin<
Box<
dyn tonic::codegen::tokio_stream::Stream<Item = Result<SignedBlock, Status>>
+ Send
+ 'static,
>,
>;

type ProofSubscriptionStream = Pin<
Box<
dyn tonic::codegen::tokio_stream::Stream<Item = Result<BlockProof, Status>>
+ Send
+ 'static,
>,
>;

/// Streams committed blocks to a replica starting from `from_block_number`.
///
/// Subscribes to the committed-tip watch channel and maintains a sequential counter. On each
/// tip advance it emits all blocks from the current position up to the new tip, falling back to
/// the block store for any entry not in the in-memory cache. The stream closes only when the
/// client disconnects or the server shuts down.
async fn block_subscription(
&self,
request: Request<BlockSubscriptionRequest>,
) -> Result<Response<Self::BlockSubscriptionStream>, Status> {
let permit = Arc::clone(&self.block_subscription_semaphore)
.try_acquire_owned()
.map_err(|_| Status::resource_exhausted("maximum block subscriptions reached"))?;

let from = BlockNumber::from(request.into_inner().block_from);

let stream = build_block_stream(
from,
self.block_cache.clone(),
self.committed_tip_rx.clone(),
Arc::clone(&self.state),
);
Ok(Response::new(Box::pin(GuardedStream { inner: stream, _permit: permit })))
}

/// Streams block proofs to a replica starting from `from_block_number`.
///
/// Uses the same watch-channel approach as [`Self::block_subscription`]: waits for the
/// proven-in-sequence tip to advance, then emits all proofs from the current position up to
/// the new tip, falling back to the block store for cache misses.
async fn proof_subscription(
&self,
request: Request<ProofSubscriptionRequest>,
) -> Result<Response<Self::ProofSubscriptionStream>, Status> {
let permit = Arc::clone(&self.proof_subscription_semaphore)
.try_acquire_owned()
.map_err(|_| Status::resource_exhausted("maximum proof subscriptions reached"))?;

let from = BlockNumber::from(request.into_inner().block_from);

let stream = build_proof_stream(
from,
self.proof_cache.clone(),
self.proven_tip_rx.clone(),
Arc::clone(&self.state),
);
Ok(Response::new(Box::pin(GuardedStream { inner: stream, _permit: permit })))
}
}
pub(super) type BlockSubscriptionStream = Pin<
Box<
dyn tonic::codegen::tokio_stream::Stream<Item = Result<BlockSubscriptionResponse, Status>>
+ Send
+ 'static,
>,
>;

pub(super) type ProofSubscriptionStream = Pin<
Box<
dyn tonic::codegen::tokio_stream::Stream<Item = Result<ProofSubscriptionResponse, Status>>
+ Send
+ 'static,
>,
>;

// STREAM BUILDERS
// ================================================================================================

/// Spawns the block-stream task and returns its output as a [`ReceiverStream`].
fn build_block_stream(
pub(super) fn build_block_stream(
from: BlockNumber,
cache: BlockCache,
tip_rx: watch::Receiver<BlockNumber>,
state: Arc<State>,
) -> impl Stream<Item = Result<SignedBlock, Status>> + Send + 'static {
) -> impl Stream<Item = Result<BlockSubscriptionResponse, Status>> + Send + 'static {
let (tx, rx) = mpsc::channel(32);
tokio::spawn(async move {
if let Err(status) = run_block_stream(from, cache, tip_rx, state, &tx).await {
Expand All @@ -131,12 +78,12 @@ fn build_block_stream(
}

/// Spawns the proof-stream task and returns its output as a [`ReceiverStream`].
fn build_proof_stream(
pub(super) fn build_proof_stream(
from: BlockNumber,
cache: ProofCache,
tip_rx: watch::Receiver<BlockNumber>,
state: Arc<State>,
) -> impl Stream<Item = Result<BlockProof, Status>> + Send + 'static {
) -> impl Stream<Item = Result<ProofSubscriptionResponse, Status>> + Send + 'static {
let (tx, rx) = mpsc::channel(32);
tokio::spawn(async move {
if let Err(status) = run_proof_stream(from, cache, tip_rx, state, &tx).await {
Expand All @@ -159,15 +106,22 @@ async fn run_block_stream(
cache: BlockCache,
mut tip_rx: watch::Receiver<BlockNumber>,
state: Arc<State>,
tx: &mpsc::Sender<Result<SignedBlock, Status>>,
tx: &mpsc::Sender<Result<BlockSubscriptionResponse, Status>>,
) -> Result<(), Status> {
let mut next = from;
loop {
// Read tip.
let tip = *tip_rx.borrow_and_update();
let mut tip = *tip_rx.borrow_and_update();
while next <= tip {
let bytes = fetch_block(next, &cache, &state).await?;
if tx.send(Ok(SignedBlock { block: bytes })).await.is_err() {
tip = *tip_rx.borrow_and_update();
if tx
.send(Ok(BlockSubscriptionResponse {
block: bytes,
committed_chain_tip: tip.as_u32(),
}))
.await
.is_err()
{
// Client disconnected.
return Ok(());
}
Expand All @@ -190,15 +144,23 @@ async fn run_proof_stream(
cache: ProofCache,
mut tip_rx: watch::Receiver<BlockNumber>,
state: Arc<State>,
tx: &mpsc::Sender<Result<BlockProof, Status>>,
tx: &mpsc::Sender<Result<ProofSubscriptionResponse, Status>>,
) -> Result<(), Status> {
let mut next = from;
loop {
// Read tip.
let tip = *tip_rx.borrow_and_update();
let mut tip = *tip_rx.borrow_and_update();
while next <= tip {
let proof = fetch_proof(next, &cache, &state).await?;
if tx.send(Ok(BlockProof { block_num: next.as_u32(), proof })).await.is_err() {
tip = *tip_rx.borrow_and_update();
if tx
.send(Ok(ProofSubscriptionResponse {
block_num: next.as_u32(),
proof,
proven_chain_tip: tip.as_u32(),
}))
.await
.is_err()
{
// Client disconnected.
return Ok(());
}
Expand Down
Loading
Loading