Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rust/lance/src/dataset/mem_wal/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use lance_index::vector::pq::ProductQuantizer;
use lance_linalg::distance::DistanceType;
use lance_table::format::IndexMetadata;
use prost::Message as _;
use tracing::instrument;

/// Row position in MemTable.
///
Expand Down Expand Up @@ -331,6 +332,7 @@ impl IndexStore {
}

/// Insert a batch into all indexes with batch position tracking.
#[instrument(level = "debug", skip(self, batch), fields(num_rows = batch.num_rows(), row_offset, batch_position))]
pub fn insert_with_batch_position(
&self,
batch: &RecordBatch,
Expand Down Expand Up @@ -378,6 +380,7 @@ impl IndexStore {
/// For IVF-PQ indexes, this enables vectorized partition assignment and
/// PQ encoding across all batches, improving performance through better
/// SIMD utilization.
#[instrument(level = "debug", skip(self, batches), fields(batch_count = batches.len()))]
pub fn insert_batches(&self, batches: &[StoredBatch]) -> Result<()> {
if batches.is_empty() {
return Ok(());
Expand Down Expand Up @@ -419,6 +422,7 @@ impl IndexStore {
///
/// Returns a map of index names to their update durations for performance tracking.
#[allow(clippy::print_stderr)]
#[instrument(level = "debug", skip(self, batches), fields(batch_count = batches.len()))]
pub fn insert_batches_parallel(
&self,
batches: &[StoredBatch],
Expand Down
6 changes: 6 additions & 0 deletions rust/lance/src/dataset/mem_wal/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use object_store::PutOptions;
use object_store::path::Path;
use prost::Message;
use serde::{Deserialize, Serialize};
use tracing::instrument;
use uuid::Uuid;

use super::util::{manifest_filename, parse_bit_reversed_filename, shard_manifest_path};
Expand Down Expand Up @@ -91,6 +92,7 @@ impl ShardManifestStore {
/// Read the latest manifest version.
///
/// Returns `None` if no manifest exists (new shard).
#[instrument(level = "debug", skip(self), fields(shard_id = %self.shard_id))]
pub async fn read_latest(&self) -> Result<Option<ShardManifest>> {
let version = self.find_latest_version().await?;
if version == 0 {
Expand Down Expand Up @@ -134,6 +136,7 @@ impl ShardManifestStore {
/// # Errors
///
/// Returns `Error::AlreadyExists` if another writer already wrote this version.
#[instrument(level = "debug", skip(self, manifest), fields(shard_id = %self.shard_id, version = manifest.version, epoch = manifest.writer_epoch))]
pub async fn write(&self, manifest: &ShardManifest) -> Result<u64> {
let version = manifest.version;
let filename = manifest_filename(version);
Expand Down Expand Up @@ -369,6 +372,7 @@ impl ShardManifestStore {
/// # Errors
///
/// Returns an error if another writer already claimed the shard.
#[instrument(level = "info", skip(self), fields(shard_id = %self.shard_id, shard_spec_id))]
pub async fn claim_epoch(&self, shard_spec_id: u32) -> Result<(u64, ShardManifest)> {
let current = self.read_latest().await?;

Expand Down Expand Up @@ -415,6 +419,7 @@ impl ShardManifestStore {
///
/// Loads the current manifest and compares epochs. If the stored epoch
/// is higher than the local epoch, the writer has been fenced.
#[instrument(level = "debug", skip(self), fields(shard_id = %self.shard_id, local_epoch))]
pub async fn check_fenced(&self, local_epoch: u64) -> Result<()> {
let current = self.read_latest().await?;
Self::check_fenced_against(&current, local_epoch, self.shard_id)
Expand Down Expand Up @@ -452,6 +457,7 @@ impl ShardManifestStore {
/// # Returns
///
/// The successfully written manifest.
#[instrument(level = "debug", skip(self, prepare_fn), fields(shard_id = %self.shard_id, local_epoch))]
pub async fn commit_update<F>(&self, local_epoch: u64, prepare_fn: F) -> Result<ShardManifest>
where
F: Fn(&ShardManifest) -> ShardManifest,
Expand Down
3 changes: 3 additions & 0 deletions rust/lance/src/dataset/mem_wal/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use lance_core::datatypes::Schema;
use lance_core::{Error, Result};
use lance_index::scalar::bloomfilter::sbbf::Sbbf;
use tokio::sync::RwLock;
use tracing::instrument;
use uuid::Uuid;

use super::index::IndexStore;
Expand Down Expand Up @@ -341,6 +342,7 @@ impl MemTable {
/// # Single Writer Requirement
///
/// This method MUST only be called from the single writer task.
#[instrument(level = "debug", skip(self, batch), fields(num_rows = batch.num_rows(), generation = self.generation))]
pub async fn insert(&mut self, batch: RecordBatch) -> Result<usize> {
// Validate schema compatibility
if batch.schema() != self.schema {
Expand Down Expand Up @@ -423,6 +425,7 @@ impl MemTable {
/// # Single Writer Requirement
///
/// This method MUST only be called from the single writer task.
#[instrument(level = "debug", skip(self, batches), fields(batch_count = batches.len(), generation = self.generation))]
pub async fn insert_batches_only(
&mut self,
batches: Vec<RecordBatch>,
Expand Down
4 changes: 4 additions & 0 deletions rust/lance/src/dataset/mem_wal/memtable/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use lance_io::object_store::ObjectStore;
use lance_table::format::IndexMetadata;
use log::info;
use object_store::path::Path;
use tracing::instrument;
use uuid::Uuid;

use super::super::index::MemIndexConfig;
Expand Down Expand Up @@ -77,6 +78,7 @@ impl MemTableFlusher {
}

/// Flush the MemTable to storage (data files, indexes, bloom filter).
#[instrument(level = "info", skip(self, memtable), fields(shard_id = %self.shard_id, epoch, generation = memtable.generation(), row_count = memtable.row_count()))]
pub async fn flush(&self, memtable: &MemTable, epoch: u64) -> Result<FlushResult> {
self.manifest_store.check_fenced(epoch).await?;

Expand Down Expand Up @@ -134,6 +136,7 @@ impl MemTableFlusher {
///
/// Returns the total number of rows written, which is needed for
/// reversing row positions in indexes.
#[instrument(level = "debug", skip(self, memtable), fields(path = %path))]
async fn write_data_file(&self, path: &Path, memtable: &MemTable) -> Result<usize> {
use arrow_array::RecordBatchIterator;

Expand Down Expand Up @@ -180,6 +183,7 @@ impl MemTableFlusher {
}

/// Flush the MemTable to storage with indexes.
#[instrument(level = "info", skip(self, memtable, index_configs), fields(shard_id = %self.shard_id, epoch, generation = memtable.generation(), row_count = memtable.row_count(), index_count = index_configs.len()))]
pub async fn flush_with_indexes(
&self,
memtable: &MemTable,
Expand Down
3 changes: 3 additions & 0 deletions rust/lance/src/dataset/mem_wal/scanner/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use datafusion::prelude::{Expr, SessionContext};
use futures::TryStreamExt;
use lance_core::{Error, Result};
use tracing::instrument;
use uuid::Uuid;

use super::collector::{ActiveMemTableRef, LsmDataSourceCollector};
Expand Down Expand Up @@ -165,6 +166,7 @@ impl LsmScanner {
}

/// Create the execution plan.
#[instrument(level = "debug", skip(self))]
pub async fn create_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
let collector = self.build_collector();
let base_schema = self.schema();
Expand All @@ -183,6 +185,7 @@ impl LsmScanner {
}

/// Execute the scan and return a stream of record batches.
#[instrument(level = "info", skip(self), fields(has_filter = self.filter.is_some(), limit = self.limit, num_shards = self.shard_snapshots.len()))]
pub async fn try_into_stream(&self) -> Result<SendableRecordBatchStream> {
Comment on lines +188 to 189
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be nice to generate something similar to the plan_run events that we have on the normal scanner (emitted there with a `tracing::info!(...)` call).

let plan = self.create_plan().await?;
let ctx = SessionContext::new();
Expand Down
2 changes: 2 additions & 0 deletions rust/lance/src/dataset/mem_wal/scanner/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::{ExecutionPlan, limit::GlobalLimitExec};
use datafusion::prelude::Expr;
use lance_core::Result;
use tracing::instrument;

use super::collector::LsmDataSourceCollector;
use super::data_source::LsmDataSource;
Expand Down Expand Up @@ -75,6 +76,7 @@ impl LsmScanPlanner {
/// - SortPreservingMergeExec is O(N log K) where K is the number of sources
/// - Memory usage is bounded by the sum of K sort buffers rather than all data
/// - No extra column for _memtable_gen in the common case
#[instrument(level = "debug", skip(self, projection, filter), fields(has_filter = filter.is_some(), limit, offset))]
pub async fn plan_scan(
&self,
projection: Option<&[String]>,
Expand Down
2 changes: 2 additions & 0 deletions rust/lance/src/dataset/mem_wal/scanner/point_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use datafusion::physical_plan::limit::GlobalLimitExec;
use datafusion::prelude::Expr;
use lance_core::Result;
use lance_index::scalar::bloomfilter::sbbf::Sbbf;
use tracing::instrument;

use super::collector::LsmDataSourceCollector;
use super::data_source::LsmDataSource;
Expand Down Expand Up @@ -117,6 +118,7 @@ impl LsmPointLookupPlanner {
///
/// An execution plan that returns at most one row - the newest version
/// of the row with the given primary key.
#[instrument(level = "debug", skip(self, pk_values, projection), fields(pk_column_count = self.pk_columns.len()))]
pub async fn plan_lookup(
&self,
pk_values: &[ScalarValue],
Expand Down
2 changes: 2 additions & 0 deletions rust/lance/src/dataset/mem_wal/scanner/vector_search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::union::UnionExec;
use lance_core::Result;
use lance_index::scalar::bloomfilter::sbbf::Sbbf;
use tracing::instrument;

use super::collector::LsmDataSourceCollector;
use super::data_source::LsmDataSource;
Expand Down Expand Up @@ -144,6 +145,7 @@ impl LsmVectorSearchPlanner {
///
/// An execution plan that returns the top-K nearest neighbors across all
/// LSM levels, with stale results filtered out.
#[instrument(level = "info", skip(self, query_vector, projection), fields(k, nprobes, vector_column = %self.vector_column, distance_type = ?self.distance_type))]
pub async fn plan_search(
&self,
query_vector: &FixedSizeListArray,
Expand Down
3 changes: 3 additions & 0 deletions rust/lance/src/dataset/mem_wal/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use lance_io::object_store::ObjectStore;
use object_store::path::Path;
use tokio::sync::{mpsc, watch};

use tracing::instrument;
use uuid::Uuid;

use super::util::{WatchableOnceCell, shard_wal_path, wal_entry_filename};
Expand Down Expand Up @@ -297,6 +298,7 @@ impl WalFlusher {
///
/// A `WalFlushResult` with timing metrics and the WAL entry.
/// Returns empty result if nothing to flush (already flushed past end_batch_position).
#[instrument(level = "info", skip(self, batch_store, indexes), fields(shard_id = %self.shard_id, end_batch_position, has_indexes = indexes.is_some()))]
pub async fn flush_to_with_index_update(
&self,
batch_store: &BatchStore,
Expand Down Expand Up @@ -498,6 +500,7 @@ impl WalEntryData {
/// # Returns
///
/// The parsed WAL entry data, or an error if reading/parsing fails.
#[instrument(level = "debug", skip(object_store), fields(path = %path))]
pub async fn read(object_store: &ObjectStore, path: &Path) -> Result<Self> {
// Read the file
let data = object_store
Expand Down
10 changes: 10 additions & 0 deletions rust/lance/src/dataset/mem_wal/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use tokio::sync::{RwLock, mpsc};
use tokio::task::JoinHandle;
use tokio::time::{Interval, interval_at};
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use uuid::Uuid;

pub use super::index::{
Expand Down Expand Up @@ -942,6 +943,7 @@ impl ShardWriter {
///
/// The `base_path` should come from `ObjectStore::from_uri()` to ensure
/// WAL files are written inside the dataset directory.
#[instrument(level = "info", skip(object_store, base_path, base_uri, schema, index_configs), fields(shard_id = %config.shard_id, index_count = index_configs.len()))]
pub async fn open(
object_store: Arc<ObjectStore>,
base_path: Path,
Expand Down Expand Up @@ -1102,6 +1104,7 @@ impl ShardWriter {
/// Fencing is detected lazily during WAL flush via atomic writes.
/// If another writer has taken over, the WAL flush will fail with
/// `AlreadyExists`, indicating this writer has been fenced.
#[instrument(level = "info", skip(self, batches), fields(batch_count = batches.len(), shard_id = %self.config.shard_id))]
pub async fn put(&self, batches: Vec<RecordBatch>) -> Result<WriteResult> {
if batches.is_empty() {
return Err(Error::invalid_input("Cannot write empty batch list"));
Expand Down Expand Up @@ -1257,6 +1260,7 @@ impl ShardWriter {
/// Close the writer gracefully.
///
/// Flushes pending data and shuts down background tasks.
#[instrument(level = "info", skip(self), fields(shard_id = %self.config.shard_id, epoch = self.epoch))]
pub async fn close(self) -> Result<()> {
Comment on lines 946 to 1264
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we want to give these traces names like sw_open, sw_put, sw_close? A lot of tracing tools will just give the span name unless you drill down, and open/put/close are going to feel like regular filesystem calls.

info!("Closing ShardWriter for shard {}", self.config.shard_id);

Expand Down Expand Up @@ -1373,6 +1377,11 @@ impl WalFlushHandler {
/// * `batch_store` - The batch store to flush from
/// * `indexes` - Optional indexes to update in parallel with WAL I/O
/// * `end_batch_position` - End batch ID (exclusive). Flush batches in (max_flushed, end_batch_position).
#[instrument(
level = "debug",
skip(self, batch_store, indexes),
fields(end_batch_position)
)]
async fn do_flush(
&self,
batch_store: Arc<BatchStore>,
Expand Down Expand Up @@ -1486,6 +1495,7 @@ impl MemTableFlushHandler {
/// This method waits for the WAL flush to complete (sent at freeze time),
/// then flushes to Lance storage. The WAL flush is already queued by
/// freeze_memtable to ensure strict ordering of WAL entries.
#[instrument(level = "info", skip(self, memtable), fields(generation = memtable.generation(), row_count = memtable.row_count()))]
async fn flush_memtable(
&mut self,
memtable: Arc<MemTable>,
Expand Down
Loading