diff --git a/Makefile b/Makefile index cadf025d02..f546c5644a 100644 --- a/Makefile +++ b/Makefile @@ -110,6 +110,10 @@ install-node: ## Installs node install-validator: ## Installs validator cargo install --path bin/validator --locked +.PHONY: install-ntx-builder +install-ntx-builder: ## Installs ntx-builder + cargo install --path bin/ntx-builder --locked + .PHONY: install-remote-prover install-remote-prover: ## Install remote prover's CLI cargo install --path bin/remote-prover --bin miden-remote-prover --locked diff --git a/bin/node/src/commands/store.rs b/bin/node/src/commands/store.rs index 22768bf28f..87bf330668 100644 --- a/bin/node/src/commands/store.rs +++ b/bin/node/src/commands/store.rs @@ -22,7 +22,6 @@ use crate::commands::ENV_DATA_DIRECTORY; const ENV_RPC_LISTEN: &str = "MIDEN_NODE_STORE_RPC_LISTEN"; const ENV_UPSTREAM_URL: &str = "MIDEN_NODE_STORE_UPSTREAM_RPC_URL"; -const ENV_NTX_BUILDER_LISTEN: &str = "MIDEN_NODE_STORE_NTX_BUILDER_LISTEN"; const ENV_BLOCK_PRODUCER_LISTEN: &str = "MIDEN_NODE_STORE_BLOCK_PRODUCER_LISTEN"; const ENV_BLOCK_PROVER_URL: &str = "MIDEN_NODE_STORE_BLOCK_PROVER_URL"; const ENV_SQLITE_CONNECTION_POOL_SIZE: &str = "MIDEN_NODE_STORE_SQLITE_CONNECTION_POOL_SIZE"; @@ -50,10 +49,6 @@ pub enum StoreCommand { #[arg(long = "rpc.listen", env = ENV_RPC_LISTEN, value_name = "LISTEN")] rpc_listen: SocketAddr, - /// Socket address at which to serve the store's network transaction builder API. - #[arg(long = "ntx-builder.listen", env = ENV_NTX_BUILDER_LISTEN, value_name = "LISTEN")] - ntx_builder_listen: SocketAddr, - /// Socket address at which to serve the store's block producer API. #[arg(long = "block-producer.listen", env = ENV_BLOCK_PRODUCER_LISTEN, value_name = "LISTEN")] block_producer_listen: SocketAddr, @@ -100,8 +95,8 @@ pub enum StoreCommand { /// Starts the store in replica mode. /// /// In this mode the store syncs blocks from an upstream store's `Rpc` gRPC service. - /// Only the `Rpc` gRPC service is exposed — the `BlockProducer` and `NtxBuilder` services are - /// not started and no proof scheduler runs. + /// Only the `Rpc` gRPC service is exposed — the `BlockProducer` service is not started and + /// no proof scheduler runs. StartReplica { /// Socket address at which to serve the store's RPC API. #[arg(long = "rpc.listen", env = ENV_RPC_LISTEN, value_name = "LISTEN")] @@ -146,7 +141,6 @@ impl StoreCommand { }, StoreCommand::Start { rpc_listen, - ntx_builder_listen, block_producer_listen, block_prover_url, data_directory, @@ -158,7 +152,6 @@ impl StoreCommand { } => { Self::start( rpc_listen, - ntx_builder_listen, block_producer_listen, block_prover_url, data_directory, @@ -207,7 +200,6 @@ impl StoreCommand { #[expect(clippy::too_many_arguments)] async fn start( rpc_listen: SocketAddr, - ntx_builder_listen: SocketAddr, block_producer_listen: SocketAddr, block_prover_url: Option, data_directory: PathBuf, @@ -220,10 +212,6 @@ impl StoreCommand { .await .context("Failed to bind to store's RPC gRPC socket")?; - let ntx_builder_listener = tokio::net::TcpListener::bind(ntx_builder_listen) - .await - .context("Failed to bind to store's ntx-builder gRPC socket")?; - let block_producer_listener = tokio::net::TcpListener::bind(block_producer_listen) .await .context("Failed to bind to store's block-producer gRPC socket")?; @@ -232,7 +220,6 @@ impl StoreCommand { rpc_listener, mode: StoreMode::BlockProducer { block_producer_listener, - ntx_builder_listener, block_prover_url, max_concurrent_proofs, }, diff --git a/bin/ntx-builder/src/actor/execute.rs b/bin/ntx-builder/src/actor/execute.rs index 69bc220137..d52608a15a 100644 --- a/bin/ntx-builder/src/actor/execute.rs +++ b/bin/ntx-builder/src/actor/execute.rs @@ -818,8 +818,10 @@ mod tests { StoreError::GrpcClientError(tonic::Status::invalid_argument("bad input")); assert!(!is_transient_store_error(&terminal_grpc)); - let malformed = StoreError::MalformedResponse("bad".into()); - assert!(!is_transient_store_error(&malformed)); + let non_grpc = StoreError::Deserialize( + miden_protocol::utils::serde::DeserializationError::InvalidValue("bad".into()), + ); + assert!(!is_transient_store_error(&non_grpc)); } /// Smoke-test that the predicates used by the request-level retry wrappers compile and select diff --git a/bin/ntx-builder/src/actor/mod.rs b/bin/ntx-builder/src/actor/mod.rs index cefb9bdb65..d08a57573e 100644 --- a/bin/ntx-builder/src/actor/mod.rs +++ b/bin/ntx-builder/src/actor/mod.rs @@ -124,7 +124,11 @@ impl AccountActorContext { Self { clients: GrpcClients { - store: StoreClient::new(url.clone()), + store: StoreClient::new( + url.clone(), + Duration::from_millis(100), + Duration::from_secs(30), + ), block_producer: BlockProducerClient::new(url.clone()), validator: ValidatorClient::new(url), prover: None, diff --git a/bin/ntx-builder/src/builder.rs b/bin/ntx-builder/src/builder.rs index a051249601..df95d76d07 100644 --- a/bin/ntx-builder/src/builder.rs +++ b/bin/ntx-builder/src/builder.rs @@ -1,109 +1,84 @@ use std::pin::Pin; -use std::sync::Arc; use anyhow::Context; use futures::Stream; -use miden_node_proto::domain::account::NetworkAccountId; -use miden_node_proto::domain::mempool::MempoolEvent; -use miden_protocol::account::delta::AccountUpdateDetails; -use miden_protocol::block::BlockHeader; +use miden_protocol::block::{BlockNumber, SignedBlock}; use tokio::net::TcpListener; -use tokio::sync::mpsc; use tokio::task::JoinSet; use tokio_stream::StreamExt; -use tonic::Status; use crate::NtxBuilderConfig; -use crate::actor::{AccountActorContext, ActorRequest}; -use crate::chain_state::SharedChainState; -use crate::clients::StoreClient; -use crate::coordinator::Coordinator; +use crate::chain_state::ChainState; +use crate::clients::StoreError; +use crate::committed_block::CommittedBlockEffects; use crate::db::Db; use crate::server::NtxBuilderRpcServer; // NETWORK TRANSACTION BUILDER // ================================================================================================ -/// A boxed, pinned stream of mempool events with a `'static` lifetime. +/// Boxed, pinned stream of committed blocks paired with the store-reported committed chain tip at +/// the time each block was emitted. /// -/// Boxing gives the stream a `'static` lifetime by ensuring it owns all its data, avoiding -/// complex lifetime annotations that would otherwise be required when storing `impl TryStream`. -pub(crate) type MempoolEventStream = - Pin> + Send>>; +/// Boxing gives the stream a `'static` lifetime by ensuring it owns all its data, avoiding the +/// complex lifetime annotations otherwise required to store `impl Stream`. +pub(crate) type BlockStream = + Pin> + Send>>; -/// Network transaction builder component. +/// Network transaction builder component (PR 1: subscription-driven sync only). /// -/// The network transaction builder is in charge of building transactions that consume notes -/// against network accounts. These notes are identified and communicated by the block producer. -/// The service maintains a list of unconsumed notes and periodically executes and proves -/// transactions that consume them (reaching out to the store to retrieve state as necessary). -/// -/// The builder manages the tasks for every network account on the chain through the coordinator. -/// -/// Create an instance using [`NtxBuilderConfig::build()`]. +/// The builder consumes the store's committed-block subscription and applies each block's +/// network-relevant effects to its local database. The actor execution path is wired back in a +/// subsequent PR; in this PR the binary stays up and keeps the local DB caught up to the live +/// chain tip without scheduling any network transactions. pub struct NetworkTransactionBuilder { /// Configuration for the builder. config: NtxBuilderConfig, - /// Coordinator for managing actor tasks. - coordinator: Coordinator, - /// Client for the store gRPC API. - store: StoreClient, /// Database for persistent state. db: Db, - /// Shared chain state updated by the event loop and read by actors. - chain_state: Arc, - /// Context shared with all account actors. - actor_context: AccountActorContext, - /// Stream of mempool events from the block producer. - mempool_events: MempoolEventStream, - /// Database update requests from account actors. - /// - /// We keep database writes centralized so this is how actors communicate - /// items to write. - actor_request_rx: mpsc::Receiver, + /// Stream of committed blocks from the store. + block_stream: BlockStream, + /// Highest block number applied to the DB so far. + last_applied_block: BlockNumber, + /// In-memory partial chain (tip header + chain MMR + tracked recent headers). Persisted + /// alongside each block in the DB so the builder can resume without replaying genesis on + /// restart. + chain: ChainState, + /// `false` until the first applied block whose `committed_chain_tip` matches the just-applied + /// block number. Stays `true` afterwards. Exposed so the gRPC status surface and PR 2's actor + /// spawn gating can read it. + is_synced: bool, } impl NetworkTransactionBuilder { - #[expect(clippy::too_many_arguments)] pub(crate) fn new( config: NtxBuilderConfig, - coordinator: Coordinator, - store: StoreClient, db: Db, - chain_state: Arc, - actor_context: AccountActorContext, - mempool_events: MempoolEventStream, - actor_request_rx: mpsc::Receiver, + block_stream: BlockStream, + last_applied_block: BlockNumber, + chain: ChainState, ) -> Self { Self { config, - coordinator, - store, db, - chain_state, - actor_context, - mempool_events, - actor_request_rx, + block_stream, + last_applied_block, + chain, + is_synced: false, } } + /// Returns `true` once the builder has caught up to the store's committed chain tip at least + /// once. Stays `true` for the lifetime of the process. + pub fn is_synced(&self) -> bool { + self.is_synced + } + /// Runs the network transaction builder event loop until a fatal error occurs. /// - /// If a `TcpListener` is provided, a gRPC server is also spawned to expose the - /// `GetNetworkNoteStatus` endpoint. - /// - /// This method: - /// 1. Optionally starts a gRPC server for note error queries - /// 2. Spawns a background task to load existing network accounts from the store - /// 3. Runs the main event loop, processing mempool events and managing actors - /// - /// # Errors - /// - /// Returns an error if: - /// - The mempool event stream ends unexpectedly - /// - An actor encounters a fatal error - /// - The account loader task fails - /// - The gRPC server fails + /// 1. Starts the gRPC server for note status queries. + /// 2. Continuously drains the committed-block subscription, applying each block's effects to + /// the local DB. pub async fn run(self, listener: TcpListener) -> anyhow::Result<()> { let mut join_set = JoinSet::new(); @@ -124,170 +99,53 @@ impl NetworkTransactionBuilder { Ok(()) } - /// Runs the main event loop. async fn run_event_loop(mut self) -> anyhow::Result<()> { - // Spawn a background task to load network accounts from the store. Accounts are sent - // through a channel and processed in the main event loop. - let (account_tx, mut account_rx) = - mpsc::channel::(self.config.account_channel_capacity); - let account_loader_store = self.store.clone(); - let mut account_loader_handle = tokio::spawn(async move { - account_loader_store - .stream_network_account_ids(account_tx) - .await - .context("failed to load network accounts from store") - }); - - // Main event loop. loop { - tokio::select! { - // Handle actor result. If a timed-out actor needs respawning, do so. - result = self.coordinator.next() => { - if let Some(account_id) = result? { - self.coordinator - .spawn_actor(account_id, &self.actor_context); - } - }, - // Handle mempool events. - event = self.mempool_events.next() => { - let event = event - .context("mempool event stream ended")? - .context("mempool event stream failed")?; - - self.handle_mempool_event(event).await?; - }, - // Handle account batches loaded from the store. Once all accounts are loaded, the - // channel closes and this branch becomes inactive (recv returns None and we stop - // matching). - Some(account_id) = account_rx.recv() => { - self.handle_loaded_account(account_id).await?; - }, - // Handle requests from actors. - Some(request) = self.actor_request_rx.recv() => { - self.handle_actor_request(request).await?; - }, - // Handle account loader task completion/failure. If the task fails, we abort since - // the builder would be in a degraded state where existing notes against network - // accounts won't be processed. - result = &mut account_loader_handle => { - result - .context("account loader task panicked") - .flatten()?; - - tracing::info!("account loading from store completed"); - account_loader_handle = tokio::spawn(std::future::pending()); - }, - } + let (block, committed_tip) = self + .block_stream + .next() + .await + .context("block stream ended")? + .context("block stream failed")?; + self.apply_committed_block(block, committed_tip).await?; } } - /// Handles account IDs loaded from the store by syncing state to DB and spawning actors. - #[tracing::instrument(name = "ntx.builder.handle_loaded_account", skip(self, account_id))] - async fn handle_loaded_account( + /// Applies a single committed block's effects to the DB, advances the in-memory partial chain, + /// persists the updated chain MMR atomically with the effects, and flips `is_synced` the first + /// time the applied block matches the store-reported committed tip. + #[tracing::instrument( + name = "ntx.builder.apply_committed_block", + skip(self, block), + fields(block_num = %block.header().block_num(), %committed_tip), + )] + async fn apply_committed_block( &mut self, - account_id: NetworkAccountId, - ) -> Result<(), anyhow::Error> { - // Fetch account from store and write to DB. - let account = self - .store - .get_network_account(account_id) - .await - .context("failed to load account from store")? - .context("account should exist in store")?; + block: SignedBlock, + committed_tip: BlockNumber, + ) -> anyhow::Result<()> { + let header = block.header().clone(); + let block_num = header.block_num(); - let block_num = self.chain_state.chain_tip_block_number(); - let notes = self - .store - .get_unconsumed_network_notes(account_id, block_num.as_u32()) - .await - .context("failed to load notes from store")?; + let effects = CommittedBlockEffects::from_signed_block(&block); + + // Advance the in-memory chain (adds the previous tip header as an MMR leaf and prunes older + // tracked headers) before snapshotting the MMR for persistence. + self.chain.update_chain_tip(header, self.config.max_block_count); + let next_mmr = self.chain.current_mmr(); - // Write account and notes to DB. self.db - .sync_account_from_store(account_id, account.clone(), notes.clone()) + .apply_committed_block(effects, next_mmr) .await - .context("failed to sync account to DB")?; - - self.coordinator.spawn_actor(account_id, &self.actor_context); - Ok(()) - } + .context("failed to apply committed block to DB")?; - /// Handles mempool events by writing to DB first, then notifying actors. - #[tracing::instrument(name = "ntx.builder.handle_mempool_event", skip(self, event))] - async fn handle_mempool_event(&mut self, event: MempoolEvent) -> Result<(), anyhow::Error> { - match &event { - MempoolEvent::TransactionAdded { account_delta, .. } => { - // Write event effects to DB first. - self.coordinator - .write_event(&event) - .await - .context("failed to write TransactionAdded to DB")?; + self.last_applied_block = block_num; - // Spawn new actors for newly created network accounts. - if let Some(AccountUpdateDetails::Delta(delta)) = account_delta { - if delta.is_full_state() { - if let Ok(network_id) = NetworkAccountId::try_from(delta.id()) { - self.coordinator.spawn_actor(network_id, &self.actor_context); - } - } - } - let inactive_targets = self.coordinator.send_targeted(&event); - for account_id in inactive_targets { - self.coordinator.spawn_actor(account_id, &self.actor_context); - } - Ok(()) - }, - // Update chain state and notify affected actors. - MempoolEvent::BlockCommitted { header, .. } => { - // Write event effects to DB first. - let result = self - .coordinator - .write_event(&event) - .await - .context("failed to write BlockCommitted to DB")?; - - self.update_chain_tip(header.as_ref().clone()); - self.coordinator.notify_accounts(&result.accounts_to_notify); - Ok(()) - }, - // Notify affected actors (reverted account actors will self-cancel when they detect - // their account has been removed from the DB). - MempoolEvent::TransactionsReverted(_) => { - // Write event effects to DB first. - let result = self - .coordinator - .write_event(&event) - .await - .context("failed to write TransactionsReverted to DB")?; - - self.coordinator.notify_accounts(&result.accounts_to_notify); - Ok(()) - }, + if !self.is_synced && block_num == committed_tip { + self.is_synced = true; + tracing::info!(block = %block_num, "ntx-builder caught up to chain tip"); } - } - /// Processes a request from an account actor. - async fn handle_actor_request(&mut self, request: ActorRequest) -> Result<(), anyhow::Error> { - match request { - ActorRequest::NotesFailed { failed_notes, block_num, ack_tx } => { - self.db - .notes_failed(failed_notes, block_num) - .await - .context("failed to mark notes as failed")?; - let _ = ack_tx.send(()); - }, - ActorRequest::CacheNoteScript { script_root, script } => { - self.db - .insert_note_script(script_root, &script) - .await - .context("failed to cache note script")?; - }, - } Ok(()) } - - /// Updates the chain tip and prunes old blocks from the MMR. - fn update_chain_tip(&mut self, tip: BlockHeader) { - self.chain_state.update_chain_tip(tip, self.config.max_block_count); - } } diff --git a/bin/ntx-builder/src/chain_state.rs b/bin/ntx-builder/src/chain_state.rs index b59549ffec..033695c479 100644 --- a/bin/ntx-builder/src/chain_state.rs +++ b/bin/ntx-builder/src/chain_state.rs @@ -47,8 +47,18 @@ impl ChainState { (self.chain_tip_header, self.chain_mmr) } + /// Returns the current chain tip header. + pub(crate) fn chain_tip_header(&self) -> &BlockHeader { + &self.chain_tip_header + } + + /// Returns a clone of the current partial chain MMR. + pub(crate) fn current_mmr(&self) -> PartialMmr { + self.chain_mmr.mmr().clone() + } + /// Updates the chain tip and prunes old blocks from the MMR. - fn update_chain_tip(&mut self, tip: BlockHeader, max_block_count: usize) { + pub(crate) fn update_chain_tip(&mut self, tip: BlockHeader, max_block_count: usize) { // Skip blocks already reflected in the chain state. A `BlockCommitted` event may arrive for // a block whose state was already loaded from the store during startup: the mempool // subscription is established first and then the chain tip is fetched, so any block diff --git a/bin/ntx-builder/src/clients/block_producer.rs b/bin/ntx-builder/src/clients/block_producer.rs index b761e7dde1..0daf38e6cc 100644 --- a/bin/ntx-builder/src/clients/block_producer.rs +++ b/bin/ntx-builder/src/clients/block_producer.rs @@ -1,13 +1,7 @@ -use std::time::Duration; - -use futures::{TryStream, TryStreamExt}; use miden_node_proto::clients::{BlockProducerClient as InnerBlockProducerClient, Builder}; -use miden_node_proto::domain::mempool::MempoolEvent; use miden_node_proto::generated::{self as proto}; -use miden_node_utils::FlattenResult; use miden_protocol::transaction::ProvenTransaction; use miden_protocol::utils::serde::Serializable; -use tokio_stream::StreamExt; use tonic::Status; use tracing::{info, instrument}; use url::Url; @@ -27,6 +21,7 @@ pub struct BlockProducerClient { impl BlockProducerClient { /// Creates a new block producer client with a lazy connection. + #[cfg_attr(not(test), expect(dead_code))] pub fn new(block_producer_url: Url) -> Self { info!(target: COMPONENT, block_producer_endpoint = %block_producer_url, "Initializing block producer client with lazy connection"); @@ -53,45 +48,4 @@ impl BlockProducerClient { Ok(()) } - - #[instrument(target = COMPONENT, name = "ntx.block_producer.client.subscribe_to_mempool", skip_all, err)] - pub async fn subscribe_to_mempool_with_retry( - &self, - ) -> Result + Send + 'static, Status> { - let mut retry_counter = 0; - loop { - match self.subscribe_to_mempool().await { - Err(err) if err.code() == tonic::Code::Unavailable => { - // Exponential backoff with base 500ms and max 30s. - let backoff = Duration::from_millis(500) - .saturating_mul(1 << retry_counter.min(6)) - .min(Duration::from_secs(30)); - - tracing::warn!( - ?backoff, - %retry_counter, - %err, - "connection failed while subscribing to the mempool, retrying" - ); - - retry_counter += 1; - tokio::time::sleep(backoff).await; - }, - result => return result, - } - } - } - - async fn subscribe_to_mempool( - &self, - ) -> Result + Send + 'static, Status> { - let stream = self.client.clone().mempool_subscription(()).await?; - - let stream = stream - .into_inner() - .map_ok(MempoolEvent::try_from) - .map(FlattenResult::flatten_result); - - Ok(stream) - } } diff --git a/bin/ntx-builder/src/clients/store.rs b/bin/ntx-builder/src/clients/store.rs index c901fc69ab..4f25008c3c 100644 --- a/bin/ntx-builder/src/clients/store.rs +++ b/bin/ntx-builder/src/clients/store.rs @@ -1,34 +1,19 @@ use std::collections::BTreeSet; -use std::ops::RangeInclusive; use std::time::Duration; -use miden_node_proto::clients::{Builder, StoreNtxBuilderClient}; -use miden_node_proto::decode::ConversionResultExt; -use miden_node_proto::domain::account::{AccountDetails, AccountResponse, NetworkAccountId}; -use miden_node_proto::errors::ConversionError; -use miden_node_proto::generated::rpc::BlockRange; -use miden_node_proto::generated::{self as proto}; -use miden_node_proto::try_convert; -use miden_node_utils::tracing::OpenTelemetrySpanExt; +use backon::{ExponentialBuilder, Retryable}; +use futures::Stream; +use futures::stream::TryStreamExt; +use miden_node_proto::clients::{Builder, StoreRpcClient}; +use miden_node_proto::generated::rpc::{BlockSubscriptionRequest, BlockSubscriptionResponse}; +use miden_node_utils::ErrorReport; use miden_protocol::Word; -use miden_protocol::account::{ - Account, - AccountCode, - AccountId, - PartialAccount, - PartialStorage, - StorageMapKey, - StorageMapWitness, - StorageSlotName, -}; -use miden_protocol::asset::{AssetVaultKey, AssetWitness, PartialVault}; -use miden_protocol::block::{BlockHeader, BlockNumber}; -use miden_protocol::crypto::merkle::mmr::{Forest, MmrPeaks, PartialMmr}; -use miden_protocol::crypto::merkle::smt::SmtProof; +use miden_protocol::account::{AccountId, StorageMapKey, StorageMapWitness, StorageSlotName}; +use miden_protocol::asset::{AssetVaultKey, AssetWitness}; +use miden_protocol::block::{BlockNumber, SignedBlock}; use miden_protocol::note::NoteScript; use miden_protocol::transaction::AccountInputs; -use miden_protocol::utils::serde::{Deserializable, Serializable}; -use miden_standards::note::AccountTargetNetworkNote; +use miden_protocol::utils::serde::Deserializable; use thiserror::Error; use tracing::{info, instrument}; use url::Url; @@ -38,17 +23,22 @@ use crate::COMPONENT; // STORE CLIENT // ================================================================================================ -/// Interface to the store's ntx-builder gRPC API. -/// -/// Essentially just a thin wrapper around the generated gRPC client which improves type safety. +/// Thin wrapper around the store's `Rpc` gRPC service that the ntx-builder uses to consume the +/// committed-block subscription stream. #[derive(Clone, Debug)] pub struct StoreClient { - inner: StoreNtxBuilderClient, + inner: StoreRpcClient, + /// Backoff schedule applied to repeated `block_subscription` connection attempts. Built once at + /// construction time and cloned cheaply on each retry loop. + backoff: ExponentialBuilder, } impl StoreClient { - /// Creates a new store client with a lazy connection. - pub fn new(store_url: Url) -> Self { + /// Creates a new store client with a lazy connection to the store's RPC endpoint. + /// + /// `backoff_initial` / `backoff_max` configure the exponential backoff schedule applied to + /// `block_subscription` retries (the only operation that retries today). + pub fn new(store_url: Url, backoff_initial: Duration, backoff_max: Duration) -> Self { info!(target: COMPONENT, store_endpoint = %store_url, "Initializing store client"); let store = Builder::new(store_url) @@ -57,459 +47,126 @@ impl StoreClient { .without_metadata_version() .without_metadata_genesis() .with_otel_context_injection() - .connect_lazy::(); - - Self { inner: store } - } - - /// Returns the block header and MMR peaks at the current chain tip. - #[instrument(target = COMPONENT, name = "store.client.get_latest_blockchain_data_with_retry", skip_all, err)] - pub async fn get_latest_blockchain_data_with_retry( - &self, - ) -> Result, StoreError> { - let mut retry_counter = 0; - loop { - match self.get_latest_blockchain_data().await { - Err(StoreError::GrpcClientError(err)) => { - // Exponential backoff with base 500ms and max 30s. - let backoff = Duration::from_millis(500) - .saturating_mul(1 << retry_counter.min(6)) - .min(Duration::from_secs(30)); - - tracing::warn!( - ?backoff, - %retry_counter, - %err, - "store connection failed while fetching latest blockchain data, retrying" - ); - - retry_counter += 1; - tokio::time::sleep(backoff).await; - }, - result => return result, - } - } - } - - #[instrument(target = COMPONENT, name = "store.client.get_latest_blockchain_data", skip_all, err)] - async fn get_latest_blockchain_data( - &self, - ) -> Result, StoreError> { - let request = tonic::Request::new(proto::blockchain::MaybeBlockNumber::default()); - - let response = self.inner.clone().get_current_blockchain_data(request).await?.into_inner(); - - match response.current_block_header { - // There are new blocks compared to the builder's latest state - Some(block) => { - let peaks: Vec = try_convert(response.current_peaks) - .collect::>() - .context("current_peaks") - .map_err(StoreError::DeserializationError)?; - let header = - BlockHeader::try_from(block).map_err(StoreError::DeserializationError)?; - - let peaks = MmrPeaks::new(Forest::new(header.block_num().as_usize()), peaks) - .map_err(|_| { - StoreError::MalformedResponse( - "returned peaks are not valid for the sent request".into(), - ) - })?; - - let partial_mmr = PartialMmr::from_peaks(peaks); - - Ok(Some((header, partial_mmr))) - }, - // No new blocks were created, return - None => Ok(None), - } - } - - #[instrument(target = COMPONENT, name = "store.client.get_network_account", skip_all, err)] - pub async fn get_network_account( - &self, - account_id: NetworkAccountId, - ) -> Result, StoreError> { - let request = proto::account::AccountId::from(account_id.inner()); - - let store_response = self - .inner - .clone() - .get_network_account_details_by_id(request) - .await? - .into_inner() - .details; - - // we only care about the case where the account returns and is actually a network account, - // which implies details being public, so OK to error otherwise - let account = match store_response.map(|acc| acc.details) { - Some(Some(details)) => Some(Account::read_from_bytes(&details).map_err(|err| { - StoreError::DeserializationError(ConversionError::from(err).context("details")) - })?), - _ => None, - }; - - Ok(account) - } - - /// Get the inputs for an account at a given block number from the store. - /// - /// Retrieves account details from the store. The retrieved details are limited to the account - /// code, account header, and storage header. The vault and storage slots are not required for - /// the purposes of the NTX Builder. - #[instrument(target = COMPONENT, name = "store.client.get_account_inputs", skip_all, err)] - pub async fn get_account_inputs( - &self, - account_id: AccountId, - block_num: BlockNumber, - ) -> Result { - // Construct proto request. - let proto_request = proto::rpc::AccountRequest { - account_id: Some(proto::account::AccountId { id: account_id.to_bytes() }), - block_num: Some(block_num.into()), - // Request account code, account header, and storage header in order to build minimal - // partial account. - details: Some(proto::rpc::account_request::AccountDetailRequest { - code_commitment: Some(Word::default().into()), - asset_vault_commitment: None, - storage_maps: vec![], - }), - }; - - // Make the gRPC call. - let proto_response = self.inner.clone().get_account(proto_request).await?.into_inner(); - - // Convert proto response to domain type. - let account_response = - AccountResponse::try_from(proto_response).map_err(StoreError::DeserializationError)?; - - // Build partial account. - let account_details = account_response - .details - .ok_or(StoreError::MissingDetails("account details".into()))?; - let partial_account = build_minimal_foreign_account(&account_details) - .map_err(StoreError::DeserializationError)?; - - Ok(AccountInputs::new(partial_account, account_response.witness)) - } - - /// Returns the list of unconsumed network notes for a specific network account up to a - /// specified block. - #[instrument(target = COMPONENT, name = "store.client.get_unconsumed_network_notes", skip_all, err)] - pub async fn get_unconsumed_network_notes( - &self, - network_account_id: NetworkAccountId, - block_num: u32, - ) -> Result, StoreError> { - // Upper bound of each note is ~10KB. Limit page size to ~10MB. - const PAGE_SIZE: u64 = 1024; - - let mut all_notes = Vec::new(); - let mut page_token: Option = None; - - let mut store_client = self.inner.clone(); - loop { - let req = proto::store::UnconsumedNetworkNotesRequest { - page_token, - page_size: PAGE_SIZE, - account_id: Some(network_account_id.inner().into()), - block_num, - }; - let resp = store_client.get_unconsumed_network_notes(req).await?.into_inner(); + .connect_lazy::(); - all_notes.reserve(resp.notes.len()); - for note in resp.notes { - all_notes.push( - AccountTargetNetworkNote::try_from(note) - .map_err(StoreError::DeserializationError)?, - ); - } + let backoff = ExponentialBuilder::default() + .with_min_delay(backoff_initial) + .with_max_delay(backoff_max) + .with_factor(2.0) + .with_jitter() + .without_max_times(); - match resp.next_token { - Some(token) => page_token = Some(token), - None => break, - } - } - - Ok(all_notes) - } - - /// Streams network account IDs to the provided sender. - /// - /// This method is designed to be run in a background task, sending accounts to the main event - /// loop as they are loaded. This allows the ntx-builder to start processing mempool events - /// without waiting for all accounts to be preloaded. - pub async fn stream_network_account_ids( - &self, - sender: tokio::sync::mpsc::Sender, - ) -> Result<(), StoreError> { - let mut block_range = BlockNumber::GENESIS..=BlockNumber::MAX; - - while let Some(next_start) = self.load_accounts_page(block_range, &sender).await? { - block_range = next_start..=BlockNumber::MAX; - } - - Ok(()) + Self { inner: store, backoff } } - /// Loads a single page of network accounts and submits them to the sender. + /// Opens a committed-block subscription starting at `block_from`, retrying indefinitely with + /// the client's configured exponential backoff while the initial connection attempt fails. /// - /// Returns the next block number to fetch from, or `None` if the chain tip has been reached. - #[instrument(target = COMPONENT, name = "store.client.load_accounts_page", skip_all, err)] - async fn load_accounts_page( - &self, - block_range: RangeInclusive, - sender: &tokio::sync::mpsc::Sender, - ) -> Result, StoreError> { - let (accounts, pagination_info) = self.fetch_network_account_ids_page(block_range).await?; - - let chain_tip = pagination_info.chain_tip; - let current_height = pagination_info.block_num; - - self.send_accounts_to_channel(accounts, sender).await?; - - if current_height >= chain_tip { - Ok(None) - } else { - Ok(Some(BlockNumber::from(current_height + 1))) - } - } - - #[instrument(target = COMPONENT, name = "store.client.fetch_network_account_ids_page", skip_all, err)] - async fn fetch_network_account_ids_page( - &self, - block_range: std::ops::RangeInclusive, - ) -> Result<(Vec, proto::rpc::PaginationInfo), StoreError> { - self.fetch_network_account_ids_page_inner(block_range) - .await - .inspect_err(|err| tracing::Span::current().set_error(err)) - } - - async fn fetch_network_account_ids_page_inner( + /// Returns a stream that decodes each [`BlockSubscriptionResponse`] into a `(SignedBlock, + /// committed_chain_tip)` pair. The committed chain tip is the latest block the store believes + /// is committed at the moment the response was emitted; the ntx-builder uses it to decide + /// when it has caught up to the live tip. + #[instrument( + target = COMPONENT, + name = "store.client.block_subscription_with_retry", + skip_all, + fields(%block_from), + err, + )] + pub async fn block_subscription_with_retry( &self, - block_range: std::ops::RangeInclusive, - ) -> Result<(Vec, proto::rpc::PaginationInfo), StoreError> { - let mut retry_counter = 0u32; - - let response = loop { - match self + block_from: BlockNumber, + ) -> Result< + impl Stream> + Send + 'static, + StoreError, + > { + (|| async move { + let request = + tonic::Request::new(BlockSubscriptionRequest { block_from: block_from.as_u32() }); + let stream = self .inner .clone() - .get_network_account_ids(Into::::into(block_range.clone())) + .block_subscription(request) .await - { - Ok(response) => break response.into_inner(), - Err(err) => { - // Exponential backoff with base 500ms and max 30s. - let backoff = Duration::from_millis(500) - .saturating_mul(1 << retry_counter.min(6)) - .min(Duration::from_secs(30)); - - tracing::warn!( - ?backoff, - %retry_counter, - %err, - "store connection failed while fetching committed accounts page, retrying" - ); - - retry_counter += 1; - tokio::time::sleep(backoff).await; - }, - } - }; - - let accounts = response - .account_ids - .into_iter() - .map(|account_id| { - let account_id = AccountId::read_from_bytes(&account_id.id).map_err(|err| { - StoreError::DeserializationError( - ConversionError::from(err).context("account_id"), - ) - })?; - NetworkAccountId::try_from(account_id).map_err(|_| { - StoreError::MalformedResponse( - "account id is not a valid network account".into(), - ) - }) - }) - .collect::, StoreError>>()?; - - let pagination_info = response.pagination_info.ok_or(ConversionError::missing_field::< - proto::store::NetworkAccountIdList, - >("pagination_info"))?; - - Ok((accounts, pagination_info)) + .map_err(StoreError::GrpcClientError)? + .into_inner(); + + Ok(stream + .map_err(StoreError::GrpcClientError) + .and_then(|response| async move { decode_block_subscription_response(&response) })) + }) + .retry(self.backoff) + .notify(|err: &StoreError, dur| { + tracing::warn!( + target: COMPONENT, + sleep_ms = dur.as_millis() as u64, + err = %err.as_report(), + "store connection failed while opening block subscription, retrying", + ); + }) + .await } +} - #[instrument( - target = COMPONENT, - name = "store.client.send_accounts_to_channel", - skip_all - )] - async fn send_accounts_to_channel( - &self, - accounts: Vec, - sender: &tokio::sync::mpsc::Sender, - ) -> Result<(), StoreError> { - for account in accounts { - // If the receiver is dropped, stop loading. - if sender.send(account).await.is_err() { - tracing::warn!("Account receiver dropped"); - return Ok(()); - } - } +fn decode_block_subscription_response( + response: &BlockSubscriptionResponse, +) -> Result<(SignedBlock, BlockNumber), StoreError> { + let block = SignedBlock::read_from_bytes(&response.block).map_err(StoreError::Deserialize)?; + let committed_tip = BlockNumber::from(response.committed_chain_tip); + Ok((block, committed_tip)) +} - Ok(()) - } +// ACTOR-PATH METHODS +// ================================================================================================ +// +// The actor module still references these methods. PR 1 keeps the actor code in tree as dead +// code (it is not spawned), so the methods exist as stubs to preserve compilation. PR 2 wires +// them through the appropriate store gRPC service. - #[instrument(target = COMPONENT, name = "store.client.get_note_script_by_root", skip_all, err)] - pub async fn get_note_script_by_root( +#[expect(clippy::unused_async)] +impl StoreClient { + pub async fn get_account_inputs( &self, - root: Word, - ) -> Result, StoreError> { - let request = proto::note::NoteScriptRoot { root: Some(root.into()) }; - - let script = self.inner.clone().get_note_script_by_root(request).await?.into_inner().script; - - script - .map(NoteScript::try_from) - .transpose() - .map_err(StoreError::DeserializationError) + _account_id: AccountId, + _block_num: BlockNumber, + ) -> Result { + unimplemented!("get_account_inputs is rewired in PR 2 of the ntx-builder refactor") } - #[instrument(target = COMPONENT, name = "store.client.get_vault_asset_witnesses", skip_all, err)] pub async fn get_vault_asset_witnesses( &self, - account_id: AccountId, - vault_keys: BTreeSet, - block_num: Option, + _account_id: AccountId, + _vault_keys: BTreeSet, + _block_num: Option, ) -> Result, StoreError> { - // Construct proto request. - let request = proto::store::VaultAssetWitnessesRequest { - account_id: Some(proto::account::AccountId { id: account_id.to_bytes() }), - vault_keys: vault_keys - .into_iter() - .map(|key| { - let word: Word = key.into(); - word.into() - }) - .collect(), - block_num: block_num.map(|num| num.as_u32()), - }; - - // Make the gRPC request. - let witness_proto = - self.inner.clone().get_vault_asset_witnesses(request).await?.into_inner(); - - // Convert the response to domain type. - let mut asset_witnesses = Vec::new(); - for asset_witness in witness_proto.asset_witnesses { - let smt_opening = asset_witness.proof.ok_or_else(|| { - StoreError::MalformedResponse("missing proof in vault asset witness".to_string()) - })?; - let proof: SmtProof = smt_opening - .try_into() - .context("proof") - .map_err(StoreError::DeserializationError)?; - let witness = AssetWitness::new(proof) - .map_err(|err| StoreError::DeserializationError(ConversionError::from(err)))?; - - asset_witnesses.push(witness); - } - - Ok(asset_witnesses) + unimplemented!("get_vault_asset_witnesses is rewired in PR 2 of the ntx-builder refactor") } - #[instrument(target = COMPONENT, name = "store.client.get_storage_map_witness", skip_all, err)] pub async fn get_storage_map_witness( &self, - account_id: AccountId, - slot_name: StorageSlotName, - map_key: StorageMapKey, - block_num: Option, + _account_id: AccountId, + _slot_name: StorageSlotName, + _map_key: StorageMapKey, + _block_num: Option, ) -> Result { - // Construct proto request. - let request = proto::store::StorageMapWitnessRequest { - account_id: Some(proto::account::AccountId { id: account_id.to_bytes() }), - map_key: Some(map_key.into()), - slot_name: slot_name.to_string(), - block_num: block_num.map(|num| num.as_u32()), - }; - - // Make the request to the store. - let witness_proto = self.inner.clone().get_storage_map_witness(request).await?.into_inner(); - - // Convert the response to domain type. - let witness_proto = witness_proto.witness.ok_or_else(|| { - StoreError::MalformedResponse("missing storage map witness in response".to_string()) - })?; - - let smt_opening = witness_proto.proof.ok_or_else(|| { - StoreError::MalformedResponse("missing proof in storage map witness".to_string()) - })?; - - let proof: SmtProof = smt_opening - .try_into() - .context("proof") - .map_err(StoreError::DeserializationError)?; - - // Create the storage map witness using the proof and raw map key. - let witness = StorageMapWitness::new(proof, [map_key]).map_err(|_err| { - StoreError::MalformedResponse("failed to create storage map witness".to_string()) - })?; + unimplemented!("get_storage_map_witness is rewired in PR 2 of the ntx-builder refactor") + } - Ok(witness) + pub async fn get_note_script_by_root( + &self, + _script_root: Word, + ) -> Result, StoreError> { + unimplemented!("get_note_script_by_root is rewired in PR 2 of the ntx-builder refactor") } } // STORE ERROR -// ================================================================================================= +// ================================================================================================ #[derive(Debug, Error)] pub enum StoreError { - #[error("gRPC client error")] - GrpcClientError(#[from] tonic::Status), - #[error("malformed response from store: {0}")] - MalformedResponse(String), - #[error("failed to parse response")] - DeserializationError(#[from] ConversionError), - #[error("missing details: {0}")] - MissingDetails(String), -} - -// HELPERS -// ================================================================================================= - -/// Builds a minimal partial account from the provided account details. -/// -/// The partial account is built without storage maps or an asset vault. This is intended to be used -/// to retrieve foreign account data during transaction execution. -pub fn build_minimal_foreign_account( - account_details: &AccountDetails, -) -> Result { - // Derive account code. - let account_code_bytes = account_details.account_code.as_ref().ok_or_else(|| { - ConversionError::missing_field::( - "account_code", - ) - })?; - let account_code = AccountCode::read_from_bytes(account_code_bytes)?; - - // Derive partial storage. Storage maps are not required for foreign accounts. - let partial_storage = PartialStorage::new(account_details.storage_details.header.clone(), [])?; - - // Derive partial vault from vault root only. - let partial_vault = PartialVault::new(account_details.account_header.vault_root()); - - // Construct partial account. - let partial_account = PartialAccount::new( - account_details.account_header.id(), - account_details.account_header.nonce(), - account_code, - partial_storage, - partial_vault, - None, - )?; - Ok(partial_account) + #[error("store gRPC call failed")] + GrpcClientError(#[source] tonic::Status), + #[error("failed to deserialize subscription payload")] + Deserialize(#[source] miden_protocol::utils::serde::DeserializationError), } diff --git a/bin/ntx-builder/src/clients/validator.rs b/bin/ntx-builder/src/clients/validator.rs index 9b095079b7..e8ecb01e20 100644 --- a/bin/ntx-builder/src/clients/validator.rs +++ b/bin/ntx-builder/src/clients/validator.rs @@ -25,6 +25,7 @@ pub struct ValidatorClient { impl ValidatorClient { /// Creates a new validator client with a lazy connection and a 10-second timeout. + #[cfg_attr(not(test), expect(dead_code))] pub fn new(validator_url: Url) -> Self { info!(target: COMPONENT, validator_endpoint = %validator_url, "Initializing validator client with lazy connection"); diff --git a/bin/ntx-builder/src/commands/mod.rs b/bin/ntx-builder/src/commands/mod.rs index a883b81a8b..4d08e3ed09 100644 --- a/bin/ntx-builder/src/commands/mod.rs +++ b/bin/ntx-builder/src/commands/mod.rs @@ -33,7 +33,7 @@ pub enum NtxBuilderCommand { #[arg(long = "listen", env = ENV_LISTEN, value_name = "LISTEN")] listen: SocketAddr, - /// The store's ntx-builder service gRPC url. + /// The store's RPC service gRPC url. #[arg(long = "store.url", env = ENV_STORE_URL, value_name = "URL")] store_url: Url, diff --git a/bin/ntx-builder/src/committed_block.rs b/bin/ntx-builder/src/committed_block.rs new file mode 100644 index 0000000000..bfe588a5f1 --- /dev/null +++ b/bin/ntx-builder/src/committed_block.rs @@ -0,0 +1,60 @@ +use miden_node_proto::domain::account::NetworkAccountId; +use miden_protocol::account::delta::AccountUpdateDetails; +use miden_protocol::block::{BlockHeader, SignedBlock}; +use miden_protocol::note::Nullifier; +use miden_protocol::transaction::OutputNote; +use miden_standards::note::AccountTargetNetworkNote; + +/// Network-relevant state extracted from a committed [`SignedBlock`]. +/// +/// Produced once per committed block on the ntx-builder side. Downstream code (DB layer, +/// coordinator) applies the contained effects to local state. +#[derive(Debug, Clone)] +pub struct CommittedBlockEffects { + pub header: BlockHeader, + pub network_notes: Vec, + pub nullifiers: Vec, + pub network_account_updates: Vec<(NetworkAccountId, AccountUpdateDetails)>, +} + +impl CommittedBlockEffects { + /// Filters the committed block down to the slice the ntx-builder cares about: public network + /// notes, network-account updates, and all created nullifiers. + /// + /// Private output notes cannot be network notes (which must be public) and are skipped. Non- + /// network output notes and non-network account updates are also dropped. + pub fn from_signed_block(block: &SignedBlock) -> Self { + let header = block.header().clone(); + let body = block.body(); + + let mut network_notes = Vec::new(); + for batch in body.output_note_batches() { + for (_idx, output_note) in batch { + if let OutputNote::Public(public) = output_note + && let Ok(network_note) = + AccountTargetNetworkNote::new(public.as_note().clone()) + { + network_notes.push(network_note); + } + } + } + + let nullifiers = body.created_nullifiers().to_vec(); + + let network_account_updates = body + .updated_accounts() + .iter() + .filter_map(|update| { + let network_id = NetworkAccountId::try_from(update.account_id()).ok()?; + Some((network_id, update.details().clone())) + }) + .collect(); + + Self { + header, + network_notes, + nullifiers, + network_account_updates, + } + } +} diff --git a/bin/ntx-builder/src/db/migrations.rs b/bin/ntx-builder/src/db/migrations.rs index 0a49bdc1a6..195301353c 100644 --- a/bin/ntx-builder/src/db/migrations.rs +++ b/bin/ntx-builder/src/db/migrations.rs @@ -30,7 +30,7 @@ mod tests { use super::*; const EXPECTED_SCHEMA_HASHES: [SchemaHash; 1] = [SchemaHash::from_hex( - "c6434bc6a142cd96dd4072bea641546d99788b1495cb0e52c2d98b9138f9c30d", + "892f3fb597808a97bdb55762a6ebd4b7941c855d22eb5e0d9b210901720e1125", )]; #[test] diff --git a/bin/ntx-builder/src/db/migrations/001_initial.sql b/bin/ntx-builder/src/db/migrations/001_initial.sql index 348fac8798..06b6a80d4d 100644 --- a/bin/ntx-builder/src/db/migrations/001_initial.sql +++ b/bin/ntx-builder/src/db/migrations/001_initial.sql @@ -1,48 +1,44 @@ --- Singleton row storing the chain tip header. --- The chain MMR is reconstructed on startup from the store and maintained in memory. +-- Singleton row storing the chain tip header and the partial chain MMR. +-- +-- The MMR is persisted so the ntx-builder can resume from its committed chain state on restart +-- without having to replay the full block subscription from genesis. CREATE TABLE chain_state ( -- Singleton constraint: only one row allowed. id INTEGER NOT NULL PRIMARY KEY CHECK (id = 0), -- Block number of the chain tip. - block_num BIGINT NOT NULL, + block_num BIGINT NOT NULL, -- Serialized BlockHeader. block_header BLOB NOT NULL, + -- Serialized PartialMmr corresponding to `block_header`. + chain_mmr BLOB NOT NULL, CONSTRAINT chain_state_block_num_is_u32 CHECK (block_num BETWEEN 0 AND 0xFFFFFFFF) ); --- Account states: both committed and inflight. --- Committed rows have transaction_id = NULL. Inflight rows have transaction_id set. --- The auto-incrementing order_id preserves insertion order (VecDeque semantics). +-- Committed network accounts, keyed by account ID. +-- +-- The ntx-builder derives all account state from the committed block stream, so we only ever +-- store the latest committed account row per account. CREATE TABLE accounts ( - -- Auto-incrementing ID preserves insertion order. - order_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, -- AccountId serialized bytes (8 bytes). - account_id BLOB NOT NULL, + account_id BLOB NOT NULL PRIMARY KEY, -- Serialized Account state. - account_data BLOB NOT NULL, - -- NULL if this is the committed state; transaction ID if inflight. - transaction_id BLOB -); - --- At most one committed row per account. -CREATE UNIQUE INDEX idx_accounts_committed ON accounts(account_id) WHERE transaction_id IS NULL; --- At most one inflight row per (account, transaction) pair. -CREATE UNIQUE INDEX idx_accounts_inflight ON accounts(account_id, transaction_id) - WHERE transaction_id IS NOT NULL; -CREATE INDEX idx_accounts_account ON accounts(account_id); -CREATE INDEX idx_accounts_tx ON accounts(transaction_id) WHERE transaction_id IS NOT NULL; + account_data BLOB NOT NULL +) WITHOUT ROWID; --- Notes: committed, inflight, and nullified - all in one table. --- created_by = NULL means committed note; non-NULL means created by inflight tx. --- consumed_by = NULL means unconsumed; non-NULL means consumed by inflight tx. --- committed_at = block number when the consuming transaction was committed on-chain. +-- Network notes targeting network accounts, plus backoff metadata used by the actor execution +-- path that consumes them in subsequent PRs. +-- +-- A row is inserted when the note appears in a committed block. When the note's nullifier later +-- appears in a committed block, `committed_at` is set to that block number rather than deleting +-- the row. This lets the `GetNetworkNoteStatus` endpoint surface the full lifecycle (pending, +-- consumed, discarded) for any note the ntx-builder has ever seen. CREATE TABLE notes ( -- Nullifier bytes (32 bytes). Primary key. nullifier BLOB PRIMARY KEY, -- Target account ID. account_id BLOB NOT NULL, - -- Serialized SingleTargetNetworkNote. + -- Serialized AccountTargetNetworkNote. note_data BLOB NOT NULL, -- Note ID bytes. note_id BLOB, @@ -52,12 +48,8 @@ CREATE TABLE notes ( last_attempt BIGINT, -- Latest execution error message. NULL if no error recorded. last_error TEXT, - -- NULL if the note came from a committed block; transaction ID if created by inflight tx. - created_by BLOB, - -- NULL if unconsumed; transaction ID of the consuming inflight tx. - consumed_by BLOB, - -- Block number at which the note's consuming transaction was committed. - -- NULL while the note is still pending or in-flight; set on block commit. + -- Block number in which the note's nullifier was observed in a committed block. NULL while + -- the note is still pending consumption. committed_at BIGINT, CONSTRAINT notes_attempt_count_non_negative CHECK (attempt_count >= 0), @@ -65,9 +57,8 @@ CREATE TABLE notes ( CONSTRAINT notes_committed_at_is_u32 CHECK (committed_at BETWEEN 0 AND 0xFFFFFFFF) ) WITHOUT ROWID; -CREATE INDEX idx_notes_account ON notes(account_id); -CREATE INDEX idx_notes_created_by ON notes(created_by) WHERE created_by IS NOT NULL; -CREATE INDEX idx_notes_consumed_by ON notes(consumed_by) WHERE consumed_by IS NOT NULL; +-- Partial index covers the actor's hot path (`account_id = ? AND committed_at IS NULL`). +CREATE INDEX idx_notes_account_pending ON notes(account_id) WHERE committed_at IS NULL; CREATE INDEX idx_notes_note_id ON notes(note_id) WHERE note_id IS NOT NULL; -- Persistent cache of note scripts, keyed by script root hash. diff --git a/bin/ntx-builder/src/db/mod.rs b/bin/ntx-builder/src/db/mod.rs index ac0f657211..4c85efba0b 100644 --- a/bin/ntx-builder/src/db/mod.rs +++ b/bin/ntx-builder/src/db/mod.rs @@ -5,14 +5,13 @@ use anyhow::Context; use miden_node_db::DatabaseError; use miden_node_proto::domain::account::NetworkAccountId; use miden_protocol::Word; -use miden_protocol::account::Account; -use miden_protocol::account::delta::AccountUpdateDetails; use miden_protocol::block::{BlockHeader, BlockNumber}; +use miden_protocol::crypto::merkle::mmr::PartialMmr; use miden_protocol::note::{NoteId, NoteScript, Nullifier}; -use miden_protocol::transaction::TransactionId; use miden_standards::note::AccountTargetNetworkNote; use tracing::{info, instrument}; +use crate::committed_block::CommittedBlockEffects; use crate::db::migrations::apply_migrations; use crate::db::models::queries; use crate::{COMPONENT, NoteError}; @@ -72,7 +71,31 @@ impl Db { Ok(Db { inner }) } - // PUBLIC QUERY METHODS + // BLOCK APPLICATION + // ============================================================================================ + + /// Applies the effects of a committed block (account upserts, note inserts, nullifier-driven + /// deletes, and chain-state advancement) in a single transaction. Returns the set of network + /// accounts touched by this block. + pub async fn apply_committed_block( + &self, + effects: CommittedBlockEffects, + chain_mmr: PartialMmr, + ) -> Result> { + self.inner + .transact("apply_committed_block", move |conn| { + queries::apply_committed_block(conn, &effects, &chain_mmr) + }) + .await + } + + /// Reads the singleton chain state row, returning the last synced block number, its header, and + /// the persisted chain MMR if any block has been applied locally. + pub async fn get_chain_state(&self) -> Result> { + self.inner.query("get_chain_state", queries::select_chain_state).await + } + + // ACTOR-PATH QUERIES // ============================================================================================ /// Returns `true` if there are notes available for consumption by the given account. @@ -90,18 +113,11 @@ impl Db { .await } - /// Returns `true` when an inflight account row exists with the given transaction ID. - pub async fn transaction_exists(&self, tx_id: TransactionId) -> Result { - self.inner - .query("transaction_exists", move |conn| queries::transaction_exists(conn, &tx_id)) - .await - } - /// Returns `true` if a committed account state exists for the given account. pub async fn has_committed_account(&self, account_id: NetworkAccountId) -> Result { self.inner .query("has_committed_account", move |conn| { - Ok(queries::get_committed_account(conn, account_id)?.is_some()) + Ok(queries::get_account(conn, account_id)?.is_some()) }) .await } @@ -112,7 +128,7 @@ impl Db { account_id: NetworkAccountId, block_num: BlockNumber, max_note_attempts: usize, - ) -> Result<(Option, Vec)> { + ) -> Result<(Option, Vec)> { self.inner .query("select_candidate", move |conn| { let account = queries::get_account(conn, account_id)?; @@ -145,102 +161,72 @@ impl Db { .await } - /// Handles a `TransactionAdded` mempool event by writing effects to the DB. - pub async fn handle_transaction_added( - &self, - tx_id: TransactionId, - account_delta: Option, - notes: Vec, - nullifiers: Vec, - ) -> Result<()> { - self.inner - .transact("handle_transaction_added", move |conn| { - queries::add_transaction(conn, &tx_id, account_delta.as_ref(), ¬es, &nullifiers) - }) - .await - } + // SCRIPT CACHE + // ============================================================================================ - /// Handles a `BlockCommitted` mempool event by committing transaction effects. - /// - /// Returns the list of affected account IDs that should be notified. - pub async fn handle_block_committed( - &self, - txs: Vec, - block_num: BlockNumber, - header: BlockHeader, - ) -> Result> { + /// Looks up a cached note script by root hash. + pub async fn lookup_note_script(&self, script_root: Word) -> Result> { self.inner - .transact("handle_block_committed", move |conn| { - queries::commit_block(conn, &txs, block_num, &header) + .query("lookup_note_script", move |conn| { + queries::lookup_note_script(conn, &script_root) }) .await } - /// Handles a `TransactionsReverted` mempool event by undoing transaction effects. - /// - /// Returns all affected account IDs that should be notified. - pub async fn handle_transactions_reverted( - &self, - tx_ids: Vec, - ) -> Result> { + /// Persists a note script to the local cache. + pub async fn insert_note_script(&self, script_root: Word, script: &NoteScript) -> Result<()> { + let script = script.clone(); self.inner - .transact("handle_transactions_reverted", move |conn| { - queries::revert_transaction(conn, &tx_ids) + .transact("insert_note_script", move |conn| { + queries::insert_note_script(conn, &script_root, &script) }) .await } - /// Purges all inflight state. Called on startup to get a clean slate. - pub async fn purge_inflight(&self) -> Result<()> { - self.inner.transact("purge_inflight", queries::purge_inflight).await - } + // DEAD-CODE STUBS + // ============================================================================================ + // + // These methods exist to keep the dead actor/coordinator modules compiling in PR 1. They are + // never reached because `NetworkTransactionBuilder` does not spawn the actor path. PR 2 + // replaces them with their new committed-block-driven equivalents. - /// Inserts or replaces the singleton chain state row. - pub async fn upsert_chain_state( + #[expect(clippy::unused_async)] + pub async fn transaction_exists( &self, - block_num: BlockNumber, - header: BlockHeader, - ) -> Result<()> { - self.inner - .transact("upsert_chain_state", move |conn| { - queries::upsert_chain_state(conn, block_num, &header) - }) - .await + _tx_id: miden_protocol::transaction::TransactionId, + ) -> Result { + unimplemented!("transaction_exists is rewired in PR 2 of the ntx-builder refactor") } - /// Syncs an account and its notes from the store into the DB. - pub async fn sync_account_from_store( + #[expect(clippy::unused_async)] + pub async fn handle_transaction_added( &self, - account_id: NetworkAccountId, - account: Account, - notes: Vec, + _tx_id: miden_protocol::transaction::TransactionId, + _account_delta: Option, + _notes: Vec, + _nullifiers: Vec, ) -> Result<()> { - self.inner - .transact("sync_account_from_store", move |conn| { - queries::upsert_committed_account(conn, account_id, &account)?; - queries::insert_committed_notes(conn, ¬es)?; - Ok(()) - }) - .await + unimplemented!("handle_transaction_added is rewired in PR 2 of the ntx-builder refactor") } - /// Looks up a cached note script by root hash. - pub async fn lookup_note_script(&self, script_root: Word) -> Result> { - self.inner - .query("lookup_note_script", move |conn| { - queries::lookup_note_script(conn, &script_root) - }) - .await + #[expect(clippy::unused_async)] + pub async fn handle_block_committed( + &self, + _txs: Vec, + _block_num: BlockNumber, + _header: BlockHeader, + ) -> Result> { + unimplemented!("handle_block_committed is rewired in PR 2 of the ntx-builder refactor") } - /// Persists a note script to the local cache. - pub async fn insert_note_script(&self, script_root: Word, script: &NoteScript) -> Result<()> { - let script = script.clone(); - self.inner - .transact("insert_note_script", move |conn| { - queries::insert_note_script(conn, &script_root, &script) - }) - .await + #[expect(clippy::unused_async)] + pub async fn handle_transactions_reverted( + &self, + _tx_ids: Vec, + ) -> Result> { + unimplemented!( + "handle_transactions_reverted is rewired in PR 2 of the ntx-builder refactor" + ) } /// Creates a file-backed SQLite test connection with migrations applied. diff --git a/bin/ntx-builder/src/db/models/account_effect.rs b/bin/ntx-builder/src/db/models/account_effect.rs index 7a6acf0058..32353129af 100644 --- a/bin/ntx-builder/src/db/models/account_effect.rs +++ b/bin/ntx-builder/src/db/models/account_effect.rs @@ -28,6 +28,7 @@ impl NetworkAccountEffect { update.protocol_account_id().is_network().then_some(update) } + #[expect(dead_code)] pub fn network_account_id(&self) -> NetworkAccountId { // SAFETY: This is a network account by construction. self.protocol_account_id().try_into().unwrap() diff --git a/bin/ntx-builder/src/db/models/conv.rs b/bin/ntx-builder/src/db/models/conv.rs index 0fed2b9593..83663d4316 100644 --- a/bin/ntx-builder/src/db/models/conv.rs +++ b/bin/ntx-builder/src/db/models/conv.rs @@ -24,6 +24,7 @@ pub fn network_account_id_to_bytes(id: NetworkAccountId) -> Vec { id.inner().to_bytes() } +#[expect(dead_code)] pub fn transaction_id_to_bytes(id: &TransactionId) -> Vec { id.to_bytes() } @@ -56,6 +57,7 @@ pub fn account_id_from_bytes(bytes: &[u8]) -> Result { AccountId::read_from_bytes(bytes).map_err(|e| DatabaseError::deserialization("account id", e)) } +#[expect(dead_code)] pub fn network_account_id_from_bytes(bytes: &[u8]) -> Result { let account_id = account_id_from_bytes(bytes)?; NetworkAccountId::try_from(account_id) diff --git a/bin/ntx-builder/src/db/models/queries/accounts.rs b/bin/ntx-builder/src/db/models/queries/accounts.rs index 79035918e0..cdfdfad9e4 100644 --- a/bin/ntx-builder/src/db/models/queries/accounts.rs +++ b/bin/ntx-builder/src/db/models/queries/accounts.rs @@ -1,11 +1,9 @@ //! Account-related queries and models. -use diesel::dsl::exists; use diesel::prelude::*; use miden_node_db::DatabaseError; use miden_node_proto::domain::account::NetworkAccountId; use miden_protocol::account::Account; -use miden_protocol::transaction::TransactionId; use crate::db::models::conv as conversions; use crate::db::schema; @@ -13,19 +11,14 @@ use crate::db::schema; // MODELS // ================================================================================================ -/// Row for inserting into the unified `accounts` table. -/// -/// `transaction_id = None` means committed; `Some(tx_id_bytes)` means inflight. #[derive(Debug, Clone, Insertable)] #[diesel(table_name = schema::accounts)] #[diesel(check_for_backend(diesel::sqlite::Sqlite))] pub struct AccountInsert { pub account_id: Vec, pub account_data: Vec, - pub transaction_id: Option>, } -/// Row read from `accounts`. #[derive(Debug, Clone, Queryable, Selectable)] #[diesel(table_name = schema::accounts)] #[diesel(check_for_backend(diesel::sqlite::Sqlite))] @@ -36,54 +29,33 @@ pub struct AccountRow { // QUERIES // ================================================================================================ -/// Inserts or replaces the committed account state (`transaction_id = NULL`). -/// -/// Deletes any existing committed row first, then inserts a fresh one. +/// Inserts or replaces the committed account state. /// /// # Raw SQL /// /// ```sql -/// DELETE FROM accounts WHERE account_id = ?1 AND transaction_id IS NULL -/// -/// INSERT INTO accounts (account_id, account_data, transaction_id) -/// VALUES (?1, ?2, NULL) +/// INSERT OR REPLACE INTO accounts (account_id, account_data) +/// VALUES (?1, ?2) /// ``` -pub fn upsert_committed_account( +pub fn upsert_account( conn: &mut SqliteConnection, account_id: NetworkAccountId, account: &Account, ) -> Result<(), DatabaseError> { - let account_id_bytes = conversions::network_account_id_to_bytes(account_id); - - // Delete the existing committed row (if any). - diesel::delete( - schema::accounts::table - .filter(schema::accounts::account_id.eq(&account_id_bytes)) - .filter(schema::accounts::transaction_id.is_null()), - ) - .execute(conn)?; - - // Insert the new committed row. let row = AccountInsert { - account_id: account_id_bytes, + account_id: conversions::network_account_id_to_bytes(account_id), account_data: conversions::account_to_bytes(account), - transaction_id: None, }; - diesel::insert_into(schema::accounts::table).values(&row).execute(conn)?; + diesel::replace_into(schema::accounts::table).values(&row).execute(conn)?; Ok(()) } -/// Returns the latest account state: last inflight row (highest `order_id`), or committed if -/// none. +/// Returns the committed account state for the given network account. /// /// # Raw SQL /// /// ```sql -/// SELECT account_data -/// FROM accounts -/// WHERE account_id = ?1 -/// ORDER BY order_id DESC -/// LIMIT 1 +/// SELECT account_data FROM accounts WHERE account_id = ?1 /// ``` pub fn get_account( conn: &mut SqliteConnection, @@ -91,62 +63,12 @@ pub fn get_account( ) -> Result, DatabaseError> { let account_id_bytes = conversions::network_account_id_to_bytes(account_id); - // ORDER BY order_id DESC returns the latest inflight first, then committed. - let row: Option = schema::accounts::table - .filter(schema::accounts::account_id.eq(&account_id_bytes)) - .order(schema::accounts::order_id.desc()) - .select(AccountRow::as_select()) - .first(conn) - .optional()?; - - row.map(|AccountRow { account_data, .. }| conversions::account_from_bytes(&account_data)) - .transpose() -} - -/// Returns the committed account state (`transaction_id IS NULL`), ignoring any inflight rows. -/// -/// # Raw SQL -/// -/// ```sql -/// SELECT account_data -/// FROM accounts -/// WHERE account_id = ?1 AND transaction_id IS NULL -/// LIMIT 1 -/// ``` -pub fn get_committed_account( - conn: &mut SqliteConnection, - account_id: NetworkAccountId, -) -> Result, DatabaseError> { - let account_id_bytes = conversions::network_account_id_to_bytes(account_id); - let row: Option = schema::accounts::table - .filter(schema::accounts::account_id.eq(&account_id_bytes)) - .filter(schema::accounts::transaction_id.is_null()) + .find(&account_id_bytes) .select(AccountRow::as_select()) .first(conn) .optional()?; - row.map(|AccountRow { account_data, .. }| conversions::account_from_bytes(&account_data)) + row.map(|AccountRow { account_data }| conversions::account_from_bytes(&account_data)) .transpose() } - -/// Returns `true` when an inflight account row exists with the given `transaction_id`. -/// -/// # Raw SQL -/// -/// ```sql -/// SELECT EXISTS (SELECT 1 FROM accounts WHERE transaction_id = ?1) -/// ``` -pub fn transaction_exists( - conn: &mut SqliteConnection, - tx_id: &TransactionId, -) -> Result { - let tx_id_bytes = conversions::transaction_id_to_bytes(tx_id); - - let result: bool = diesel::select(exists( - schema::accounts::table.filter(schema::accounts::transaction_id.eq(&tx_id_bytes)), - )) - .get_result(conn)?; - - Ok(result) -} diff --git a/bin/ntx-builder/src/db/models/queries/chain_state.rs b/bin/ntx-builder/src/db/models/queries/chain_state.rs index 9b529cadc5..1b8f27363f 100644 --- a/bin/ntx-builder/src/db/models/queries/chain_state.rs +++ b/bin/ntx-builder/src/db/models/queries/chain_state.rs @@ -3,6 +3,8 @@ use diesel::prelude::*; use miden_node_db::DatabaseError; use miden_protocol::block::{BlockHeader, BlockNumber}; +use miden_protocol::crypto::merkle::mmr::PartialMmr; +use miden_protocol::utils::serde::{Deserializable, Serializable}; use crate::db::models::conv as conversions; use crate::db::schema; @@ -18,29 +20,70 @@ pub struct ChainStateInsert { pub id: i32, pub block_num: i64, pub block_header: Vec, + pub chain_mmr: Vec, +} + +#[derive(Debug, Clone, Queryable, Selectable)] +#[diesel(table_name = schema::chain_state)] +#[diesel(check_for_backend(diesel::sqlite::Sqlite))] +struct ChainStateRow { + block_num: i64, + block_header: Vec, + chain_mmr: Vec, } // QUERIES // ================================================================================================ -/// Inserts or replaces the singleton chain state row. +/// Inserts or replaces the singleton chain state row, persisting the chain tip header and the +/// associated partial chain MMR. /// /// # Raw SQL /// /// ```sql -/// INSERT OR REPLACE INTO chain_state (id, block_num, block_header) -/// VALUES (0, ?1, ?2) +/// INSERT OR REPLACE INTO chain_state (id, block_num, block_header, chain_mmr) +/// VALUES (0, ?1, ?2, ?3) /// ``` pub fn upsert_chain_state( conn: &mut SqliteConnection, block_num: BlockNumber, block_header: &BlockHeader, + chain_mmr: &PartialMmr, ) -> Result<(), DatabaseError> { let row = ChainStateInsert { id: 0, block_num: conversions::block_num_to_i64(block_num), block_header: conversions::block_header_to_bytes(block_header), + chain_mmr: chain_mmr.to_bytes(), }; diesel::replace_into(schema::chain_state::table).values(&row).execute(conn)?; Ok(()) } + +/// Reads the singleton chain state row, returning the persisted block number, header, and chain +/// MMR if any block has been applied locally. +/// +/// # Raw SQL +/// +/// ```sql +/// SELECT block_num, block_header, chain_mmr FROM chain_state WHERE id = 0 +/// ``` +pub fn select_chain_state( + conn: &mut SqliteConnection, +) -> Result, DatabaseError> { + let row: Option = schema::chain_state::table + .find(0i32) + .select(ChainStateRow::as_select()) + .first(conn) + .optional()?; + + row.map(|ChainStateRow { block_num, block_header, chain_mmr }| { + let block_num = conversions::block_num_from_i64(block_num); + let header = BlockHeader::read_from_bytes(&block_header) + .map_err(|e| DatabaseError::deserialization("block header", e))?; + let mmr = PartialMmr::read_from_bytes(&chain_mmr) + .map_err(|e| DatabaseError::deserialization("chain mmr", e))?; + Ok((block_num, header, mmr)) + }) + .transpose() +} diff --git a/bin/ntx-builder/src/db/models/queries/mod.rs b/bin/ntx-builder/src/db/models/queries/mod.rs index f2f97eaf3e..556a2e3f69 100644 --- a/bin/ntx-builder/src/db/models/queries/mod.rs +++ b/bin/ntx-builder/src/db/models/queries/mod.rs @@ -5,16 +5,10 @@ use std::collections::HashSet; use diesel::prelude::*; use miden_node_db::DatabaseError; use miden_node_proto::domain::account::NetworkAccountId; -use miden_protocol::account::delta::AccountUpdateDetails; -use miden_protocol::block::{BlockHeader, BlockNumber}; -use miden_protocol::note::Nullifier; -use miden_protocol::transaction::TransactionId; -use miden_protocol::utils::serde::Serializable; -use miden_standards::note::AccountTargetNetworkNote; +use miden_protocol::crypto::merkle::mmr::PartialMmr; use super::account_effect::NetworkAccountEffect; -use crate::db::models::conv as conversions; -use crate::db::schema; +use crate::committed_block::CommittedBlockEffects; mod accounts; pub use accounts::*; @@ -31,309 +25,61 @@ pub use notes::*; #[cfg(test)] mod tests; -// STARTUP QUERIES +// COMMITTED BLOCK APPLICATION // ================================================================================================ -/// Purges all inflight state. Called on startup to get a clean state. +/// Applies a committed block's effects to the database in a single transaction: /// -/// - Deletes account rows with `transaction_id IS NOT NULL`. -/// - Deletes note rows with `created_by IS NOT NULL`. -/// - Sets `consumed_by = NULL` on notes consumed by inflight transactions. +/// - Upserts each touched network account: new full-state deltas insert, partial deltas apply to +/// the existing committed row. +/// - Inserts each network note (`INSERT OR IGNORE` to tolerate redeliveries). +/// - Marks any of our pending notes whose nullifiers appear in this block as `committed_at = +/// block_num`, preserving the row so the `GetNetworkNoteStatus` endpoint can report the full +/// lifecycle. +/// - Upserts the singleton `chain_state` row with the new block header and the post-application +/// chain MMR. /// -/// # Raw SQL -/// -/// ```sql -/// DELETE FROM accounts WHERE transaction_id IS NOT NULL -/// -/// DELETE FROM notes WHERE created_by IS NOT NULL -/// -/// UPDATE notes SET consumed_by = NULL WHERE consumed_by IS NOT NULL AND committed_at IS NULL -/// ``` -pub fn purge_inflight(conn: &mut SqliteConnection) -> Result<(), DatabaseError> { - // Delete inflight account rows. - diesel::delete(schema::accounts::table.filter(schema::accounts::transaction_id.is_not_null())) - .execute(conn)?; - - // Delete inflight-created notes. - diesel::delete(schema::notes::table.filter(schema::notes::created_by.is_not_null())) - .execute(conn)?; - - // Un-nullify notes consumed by inflight transactions (skip committed notes). - diesel::update( - schema::notes::table - .filter(schema::notes::consumed_by.is_not_null()) - .filter(schema::notes::committed_at.is_null()), - ) - .set(schema::notes::consumed_by.eq(None::>)) - .execute(conn)?; - - Ok(()) -} - -// MEMPOOL EVENT HANDLERS -// ================================================================================================ - -/// Handles a `TransactionAdded` event by writing effects to the DB. -/// -/// # Raw SQL -/// -/// For account updates (applies delta to latest state and inserts inflight row): -/// -/// ```sql -/// -- Fetch latest account (see latest_account) -/// INSERT INTO accounts (account_id, transaction_id, account_data) -/// VALUES (?1, ?2, ?3) -/// ``` -/// -/// Per note (idempotent via `INSERT OR IGNORE`): -/// -/// ```sql -/// INSERT OR IGNORE INTO notes -/// (nullifier, account_id, note_data, attempt_count, last_attempt, created_by, consumed_by) -/// VALUES (?1, ?2, ?3, 0, NULL, ?4, NULL) -/// ``` -/// -/// Per nullifier (marks notes as consumed): -/// -/// ```sql -/// UPDATE notes -/// SET consumed_by = ?1 -/// WHERE nullifier = ?2 AND consumed_by IS NULL -/// ``` -pub fn add_transaction( +/// Returns the set of network accounts that were touched by this block (account-state updates or +/// notes targeting the account). +pub fn apply_committed_block( conn: &mut SqliteConnection, - tx_id: &TransactionId, - account_delta: Option<&AccountUpdateDetails>, - notes: &[AccountTargetNetworkNote], - nullifiers: &[Nullifier], -) -> Result<(), DatabaseError> { - let tx_id_bytes = conversions::transaction_id_to_bytes(tx_id); - - // Process account delta. - if let Some(update) = account_delta.and_then(NetworkAccountEffect::from_protocol) { - let account_id = update.network_account_id(); - match update { - NetworkAccountEffect::Updated(ref account_delta) => { - // Query latest_account, apply delta, insert inflight row. - let current_account = - get_account(conn, account_id)?.expect("account must exist to apply delta"); - let mut updated = current_account; - updated.apply_delta(account_delta).expect( - "network account delta should apply since it was accepted by the mempool", - ); + effects: &CommittedBlockEffects, + chain_mmr: &PartialMmr, +) -> Result, DatabaseError> { + let mut affected_accounts: HashSet = HashSet::new(); - let insert = AccountInsert { - account_id: conversions::network_account_id_to_bytes(account_id), - transaction_id: Some(tx_id_bytes.clone()), - account_data: conversions::account_to_bytes(&updated), - }; - diesel::insert_into(schema::accounts::table).values(&insert).execute(conn)?; + for (network_id, details) in &effects.network_account_updates { + let Some(effect) = NetworkAccountEffect::from_protocol(details) else { + continue; + }; + match effect { + NetworkAccountEffect::Created(account) => { + upsert_account(conn, *network_id, &account)?; }, - NetworkAccountEffect::Created(ref account) => { - let insert = AccountInsert { - account_id: conversions::network_account_id_to_bytes(account_id), - transaction_id: Some(tx_id_bytes.clone()), - account_data: conversions::account_to_bytes(account), - }; - diesel::insert_into(schema::accounts::table).values(&insert).execute(conn)?; + NetworkAccountEffect::Updated(delta) => { + let mut current = get_account(conn, *network_id)?.expect( + "account must exist locally to apply a non-full-state delta from a committed \ + block", + ); + current + .apply_delta(&delta) + .expect("network account delta should apply since the block was committed"); + upsert_account(conn, *network_id, ¤t)?; }, } + affected_accounts.insert(*network_id); } - // Insert notes with created_by = tx_id. Uses INSERT OR IGNORE to make this idempotent if the - // same event is delivered twice (the nullifier PK would otherwise cause a constraint - // violation). - for note in notes { - let insert = NoteInsert { - nullifier: conversions::nullifier_to_bytes(¬e.as_note().nullifier()), - account_id: conversions::network_account_id_to_bytes( - note.target_account_id() - .try_into() - .expect("network note's target account must be a network account"), - ), - note_data: note.as_note().to_bytes(), - note_id: Some(conversions::note_id_to_bytes(¬e.as_note().id())), - attempt_count: 0, - last_attempt: None, - last_error: None, - created_by: Some(tx_id_bytes.clone()), - consumed_by: None, - committed_at: None, - }; - diesel::insert_or_ignore_into(schema::notes::table) - .values(&insert) - .execute(conn)?; + for note in &effects.network_notes { + let target = NetworkAccountId::try_from(note.target_account_id()) + .expect("network note's target account must be a network account"); + affected_accounts.insert(target); } + insert_network_notes(conn, &effects.network_notes)?; - // Mark consumed notes: set consumed_by = tx_id for matching nullifiers. - for nullifier in nullifiers { - let nullifier_bytes = conversions::nullifier_to_bytes(nullifier); + mark_notes_consumed(conn, &effects.nullifiers, effects.header.block_num())?; - // Only mark notes that are not already consumed. - diesel::update( - schema::notes::table - .find(&nullifier_bytes) - .filter(schema::notes::consumed_by.is_null()), - ) - .set(schema::notes::consumed_by.eq(Some(&tx_id_bytes))) - .execute(conn)?; - } - - Ok(()) -} - -/// Handles a `BlockCommitted` event by committing transaction effects. -/// -/// # Raw SQL -/// -/// Per committed transaction: -/// -/// ```sql -/// -- Find inflight accounts for this tx -/// SELECT account_id FROM accounts WHERE transaction_id = ?1 -/// -/// -- Delete old committed row -/// DELETE FROM accounts WHERE account_id = ?1 AND transaction_id IS NULL -/// -/// -- Promote inflight row to committed -/// UPDATE accounts SET transaction_id = NULL -/// WHERE account_id = ?1 AND transaction_id = ?2 -/// -/// -- Mark consumed notes as committed -/// UPDATE notes SET committed_at = ?block_num WHERE consumed_by = ?1 -/// -/// -- Promote inflight-created notes to committed -/// UPDATE notes SET created_by = NULL WHERE created_by = ?1 -/// ``` -/// -/// Finally updates chain state (see [`upsert_chain_state`]). -pub fn commit_block( - conn: &mut SqliteConnection, - tx_ids: &[TransactionId], - block_num: BlockNumber, - block_header: &BlockHeader, -) -> Result, DatabaseError> { - let mut affected_accounts = HashSet::new(); - - for tx_id in tx_ids { - let tx_id_bytes = conversions::transaction_id_to_bytes(tx_id); - - // Promote inflight account rows: delete old committed, set transaction_id = NULL. Find - // accounts that have an inflight row for this tx. - let inflight_account_ids: Vec> = schema::accounts::table - .filter(schema::accounts::transaction_id.eq(&tx_id_bytes)) - .select(schema::accounts::account_id) - .load(conn)?; - - for account_id_bytes in &inflight_account_ids { - affected_accounts.insert(conversions::network_account_id_from_bytes(account_id_bytes)?); - - // Delete the old committed row for this account. - diesel::delete( - schema::accounts::table - .filter(schema::accounts::account_id.eq(account_id_bytes)) - .filter(schema::accounts::transaction_id.is_null()), - ) - .execute(conn)?; - - // Promote the inflight row to committed (set transaction_id = NULL). Only promote the - // row for this specific tx. - diesel::update( - schema::accounts::table - .filter(schema::accounts::account_id.eq(account_id_bytes)) - .filter(schema::accounts::transaction_id.eq(&tx_id_bytes)), - ) - .set(schema::accounts::transaction_id.eq(None::>)) - .execute(conn)?; - } - - // Collect accounts of notes consumed by this tx. - let consumed_note_accounts: Vec> = schema::notes::table - .filter(schema::notes::consumed_by.eq(&tx_id_bytes)) - .select(schema::notes::account_id) - .load(conn)?; - for account_id_bytes in &consumed_note_accounts { - affected_accounts.insert(conversions::network_account_id_from_bytes(account_id_bytes)?); - } - - // Mark consumed notes as committed (set committed_at = block_num). - let block_num_val = conversions::block_num_to_i64(block_num); - diesel::update(schema::notes::table.filter(schema::notes::consumed_by.eq(&tx_id_bytes))) - .set(schema::notes::committed_at.eq(Some(block_num_val))) - .execute(conn)?; - - // Promote inflight-created notes to committed (set created_by = NULL). - diesel::update(schema::notes::table.filter(schema::notes::created_by.eq(&tx_id_bytes))) - .set(schema::notes::created_by.eq(None::>)) - .execute(conn)?; - } - - // Update chain state. - upsert_chain_state(conn, block_num, block_header)?; - - Ok(affected_accounts.into_iter().collect()) -} - -/// Handles a `TransactionsReverted` event by undoing transaction effects. -/// -/// Returns all affected account IDs (for notification). Accounts whose creation was fully -/// reverted are included. -/// -/// # Raw SQL -/// -/// Per reverted transaction: -/// -/// ```sql -/// DELETE FROM accounts WHERE transaction_id = ?1 RETURNING account_id -/// -/// DELETE FROM notes WHERE created_by = ?1 -/// -/// UPDATE notes SET consumed_by = NULL WHERE consumed_by = ?1 RETURNING account_id -/// ``` -pub fn revert_transaction( - conn: &mut SqliteConnection, - tx_ids: &[TransactionId], -) -> Result, DatabaseError> { - use diesel::sql_types::Binary; - - let mut affected_accounts = HashSet::new(); - - for tx_id in tx_ids { - let tx_id_bytes = conversions::transaction_id_to_bytes(tx_id); - - // Delete inflight account rows and collect affected account IDs. - let deleted_accounts: Vec = diesel::sql_query( - "DELETE FROM accounts WHERE transaction_id = ?1 RETURNING account_id", - ) - .bind::(&tx_id_bytes) - .load(conn)?; - - for row in &deleted_accounts { - affected_accounts.insert(conversions::network_account_id_from_bytes(&row.account_id)?); - } - - // Delete inflight-created notes (created_by = tx_id). - diesel::delete(schema::notes::table.filter(schema::notes::created_by.eq(&tx_id_bytes))) - .execute(conn)?; - - // Restore consumed notes and collect affected account IDs. - let restored_accounts: Vec = diesel::sql_query( - "UPDATE notes SET consumed_by = NULL WHERE consumed_by = ?1 RETURNING account_id", - ) - .bind::(&tx_id_bytes) - .load(conn)?; - - for row in &restored_accounts { - affected_accounts.insert(conversions::network_account_id_from_bytes(&row.account_id)?); - } - } + upsert_chain_state(conn, effects.header.block_num(), &effects.header, chain_mmr)?; Ok(affected_accounts.into_iter().collect()) } - -/// Helper row type for `RETURNING account_id` queries. -#[derive(diesel::QueryableByName)] -struct AccountIdRow { - #[diesel(sql_type = diesel::sql_types::Binary)] - account_id: Vec, -} diff --git a/bin/ntx-builder/src/db/models/queries/notes.rs b/bin/ntx-builder/src/db/models/queries/notes.rs index bb846d7a5e..d0892bf23e 100644 --- a/bin/ntx-builder/src/db/models/queries/notes.rs +++ b/bin/ntx-builder/src/db/models/queries/notes.rs @@ -15,7 +15,7 @@ use crate::db::schema; // MODELS // ================================================================================================ -/// Row read from the unified `notes` table. +/// Row read from `notes`. #[derive(Debug, Clone, Queryable, Selectable)] #[diesel(table_name = schema::notes)] #[diesel(check_for_backend(diesel::sqlite::Sqlite))] @@ -25,7 +25,7 @@ pub struct NoteRow { pub last_attempt: Option, } -/// Row for inserting into the unified `notes` table. +/// Row for inserting into `notes`. #[derive(Debug, Clone, Insertable)] #[diesel(table_name = schema::notes)] #[diesel(check_for_backend(diesel::sqlite::Sqlite))] @@ -37,8 +37,6 @@ pub struct NoteInsert { pub attempt_count: i32, pub last_attempt: Option, pub last_error: Option, - pub created_by: Option>, - pub consumed_by: Option>, pub committed_at: Option, } @@ -51,66 +49,63 @@ pub struct NoteStatusRow { pub last_error: Option, pub attempt_count: i32, pub last_attempt: Option, - pub consumed_by: Option>, pub committed_at: Option, } // QUERIES // ================================================================================================ -/// Batch inserts committed notes (`created_by = NULL`, `consumed_by = NULL`). -/// -/// # Raw SQL -/// -/// Per note: -/// -/// ```sql -/// INSERT OR REPLACE INTO notes -/// (nullifier, account_id, note_data, note_id, attempt_count, last_attempt, last_error, -/// created_by, consumed_by) -/// VALUES (?1, ?2, ?3, ?4, 0, NULL, NULL, NULL, NULL) -/// ``` -pub fn insert_committed_notes( +/// Inserts network notes from a committed block. Uses `INSERT OR IGNORE` so re-applying the same +/// block (e.g. on a redelivery from the subscription stream) is a no-op rather than a constraint +/// violation. +pub fn insert_network_notes( conn: &mut SqliteConnection, notes: &[AccountTargetNetworkNote], ) -> Result<(), DatabaseError> { for note in notes { + let target_id = NetworkAccountId::try_from(note.target_account_id()) + .expect("network note's target account must be a network account"); let row = NoteInsert { nullifier: conversions::nullifier_to_bytes(¬e.as_note().nullifier()), - account_id: conversions::network_account_id_to_bytes( - NetworkAccountId::try_from(note.target_account_id()) - .expect("account ID of a network note should be a network account"), - ), + account_id: conversions::network_account_id_to_bytes(target_id), note_data: note.as_note().to_bytes(), note_id: Some(conversions::note_id_to_bytes(¬e.as_note().id())), attempt_count: 0, last_attempt: None, last_error: None, - created_by: None, - consumed_by: None, committed_at: None, }; - diesel::replace_into(schema::notes::table).values(&row).execute(conn)?; + diesel::insert_or_ignore_into(schema::notes::table).values(&row).execute(conn)?; } Ok(()) } -/// Returns notes available for consumption by a given account. -/// -/// Queries unconsumed notes (`consumed_by IS NULL`) for the account that have not exceeded the -/// maximum attempt count, then applies backoff and execution hint filtering in Rust. +/// Marks notes as consumed by setting `committed_at` to the block number whose committed body +/// contained their nullifier. Rows for nullifiers we never inserted (notes whose targets are not +/// network accounts, or notes that arrived before our subscription cursor) are silently skipped. /// -/// # Raw SQL +/// Rows are kept around (not deleted) so the `GetNetworkNoteStatus` endpoint can report the full +/// lifecycle of any note the ntx-builder has ever seen. +pub fn mark_notes_consumed( + conn: &mut SqliteConnection, + nullifiers: &[Nullifier], + block_num: BlockNumber, +) -> Result<(), DatabaseError> { + let block_num_val = conversions::block_num_to_i64(block_num); + for nullifier in nullifiers { + let nullifier_bytes = conversions::nullifier_to_bytes(nullifier); + diesel::update(schema::notes::table.find(&nullifier_bytes)) + .filter(schema::notes::committed_at.is_null()) + .set(schema::notes::committed_at.eq(Some(block_num_val))) + .execute(conn)?; + } + Ok(()) +} + +/// Returns notes available for consumption by a given account. /// -/// ```sql -/// SELECT note_data, attempt_count, last_attempt -/// FROM notes -/// WHERE -/// account_id = ?1 -/// AND consumed_by IS NULL -/// AND committed_at IS NULL -/// AND attempt_count < ?2 -/// ``` +/// Selects unconsumed notes for the account (a row exists only while a note is unconsumed) whose +/// `attempt_count` is below the cap, then applies execution-hint and backoff filtering in Rust. #[expect(clippy::cast_possible_wrap)] pub fn available_notes( conn: &mut SqliteConnection, @@ -120,11 +115,8 @@ pub fn available_notes( ) -> Result, DatabaseError> { let account_id_bytes = conversions::network_account_id_to_bytes(account_id); - // Get unconsumed, uncommitted notes for this account that haven't exceeded the max attempt - // count. let rows: Vec = schema::notes::table .filter(schema::notes::account_id.eq(&account_id_bytes)) - .filter(schema::notes::consumed_by.is_null()) .filter(schema::notes::committed_at.is_null()) .filter(schema::notes::attempt_count.lt(max_attempts as i32)) .select(NoteRow::as_select()) @@ -146,18 +138,8 @@ pub fn available_notes( Ok(result) } -/// Marks notes as failed by incrementing `attempt_count`, setting `last_attempt`, and storing -/// the latest error message. -/// -/// # Raw SQL -/// -/// Per nullifier: -/// -/// ```sql -/// UPDATE notes -/// SET attempt_count = attempt_count + 1, last_attempt = ?1, last_error = ?2 -/// WHERE nullifier = ?3 -/// ``` +/// Marks notes as failed by incrementing `attempt_count`, setting `last_attempt`, and storing the +/// latest error message. pub fn notes_failed( conn: &mut SqliteConnection, failed_notes: &[(Nullifier, NoteError)], @@ -181,14 +163,6 @@ pub fn notes_failed( } /// Returns the status for a note identified by its note ID. -/// -/// # Raw SQL -/// -/// ```sql -/// SELECT note_id, last_error, attempt_count, last_attempt, consumed_by -/// FROM notes -/// WHERE note_id = ?1 -/// ``` pub fn get_note_status( conn: &mut SqliteConnection, note_id_bytes: &[u8], @@ -217,13 +191,6 @@ fn deserialize_note(note_data: &[u8]) -> Result backoff_threshold } diff --git a/bin/ntx-builder/src/db/models/queries/tests.rs b/bin/ntx-builder/src/db/models/queries/tests.rs index f46b1aafab..7768d5e025 100644 --- a/bin/ntx-builder/src/db/models/queries/tests.rs +++ b/bin/ntx-builder/src/db/models/queries/tests.rs @@ -1,14 +1,14 @@ -//! DB-level tests for NTX builder query functions. +//! DB-level tests for the committed-block-driven query layer. use std::sync::Arc; use diesel::prelude::*; use miden_protocol::Word; use miden_protocol::block::BlockNumber; +use miden_protocol::crypto::merkle::mmr::PartialMmr; use super::*; use crate::NoteError; -use crate::db::models::conv as conversions; use crate::db::{Db, schema}; use crate::test_utils::*; @@ -17,9 +17,6 @@ fn test_note_error(msg: &str) -> NoteError { Arc::new(std::io::Error::other(msg.to_string())) } -// TEST HELPERS -// ================================================================================================ - /// Creates a file-backed SQLite connection with migrations applied. fn test_conn() -> (SqliteConnection, tempfile::TempDir) { Db::test_conn() @@ -35,607 +32,212 @@ fn count_accounts(conn: &mut SqliteConnection) -> i64 { schema::accounts::table.count().get_result(conn).unwrap() } -/// Counts inflight account rows. -fn count_inflight_accounts(conn: &mut SqliteConnection) -> i64 { - schema::accounts::table - .filter(schema::accounts::transaction_id.is_not_null()) - .count() - .get_result(conn) - .unwrap() -} - -/// Counts committed account rows. -fn count_committed_accounts(conn: &mut SqliteConnection) -> i64 { - schema::accounts::table - .filter(schema::accounts::transaction_id.is_null()) - .count() - .get_result(conn) - .unwrap() -} - -// PURGE INFLIGHT TESTS +// ACCOUNT UPSERT // ================================================================================================ #[test] -fn purge_inflight_clears_all_inflight_state() { +fn upsert_account_replaces_existing_row() { let (conn, _dir) = &mut test_conn(); - let account_id = mock_network_account_id(); - let tx_id = mock_tx_id(1); - let note = mock_single_target_note(account_id, 10); - - // Insert committed account. - upsert_committed_account(conn, account_id, &mock_account(account_id)).unwrap(); - - // Insert a transaction (creates inflight account row + note + consumption). - add_transaction(conn, &tx_id, None, std::slice::from_ref(¬e), &[]).unwrap(); + let account = mock_account(account_id); - assert!(count_inflight_accounts(conn) == 0); // No account delta, so no inflight account. - assert_eq!(count_notes(conn), 1); + upsert_account(conn, account_id, &account).unwrap(); + upsert_account(conn, account_id, &account).unwrap(); - // Mark note as consumed by another tx. - let tx_id2 = mock_tx_id(2); - add_transaction(conn, &tx_id2, None, &[], &[note.as_note().nullifier()]).unwrap(); - - // Verify consumed_by is set. - let consumed_count: i64 = schema::notes::table - .filter(schema::notes::consumed_by.is_not_null()) - .count() - .get_result(conn) - .unwrap(); - assert_eq!(consumed_count, 1); - - // Purge inflight state. - purge_inflight(conn).unwrap(); - - // Inflight accounts should be gone. - assert_eq!(count_inflight_accounts(conn), 0); - // Committed account should remain. - assert_eq!(count_committed_accounts(conn), 1); - // Inflight-created notes should be gone. - assert_eq!(count_notes(conn), 0); + assert_eq!(count_accounts(conn), 1, "second upsert must overwrite, not insert"); + assert!(get_account(conn, account_id).unwrap().is_some()); } -// HANDLE TRANSACTION ADDED TESTS +// NETWORK NOTE INSERT/DELETE // ================================================================================================ #[test] -fn transaction_added_inserts_notes_and_marks_consumed() { +fn insert_network_notes_is_idempotent() { let (conn, _dir) = &mut test_conn(); - let account_id = mock_network_account_id(); - let tx_id = mock_tx_id(1); - let note1 = mock_single_target_note(account_id, 10); - let note2 = mock_single_target_note(account_id, 20); + let note = mock_single_target_note(account_id, 7); - // Insert committed note first (to test consumption). - insert_committed_notes(conn, std::slice::from_ref(¬e1)).unwrap(); - assert_eq!(count_notes(conn), 1); + insert_network_notes(conn, std::slice::from_ref(¬e)).unwrap(); + // Re-applying the same block (e.g. on a subscription redelivery) must not error or duplicate. + insert_network_notes(conn, std::slice::from_ref(¬e)).unwrap(); - // Add transaction that creates note2 and consumes note1. - add_transaction( - conn, - &tx_id, - None, - std::slice::from_ref(¬e2), - &[note1.as_note().nullifier()], - ) - .unwrap(); - - // Should now have 2 notes total. - assert_eq!(count_notes(conn), 2); - - // note1 should be consumed. - let consumed: Option> = schema::notes::table - .find(conversions::nullifier_to_bytes(¬e1.as_note().nullifier())) - .select(schema::notes::consumed_by) - .first(conn) - .unwrap(); - assert!(consumed.is_some()); - - // note2 should have created_by set. - let created: Option> = schema::notes::table - .find(conversions::nullifier_to_bytes(¬e2.as_note().nullifier())) - .select(schema::notes::created_by) - .first(conn) - .unwrap(); - assert!(created.is_some()); + assert_eq!(count_notes(conn), 1); } #[test] -fn transaction_added_is_idempotent_for_notes() { +fn mark_notes_consumed_keeps_rows_and_sets_committed_at() { let (conn, _dir) = &mut test_conn(); - let account_id = mock_network_account_id(); - let tx_id = mock_tx_id(1); - let note = mock_single_target_note(account_id, 10); + let note_a = mock_single_target_note(account_id, 1); + let note_b = mock_single_target_note(account_id, 2); - // Insert the same transaction twice. - add_transaction(conn, &tx_id, None, std::slice::from_ref(¬e), &[]).unwrap(); - add_transaction(conn, &tx_id, None, std::slice::from_ref(¬e), &[]).unwrap(); + insert_network_notes(conn, &[note_a.clone(), note_b.clone()]).unwrap(); + assert_eq!(count_notes(conn), 2); - // Should only have one note (INSERT OR IGNORE). - assert_eq!(count_notes(conn), 1); -} + let consumed_at = BlockNumber::from(42); + mark_notes_consumed(conn, &[note_a.as_note().nullifier()], consumed_at).unwrap(); -// HANDLE BLOCK COMMITTED TESTS -// ================================================================================================ + // Both rows are still present so the gRPC status endpoint can report them. + assert_eq!(count_notes(conn), 2); -#[test] -fn block_committed_promotes_inflight_notes_to_committed() { - let (conn, _dir) = &mut test_conn(); + let status_a = + get_note_status(conn, &crate::db::models::conv::note_id_to_bytes(¬e_a.as_note().id())) + .unwrap() + .unwrap(); + assert_eq!(status_a.committed_at, Some(i64::from(consumed_at.as_u32()))); - let account_id = mock_network_account_id(); - let tx_id = mock_tx_id(1); - let note = mock_single_target_note(account_id, 10); - let block_num = BlockNumber::from(1u32); - let header = mock_block_header(block_num); - - // Add a transaction that creates a note. - add_transaction(conn, &tx_id, None, std::slice::from_ref(¬e), &[]).unwrap(); - - // Verify created_by is set. - let created: Option> = schema::notes::table - .find(conversions::nullifier_to_bytes(¬e.as_note().nullifier())) - .select(schema::notes::created_by) - .first(conn) - .unwrap(); - assert!(created.is_some()); - - // Commit the block. - commit_block(conn, &[tx_id], block_num, &header).unwrap(); - - // created_by should now be NULL (promoted to committed). - let created: Option> = schema::notes::table - .find(conversions::nullifier_to_bytes(¬e.as_note().nullifier())) - .select(schema::notes::created_by) - .first(conn) - .unwrap(); - assert!(created.is_none()); + let status_b = + get_note_status(conn, &crate::db::models::conv::note_id_to_bytes(¬e_b.as_note().id())) + .unwrap() + .unwrap(); + assert!(status_b.committed_at.is_none()); } #[test] -fn block_committed_marks_consumed_notes_as_committed() { +fn mark_notes_consumed_is_noop_when_unknown() { let (conn, _dir) = &mut test_conn(); - let account_id = mock_network_account_id(); - let note = mock_single_target_note(account_id, 10); - let note_id = note.as_note().id(); - - // Insert a committed note. - insert_committed_notes(conn, std::slice::from_ref(¬e)).unwrap(); - assert_eq!(count_notes(conn), 1); + let note = mock_single_target_note(account_id, 3); + insert_network_notes(conn, std::slice::from_ref(¬e)).unwrap(); - // Consume it via a transaction. - let tx_id = mock_tx_id(1); - add_transaction(conn, &tx_id, None, &[], &[note.as_note().nullifier()]).unwrap(); + // A nullifier we never inserted should not affect existing rows. + let phantom = mock_single_target_note(account_id, 99).as_note().nullifier(); + mark_notes_consumed(conn, &[phantom], BlockNumber::from(5)).unwrap(); - // Commit the block. - let block_num = BlockNumber::from(1u32); - let header = mock_block_header(block_num); - commit_block(conn, &[tx_id], block_num, &header).unwrap(); - - // Note should still exist but be marked as committed. assert_eq!(count_notes(conn), 1); - let row = get_note_status(conn, &conversions::note_id_to_bytes(¬e_id)) - .unwrap() - .unwrap(); - assert_eq!(row.committed_at, Some(conversions::block_num_to_i64(block_num))); - assert!(row.consumed_by.is_some()); + let status = + get_note_status(conn, &crate::db::models::conv::note_id_to_bytes(¬e.as_note().id())) + .unwrap() + .unwrap(); + assert!(status.committed_at.is_none()); } #[test] -fn block_committed_promotes_inflight_account_to_committed() { +fn available_notes_excludes_consumed_notes() { let (conn, _dir) = &mut test_conn(); - let account_id = mock_network_account_id(); - let account = mock_account(account_id); + let note = mock_single_target_note(account_id, 21); + insert_network_notes(conn, std::slice::from_ref(¬e)).unwrap(); - // Insert committed account. - upsert_committed_account(conn, account_id, &account).unwrap(); - assert_eq!(count_committed_accounts(conn), 1); - - // Insert inflight row. - let tx_id = mock_tx_id(1); - let row = AccountInsert { - account_id: conversions::network_account_id_to_bytes(account_id), - transaction_id: Some(conversions::transaction_id_to_bytes(&tx_id)), - account_data: conversions::account_to_bytes(&account), - }; - diesel::insert_into(schema::accounts::table).values(&row).execute(conn).unwrap(); - - assert_eq!(count_inflight_accounts(conn), 1); - assert_eq!(count_committed_accounts(conn), 1); - - // Commit the block. - let block_num = BlockNumber::from(1u32); - let header = mock_block_header(block_num); - commit_block(conn, &[tx_id], block_num, &header).unwrap(); - - // Should have 1 committed and 0 inflight. - assert_eq!(count_committed_accounts(conn), 1); - assert_eq!(count_inflight_accounts(conn), 0); -} - -// GET COMMITTED ACCOUNT TESTS -// ================================================================================================ - -#[test] -fn get_committed_account_ignores_inflight() { - let (conn, _dir) = &mut test_conn(); + assert_eq!(available_notes(conn, account_id, BlockNumber::from(1), 30).unwrap().len(), 1); - let account_id = mock_network_account_id(); - let account = mock_account(account_id); + mark_notes_consumed(conn, &[note.as_note().nullifier()], BlockNumber::from(7)).unwrap(); - // Insert only an inflight account row (simulating account creation). - let tx_id = mock_tx_id(1); - let row = AccountInsert { - account_id: conversions::network_account_id_to_bytes(account_id), - transaction_id: Some(conversions::transaction_id_to_bytes(&tx_id)), - account_data: conversions::account_to_bytes(&account), - }; - diesel::insert_into(schema::accounts::table).values(&row).execute(conn).unwrap(); - - // get_committed_account should return None (only inflight exists). - let result = get_committed_account(conn, account_id).unwrap(); - assert!(result.is_none()); - - // Commit the block to promote inflight to committed. - let block_num = BlockNumber::from(1u32); - let header = mock_block_header(block_num); - commit_block(conn, &[tx_id], block_num, &header).unwrap(); - - // Now get_committed_account should return the account. - let result = get_committed_account(conn, account_id).unwrap(); - assert!(result.is_some()); + assert!( + available_notes(conn, account_id, BlockNumber::from(1000), 30) + .unwrap() + .is_empty() + ); } -// HANDLE TRANSACTIONS REVERTED TESTS +// AVAILABLE NOTES + BACKOFF // ================================================================================================ #[test] -fn transactions_reverted_restores_consumed_notes() { - let (conn, _dir) = &mut test_conn(); - - let account_id = mock_network_account_id(); - let note = mock_single_target_note(account_id, 10); - - // Insert committed note. - insert_committed_notes(conn, std::slice::from_ref(¬e)).unwrap(); - - // Consume it via a transaction. - let tx_id = mock_tx_id(1); - add_transaction(conn, &tx_id, None, &[], &[note.as_note().nullifier()]).unwrap(); - - // Verify consumed. - let consumed: Option> = schema::notes::table - .find(conversions::nullifier_to_bytes(¬e.as_note().nullifier())) - .select(schema::notes::consumed_by) - .first(conn) - .unwrap(); - assert!(consumed.is_some()); - - // Revert the transaction. - revert_transaction(conn, &[tx_id]).unwrap(); - - // Note should be un-consumed. - let consumed: Option> = schema::notes::table - .find(conversions::nullifier_to_bytes(¬e.as_note().nullifier())) - .select(schema::notes::consumed_by) - .first(conn) - .unwrap(); - assert!(consumed.is_none()); -} - -#[test] -fn transactions_reverted_deletes_inflight_created_notes() { +fn available_notes_returns_unconsumed_under_attempt_cap() { let (conn, _dir) = &mut test_conn(); - let account_id = mock_network_account_id(); - let tx_id = mock_tx_id(1); - let note = mock_single_target_note(account_id, 10); - - // Add transaction that creates a note. - add_transaction(conn, &tx_id, None, std::slice::from_ref(¬e), &[]).unwrap(); - assert_eq!(count_notes(conn), 1); - - // Revert the transaction. - revert_transaction(conn, &[tx_id]).unwrap(); + let note = mock_single_target_note(account_id, 11); + insert_network_notes(conn, std::slice::from_ref(¬e)).unwrap(); - // Inflight-created note should be deleted. - assert_eq!(count_notes(conn), 0); + let available = available_notes(conn, account_id, BlockNumber::from(1), 30).unwrap(); + assert_eq!(available.len(), 1); } #[test] -fn transactions_reverted_reports_reverted_account_creations() { +fn available_notes_excludes_attempts_at_cap() { let (conn, _dir) = &mut test_conn(); - - let account_id = mock_network_account_id(); - let account = mock_account(account_id); - let tx_id = mock_tx_id(1); - - // Insert an inflight account row (simulating account creation by tx). - let row = AccountInsert { - account_id: conversions::network_account_id_to_bytes(account_id), - transaction_id: Some(conversions::transaction_id_to_bytes(&tx_id)), - account_data: conversions::account_to_bytes(&account), - }; - diesel::insert_into(schema::accounts::table).values(&row).execute(conn).unwrap(); - - // Revert the transaction, account should be included in affected accounts. - let affected = revert_transaction(conn, &[tx_id]).unwrap(); - assert!(affected.contains(&account_id)); - - // Account should be gone. - assert_eq!(count_accounts(conn), 0); -} - -// AVAILABLE NOTES TESTS -// ================================================================================================ - -#[test] -fn available_notes_filters_consumed_and_exceeded_attempts() { - let (conn, _dir) = &mut test_conn(); - let account_id = mock_network_account_id(); - let note_good = mock_single_target_note(account_id, 10); - let note_consumed = mock_single_target_note(account_id, 20); - let note_failed = mock_single_target_note(account_id, 30); - - // Insert all as committed. - insert_committed_notes(conn, &[note_good.clone(), note_consumed.clone(), note_failed.clone()]) - .unwrap(); - - // Consume one note. - let tx_id = mock_tx_id(1); - add_transaction(conn, &tx_id, None, &[], &[note_consumed.as_note().nullifier()]).unwrap(); - - // Mark one note as failed many times (exceed max_attempts=3). - let block_num = BlockNumber::from(100u32); - notes_failed( - conn, - &[(note_failed.as_note().nullifier(), test_note_error("test error"))], - block_num, - ) - .unwrap(); - notes_failed( - conn, - &[(note_failed.as_note().nullifier(), test_note_error("test error"))], - block_num, - ) - .unwrap(); - notes_failed( - conn, - &[(note_failed.as_note().nullifier(), test_note_error("test error"))], - block_num, - ) - .unwrap(); - - // Query available notes with max_attempts=3. - let result = available_notes(conn, account_id, block_num, 3).unwrap(); - - // Only note_good should be available (note_consumed is consumed, note_failed exceeded - // attempts). - assert_eq!(result.len(), 1); - assert_eq!(result[0].as_note().nullifier(), note_good.as_note().nullifier()); -} - -#[test] -fn available_notes_only_returns_notes_for_specified_account() { - let (conn, _dir) = &mut test_conn(); - - let account_id_1 = mock_network_account_id(); - let account_id_2 = mock_network_account_id_seeded(42); + let note = mock_single_target_note(account_id, 13); + insert_network_notes(conn, std::slice::from_ref(¬e)).unwrap(); - let note_acct1 = mock_single_target_note(account_id_1, 10); - let note_acct2 = mock_single_target_note(account_id_2, 20); + // Push attempt_count up to the cap. + let nullifier = note.as_note().nullifier(); + for _ in 0..30 { + notes_failed(conn, &[(nullifier, test_note_error("boom"))], BlockNumber::from(5)).unwrap(); + } - insert_committed_notes(conn, &[note_acct1.clone(), note_acct2]).unwrap(); - - let block_num = BlockNumber::from(100u32); - let result = available_notes(conn, account_id_1, block_num, 30).unwrap(); - - assert_eq!(result.len(), 1); - assert_eq!(result[0].as_note().nullifier(), note_acct1.as_note().nullifier()); + let available = available_notes(conn, account_id, BlockNumber::from(1000), 30).unwrap(); + assert!(available.is_empty(), "notes at the attempt cap should not be available"); } -// NOTES FAILED TESTS +// CHAIN STATE // ================================================================================================ #[test] -fn notes_failed_increments_attempt_count() { +fn upsert_chain_state_persists_and_roundtrips_mmr() { let (conn, _dir) = &mut test_conn(); + let header = mock_block_header(BlockNumber::from(7)); + let mmr = PartialMmr::default(); - let account_id = mock_network_account_id(); - let note = mock_single_target_note(account_id, 10); - - insert_committed_notes(conn, std::slice::from_ref(¬e)).unwrap(); - - let block_num = BlockNumber::from(5u32); - notes_failed( - conn, - &[(note.as_note().nullifier(), test_note_error("execution failed"))], - block_num, - ) - .unwrap(); - notes_failed( - conn, - &[(note.as_note().nullifier(), test_note_error("execution failed 2"))], - block_num, - ) - .unwrap(); - - let (attempt_count, last_attempt): (i32, Option) = schema::notes::table - .find(conversions::nullifier_to_bytes(¬e.as_note().nullifier())) - .select((schema::notes::attempt_count, schema::notes::last_attempt)) - .first(conn) - .unwrap(); - - assert_eq!(attempt_count, 2); - assert_eq!(last_attempt, Some(conversions::block_num_to_i64(block_num))); -} - -// GET NOTE STATUS TESTS -// ================================================================================================ - -#[test] -fn get_note_status_returns_latest_error() { - let (conn, _dir) = &mut test_conn(); + upsert_chain_state(conn, header.block_num(), &header, &mmr).unwrap(); - let account_id = mock_network_account_id(); - let note = mock_single_target_note(account_id, 10); - let note_id = note.as_note().id(); - - // Insert as committed note. - insert_committed_notes(conn, std::slice::from_ref(¬e)).unwrap(); - - // Initially no error, not consumed. - let result = get_note_status(conn, &conversions::note_id_to_bytes(¬e_id)).unwrap(); - assert!(result.is_some()); - let row = result.unwrap(); - assert!(row.last_error.is_none()); - assert_eq!(row.attempt_count, 0); - assert!(row.consumed_by.is_none()); - - // Mark as failed. - let block_num = BlockNumber::from(5u32); - notes_failed(conn, &[(note.as_note().nullifier(), test_note_error("first error"))], block_num) - .unwrap(); - - let result = get_note_status(conn, &conversions::note_id_to_bytes(¬e_id)).unwrap(); - let row = result.unwrap(); - assert_eq!(row.last_error.as_deref(), Some("first error")); - assert_eq!(row.attempt_count, 1); - - // Mark as failed again with different error, should overwrite. - notes_failed( - conn, - &[(note.as_note().nullifier(), test_note_error("second error"))], - block_num, - ) - .unwrap(); - - let result = get_note_status(conn, &conversions::note_id_to_bytes(¬e_id)).unwrap(); - let row = result.unwrap(); - assert_eq!(row.last_error.as_deref(), Some("second error")); - assert_eq!(row.attempt_count, 2); + let (loaded_num, loaded_header, _loaded_mmr) = select_chain_state(conn).unwrap().unwrap(); + assert_eq!(loaded_num, header.block_num()); + assert_eq!(loaded_header.block_num(), header.block_num()); } #[test] -fn get_note_status_returns_none_for_unknown_note() { +fn upsert_chain_state_overwrites_singleton() { let (conn, _dir) = &mut test_conn(); + let header_1 = mock_block_header(BlockNumber::from(1)); + let header_2 = mock_block_header(BlockNumber::from(2)); + let mmr = PartialMmr::default(); - let unknown_id = vec![0u8; 32]; - let result = get_note_status(conn, &unknown_id).unwrap(); - assert!(result.is_none()); -} + upsert_chain_state(conn, header_1.block_num(), &header_1, &mmr).unwrap(); + upsert_chain_state(conn, header_2.block_num(), &header_2, &mmr).unwrap(); -#[test] -fn get_note_status_includes_consumed_by() { - let (conn, _dir) = &mut test_conn(); + let (loaded_num, ..) = select_chain_state(conn).unwrap().unwrap(); + assert_eq!(loaded_num, header_2.block_num()); - let account_id = mock_network_account_id(); - let note = mock_single_target_note(account_id, 10); - let note_id = note.as_note().id(); - - // Insert as committed note. - insert_committed_notes(conn, &[note]).unwrap(); - - // Initially consumed_by is NULL. - let row = get_note_status(conn, &conversions::note_id_to_bytes(¬e_id)) - .unwrap() - .unwrap(); - assert!(row.consumed_by.is_none()); - - // Simulate consumption by setting consumed_by to a dummy transaction ID. - let dummy_tx_id = vec![42u8; 32]; - diesel::update( - schema::notes::table - .filter(schema::notes::note_id.eq(conversions::note_id_to_bytes(¬e_id))), - ) - .set(schema::notes::consumed_by.eq(Some(&dummy_tx_id))) - .execute(conn) - .unwrap(); - - let row = get_note_status(conn, &conversions::note_id_to_bytes(¬e_id)) - .unwrap() - .unwrap(); - assert_eq!(row.consumed_by, Some(dummy_tx_id)); + let row_count: i64 = schema::chain_state::table.count().get_result(conn).unwrap(); + assert_eq!(row_count, 1, "chain_state must remain a singleton"); } -// CHAIN STATE TESTS -// ================================================================================================ - #[test] -fn upsert_chain_state_updates_singleton() { +fn select_chain_state_returns_none_on_fresh_db() { let (conn, _dir) = &mut test_conn(); - - let block_num_1 = BlockNumber::from(1u32); - let header_1 = mock_block_header(block_num_1); - upsert_chain_state(conn, block_num_1, &header_1).unwrap(); - - // Upsert again with higher block. - let block_num_2 = BlockNumber::from(2u32); - let header_2 = mock_block_header(block_num_2); - upsert_chain_state(conn, block_num_2, &header_2).unwrap(); - - // Should only have one row. - let row_count: i64 = schema::chain_state::table.count().get_result(conn).unwrap(); - assert_eq!(row_count, 1); - - // Should have the latest block number. - let stored_block_num: i64 = schema::chain_state::table - .select(schema::chain_state::block_num) - .first(conn) - .unwrap(); - assert_eq!(stored_block_num, conversions::block_num_to_i64(block_num_2)); + assert!(select_chain_state(conn).unwrap().is_none()); } -// NOTE SCRIPT TESTS +// NOTE SCRIPT CACHE // ================================================================================================ #[test] -fn note_script_insert_and_lookup() { +fn note_script_cache_roundtrip() { let (conn, _dir) = &mut test_conn(); - - // Extract a NoteScript from a mock note. let account_id = mock_network_account_id(); - let note: miden_protocol::note::Note = mock_single_target_note(account_id, 10).into_note(); - let script = note.script().clone(); - let root = Word::from(script.root()); + let note = mock_single_target_note(account_id, 17); + let script = note.as_note().script().clone(); + let root: Word = script.root().into(); - // Insert the script. + assert!(lookup_note_script(conn, &root).unwrap().is_none()); insert_note_script(conn, &root, &script).unwrap(); + assert!(lookup_note_script(conn, &root).unwrap().is_some()); - // Look it up — should match the original. - let found = lookup_note_script(conn, &root).unwrap(); - assert!(found.is_some()); - assert_eq!(found.unwrap().root(), script.root()); + // Re-insert is idempotent. + insert_note_script(conn, &root, &script).unwrap(); } -#[test] -fn note_script_lookup_returns_none_for_missing() { - let (conn, _dir) = &mut test_conn(); - - let missing_root = Word::default(); - let found = lookup_note_script(conn, &missing_root).unwrap(); - assert!(found.is_none()); -} +// NOTE STATUS +// ================================================================================================ #[test] -fn note_script_insert_is_idempotent() { +fn notes_failed_increments_attempt_and_records_error() { let (conn, _dir) = &mut test_conn(); - let account_id = mock_network_account_id(); - let note: miden_protocol::note::Note = mock_single_target_note(account_id, 10).into_note(); - let script = note.script().clone(); - let root = Word::from(script.root()); + let note = mock_single_target_note(account_id, 19); + insert_network_notes(conn, std::slice::from_ref(¬e)).unwrap(); - // Insert the same script twice — should not error. - insert_note_script(conn, &root, &script).unwrap(); - insert_note_script(conn, &root, &script).unwrap(); + let nullifier = note.as_note().nullifier(); + notes_failed(conn, &[(nullifier, test_note_error("nope"))], BlockNumber::from(5)).unwrap(); + notes_failed(conn, &[(nullifier, test_note_error("nope"))], BlockNumber::from(6)).unwrap(); - // Should still be retrievable. - let found = lookup_note_script(conn, &root).unwrap(); - assert!(found.is_some()); + let row = + get_note_status(conn, &crate::db::models::conv::note_id_to_bytes(¬e.as_note().id())) + .unwrap() + .unwrap(); + assert_eq!(row.attempt_count, 2); + assert_eq!(row.last_attempt, Some(6)); + assert!(row.last_error.is_some()); } diff --git a/bin/ntx-builder/src/db/schema.rs b/bin/ntx-builder/src/db/schema.rs index 9797dca10a..dc41d61768 100644 --- a/bin/ntx-builder/src/db/schema.rs +++ b/bin/ntx-builder/src/db/schema.rs @@ -1,11 +1,9 @@ // @generated automatically by Diesel CLI. diesel::table! { - accounts (order_id) { - order_id -> Integer, + accounts (account_id) { account_id -> Binary, account_data -> Binary, - transaction_id -> Nullable, } } @@ -14,6 +12,7 @@ diesel::table! { id -> Integer, block_num -> BigInt, block_header -> Binary, + chain_mmr -> Binary, } } @@ -33,8 +32,6 @@ diesel::table! { attempt_count -> Integer, last_attempt -> Nullable, last_error -> Nullable, - created_by -> Nullable, - consumed_by -> Nullable, committed_at -> Nullable, } } diff --git a/bin/ntx-builder/src/lib.rs b/bin/ntx-builder/src/lib.rs index c280a5e4a1..e331d02042 100644 --- a/bin/ntx-builder/src/lib.rs +++ b/bin/ntx-builder/src/lib.rs @@ -3,26 +3,31 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; -use actor::{AccountActorContext, ActorConfig, GrpcClients, State}; use anyhow::Context; -use builder::MempoolEventStream; -use chain_state::SharedChainState; -use clients::{BlockProducerClient, StoreClient, ValidatorClient}; -use coordinator::Coordinator; +use builder::BlockStream; +use chain_state::ChainState; +use clients::StoreClient; use db::Db; -use futures::TryStreamExt; +use futures::StreamExt; use miden_node_utils::ErrorReport; -use miden_node_utils::lru_cache::LruCache; -use miden_remote_prover_client::RemoteTransactionProver; -use tokio::sync::mpsc; +use miden_protocol::block::BlockNumber; +use miden_protocol::crypto::merkle::mmr::PartialMmr; use url::Url; +use crate::committed_block::CommittedBlockEffects; + pub(crate) type NoteError = Arc; +// PR 1 of the block-subscription refactor leaves the actor execution path in tree but unwired. It +// is restored by PR 2 +#[expect(dead_code)] mod actor; mod builder; +#[expect(dead_code)] mod chain_state; mod clients; +mod committed_block; +#[expect(dead_code)] mod coordinator; pub(crate) mod db; pub mod server; @@ -284,15 +289,17 @@ impl NtxBuilderConfig { /// Builds and initializes the network transaction builder. /// - /// This method connects to the store and block producer services, fetches the current - /// chain tip, and subscribes to mempool events. + /// Opens a committed-block subscription against the store's `Rpc` service. On a fresh DB the + /// subscription starts at genesis and the first block is consumed inline to bootstrap the + /// in-memory chain state; on resume, the in-memory chain state is loaded from the persisted + /// header + chain MMR and the subscription starts at `persisted_tip + 1`. /// /// # Errors /// /// Returns an error if: - /// - The store connection fails - /// - The mempool subscription fails (after retries) - /// - The store contains no blocks (not bootstrapped) + /// - The DB cannot be opened or migrated + /// - The store connection fails (after retries) + /// - The genesis block cannot be read from the subscription on a fresh start pub async fn build(self) -> anyhow::Result { // Set up the database (bootstrap + connection pool). let db = Db::setup_with_pool_size( @@ -301,73 +308,66 @@ impl NtxBuilderConfig { ) .await?; - // Purge inflight state from previous run. - db.purge_inflight().await.context("failed to purge inflight state")?; + let store = StoreClient::new( + self.store_url.clone(), + self.request_backoff_initial, + self.request_backoff_max, + ); - let script_cache = LruCache::new(self.script_cache_size); - let coordinator = - Coordinator::new(self.max_concurrent_txs, self.max_account_crashes, db.clone()); + // Decide where to start the subscription. On resume we load the persisted chain state; on + // fresh start we begin at genesis and bootstrap inline below. + let stored_chain_state = + db.get_chain_state().await.context("failed to read chain state")?; - let store = StoreClient::new(self.store_url.clone()); - let block_producer = BlockProducerClient::new(self.block_producer_url.clone()); - let validator = ValidatorClient::new(self.validator_url.clone()); - let prover = self.tx_prover_url.clone().map(RemoteTransactionProver::new); + let block_from = stored_chain_state + .as_ref() + .map_or(BlockNumber::GENESIS, |(num, ..)| num.child()); - // Subscribe to mempool first to ensure we don't miss any events. The subscription replays - // all inflight transactions, so the subscriber's state is fully reconstructed. - let subscription = block_producer - .subscribe_to_mempool_with_retry() - .await - .map_err(|err| anyhow::anyhow!(err)) - .context("failed to subscribe to mempool events")?; - let mempool_events: MempoolEventStream = Box::pin(subscription.into_stream()); - - let (chain_tip_header, chain_mmr) = store - .get_latest_blockchain_data_with_retry() - .await? - .context("store should contain a latest block")?; + tracing::info!( + %block_from, + resume = stored_chain_state.is_some(), + "ntx-builder opening committed-block subscription" + ); - // Store the chain tip in the DB. - db.upsert_chain_state(chain_tip_header.block_num(), chain_tip_header.clone()) + let raw_stream = store + .block_subscription_with_retry(block_from) .await - .context("failed to upsert chain state")?; - - let chain_state = Arc::new(SharedChainState::new(chain_tip_header, chain_mmr)); - - let (request_tx, actor_request_rx) = mpsc::channel(1); - - let actor_context = AccountActorContext { - clients: GrpcClients { - store: store.clone(), - block_producer: block_producer.clone(), - validator, - prover, - }, - state: State { - db: db.clone(), - chain: chain_state.clone(), - script_cache, - }, - config: ActorConfig { - max_notes_per_tx: self.max_notes_per_tx, - max_note_attempts: self.max_note_attempts, - idle_timeout: self.idle_timeout, - max_cycles: self.max_cycles, - request_backoff_initial: self.request_backoff_initial, - request_backoff_max: self.request_backoff_max, - }, - request_tx, + .map_err(|err| anyhow::anyhow!(err)) + .context("failed to subscribe to committed blocks")?; + let mut block_stream: BlockStream = Box::pin(raw_stream); + + let (chain, last_applied_block) = if let Some((block_num, header, mmr)) = stored_chain_state + { + (ChainState::new(header, mmr), block_num) + } else { + // Fresh DB: consume the genesis block inline so the in-memory chain state is non- empty + // before the steady-state loop runs. + let (genesis, _committed_tip) = block_stream + .next() + .await + .context("block stream ended before delivering the genesis block")? + .context("block stream failed before delivering the genesis block")?; + let genesis_header = genesis.header().clone(); + anyhow::ensure!( + genesis_header.block_num() == BlockNumber::GENESIS, + "expected genesis block from subscription but got block {}", + genesis_header.block_num() + ); + + let effects = CommittedBlockEffects::from_signed_block(&genesis); + db.apply_committed_block(effects, PartialMmr::default()) + .await + .context("failed to apply genesis block during bootstrap")?; + + (ChainState::new(genesis_header, PartialMmr::default()), BlockNumber::GENESIS) }; Ok(NetworkTransactionBuilder::new( self, - coordinator, - store, db, - chain_state, - actor_context, - mempool_events, - actor_request_rx, + block_stream, + last_applied_block, + chain, )) } } diff --git a/bin/ntx-builder/src/server.rs b/bin/ntx-builder/src/server.rs index 12ef5bfa63..636f5f2810 100644 --- a/bin/ntx-builder/src/server.rs +++ b/bin/ntx-builder/src/server.rs @@ -86,7 +86,6 @@ impl api_server::Api for NtxBuilderRpcServer { let status = derive_status( row.committed_at.is_some(), - row.consumed_by.is_some(), row.attempt_count as usize, self.max_note_attempts, ); @@ -108,14 +107,11 @@ impl api_server::Api for NtxBuilderRpcServer { /// Derives the lifecycle status of a network note from its DB state. fn derive_status( is_committed: bool, - is_consumed: bool, attempt_count: usize, max_note_attempts: usize, ) -> rpc::NetworkNoteStatus { if is_committed { rpc::NetworkNoteStatus::NullifierCommitted - } else if is_consumed { - rpc::NetworkNoteStatus::NullifierInflight } else if attempt_count >= max_note_attempts { rpc::NetworkNoteStatus::Discarded } else { @@ -131,30 +127,21 @@ mod tests { #[test] fn derive_status_pending() { - assert_eq!(derive_status(false, false, 0, 30), NetworkNoteStatus::Pending); - assert_eq!(derive_status(false, false, 15, 30), NetworkNoteStatus::Pending); - assert_eq!(derive_status(false, false, 29, 30), NetworkNoteStatus::Pending); - } - - #[test] - fn derive_status_processed() { - assert_eq!(derive_status(false, true, 0, 30), NetworkNoteStatus::NullifierInflight); - assert_eq!(derive_status(false, true, 5, 30), NetworkNoteStatus::NullifierInflight); - // consumed_by takes precedence over attempt count - assert_eq!(derive_status(false, true, 30, 30), NetworkNoteStatus::NullifierInflight); + assert_eq!(derive_status(false, 0, 30), NetworkNoteStatus::Pending); + assert_eq!(derive_status(false, 15, 30), NetworkNoteStatus::Pending); + assert_eq!(derive_status(false, 29, 30), NetworkNoteStatus::Pending); } #[test] fn derive_status_discarded() { - assert_eq!(derive_status(false, false, 30, 30), NetworkNoteStatus::Discarded); - assert_eq!(derive_status(false, false, 100, 30), NetworkNoteStatus::Discarded); + assert_eq!(derive_status(false, 30, 30), NetworkNoteStatus::Discarded); + assert_eq!(derive_status(false, 100, 30), NetworkNoteStatus::Discarded); } #[test] fn derive_status_committed() { - assert_eq!(derive_status(true, true, 0, 30), NetworkNoteStatus::NullifierCommitted); - assert_eq!(derive_status(true, true, 5, 30), NetworkNoteStatus::NullifierCommitted); - // committed takes precedence over everything - assert_eq!(derive_status(true, false, 30, 30), NetworkNoteStatus::NullifierCommitted); + // committed takes precedence over attempt count + assert_eq!(derive_status(true, 0, 30), NetworkNoteStatus::NullifierCommitted); + assert_eq!(derive_status(true, 30, 30), NetworkNoteStatus::NullifierCommitted); } } diff --git a/bin/stress-test/src/seeding/mod.rs b/bin/stress-test/src/seeding/mod.rs index bed3d5d34c..a846587389 100644 --- a/bin/stress-test/src/seeding/mod.rs +++ b/bin/stress-test/src/seeding/mod.rs @@ -852,9 +852,6 @@ pub async fn start_store( .await .expect("Failed to bind store block-producer gRPC endpoint"); let store_addr = rpc_listener.local_addr().expect("Failed to get store RPC address"); - let ntx_builder_listener = TcpListener::bind("127.0.0.1:0") - .await - .expect("Failed to bind store ntx-builder gRPC endpoint"); let store_block_producer_addr = block_producer_listener .local_addr() .expect("Failed to get store block-producer address"); @@ -865,7 +862,6 @@ pub async fn start_store( rpc_listener, mode: StoreMode::BlockProducer { block_producer_listener, - ntx_builder_listener, block_prover_url: None, max_concurrent_proofs: miden_node_store::DEFAULT_MAX_CONCURRENT_PROOFS, }, diff --git a/crates/block-producer/src/server/tests.rs b/crates/block-producer/src/server/tests.rs index 46559429db..e5a7a91291 100644 --- a/crates/block-producer/src/server/tests.rs +++ b/crates/block-producer/src/server/tests.rs @@ -156,9 +156,6 @@ async fn start_store(store_addr: std::net::SocketAddr) -> TestStore { let dir = data_directory.path().to_path_buf(); let rpc_listener = TcpListener::bind("127.0.0.1:0").await.expect("store should bind the RPC port"); - let ntx_builder_listener = TcpListener::bind("127.0.0.1:0") - .await - .expect("Failed to bind store ntx-builder gRPC endpoint"); let block_producer_listener = TcpListener::bind(store_addr) .await .expect("store should bind the block-producer port"); @@ -171,7 +168,6 @@ async fn start_store(store_addr: std::net::SocketAddr) -> TestStore { rpc_listener, mode: StoreMode::BlockProducer { block_producer_listener, - ntx_builder_listener, block_prover_url: None, max_concurrent_proofs: DEFAULT_MAX_CONCURRENT_PROOFS, }, diff --git a/crates/proto/src/clients/mod.rs b/crates/proto/src/clients/mod.rs index d171eb091d..c1d7c00359 100644 --- a/crates/proto/src/clients/mod.rs +++ b/crates/proto/src/clients/mod.rs @@ -6,19 +6,19 @@ //! # Examples //! //! ```rust -//! # use miden_node_proto::clients::{Builder, WantsTls, StoreNtxBuilderClient}; +//! # use miden_node_proto::clients::{Builder, WantsTls, StoreRpcClient}; //! # use url::Url; //! //! # async fn example() -> anyhow::Result<()> { //! // Create a store client with OTEL and TLS //! let url = Url::parse("https://example.com:8080")?; -//! let client: StoreNtxBuilderClient = Builder::new(url) +//! let client: StoreRpcClient = Builder::new(url) //! .with_tls()? // or `.without_tls()` //! .without_timeout() // or `.with_timeout(Duration::from_secs(10))` //! .without_metadata_version() // or `.with_metadata_version("1.0".into())` //! .without_metadata_genesis() // or `.with_metadata_genesis(genesis)` //! .with_otel_context_injection() // or `.without_otel_context_injection()` -//! .connect::() +//! .connect::() //! .await?; //! # Ok(()) //! # } @@ -111,8 +111,6 @@ type InterceptedChannel = InterceptedService; type GeneratedRpcClient = generated::rpc::api_client::ApiClient; type GeneratedBlockProducerClient = generated::block_producer::api_client::ApiClient; -type GeneratedStoreClientForNtxBuilder = - generated::store::ntx_builder_client::NtxBuilderClient; type GeneratedStoreClientForBlockProducer = generated::store::block_producer_client::BlockProducerClient; type GeneratedStoreClientForRpc = generated::store::rpc_client::RpcClient; @@ -130,8 +128,6 @@ pub struct RpcClient(GeneratedRpcClient); #[derive(Debug, Clone)] pub struct BlockProducerClient(GeneratedBlockProducerClient); #[derive(Debug, Clone)] -pub struct StoreNtxBuilderClient(GeneratedStoreClientForNtxBuilder); -#[derive(Debug, Clone)] pub struct StoreBlockProducerClient(GeneratedStoreClientForBlockProducer); #[derive(Debug, Clone)] pub struct StoreRpcClient(GeneratedStoreClientForRpc); @@ -172,20 +168,6 @@ impl Deref for BlockProducerClient { } } -impl DerefMut for StoreNtxBuilderClient { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl Deref for StoreNtxBuilderClient { - type Target = GeneratedStoreClientForNtxBuilder; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - impl DerefMut for StoreBlockProducerClient { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 @@ -290,15 +272,6 @@ impl GrpcClient for BlockProducerClient { } } -impl GrpcClient for StoreNtxBuilderClient { - fn with_interceptor(channel: Channel, interceptor: Interceptor) -> Self { - Self(GeneratedStoreClientForNtxBuilder::new(InterceptedService::new( - channel, - interceptor, - ))) - } -} - impl GrpcClient for StoreBlockProducerClient { fn with_interceptor(channel: Channel, interceptor: Interceptor) -> Self { Self(GeneratedStoreClientForBlockProducer::new(InterceptedService::new( diff --git a/crates/rpc/src/tests.rs b/crates/rpc/src/tests.rs index cf47b5d356..2c48617cf8 100644 --- a/crates/rpc/src/tests.rs +++ b/crates/rpc/src/tests.rs @@ -101,9 +101,6 @@ impl TestStore { let store_addr = store_listener.local_addr().expect("store listener should get a local address"); let rpc_listener = store_listener; - let ntx_builder_listener = TcpListener::bind("127.0.0.1:0") - .await - .expect("Failed to bind store ntx-builder gRPC endpoint"); let block_producer_listener = TcpListener::bind("127.0.0.1:0").await.expect("store should bind a port"); @@ -116,7 +113,6 @@ impl TestStore { rpc_listener, mode: StoreMode::BlockProducer { block_producer_listener, - ntx_builder_listener, block_prover_url: None, max_concurrent_proofs: DEFAULT_MAX_CONCURRENT_PROOFS, }, diff --git a/crates/store/src/server/mod.rs b/crates/store/src/server/mod.rs index c81e860e51..98c21ef84e 100644 --- a/crates/store/src/server/mod.rs +++ b/crates/store/src/server/mod.rs @@ -32,7 +32,6 @@ use crate::{BlockProver, COMPONENT}; mod api; mod block_producer; pub mod block_prover_client; -mod ntx_builder; mod replica_sync; use replica_sync::ReplicaSync as _; @@ -48,13 +47,11 @@ mod rpc_api; pub enum StoreMode { /// Accepts blocks from a block producer via the `BlockProducer` gRPC service. /// - /// Exposes the `BlockProducer` and `NtxBuilder` gRPC services and runs the proof scheduler - /// to generate block proofs. + /// Exposes the `Rpc` and `BlockProducer` gRPC services and runs the proof scheduler to + /// generate block proofs. BlockProducer { /// Listener for the block producer gRPC endpoint. block_producer_listener: TcpListener, - /// Listener for the network transaction builder gRPC endpoint. - ntx_builder_listener: TcpListener, /// URL of the remote block prover. Uses a local prover if `None`. block_prover_url: Option, /// Maximum number of blocks proven concurrently by the proof scheduler. @@ -63,8 +60,8 @@ pub enum StoreMode { /// Receives blocks from an upstream store's `Rpc` gRPC service. /// - /// Only the `Rpc` gRPC service is exposed. The `BlockProducer` and `NtxBuilder` services are - /// not started and no proof scheduler runs. + /// Only the `Rpc` gRPC service is exposed. The `BlockProducer` service is not started and no + /// proof scheduler runs. Replica { upstream_url: Url }, } @@ -157,14 +154,12 @@ impl Store { let ModeSetup { mut grpc_servers, mode_task } = match self.mode { StoreMode::BlockProducer { block_producer_listener, - ntx_builder_listener, block_prover_url, max_concurrent_proofs, } => { Self::setup_block_producer_mode( state, block_producer_listener, - ntx_builder_listener, block_prover_url, max_concurrent_proofs, tx_proven_tip, @@ -198,11 +193,9 @@ impl Store { } } - #[expect(clippy::too_many_arguments)] async fn setup_block_producer_mode( state: State, block_producer_listener: TcpListener, - ntx_builder_listener: TcpListener, block_prover_url: Option, max_concurrent_proofs: NonZeroUsize, tx_proven_tip: ProvenTipWriter, @@ -211,7 +204,6 @@ impl Store { ) -> anyhow::Result { info!(target: COMPONENT, block_producer_endpoint=?block_producer_listener.local_addr()?, - ntx_builder_endpoint=?ntx_builder_listener.local_addr()?, "Starting in block-producer mode"); let proof_cache = state.proof_cache.clone(); @@ -236,7 +228,6 @@ impl Store { block_producer_api, grpc_options, rpc_listener, - ntx_builder_listener, block_producer_listener, )?; @@ -310,19 +301,17 @@ impl Store { /// Spawns the gRPC servers for block-producer mode. /// - /// Starts three listeners: `Rpc`, `NtxBuilder`, and `BlockProducer`. + /// Starts two listeners: `Rpc` and `BlockProducer`. fn spawn_block_producer_grpc_servers( store_api: api::StoreApi, block_producer_api: block_producer::BlockProducerApi, grpc_options: GrpcOptionsInternal, rpc_listener: TcpListener, - ntx_builder_listener: TcpListener, block_producer_listener: TcpListener, ) -> anyhow::Result>> { let mut join_set = JoinSet::new(); - let rpc_service = store::rpc_server::RpcServer::new(store_api.clone()); - let ntx_builder_service = store::ntx_builder_server::NtxBuilderServer::new(store_api); + let rpc_service = store::rpc_server::RpcServer::new(store_api); let block_producer_service = store::block_producer_server::BlockProducerServer::new(block_producer_api); @@ -345,13 +334,6 @@ impl Store { .serve_with_incoming(TcpListenerStream::new(rpc_listener)), ); - join_set.spawn( - make_server() - .add_service(ntx_builder_service) - .add_service(reflection_service.clone()) - .serve_with_incoming(TcpListenerStream::new(ntx_builder_listener)), - ); - join_set.spawn( make_server() .accept_http1(true) @@ -365,7 +347,7 @@ impl Store { /// Spawns the gRPC servers for replica mode. /// - /// Only the `Rpc` service is exposed — no `BlockProducer`, `NtxBuilder`, or proof scheduler. + /// Only the `Rpc` service is exposed — no `BlockProducer` or proof scheduler. fn spawn_replica_grpc_servers( store_api: api::StoreApi, grpc_options: GrpcOptionsInternal, diff --git a/crates/store/src/server/ntx_builder.rs b/crates/store/src/server/ntx_builder.rs deleted file mode 100644 index 3b114da5f0..0000000000 --- a/crates/store/src/server/ntx_builder.rs +++ /dev/null @@ -1,323 +0,0 @@ -use std::collections::BTreeSet; -use std::num::{NonZero, TryFromIntError}; - -use miden_crypto::merkle::smt::SmtProof; -use miden_node_proto::decode::{read_account_id, read_block_range, read_root}; -use miden_node_proto::domain::account::AccountInfo; -use miden_node_proto::errors::ConversionError; -use miden_node_proto::generated as proto; -use miden_node_proto::generated::rpc::BlockRange; -use miden_node_proto::generated::store::ntx_builder_server; -use miden_node_utils::ErrorReport; -use miden_protocol::account::{StorageMapKey, StorageSlotName}; -use miden_protocol::asset::AssetVaultKey; -use miden_protocol::block::BlockNumber; -use miden_protocol::note::Note; -use tonic::{Request, Response, Status}; -use tracing::debug; - -use crate::COMPONENT; -use crate::db::models::Page; -use crate::errors::{ - GetAccountError, - GetNetworkAccountIdsError, - GetNoteScriptByRootError, - GetWitnessesError, -}; -use crate::server::api::{StoreApi, internal_error, invalid_argument}; -use crate::state::Finality; - -// NTX BUILDER ENDPOINTS -// ================================================================================================ - -#[tonic::async_trait] -impl ntx_builder_server::NtxBuilder for StoreApi { - /// Returns block header for the specified block number. - /// - /// If the block number is not provided, block header for the latest block is returned. - async fn get_block_header_by_number( - &self, - request: Request, - ) -> Result, Status> { - self.get_block_header_by_number_inner(request).await - } - - /// Returns the chain tip's header and MMR peaks corresponding to that header. - /// If there are N blocks, the peaks will represent the MMR at block `N - 1`. - /// - /// This returns all the blockchain-related information needed for executing transactions - /// without authenticating notes. - async fn get_current_blockchain_data( - &self, - request: Request, - ) -> Result, Status> { - let block_num = request.into_inner().block_num.map(BlockNumber::from); - - let response = match self - .state - .get_current_blockchain_data(block_num) - .await - .map_err(internal_error)? - { - Some((header, peaks)) => proto::store::CurrentBlockchainData { - current_peaks: peaks.peaks().iter().map(Into::into).collect(), - current_block_header: Some(header.into()), - }, - None => proto::store::CurrentBlockchainData { - current_peaks: vec![], - current_block_header: None, - }, - }; - - Ok(Response::new(response)) - } - - async fn get_network_account_details_by_id( - &self, - request: Request, - ) -> Result, Status> { - let account_id = - read_account_id::(Some(request.into_inner()))?; - - let account_info: Option = - self.state.get_network_account_details_by_id(account_id).await?; - - Ok(Response::new(proto::store::MaybeAccountDetails { - details: account_info.map(|acc| (&acc).into()), - })) - } - - async fn get_unconsumed_network_notes( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - let block_num = BlockNumber::from(request.block_num); - let account_id = read_account_id::( - request.account_id, - )?; - - let state = self.state.clone(); - - let size = - NonZero::try_from(request.page_size as usize).map_err(|err: TryFromIntError| { - invalid_argument(err.as_report_context("invalid page_size")) - })?; - let page = Page { token: request.page_token, size }; - // TODO: no need to get the whole NoteRecord here, a NetworkNote wrapper should be created - // instead - let (notes, next_page) = state - .get_unconsumed_network_notes_for_account(account_id, block_num, page) - .await - .map_err(internal_error)?; - - let mut network_notes = Vec::with_capacity(notes.len()); - for note in notes { - // SAFETY: Network notes are filtered in the database, so they should have details; - // otherwise the state would be corrupted - let (assets, recipient) = note.details.unwrap().into_parts(); - let partial_metadata = *note.metadata.partial_metadata(); - let note = - Note::with_attachments(assets, partial_metadata, recipient, note.attachments); - network_notes.push(note.into()); - } - - Ok(Response::new(proto::store::UnconsumedNetworkNotes { - notes: network_notes, - next_token: next_page.token, - })) - } - - /// Returns network account IDs within the specified block range (based on account creation - /// block). - /// - /// The function may return fewer accounts than exist in the range if the result would exceed - /// `MAX_RESPONSE_PAYLOAD_BYTES / AccountId::SERIALIZED_SIZE` rows. In this case, the result is - /// truncated at a block boundary to ensure all accounts from included blocks are returned. - /// - /// The response includes pagination info with the last block number that was fully included. - async fn get_network_account_ids( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - - let block_range = - read_block_range::(Some(request), "GetNetworkAccountIds")? - .into_inclusive_range::()?; - - let (account_ids, mut last_block_included) = - self.state.get_all_network_accounts(block_range).await.map_err(internal_error)?; - - let account_ids = Vec::from_iter(account_ids.into_iter().map(Into::into)); - - let mut chain_tip = self.state.chain_tip(Finality::Committed).await; - if last_block_included > chain_tip { - last_block_included = chain_tip; - } - - chain_tip = self.state.chain_tip(Finality::Committed).await; - - Ok(Response::new(proto::store::NetworkAccountIdList { - account_ids, - pagination_info: Some(proto::rpc::PaginationInfo { - chain_tip: chain_tip.as_u32(), - block_num: last_block_included.as_u32(), - }), - })) - } - - async fn get_account( - &self, - request: Request, - ) -> Result, Status> { - debug!(target: COMPONENT, ?request); - let request = request.into_inner(); - let account_request = request.try_into().map_err(GetAccountError::DeserializationFailed)?; - - let proof = self.state.get_account(account_request).await?; - - Ok(Response::new(proof.into())) - } - - async fn get_note_script_by_root( - &self, - request: Request, - ) -> Result, Status> { - debug!(target: COMPONENT, request = ?request); - - let root = - read_root::(request.into_inner().root, "NoteScriptRoot")?; - - let note_script = self - .state - .get_note_script_by_root(root) - .await - .map_err(GetNoteScriptByRootError::from)?; - - Ok(Response::new(proto::rpc::MaybeNoteScript { - script: note_script.map(Into::into), - })) - } - - async fn get_vault_asset_witnesses( - &self, - request: Request, - ) -> Result, Status> { - const MAX_VAULT_KEYS: usize = 100; - - let request = request.into_inner(); - - // Sanity check the number of vault keys in the request - if request.vault_keys.len() > MAX_VAULT_KEYS { - tracing::warn!( - limit=%MAX_VAULT_KEYS, - request=%request.vault_keys.len(), - account.id=%request.account_id.unwrap_or_default(), - "maximum vault key limit exceeded", - ); - - return Err(Status::invalid_argument(format!( - "number of vault keys in request cannot exceed {MAX_VAULT_KEYS}" - ))); - } - - // Read account ID. - let account_id = read_account_id::< - proto::store::VaultAssetWitnessesRequest, - GetWitnessesError, - >(request.account_id) - .map_err(invalid_argument)?; - - // Read vault keys. - let vault_keys = request - .vault_keys - .into_iter() - .map(|key_digest| { - let word = read_root::(Some(key_digest), "VaultKey") - .map_err(invalid_argument)?; - AssetVaultKey::try_from(word).map_err(|e| { - invalid_argument(GetWitnessesError::DeserializationFailed( - ConversionError::from(e), - )) - }) - }) - .collect::, Status>>()?; - - // Read block number from request, use latest if not provided. - let block_num = if let Some(num) = request.block_num { - num.into() - } else { - self.state.chain_tip(Finality::Committed).await - }; - - // Retrieve the asset witnesses. - let asset_witnesses = self - .state - .get_vault_asset_witnesses(account_id, block_num, vault_keys) - .map_err(internal_error)?; - - // Convert AssetWitness to protobuf format by extracting witness data. - let proto_witnesses = asset_witnesses - .into_iter() - .map(|witness| { - let proof: SmtProof = witness.into(); - proto::store::vault_asset_witnesses_response::VaultAssetWitness { - proof: Some(proof.into()), - } - }) - .collect(); - - Ok(Response::new(proto::store::VaultAssetWitnessesResponse { - block_num: block_num.as_u32(), - asset_witnesses: proto_witnesses, - })) - } - - async fn get_storage_map_witness( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - - // Read the account ID. - let account_id = - read_account_id::( - request.account_id, - ) - .map_err(invalid_argument)?; - - // Read the map key. - let map_key = read_root::(request.map_key, "MapKey") - .map(StorageMapKey::new) - .map_err(invalid_argument)?; - - // Read the slot name. - let slot_name = StorageSlotName::new(request.slot_name).map_err(|err| { - tonic::Status::invalid_argument(format!("Invalid storage slot name: {err}")) - })?; - - // Read the block number, use latest if not provided. - let block_num = if let Some(num) = request.block_num { - num.into() - } else { - self.state.chain_tip(Finality::Committed).await - }; - - // Retrieve the storage map witness. - let storage_witness = self - .state - .get_storage_map_witness(account_id, &slot_name, block_num, map_key) - .map_err(internal_error)?; - - // Convert StorageMapWitness to protobuf format by extracting witness data. - let proof: SmtProof = storage_witness.into(); - Ok(Response::new(proto::store::StorageMapWitnessResponse { - witness: Some(proto::store::storage_map_witness_response::StorageWitness { - key: Some(map_key.into()), - proof: Some(proof.into()), - }), - block_num: self.state.chain_tip(Finality::Committed).await.as_u32(), - })) - } -} diff --git a/docker-compose.yml b/docker-compose.yml index 2fbd170771..60a76d13e2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -50,7 +50,6 @@ services: - store - start - --rpc.listen=0.0.0.0:50001 - - --ntx-builder.listen=0.0.0.0:50002 - --block-producer.listen=0.0.0.0:50003 - --data-directory=/data/store - --account_tree.rocksdb.max_cache_size=4294967296 @@ -59,7 +58,6 @@ services: - --nullifier_tree.rocksdb.max_open_fds=512 ports: - "50001:50001" - - "50002:50002" - "50003:50003" validator: @@ -127,7 +125,7 @@ services: - miden-ntx-builder - start - --listen=0.0.0.0:50301 - - --store.url=http://store:50002 + - --store.url=http://store:50001 - --block-producer.url=http://block-producer:50201 - --validator.url=http://validator:50101 - --data-directory=/data/ntx-builder diff --git a/docs/external/src/operator/usage.md b/docs/external/src/operator/usage.md index beac148c31..2c9072e605 100644 --- a/docs/external/src/operator/usage.md +++ b/docs/external/src/operator/usage.md @@ -161,7 +161,6 @@ Each component can also be started as a standalone process. For example: # Start the store miden-node store start \ --rpc.listen 0.0.0.0:50001 \ - --ntx-builder.listen 0.0.0.0:50002 \ --block-producer.listen 0.0.0.0:50003 \ --data-directory /tmp/store @@ -184,7 +183,7 @@ miden-node rpc start \ # Start the network transaction builder miden-node ntx-builder start \ --listen 0.0.0.0:50301 \ - --store.url http://127.0.0.1:50002 \ + --store.url http://127.0.0.1:50001 \ --block-producer.url http://127.0.0.1:50201 \ --validator.url http://127.0.0.1:50101 \ --data-directory /tmp/ntx-builder diff --git a/proto/proto/internal/store.proto b/proto/proto/internal/store.proto index 002b853d82..afa058ca69 100644 --- a/proto/proto/internal/store.proto +++ b/proto/proto/internal/store.proto @@ -236,187 +236,3 @@ message TransactionInputs { optional bool new_account_id_prefix_is_unique = 5; // TODO: Replace this with an error. When a general error message exists. } -// NTX BUILDER STORE API -// ================================================================================================ - -// Store API for the network transaction builder component -service NtxBuilder { - // Retrieves block header by given block number. Optionally, it also returns the MMR path - // and current chain length to authenticate the block's inclusion. - rpc GetBlockHeaderByNumber(rpc.BlockHeaderByNumberRequest) returns (rpc.BlockHeaderByNumberResponse) {} - - // Returns a paginated list of unconsumed network notes. - rpc GetUnconsumedNetworkNotes(UnconsumedNetworkNotesRequest) returns (UnconsumedNetworkNotes) {} - - // Returns the block header at the chain tip, as well as the MMR peaks corresponding to this - // header for executing network transactions. If the block number is not provided, the latest - // header and peaks will be retrieved. - rpc GetCurrentBlockchainData(blockchain.MaybeBlockNumber) returns (CurrentBlockchainData) {} - - // Returns the latest state of a network account with the specified account ID. - rpc GetNetworkAccountDetailsById(account.AccountId) returns (MaybeAccountDetails) {} - - // Returns a list of all network account ids. - rpc GetNetworkAccountIds(rpc.BlockRange) returns (NetworkAccountIdList) {} - - // Returns the latest details of the specified account. - rpc GetAccount(rpc.AccountRequest) returns (rpc.AccountResponse) {} - - // Returns the script for a note by its root. - rpc GetNoteScriptByRoot(note.NoteScriptRoot) returns (rpc.MaybeNoteScript) {} - - // Returns vault asset witnesses for the specified account. - rpc GetVaultAssetWitnesses(VaultAssetWitnessesRequest) returns (VaultAssetWitnessesResponse) {} - - // Returns a storage map witness for the specified account and storage map entry. - rpc GetStorageMapWitness(StorageMapWitnessRequest) returns (StorageMapWitnessResponse) {} -} - -// GET NETWORK ACCOUNT DETAILS BY ID -// ================================================================================================ - -// Represents the result of getting network account details by ID. -message MaybeAccountDetails { - // Account details. - optional account.AccountDetails details = 1; -} - -// GET UNCONSUMED NETWORK NOTES -// ================================================================================================ - -// Returns a paginated list of unconsumed network notes for an account. -// -// Notes created or consumed after the specified block are excluded from the result. -message UnconsumedNetworkNotesRequest { - // This should be null on the first call, and set to the response token until the response token - // is null, at which point all data has been fetched. - // - // Note that this token is only valid if used with the same parameters. - optional uint64 page_token = 1; - - // Number of notes to retrieve per page. - uint64 page_size = 2; - - // The full account ID to filter notes by. - account.AccountId account_id = 3; - - // The block number to filter the returned notes by. - // - // Notes that are created or consumed after this block are excluded from the result. - fixed32 block_num = 4; -} - -// Represents the result of getting the unconsumed network notes. -message UnconsumedNetworkNotes { - // An opaque pagination token. - // - // Use this in your next request to get the next - // set of data. - // - // Will be null once there is no more data remaining. - optional uint64 next_token = 1; - - // The list of unconsumed network notes. - repeated note.NetworkNote notes = 2; -} - -// GET NETWORK ACCOUNTS -// ================================================================================================ - -// Represents the result of getting the network account ids. -message NetworkAccountIdList { - // Pagination information. - rpc.PaginationInfo pagination_info = 1; - - // The list of network account ids. - repeated account.AccountId account_ids = 2; -} - -// GET CURRENT BLOCKCHAIN DATA -// ================================================================================================ - -// Current blockchain data based on the requested block number. -message CurrentBlockchainData { - // Commitments that represent the current state according to the MMR. - repeated primitives.Digest current_peaks = 1; - // Current block header. - optional blockchain.BlockHeader current_block_header = 2; -} - -// GET VAULT ASSET WITNESSES -// ================================================================================================ - -// Request for vault asset witnesses for a specific account. -message VaultAssetWitnessesRequest { - // The account ID for which to retrieve vault asset witnesses. - account.AccountId account_id = 1; - - // Set of asset vault keys to retrieve witnesses for. - repeated primitives.Digest vault_keys = 2; - - // The witnesses returned correspond to the account state at the specified block number. - // - // Optional block number. If not provided, uses the latest state. - // - // The specified block number should be relatively near the chain tip else an error will be - // returned. - optional fixed32 block_num = 3; -} - -// Response containing vault asset witnesses. -message VaultAssetWitnessesResponse { - // A vault asset witness containing the asset and its proof. - message VaultAssetWitness { - // The SMT opening proof for the asset's inclusion in the vault. - primitives.SmtOpening proof = 1; - } - - // Block number at which the witnesses were generated. - // - // The witnesses returned corresponds to the account state at the specified block number. - fixed32 block_num = 1; - - // List of asset witnesses. - repeated VaultAssetWitness asset_witnesses = 2; -} - -// GET STORAGE MAP WITNESS -// ================================================================================================ - -// Request for a storage map witness for a specific account and storage slot. -message StorageMapWitnessRequest { - // The account ID for which to retrieve the storage map witness. - account.AccountId account_id = 1; - - // The raw, user-provided storage map key for which to retrieve the witness. - primitives.Digest map_key = 2; - - // Optional block number. If not provided, uses the latest state. - // - // The witness returned corresponds to the account state at the specified block number. - // - // The specified block number should be relatively near the chain tip else an error will be - // returned. - optional fixed32 block_num = 3; - - // The storage slot name for the map. - string slot_name = 4; -} - -// Response containing a storage map witness. -message StorageMapWitnessResponse { - // Storage map witness data. - message StorageWitness { - // The raw, user-provided storage map key. - primitives.Digest key = 1; - - // The SMT opening proof for the key-value pair. - primitives.SmtOpening proof = 3; - } - - // The storage map witness. - StorageWitness witness = 1; - - // Block number at which the witness was generated. - fixed32 block_num = 2; -} diff --git a/scripts/run-node.sh b/scripts/run-node.sh index 3ad9698f45..0968087bc7 100755 --- a/scripts/run-node.sh +++ b/scripts/run-node.sh @@ -21,12 +21,11 @@ VALIDATOR_DIR="/tmp/validator" NTX_BUILDER_DIR="/tmp/ntx-builder" ACCOUNTS_DIR="/tmp/accounts" -# Primary store (block-producer mode): 3 APIs. +# Primary store (block-producer mode): 2 APIs. STORE_RPC_PORT=50001 -STORE_NTX_BUILDER_PORT=50002 STORE_BLOCK_PRODUCER_PORT=50003 -# Replica stores expose only the RPC API (no block-producer or ntx-builder endpoints). +# Replica stores expose only the RPC API (no block-producer endpoint). STORE_REPLICA_1_RPC_PORT=50011 STORE_REPLICA_2_RPC_PORT=50021 @@ -111,7 +110,6 @@ echo "=== Starting components ===" echo "Starting store (block-producer mode)..." OTEL_SERVICE_NAME=miden-store-primary $BINARY store start \ --rpc.listen "0.0.0.0:$STORE_RPC_PORT" \ - --ntx-builder.listen "0.0.0.0:$STORE_NTX_BUILDER_PORT" \ --block-producer.listen "0.0.0.0:$STORE_BLOCK_PRODUCER_PORT" \ --data-directory "$STORE_DIR" \ $EXTRA_ARGS & @@ -187,7 +185,7 @@ PIDS+=($!) echo "Starting network transaction builder..." OTEL_SERVICE_NAME=miden-ntx-builder $NTX_BUILDER_BINARY start \ --listen "0.0.0.0:$NTX_BUILDER_PORT" \ - --store.url "http://127.0.0.1:$STORE_NTX_BUILDER_PORT" \ + --store.url "http://127.0.0.1:$STORE_RPC_PORT" \ --block-producer.url "http://127.0.0.1:$BLOCK_PRODUCER_PORT" \ --validator.url "http://127.0.0.1:$VALIDATOR_PORT" \ --data-directory "$NTX_BUILDER_DIR" \