Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/blob_tree/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl<'a> BlobIngestion<'a> {
tree.index.config.path.join(BLOBS_FOLDER),
tree.index.id,
tree.index.config.descriptor_table.clone(),
tree.index.config.fs.clone(),
)?
.use_target_size(blob_file_size)
.use_compression(kv.compression);
Expand Down Expand Up @@ -261,6 +262,7 @@ impl<'a> BlobIngestion<'a> {
},
global_seqno,
&self.tree.index.config.visible_seqno,
&*self.tree.index.config.fs,
)?;

// Perform maintenance on the version history (e.g., clean up old versions).
Expand Down
7 changes: 5 additions & 2 deletions src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,13 @@ pub struct BlobTree {
impl BlobTree {
pub(crate) fn open(config: Config) -> crate::Result<Self> {
use crate::file::{fsync_directory, BLOBS_FOLDER};
use crate::fs::Fs;

let index = crate::Tree::open(config)?;

let blobs_folder = index.config.path.join(BLOBS_FOLDER);
std::fs::create_dir_all(&blobs_folder)?;
fsync_directory(&blobs_folder)?;
(*index.config.fs).create_dir_all(&blobs_folder)?;
fsync_directory(&blobs_folder, &*index.config.fs)?;

let blob_file_id_to_continue_with = index
.current_version()
Expand Down Expand Up @@ -424,6 +425,7 @@ impl AbstractTree for BlobTree {
self.index.table_id_counter.clone(),
64 * 1_024 * 1_024,
0,
self.index.config.fs.clone(),
)?
.use_data_block_restart_interval(data_block_restart_interval)
.use_index_block_restart_interval(index_block_restart_interval)
Expand Down Expand Up @@ -468,6 +470,7 @@ impl AbstractTree for BlobTree {
self.index.config.path.join(BLOBS_FOLDER),
self.id(),
self.index.config.descriptor_table.clone(),
self.index.config.fs.clone(),
)?
.use_target_size(kv_opts.file_target_size)
.use_compression(kv_opts.compression);
Expand Down
1 change: 1 addition & 0 deletions src/compaction/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ impl<'a, 'b: 'a> StreamFilterAdapter<'a, 'b> {
self.shared.blobs_folder,
self.shared.opts.tree_id,
self.shared.opts.config.descriptor_table.clone(),
self.shared.opts.config.fs.clone(),
)?
.use_target_size(blob_opts.file_target_size)
.use_compression(blob_opts.compression);
Expand Down
3 changes: 3 additions & 0 deletions src/compaction/flavour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub(super) fn prepare_table_writer(
opts.table_id_generator.clone(),
payload.target_size,
payload.dest_level,
opts.config.fs.clone(),
)?
// Compaction consumes input tables, so clip RTs to each output table's key range.
.use_clip_range_tombstones();
Expand Down Expand Up @@ -328,6 +329,7 @@ impl CompactionFlavour for RelocatingCompaction {
},
&opts.global_seqno,
&opts.visible_seqno,
&*opts.config.fs,
)?;

// NOTE: If the application were to crash >here< it's fine
Expand Down Expand Up @@ -466,6 +468,7 @@ impl CompactionFlavour for StandardCompaction {
},
&opts.global_seqno,
&opts.visible_seqno,
&*opts.config.fs,
)?;

// NOTE: If the application were to crash >here< it's fine
Expand Down
3 changes: 3 additions & 0 deletions src/compaction/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ fn move_tables(
},
&opts.global_seqno,
&opts.visible_seqno,
&*opts.config.fs,
)?;

if let Err(e) = version_history_lock.maintenance(&opts.config.path, opts.mvcc_gc_watermark) {
Expand Down Expand Up @@ -478,6 +479,7 @@ fn merge_tables(
&blobs_folder,
opts.tree_id,
opts.config.descriptor_table.clone(),
opts.config.fs.clone(),
)?
.use_target_size(blob_opts.file_target_size)
.use_passthrough_compression(blob_opts.compression);
Expand Down Expand Up @@ -672,6 +674,7 @@ fn drop_tables(
},
&opts.global_seqno,
&opts.visible_seqno,
&*opts.config.fs,
)?;

