From ee8353c578c72bfafb8e2a71f35b750fa1857a72 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Fri, 3 Apr 2026 11:53:03 -0500 Subject: [PATCH] chore: add tracing instrumentation to mem_wal module Add #[instrument] spans to key functions across the mem_wal subsystem (write, flush, scan, index, manifest) for improved observability. Co-Authored-By: Claude Opus 4.6 (1M context) --- rust/lance/src/dataset/mem_wal/index.rs | 4 ++++ rust/lance/src/dataset/mem_wal/manifest.rs | 6 ++++++ rust/lance/src/dataset/mem_wal/memtable.rs | 3 +++ rust/lance/src/dataset/mem_wal/memtable/flush.rs | 4 ++++ rust/lance/src/dataset/mem_wal/scanner/builder.rs | 3 +++ rust/lance/src/dataset/mem_wal/scanner/planner.rs | 2 ++ rust/lance/src/dataset/mem_wal/scanner/point_lookup.rs | 2 ++ .../lance/src/dataset/mem_wal/scanner/vector_search.rs | 2 ++ rust/lance/src/dataset/mem_wal/wal.rs | 3 +++ rust/lance/src/dataset/mem_wal/write.rs | 10 ++++++++++ 10 files changed, 39 insertions(+) diff --git a/rust/lance/src/dataset/mem_wal/index.rs b/rust/lance/src/dataset/mem_wal/index.rs index e7eb7394c45..06afc5cf7e7 100644 --- a/rust/lance/src/dataset/mem_wal/index.rs +++ b/rust/lance/src/dataset/mem_wal/index.rs @@ -31,6 +31,7 @@ use lance_index::vector::pq::ProductQuantizer; use lance_linalg::distance::DistanceType; use lance_table::format::IndexMetadata; use prost::Message as _; +use tracing::instrument; /// Row position in MemTable. /// @@ -331,6 +332,7 @@ impl IndexStore { } /// Insert a batch into all indexes with batch position tracking. + #[instrument(level = "debug", skip(self, batch), fields(num_rows = batch.num_rows(), row_offset, batch_position))] pub fn insert_with_batch_position( &self, batch: &RecordBatch, @@ -378,6 +380,7 @@ impl IndexStore { /// For IVF-PQ indexes, this enables vectorized partition assignment and /// PQ encoding across all batches, improving performance through better /// SIMD utilization. 
+ #[instrument(level = "debug", skip(self, batches), fields(batch_count = batches.len()))] pub fn insert_batches(&self, batches: &[StoredBatch]) -> Result<()> { if batches.is_empty() { return Ok(()); @@ -419,6 +422,7 @@ impl IndexStore { /// /// Returns a map of index names to their update durations for performance tracking. #[allow(clippy::print_stderr)] + #[instrument(level = "debug", skip(self, batches), fields(batch_count = batches.len()))] pub fn insert_batches_parallel( &self, batches: &[StoredBatch], diff --git a/rust/lance/src/dataset/mem_wal/manifest.rs b/rust/lance/src/dataset/mem_wal/manifest.rs index 1aceac60346..46758f29336 100644 --- a/rust/lance/src/dataset/mem_wal/manifest.rs +++ b/rust/lance/src/dataset/mem_wal/manifest.rs @@ -42,6 +42,7 @@ use object_store::PutOptions; use object_store::path::Path; use prost::Message; use serde::{Deserialize, Serialize}; +use tracing::instrument; use uuid::Uuid; use super::util::{manifest_filename, parse_bit_reversed_filename, shard_manifest_path}; @@ -91,6 +92,7 @@ impl ShardManifestStore { /// Read the latest manifest version. /// /// Returns `None` if no manifest exists (new shard). + #[instrument(level = "debug", skip(self), fields(shard_id = %self.shard_id))] pub async fn read_latest(&self) -> Result> { let version = self.find_latest_version().await?; if version == 0 { @@ -134,6 +136,7 @@ impl ShardManifestStore { /// # Errors /// /// Returns `Error::AlreadyExists` if another writer already wrote this version. + #[instrument(level = "debug", skip(self, manifest), fields(shard_id = %self.shard_id, version = manifest.version, epoch = manifest.writer_epoch))] pub async fn write(&self, manifest: &ShardManifest) -> Result { let version = manifest.version; let filename = manifest_filename(version); @@ -369,6 +372,7 @@ impl ShardManifestStore { /// # Errors /// /// Returns an error if another writer already claimed the shard. 
+ #[instrument(level = "info", skip(self), fields(shard_id = %self.shard_id, shard_spec_id))] pub async fn claim_epoch(&self, shard_spec_id: u32) -> Result<(u64, ShardManifest)> { let current = self.read_latest().await?; @@ -415,6 +419,7 @@ impl ShardManifestStore { /// /// Loads the current manifest and compares epochs. If the stored epoch /// is higher than the local epoch, the writer has been fenced. + #[instrument(level = "debug", skip(self), fields(shard_id = %self.shard_id, local_epoch))] pub async fn check_fenced(&self, local_epoch: u64) -> Result<()> { let current = self.read_latest().await?; Self::check_fenced_against(&current, local_epoch, self.shard_id) @@ -452,6 +457,7 @@ impl ShardManifestStore { /// # Returns /// /// The successfully written manifest. + #[instrument(level = "debug", skip(self, prepare_fn), fields(shard_id = %self.shard_id, local_epoch))] pub async fn commit_update(&self, local_epoch: u64, prepare_fn: F) -> Result where F: Fn(&ShardManifest) -> ShardManifest, diff --git a/rust/lance/src/dataset/mem_wal/memtable.rs b/rust/lance/src/dataset/mem_wal/memtable.rs index a93b5627f2d..1ef20f46457 100644 --- a/rust/lance/src/dataset/mem_wal/memtable.rs +++ b/rust/lance/src/dataset/mem_wal/memtable.rs @@ -17,6 +17,7 @@ use lance_core::datatypes::Schema; use lance_core::{Error, Result}; use lance_index::scalar::bloomfilter::sbbf::Sbbf; use tokio::sync::RwLock; +use tracing::instrument; use uuid::Uuid; use super::index::IndexStore; @@ -341,6 +342,7 @@ impl MemTable { /// # Single Writer Requirement /// /// This method MUST only be called from the single writer task. + #[instrument(level = "debug", skip(self, batch), fields(num_rows = batch.num_rows(), generation = self.generation))] pub async fn insert(&mut self, batch: RecordBatch) -> Result { // Validate schema compatibility if batch.schema() != self.schema { @@ -423,6 +425,7 @@ impl MemTable { /// # Single Writer Requirement /// /// This method MUST only be called from the single writer task. 
+ #[instrument(level = "debug", skip(self, batches), fields(batch_count = batches.len(), generation = self.generation))] pub async fn insert_batches_only( &mut self, batches: Vec, diff --git a/rust/lance/src/dataset/mem_wal/memtable/flush.rs b/rust/lance/src/dataset/mem_wal/memtable/flush.rs index 4a46ed062c0..de4f8b37a16 100644 --- a/rust/lance/src/dataset/mem_wal/memtable/flush.rs +++ b/rust/lance/src/dataset/mem_wal/memtable/flush.rs @@ -15,6 +15,7 @@ use lance_io::object_store::ObjectStore; use lance_table::format::IndexMetadata; use log::info; use object_store::path::Path; +use tracing::instrument; use uuid::Uuid; use super::super::index::MemIndexConfig; @@ -77,6 +78,7 @@ impl MemTableFlusher { } /// Flush the MemTable to storage (data files, indexes, bloom filter). + #[instrument(level = "info", skip(self, memtable), fields(shard_id = %self.shard_id, epoch, generation = memtable.generation(), row_count = memtable.row_count()))] pub async fn flush(&self, memtable: &MemTable, epoch: u64) -> Result { self.manifest_store.check_fenced(epoch).await?; @@ -134,6 +136,7 @@ impl MemTableFlusher { /// /// Returns the total number of rows written, which is needed for /// reversing row positions in indexes. + #[instrument(level = "debug", skip(self, memtable), fields(path = %path))] async fn write_data_file(&self, path: &Path, memtable: &MemTable) -> Result { use arrow_array::RecordBatchIterator; @@ -180,6 +183,7 @@ impl MemTableFlusher { } /// Flush the MemTable to storage with indexes. 
+ #[instrument(level = "info", skip(self, memtable, index_configs), fields(shard_id = %self.shard_id, epoch, generation = memtable.generation(), row_count = memtable.row_count(), index_count = index_configs.len()))] pub async fn flush_with_indexes( &self, memtable: &MemTable, diff --git a/rust/lance/src/dataset/mem_wal/scanner/builder.rs b/rust/lance/src/dataset/mem_wal/scanner/builder.rs index 3168f8f5feb..66502452166 100644 --- a/rust/lance/src/dataset/mem_wal/scanner/builder.rs +++ b/rust/lance/src/dataset/mem_wal/scanner/builder.rs @@ -13,6 +13,7 @@ use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; use datafusion::prelude::{Expr, SessionContext}; use futures::TryStreamExt; use lance_core::{Error, Result}; +use tracing::instrument; use uuid::Uuid; use super::collector::{ActiveMemTableRef, LsmDataSourceCollector}; @@ -165,6 +166,7 @@ impl LsmScanner { } /// Create the execution plan. + #[instrument(level = "debug", skip(self))] pub async fn create_plan(&self) -> Result> { let collector = self.build_collector(); let base_schema = self.schema(); @@ -183,6 +185,7 @@ impl LsmScanner { } /// Execute the scan and return a stream of record batches. 
+ #[instrument(level = "info", skip(self), fields(has_filter = self.filter.is_some(), limit = self.limit, num_shards = self.shard_snapshots.len()))] pub async fn try_into_stream(&self) -> Result { let plan = self.create_plan().await?; let ctx = SessionContext::new(); diff --git a/rust/lance/src/dataset/mem_wal/scanner/planner.rs b/rust/lance/src/dataset/mem_wal/scanner/planner.rs index d75a304480e..09d4ff4dc22 100644 --- a/rust/lance/src/dataset/mem_wal/scanner/planner.rs +++ b/rust/lance/src/dataset/mem_wal/scanner/planner.rs @@ -14,6 +14,7 @@ use datafusion::physical_plan::union::UnionExec; use datafusion::physical_plan::{ExecutionPlan, limit::GlobalLimitExec}; use datafusion::prelude::Expr; use lance_core::Result; +use tracing::instrument; use super::collector::LsmDataSourceCollector; use super::data_source::LsmDataSource; @@ -75,6 +76,7 @@ impl LsmScanPlanner { /// - SortPreservingMergeExec is O(N log K) where K is the number of sources /// - Memory usage is bounded by the sum of K sort buffers rather than all data /// - No extra column for _memtable_gen in the common case + #[instrument(level = "debug", skip(self, projection, filter), fields(has_filter = filter.is_some(), limit, offset))] pub async fn plan_scan( &self, projection: Option<&[String]>, diff --git a/rust/lance/src/dataset/mem_wal/scanner/point_lookup.rs b/rust/lance/src/dataset/mem_wal/scanner/point_lookup.rs index fecdb251166..0391eed005c 100644 --- a/rust/lance/src/dataset/mem_wal/scanner/point_lookup.rs +++ b/rust/lance/src/dataset/mem_wal/scanner/point_lookup.rs @@ -14,6 +14,7 @@ use datafusion::physical_plan::limit::GlobalLimitExec; use datafusion::prelude::Expr; use lance_core::Result; use lance_index::scalar::bloomfilter::sbbf::Sbbf; +use tracing::instrument; use super::collector::LsmDataSourceCollector; use super::data_source::LsmDataSource; @@ -117,6 +118,7 @@ impl LsmPointLookupPlanner { /// /// An execution plan that returns at most one row - the newest version /// of the row with the 
given primary key. + #[instrument(level = "debug", skip(self, pk_values, projection), fields(pk_column_count = self.pk_columns.len()))] pub async fn plan_lookup( &self, pk_values: &[ScalarValue], diff --git a/rust/lance/src/dataset/mem_wal/scanner/vector_search.rs b/rust/lance/src/dataset/mem_wal/scanner/vector_search.rs index 0433929501f..2d19c24dca8 100644 --- a/rust/lance/src/dataset/mem_wal/scanner/vector_search.rs +++ b/rust/lance/src/dataset/mem_wal/scanner/vector_search.rs @@ -18,6 +18,7 @@ use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::union::UnionExec; use lance_core::Result; use lance_index::scalar::bloomfilter::sbbf::Sbbf; +use tracing::instrument; use super::collector::LsmDataSourceCollector; use super::data_source::LsmDataSource; @@ -144,6 +145,7 @@ impl LsmVectorSearchPlanner { /// /// An execution plan that returns the top-K nearest neighbors across all /// LSM levels, with stale results filtered out. + #[instrument(level = "info", skip(self, query_vector, projection), fields(k, nprobes, vector_column = %self.vector_column, distance_type = ?self.distance_type))] pub async fn plan_search( &self, query_vector: &FixedSizeListArray, diff --git a/rust/lance/src/dataset/mem_wal/wal.rs b/rust/lance/src/dataset/mem_wal/wal.rs index 4e92355402c..9aec151430a 100644 --- a/rust/lance/src/dataset/mem_wal/wal.rs +++ b/rust/lance/src/dataset/mem_wal/wal.rs @@ -21,6 +21,7 @@ use lance_io::object_store::ObjectStore; use object_store::path::Path; use tokio::sync::{mpsc, watch}; +use tracing::instrument; use uuid::Uuid; use super::util::{WatchableOnceCell, shard_wal_path, wal_entry_filename}; @@ -297,6 +298,7 @@ impl WalFlusher { /// /// A `WalFlushResult` with timing metrics and the WAL entry. /// Returns empty result if nothing to flush (already flushed past end_batch_position). 
+ #[instrument(level = "info", skip(self, batch_store, indexes), fields(shard_id = %self.shard_id, end_batch_position, has_indexes = indexes.is_some()))] pub async fn flush_to_with_index_update( &self, batch_store: &BatchStore, @@ -498,6 +500,7 @@ impl WalEntryData { /// # Returns /// /// The parsed WAL entry data, or an error if reading/parsing fails. + #[instrument(level = "debug", skip(object_store), fields(path = %path))] pub async fn read(object_store: &ObjectStore, path: &Path) -> Result { // Read the file let data = object_store diff --git a/rust/lance/src/dataset/mem_wal/write.rs b/rust/lance/src/dataset/mem_wal/write.rs index 4c956fd3ad2..a030f1c6122 100644 --- a/rust/lance/src/dataset/mem_wal/write.rs +++ b/rust/lance/src/dataset/mem_wal/write.rs @@ -31,6 +31,7 @@ use tokio::sync::{RwLock, mpsc}; use tokio::task::JoinHandle; use tokio::time::{Interval, interval_at}; use tokio_util::sync::CancellationToken; +use tracing::instrument; use uuid::Uuid; pub use super::index::{ @@ -942,6 +943,7 @@ impl ShardWriter { /// /// The `base_path` should come from `ObjectStore::from_uri()` to ensure /// WAL files are written inside the dataset directory. + #[instrument(level = "info", skip(object_store, base_path, base_uri, schema, index_configs), fields(shard_id = %config.shard_id, index_count = index_configs.len()))] pub async fn open( object_store: Arc, base_path: Path, @@ -1102,6 +1104,7 @@ impl ShardWriter { /// Fencing is detected lazily during WAL flush via atomic writes. /// If another writer has taken over, the WAL flush will fail with /// `AlreadyExists`, indicating this writer has been fenced. + #[instrument(level = "info", skip(self, batches), fields(batch_count = batches.len(), shard_id = %self.config.shard_id))] pub async fn put(&self, batches: Vec) -> Result { if batches.is_empty() { return Err(Error::invalid_input("Cannot write empty batch list")); @@ -1257,6 +1260,7 @@ impl ShardWriter { /// Close the writer gracefully. 
/// /// Flushes pending data and shuts down background tasks. + #[instrument(level = "info", skip(self), fields(shard_id = %self.config.shard_id, epoch = self.epoch))] pub async fn close(self) -> Result<()> { info!("Closing ShardWriter for shard {}", self.config.shard_id); @@ -1373,6 +1377,11 @@ impl WalFlushHandler { /// * `batch_store` - The batch store to flush from /// * `indexes` - Optional indexes to update in parallel with WAL I/O /// * `end_batch_position` - End batch ID (exclusive). Flush batches in (max_flushed, end_batch_position). + #[instrument( + level = "debug", + skip(self, batch_store, indexes), + fields(end_batch_position) + )] async fn do_flush( &self, batch_store: Arc, @@ -1486,6 +1495,7 @@ impl MemTableFlushHandler { /// This method waits for the WAL flush to complete (sent at freeze time), /// then flushes to Lance storage. The WAL flush is already queued by /// freeze_memtable to ensure strict ordering of WAL entries. + #[instrument(level = "info", skip(self, memtable), fields(generation = memtable.generation(), row_count = memtable.row_count()))] async fn flush_memtable( &mut self, memtable: Arc,