Skip to content

Commit 626d2f1

Browse files
hamersaw and claude committed
chore: add tracing instrumentation to mem_wal module
Add #[instrument] spans to key functions across the mem_wal subsystem (write, flush, scan, index, manifest) for improved observability. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent effca10 commit 626d2f1

10 files changed

Lines changed: 39 additions & 0 deletions

File tree

rust/lance/src/dataset/mem_wal/index.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use lance_index::vector::pq::ProductQuantizer;
3131
use lance_linalg::distance::DistanceType;
3232
use lance_table::format::IndexMetadata;
3333
use prost::Message as _;
34+
use tracing::instrument;
3435

3536
/// Row position in MemTable.
3637
///
@@ -331,6 +332,7 @@ impl IndexStore {
331332
}
332333

333334
/// Insert a batch into all indexes with batch position tracking.
335+
#[instrument(level = "debug", skip(self, batch), fields(num_rows = batch.num_rows(), row_offset, batch_position))]
334336
pub fn insert_with_batch_position(
335337
&self,
336338
batch: &RecordBatch,
@@ -378,6 +380,7 @@ impl IndexStore {
378380
/// For IVF-PQ indexes, this enables vectorized partition assignment and
379381
/// PQ encoding across all batches, improving performance through better
380382
/// SIMD utilization.
383+
#[instrument(level = "debug", skip(self, batches), fields(batch_count = batches.len()))]
381384
pub fn insert_batches(&self, batches: &[StoredBatch]) -> Result<()> {
382385
if batches.is_empty() {
383386
return Ok(());
@@ -419,6 +422,7 @@ impl IndexStore {
419422
///
420423
/// Returns a map of index names to their update durations for performance tracking.
421424
#[allow(clippy::print_stderr)]
425+
#[instrument(level = "debug", skip(self, batches), fields(batch_count = batches.len()))]
422426
pub fn insert_batches_parallel(
423427
&self,
424428
batches: &[StoredBatch],

rust/lance/src/dataset/mem_wal/manifest.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use object_store::PutOptions;
4242
use object_store::path::Path;
4343
use prost::Message;
4444
use serde::{Deserialize, Serialize};
45+
use tracing::instrument;
4546
use uuid::Uuid;
4647

4748
use super::util::{manifest_filename, parse_bit_reversed_filename, region_manifest_path};
@@ -91,6 +92,7 @@ impl RegionManifestStore {
9192
/// Read the latest manifest version.
9293
///
9394
/// Returns `None` if no manifest exists (new region).
95+
#[instrument(level = "debug", skip(self), fields(region_id = %self.region_id))]
9496
pub async fn read_latest(&self) -> Result<Option<RegionManifest>> {
9597
let version = self.find_latest_version().await?;
9698
if version == 0 {
@@ -134,6 +136,7 @@ impl RegionManifestStore {
134136
/// # Errors
135137
///
136138
/// Returns `Error::AlreadyExists` if another writer already wrote this version.
139+
#[instrument(level = "debug", skip(self, manifest), fields(region_id = %self.region_id, version = manifest.version, epoch = manifest.writer_epoch))]
137140
pub async fn write(&self, manifest: &RegionManifest) -> Result<u64> {
138141
let version = manifest.version;
139142
let filename = manifest_filename(version);
@@ -369,6 +372,7 @@ impl RegionManifestStore {
369372
/// # Errors
370373
///
371374
/// Returns an error if another writer already claimed the region.
375+
#[instrument(level = "info", skip(self), fields(region_id = %self.region_id, region_spec_id))]
372376
pub async fn claim_epoch(&self, region_spec_id: u32) -> Result<(u64, RegionManifest)> {
373377
let current = self.read_latest().await?;
374378

@@ -415,6 +419,7 @@ impl RegionManifestStore {
415419
///
416420
/// Loads the current manifest and compares epochs. If the stored epoch
417421
/// is higher than the local epoch, the writer has been fenced.
422+
#[instrument(level = "debug", skip(self), fields(region_id = %self.region_id, local_epoch))]
418423
pub async fn check_fenced(&self, local_epoch: u64) -> Result<()> {
419424
let current = self.read_latest().await?;
420425
Self::check_fenced_against(&current, local_epoch, self.region_id)
@@ -452,6 +457,7 @@ impl RegionManifestStore {
452457
/// # Returns
453458
///
454459
/// The successfully written manifest.
460+
#[instrument(level = "debug", skip(self, prepare_fn), fields(region_id = %self.region_id, local_epoch))]
455461
pub async fn commit_update<F>(&self, local_epoch: u64, prepare_fn: F) -> Result<RegionManifest>
456462
where
457463
F: Fn(&RegionManifest) -> RegionManifest,

rust/lance/src/dataset/mem_wal/memtable.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use lance_core::datatypes::Schema;
1717
use lance_core::{Error, Result};
1818
use lance_index::scalar::bloomfilter::sbbf::Sbbf;
1919
use tokio::sync::RwLock;
20+
use tracing::instrument;
2021
use uuid::Uuid;
2122

2223
use super::index::IndexStore;
@@ -341,6 +342,7 @@ impl MemTable {
341342
/// # Single Writer Requirement
342343
///
343344
/// This method MUST only be called from the single writer task.
345+
#[instrument(level = "debug", skip(self, batch), fields(num_rows = batch.num_rows(), generation = self.generation))]
344346
pub async fn insert(&mut self, batch: RecordBatch) -> Result<usize> {
345347
// Validate schema compatibility
346348
if batch.schema() != self.schema {
@@ -423,6 +425,7 @@ impl MemTable {
423425
/// # Single Writer Requirement
424426
///
425427
/// This method MUST only be called from the single writer task.
428+
#[instrument(level = "debug", skip(self, batches), fields(batch_count = batches.len(), generation = self.generation))]
426429
pub async fn insert_batches_only(
427430
&mut self,
428431
batches: Vec<RecordBatch>,

rust/lance/src/dataset/mem_wal/memtable/flush.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use lance_io::object_store::ObjectStore;
1515
use lance_table::format::IndexMetadata;
1616
use log::info;
1717
use object_store::path::Path;
18+
use tracing::instrument;
1819
use uuid::Uuid;
1920

2021
use super::super::index::MemIndexConfig;
@@ -77,6 +78,7 @@ impl MemTableFlusher {
7778
}
7879

7980
/// Flush the MemTable to storage (data files, indexes, bloom filter).
81+
#[instrument(level = "info", skip(self, memtable), fields(region_id = %self.region_id, epoch, generation = memtable.generation(), row_count = memtable.row_count()))]
8082
pub async fn flush(&self, memtable: &MemTable, epoch: u64) -> Result<FlushResult> {
8183
self.manifest_store.check_fenced(epoch).await?;
8284

@@ -134,6 +136,7 @@ impl MemTableFlusher {
134136
///
135137
/// Returns the total number of rows written, which is needed for
136138
/// reversing row positions in indexes.
139+
#[instrument(level = "debug", skip(self, memtable), fields(path = %path))]
137140
async fn write_data_file(&self, path: &Path, memtable: &MemTable) -> Result<usize> {
138141
use arrow_array::RecordBatchIterator;
139142

@@ -180,6 +183,7 @@ impl MemTableFlusher {
180183
}
181184

182185
/// Flush the MemTable to storage with indexes.
186+
#[instrument(level = "info", skip(self, memtable, index_configs), fields(region_id = %self.region_id, epoch, generation = memtable.generation(), row_count = memtable.row_count(), index_count = index_configs.len()))]
183187
pub async fn flush_with_indexes(
184188
&self,
185189
memtable: &MemTable,

rust/lance/src/dataset/mem_wal/scanner/builder.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
1313
use datafusion::prelude::{Expr, SessionContext};
1414
use futures::TryStreamExt;
1515
use lance_core::{Error, Result};
16+
use tracing::instrument;
1617
use uuid::Uuid;
1718

1819
use super::collector::{ActiveMemTableRef, LsmDataSourceCollector};
@@ -165,6 +166,7 @@ impl LsmScanner {
165166
}
166167

167168
/// Create the execution plan.
169+
#[instrument(level = "debug", skip(self))]
168170
pub async fn create_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
169171
let collector = self.build_collector();
170172
let base_schema = self.schema();
@@ -183,6 +185,7 @@ impl LsmScanner {
183185
}
184186

185187
/// Execute the scan and return a stream of record batches.
188+
#[instrument(level = "info", skip(self), fields(has_filter = self.filter.is_some(), limit = self.limit, num_regions = self.region_snapshots.len()))]
186189
pub async fn try_into_stream(&self) -> Result<SendableRecordBatchStream> {
187190
let plan = self.create_plan().await?;
188191
let ctx = SessionContext::new();

rust/lance/src/dataset/mem_wal/scanner/planner.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use datafusion::physical_plan::union::UnionExec;
1414
use datafusion::physical_plan::{ExecutionPlan, limit::GlobalLimitExec};
1515
use datafusion::prelude::Expr;
1616
use lance_core::Result;
17+
use tracing::instrument;
1718

1819
use super::collector::LsmDataSourceCollector;
1920
use super::data_source::LsmDataSource;
@@ -75,6 +76,7 @@ impl LsmScanPlanner {
7576
/// - SortPreservingMergeExec is O(N log K) where K is the number of sources
7677
/// - Memory usage is bounded by the sum of K sort buffers rather than all data
7778
/// - No extra column for _memtable_gen in the common case
79+
#[instrument(level = "debug", skip(self, projection, filter), fields(has_filter = filter.is_some(), limit, offset))]
7880
pub async fn plan_scan(
7981
&self,
8082
projection: Option<&[String]>,

rust/lance/src/dataset/mem_wal/scanner/point_lookup.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use datafusion::physical_plan::limit::GlobalLimitExec;
1414
use datafusion::prelude::Expr;
1515
use lance_core::Result;
1616
use lance_index::scalar::bloomfilter::sbbf::Sbbf;
17+
use tracing::instrument;
1718

1819
use super::collector::LsmDataSourceCollector;
1920
use super::data_source::LsmDataSource;
@@ -117,6 +118,7 @@ impl LsmPointLookupPlanner {
117118
///
118119
/// An execution plan that returns at most one row - the newest version
119120
/// of the row with the given primary key.
121+
#[instrument(level = "debug", skip(self, pk_values, projection), fields(pk_column_count = self.pk_columns.len()))]
120122
pub async fn plan_lookup(
121123
&self,
122124
pk_values: &[ScalarValue],

rust/lance/src/dataset/mem_wal/scanner/vector_search.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use datafusion::physical_plan::sorts::sort::SortExec;
1818
use datafusion::physical_plan::union::UnionExec;
1919
use lance_core::Result;
2020
use lance_index::scalar::bloomfilter::sbbf::Sbbf;
21+
use tracing::instrument;
2122

2223
use super::collector::LsmDataSourceCollector;
2324
use super::data_source::LsmDataSource;
@@ -144,6 +145,7 @@ impl LsmVectorSearchPlanner {
144145
///
145146
/// An execution plan that returns the top-K nearest neighbors across all
146147
/// LSM levels, with stale results filtered out.
148+
#[instrument(level = "info", skip(self, query_vector, projection), fields(k, nprobes, vector_column = %self.vector_column, distance_type = ?self.distance_type))]
147149
pub async fn plan_search(
148150
&self,
149151
query_vector: &FixedSizeListArray,

rust/lance/src/dataset/mem_wal/wal.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use lance_io::object_store::ObjectStore;
2121
use object_store::path::Path;
2222
use tokio::sync::{mpsc, watch};
2323

24+
use tracing::instrument;
2425
use uuid::Uuid;
2526

2627
use super::util::{WatchableOnceCell, region_wal_path, wal_entry_filename};
@@ -297,6 +298,7 @@ impl WalFlusher {
297298
///
298299
/// A `WalFlushResult` with timing metrics and the WAL entry.
299300
/// Returns empty result if nothing to flush (already flushed past end_batch_position).
301+
#[instrument(level = "info", skip(self, batch_store, indexes), fields(region_id = %self.region_id, end_batch_position, has_indexes = indexes.is_some()))]
300302
pub async fn flush_to_with_index_update(
301303
&self,
302304
batch_store: &BatchStore,
@@ -498,6 +500,7 @@ impl WalEntryData {
498500
/// # Returns
499501
///
500502
/// The parsed WAL entry data, or an error if reading/parsing fails.
503+
#[instrument(level = "debug", skip(object_store), fields(path = %path))]
501504
pub async fn read(object_store: &ObjectStore, path: &Path) -> Result<Self> {
502505
// Read the file
503506
let data = object_store

rust/lance/src/dataset/mem_wal/write.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use tokio::sync::{RwLock, mpsc};
3131
use tokio::task::JoinHandle;
3232
use tokio::time::{Interval, interval_at};
3333
use tokio_util::sync::CancellationToken;
34+
use tracing::instrument;
3435
use uuid::Uuid;
3536

3637
pub use super::index::{
@@ -942,6 +943,7 @@ impl RegionWriter {
942943
///
943944
/// The `base_path` should come from `ObjectStore::from_uri()` to ensure
944945
/// WAL files are written inside the dataset directory.
946+
#[instrument(level = "info", skip(object_store, base_path, base_uri, schema, index_configs), fields(region_id = %config.region_id, index_count = index_configs.len()))]
945947
pub async fn open(
946948
object_store: Arc<ObjectStore>,
947949
base_path: Path,
@@ -1102,6 +1104,7 @@ impl RegionWriter {
11021104
/// Fencing is detected lazily during WAL flush via atomic writes.
11031105
/// If another writer has taken over, the WAL flush will fail with
11041106
/// `AlreadyExists`, indicating this writer has been fenced.
1107+
#[instrument(level = "info", skip(self, batches), fields(batch_count = batches.len(), region_id = %self.config.region_id))]
11051108
pub async fn put(&self, batches: Vec<RecordBatch>) -> Result<WriteResult> {
11061109
if batches.is_empty() {
11071110
return Err(Error::invalid_input("Cannot write empty batch list"));
@@ -1257,6 +1260,7 @@ impl RegionWriter {
12571260
/// Close the writer gracefully.
12581261
///
12591262
/// Flushes pending data and shuts down background tasks.
1263+
#[instrument(level = "info", skip(self), fields(region_id = %self.config.region_id, epoch = self.epoch))]
12601264
pub async fn close(self) -> Result<()> {
12611265
info!("Closing RegionWriter for region {}", self.config.region_id);
12621266

@@ -1373,6 +1377,11 @@ impl WalFlushHandler {
13731377
/// * `batch_store` - The batch store to flush from
13741378
/// * `indexes` - Optional indexes to update in parallel with WAL I/O
13751379
/// * `end_batch_position` - End batch ID (exclusive). Flush batches in (max_flushed, end_batch_position).
1380+
#[instrument(
1381+
level = "debug",
1382+
skip(self, batch_store, indexes),
1383+
fields(end_batch_position)
1384+
)]
13761385
async fn do_flush(
13771386
&self,
13781387
batch_store: Arc<BatchStore>,
@@ -1486,6 +1495,7 @@ impl MemTableFlushHandler {
14861495
/// This method waits for the WAL flush to complete (sent at freeze time),
14871496
/// then flushes to Lance storage. The WAL flush is already queued by
14881497
/// freeze_memtable to ensure strict ordering of WAL entries.
1498+
#[instrument(level = "info", skip(self, memtable), fields(generation = memtable.generation(), row_count = memtable.row_count()))]
14891499
async fn flush_memtable(
14901500
&mut self,
14911501
memtable: Arc<MemTable>,

0 commit comments

Comments (0)