if let Err(e) = version_history_lock.maintenance(&opts.config.path, opts.mvcc_gc_watermark) {
Expand Down
33 changes: 21 additions & 12 deletions src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::{fs::FsFile, Slice};
use crate::{
fs::{Fs, FsFile},
Slice,
Comment thread
polaz marked this conversation as resolved.
};
use std::{io::Write, path::Path};

pub const MAGIC_BYTES: [u8; 4] = [b'L', b'S', b'M', 3];
Expand Down Expand Up @@ -42,52 +45,58 @@ pub fn read_exact(file: &dyn FsFile, offset: u64, size: usize) -> std::io::Resul
}

/// Atomically rewrites a file.
pub fn rewrite_atomic(path: &Path, content: &[u8]) -> std::io::Result<()> {
pub fn rewrite_atomic(path: &Path, content: &[u8], fs: &dyn Fs) -> std::io::Result<()> {
#[expect(
clippy::expect_used,
reason = "every file should have a parent directory"
)]
let folder = path.parent().expect("should have a parent");

// NOTE: tempfile crate uses std::fs internally; migrating temp-file
// creation to Fs would require a custom implementation.
let mut temp_file = tempfile::NamedTempFile::new_in(folder)?;
temp_file.write_all(content)?;
temp_file.flush()?;
temp_file.as_file_mut().sync_all()?;
temp_file.persist(path)?;

// TODO: not sure why it fails on Windows...
// Suppress unused-variable warning on Windows where the post-persist
// sync block is skipped (directory fsync is unsupported).
let _ = &fs;
Comment thread
polaz marked this conversation as resolved.

#[cfg(not(target_os = "windows"))]
{
let file = std::fs::File::open(path)?;
file.sync_all()?;
use crate::fs::FsOpenOptions;

let file = fs.open(path, &FsOpenOptions::new().read(true))?;
FsFile::sync_all(&*file)?;

#[expect(
clippy::expect_used,
reason = "files should always have a parent directory"
)]
let folder = path.parent().expect("should have parent folder");
fsync_directory(folder)?;
fs.sync_directory(folder)?;
}
Comment thread
polaz marked this conversation as resolved.

Ok(())
}

