diff --git a/src/descriptor_table.rs b/src/descriptor_table.rs index 12df8db2b..742bc4b9a 100644 --- a/src/descriptor_table.rs +++ b/src/descriptor_table.rs @@ -2,14 +2,14 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use crate::GlobalTableId; +use crate::{fs::FsFile, GlobalTableId}; use quick_cache::{sync::Cache as QuickCache, UnitWeighter}; -use std::{fs::File, sync::Arc}; +use std::sync::Arc; const TAG_BLOCK: u8 = 0; const TAG_BLOB: u8 = 1; -type Item = Arc; +type Item = Arc; #[derive(Eq, std::hash::Hash, PartialEq)] struct CacheKey(u8, u64, u64); @@ -40,7 +40,7 @@ impl DescriptorTable { } #[must_use] - pub fn access_for_table(&self, id: &GlobalTableId) -> Option> { + pub fn access_for_table(&self, id: &GlobalTableId) -> Option> { let key = CacheKey(TAG_BLOCK, id.tree_id(), id.table_id()); self.inner.get(&key) } @@ -51,7 +51,7 @@ impl DescriptorTable { } #[must_use] - pub fn access_for_blob_file(&self, id: &GlobalTableId) -> Option> { + pub fn access_for_blob_file(&self, id: &GlobalTableId) -> Option> { let key = CacheKey(TAG_BLOB, id.tree_id(), id.table_id()); self.inner.get(&key) } diff --git a/src/file.rs b/src/file.rs index 87ce99dd0..89e89c12f 100644 --- a/src/file.rs +++ b/src/file.rs @@ -11,19 +11,24 @@ pub const TABLES_FOLDER: &str = "tables"; pub const BLOBS_FOLDER: &str = "blobs"; pub const CURRENT_VERSION_FILE: &str = "current"; -/// Reads bytes from a file using `pread`. -pub fn read_exact(file: &impl FsFile, offset: u64, size: usize) -> std::io::Result { +/// Reads bytes from a file at the given offset without changing the cursor. +/// +/// Uses [`FsFile::read_at`] (equivalent to `pread(2)`) so multiple threads +/// can call this concurrently on the same file handle. +pub fn read_exact(file: &dyn FsFile, offset: u64, size: usize) -> std::io::Result { // SAFETY: This slice builder starts uninitialized, but we know its length // - // We use FsFile::read_at which gives us the number of bytes read + // We use FsFile::read_at which gives us the number of bytes read. // If that number does not match the slice length, the function errors, - // so the (partially) uninitialized buffer is discarded + // so the (partially) uninitialized buffer is discarded. // // Additionally, generally, block loads furthermore do a checksum check which - // would likely catch the buffer being wrong somehow + // would likely catch the buffer being wrong somehow. #[expect(unsafe_code, reason = "see safety")] let mut builder = unsafe { Slice::builder_unzeroed(size) }; + // Single call is correct: FsFile::read_at has fill-or-EOF semantics — + // implementations handle EINTR/short-read retry internally. let bytes_read = file.read_at(&mut builder, offset)?; if bytes_read != size { @@ -87,6 +92,23 @@ mod tests { use std::io::Write; use test_log::test; + #[test] + fn read_exact_short_read_returns_error() -> crate::Result<()> { + let dir = tempfile::tempdir()?; + let path = dir.path().join("short.bin"); + { + let mut f = File::create(&path)?; + f.write_all(b"hello")?; // 5 bytes + } + + let file = File::open(&path)?; + // Request 10 bytes from a 5-byte file → short read → UnexpectedEof + let err = read_exact(&file, 0, 10).unwrap_err(); + assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof); + + Ok(()) + } + #[test] fn atomic_rewrite() -> crate::Result<()> { let dir = tempfile::tempdir()?; diff --git a/src/file_accessor.rs b/src/file_accessor.rs index 742205fe8..ae16b5dc9 100644 --- a/src/file_accessor.rs +++ b/src/file_accessor.rs @@ -3,8 +3,9 @@ // (found in the LICENSE-* files in the repository) use crate::descriptor_table::DescriptorTable; +use crate::fs::FsFile; use crate::GlobalTableId; -use std::{fs::File, sync::Arc}; +use std::sync::Arc; /// Allows accessing a file (either cached or pinned) #[derive(Clone)] @@ -12,7 +13,7 @@ pub enum FileAccessor { /// Pinned file descriptor /// /// This is used in case file descriptor cache is `None` (to skip cache lookups) - File(Arc), + File(Arc), /// Access to file descriptor cache DescriptorTable(Arc), @@ -28,21 +29,21 @@ impl FileAccessor { } #[must_use] - pub fn access_for_table(&self, table_id: &GlobalTableId) -> Option> { + pub fn access_for_table(&self, table_id: &GlobalTableId) -> Option> { match self { Self::File(fd) => Some(fd.clone()), Self::DescriptorTable(descriptor_table) => descriptor_table.access_for_table(table_id), } } - pub fn insert_for_table(&self, table_id: GlobalTableId, fd: Arc) { + pub fn insert_for_table(&self, table_id: GlobalTableId, fd: Arc) { if let Self::DescriptorTable(descriptor_table) = self { descriptor_table.insert_for_table(table_id, fd); } } #[must_use] - pub fn access_for_blob_file(&self, table_id: &GlobalTableId) -> Option> { + pub fn access_for_blob_file(&self, table_id: &GlobalTableId) -> Option> { match self { Self::File(fd) => Some(fd.clone()), Self::DescriptorTable(descriptor_table) => { @@ -51,7 +52,7 @@ impl FileAccessor { } } - pub fn insert_for_blob_file(&self, table_id: GlobalTableId, fd: Arc) { + pub fn insert_for_blob_file(&self, table_id: GlobalTableId, fd: Arc) { if let Self::DescriptorTable(descriptor_table) = self { descriptor_table.insert_for_blob_file(table_id, fd); } diff --git a/src/fs/mod.rs b/src/fs/mod.rs index 43f339282..6e14b17f1 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -170,6 +170,16 @@ pub trait FsFile: Read + Write + Seek + Send + Sync { /// Equivalent to `pread(2)` on Unix. Multiple threads can call this /// concurrently on the same file handle without synchronization. /// + /// Implementations must provide *fill-or-EOF* semantics: on success, + /// this method either fills `buf` completely and returns + /// `Ok(buf.len())`, or returns `Ok(n)` with `n < buf.len()` only if + /// the read has reached EOF. Callers may rely on a short read + /// indicating EOF and therefore do not need a retry loop. + /// + /// Implementations are responsible for handling OS-level short reads + /// and interrupts internally (for example, by retrying on `EINTR`) + /// so that the above guarantee holds unless an error is returned. + /// /// # Errors /// /// Returns an I/O error if the read fails. diff --git a/src/fs/std_fs.rs b/src/fs/std_fs.rs index eb0653c10..a810448ea 100644 --- a/src/fs/std_fs.rs +++ b/src/fs/std_fs.rs @@ -65,26 +65,54 @@ impl FsFile for File { } fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result { - #[cfg(unix)] - { - use std::os::unix::fs::FileExt; - FileExt::read_at(self, buf, offset) - } - - #[cfg(windows)] - { - use std::os::windows::fs::FileExt; - FileExt::seek_read(self, buf, offset) + // Fill-or-EOF loop: retry on short reads and EINTR so that callers + // see either a full buffer or a short read that signals EOF. + let mut filled = 0usize; + + while filled < buf.len() { + // SAFETY: loop guard `filled < buf.len()` ensures this is in-bounds. + #[expect(clippy::expect_used, reason = "filled < buf.len() by loop guard")] + let remaining = buf.get_mut(filled..).expect("filled < buf.len()"); + let off = offset.saturating_add(filled as u64); + + let n = { + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + match FileExt::read_at(self, remaining, off) { + Ok(n) => n, + Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(e) => return Err(e), + } + } + + #[cfg(windows)] + { + use std::os::windows::fs::FileExt; + match FileExt::seek_read(self, remaining, off) { + Ok(n) => n, + Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(e) => return Err(e), + } + } + + #[cfg(not(any(unix, windows)))] + { + let _ = (remaining, off); + return Err(io::Error::new( + io::ErrorKind::Unsupported, + "read_at is not supported on this platform", + )); + } + }; + + if n == 0 { + break; // EOF + } + filled += n; } - #[cfg(not(any(unix, windows)))] - { - let _ = (buf, offset); - Err(io::Error::new( - io::ErrorKind::Unsupported, - "read_at is not supported on this platform", - )) - } + Ok(filled) } fn lock_exclusive(&self) -> io::Result<()> { diff --git a/src/table/block/mod.rs b/src/table/block/mod.rs index bf24c8090..c1fb21a88 100644 --- a/src/table/block/mod.rs +++ b/src/table/block/mod.rs @@ -267,7 +267,7 @@ impl Block { /// Pipeline: read → verify checksum → decrypt → decompress. /// When `encryption` is `None`, the decrypt step is skipped. pub fn from_file( - file: &impl FsFile, + file: &dyn FsFile, handle: BlockHandle, compression: CompressionType, encryption: Option<&dyn EncryptionProvider>, diff --git a/src/table/meta.rs b/src/table/meta.rs index 5bb6ced3a..bcfb21654 100644 --- a/src/table/meta.rs +++ b/src/table/meta.rs @@ -99,7 +99,7 @@ fn validated_kv_seqno(kv_seqno: SeqNo, max_seqno: SeqNo) -> crate::Result impl ParsedMeta { #[expect(clippy::expect_used, clippy::too_many_lines)] pub fn load_with_handle( - file: &impl FsFile, + file: &dyn FsFile, handle: &BlockHandle, encryption: Option<&dyn crate::encryption::EncryptionProvider>, ) -> crate::Result { diff --git a/src/table/mod.rs b/src/table/mod.rs index 3c1c067e2..06a083911 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -48,7 +48,6 @@ use inner::Inner; use iter::Iter; use std::{ borrow::Cow, - fs::File, ops::{Bound, RangeBounds}, path::PathBuf, sync::Arc, @@ -117,11 +116,13 @@ impl Table { if let Some(fd) = self.file_accessor.access_for_table(&table_id) { (fd, false) } else { - (Arc::new(File::open(&*self.path)?), true) + let fd: Arc = Arc::new(std::fs::File::open(&*self.path)?); + (fd, true) }; // Read the exact region using pread-style helper - let buf = crate::file::read_exact(&*fd, *handle.offset(), handle.size() as usize)?; + let buf = + crate::file::read_exact(fd.as_ref(), *handle.offset(), handle.size() as usize)?; // If we opened the file here, cache the FD for future accesses if fd_cache_miss { @@ -430,7 +431,7 @@ impl Table { fn read_tli( regions: &ParsedRegions, - file: &impl FsFile, + file: &dyn FsFile, compression: CompressionType, encryption: Option<&dyn crate::encryption::EncryptionProvider>, ) -> crate::Result { @@ -487,12 +488,12 @@ impl Table { let metadata = ParsedMeta::load_with_handle(&file, ®ions.metadata, encryption.as_deref())?; - let file = Arc::new(file); + let file_handle: Arc = Arc::new(file); let file_accessor = if let Some(dt) = descriptor_table { FileAccessor::DescriptorTable(dt) } else { - FileAccessor::File(file.clone()) + FileAccessor::File(file_handle.clone()) }; let block_index = if regions.index.is_some() { @@ -503,7 +504,7 @@ impl Table { let block = Self::read_tli( ®ions, - &*file, + file_handle.as_ref(), metadata.index_block_compression, encryption.as_deref(), )?; @@ -529,7 +530,7 @@ impl Table { let block = Self::read_tli( ®ions, - &*file, + file_handle.as_ref(), metadata.index_block_compression, encryption.as_deref(), )?; @@ -554,7 +555,7 @@ impl Table { let pinned_filter_index = if let Some(filter_tli_handle) = regions.filter_tli { let block = Block::from_file( - &*file, + file_handle.as_ref(), filter_tli_handle, metadata.index_block_compression, encryption.as_deref(), @@ -574,7 +575,7 @@ impl Table { ); let block = Block::from_file( - &*file, + file_handle.as_ref(), filter_handle, crate::CompressionType::None, // NOTE: We never write a filter block with compression encryption.as_deref(), @@ -601,7 +602,7 @@ impl Table { let range_tombstones = if let Some(rt_handle) = regions.range_tombstones { log::trace!("Loading range tombstone block, with rt_ptr={rt_handle:?}"); let block = Block::from_file( - &*file, + file_handle.as_ref(), rt_handle, crate::CompressionType::None, encryption.as_deref(), diff --git a/src/table/util.rs b/src/table/util.rs index 35a79035e..6a5a9a0e4 100644 --- a/src/table/util.rs +++ b/src/table/util.rs @@ -4,8 +4,8 @@ use super::{Block, BlockHandle, GlobalTableId}; use crate::{ - encryption::EncryptionProvider, file_accessor::FileAccessor, table::block::BlockType, - version::run::Ranged, Cache, CompressionType, KeyRange, Table, + encryption::EncryptionProvider, file_accessor::FileAccessor, fs::FsFile, + table::block::BlockType, version::run::Ranged, Cache, CompressionType, KeyRange, Table, }; use std::{path::Path, sync::Arc}; @@ -76,15 +76,19 @@ pub fn load_block( (cached_fd, false) } else { - let fd = std::fs::File::open(path)?; + let file = std::fs::File::open(path)?; #[cfg(feature = "metrics")] metrics.table_file_opened_uncached.fetch_add(1, Relaxed); - (Arc::new(fd), true) + // The if-branch returns Arc from the descriptor + // table, so the else-branch needs an explicit type annotation + // to trigger unsizing coercion. + let fd: Arc = Arc::new(file); + (fd, true) }; - let block = Block::from_file(&*fd, *handle, compression, encryption)?; + let block = Block::from_file(fd.as_ref(), *handle, compression, encryption)?; if block.header.block_type != block_type { return Err(crate::Error::InvalidTag(( diff --git a/src/vlog/accessor.rs b/src/vlog/accessor.rs index f246ff7d7..82ca17448 100644 --- a/src/vlog/accessor.rs +++ b/src/vlog/accessor.rs @@ -3,11 +3,12 @@ // (found in the LICENSE-* files in the repository) use crate::{ + fs::FsFile, version::BlobFileList, vlog::{blob_file::reader::Reader, ValueHandle}, Cache, GlobalTableId, TreeId, UserValue, }; -use std::{fs::File, path::Path, sync::Arc}; +use std::{path::Path, sync::Arc}; pub struct Accessor<'a>(&'a BlobFileList); @@ -38,13 +39,13 @@ impl<'a> Accessor<'a> { if let Some(cached_fd) = blob_file.file_accessor().access_for_blob_file(&bf_id) { (cached_fd, false) } else { - let file = Arc::new(File::open( + let file: Arc = Arc::new(std::fs::File::open( base_path.join(vhandle.blob_file_id.to_string()), )?); (file, true) }; - let value = Reader::new(blob_file, &file).get(key, vhandle)?; + let value = Reader::new(blob_file, file.as_ref()).get(key, vhandle)?; cache.insert_blob(tree_id, vhandle, value.clone()); if fd_cache_miss { diff --git a/src/vlog/blob_file/multi_writer.rs b/src/vlog/blob_file/multi_writer.rs index bd8f8608f..7c23563a8 100644 --- a/src/vlog/blob_file/multi_writer.rs +++ b/src/vlog/blob_file/multi_writer.rs @@ -3,6 +3,7 @@ // (found in the LICENSE-* files in the repository) use super::writer::Writer; +use crate::fs::FsFile; use crate::{ file_accessor::FileAccessor, vlog::{ @@ -12,7 +13,6 @@ use crate::{ BlobFile, CompressionType, DescriptorTable, SeqNo, SequenceNumberCounter, TreeId, }; use std::{ - fs::File, path::{Path, PathBuf}, sync::{atomic::AtomicBool, Arc}, }; @@ -136,10 +136,11 @@ impl MultiWriter { let (metadata, checksum) = writer.finish()?; - let file = Arc::new(File::open(&path)?); - let file_accessor = descriptor_table.map_or(FileAccessor::File(file.clone()), |dt| { - FileAccessor::DescriptorTable(dt) - }); + let file: Arc = Arc::new(std::fs::File::open(&path)?); + let file_accessor = descriptor_table.map_or_else( + || FileAccessor::File(file.clone()), + FileAccessor::DescriptorTable, + ); file_accessor.insert_for_blob_file((tree_id, blob_file_id).into(), file); let blob_file = BlobFile(Arc::new(BlobFileInner { diff --git a/src/vlog/blob_file/reader.rs b/src/vlog/blob_file/reader.rs index 982eb1b63..43bbe33e4 100644 --- a/src/vlog/blob_file/reader.rs +++ b/src/vlog/blob_file/reader.rs @@ -3,6 +3,7 @@ // (found in the LICENSE-* files in the repository) use crate::{ + fs::FsFile, vlog::{ blob_file::writer::{ validate_header_crc, BLOB_HEADER_LEN_V4, BLOB_HEADER_MAGIC_V3, BLOB_HEADER_MAGIC_V4, @@ -11,6 +12,8 @@ use crate::{ }, BlobFile, Checksum, CompressionType, UserValue, }; +use byteorder::{LittleEndian, ReadBytesExt}; +use std::io::{Cursor, Read}; /// Safety cap on blob value size (256 MiB). /// @@ -23,20 +26,15 @@ use crate::{ /// `table::block` rather than shared, because blocks and blobs are /// independent storage formats that may diverge in the future. const MAX_DECOMPRESSION_SIZE: usize = 256 * 1024 * 1024; -use byteorder::{LittleEndian, ReadBytesExt}; -use std::{ - fs::File, - io::{Cursor, Read}, -}; /// Reads a single blob from a blob file pub struct Reader<'a> { blob_file: &'a BlobFile, - file: &'a File, + file: &'a dyn FsFile, } impl<'a> Reader<'a> { - pub fn new(blob_file: &'a BlobFile, file: &'a File) -> Self { + pub fn new(blob_file: &'a BlobFile, file: &'a dyn FsFile) -> Self { Self { blob_file, file } } @@ -242,6 +240,7 @@ mod tests { use super::*; use crate::vlog::blob_file::writer::BLOB_HEADER_LEN_V3; use crate::SequenceNumberCounter; + use std::fs::File; use test_log::test; #[test] diff --git a/tests/ingestion_api.rs b/tests/ingestion_api.rs index 27ba70d1b..96fc2cf50 100644 --- a/tests/ingestion_api.rs +++ b/tests/ingestion_api.rs @@ -163,13 +163,15 @@ fn blob_ingestion_out_of_order_panics_without_blob_write() -> lsm_tree::Result<( let before = tree.blob_file_count(); // Use a small value for the first write to avoid blob I/O - let result = std::panic::catch_unwind(|| { + // AssertUnwindSafe needed: tree contains Arc (from DescriptorTable) + // which isn't UnwindSafe. Safe here — test only checks panic behavior. + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { let mut ingest = tree.ingestion().unwrap(); ingest.write(b"k2", b"x").unwrap(); // Second write would require blob I/O, but ordering check should fire before any blob write let _ = ingest.write(b"k1", [1u8; 16]); - }); + })); assert!(result.is_err()); let after = tree.blob_file_count();