diff --git a/.gitignore b/.gitignore index d9405e5..63c45e0 100644 --- a/.gitignore +++ b/.gitignore @@ -15,5 +15,6 @@ /target *.log - +.vscode +.cargo config/block_submitter.yaml diff --git a/Cargo.lock b/Cargo.lock index 5fe3465..05cb005 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -115,9 +115,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.50" +version = "0.1.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722" +checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3" dependencies = [ "proc-macro2", "quote", @@ -144,6 +144,17 @@ dependencies = [ "num-traits 0.2.14", ] +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + [[package]] name = "auto_impl" version = "0.4.1" @@ -460,6 +471,37 @@ dependencies = [ "generic-array 0.14.4", ] +[[package]] +name = "clap" +version = "3.0.0-beta.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "feff3878564edb93745d58cf63e17b63f24142506e7a20c87a5521ed7bfb1d63" +dependencies = [ + "atty", + "bitflags", + "clap_derive", + "indexmap", + "lazy_static", + "os_str_bytes", + "strsim", + "termcolor", + "textwrap", + "unicase", +] + +[[package]] +name = "clap_derive" +version = "3.0.0-beta.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b15c6b4f786ffb6192ffe65a36855bc1fc2444bcd0945ae16748dcd6ed7d0d3" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "cmake" version = "0.1.45" @@ -1122,7 +1164,7 @@ checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" [[package]] name = "fluidex-common" version = "0.1.0" -source = "git+https://github.com/fluidex/common-rs?branch=master#a6a2b252a0a2dfd92726491d32008fcfef5033ef" +source = "git+https://github.com/fluidex/common-rs?branch=master#671a92a986920b013e5457a9c2be2b295f1bc733" dependencies = [ "anyhow", "babyjubjub-rs", @@ -2079,6 +2121,15 @@ dependencies = [ "tonic-build 0.5.2", ] +[[package]] +name = "os_str_bytes" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "addaa943333a514159c80c97ff4a93306530d965d27e139188283cd13e06a799" +dependencies = [ + "memchr", +] + [[package]] name = "parity-scale-codec" version = "2.2.0" @@ -2383,9 +2434,9 @@ checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" [[package]] name = "proc-macro2" -version = "1.0.27" +version = "1.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0d8caf72986c1a598726adc988bb5984792ef84f5ee5aa50209145ee8077038" +checksum = "ba508cc11742c0dc5c1659771673afbab7a0efab23aa17e854cbab0837ed0b43" dependencies = [ "unicode-xid", ] @@ -2639,7 +2690,9 @@ name = "regnbue-bridge" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "chrono", + "clap", "config", "const_format", "crossbeam-channel", @@ -3363,6 +3416,12 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "subtle" version = "2.4.0" @@ -3371,9 +3430,9 @@ checksum = "1e81da0851ada1f3e9d4312c704aa4f8806f0f9d69faaf8df2f3464b4a9437c2" [[package]] name = "syn" -version = "1.0.73" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f71489ff30030d2ae598524f61326b902466f72a0fb1a8564c001cc63425bcc7" +checksum = "f2afee18b8beb5a596ecb4a2dce128c719b4ba399d34126b9e4396e3f9860966" dependencies = [ "proc-macro2", "quote", @@ -3400,6 +3459,24 @@ dependencies = [ "winapi", ] +[[package]] +name = "termcolor" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "textwrap" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0066c8d12af8b5acd21e00547c3797fde4e8677254a7ee429176ccebbe93dd80" +dependencies = [ + "unicode-width", +] + [[package]] name = "thiserror" version = "1.0.29" @@ -3844,6 +3921,15 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.5" @@ -3868,6 +3954,12 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb0d2e7be6ae3a5fa87eed5fb451aff96f2573d2694942e40543ae0bbe19c796" +[[package]] +name = "unicode-width" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" + [[package]] name = "unicode-xid" version = "0.2.2" @@ -4084,6 +4176,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index eadc362..574b147 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ config_rs = { package = "config", version = "0.10.1" } const_format = "0.2.15" crossbeam-channel = "0.5.1" ctrlc = { version = "3.1", features = [ "termination" ] } +clap = "=3.0.0-beta.5" #todo: need to update the dependency after official released dotenv = "0.15.0" ethers = { git = "https://github.com/gakonst/ethers-rs" } fluidex-common = { git = "https://github.com/fluidex/common-rs", branch = "master", features = [ "kafka", "non-blocking-tracing", "rollup-state-db" ] } @@ -26,6 +27,7 @@ serde_json = "1.0.64" sqlx = { version = "0.5.1", features = [ "runtime-tokio-rustls", "postgres", "chrono", "decimal", "json", "migrate" ] } tokio = { version = "1.0", features = [ "full" ] } tonic = "0.5.2" +async-trait = "0.1.52" [build-dependencies] prost = "0.7.0" @@ -33,6 +35,7 @@ tonic-build = "0.4.0" [features] default = ["ganache"] +windows_build = [ "fluidex-common/rdkafka-dynamic" ] ganache = [] [[bin]] diff --git a/src/bin/block_submitter.rs b/src/bin/block_submitter.rs index fb02ab9..eeceb15 100644 --- a/src/bin/block_submitter.rs +++ b/src/bin/block_submitter.rs @@ -1,8 +1,32 @@ +use clap::Parser; use fluidex_common::non_blocking_tracing; use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt}; -use regnbue_bridge::block_submitter::{storage, EthSender, Settings, TaskFetcher}; +use regnbue_bridge::block_submitter::{storage, types, EthSenderConfigure, Settings, TaskFetcher}; use std::cell::RefCell; +#[derive(Parser, Debug)] +#[clap(version = "0.1")] +struct Opts { + #[clap(subcommand)] + command: Option, +} + +#[derive(Parser, Debug)] +enum SubCommand { + /// Verify a block with specified block id + Verify(VerifyBlock), + /// manual submit a block + Manual(ManualSubmit), +} + +#[derive(Parser, Debug)] +struct VerifyBlock { + block_id: i64, +} + +#[derive(Parser, Debug)] +struct ManualSubmit {} + #[tokio::main] async fn main() -> anyhow::Result<()> { dotenv::dotenv().ok(); @@ -15,6 +39,37 @@ async fn main() -> anyhow::Result<()> { let settings: Settings = conf.try_into().unwrap(); log::debug!("{:?}", settings); + let opts: Opts = Opts::parse(); + log::debug!("{:?}", opts); + + // TODO: maybe separate and have: 1. consumer 2. producer 3. sender + let dbpool = storage::from_config(&settings).await?; + //let eth_sender = eth + let client = EthSenderConfigure::from_config(&settings).await?.build(dbpool.clone()); + + // one-block mode + if let Some(sub_cmd) = opts.command { + match sub_cmd { + SubCommand::Verify(opts) => { + let block_id = opts.block_id; + let block = types::SubmitBlockArgs::fetch_by_blockid(block_id, &dbpool).await?; + let ret = client + .verify_block(block.ok_or_else(|| anyhow::anyhow!("block {} not existed", block_id))?) + .await?; + println!("verify block {} result: {}", block_id, ret); + } + SubCommand::Manual(_) => { + let block = types::SubmitBlockArgs::fetch_latest(None, &dbpool).await?; + let block = block.ok_or_else(|| anyhow::anyhow!("no pending block for submitting"))?; + client.submit_block(block).await?; + } + }; + + return Ok(()); + } + + // continuous mode + // handle ctrl+c let (stop_signal_sender, mut stop_signal_receiver) = mpsc::channel(256); { @@ -26,13 +81,10 @@ async fn main() -> anyhow::Result<()> { .expect("Error setting Ctrl-C handler"); } - // TODO: maybe separate and have: 1. consumer 2. producer 3. sender - let dbpool = storage::from_config(&settings).await?; let (tx, rx) = crossbeam_channel::unbounded(); let mut fetcher = TaskFetcher::from_config_with_pool(&settings, dbpool.clone()); let fetcher_task_handle = tokio::spawn(async move { fetcher.run(tx).await }); - let eth_sender = EthSender::from_config_with_pool(&settings, dbpool).await?; - let eth_sender_task_handle = tokio::spawn(async move { eth_sender.run(rx).await }); + let eth_sender_task_handle = tokio::spawn(async move { client.run(rx).await }); tokio::select! { _ = async { fetcher_task_handle.await } => { diff --git a/src/block_submitter/config.rs b/src/block_submitter/config.rs index bf5b360..86020e3 100644 --- a/src/block_submitter/config.rs +++ b/src/block_submitter/config.rs @@ -7,7 +7,7 @@ pub struct Settings { pub contract_abi_file_path: String, pub confirmations: usize, // TODO: default pub web3_url: String, - pub keystore: String, - pub password: String, - pub chain_id: u64, + pub keystore: Option, + pub password: Option, + pub chain_id: Option, } diff --git a/src/block_submitter/eth_sender/mod.rs b/src/block_submitter/eth_sender/mod.rs index acb82ea..ab211fa 100644 --- a/src/block_submitter/eth_sender/mod.rs +++ b/src/block_submitter/eth_sender/mod.rs @@ -2,91 +2,170 @@ use super::types::{ContractCall, SubmitBlockArgs}; use crate::block_submitter::Settings; use crate::contracts; use crate::storage::PoolType; +use async_trait::async_trait; use crossbeam_channel::Receiver; use ethers::abi::Abi; use ethers::prelude::*; use ethers::types::H256; use fluidex_common::db::models; use std::convert::TryFrom; +use std::{boxed, pin}; type SignedClient = SignerMiddleware, LocalWallet>; +#[async_trait] +pub trait EthSend: Sync + Send { + async fn verify_block(&self, args: SubmitBlockArgs) -> Result; + async fn submit_block(&self, args: SubmitBlockArgs) -> Result<(), anyhow::Error>; + async fn run(&self, rx: Receiver); +} + #[derive(Debug)] -pub struct EthSender { - connpool: PoolType, - client: SignedClient, - account: Address, - contract: Contract, +pub struct EthSenderConfigure { + abi: Abi, + base_cli: Provider, + address: Address, confirmations: usize, + account: Address, + wallet: Option, } -impl EthSender { - pub async fn from_config_with_pool(config: &Settings, connpool: PoolType) -> Result { +impl EthSenderConfigure { + pub async fn from_config(config: &Settings) -> Result { let address = config.contract_address.parse::
()?; let abi: Abi = contracts::get_abi(&config.contract_abi_file_path)?; + let base_cli = Provider::::try_from(config.web3_url.as_str())?; - let client = Provider::::try_from(config.web3_url.as_str())?; - let wallet = LocalWallet::decrypt_keystore(config.keystore.as_str(), config.password.as_str())?.with_chain_id(config.chain_id); - let account = wallet.address(); - let client = SignerMiddleware::new(client, wallet); + let wallet = if let Some(keystore) = &config.keystore { + let psw = config.password.as_ref().map_or_else(Default::default, Clone::clone); + let wallet = LocalWallet::decrypt_keystore(keystore.as_str(), &psw)?.with_chain_id(config.chain_id.unwrap_or(5u64)); //use goerli's chain as default + Some(wallet) + } else { + None + }; - let contract = Contract::new(address, abi, client.clone()); + let account = if let Some(wallet) = &wallet { + wallet.address() + } else { + base_cli.get_accounts().await?[0] + }; Ok(Self { - connpool, - client, + abi, + address, + base_cli, + wallet, account, - contract, confirmations: config.confirmations, }) } - pub async fn run(&self, rx: Receiver) { + fn build_with_keystore(self, connpool: PoolType) -> EthSender { + let wallet = self.wallet.unwrap(); + let client = SignerMiddleware::new(self.base_cli, wallet); + + EthSender { + connpool, + account: self.account, + contract: Contract::new(self.address, self.abi, client), + confirmations: self.confirmations, + } + } + + fn build_with_account(self, connpool: PoolType) -> EthSender> { + EthSender { + connpool, + account: self.account, + contract: Contract::new(self.address, self.abi, self.base_cli), + confirmations: self.confirmations, + } + } + + pub fn build(self, connpool: PoolType) -> pin::Pin> { + if self.wallet.is_some() { + boxed::Box::pin(self.build_with_keystore(connpool)) + } else { + boxed::Box::pin(self.build_with_account(connpool)) + } + } +} + +#[derive(Debug)] +pub struct EthSender { + connpool: PoolType, + //client: SignedClient, + contract: Contract, + account: Address, + confirmations: usize, +} + +#[async_trait] +impl EthSend for EthSender { + async fn run(&self, rx: Receiver) { for call in rx.iter() { log::debug!("{:?}", call); - if let Err(e) = self.run_inner(call).await { + let ret = match call { + ContractCall::SubmitBlock(args) => self.submit_block(args).await, + }; + if let Err(e) = ret { log::error!("{:?}", e); }; } } - async fn run_inner(&self, call: ContractCall) -> Result<(), anyhow::Error> { - match call { - ContractCall::SubmitBlock(args) => { - let tx_hash = match self.submit_block(args.clone()).await? { - // https://stackoverflow.com/questions/57350082/to-convert-a-ethereum-typesh256-to-string-in-rust - Some(h) => format!("{:#x}", h), - None => "".to_string(), - }; - - let stmt = format!( - "update {} set status = $1, l1_tx_hash = $2 where block_id = $3", - models::tablenames::L2_BLOCK - ); - sqlx::query(&stmt) - .bind(models::l2_block::BlockStatus::Verified) - .bind(tx_hash) - .bind(args.block_id.as_u64() as i64) - .execute(&self.connpool) - .await?; - } - }; + async fn verify_block(&self, args: SubmitBlockArgs) -> Result { + // println!("block aux {:02x?}", args.deposit_aux); + let ret = self + .contract + .method::<_, u64>( + "verifyBlock", + (args.public_inputs, args.serialized_proof, args.public_data, args.deposit_aux), + )? + .call() + .await + .map_err(|e| anyhow::anyhow!("verify fail: {}", e))?; - Ok(()) + Ok(ret) } - pub async fn submit_block(&self, args: SubmitBlockArgs) -> Result, anyhow::Error> { + async fn submit_block(&self, args: SubmitBlockArgs) -> Result<(), anyhow::Error> { let call = self .contract - .method::<_, H256>("submitBlock", (args.block_id, args.public_inputs, args.serialized_proof)) - .unwrap() + .method::<_, H256>( + "submitBlock", + ( + args.block_id, + args.public_inputs, + args.serialized_proof, + args.public_data, + args.deposit_aux, + ), + )? .from(self.account); // ganache does not support EIP-1559 #[cfg(feature = "ganache")] let call = call.legacy(); - let pending_tx = call.send().await?; + let pending_tx = call.send().await.map_err(|e| anyhow::anyhow!("submit fail: {}", e))?; let receipt = pending_tx.confirmations(self.confirmations).await?; log::info!("block {:?} confirmed. receipt: {:?}.", args.block_id, receipt); - Ok(receipt.map(|r| r.transaction_hash)) + + // https://stackoverflow.com/questions/57350082/to-convert-a-ethereum-typesh256-to-string-in-rust + let tx_hash_str = match receipt { + Some(receipt) => format!("{:#x}", receipt.transaction_hash), + None => String::default(), + }; + + let stmt = format!( + "update {} set status = $1, l1_tx_hash = $2 where block_id = $3", + models::tablenames::L2_BLOCK + ); + sqlx::query(&stmt) + .bind(models::l2_block::BlockStatus::Verified) + .bind(tx_hash_str) + .bind(args.block_id.as_u64() as i64) + .execute(&self.connpool) + .await?; + + Ok(()) } } diff --git a/src/block_submitter/mod.rs b/src/block_submitter/mod.rs index 1a1a113..dfa1372 100644 --- a/src/block_submitter/mod.rs +++ b/src/block_submitter/mod.rs @@ -5,5 +5,5 @@ pub mod task_fetcher; pub mod types; pub use config::Settings; -pub use eth_sender::EthSender; +pub use eth_sender::{EthSender, EthSenderConfigure}; pub use task_fetcher::TaskFetcher; diff --git a/src/block_submitter/task_fetcher.rs b/src/block_submitter/task_fetcher.rs index 5698ee2..2cb8bd9 100644 --- a/src/block_submitter/task_fetcher.rs +++ b/src/block_submitter/task_fetcher.rs @@ -1,9 +1,11 @@ use super::types::{ContractCall, SubmitBlockArgs}; use crate::block_submitter::Settings; -use crate::storage::PoolType; +use crate::storage::{DbType, PoolType}; +use anyhow::anyhow; use crossbeam_channel::Sender; use ethers::types::U256; use fluidex_common::db::models; +use serde::Deserialize; use std::time::Duration; #[derive(Debug)] @@ -12,41 +14,97 @@ pub struct TaskFetcher { last_block_id: Option, } -impl TaskFetcher { - pub fn from_config_with_pool(_config: &Settings, connpool: PoolType) -> Self { - Self { - connpool, - last_block_id: None, - } - } +#[derive(Deserialize, Debug, Clone)] +pub struct L2PubDataAux { + #[serde(rename = "deposit")] + pub deposit_txs_pos: Vec, +} - pub async fn run(&mut self, tx: Sender) { - let mut timer = tokio::time::interval(Duration::from_secs(1)); - loop { - timer.tick().await; - log::debug!("ticktock!"); +#[derive(sqlx::FromRow, Debug, Clone)] +struct Task { + block_id: i64, + public_input: Vec, + proof: Vec, + public_data: Vec, + aux_data: Option, +} - if let Err(e) = self.run_inner(&tx).await { - log::error!("{}", e); - }; - } +impl TryFrom for SubmitBlockArgs { + type Error = anyhow::Error; + + fn try_from(t: Task) -> Result { + let public_inputs: Vec = serde_json::de::from_slice(&t.public_input)?; + let serialized_proof: Vec = serde_json::de::from_slice(&t.proof)?; + let block_id = U256::from(t.block_id); + let public_data = t.public_data; + let deposit_aux: Vec = match t.aux_data { + Some(val) => { + //encode deposit position array into compact bytes + let val_arr = val + .get("deposit") + .ok_or_else(|| anyhow!("no deposit field"))? + .as_array() + .ok_or_else(|| anyhow!("malform in deposit {}", val))?; + let mut ret_arr = Vec::new(); + for i in val_arr { + let ni = i.as_u64().ok_or_else(|| anyhow!("malform in deposit arr {}", i))? as u16; + ret_arr.append(&mut Vec::from(ni.to_be_bytes())); + } + ret_arr + } + None => Vec::default(), + }; + + Ok(SubmitBlockArgs { + block_id, + public_inputs, + serialized_proof, + public_data, + deposit_aux, + }) } +} - async fn run_inner(&mut self, tx: &Sender) -> Result<(), anyhow::Error> { - let mut db_tx = self.connpool.begin().await?; +impl SubmitBlockArgs { + pub async fn fetch_by_blockid<'c>( + block_id: i64, + conn: impl sqlx::Executor<'c, Database = DbType>, + ) -> Result, anyhow::Error> { + let query: &'static str = const_format::formatcp!( + r#" + select t.block_id as block_id, + t.public_input as public_input, + t.proof as proof, + l2b.raw_public_data as public_data, + l2b.public_data_aux as aux_data + from {} t + inner join {} l2b + on t.block_id = l2b.block_id + where t.block_id = $1 + limit 1"#, + models::tablenames::TASK, + models::tablenames::L2_BLOCK, + ); - #[derive(sqlx::FromRow, Debug, Clone)] - struct Task { - block_id: i64, - public_input: Vec, - proof: Vec, + let task: Option = sqlx::query_as(query).bind(block_id).fetch_optional(conn).await?; + + match task { + Some(task) => Self::try_from(task).map(Some), + None => Ok(None), } + } + pub async fn fetch_latest<'c>( + start_id: Option, + conn: impl sqlx::Executor<'c, Database = DbType>, + ) -> Result, anyhow::Error> { let query: &'static str = const_format::formatcp!( r#" select t.block_id as block_id, t.public_input as public_input, - t.proof as proof + t.proof as proof, + l2b.raw_public_data as public_data, + l2b.public_data_aux as aux_data from {} t inner join {} l2b on t.block_id = l2b.block_id @@ -54,7 +112,7 @@ impl TaskFetcher { from task where status <> 'proved' order by block_id - limit 1), 0) + limit 1), 9223372036854775807) and t.block_id > $1 and t.status = 'proved' -- defense filter and l2b.status = 'uncommited' @@ -64,20 +122,42 @@ impl TaskFetcher { models::tablenames::L2_BLOCK, ); - let task: Option = sqlx::query_as(query) - .bind(self.last_block_id.unwrap_or(-1)) - .fetch_optional(&mut db_tx) - .await?; - - if let Some(task) = task { - let public_inputs: Vec = serde_json::de::from_slice(&task.public_input)?; - let serialized_proof: Vec = serde_json::de::from_slice(&task.proof)?; - tx.try_send(ContractCall::SubmitBlock(SubmitBlockArgs { - block_id: task.block_id.into(), - public_inputs, - serialized_proof, - }))?; - self.last_block_id = Some(task.block_id); + let task: Option = sqlx::query_as(query).bind(start_id.unwrap_or(-1)).fetch_optional(conn).await?; + + match task { + Some(task) => Self::try_from(task).map(Some), + None => Ok(None), + } + } +} + +impl TaskFetcher { + pub fn from_config_with_pool(_config: &Settings, connpool: PoolType) -> Self { + Self { + connpool, + last_block_id: None, + } + } + + pub async fn run(&mut self, tx: Sender) { + let mut timer = tokio::time::interval(Duration::from_secs(1)); + loop { + timer.tick().await; + log::debug!("ticktock!"); + + if let Err(e) = self.run_inner(&tx).await { + log::error!("{}", e); + }; + } + } + + async fn run_inner(&mut self, tx: &Sender) -> Result<(), anyhow::Error> { + let mut db_tx = self.connpool.begin().await?; + + if let Some(args) = SubmitBlockArgs::fetch_latest(self.last_block_id, &mut db_tx).await? { + let last_id = args.block_id.as_u64() as i64; + tx.try_send(ContractCall::SubmitBlock(args))?; + self.last_block_id = Some(last_id); } db_tx.commit().await?; diff --git a/src/block_submitter/types.rs b/src/block_submitter/types.rs index b3f153a..3f10fe0 100644 --- a/src/block_submitter/types.rs +++ b/src/block_submitter/types.rs @@ -10,4 +10,6 @@ pub struct SubmitBlockArgs { pub block_id: U256, pub public_inputs: Vec, pub serialized_proof: Vec, + pub public_data: Vec, + pub deposit_aux: Vec, }