#[cfg(not(target_os = "windows"))]
pub fn fsync_directory(path: &Path) -> std::io::Result<()> {
let file = std::fs::File::open(path)?;
debug_assert!(file.metadata()?.is_dir());
file.sync_all()
pub fn fsync_directory(path: &Path, fs: &dyn Fs) -> std::io::Result<()> {
fs.sync_directory(path)
}

#[cfg(target_os = "windows")]
pub fn fsync_directory(path: &Path) -> std::io::Result<()> {
pub fn fsync_directory(_path: &Path, _fs: &dyn Fs) -> std::io::Result<()> {
// Cannot fsync directory on Windows
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use crate::fs::StdFs;
use std::fs::File;
use std::io::Write;
use test_log::test;
Expand Down Expand Up @@ -119,7 +128,7 @@ mod tests {
write!(file, "asdasdasdasdasd")?;
}

rewrite_atomic(&path, b"newcontent")?;
rewrite_atomic(&path, b"newcontent", &StdFs)?;

let content = std::fs::read_to_string(&path)?;
assert_eq!("newcontent", content);
Expand Down
15 changes: 10 additions & 5 deletions src/table/multi_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@

use super::{filter::BloomConstructionPolicy, writer::Writer};
use crate::{
blob_tree::handle::BlobIndirection, encryption::EncryptionProvider, prefix::PrefixExtractor,
range_tombstone::RangeTombstone, table::writer::LinkedFile, value::InternalValue,
vlog::BlobFileId, Checksum, CompressionType, HashMap, SequenceNumberCounter, TableId, UserKey,
blob_tree::handle::BlobIndirection, encryption::EncryptionProvider, fs::Fs,
prefix::PrefixExtractor, range_tombstone::RangeTombstone, table::writer::LinkedFile,
value::InternalValue, vlog::BlobFileId, Checksum, CompressionType, HashMap,
SequenceNumberCounter, TableId, UserKey,
};
use std::{path::PathBuf, sync::Arc};

/// Like `Writer` but will rotate to a new table, once a table grows larger than `target_size`
///
/// This results in a sorted "run" of tables
pub struct MultiWriter {
fs: Arc<dyn Fs>,

pub(crate) base_path: PathBuf,

data_block_hash_ratio: f32,
Expand Down Expand Up @@ -72,13 +75,15 @@ impl MultiWriter {
table_id_generator: SequenceNumberCounter,
target_size: u64,
initial_level: u8,
fs: Arc<dyn Fs>,
) -> crate::Result<Self> {
let current_table_id = table_id_generator.next();

let path = base_path.join(current_table_id.to_string());
let writer = Writer::new(path, current_table_id, initial_level)?;
let writer = Writer::new(path, current_table_id, initial_level, fs.clone())?;

Ok(Self {
fs,
initial_level,

base_path,
Expand Down Expand Up @@ -318,7 +323,7 @@ impl MultiWriter {
let new_table_id = self.table_id_generator.next();
let path = self.base_path.join(new_table_id.to_string());

let mut new_writer = Writer::new(path, new_table_id, self.initial_level)?
let mut new_writer = Writer::new(path, new_table_id, self.initial_level, self.fs.clone())?
.use_data_block_compression(self.data_block_compression)
.use_index_block_compression(self.index_block_compression)
.use_data_block_size(self.data_block_size)
Expand Down
17 changes: 9 additions & 8 deletions src/table/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

use super::*;
use crate::{
config::BloomConstructionPolicy, table::filter::standard_bloom::Builder as BloomBuilder,
config::BloomConstructionPolicy, fs::StdFs,
table::filter::standard_bloom::Builder as BloomBuilder,
};
use tempfile::tempdir;
use test_log::test;
Expand All @@ -25,7 +26,7 @@ fn test_with_table(
let file = dir.path().join("table");

{
let mut writer = Writer::new(file.clone(), 0, 0)?;
let mut writer = Writer::new(file.clone(), 0, 0, Arc::new(StdFs))?;

if let Some(f) = &config_writer {
writer = f(writer);
Expand Down Expand Up @@ -203,7 +204,7 @@ fn test_with_table(

// Test with partitioned indexes
{
let mut writer = Writer::new(file.clone(), 0, 0)?.use_partitioned_index();
let mut writer = Writer::new(file.clone(), 0, 0, Arc::new(StdFs))?.use_partitioned_index();

if let Some(f) = config_writer {
writer = f(writer);
Expand Down Expand Up @@ -1226,7 +1227,7 @@ fn table_read_fuzz_1() -> crate::Result<()> {

let data_block_size = 97;

let mut writer = crate::table::Writer::new(file.clone(), 0, 0)
let mut writer = crate::table::Writer::new(file.clone(), 0, 0, Arc::new(StdFs))
.unwrap()
.use_data_block_size(data_block_size);

Expand Down Expand Up @@ -1300,7 +1301,7 @@ fn table_partitioned_index() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let file = dir.path().join("table_fuzz");

let mut writer = crate::table::Writer::new(file.clone(), 0, 0)
let mut writer = crate::table::Writer::new(file.clone(), 0, 0, Arc::new(StdFs))
.unwrap()
.use_partitioned_index()
.use_data_block_size(5)
Expand Down Expand Up @@ -1412,7 +1413,7 @@ fn table_global_seqno() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let file = dir.path().join("table_fuzz");

let mut writer = crate::table::Writer::new(file.clone(), 0, 0)
let mut writer = crate::table::Writer::new(file.clone(), 0, 0, Arc::new(StdFs))
.unwrap()
.use_partitioned_filter()
.use_data_block_size(1)
Expand Down Expand Up @@ -1606,7 +1607,7 @@ fn load_block_range_tombstone_metrics() -> crate::Result<()> {
let file = dir.path().join("table");

// Build a table that contains a range tombstone block.
let mut writer = Writer::new(file.clone(), 0, 0)?;
let mut writer = Writer::new(file.clone(), 0, 0, Arc::new(StdFs))?;
writer.write(InternalValue::from_components(
b"a",
b"v1",
Expand Down Expand Up @@ -1726,7 +1727,7 @@ fn meta_seqno_kv_max_corruption_returns_invalid_data() -> crate::Result<()> {

// Write a valid table with KV entries at seqnos 1..=5.
// Both seqno#max and seqno#kv_max will be 5.
let mut writer = Writer::new(file.clone(), 0, 0)?;
let mut writer = Writer::new(file.clone(), 0, 0, Arc::new(StdFs))?;
for (i, key) in (b'a'..=b'e').enumerate() {
writer.write(InternalValue::from_components(
&[key],
Expand Down
4 changes: 2 additions & 2 deletions src/table/writer/filter/full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
table::{filter::standard_bloom::Builder, Block},
CompressionType, UserKey,
};
use std::{fs::File, io::BufWriter, sync::Arc};
use std::sync::Arc;

pub struct FullFilterWriter {
/// Key hashes for AMQ filter
Expand Down Expand Up @@ -86,7 +86,7 @@ impl<W: std::io::Write + std::io::Seek> FilterWriter<W> for FullFilterWriter {

fn finish(
self: Box<Self>,
file_writer: &mut sfa::Writer<ChecksummedWriter<BufWriter<File>>>,
file_writer: &mut sfa::Writer<ChecksummedWriter<W>>,
) -> crate::Result<usize> {
if self.bloom_hash_buffer.is_empty() {
log::trace!("Filter writer has no buffered hashes - not building filter");
Expand Down
6 changes: 3 additions & 3 deletions src/table/writer/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ use crate::{
checksum::ChecksummedWriter, config::BloomConstructionPolicy, encryption::EncryptionProvider,
prefix::PrefixExtractor, CompressionType, UserKey,
};
use std::{fs::File, io::BufWriter, sync::Arc};
use std::sync::Arc;

// All methods are required (no defaults) by design so that implementations must
// explicitly handle configuration changes (e.g., filter policies, prefix extractors).
pub trait FilterWriter<W: std::io::Write> {
pub trait FilterWriter<W: std::io::Write + std::io::Seek> {
// NOTE: We purposefully use a UserKey instead of &[u8]
// so we can clone it without heap allocation, if needed
/// Registers a key in the block index.
Expand All @@ -27,7 +27,7 @@ pub trait FilterWriter<W: std::io::Write> {
/// Returns the number of filter blocks written (always 1 in case of full filter block).
fn finish(
self: Box<Self>,
file_writer: &mut sfa::Writer<ChecksummedWriter<BufWriter<File>>>,
file_writer: &mut sfa::Writer<ChecksummedWriter<W>>,
) -> crate::Result<usize>;

fn set_filter_policy(
Expand Down
9 changes: 4 additions & 5 deletions src/table/writer/filter/partitioned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ use crate::{
CompressionType, UserKey,
};
use std::{
fs::File,
io::{BufWriter, Seek, Write},
io::{Seek, Write},
sync::Arc,
};

Expand Down Expand Up @@ -115,9 +114,9 @@ impl PartitionedFilterWriter {
Ok(())
}

fn write_top_level_index(
fn write_top_level_index<WR: Write + Seek>(
&mut self,
file_writer: &mut sfa::Writer<ChecksummedWriter<BufWriter<File>>>,
file_writer: &mut sfa::Writer<ChecksummedWriter<WR>>,
index_base_offset: BlockOffset,
) -> crate::Result<()> {
file_writer.start("filter_tli")?;
Expand Down Expand Up @@ -216,7 +215,7 @@ impl<W: std::io::Write + std::io::Seek> FilterWriter<W> for PartitionedFilterWri

fn finish(
mut self: Box<Self>,
file_writer: &mut sfa::Writer<ChecksummedWriter<BufWriter<File>>>,
file_writer: &mut sfa::Writer<ChecksummedWriter<W>>,
) -> crate::Result<usize> {
if self.last_key.is_none() {
log::trace!("Filter writer has not seen any writes - not building filter");
Expand Down
Loading
Loading