Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/descriptor_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::GlobalTableId;
use crate::{fs::FsFile, GlobalTableId};
use quick_cache::{sync::Cache as QuickCache, UnitWeighter};
use std::{fs::File, sync::Arc};
use std::sync::Arc;

const TAG_BLOCK: u8 = 0;
const TAG_BLOB: u8 = 1;

type Item = Arc<File>;
type Item = Arc<dyn FsFile>;

#[derive(Eq, std::hash::Hash, PartialEq)]
struct CacheKey(u8, u64, u64);
Expand Down Expand Up @@ -40,7 +40,7 @@ impl DescriptorTable {
}

#[must_use]
pub fn access_for_table(&self, id: &GlobalTableId) -> Option<Arc<File>> {
pub fn access_for_table(&self, id: &GlobalTableId) -> Option<Arc<dyn FsFile>> {
let key = CacheKey(TAG_BLOCK, id.tree_id(), id.table_id());
self.inner.get(&key)
}
Expand All @@ -51,7 +51,7 @@ impl DescriptorTable {
}

#[must_use]
pub fn access_for_blob_file(&self, id: &GlobalTableId) -> Option<Arc<File>> {
pub fn access_for_blob_file(&self, id: &GlobalTableId) -> Option<Arc<dyn FsFile>> {
let key = CacheKey(TAG_BLOB, id.tree_id(), id.table_id());
self.inner.get(&key)
}
Expand Down
32 changes: 27 additions & 5 deletions src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,24 @@ pub const TABLES_FOLDER: &str = "tables";
pub const BLOBS_FOLDER: &str = "blobs";
pub const CURRENT_VERSION_FILE: &str = "current";

/// Reads bytes from a file using `pread`.
pub fn read_exact(file: &impl FsFile, offset: u64, size: usize) -> std::io::Result<Slice> {
/// Reads bytes from a file at the given offset without changing the cursor.
///
/// Uses [`FsFile::read_at`] (equivalent to `pread(2)`) so multiple threads
/// can call this concurrently on the same file handle.
Comment thread
polaz marked this conversation as resolved.
pub fn read_exact(file: &dyn FsFile, offset: u64, size: usize) -> std::io::Result<Slice> {
Comment thread
polaz marked this conversation as resolved.
// SAFETY: This slice builder starts uninitialized, but we know its length
//
// We use FsFile::read_at which gives us the number of bytes read
// We use FsFile::read_at which gives us the number of bytes read.
// If that number does not match the slice length, the function errors,
// so the (partially) uninitialized buffer is discarded
// so the (partially) uninitialized buffer is discarded.
Comment thread
polaz marked this conversation as resolved.
//
// Additionally, block loads generally perform a checksum check, which
// would likely catch the buffer being wrong somehow
// would likely catch the buffer being wrong somehow.
#[expect(unsafe_code, reason = "see safety")]
let mut builder = unsafe { Slice::builder_unzeroed(size) };

// Single call is correct: FsFile::read_at has fill-or-EOF semantics —
// implementations handle EINTR/short-read retry internally.
let bytes_read = file.read_at(&mut builder, offset)?;

if bytes_read != size {
Expand Down Expand Up @@ -87,6 +92,23 @@ mod tests {
use std::io::Write;
use test_log::test;

/// A request that extends past the end of the file must surface as
/// `UnexpectedEof` instead of handing back a partially filled buffer.
#[test]
fn read_exact_short_read_returns_error() -> crate::Result<()> {
    let tmp = tempfile::tempdir()?;
    let short_path = tmp.path().join("short.bin");

    // Write a 5-byte fixture; the temporary write handle is dropped (closed)
    // at the end of the statement, before the file is reopened for reading.
    File::create(&short_path)?.write_all(b"hello")?;

    let handle = File::open(&short_path)?;

    // Asking for 10 bytes from a 5-byte file yields a short read,
    // which `read_exact` reports as an error.
    let kind = read_exact(&handle, 0, 10).unwrap_err().kind();
    assert_eq!(kind, std::io::ErrorKind::UnexpectedEof);

    Ok(())
}

#[test]
fn atomic_rewrite() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
Expand Down
13 changes: 7 additions & 6 deletions src/file_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
// (found in the LICENSE-* files in the repository)

use crate::descriptor_table::DescriptorTable;
use crate::fs::FsFile;
use crate::GlobalTableId;
use std::{fs::File, sync::Arc};
use std::sync::Arc;

/// Allows accessing a file (either cached or pinned)
#[derive(Clone)]
pub enum FileAccessor {
/// Pinned file descriptor
///
/// This is used when the file descriptor cache is `None` (to skip cache lookups)
File(Arc<File>),
File(Arc<dyn FsFile>),

/// Access to file descriptor cache
DescriptorTable(Arc<DescriptorTable>),
Expand All @@ -28,21 +29,21 @@ impl FileAccessor {
}

#[must_use]
pub fn access_for_table(&self, table_id: &GlobalTableId) -> Option<Arc<File>> {
pub fn access_for_table(&self, table_id: &GlobalTableId) -> Option<Arc<dyn FsFile>> {
match self {
Self::File(fd) => Some(fd.clone()),
Self::DescriptorTable(descriptor_table) => descriptor_table.access_for_table(table_id),
}
}

pub fn insert_for_table(&self, table_id: GlobalTableId, fd: Arc<File>) {
pub fn insert_for_table(&self, table_id: GlobalTableId, fd: Arc<dyn FsFile>) {
if let Self::DescriptorTable(descriptor_table) = self {
descriptor_table.insert_for_table(table_id, fd);
}
}

#[must_use]
pub fn access_for_blob_file(&self, table_id: &GlobalTableId) -> Option<Arc<File>> {
pub fn access_for_blob_file(&self, table_id: &GlobalTableId) -> Option<Arc<dyn FsFile>> {
match self {
Self::File(fd) => Some(fd.clone()),
Self::DescriptorTable(descriptor_table) => {
Expand All @@ -51,7 +52,7 @@ impl FileAccessor {
}
}

pub fn insert_for_blob_file(&self, table_id: GlobalTableId, fd: Arc<File>) {
pub fn insert_for_blob_file(&self, table_id: GlobalTableId, fd: Arc<dyn FsFile>) {
if let Self::DescriptorTable(descriptor_table) = self {
descriptor_table.insert_for_blob_file(table_id, fd);
}
Expand Down
10 changes: 10 additions & 0 deletions src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ pub trait FsFile: Read + Write + Seek + Send + Sync {
/// Equivalent to `pread(2)` on Unix. Multiple threads can call this
/// concurrently on the same file handle without synchronization.
///
/// Implementations must provide *fill-or-EOF* semantics: on success,
/// this method either fills `buf` completely and returns
/// `Ok(buf.len())`, or returns `Ok(n)` with `n < buf.len()` only if
/// the read has reached EOF. Callers may rely on a short read
/// indicating EOF and therefore do not need a retry loop.
///
/// Implementations are responsible for handling OS-level short reads
/// and interrupts internally (for example, by retrying on `EINTR`)
/// so that the above guarantee holds unless an error is returned.
///
Comment thread
polaz marked this conversation as resolved.
/// # Errors
///
/// Returns an I/O error if the read fails.
Expand Down
64 changes: 46 additions & 18 deletions src/fs/std_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,54 @@ impl FsFile for File {
}

fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
FileExt::read_at(self, buf, offset)
}

#[cfg(windows)]
{
use std::os::windows::fs::FileExt;
FileExt::seek_read(self, buf, offset)
// Fill-or-EOF loop: retry on short reads and EINTR so that callers
// see either a full buffer or a short read that signals EOF.
let mut filled = 0usize;

while filled < buf.len() {
// Invariant: the loop guard `filled < buf.len()` keeps this range in-bounds.
#[expect(clippy::expect_used, reason = "filled < buf.len() by loop guard")]
let remaining = buf.get_mut(filled..).expect("filled < buf.len()");
let off = offset.saturating_add(filled as u64);

Comment thread
polaz marked this conversation as resolved.
let n = {
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
match FileExt::read_at(self, remaining, off) {
Ok(n) => n,
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
Comment thread
polaz marked this conversation as resolved.
}
}

#[cfg(windows)]
{
use std::os::windows::fs::FileExt;
match FileExt::seek_read(self, remaining, off) {
Ok(n) => n,
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
}

#[cfg(not(any(unix, windows)))]
{
let _ = (remaining, off);
return Err(io::Error::new(
io::ErrorKind::Unsupported,
"read_at is not supported on this platform",
));
}
};
Comment thread
polaz marked this conversation as resolved.

if n == 0 {
break; // EOF
}
filled += n;
}

#[cfg(not(any(unix, windows)))]
{
let _ = (buf, offset);
Err(io::Error::new(
io::ErrorKind::Unsupported,
"read_at is not supported on this platform",
))
}
Ok(filled)
}

fn lock_exclusive(&self) -> io::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion src/table/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ impl Block {
/// Pipeline: read → verify checksum → decrypt → decompress.
/// When `encryption` is `None`, the decrypt step is skipped.
pub fn from_file(
file: &impl FsFile,
file: &dyn FsFile,
handle: BlockHandle,
compression: CompressionType,
encryption: Option<&dyn EncryptionProvider>,
Expand Down
2 changes: 1 addition & 1 deletion src/table/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn validated_kv_seqno(kv_seqno: SeqNo, max_seqno: SeqNo) -> crate::Result<SeqNo>
impl ParsedMeta {
#[expect(clippy::expect_used, clippy::too_many_lines)]
pub fn load_with_handle(
file: &impl FsFile,
file: &dyn FsFile,
handle: &BlockHandle,
encryption: Option<&dyn crate::encryption::EncryptionProvider>,
) -> crate::Result<Self> {
Expand Down
23 changes: 12 additions & 11 deletions src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ use inner::Inner;
use iter::Iter;
use std::{
borrow::Cow,
fs::File,
ops::{Bound, RangeBounds},
path::PathBuf,
sync::Arc,
Expand Down Expand Up @@ -117,11 +116,13 @@ impl Table {
if let Some(fd) = self.file_accessor.access_for_table(&table_id) {
(fd, false)
} else {
(Arc::new(File::open(&*self.path)?), true)
let fd: Arc<dyn FsFile> = Arc::new(std::fs::File::open(&*self.path)?);
(fd, true)
};

// Read the exact region using pread-style helper
let buf = crate::file::read_exact(&*fd, *handle.offset(), handle.size() as usize)?;
let buf =
crate::file::read_exact(fd.as_ref(), *handle.offset(), handle.size() as usize)?;

// If we opened the file here, cache the FD for future accesses
if fd_cache_miss {
Expand Down Expand Up @@ -430,7 +431,7 @@ impl Table {

fn read_tli(
regions: &ParsedRegions,
file: &impl FsFile,
file: &dyn FsFile,
compression: CompressionType,
encryption: Option<&dyn crate::encryption::EncryptionProvider>,
) -> crate::Result<IndexBlock> {
Expand Down Expand Up @@ -487,12 +488,12 @@ impl Table {
let metadata =
ParsedMeta::load_with_handle(&file, &regions.metadata, encryption.as_deref())?;

let file = Arc::new(file);
let file_handle: Arc<dyn FsFile> = Arc::new(file);

let file_accessor = if let Some(dt) = descriptor_table {
FileAccessor::DescriptorTable(dt)
} else {
FileAccessor::File(file.clone())
FileAccessor::File(file_handle.clone())
};

let block_index = if regions.index.is_some() {
Expand All @@ -503,7 +504,7 @@ impl Table {

let block = Self::read_tli(
&regions,
&*file,
file_handle.as_ref(),
metadata.index_block_compression,
encryption.as_deref(),
)?;
Expand All @@ -529,7 +530,7 @@ impl Table {

let block = Self::read_tli(
&regions,
&*file,
file_handle.as_ref(),
metadata.index_block_compression,
encryption.as_deref(),
)?;
Expand All @@ -554,7 +555,7 @@ impl Table {

let pinned_filter_index = if let Some(filter_tli_handle) = regions.filter_tli {
let block = Block::from_file(
&*file,
file_handle.as_ref(),
filter_tli_handle,
metadata.index_block_compression,
encryption.as_deref(),
Expand All @@ -574,7 +575,7 @@ impl Table {
);

let block = Block::from_file(
&*file,
file_handle.as_ref(),
filter_handle,
crate::CompressionType::None, // NOTE: We never write a filter block with compression
encryption.as_deref(),
Expand All @@ -601,7 +602,7 @@ impl Table {
let range_tombstones = if let Some(rt_handle) = regions.range_tombstones {
log::trace!("Loading range tombstone block, with rt_ptr={rt_handle:?}");
let block = Block::from_file(
&*file,
file_handle.as_ref(),
rt_handle,
crate::CompressionType::None,
encryption.as_deref(),
Expand Down
14 changes: 9 additions & 5 deletions src/table/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

use super::{Block, BlockHandle, GlobalTableId};
use crate::{
encryption::EncryptionProvider, file_accessor::FileAccessor, table::block::BlockType,
version::run::Ranged, Cache, CompressionType, KeyRange, Table,
encryption::EncryptionProvider, file_accessor::FileAccessor, fs::FsFile,
table::block::BlockType, version::run::Ranged, Cache, CompressionType, KeyRange, Table,
};
use std::{path::Path, sync::Arc};

Expand Down Expand Up @@ -76,15 +76,19 @@ pub fn load_block(

(cached_fd, false)
} else {
let fd = std::fs::File::open(path)?;
let file = std::fs::File::open(path)?;

#[cfg(feature = "metrics")]
metrics.table_file_opened_uncached.fetch_add(1, Relaxed);

(Arc::new(fd), true)
// The if-branch returns Arc<dyn FsFile> from the descriptor
// table, so the else-branch needs an explicit type annotation
// to trigger unsizing coercion.
let fd: Arc<dyn FsFile> = Arc::new(file);
(fd, true)
Comment thread
polaz marked this conversation as resolved.
Comment thread
polaz marked this conversation as resolved.
};

let block = Block::from_file(&*fd, *handle, compression, encryption)?;
let block = Block::from_file(fd.as_ref(), *handle, compression, encryption)?;

if block.header.block_type != block_type {
return Err(crate::Error::InvalidTag((
Expand Down
Loading
Loading