diff --git a/backend/crates/atlas-common/src/types.rs b/backend/crates/atlas-common/src/types.rs index eac7d8a..67903db 100644 --- a/backend/crates/atlas-common/src/types.rs +++ b/backend/crates/atlas-common/src/types.rs @@ -1,6 +1,6 @@ use bigdecimal::BigDecimal; use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Serialize, Serializer}; use sqlx::FromRow; /// Block data as stored in the database @@ -40,12 +40,20 @@ pub struct Transaction { pub value: BigDecimal, pub gas_price: BigDecimal, pub gas_used: i64, + #[serde(serialize_with = "serialize_bytes_as_hex")] pub input_data: Vec, pub status: bool, pub contract_created: Option, pub timestamp: i64, } +fn serialize_bytes_as_hex(bytes: &[u8], serializer: S) -> Result +where + S: Serializer, +{ + serializer.serialize_str(&format!("0x{}", hex::encode(bytes))) +} + /// Address data as stored in the database #[derive(Debug, Clone, Serialize, Deserialize, FromRow)] pub struct Address { @@ -170,6 +178,10 @@ pub struct EventLog { pub data: Vec, pub block_number: i64, pub decoded: Option, + pub decode_status: String, + pub decoded_at: Option>, + pub decode_attempted_at: Option>, + pub decode_source: Option, } /// Known event signature for decoding diff --git a/backend/crates/atlas-server/src/api/handlers/contracts.rs b/backend/crates/atlas-server/src/api/handlers/contracts.rs index 933bfcc..9c2c477 100644 --- a/backend/crates/atlas-server/src/api/handlers/contracts.rs +++ b/backend/crates/atlas-server/src/api/handlers/contracts.rs @@ -20,6 +20,7 @@ use tokio::fs; use crate::api::error::ApiResult; use crate::api::AppState; +use crate::event_log_decode::enqueue_jobs_for_verified_contract; use atlas_common::{AtlasError, FullContractAbi}; // ── Request / Response types ────────────────────────────────────────────────── @@ -220,18 +221,19 @@ pub async fn verify_contract( // Compile the submitted source let compiled_contract = compile_source(&solc_path, &req).await?; - // Strip CBOR metadata from both sides before comparing + // Solc reports immutable offsets against the full deployed bytecode, so + // zero those ranges before stripping the trailing CBOR metadata blob. let deployed_bytes = decode_hex_bytecode(&deployed_hex)?; - let deployed_stripped = strip_metadata(&deployed_bytes); - let compiled_stripped = strip_metadata(&compiled_contract.bytecode); - let deployed_cmp = normalize_bytecode_for_comparison( - deployed_stripped, + let deployed_normalized = normalize_bytecode_for_comparison( + &deployed_bytes, &compiled_contract.immutable_references, )?; - let compiled_cmp = normalize_bytecode_for_comparison( - compiled_stripped, + let compiled_normalized = normalize_bytecode_for_comparison( + &compiled_contract.bytecode, &compiled_contract.immutable_references, )?; + let deployed_cmp = strip_metadata(&deployed_normalized).to_vec(); + let compiled_cmp = strip_metadata(&compiled_normalized).to_vec(); // eth_getCode returns deployed runtime bytecode, so constructor args are not // part of the bytecode comparison. We still parse and persist them as metadata. @@ -282,6 +284,8 @@ pub async fn verify_contract( return Err(AtlasError::Verification(format!("{address} is already verified")).into()); } + enqueue_jobs_for_verified_contract(&state.pool, &address).await?; + Ok(( StatusCode::OK, Json(VerifyResponse { @@ -1093,6 +1097,24 @@ mod tests { assert!(matches!(err, AtlasError::Compilation(_))); } + #[test] + fn normalize_then_strip_metadata_preserves_immutable_offsets() { + let bytecode = vec![ + 0xaa, 0xbb, 0x11, 0x22, 0xcc, 0xdd, 0x01, 0x02, 0x03, 0x00, 0x03, + ]; + let normalized = normalize_bytecode_for_comparison( + &bytecode, + &[ImmutableReference { + start: 2, + length: 2, + }], + ) + .unwrap(); + let stripped = strip_metadata(&normalized); + + assert_eq!(stripped, &[0xaa, 0xbb, 0x00, 0x00, 0xcc, 0xdd]); + } + #[test] fn extract_immutable_references_parses_multiple_entries() { let refs = extract_immutable_references(&serde_json::json!({ diff --git a/backend/crates/atlas-server/src/api/handlers/logs.rs b/backend/crates/atlas-server/src/api/handlers/logs.rs index 81a386c..f5b349c 100644 --- a/backend/crates/atlas-server/src/api/handlers/logs.rs +++ b/backend/crates/atlas-server/src/api/handlers/logs.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use crate::api::error::ApiResult; use crate::api::AppState; +use crate::event_log_decode::EventLogApiResponse; use atlas_common::{EventLog, PaginatedResponse, Pagination}; /// Pagination for transaction log endpoints. @@ -61,7 +62,7 @@ pub async fn get_transaction_logs( State(state): State>, Path(hash): Path, Query(query): Query, -) -> ApiResult>> { +) -> ApiResult>> { let hash = normalize_hash(&hash); let total: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM event_logs WHERE tx_hash = $1") @@ -70,7 +71,8 @@ pub async fn get_transaction_logs( .await?; let logs: Vec = sqlx::query_as( - "SELECT id, tx_hash, log_index, address, topic0, topic1, topic2, topic3, data, block_number, decoded + "SELECT id, tx_hash, log_index, address, topic0, topic1, topic2, topic3, data, block_number, + decoded, decode_status, decoded_at, decode_attempted_at, decode_source FROM event_logs WHERE tx_hash = $1 ORDER BY log_index ASC @@ -83,7 +85,7 @@ pub async fn get_transaction_logs( .await?; Ok(Json(PaginatedResponse::new( - logs, + logs.iter().map(EventLogApiResponse::from).collect(), query.page, query.clamped_limit(), total.0, @@ -95,7 +97,7 @@ pub async fn get_address_logs( State(state): State>, Path(address): Path, Query(query): Query, -) -> ApiResult>> { +) -> ApiResult>> { let address = normalize_address(&address); let (total, logs) = if let Some(topic0) = &query.topic0 { @@ -109,7 +111,8 @@ pub async fn get_address_logs( .await?; let logs: Vec = sqlx::query_as( - "SELECT id, tx_hash, log_index, address, topic0, topic1, topic2, topic3, data, block_number, decoded + "SELECT id, tx_hash, log_index, address, topic0, topic1, topic2, topic3, data, block_number, + decoded, decode_status, decoded_at, decode_attempted_at, decode_source FROM event_logs WHERE address = $1 AND topic0 = $2 ORDER BY block_number DESC, log_index DESC @@ -130,7 +133,8 @@ pub async fn get_address_logs( .await?; let logs: Vec = sqlx::query_as( - "SELECT id, tx_hash, log_index, address, topic0, topic1, topic2, topic3, data, block_number, decoded + "SELECT id, tx_hash, log_index, address, topic0, topic1, topic2, topic3, data, block_number, + decoded, decode_status, decoded_at, decode_attempted_at, decode_source FROM event_logs WHERE address = $1 ORDER BY block_number DESC, log_index DESC @@ -146,85 +150,20 @@ pub async fn get_address_logs( }; Ok(Json(PaginatedResponse::new( - logs, + logs.iter().map(EventLogApiResponse::from).collect(), query.pagination.page, query.clamped_limit(), total, ))) } -/// Enriched log with event name -#[derive(Debug, Clone, serde::Serialize)] -pub struct EnrichedEventLog { - #[serde(flatten)] - pub log: EventLog, - pub event_name: Option, - pub event_signature: Option, -} - /// GET /api/transactions/:hash/logs/decoded - Get decoded logs for a transaction pub async fn get_transaction_logs_decoded( State(state): State>, Path(hash): Path, Query(query): Query, -) -> ApiResult>> { - let hash = normalize_hash(&hash); - - let total: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM event_logs WHERE tx_hash = $1") - .bind(&hash) - .fetch_one(&state.pool) - .await?; - - let logs: Vec = sqlx::query_as( - "SELECT id, tx_hash, log_index, address, topic0, topic1, topic2, topic3, data, block_number, decoded - FROM event_logs - WHERE tx_hash = $1 - ORDER BY log_index ASC - LIMIT $2 OFFSET $3", - ) - .bind(&hash) - .bind(query.limit()) - .bind(query.offset()) - .fetch_all(&state.pool) - .await?; - - // Collect unique topic0 values for signature lookup - let topic0s: Vec = logs.iter().map(|l| l.topic0.clone()).collect(); - - // Fetch known event signatures - let signatures: Vec<(String, String, String)> = sqlx::query_as( - "SELECT signature, name, full_signature FROM event_signatures WHERE signature = ANY($1)", - ) - .bind(&topic0s) - .fetch_all(&state.pool) - .await?; - - let sig_map: std::collections::HashMap = signatures - .into_iter() - .map(|(sig, name, full)| (sig.to_lowercase(), (name, full))) - .collect(); - - let enriched: Vec = logs - .into_iter() - .map(|log| { - let (event_name, event_signature) = sig_map - .get(&log.topic0.to_lowercase()) - .map(|(n, s)| (Some(n.clone()), Some(s.clone()))) - .unwrap_or((None, None)); - EnrichedEventLog { - log, - event_name, - event_signature, - } - }) - .collect(); - - Ok(Json(PaginatedResponse::new( - enriched, - query.page, - query.clamped_limit(), - total.0, - ))) +) -> ApiResult>> { + get_transaction_logs(State(state), Path(hash), Query(query)).await } fn default_page() -> u32 { diff --git a/backend/crates/atlas-server/src/api/handlers/proxy.rs b/backend/crates/atlas-server/src/api/handlers/proxy.rs index 6d6a308..f1933a3 100644 --- a/backend/crates/atlas-server/src/api/handlers/proxy.rs +++ b/backend/crates/atlas-server/src/api/handlers/proxy.rs @@ -12,138 +12,10 @@ use std::sync::Arc; use crate::api::error::ApiResult; use crate::api::AppState; -use atlas_common::{AtlasError, ContractAbi, ProxyContract}; - -// EIP-1967 implementation slot: keccak256("eip1967.proxy.implementation") - 1 -const EIP1967_IMPL_SLOT: &str = - "0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc"; -// EIP-1822 (UUPS) implementation slot: keccak256("PROXIABLE") -const EIP1822_IMPL_SLOT: &str = - "0xc5f16f0fcc639fa48a6947836d9850f504798523bf8c9a3a87d5876cf622bcf7"; - -/// Try to read a storage slot via eth_getStorageAt and return a non-zero address if found. -async fn read_address_slot( - rpc_url: &str, - address: &str, - slot: &str, -) -> Result, AtlasError> { - let body = serde_json::json!({ - "jsonrpc": "2.0", - "method": "eth_getStorageAt", - "params": [address, slot, "latest"], - "id": 1 - }); - - let client = reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(5)) - .build() - .map_err(|e| AtlasError::Internal(e.to_string()))?; - - let resp: serde_json::Value = client - .post(rpc_url) - .json(&body) - .send() - .await - .map_err(|e| AtlasError::Rpc(format!("eth_getStorageAt failed: {e}")))? - .json() - .await - .map_err(|e| AtlasError::Rpc(format!("failed to parse eth_getStorageAt response: {e}")))?; - - let raw = resp.get("result").and_then(|r| r.as_str()).unwrap_or("0x"); - - // Result is 32 bytes; address occupies the last 20 bytes (40 hex chars). - let hex = raw.trim_start_matches("0x"); - if hex.len() < 40 || hex.chars().all(|c| c == '0') { - return Ok(None); - } - let addr = format!("0x{}", &hex[hex.len() - 40..]).to_lowercase(); - if addr == "0x0000000000000000000000000000000000000000" { - return Ok(None); - } - Ok(Some(addr)) -} - -/// Detect a proxy pattern for `address` via RPC and, if found, persist it in `proxy_contracts`. -/// Returns the cached or newly detected `ProxyContract`, or `None` if not a proxy. -async fn resolve_proxy( - state: &AppState, - address: &str, -) -> Result, AtlasError> { - // 1. Check DB cache first. - let cached: Option = sqlx::query_as( - "SELECT proxy_address, implementation_address, proxy_type, admin_address, - detected_at_block, last_checked_block, updated_at - FROM proxy_contracts WHERE proxy_address = $1", - ) - .bind(address) - .fetch_optional(&state.pool) - .await?; - - if let Some(mut cached_proxy) = cached { - // Re-read the implementation slot to detect upgrades. - let current_impl = match cached_proxy.proxy_type.as_str() { - "eip1967" => read_address_slot(&state.rpc_url, address, EIP1967_IMPL_SLOT).await?, - "eip1822" => read_address_slot(&state.rpc_url, address, EIP1822_IMPL_SLOT).await?, - _ => None, - }; - - if let Some(current_addr) = current_impl { - if current_addr != cached_proxy.implementation_address { - sqlx::query( - "UPDATE proxy_contracts SET implementation_address = $1, updated_at = NOW() - WHERE proxy_address = $2", - ) - .bind(¤t_addr) - .bind(address) - .execute(&state.pool) - .await?; - cached_proxy.implementation_address = current_addr; - } - } - - return Ok(Some(cached_proxy)); - } - - // 2. Not in DB — try RPC detection. - let detected = if let Some(impl_addr) = - read_address_slot(&state.rpc_url, address, EIP1967_IMPL_SLOT).await? - { - Some((impl_addr, "eip1967")) - } else { - read_address_slot(&state.rpc_url, address, EIP1822_IMPL_SLOT) - .await? - .map(|impl_addr| (impl_addr, "eip1822")) - }; - - let Some((impl_addr, proxy_type)) = detected else { - return Ok(None); - }; - - // 3. Persist so future requests hit the DB cache. - sqlx::query( - "INSERT INTO proxy_contracts - (proxy_address, implementation_address, proxy_type, detected_at_block, last_checked_block) - VALUES ($1, $2, $3, 0, 0) - ON CONFLICT (proxy_address) DO NOTHING", - ) - .bind(address) - .bind(&impl_addr) - .bind(proxy_type) - .execute(&state.pool) - .await?; - - // 4. Re-fetch so the returned struct has the real DB timestamps. - let proxy: Option = sqlx::query_as( - "SELECT proxy_address, implementation_address, proxy_type, admin_address, - detected_at_block, last_checked_block, updated_at - FROM proxy_contracts WHERE proxy_address = $1", - ) - .bind(address) - .fetch_optional(&state.pool) - .await?; - - Ok(proxy) -} +use crate::contract_abi::{ + load_combined_abi, load_contract_abi, merge_abis, proxies_using_implementation, resolve_proxy, +}; +use atlas_common::{ContractAbi, ProxyContract}; /// GET /api/contracts/:address/proxy - Get proxy information for a contract pub async fn get_proxy_info( @@ -152,19 +24,10 @@ pub async fn get_proxy_info( ) -> ApiResult> { let address = normalize_address(&address); - let proxy = resolve_proxy(&state, &address).await?; + let proxy = resolve_proxy(&state.pool, &state.rpc_url, &address).await?; // Check if this address is an implementation for any proxies - let proxies_using_this: Vec = sqlx::query_as( - "SELECT proxy_address, implementation_address, proxy_type, admin_address, detected_at_block, last_checked_block, updated_at - FROM proxy_contracts - WHERE implementation_address = $1", - ) - .bind(&address) - .fetch_optional(&state.pool) - .await? - .map(|p| vec![p]) - .unwrap_or_default(); + let proxies_using_this = proxies_using_implementation(&state.pool, &address).await?; if proxy.is_none() && proxies_using_this.is_empty() { return Ok(Json(ProxyInfoResponse { @@ -178,14 +41,7 @@ pub async fn get_proxy_info( // Get implementation ABI if this is a proxy let implementation_abi = if let Some(ref p) = proxy { - sqlx::query_as::<_, ContractAbi>( - "SELECT address, abi, source_code, compiler_version, optimization_used, runs, verified_at - FROM contract_abis - WHERE address = $1", - ) - .bind(&p.implementation_address) - .fetch_optional(&state.pool) - .await? + load_contract_abi(&state.pool, &p.implementation_address).await? } else { None }; @@ -217,28 +73,14 @@ pub async fn get_combined_abi( let address = normalize_address(&address); // Resolve proxy (DB cache → RPC detection) - let proxy = resolve_proxy(&state, &address).await?; + let proxy = resolve_proxy(&state.pool, &state.rpc_url, &address).await?; // Get proxy ABI - let proxy_abi: Option = sqlx::query_as( - "SELECT address, abi, source_code, compiler_version, optimization_used, runs, verified_at - FROM contract_abis - WHERE address = $1", - ) - .bind(&address) - .fetch_optional(&state.pool) - .await?; + let proxy_abi = load_contract_abi(&state.pool, &address).await?; if let Some(proxy_info) = proxy { // Get implementation ABI - let impl_abi: Option = sqlx::query_as( - "SELECT address, abi, source_code, compiler_version, optimization_used, runs, verified_at - FROM contract_abis - WHERE address = $1", - ) - .bind(&proxy_info.implementation_address) - .fetch_optional(&state.pool) - .await?; + let impl_abi = load_contract_abi(&state.pool, &proxy_info.implementation_address).await?; // Merge ABIs let combined = merge_abis( @@ -257,12 +99,15 @@ pub async fn get_combined_abi( })) } else { // Not a proxy, just return the contract's ABI + let combined = load_combined_abi(&state.pool, &state.rpc_url, &address) + .await? + .map(|resolved| resolved.abi); Ok(Json(CombinedAbiResponse { is_proxy: false, proxy_address: address, implementation_address: None, proxy_type: None, - combined_abi: proxy_abi.as_ref().map(|a| a.abi.clone()), + combined_abi: combined, proxy_abi: None, implementation_abi: proxy_abi.map(|a| a.abi), })) @@ -281,49 +126,6 @@ pub struct CombinedAbiResponse { pub implementation_abi: Option, } -/// Merge proxy and implementation ABIs -fn merge_abis( - proxy_abi: Option<&serde_json::Value>, - impl_abi: Option<&serde_json::Value>, -) -> Option { - match (proxy_abi, impl_abi) { - (Some(proxy), Some(implementation)) => { - // Both ABIs exist - merge them - let mut merged = Vec::new(); - - // Add all implementation functions/events (these are the main ones) - if let Some(impl_arr) = implementation.as_array() { - merged.extend(impl_arr.clone()); - } - - // Add proxy-specific functions that aren't in implementation - // (like upgradeTo, admin, etc.) - if let Some(proxy_arr) = proxy.as_array() { - let impl_names: std::collections::HashSet = merged - .iter() - .filter_map(|item| item.get("name").and_then(|n| n.as_str())) - .map(String::from) - .collect(); - - for item in proxy_arr { - if let Some(name) = item.get("name").and_then(|n| n.as_str()) { - if !impl_names.contains(name) { - merged.push(item.clone()); - } - } else { - // Include items without names (like fallback, receive) - merged.push(item.clone()); - } - } - } - - Some(serde_json::Value::Array(merged)) - } - (Some(abi), None) | (None, Some(abi)) => Some(abi.clone()), - (None, None) => None, - } -} - /// GET /api/proxies - List all known proxy contracts pub async fn list_proxies( State(state): State>, diff --git a/backend/crates/atlas-server/src/contract_abi.rs b/backend/crates/atlas-server/src/contract_abi.rs new file mode 100644 index 0000000..7a3f305 --- /dev/null +++ b/backend/crates/atlas-server/src/contract_abi.rs @@ -0,0 +1,230 @@ +use atlas_common::{AtlasError, ContractAbi, ProxyContract}; +use sqlx::PgPool; + +pub const DIRECT_ABI_SOURCE: &str = "direct_abi"; +pub const PROXY_COMBINED_ABI_SOURCE: &str = "proxy_combined_abi"; + +// EIP-1967 implementation slot: keccak256("eip1967.proxy.implementation") - 1 +const EIP1967_IMPL_SLOT: &str = + "0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc"; +// EIP-1822 (UUPS) implementation slot: keccak256("PROXIABLE") +const EIP1822_IMPL_SLOT: &str = + "0xc5f16f0fcc639fa48a6947836d9850f504798523bf8c9a3a87d5876cf622bcf7"; + +#[derive(Debug, Clone)] +pub struct ResolvedContractAbi { + pub abi: serde_json::Value, + pub source: &'static str, +} + +/// Try to read a storage slot via eth_getStorageAt and return a non-zero address if found. +async fn read_address_slot( + rpc_url: &str, + address: &str, + slot: &str, +) -> Result, AtlasError> { + let body = serde_json::json!({ + "jsonrpc": "2.0", + "method": "eth_getStorageAt", + "params": [address, slot, "latest"], + "id": 1 + }); + + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(5)) + .build() + .map_err(|e| AtlasError::Internal(e.to_string()))?; + + let resp: serde_json::Value = client + .post(rpc_url) + .json(&body) + .send() + .await + .map_err(|e| AtlasError::Rpc(format!("eth_getStorageAt failed: {e}")))? + .json() + .await + .map_err(|e| AtlasError::Rpc(format!("failed to parse eth_getStorageAt response: {e}")))?; + + let raw = resp.get("result").and_then(|r| r.as_str()).unwrap_or("0x"); + + // Result is 32 bytes; address occupies the last 20 bytes (40 hex chars). + let hex = raw.trim_start_matches("0x"); + if hex.len() < 40 || hex.chars().all(|c| c == '0') { + return Ok(None); + } + let addr = format!("0x{}", &hex[hex.len() - 40..]).to_lowercase(); + if addr == "0x0000000000000000000000000000000000000000" { + return Ok(None); + } + Ok(Some(addr)) +} + +/// Detect a proxy pattern for `address` via RPC and, if found, persist it in `proxy_contracts`. +/// Returns the cached or newly detected `ProxyContract`, or `None` if not a proxy. +pub async fn resolve_proxy( + pool: &PgPool, + rpc_url: &str, + address: &str, +) -> Result, AtlasError> { + // 1. Check DB cache first. + let cached: Option = sqlx::query_as( + "SELECT proxy_address, implementation_address, proxy_type, admin_address, + detected_at_block, last_checked_block, updated_at + FROM proxy_contracts WHERE proxy_address = $1", + ) + .bind(address) + .fetch_optional(pool) + .await?; + + if let Some(mut cached_proxy) = cached { + // Re-read the implementation slot to detect upgrades. + let current_impl = match cached_proxy.proxy_type.as_str() { + "eip1967" => read_address_slot(rpc_url, address, EIP1967_IMPL_SLOT).await?, + "eip1822" => read_address_slot(rpc_url, address, EIP1822_IMPL_SLOT).await?, + _ => None, + }; + + if let Some(current_addr) = current_impl { + if current_addr != cached_proxy.implementation_address { + sqlx::query( + "UPDATE proxy_contracts SET implementation_address = $1, updated_at = NOW() + WHERE proxy_address = $2", + ) + .bind(¤t_addr) + .bind(address) + .execute(pool) + .await?; + cached_proxy.implementation_address = current_addr; + } + } + + return Ok(Some(cached_proxy)); + } + + // 2. Not in DB — try RPC detection. + let detected = if let Some(implementation_address) = + read_address_slot(rpc_url, address, EIP1967_IMPL_SLOT).await? + { + Some((implementation_address, "eip1967")) + } else { + read_address_slot(rpc_url, address, EIP1822_IMPL_SLOT) + .await? + .map(|implementation_address| (implementation_address, "eip1822")) + }; + + let Some((implementation_address, proxy_type)) = detected else { + return Ok(None); + }; + + // 3. Persist so future requests hit the DB cache. + sqlx::query( + "INSERT INTO proxy_contracts + (proxy_address, implementation_address, proxy_type, detected_at_block, last_checked_block) + VALUES ($1, $2, $3, 0, 0) + ON CONFLICT (proxy_address) DO NOTHING", + ) + .bind(address) + .bind(&implementation_address) + .bind(proxy_type) + .execute(pool) + .await?; + + // 4. Re-fetch so the returned struct has the real DB timestamps. + let proxy: Option = sqlx::query_as( + "SELECT proxy_address, implementation_address, proxy_type, admin_address, + detected_at_block, last_checked_block, updated_at + FROM proxy_contracts WHERE proxy_address = $1", + ) + .bind(address) + .fetch_optional(pool) + .await?; + + Ok(proxy) +} + +pub fn merge_abis( + proxy_abi: Option<&serde_json::Value>, + implementation_abi: Option<&serde_json::Value>, +) -> Option { + match (proxy_abi, implementation_abi) { + (Some(proxy), Some(implementation)) => { + let mut merged = Vec::new(); + if let Some(implementation_items) = implementation.as_array() { + merged.extend(implementation_items.clone()); + } + if let Some(proxy_items) = proxy.as_array() { + merged.extend(proxy_items.clone()); + } + + let merged_value = serde_json::Value::Array(merged); + match serde_json::from_value::(merged_value.clone()) { + Ok(mut abi) => { + abi.dedup(); + serde_json::to_value(abi).ok().or(Some(merged_value)) + } + Err(_) => Some(merged_value), + } + } + (Some(abi), None) | (None, Some(abi)) => Some(abi.clone()), + (None, None) => None, + } +} + +pub async fn load_contract_abi( + pool: &PgPool, + address: &str, +) -> Result, AtlasError> { + sqlx::query_as( + "SELECT address, abi, source_code, compiler_version, optimization_used, runs, verified_at + FROM contract_abis + WHERE address = $1", + ) + .bind(address) + .fetch_optional(pool) + .await + .map_err(Into::into) +} + +pub async fn load_combined_abi( + pool: &PgPool, + rpc_url: &str, + address: &str, +) -> Result, AtlasError> { + let proxy = resolve_proxy(pool, rpc_url, address).await?; + let direct_abi = load_contract_abi(pool, address).await?; + + if let Some(proxy_info) = proxy { + let implementation_abi = + load_contract_abi(pool, &proxy_info.implementation_address).await?; + let combined = merge_abis( + direct_abi.as_ref().map(|abi| &abi.abi), + implementation_abi.as_ref().map(|abi| &abi.abi), + ); + + Ok(combined.map(|abi| ResolvedContractAbi { + abi, + source: PROXY_COMBINED_ABI_SOURCE, + })) + } else { + Ok(direct_abi.map(|abi| ResolvedContractAbi { + abi: abi.abi, + source: DIRECT_ABI_SOURCE, + })) + } +} + +pub async fn proxies_using_implementation( + pool: &PgPool, + implementation_address: &str, +) -> Result, AtlasError> { + sqlx::query_as( + "SELECT proxy_address, implementation_address, proxy_type, admin_address, + detected_at_block, last_checked_block, updated_at + FROM proxy_contracts + WHERE implementation_address = $1", + ) + .bind(implementation_address) + .fetch_all(pool) + .await + .map_err(Into::into) +} diff --git a/backend/crates/atlas-server/src/event_log_decode.rs b/backend/crates/atlas-server/src/event_log_decode.rs new file mode 100644 index 0000000..d993ac9 --- /dev/null +++ b/backend/crates/atlas-server/src/event_log_decode.rs @@ -0,0 +1,502 @@ +use std::collections::{BTreeSet, HashMap}; +use std::str::FromStr; + +use alloy::dyn_abi::EventExt; +use alloy::json_abi::{Event, JsonAbi}; +use alloy::primitives::B256; +use atlas_common::{AtlasError, EventLog}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; + +use crate::contract_abi::{load_combined_abi, proxies_using_implementation, ResolvedContractAbi}; + +pub const EVENT_LOG_DECODE_PENDING: &str = "pending"; +pub const EVENT_LOG_DECODE_DECODED: &str = "decoded"; +pub const EVENT_LOG_DECODE_NO_ABI: &str = "no_abi"; +pub const EVENT_LOG_DECODE_NO_MATCHING_EVENT: &str = "no_matching_event"; +pub const EVENT_LOG_DECODE_FAILED: &str = "decode_failed"; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct DecodedEventParam { + pub name: String, + pub r#type: String, + pub value: String, + pub indexed: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct StoredDecodedEventLog { + pub event_name: String, + pub event_signature: String, + pub decoded_params: Vec, +} + +#[derive(Debug, Clone)] +pub struct EventLogApiResponse { + pub id: i64, + pub tx_hash: String, + pub log_index: i32, + pub address: String, + pub topic0: String, + pub topic1: Option, + pub topic2: Option, + pub topic3: Option, + pub data: String, + pub block_number: i64, + pub decode_status: String, + pub decoded_at: Option>, + pub decode_attempted_at: Option>, + pub decode_source: Option, + pub event_name: Option, + pub event_signature: Option, + pub decoded_params: Option>, +} + +impl serde::Serialize for EventLogApiResponse { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + + let mut state = serializer.serialize_struct("EventLogApiResponse", 17)?; + state.serialize_field("id", &self.id)?; + state.serialize_field("tx_hash", &self.tx_hash)?; + state.serialize_field("log_index", &self.log_index)?; + state.serialize_field("address", &self.address)?; + state.serialize_field("topic0", &self.topic0)?; + state.serialize_field("topic1", &self.topic1)?; + state.serialize_field("topic2", &self.topic2)?; + state.serialize_field("topic3", &self.topic3)?; + state.serialize_field("data", &self.data)?; + state.serialize_field("block_number", &self.block_number)?; + state.serialize_field("decode_status", &self.decode_status)?; + state.serialize_field("decoded_at", &self.decoded_at)?; + state.serialize_field("decode_attempted_at", &self.decode_attempted_at)?; + state.serialize_field("decode_source", &self.decode_source)?; + state.serialize_field("event_name", &self.event_name)?; + state.serialize_field("event_signature", &self.event_signature)?; + state.serialize_field("decoded_params", &self.decoded_params)?; + state.end() + } +} + +impl From<&EventLog> for EventLogApiResponse { + fn from(log: &EventLog) -> Self { + let stored = log + .decoded + .as_ref() + .and_then(|value| serde_json::from_value::(value.clone()).ok()); + + Self { + id: log.id, + tx_hash: log.tx_hash.clone(), + log_index: log.log_index, + address: log.address.clone(), + topic0: log.topic0.clone(), + topic1: log.topic1.clone(), + topic2: log.topic2.clone(), + topic3: log.topic3.clone(), + data: format!("0x{}", hex::encode(&log.data)), + block_number: log.block_number, + decode_status: log.decode_status.clone(), + decoded_at: log.decoded_at, + decode_attempted_at: log.decode_attempted_at, + decode_source: log.decode_source.clone(), + event_name: stored.as_ref().map(|decoded| decoded.event_name.clone()), + event_signature: stored + .as_ref() + .map(|decoded| decoded.event_signature.clone()), + decoded_params: stored.map(|decoded| decoded.decoded_params), + } + } +} + +#[derive(Debug, Clone)] +pub struct DecodeOutcome { + pub decoded: Option, + pub decode_status: String, + pub decode_source: Option, + pub decoded_at: Option>, + pub decode_attempted_at: DateTime, +} + +#[derive(Debug, Clone)] +struct EventCandidate { + event: Event, + full_signature: String, +} + +#[derive(Debug, Clone)] +pub struct EventLogDecoder { + source: &'static str, + selectors: HashMap>, +} + +impl EventLogDecoder { + pub fn from_resolved_abi(resolved: &ResolvedContractAbi) -> Result { + let mut abi: JsonAbi = serde_json::from_value(resolved.abi.clone()).map_err(|e| { + AtlasError::Internal(format!( + "failed to parse contract ABI for event decoding: {e}" + )) + })?; + abi.dedup(); + + let mut selectors: HashMap> = HashMap::new(); + for event in abi.events() { + if event.anonymous { + continue; + } + + let selector = format!("0x{}", hex::encode(event.selector().as_slice())); + selectors.entry(selector).or_default().push(EventCandidate { + event: event.clone(), + full_signature: event.full_signature(), + }); + } + + Ok(Self { + source: resolved.source, + selectors, + }) + } + + pub fn decode_log(&self, log: &EventLog) -> DecodeAttempt { + let Some(candidates) = self.selectors.get(&log.topic0) else { + return DecodeAttempt { + decoded: None, + decode_status: EVENT_LOG_DECODE_NO_MATCHING_EVENT, + decode_source: Some(self.source.to_string()), + }; + }; + + let topics = match log_topics(log) { + Ok(topics) => topics, + Err(_) => { + return DecodeAttempt { + decoded: None, + decode_status: EVENT_LOG_DECODE_FAILED, + decode_source: Some(self.source.to_string()), + } + } + }; + + for candidate in candidates { + match candidate + .event + .decode_log_parts(topics.iter().copied(), &log.data) + { + Ok(decoded_event) => { + let mut decoded_params = Vec::with_capacity(candidate.event.inputs.len()); + let mut indexed_iter = decoded_event.indexed.into_iter(); + let mut body_iter = decoded_event.body.into_iter(); + + for (index, input) in candidate.event.inputs.iter().enumerate() { + let value = if input.indexed { + indexed_iter.next() + } else { + body_iter.next() + }; + + let Some(value) = value else { + return DecodeAttempt { + decoded: None, + decode_status: EVENT_LOG_DECODE_FAILED, + decode_source: Some(self.source.to_string()), + }; + }; + + decoded_params.push(DecodedEventParam { + name: if input.name.is_empty() { + format!("param{index}") + } else { + input.name.clone() + }, + r#type: input.ty.clone(), + value: format_dyn_value(&value), + indexed: input.indexed, + }); + } + + let decoded = StoredDecodedEventLog { + event_name: candidate.event.name.clone(), + event_signature: candidate.full_signature.clone(), + decoded_params, + }; + + return DecodeAttempt { + decoded: Some( + serde_json::to_value(decoded) + .expect("stored decoded event log should serialize"), + ), + decode_status: EVENT_LOG_DECODE_DECODED, + decode_source: Some(self.source.to_string()), + }; + } + Err(_) => continue, + } + } + + DecodeAttempt { + decoded: None, + decode_status: EVENT_LOG_DECODE_FAILED, + decode_source: Some(self.source.to_string()), + } + } +} + +#[derive(Debug, Clone)] +pub struct DecodeAttempt { + pub decoded: Option, + pub decode_status: &'static str, + pub decode_source: Option, +} + +pub fn apply_decode_attempt( + log: &EventLog, + attempt: DecodeAttempt, + attempted_at: DateTime, +) -> DecodeOutcome { + let has_successful_decode = + log.decode_status == EVENT_LOG_DECODE_DECODED && log.decoded.as_ref().is_some(); + + if attempt.decode_status == EVENT_LOG_DECODE_DECODED { + return DecodeOutcome { + decoded: attempt.decoded, + decode_status: EVENT_LOG_DECODE_DECODED.to_string(), + decode_source: attempt.decode_source, + decoded_at: Some(attempted_at), + decode_attempted_at: attempted_at, + }; + } + + if has_successful_decode { + return DecodeOutcome { + decoded: log.decoded.clone(), + decode_status: log.decode_status.clone(), + decode_source: log.decode_source.clone(), + decoded_at: log.decoded_at, + decode_attempted_at: attempted_at, + }; + } + + DecodeOutcome { + decoded: None, + decode_status: attempt.decode_status.to_string(), + decode_source: attempt.decode_source, + decoded_at: None, + decode_attempted_at: attempted_at, + } +} + +pub async fn build_decoder_for_address( + pool: &PgPool, + rpc_url: &str, + address: &str, +) -> Result, AtlasError> { + match load_combined_abi(pool, rpc_url, address).await? { + Some(resolved) => EventLogDecoder::from_resolved_abi(&resolved).map(Some), + None => Ok(None), + } +} + +pub async fn enqueue_decode_jobs( + pool: &PgPool, + addresses: &[String], + full_rescan: bool, +) -> Result<(), AtlasError> { + if addresses.is_empty() { + return Ok(()); + } + + let deduped = dedupe_addresses(addresses); + sqlx::query( + "INSERT INTO event_log_decode_jobs + (address, full_rescan, requested_at, updated_at, error_message) + SELECT address, $2, NOW(), NOW(), NULL + FROM unnest($1::text[]) AS t(address) + ON CONFLICT (address) DO UPDATE SET + full_rescan = event_log_decode_jobs.full_rescan OR EXCLUDED.full_rescan, + requested_at = EXCLUDED.requested_at, + updated_at = EXCLUDED.updated_at, + error_message = NULL", + ) + .bind(&deduped) + .bind(full_rescan) + .execute(pool) + .await?; + + Ok(()) +} + +pub async fn enqueue_jobs_for_verified_contract( + pool: &PgPool, + address: &str, +) -> Result<(), AtlasError> { + let mut addresses = vec![address.to_string()]; + for proxy in proxies_using_implementation(pool, address).await? { + addresses.push(proxy.proxy_address); + } + enqueue_decode_jobs(pool, &addresses, true).await +} + +fn dedupe_addresses(addresses: &[String]) -> Vec { + let mut set = BTreeSet::new(); + for address in addresses { + set.insert(address.to_lowercase()); + } + set.into_iter().collect() +} + +fn log_topics(log: &EventLog) -> Result, AtlasError> { + let mut topics = Vec::with_capacity(4); + topics.push(parse_topic(&log.topic0)?); + if let Some(topic) = &log.topic1 { + topics.push(parse_topic(topic)?); + } + if let Some(topic) = &log.topic2 { + topics.push(parse_topic(topic)?); + } + if let Some(topic) = &log.topic3 { + topics.push(parse_topic(topic)?); + } + Ok(topics) +} + +fn parse_topic(topic: &str) -> Result { + B256::from_str(topic) + .map_err(|e| AtlasError::Internal(format!("failed to parse event topic {topic}: {e}"))) +} + +fn format_dyn_value(value: &alloy::dyn_abi::DynSolValue) -> String { + use alloy::dyn_abi::DynSolValue; + + match value { + DynSolValue::Bool(value) => value.to_string(), + DynSolValue::Int(value, _) => value.to_string(), + DynSolValue::Uint(value, _) => value.to_string(), + DynSolValue::FixedBytes(word, size) => { + format!("0x{}", hex::encode(&word.as_slice()[..*size])) + } + DynSolValue::Address(value) => format!("{value:?}").to_lowercase(), + DynSolValue::Function(value) => format!("{value:?}").to_lowercase(), + DynSolValue::Bytes(value) => format!("0x{}", hex::encode(value)), + DynSolValue::String(value) => value.clone(), + DynSolValue::Array(values) + | DynSolValue::FixedArray(values) + | DynSolValue::Tuple(values) => { + let formatted = values + .iter() + .map(format_dyn_value) + .collect::>() + .join(", "); + format!("[{formatted}]") + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn base_log(topic0: &str, decoded: Option) -> EventLog { + let has_decoded = decoded.is_some(); + EventLog { + id: 1, + tx_hash: "0x1".to_string(), + log_index: 0, + address: "0x1111111111111111111111111111111111111111".to_string(), + topic0: topic0.to_string(), + topic1: Some( + "0x0000000000000000000000002222222222222222222222222222222222222222".to_string(), + ), + topic2: None, + topic3: None, + data: vec![ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 7, + ], + block_number: 1, + decoded, + decode_status: if has_decoded { + EVENT_LOG_DECODE_DECODED.to_string() + } else { + EVENT_LOG_DECODE_PENDING.to_string() + }, + decoded_at: None, + decode_attempted_at: None, + decode_source: if has_decoded { + Some(crate::contract_abi::DIRECT_ABI_SOURCE.to_string()) + } else { + None + }, + } + } + + #[test] + fn apply_decode_attempt_preserves_existing_success_when_new_attempt_fails() { + let existing = StoredDecodedEventLog { + event_name: "Transfer".to_string(), + event_signature: "Transfer(address,uint256)".to_string(), + decoded_params: vec![], + }; + let log = base_log( + "0xdeadbeef", + Some(serde_json::to_value(existing.clone()).unwrap()), + ); + let attempted_at = Utc::now(); + + let outcome = apply_decode_attempt( + &log, + DecodeAttempt { + decoded: None, + decode_status: EVENT_LOG_DECODE_NO_ABI, + decode_source: None, + }, + attempted_at, + ); + + assert_eq!(outcome.decode_status, EVENT_LOG_DECODE_DECODED); + assert_eq!( + outcome.decoded, + Some(serde_json::to_value(existing).unwrap()) + ); + assert_eq!( + outcome.decode_source, + Some(crate::contract_abi::DIRECT_ABI_SOURCE.to_string()) + ); + assert_eq!(outcome.decode_attempted_at, attempted_at); + } + + #[test] + fn decoder_decodes_transfer_event() { + let abi = serde_json::json!([ + { + "type": "event", + "name": "Transfer", + "anonymous": false, + "inputs": [ + {"name": "from", "type": "address", "indexed": true}, + {"name": "value", "type": "uint256", "indexed": false} + ] + } + ]); + let decoder = EventLogDecoder::from_resolved_abi(&ResolvedContractAbi { + abi, + source: crate::contract_abi::DIRECT_ABI_SOURCE, + }) + .unwrap(); + let topic0 = decoder.selectors.keys().next().expect("selector").clone(); + let log = base_log(&topic0, None); + + let attempt = decoder.decode_log(&log); + assert_eq!(attempt.decode_status, EVENT_LOG_DECODE_DECODED); + let stored: StoredDecodedEventLog = + serde_json::from_value(attempt.decoded.expect("decoded payload")).unwrap(); + assert_eq!(stored.event_name, "Transfer"); + assert_eq!(stored.decoded_params.len(), 2); + assert_eq!(stored.decoded_params[0].name, "from"); + assert_eq!(stored.decoded_params[0].indexed, true); + } +} diff --git a/backend/crates/atlas-server/src/indexer/event_log_decode_worker.rs b/backend/crates/atlas-server/src/indexer/event_log_decode_worker.rs new file mode 100644 index 0000000..1d0783f --- /dev/null +++ b/backend/crates/atlas-server/src/indexer/event_log_decode_worker.rs @@ -0,0 +1,250 @@ +use anyhow::Result; +use chrono::{DateTime, Utc}; +use sqlx::PgPool; +use std::time::Duration; + +use atlas_common::EventLog; + +use crate::event_log_decode::{ + apply_decode_attempt, build_decoder_for_address, EVENT_LOG_DECODE_PENDING, +}; + +const JOB_BATCH_SIZE: i64 = 25; +const LOG_BATCH_SIZE: i64 = 500; +const IDLE_SLEEP: Duration = Duration::from_secs(5); + +const CLAIM_JOBS_SQL: &str = " + WITH candidates AS ( + SELECT address + FROM event_log_decode_jobs + ORDER BY full_rescan DESC, requested_at ASC + LIMIT $1 + FOR UPDATE SKIP LOCKED + ) + UPDATE event_log_decode_jobs AS jobs + SET last_attempted_at = NOW(), updated_at = NOW() + FROM candidates + WHERE jobs.address = candidates.address + RETURNING jobs.address, jobs.full_rescan, jobs.updated_at"; + +pub struct EventLogDecodeWorker { + pool: PgPool, + rpc_url: String, +} + +impl EventLogDecodeWorker { + pub fn new(pool: PgPool, rpc_url: &str) -> Self { + Self { + pool, + rpc_url: rpc_url.to_string(), + } + } + + pub async fn run(&self) -> Result<()> { + tracing::info!("Event log decode worker started"); + + loop { + let processed = self.process_batch().await?; + if processed == 0 { + tokio::time::sleep(IDLE_SLEEP).await; + } + } + } + + async fn process_batch(&self) -> Result { + let jobs: Vec<(String, bool, DateTime)> = sqlx::query_as(CLAIM_JOBS_SQL) + .bind(JOB_BATCH_SIZE) + .fetch_all(&self.pool) + .await?; + + if jobs.is_empty() { + return Ok(0); + } + + for (address, full_rescan, claim_token) in &jobs { + match self.process_job(address, *full_rescan).await { + Ok(()) => { + sqlx::query( + "DELETE FROM event_log_decode_jobs + WHERE address = $1 AND updated_at = $2", + ) + .bind(address) + .bind(*claim_token) + .execute(&self.pool) + .await?; + } + Err(error) => { + tracing::warn!( + address = %address, + full_rescan, + error = %error, + "event log decode job failed" + ); + sqlx::query( + "UPDATE event_log_decode_jobs + SET retry_count = retry_count + 1, error_message = $3 + WHERE address = $1 AND updated_at = $2", + ) + .bind(address) + .bind(*claim_token) + .bind(error.to_string()) + .execute(&self.pool) + .await?; + } + } + } + + Ok(jobs.len()) + } + + async fn process_job(&self, address: &str, full_rescan: bool) -> Result<()> { + let decoder = build_decoder_for_address(&self.pool, &self.rpc_url, address).await?; + let mut cursor: Option<(i64, i32, String)> = None; + + loop { + let logs = self + .fetch_logs(address, full_rescan, cursor.as_ref()) + .await?; + if logs.is_empty() { + break; + } + + let attempted_at = Utc::now(); + let mut tx = self.pool.begin().await?; + for log in &logs { + let attempt = match &decoder { + Some(decoder) => decoder.decode_log(log), + None => crate::event_log_decode::DecodeAttempt { + decoded: None, + decode_status: crate::event_log_decode::EVENT_LOG_DECODE_NO_ABI, + decode_source: None, + }, + }; + let outcome = apply_decode_attempt(log, attempt, attempted_at); + + sqlx::query( + "UPDATE event_logs + SET decoded = $1, + decode_status = $2, + decode_source = $3, + decoded_at = $4, + decode_attempted_at = $5 + WHERE id = $6 AND block_number = $7", + ) + .bind(&outcome.decoded) + .bind(&outcome.decode_status) + .bind(&outcome.decode_source) + .bind(outcome.decoded_at) + .bind(outcome.decode_attempted_at) + .bind(log.id) + .bind(log.block_number) + .execute(&mut *tx) + .await?; + } + tx.commit().await?; + + let last = logs.last().expect("non-empty logs batch"); + cursor = Some((last.block_number, last.log_index, last.tx_hash.clone())); + } + + Ok(()) + } + + async fn fetch_logs( + &self, + address: &str, + full_rescan: bool, + cursor: Option<&(i64, i32, String)>, + ) -> Result> { + let rows = match (full_rescan, cursor) { + (true, Some((block_number, log_index, tx_hash))) => { + sqlx::query_as( + "SELECT id, tx_hash, log_index, address, topic0, topic1, topic2, topic3, + data, block_number, decoded, decode_status, decoded_at, + decode_attempted_at, decode_source + FROM event_logs + WHERE address = $1 + AND (block_number, log_index, tx_hash) > ($2, $3, $4) + ORDER BY block_number ASC, log_index ASC, tx_hash ASC + LIMIT $5", + ) + .bind(address) + .bind(*block_number) + .bind(*log_index) + .bind(tx_hash) + .bind(LOG_BATCH_SIZE) + .fetch_all(&self.pool) + .await? + } + (true, None) => { + sqlx::query_as( + "SELECT id, tx_hash, log_index, address, topic0, topic1, topic2, topic3, + data, block_number, decoded, decode_status, decoded_at, + decode_attempted_at, decode_source + FROM event_logs + WHERE address = $1 + ORDER BY block_number ASC, log_index ASC, tx_hash ASC + LIMIT $2", + ) + .bind(address) + .bind(LOG_BATCH_SIZE) + .fetch_all(&self.pool) + .await? + } + (false, Some((block_number, log_index, tx_hash))) => { + sqlx::query_as( + "SELECT id, tx_hash, log_index, address, topic0, topic1, topic2, topic3, + data, block_number, decoded, decode_status, decoded_at, + decode_attempted_at, decode_source + FROM event_logs + WHERE address = $1 + AND decode_status = $2 + AND (block_number, log_index, tx_hash) > ($3, $4, $5) + ORDER BY block_number ASC, log_index ASC, tx_hash ASC + LIMIT $6", + ) + .bind(address) + .bind(EVENT_LOG_DECODE_PENDING) + .bind(*block_number) + .bind(*log_index) + .bind(tx_hash) + .bind(LOG_BATCH_SIZE) + .fetch_all(&self.pool) + .await? + } + (false, None) => { + sqlx::query_as( + "SELECT id, tx_hash, log_index, address, topic0, topic1, topic2, topic3, + data, block_number, decoded, decode_status, decoded_at, + decode_attempted_at, decode_source + FROM event_logs + WHERE address = $1 AND decode_status = $2 + ORDER BY block_number ASC, log_index ASC, tx_hash ASC + LIMIT $3", + ) + .bind(address) + .bind(EVENT_LOG_DECODE_PENDING) + .bind(LOG_BATCH_SIZE) + .fetch_all(&self.pool) + .await? + } + }; + + Ok(rows) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn claim_jobs_sql_uses_skip_locked() { + assert!(CLAIM_JOBS_SQL.contains("FOR UPDATE SKIP LOCKED")); + } + + #[test] + fn incremental_mode_looks_for_pending_logs() { + assert_eq!(EVENT_LOG_DECODE_PENDING, "pending"); + } +} diff --git a/backend/crates/atlas-server/src/indexer/gap_fill_worker.rs b/backend/crates/atlas-server/src/indexer/gap_fill_worker.rs index 0ec6ef5..9ac0029 100644 --- a/backend/crates/atlas-server/src/indexer/gap_fill_worker.rs +++ b/backend/crates/atlas-server/src/indexer/gap_fill_worker.rs @@ -153,6 +153,7 @@ impl GapFillWorker { } if let Err(e) = Indexer::write_batch_and_clear_failed_block( + &self.pool, &mut copy_client, batch, block_number, diff --git a/backend/crates/atlas-server/src/indexer/indexer.rs b/backend/crates/atlas-server/src/indexer/indexer.rs index 1bfe5fe..281263b 100644 --- a/backend/crates/atlas-server/src/indexer/indexer.rs +++ b/backend/crates/atlas-server/src/indexer/indexer.rs @@ -24,6 +24,7 @@ use super::fetcher::{ WorkItem, }; use crate::config::Config; +use crate::event_log_decode::enqueue_decode_jobs; use crate::head::HeadTracker; use crate::metrics::Metrics; use crate::state_keys::ERC20_SUPPLY_HISTORY_COMPLETE_KEY; @@ -333,7 +334,7 @@ impl Indexer { // One DB transaction for the entire batch let db_write_start = std::time::Instant::now(); - Self::write_batch(&mut copy_client, batch, true).await?; + Self::write_batch(&self.pool, &mut copy_client, batch, true).await?; self.metrics .record_db_write_duration(db_write_start.elapsed().as_secs_f64()); self.metrics @@ -398,7 +399,8 @@ impl Indexer { // Don't update the watermark — the main batch already wrote // a higher last_indexed_block; overwriting it with this // block's lower number would cause a regression on restart. - Self::write_batch(&mut copy_client, mini_batch, false).await?; + Self::write_batch(&self.pool, &mut copy_client, mini_batch, false) + .await?; known_erc20.extend(new_erc20); known_nft.extend(new_nft); tracing::info!(block = block_num, "block retry succeeded"); @@ -731,22 +733,25 @@ impl Indexer { // ----------------------------------------------------------------------- pub(crate) async fn write_batch( + pool: &PgPool, copy_client: &mut Client, batch: BlockBatch, update_watermark: bool, ) -> Result<()> { - Self::write_batch_internal(copy_client, batch, update_watermark, None).await + Self::write_batch_internal(pool, copy_client, batch, update_watermark, None).await } pub(crate) async fn write_batch_and_clear_failed_block( + pool: &PgPool, copy_client: &mut Client, batch: BlockBatch, failed_block_number: i64, ) -> Result<()> { - Self::write_batch_internal(copy_client, batch, false, Some(failed_block_number)).await + Self::write_batch_internal(pool, copy_client, batch, false, Some(failed_block_number)).await } async fn write_batch_internal( + pool: &PgPool, copy_client: &mut Client, batch: BlockBatch, update_watermark: bool, @@ -765,6 +770,8 @@ impl Indexer { copy_nft_transfers(&mut pg_tx, &batch).await?; copy_erc20_transfers(&mut pg_tx, &batch).await?; + let event_log_addresses = batch.el_addresses.clone(); + let BlockBatch { tl_hashes, tl_block_numbers, @@ -958,6 +965,8 @@ impl Indexer { pg_tx.commit().await?; + enqueue_decode_jobs(pool, &event_log_addresses, false).await?; + Ok(()) } diff --git a/backend/crates/atlas-server/src/indexer/mod.rs b/backend/crates/atlas-server/src/indexer/mod.rs index 55bbb38..9b25bd9 100644 --- a/backend/crates/atlas-server/src/indexer/mod.rs +++ b/backend/crates/atlas-server/src/indexer/mod.rs @@ -1,6 +1,7 @@ pub(crate) mod batch; pub(crate) mod copy; pub mod da_worker; +pub mod event_log_decode_worker; pub(crate) mod evnode; pub(crate) mod fetcher; pub mod gap_fill_worker; @@ -9,6 +10,7 @@ pub mod indexer; pub mod metadata; pub use da_worker::{DaSseUpdate, DaWorker}; +pub use event_log_decode_worker::EventLogDecodeWorker; pub use gap_fill_worker::GapFillWorker; pub use indexer::Indexer; pub use metadata::MetadataFetcher; diff --git a/backend/crates/atlas-server/src/lib.rs b/backend/crates/atlas-server/src/lib.rs index 4014e51..c338b66 100644 --- a/backend/crates/atlas-server/src/lib.rs +++ b/backend/crates/atlas-server/src/lib.rs @@ -1,6 +1,8 @@ pub mod api; pub mod cli; pub mod config; +pub mod contract_abi; +pub mod event_log_decode; pub mod faucet; pub mod head; pub mod indexer; diff --git a/backend/crates/atlas-server/src/main.rs b/backend/crates/atlas-server/src/main.rs index 9a2c28d..fce2e7b 100644 --- a/backend/crates/atlas-server/src/main.rs +++ b/backend/crates/atlas-server/src/main.rs @@ -11,6 +11,8 @@ use alloy::signers::local::PrivateKeySigner; mod api; mod cli; mod config; +mod contract_abi; +mod event_log_decode; mod faucet; mod head; mod indexer; @@ -406,6 +408,14 @@ async fn run(args: cli::RunArgs) -> Result<()> { } }); + let event_log_decode_worker = + indexer::EventLogDecodeWorker::new(indexer_pool.clone(), &config.rpc_url); + tokio::spawn(async move { + if let Err(e) = run_with_retry(|| event_log_decode_worker.run()).await { + tracing::error!("Event log decode worker terminated with error: {}", e); + } + }); + if config.da_tracking_enabled { let evnode_url = config .evnode_url diff --git a/backend/crates/atlas-server/tests/integration/addresses.rs b/backend/crates/atlas-server/tests/integration/addresses.rs index c1b8fd2..4537a9b 100644 --- a/backend/crates/atlas-server/tests/integration/addresses.rs +++ b/backend/crates/atlas-server/tests/integration/addresses.rs @@ -189,6 +189,7 @@ fn get_address_transactions() { let body = common::json_body(response).await; let data = body["data"].as_array().unwrap(); assert_eq!(data.len(), 2); + assert_eq!(data[0]["input_data"].as_str().unwrap(), "0x"); }); } diff --git a/backend/crates/atlas-server/tests/integration/logs.rs b/backend/crates/atlas-server/tests/integration/logs.rs new file mode 100644 index 0000000..e20a5f3 --- /dev/null +++ b/backend/crates/atlas-server/tests/integration/logs.rs @@ -0,0 +1,141 @@ +use axum::{ + body::Body, + http::{Request, StatusCode}, +}; +use tower::ServiceExt; + +use crate::common; + +const TX_HASH: &str = "0x3000000000000000000000000000000000000000000000000000000000000001"; +const EMITTER: &str = "0x3000000000000000000000000000000000000001"; + +async fn seed_logs(pool: &sqlx::PgPool) { + sqlx::query( + "INSERT INTO blocks (number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, NOW()) + ON CONFLICT (number) DO NOTHING", + ) + .bind(3000i64) + .bind(format!("0x{:064x}", 3000)) + .bind(format!("0x{:064x}", 2999)) + .bind(1_700_003_000i64) + .bind(63_000i64) + .bind(30_000_000i64) + .bind(1i32) + .execute(pool) + .await + .expect("seed block"); + + sqlx::query( + "INSERT INTO transactions (hash, block_number, block_index, from_address, to_address, value, gas_price, gas_used, input_data, status, timestamp) + VALUES ($1, $2, 0, $3, $4, $5, $6, $7, $8, true, $9) + ON CONFLICT (hash, block_number) DO NOTHING", + ) + .bind(TX_HASH) + .bind(3000i64) + .bind("0x3000000000000000000000000000000000000002") + .bind("0x3000000000000000000000000000000000000003") + .bind(1_000_000_000_000_000_000i64) + .bind(20_000_000_000i64) + .bind(21_000i64) + .bind(Vec::::new()) + .bind(1_700_003_000i64) + .execute(pool) + .await + .expect("seed tx"); + + sqlx::query( + "INSERT INTO event_logs + (tx_hash, log_index, address, topic0, topic1, data, block_number, decoded, decode_status, decoded_at, decode_attempted_at, decode_source) + VALUES ($1, 0, $2, $3, $4, $5, $6, $7, 'decoded', NOW(), NOW(), 'direct_abi') + ON CONFLICT (tx_hash, log_index, block_number) DO NOTHING", + ) + .bind(TX_HASH) + .bind(EMITTER) + .bind("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef") + .bind("0x0000000000000000000000003000000000000000000000000000000000000002") + .bind(vec![ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, + ]) + .bind(3000i64) + .bind(serde_json::json!({ + "event_name": "Transfer", + "event_signature": "Transfer(address,uint256)", + "decoded_params": [ + { + "name": "from", + "type": "address", + "value": "0x3000000000000000000000000000000000000002", + "indexed": true + }, + { + "name": "value", + "type": "uint256", + "value": "9", + "indexed": false + } + ] + })) + .execute(pool) + .await + .expect("seed log"); +} + +#[test] +fn transaction_logs_include_stored_decoded_fields() { + common::run(async { + let pool = common::pool(); + seed_logs(&pool).await; + + let app = common::test_router(); + let response = app + .oneshot( + Request::builder() + .uri(format!("/api/transactions/{TX_HASH}/logs")) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + let body = common::json_body(response).await; + let logs = body["data"].as_array().expect("logs array"); + assert_eq!(logs.len(), 1); + assert_eq!(logs[0]["event_name"].as_str(), Some("Transfer")); + assert_eq!(logs[0]["decode_status"].as_str(), Some("decoded")); + assert_eq!(logs[0]["decoded_params"][0]["name"].as_str(), Some("from")); + assert_eq!( + logs[0]["data"].as_str(), + Some("0x0000000000000000000000000000000000000000000000000000000000000009") + ); + }); +} + +#[test] +fn decoded_logs_endpoint_matches_stored_log_shape() { + common::run(async { + let pool = common::pool(); + seed_logs(&pool).await; + + let app = common::test_router(); + let response = app + .oneshot( + Request::builder() + .uri(format!("/api/transactions/{TX_HASH}/logs/decoded")) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + let body = common::json_body(response).await; + let logs = body["data"].as_array().expect("logs array"); + assert_eq!( + logs[0]["event_signature"].as_str(), + Some("Transfer(address,uint256)") + ); + assert_eq!(logs[0]["decode_source"].as_str(), Some("direct_abi")); + }); +} diff --git a/backend/crates/atlas-server/tests/integration/main.rs b/backend/crates/atlas-server/tests/integration/main.rs index 4e8a728..5b5f73c 100644 --- a/backend/crates/atlas-server/tests/integration/main.rs +++ b/backend/crates/atlas-server/tests/integration/main.rs @@ -3,6 +3,7 @@ mod common; mod addresses; mod blocks; mod gap_fill; +mod logs; mod nfts; mod schema; mod search; diff --git a/backend/crates/atlas-server/tests/integration/schema.rs b/backend/crates/atlas-server/tests/integration/schema.rs index e8d72fd..f08537e 100644 --- a/backend/crates/atlas-server/tests/integration/schema.rs +++ b/backend/crates/atlas-server/tests/integration/schema.rs @@ -37,6 +37,7 @@ fn all_expected_tables_exist() { "erc20_balances", "erc20_contracts", "erc20_transfers", + "event_log_decode_jobs", "event_logs", "event_signatures", "failed_blocks", @@ -112,6 +113,8 @@ fn key_indexes_exist() { "idx_transactions_to", // event_logs "idx_event_logs_address", + "idx_event_logs_address_cursor", + "idx_event_logs_decode_pending", "idx_event_logs_topic0", // addresses "idx_addresses_contract", diff --git a/backend/crates/atlas-server/tests/integration/transactions.rs b/backend/crates/atlas-server/tests/integration/transactions.rs index b3f4f53..07317d0 100644 --- a/backend/crates/atlas-server/tests/integration/transactions.rs +++ b/backend/crates/atlas-server/tests/integration/transactions.rs @@ -76,6 +76,7 @@ fn list_transactions() { let body = common::json_body(response).await; let data = body["data"].as_array().unwrap(); assert!(data.len() >= 3); + assert_eq!(data[0]["input_data"].as_str().unwrap(), "0x"); }); } @@ -100,6 +101,7 @@ fn get_transaction_by_hash() { let body = common::json_body(response).await; assert_eq!(body["hash"].as_str().unwrap(), TX_HASH_1); assert_eq!(body["block_number"].as_i64().unwrap(), 2000); + assert_eq!(body["input_data"].as_str().unwrap(), "0x"); assert!(body["status"].as_bool().unwrap()); }); } @@ -143,6 +145,7 @@ fn get_block_transactions() { let body = common::json_body(response).await; let data = body["data"].as_array().unwrap(); assert_eq!(data.len(), 3); + assert_eq!(data[0]["input_data"].as_str().unwrap(), "0x"); // Should be ordered by block_index ASC let idx0 = data[0]["block_index"].as_i64().unwrap(); diff --git a/backend/migrations/20260528000001_event_log_decode_state.sql b/backend/migrations/20260528000001_event_log_decode_state.sql new file mode 100644 index 0000000..5b15c75 --- /dev/null +++ b/backend/migrations/20260528000001_event_log_decode_state.sql @@ -0,0 +1,62 @@ +ALTER TABLE event_logs + ADD COLUMN IF NOT EXISTS decode_status TEXT NOT NULL DEFAULT 'pending', + ADD COLUMN IF NOT EXISTS decoded_at TIMESTAMPTZ, + ADD COLUMN IF NOT EXISTS decode_attempted_at TIMESTAMPTZ, + ADD COLUMN IF NOT EXISTS decode_source TEXT; + +UPDATE event_logs +SET decode_status = CASE + WHEN decoded IS NOT NULL THEN 'decoded' + ELSE 'pending' + END, + decoded_at = CASE + WHEN decoded IS NOT NULL AND decoded_at IS NULL THEN NOW() + ELSE decoded_at + END +WHERE decode_status = 'pending'; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM pg_constraint + WHERE conname = 'event_logs_decode_status_check' + ) THEN + ALTER TABLE event_logs + ADD CONSTRAINT event_logs_decode_status_check + CHECK (decode_status IN ('pending', 'decoded', 'no_abi', 'no_matching_event', 'decode_failed')); + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM pg_constraint + WHERE conname = 'event_logs_decode_source_check' + ) THEN + ALTER TABLE event_logs + ADD CONSTRAINT event_logs_decode_source_check + CHECK (decode_source IS NULL OR decode_source IN ('direct_abi', 'proxy_combined_abi')); + END IF; +END $$; + +CREATE INDEX IF NOT EXISTS idx_event_logs_address_cursor + ON event_logs(address, block_number, log_index, tx_hash); + +CREATE INDEX IF NOT EXISTS idx_event_logs_decode_pending + ON event_logs(address, block_number, log_index, tx_hash) + WHERE decode_status = 'pending'; + +CREATE TABLE IF NOT EXISTS event_log_decode_jobs ( + address VARCHAR(42) PRIMARY KEY, + full_rescan BOOLEAN NOT NULL DEFAULT FALSE, + requested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + last_attempted_at TIMESTAMPTZ, + retry_count INTEGER NOT NULL DEFAULT 0, + error_message TEXT, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_event_log_decode_jobs_requested_at + ON event_log_decode_jobs(requested_at); diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index bc0fcf8..fcea2b6 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -286,15 +286,22 @@ export interface EventLog { topic3: string | null; data: string; block_number: number; - timestamp: number; -} - -export interface DecodedEventLog extends EventLog { + decode_status: + | "pending" + | "decoded" + | "no_abi" + | "no_matching_event" + | "decode_failed"; + decoded_at: string | null; + decode_attempted_at: string | null; + decode_source: "direct_abi" | "proxy_combined_abi" | null; event_name: string | null; event_signature: string | null; decoded_params: DecodedParam[] | null; } +export type DecodedEventLog = EventLog; + export interface DecodedParam { name: string; type: string; diff --git a/frontend/src/utils/abiDecode.ts b/frontend/src/utils/abiDecode.ts index 5a45cb0..db9ce2a 100644 --- a/frontend/src/utils/abiDecode.ts +++ b/frontend/src/utils/abiDecode.ts @@ -16,7 +16,7 @@ export interface DecodedCall { * Returns null if the input is empty, too short, or no matching function is found. */ export function decodeInputData(input: string | undefined | null, abi?: AbiItem[]): DecodedCall | null { - if (!input || !abi || input === '0x' || input.length < 10) return null; + if (typeof input !== 'string' || !abi || input === '0x' || input.length < 10) return null; const selectorHex = input.slice(2, 10).toLowerCase();