diff --git a/Cargo.lock b/Cargo.lock index 87fa059..a7caf79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -283,6 +283,12 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f5c434ae3cf0089ca203e9019ebe529c47ff45cefe8af7c85ecb734ef541822f" +[[package]] +name = "camino" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "276a59bf2b2c967788139340c9f0c5b12d7fd6630315c15c217e559de85d2609" + [[package]] name = "cc" version = "1.2.43" @@ -1781,6 +1787,7 @@ dependencies = [ "arc-swap", "backon", "bytes", + "camino", "flatbuffers", "foyer", "foyer-memory", diff --git a/Cargo.toml b/Cargo.toml index 19bb8d8..b0ab2cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ arc-swap = "1.7.1" backon = { version = "1.6.0", default-features = false, features = ["tokio-sleep"] } bytes = "1.10" bytesize = "2.1.0" +camino = "1.2.1" clap = { version = "4.5.47", features = ["derive"] } flatbuffers = "25.2" foyer = "0.20.0" diff --git a/pond-fs/src/fuse.rs b/pond-fs/src/fuse.rs index d3ede9e..f15db81 100644 --- a/pond-fs/src/fuse.rs +++ b/pond-fs/src/fuse.rs @@ -1,15 +1,24 @@ use pond::{ErrorKind, Fd, Ino, Volume}; +use std::collections::HashMap; use std::ffi::OsStr; +use std::hash::DefaultHasher; +use std::hash::Hash; +use std::hash::Hasher; use std::time::Duration; use std::time::SystemTime; use tracing::instrument; +type FilenameHash = i64; + +const READDIR_BATCH: usize = 64; + pub struct Pond { volume: Volume, runtime: tokio::runtime::Runtime, uid: u32, gid: u32, kernel_cache_timeout: Duration, + readdir_offsets: HashMap<(Ino, FilenameHash), String>, } impl Pond { @@ -28,6 +37,7 @@ impl Pond { uid, gid, kernel_cache_timeout, + readdir_offsets: Default::default(), } } } @@ -88,7 +98,7 @@ impl fuser::Filesystem for Pond { match fs_try!(reply, self.volume.lookup(parent.into(), name)) { Some(attr) => reply.entry( &self.kernel_cache_timeout, - &fuse_attr(self.uid, self.gid, attr), + 
&fuse_attr(self.uid, self.gid, &attr), 0, ), None => reply.error(libc::ENOENT), @@ -106,7 +116,7 @@ impl fuser::Filesystem for Pond { let attr = fs_try!(reply, self.volume.getattr(ino.into())); reply.attr( &self.kernel_cache_timeout, - &fuse_attr(self.uid, self.gid, attr), + &fuse_attr(self.uid, self.gid, &attr), ); } @@ -116,20 +126,62 @@ impl fuser::Filesystem for Pond { _req: &fuser::Request<'_>, ino: u64, _fh: u64, - offset: i64, + offset: FilenameHash, mut reply: fuser::ReplyDirectory, ) { - let iter = fs_try!(reply, self.volume.readdir(ino.into())); - let offset = fs_try!(reply, offset.try_into().map_err(|_| ErrorKind::InvalidData)); + // translate the offset into the filename where the last readdir left off before its buffer + // was full. keep the cookie around so we can fall back to it if nothing new was returned. + let mut token: Option = if offset == 0 { + None + } else { + let name = fs_try!( + reply, + // note that we aren't removing it here. this means that the map grows over the + // lifetime of the mount. this allows users to call readdir with the same offset + // multiple times, whereas removing it would cause subsequent calls to fail + // completely. if this becomes a problem, we can revisit removing the entry here or + // having a TTL map.
+ self.readdir_offsets + .get(&(ino.into(), offset)) + .ok_or_else(|| pond::Error::new( + pond::ErrorKind::InvalidData, + format!("bad offset passed to readdir: {offset}"), + )) + ); + Some(name.clone()) + }; + + 'outer: loop { + let chunk_iter = fs_try!( + reply, + self.volume + .readdir(ino.into(), token.clone(), READDIR_BATCH,) + ); + + let mut num_entries = 0; + for entry in chunk_iter { + num_entries += 1; + let name = entry.name(); + let attr = entry.attr(); + + let full_buffer = + reply.add(attr.ino.into(), hash(name), fuse_kind(attr.kind), name); + if full_buffer { + break 'outer; + } + + token = Some(name.to_string()); + } - for (i, entry) in iter.enumerate().skip(offset) { - let attr = entry.attr(); - let name = entry.name(); - let is_full = reply.add(attr.ino.into(), (i + 1) as i64, fuse_kind(attr.kind), name); - if is_full { + // if this emits less than the READDIR_BATCH we asked for, we've reached EOF. + if num_entries < READDIR_BATCH { break; } } + + if let Some(name) = token { + self.readdir_offsets.insert((ino.into(), hash(&name)), name); + } reply.ok(); } @@ -147,7 +199,7 @@ impl fuser::Filesystem for Pond { let attr = fs_try!(reply, self.volume.mkdir(parent.into(), name.to_string())); reply.entry( &self.kernel_cache_timeout, - &fuse_attr(self.uid, self.gid, attr), + &fuse_attr(self.uid, self.gid, &attr), 0, ); } @@ -213,11 +265,12 @@ impl fuser::Filesystem for Pond { let name = fs_try!(reply, from_os_str(name)); let (attr, fd) = fs_try!( reply, - self.volume.create(parent.into(), name.to_string(), excl) + self.runtime + .block_on(self.volume.create(parent.into(), name.to_string(), excl)) ); reply.created( &self.kernel_cache_timeout, - &fuse_attr(self.uid, self.gid, attr), + &fuse_attr(self.uid, self.gid, &attr), 0, fd.into(), 0, @@ -344,7 +397,7 @@ impl fuser::Filesystem for Pond { let attr = fs_try!(reply, self.volume.getattr(ino)); reply.attr( &self.kernel_cache_timeout, - &fuse_attr(self.uid, self.gid, attr), + &fuse_attr(self.uid, self.gid, 
&attr), ); } @@ -451,3 +504,9 @@ fn getuid() -> u32 { fn getgid() -> u32 { unsafe { libc::getgid() } } + +fn hash(s: &str) -> FilenameHash { + let mut hasher = DefaultHasher::new(); + s.hash(&mut hasher); + hasher.finish() as i64 +} diff --git a/pond-fs/tests/fuzz.rs b/pond-fs/tests/fuzz.rs index 5a4646f..1d83303 100644 --- a/pond-fs/tests/fuzz.rs +++ b/pond-fs/tests/fuzz.rs @@ -264,7 +264,7 @@ fn test_commit( // we have all the data from before the commit and nothing // from after it. let volume = runtime.block_on(client.load_volume(&None)).unwrap(); - assert_eq!(volume.version(), &Version::from_static("v1")); + assert_eq!(volume.version(), Version::from_static("v1")); let mount = spawn_mount(mount_dir, volume); let expected = read_entries(expected_dir); diff --git a/pond/Cargo.toml b/pond/Cargo.toml index 1107615..ed8229f 100644 --- a/pond/Cargo.toml +++ b/pond/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" arc-swap.workspace = true backon.workspace = true bytes.workspace = true +camino.workspace = true flatbuffers.workspace = true foyer.workspace = true foyer-memory.workspace = true diff --git a/pond/src/lib.rs b/pond/src/lib.rs index 3bff853..29010d4 100644 --- a/pond/src/lib.rs +++ b/pond/src/lib.rs @@ -9,6 +9,7 @@ mod metrics; mod storage; mod volume; +use camino::Utf8PathBuf; pub use client::Client; pub use error::{Error, ErrorKind, Result}; pub use location::Location; @@ -146,15 +147,14 @@ impl FileAttr { } #[derive(Debug, Clone)] -pub struct DirEntry<'a> { +pub struct DirEntryRef<'a> { name: &'a str, parents: Vec<&'a str>, attr: &'a FileAttr, - locations: &'a [Location], data: &'a metadata::EntryData, } -impl<'a> DirEntry<'a> { +impl<'a> DirEntryRef<'a> { pub fn name(&self) -> &str { self.name } @@ -163,15 +163,12 @@ impl<'a> DirEntry<'a> { self.attr } - pub fn location(&self) -> Option<(&Location, ByteRange)> { + pub fn location(&self) -> Option<(Location, ByteRange)> { match self.data { metadata::EntryData::File { - location_idx, + location, byte_range, - } 
=> { - let location = &self.locations[*location_idx]; - Some((location, *byte_range)) - } + } => Some((location.clone(), *byte_range)), _ => None, } } @@ -185,6 +182,47 @@ impl<'a> DirEntry<'a> { pub fn is_regular(&self) -> bool { self.attr.ino.is_regular() } + + pub(crate) fn to_owned(&self) -> DirEntry { + let location = self.location().map(|(loc, range)| (loc.clone(), range)); + let mut path: Utf8PathBuf = self.parents.iter().collect(); + path.push(self.name()); + DirEntry { + path, + attr: self.attr().clone(), + location, + } + } +} + +/// Owned equivalent of [`DirEntryRef`] that does not borrow from the underlying volume. +#[derive(Debug, Clone)] +pub struct DirEntry { + path: Utf8PathBuf, + attr: FileAttr, + location: Option<(Location, ByteRange)>, +} + +impl DirEntry { + pub fn name(&self) -> &str { + self.path.file_name().expect("BUG: path ends in '..'") + } + + pub fn path(&self) -> &str { + self.path.as_str() + } + + pub fn attr(&self) -> &FileAttr { + &self.attr + } + + pub fn location(&self) -> Option<(&Location, ByteRange)> { + self.location.as_ref().map(|(loc, range)| (loc, *range)) + } + + pub fn is_regular(&self) -> bool { + self.attr.ino.is_regular() + } } // TODO: add checksums/etags here? diff --git a/pond/src/location.rs b/pond/src/location.rs index 441c707..6b55c0d 100644 --- a/pond/src/location.rs +++ b/pond/src/location.rs @@ -1,25 +1,29 @@ use std::{borrow::Cow, sync::Arc}; +/// Location is an enum that acts as a pointer to a blob of bytes. +/// +/// Location is lightweight, as each variant is basically just an Arc. Cheap to clone.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum Location { - Staged { path: std::path::PathBuf }, - Committed { key: Arc }, + Staged { + path: Arc, + generation: u64, + }, + Committed { + key: Arc, + }, } impl std::fmt::Display for Location { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Location::Staged { path } => write!(f, "{}", path.display()), + Location::Staged { path, .. } => write!(f, "{}", path.display()), Location::Committed { key } => write!(f, "{key}"), } } } impl Location { - pub(crate) fn is_staged(&self) -> bool { - matches!(self, Location::Staged { .. }) - } - pub(crate) fn committed<'a>(key: impl Into>) -> Self { let key = match key.into() { Cow::Borrowed(str) => Arc::from(str), @@ -28,9 +32,10 @@ impl Location { Location::Committed { key } } - pub(crate) fn staged(path: impl AsRef) -> Self { + pub(crate) fn staged(path: impl AsRef, generation: u64) -> Self { Location::Staged { - path: path.as_ref().to_path_buf(), + path: path.as_ref().to_path_buf().into(), + generation, } } } diff --git a/pond/src/metadata.rs b/pond/src/metadata.rs index a7915db..c0e2d81 100644 --- a/pond/src/metadata.rs +++ b/pond/src/metadata.rs @@ -1,13 +1,14 @@ use std::{ borrow::{Borrow, Cow}, - collections::{BTreeMap, VecDeque, btree_map}, + collections::{BTreeMap, BTreeSet, VecDeque, btree_map}, fmt::Debug, path::PathBuf, str::FromStr, + sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; -use crate::{ByteRange, DirEntry, Error, FileAttr, FileType, Ino, Location, error::ErrorKind}; +use crate::{ByteRange, DirEntryRef, Error, FileAttr, FileType, Ino, Location, error::ErrorKind}; // TODO: we duplicate file/dir names as strings in data values and entry keys. // have to figure out how to intern somewhere if we want to stop, and probably @@ -116,7 +117,7 @@ impl Version { /// Because staged locations are effectively dangling pointers, volumes cannot /// be serialized while they're being staged. 
// # TODO: should we guarantee inodes are stable in the docs? -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct VolumeMetadata { version: Version, @@ -124,10 +125,9 @@ pub(crate) struct VolumeMetadata { // volume. next_ino: Ino, - // interned list of locations that hold data blobs. this list must - // remain in a stable order. each Entry contains indexes into this - // vec, and re-ordering it invalidates those indexes. - locations: Vec, + // interned set of locations that hold data blobs. the locations themselves are cheap to clone + // as each enum is just an Arc. + locations: BTreeSet, // Ino -> Entry data: BTreeMap, @@ -154,23 +154,24 @@ impl Entry { } } -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Debug)] pub(crate) enum EntryData { Directory, File { - location_idx: usize, + /// Location is an enum over Arcs, it's cheap to copy. + location: Location, byte_range: ByteRange, }, Dynamic, } -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] struct EntryKeyRef<'a> { ino: Ino, name: Cow<'a, str>, } -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] struct EntryKey<'a>(EntryKeyRef<'a>); impl<'a> From<(Ino, &'a str)> for EntryKey<'a> { @@ -232,7 +233,7 @@ impl VolumeMetadata { Self { version, next_ino: Ino::min_regular(), - locations: vec![], + locations: BTreeSet::new(), data, dirs, } @@ -527,7 +528,7 @@ impl VolumeMetadata { let ino = self.next_ino()?; let slot = self.data.entry(ino); - let location_idx = insert_unique(&mut self.locations, location); + let location = get_interned_or_insert(&mut self.locations, location); let now = SystemTime::now(); let new_entry = Entry { name: name.clone().into(), @@ -540,7 +541,7 @@ impl VolumeMetadata { kind: FileType::Regular, }, data: EntryData::File { - location_idx, + location, byte_range, }, }; @@ -642,56 +643,21 @@ impl VolumeMetadata { Ok(&entry.attr) } - /// Change the physical location of a data 
blob. - /// - /// This changes the physical location for all files in the volume that - /// refer to this blob. To relocate an individual file, see `modify`. - pub(crate) fn relocate(&mut self, from: &Location, to: Location) -> crate::Result<()> { - let Some(from) = self.locations.iter_mut().find(|l| l == &from) else { - return Err(ErrorKind::NotFound.into()); - }; - *from = to; - Ok(()) - } - - /// Clean up all staged locations, replacing all internal state to instead point to dest. - /// - /// This attempts to keep Volume in a consistent state at every step of modification. - pub(crate) fn clean_staged_locations(&mut self, dest: Location) { - // find the first staged location within self.locations and use that as the final spot - // for dest. - let Some((idx, location)) = self - .locations - .iter_mut() - .enumerate() - .find(|(i, l)| l.is_staged()) - else { + /// Remove any locations that no longer have references. + pub(crate) fn prune_unreferenced_locations(&mut self) { + if self.locations.is_empty() { return; - }; - *location = dest.clone(); - - // we're making an assumption here that once you see the first staged location, - // every location after that must also be staged. so if we see a location_idx > idx, this - // means it's a staged location. - assert!( - &self - .locations - .iter() - .skip(idx + 1) - .all(|l| l.is_staged() || l == &dest) - ); + } - // update all relevant EntryData::File::location_idx to point to the same idx we found - // above! - for (_, Entry { data, .. }) in self.data.iter_mut() { - let EntryData::File { location_idx, .. } = data else { - continue; - }; - *location_idx = idx.min(*location_idx); + // keep track of all referenced locations while we're walking the entries + let mut seen = BTreeSet::new(); + for entry in self.data.values() { + if let EntryData::File { location, .. 
} = &entry.data { + seen.insert(location); + } } - // lop off the tail - self.locations.truncate(idx + 1); + self.locations.retain(|e| seen.contains(e)); } /// Update a file's metadata to reflect that a file's data has been modified. @@ -710,7 +676,7 @@ impl VolumeMetadata { return Err(ErrorKind::NotFound.into()); }; let EntryData::File { - location_idx, + location: mut_location, byte_range, } = &mut entry.data else { @@ -718,8 +684,7 @@ impl VolumeMetadata { }; if let Some(location) = location { - let new_location = insert_unique(&mut self.locations, location); - *location_idx = new_location; + *mut_location = get_interned_or_insert(&mut self.locations, location); } match range { Some(Modify::Set(range)) => { @@ -754,12 +719,19 @@ impl VolumeMetadata { /// `(filename, attr)` pairs. /// /// Iterator order is not guaranteed to be stable. - pub(crate) fn readdir<'a>(&'a self, ino: Ino) -> crate::Result> { + pub(crate) fn readdir<'a>( + &'a self, + ino: Ino, + offset: Option, + ) -> crate::Result> { let parents = self.dir_path(ino)?; + let range = match offset { + Some(offset) => self.dirs.range(entry_range_with_offset(ino, offset)?), + None => self.dirs.range(entry_range(ino)?), + }; Ok(ReadDir { data: &self.data, - locations: &self.locations, - range: self.dirs.range(entry_range(ino)?), + range, parents, }) } @@ -771,7 +743,6 @@ impl VolumeMetadata { ) -> crate::Result> { Ok(ReadDir { data: &self.data, - locations: &self.locations, range: self.dirs.range(entry_range(ino)?), parents, }) @@ -800,7 +771,6 @@ impl VolumeMetadata { let root_dir = self.dir_entry(ino)?; let root_iter = ReadDir { data: &self.data, - locations: &self.locations, range: self.dirs.range(entry_range(ino)?), parents: parents.clone(), }; @@ -816,11 +786,11 @@ impl VolumeMetadata { /// order. /// /// Returns an iterator over `(FileAttr, PathBuf)` tuples. 
- pub(crate) fn iter_staged(&self) -> impl Iterator { + pub(crate) fn iter_staged(&self) -> impl Iterator)> { self.data .iter() .filter_map(|(ino, entry)| match self.location(*ino) { - Some((Location::Staged { path }, _)) => Some((&entry.attr, path)), + Some((Location::Staged { path, .. }, _)) => Some((&entry.attr, path.clone())), _ => None, }) } @@ -838,16 +808,13 @@ impl VolumeMetadata { /// /// Attempting to get the physical location of a directory or a symlink /// returns an error. - pub(crate) fn location(&self, ino: Ino) -> Option<(&Location, &ByteRange)> { + pub(crate) fn location(&self, ino: Ino) -> Option<(Location, &ByteRange)> { self.data.get(&ino).and_then(|entry| match &entry.data { // files need to map to blob list EntryData::File { - location_idx, + location, byte_range, - } => { - let location = self.locations.get(*location_idx)?; - Some((location, byte_range)) - } + } => Some((location.clone(), byte_range)), // no other file type has a location _ => None, }) @@ -863,10 +830,15 @@ impl VolumeMetadata { pub(crate) fn to_bytes_with_version(&self, version: &Version) -> crate::Result> { let mut fbb = FlatBufferBuilder::new(); + // flatten our interned location set into a vector. we'll serialize the locations as a + // vector, and each data entry will have an index into this vector to save on space. + let mut location_idx_map: BTreeMap = BTreeMap::new(); let locations = { let mut locations = Vec::with_capacity(self.locations.len()); - for location in &self.locations { - locations.push(to_fb_location(&mut fbb, location)?) 
+ // stable iteration order since self.locations is a BTreeSet + for (i, location) in self.locations.iter().enumerate() { + locations.push(to_fb_location(&mut fbb, location)?); + location_idx_map.insert(location.clone(), i); } fbb.create_vector(&locations) }; @@ -876,7 +848,7 @@ impl VolumeMetadata { if !ino.is_regular() { continue; } - dir_entries.push(to_fb_entry(&mut fbb, entry)?); + dir_entries.push(to_fb_entry(&mut fbb, &location_idx_map, entry)?); } fbb.create_vector(&dir_entries) }; @@ -908,14 +880,15 @@ impl VolumeMetadata { let mut volume = Self::new(version); // set the locations - volume.locations = locations; + volume.locations = locations.into_iter().collect(); + let location_idx_map: Vec<_> = volume.locations.iter().cloned().collect(); // walk the serialized entries and insert both the entries // and their dir index entry for entry in fb_volume.entries().iter() { let parent_ino = entry.parent_ino().into(); let dir_key = (parent_ino, entry.name().to_string()).into(); - let entry = from_fb_entry(&entry)?; + let entry = from_fb_entry(&location_idx_map, &entry)?; max_ino = max_ino.max(entry.attr.ino); volume.dirs.insert(dir_key, entry.attr.ino); volume.data.insert(entry.attr.ino, entry); @@ -926,13 +899,18 @@ impl VolumeMetadata { } } -fn insert_unique(xs: &mut Vec, x: T) -> usize { - match xs.iter().position(|e| e == &x) { - Some(idx) => idx, +/// Insert it into `locations` if it doesn't exist, else grab a clone of the +/// location inside of it (location is a bag of Arcs, so this keeps only 1 copy around).
+pub(crate) fn get_interned_or_insert( + locations: &mut BTreeSet, + location: Location, +) -> Location { + match locations.get(&location) { + // this gives us the interned location + Some(location) => location.clone(), None => { - let idx = xs.len(); - xs.push(x); - idx + locations.insert(location.clone()); + location } } } @@ -943,16 +921,27 @@ fn entry_range(ino: Ino) -> crate::Result>> { Ok(start..end) } +fn entry_range_with_offset( + ino: Ino, + offset: String, +) -> crate::Result>> { + let start: EntryKey = (ino, offset).into(); + let end: EntryKey = (ino.add(1)?, "").into(); + Ok(( + std::ops::Bound::Excluded(start), + std::ops::Bound::Excluded(end), + )) +} + /// The iterator returned from [readdir][Volume::readdir]. pub(crate) struct ReadDir<'a> { data: &'a BTreeMap, - locations: &'a [Location], range: btree_map::Range<'a, EntryKey<'static>, Ino>, parents: Vec<&'a str>, } impl<'a> Iterator for ReadDir<'a> { - type Item = DirEntry<'a>; + type Item = DirEntryRef<'a>; fn next(&mut self) -> Option { self.range.next().map(|(EntryKey(_), ino)| { @@ -961,11 +950,10 @@ impl<'a> Iterator for ReadDir<'a> { .get(ino) .unwrap_or_else(|| panic!("BUG: invalid dirent: ino={ino:?}")); - DirEntry { + DirEntryRef { name: &dent.name, parents: self.parents.clone(), attr: &dent.attr, - locations: self.locations, data: &dent.data, } }) @@ -987,7 +975,7 @@ impl<'a> Iterator for WalkIter<'a> { // TODO: it's a big gnarly to be cloning and returning the ancestors path // every time but the lifetime on returning a slice referencing self // is a pain to express. 
- type Item = crate::Result>; + type Item = crate::Result>; fn next(&mut self) -> Option { while !self.readdirs.is_empty() { @@ -1019,6 +1007,7 @@ impl<'a> Iterator for WalkIter<'a> { fn to_fb_entry<'a>( fbb: &mut FlatBufferBuilder<'a>, + location_idx_map: &BTreeMap, entry: &Entry, ) -> crate::Result>> { let attrs = fb::FileAttrs::create( @@ -1033,14 +1022,17 @@ fn to_fb_entry<'a>( ); let location_ref = match &entry.data { EntryData::File { - location_idx, + location, byte_range, } => Some({ let byte_range = fb::ByteRange::new(byte_range.offset, byte_range.len); fb::LocationRef::create( fbb, &fb::LocationRefArgs { - location_index: *location_idx as u16, + location_index: *(location_idx_map + .get(location) + .expect("BUG: metadata had a dangling location")) + as u16, byte_range: Some(&byte_range), }, ) @@ -1060,7 +1052,7 @@ fn to_fb_entry<'a>( )) } -fn from_fb_entry(fb_entry: &fb::Entry) -> crate::Result { +fn from_fb_entry(location_idx_map: &[Location], fb_entry: &fb::Entry) -> crate::Result { let name = fb_entry.name().to_string().into(); let parent_ino = fb_entry.parent_ino(); let attr = { @@ -1088,8 +1080,15 @@ fn from_fb_entry(fb_entry: &fb::Entry) -> crate::Result { "missing file data range", )); }; + let Some(location) = location_idx_map.get(location_ref.location_index() as usize) + else { + return Err(Error::new( + ErrorKind::InvalidData, + format!("missing location at idx: {}", location_ref.location_index()), + )); + }; EntryData::File { - location_idx: location_ref.location_index() as usize, + location: location.clone(), byte_range: ByteRange { offset: byte_range.offset(), len: byte_range.length(), @@ -1191,8 +1190,8 @@ mod test { fn readdir_nospecial( v: &VolumeMetadata, ino: Ino, - ) -> crate::Result>> { - Ok(v.readdir(ino)?.filter(|e| e.attr.ino.is_regular())) + ) -> crate::Result>> { + Ok(v.readdir(ino, None)?.filter(|e| e.attr.ino.is_regular())) } #[test] @@ -1213,7 +1212,7 @@ mod test { .clone(); // location should match what we just created with 
let (l1, _) = volume.location(f1.ino).unwrap(); - assert_eq!(l1, &Location::committed("zzzz")); + assert_eq!(l1, Location::committed("zzzz")); let f2 = volume .create( @@ -1231,10 +1230,10 @@ mod test { // old locations should be stable let (l1, _) = volume.location(f1.ino).unwrap(); - assert_eq!(l1, &Location::committed("zzzz")); + assert_eq!(l1, Location::committed("zzzz")); // location should match what we just created with let (l2, _) = volume.location(f2.ino).unwrap(); - assert_eq!(l2, &Location::committed("aaaa")); + assert_eq!(l2, Location::committed("aaaa")); } #[test] @@ -1262,15 +1261,15 @@ mod test { // location should match what we just created with let (l1, _) = volume.location(f1.ino).unwrap(); - assert_eq!(l1, &Location::committed("zzzz")); + assert_eq!(l1, Location::committed("zzzz")); let (l1, _) = volume.location(f2.ino).unwrap(); - assert_eq!(l1, &Location::committed("zzzz")); + assert_eq!(l1, Location::committed("zzzz")); // delete the first file volume.delete(Ino::Root, "zzzz").unwrap(); assert_eq!(volume.location(f1.ino), None); let (l1, _) = volume.location(f2.ino).unwrap(); - assert_eq!(l1, &Location::committed("zzzz")); + assert_eq!(l1, Location::committed("zzzz")); // delete both files volume.delete(a.ino, "zzzz").unwrap(); @@ -1284,7 +1283,11 @@ mod test { .collect(); assert_eq!(vec!["a"], names); // should be empty - let names: Vec<_> = volume.readdir(a.ino).unwrap().map(|e| e.name).collect(); + let names: Vec<_> = volume + .readdir(a.ino, None) + .unwrap() + .map(|e| e.name) + .collect(); assert!(names.is_empty()); } @@ -1326,7 +1329,7 @@ mod test { c.ino, "hi.txt".to_string(), true, - Location::staged("whatever"), + Location::staged("whatever", 0), ByteRange::empty(), ) .unwrap() @@ -1365,7 +1368,7 @@ mod test { ); assert_eq!( volume - .readdir(a.ino) + .readdir(a.ino, None) .unwrap() .map(|e| e.path()) .collect::>(), @@ -1373,7 +1376,7 @@ mod test { ); assert_eq!( volume - .readdir(b.ino) + .readdir(b.ino, None) .unwrap() .map(|e| 
e.path()) .collect::>(), @@ -1561,7 +1564,7 @@ mod test { assert_eq!( volume.location(test_txt.ino).unwrap(), ( - &Location::committed("test-key.txt"), + Location::committed("test-key.txt"), &ByteRange { offset: 0, len: 64 }, ) ); @@ -1595,7 +1598,7 @@ mod test { .unwrap(); let (location, range) = meta.location(ino).unwrap(); - assert_eq!(location, &new_location); + assert_eq!(location, new_location); assert_eq!(*range, new_range); let attr = meta.getattr(ino).unwrap(); @@ -1610,7 +1613,7 @@ mod test { Ino::Root, "grow".to_string(), false, - Location::staged("grow"), + Location::staged("grow", 0), ByteRange::empty(), ) .unwrap() diff --git a/pond/src/storage.rs b/pond/src/storage.rs index 38cc787..0ac80b4 100644 --- a/pond/src/storage.rs +++ b/pond/src/storage.rs @@ -1,6 +1,7 @@ use futures::TryFutureExt; use object_store::{ObjectStore, aws::AmazonS3Builder, local::LocalFileSystem}; -use std::{fs::File, sync::Arc}; +use rand::distr::{Alphanumeric, SampleString}; +use std::sync::Arc; use tempfile::TempDir; use url::Url; @@ -182,14 +183,12 @@ impl Storage { } impl Storage { - pub(crate) fn tempfile(&self) -> Result<(std::path::PathBuf, File)> { - let f = tempfile::Builder::new() - .disable_cleanup(true) - .tempfile_in(&*self.temp_dir) - .map_err(|e| Error::with_source(e.kind().into(), "failed to create tempfile", e))?; - - let (file, path) = f.into_parts(); - Ok((path.to_path_buf(), file)) + pub(crate) fn new_staged_filepath(&self) -> Result { + // note: we don't check for collisions here. there's 62^64 possible filenames to be + // generated, and that number is large enough that we realistically should never have a + // collision ... 
+ let filename = Alphanumeric.sample_string(&mut rand::rng(), 64); + Ok(self.temp_dir.path().join(filename)) } pub(crate) async fn list_versions(&self) -> Result> { diff --git a/pond/src/volume.rs b/pond/src/volume.rs index a7794fe..7d34685 100644 --- a/pond/src/volume.rs +++ b/pond/src/volume.rs @@ -1,5 +1,5 @@ use crate::{ - ByteRange, DirEntry, Error, FileAttr, Ino, Location, Result, + ByteRange, DirEntry, Error, FileAttr, FileType, Ino, Location, Result, cache::{CacheConfig, ChunkCache}, error::ErrorKind, metadata::{Modify, Version, VolumeMetadata}, @@ -14,8 +14,11 @@ use std::{ collections::BTreeMap, io::{BufReader, Read}, os::unix::fs::FileExt, - path::Path, - sync::Arc, + path::{Path, PathBuf}, + sync::{ + Arc, RwLock, + atomic::{AtomicBool, AtomicU64, Ordering}, + }, time::{Duration, SystemTime}, }; @@ -53,7 +56,7 @@ impl From for u64 { } } -#[derive(Debug)] +#[derive(Debug, Clone)] enum FileDescriptor { Committed { key: Arc, @@ -73,12 +76,22 @@ enum FileDescriptor { }, } +// NOTE: When locking both meta and fds for modification, meta should always be locked first to +// avoid deadlocks! pub struct Volume { - meta: VolumeMetadata, + meta: Arc>, cache: ChunkCache, - fds: BTreeMap, + fds: Arc>>, store: crate::storage::Storage, metrics_snapshot: Option>>>, + // file generation number for staged files. the generation gives us an approximation of file + // age relative to other files. the generation is bumped every time we commit, so we're only at + // risk of an overflow if the user commits 2^64 times throughout the lifetime of this process. + // the generation number does not persist across multiple processes -- it is always reset back + // to 0 since it's attached to staged (ephemeral) files. + generation: AtomicU64, + // is a commit in progress?
+ commit: AtomicBool, } impl Volume { @@ -95,24 +108,18 @@ impl Volume { }); Self { - meta, + meta: Arc::new(RwLock::new(meta)), cache, - fds: Default::default(), + fds: Arc::new(RwLock::new(BTreeMap::new())), store, metrics_snapshot, + generation: AtomicU64::new(0), + commit: AtomicBool::new(false), } } - pub(crate) fn metadata(&self) -> &VolumeMetadata { - &self.meta - } - - pub(crate) fn metadata_mut(&mut self) -> &mut VolumeMetadata { - &mut self.meta - } - pub(crate) fn modify( - &mut self, + &self, ino: Ino, mtime: SystemTime, ctime: Option, @@ -122,14 +129,21 @@ impl Volume { match ino { Ino::CLEAR_CACHE | Ino::COMMIT => Ok(()), ino => { - self.meta.modify(ino, mtime, ctime, location, range)?; + self.meta + .write() + .expect("lock was poisoned") + .modify(ino, mtime, ctime, location, range)?; Ok(()) } } } - pub fn version(&self) -> &Version { - self.metadata().version() + pub fn version(&self) -> Version { + self.meta + .read() + .expect("lock was poisoned") + .version() + .clone() } pub fn object_store_description(&self) -> String { @@ -145,91 +159,130 @@ impl Volume { } pub fn to_bytes(&self) -> Result> { - self.meta.to_bytes() + self.meta.read().expect("lock was poisoned").to_bytes() } pub fn to_bytes_with_version(&self, version: &Version) -> Result> { - self.meta.to_bytes_with_version(version) + self.meta + .read() + .expect("lock was poisoned") + .to_bytes_with_version(version) } - pub fn getattr(&self, ino: Ino) -> Result<&FileAttr> { + pub fn getattr(&self, ino: Ino) -> Result { scoped_timer!("pond_volume_getattr_latency_secs"); - match self.meta.getattr(ino) { - Some(attr) => Ok(attr), + match self.meta.read().expect("lock was poisoned").getattr(ino) { + Some(attr) => Ok(attr.clone()), None => Err(ErrorKind::NotFound.into()), } } pub fn setattr( - &mut self, + &self, ino: Ino, mtime: Option, ctime: Option, - ) -> Result<&FileAttr> { - self.meta.setattr(ino, mtime, ctime) + ) -> Result { + self.meta + .write() + .expect("lock was poisoned") + 
.setattr(ino, mtime, ctime) + .cloned() } - pub fn lookup(&self, parent: Ino, name: &str) -> Result> { + pub fn lookup(&self, parent: Ino, name: &str) -> Result> { scoped_timer!("pond_volume_lookup_latency_secs"); - let attr = self.meta.lookup(parent, name)?; - Ok(attr) + Ok(self + .meta + .read() + .expect("lock was poisoned") + .lookup(parent, name)? + .cloned()) } - pub fn mkdir(&mut self, parent: Ino, name: String) -> Result<&FileAttr> { - self.meta.mkdir(parent, name) + pub fn mkdir(&self, parent: Ino, name: String) -> Result { + self.meta + .write() + .expect("lock was poisoned") + .mkdir(parent, name) + .cloned() } - pub fn rmdir(&mut self, parent: Ino, name: &str) -> Result<()> { - self.meta.rmdir(parent, name)?; + pub fn rmdir(&self, parent: Ino, name: &str) -> Result<()> { + self.meta + .write() + .expect("lock was poisoned") + .rmdir(parent, name)?; Ok(()) } - pub fn rename( - &mut self, - parent: Ino, - name: &str, - newparent: Ino, - newname: String, - ) -> Result<()> { - self.meta.rename(parent, name, newparent, newname)?; + pub fn rename(&self, parent: Ino, name: &str, newparent: Ino, newname: String) -> Result<()> { + self.meta + .write() + .expect("lock was poisoned") + .rename(parent, name, newparent, newname)?; Ok(()) } - pub fn readdir(&self, ino: Ino) -> Result>> { - let iter = self.meta.readdir(ino)?; - Ok(iter) + pub fn readdir( + &self, + ino: Ino, + offset: Option, + size: usize, + ) -> Result> { + let metadata = self.meta.read().expect("lock was poisoned"); + let entries: Vec = metadata + .readdir(ino, offset)? 
+ .take(size) + .map(|e| e.to_owned()) + .collect(); + Ok(entries.into_iter()) } - pub fn create( - &mut self, + pub async fn create( + &self, parent: Ino, name: String, exclusive: bool, - ) -> Result<(&FileAttr, Fd)> { - let (path, file) = self.store.tempfile()?; - - let attr = self.meta.create( - parent, - name, - exclusive, - Location::Staged { path }, - ByteRange::empty(), - )?; + ) -> Result<(FileAttr, Fd)> { + let path = self.store.new_staged_filepath()?; + let file = open_file(path.as_path(), OpenMode::Create).await?; + + let attr = { + let mut metadata = self.meta.write().expect("lock was poisoned"); + metadata + .create( + parent, + name, + exclusive, + Location::staged(path, self.generation.load(Ordering::SeqCst)), + ByteRange::empty(), + )? + .clone() + // we drop the write lock on metadata here -- the metadata and fd updates are + // decoupled anyway, so right now a failed new_fd call doesn't cause us to undo the + // metadata change. if we implemented transaction-like behavior, then it would be more + // useful to hold both locks at the same time but as-is there's no reason to hold the + // metadata for longer. + }; let fd = new_fd( - &mut self.fds, + &mut self.fds.write().expect("lock was poisoned"), attr.ino, FileDescriptor::Staged { file: file.into() }, )?; Ok((attr, fd)) } - pub fn delete(&mut self, parent: Ino, name: &str) -> Result<()> { - self.meta.delete(parent, name)?; + pub fn delete(&self, parent: Ino, name: &str) -> Result<()> { + self.meta + .write() + .expect("lock was poisoned") + .delete(parent, name)?; Ok(()) } - pub fn truncate(&mut self, ino: Ino, size: u64) -> Result<()> { + pub fn truncate(&self, ino: Ino, size: u64) -> Result<()> { self.modify( ino, SystemTime::now(), @@ -263,92 +316,157 @@ impl Volume { /// Open a Fd to a locally staged file for reading and writing. /// /// Opening a Fd with write permissions will always truncate the file. 
- pub async fn open_read_write(&mut self, ino: Ino) -> Result { + #[allow( + clippy::await_holding_lock, + reason = "https://github.com/rust-lang/rust-clippy/issues/6446" + )] + pub async fn open_read_write(&self, ino: Ino) -> Result { match ino { - Ino::COMMIT => new_fd(&mut self.fds, ino, FileDescriptor::Commit), - Ino::CLEAR_CACHE => new_fd(&mut self.fds, ino, FileDescriptor::ClearCache), + Ino::COMMIT => new_fd( + &mut self.fds.write().expect("lock was poisoned"), + ino, + FileDescriptor::Commit, + ), + Ino::CLEAR_CACHE => new_fd( + &mut self.fds.write().expect("lock was poisoned"), + ino, + FileDescriptor::ClearCache, + ), Ino::VERSION => Err(ErrorKind::PermissionDenied.into()), - ino => match self.meta.location(ino) { - Some((Location::Staged { path }, _)) => { - let file = open_file(path, true).await?; - - new_fd( - &mut self.fds, - ino, - FileDescriptor::Staged { file: file.into() }, - ) - } - Some((Location::Committed { .. }, ..)) => { - // truncate the file (by assigning it a brand new staged file) if it's - // committed. the alternative would be to keep a copy of the committed - // file locally as a staged file, which can be expensive if it's a large file. - let (path, file) = self.store.tempfile()?; - let staged = Location::Staged { path }; - // modify metadata next - self.modify( - ino, - SystemTime::now(), - None, - Some(staged), - Some(Modify::Set((0, 0).into())), - )?; - // only create the fd once the file is open and metadata is valid - new_fd( - &mut self.fds, - ino, - FileDescriptor::Staged { file: file.into() }, - ) + ino => { + let mut metadata_guard = self.meta.write().expect("lock was poisoned"); + match metadata_guard.location(ino) { + Some((Location::Staged { path, generation }, _)) => { + let file = if generation < self.generation.load(Ordering::SeqCst) { + // generation mismatch for staged files, which means we're opening this + // up while a commit is running and this staged file is part of the + // snapshot. 
we treat it as a committed file in this case. + let path = self.store.new_staged_filepath()?; + let staged = Location::staged( + path.clone(), + self.generation.load(Ordering::SeqCst), + ); + metadata_guard.modify( + ino, + SystemTime::now(), + None, + Some(staged), + Some(Modify::Set((0, 0).into())), + )?; + std::mem::drop(metadata_guard); // guard is dropped here before the await + open_file(&path, OpenMode::Create).await? + } else { + let path = path.clone(); + std::mem::drop(metadata_guard); // guard is dropped here before the await + open_file(&path, OpenMode::ReadWrite).await? + }; + + new_fd( + &mut self.fds.write().expect("lock was poisoned"), + ino, + FileDescriptor::Staged { file: file.into() }, + ) + } + Some((Location::Committed { .. }, ..)) => { + // truncate the file (by assigning it a brand new staged file) if it's + // committed. the alternative would be to keep a copy of the committed + // file locally as a staged file, which can be expensive if it's a large file. + let path = self.store.new_staged_filepath()?; + let staged = + Location::staged(path.clone(), self.generation.load(Ordering::SeqCst)); + metadata_guard.modify( + ino, + SystemTime::now(), + None, + Some(staged), + Some(Modify::Set((0, 0).into())), + )?; + std::mem::drop(metadata_guard); // guard is dropped here before the await + let file = open_file(&path, OpenMode::Create).await?; + // only create the fd once the file is open and metadata is valid + new_fd( + &mut self.fds.write().expect("lock was poisoned"), + ino, + FileDescriptor::Staged { file: file.into() }, + ) + } + None => Err(ErrorKind::NotFound.into()), } - None => Err(ErrorKind::NotFound.into()), - }, + } } } - pub async fn open_read(&mut self, ino: Ino) -> Result { + #[allow( + clippy::await_holding_lock, + reason = "https://github.com/rust-lang/rust-clippy/issues/6446" + )] + pub async fn open_read(&self, ino: Ino) -> Result { match ino { - Ino::VERSION => new_fd(&mut self.fds, ino, FileDescriptor::Version), + 
Ino::VERSION => new_fd( + &mut self.fds.write().expect("lock was poisoned"), + ino, + FileDescriptor::Version, + ), Ino::PROM_METRICS => { let data = match &self.metrics_snapshot { Some(metrics) => metrics.load().clone(), None => Default::default(), }; new_fd( - &mut self.fds, + &mut self.fds.write().expect("lock was poisoned"), ino, FileDescriptor::PromMetrics { snapshot: data }, ) } Ino::COMMIT | Ino::CLEAR_CACHE => Err(ErrorKind::PermissionDenied.into()), - ino => match self.meta.location(ino) { - Some((Location::Staged { path }, _)) => { - let file = open_file(path, false).await?; - new_fd( - &mut self.fds, - ino, - FileDescriptor::Staged { file: file.into() }, - ) - } - Some((Location::Committed { key }, range)) => { - let key = Arc::new(self.store.child_path(key)); - new_fd( - &mut self.fds, - ino, - FileDescriptor::Committed { key, range: *range }, - ) + ino => { + let metadata_guard = self.meta.read().expect("lock was poisoned"); + match metadata_guard.location(ino) { + Some((Location::Staged { path, .. 
}, _)) => { + let path = path.clone(); + std::mem::drop(metadata_guard); // guard is dropped here before the await + let file = open_file(&path, OpenMode::Read).await?; + new_fd( + &mut self.fds.write().expect("lock was poisoned"), + ino, + FileDescriptor::Staged { file: file.into() }, + ) + } + Some((Location::Committed { key }, range)) => { + let key = Arc::new(self.store.child_path(key.as_ref())); + let range = *range; + std::mem::drop(metadata_guard); + new_fd( + &mut self.fds.write().expect("lock was poisoned"), + ino, + FileDescriptor::Committed { key, range }, + ) + } + None => Err(ErrorKind::NotFound.into()), } - None => Err(ErrorKind::NotFound.into()), - }, + } } } pub async fn read_at(&self, fd: Fd, offset: u64, buf: &mut [u8]) -> Result { metrics::histogram!("pond_volume_read_buf_size_bytes").record(buf.len() as f64); - match self.fds.get(&fd) { + let descriptor = self + .fds + .read() + .expect("lock was poisoned") + .get(&fd) + .cloned(); + match descriptor { // reads of write-only special fds do nothing Some(FileDescriptor::ClearCache) | Some(FileDescriptor::Commit) => Ok(0), - Some(FileDescriptor::Version) => read_version(self.meta.version(), offset, buf), - Some(FileDescriptor::PromMetrics { snapshot }) => read_from_buf(snapshot, offset, buf), + Some(FileDescriptor::Version) => { + let version = self.version(); + read_version(&version, offset, buf) + } + Some(FileDescriptor::PromMetrics { snapshot }) => { + read_from_buf(snapshot.as_ref(), offset, buf) + } Some(FileDescriptor::Committed { key, range }) => { scoped_timer!("pond_volume_read_latency_secs", "type" => "committed"); // FIXME: readahead needs to know the extent of the location - @@ -361,10 +479,7 @@ impl Volume { return Ok(0); } let blob_offset = range.offset + offset; - let bytes: Vec = self - .cache - .get_at(key.clone(), blob_offset, read_len) - .await?; + let bytes: Vec = self.cache.get_at(key, blob_offset, read_len).await?; Ok(copy_into(buf, &bytes)) } Some(FileDescriptor::Staged { 
file, .. }) => { @@ -375,10 +490,16 @@ impl Volume { } } - pub async fn write_at(&mut self, fd: Fd, offset: u64, data: &[u8]) -> Result { + pub async fn write_at(&self, fd: Fd, offset: u64, data: &[u8]) -> Result { metrics::histogram!("pond_volume_write_buf_size_bytes").record(data.len() as f64); - match self.fds.get_mut(&fd) { + let descriptor = self + .fds + .read() + .expect("lock was poisoned") + .get(&fd) + .cloned(); + match descriptor { Some(FileDescriptor::ClearCache) => { self.cache.clear(); Ok(data.len()) @@ -427,14 +548,14 @@ impl Volume { } } - pub async fn release(&mut self, fd: Fd) -> Result<()> { - match self.fds.remove(&fd) { + pub async fn release(&self, fd: Fd) -> Result<()> { + match self.fds.write().expect("lock was poisoned").remove(&fd) { Some(_) => Ok(()), None => Err(ErrorKind::NotFound.into()), } } - pub async fn commit(&mut self, version: Version) -> Result<()> { + pub async fn commit(&self, version: Version) -> Result<()> { if self.store.exists(&version).await? { return Err(Error::new( ErrorKind::AlreadyExists, @@ -442,24 +563,19 @@ impl Volume { )); } - // don't allow staged files to be open - if self - .fds - .iter() - .any(|(_, desc)| matches!(desc, FileDescriptor::Staged { .. })) - { - return Err(Error::new( - ErrorKind::ResourceBusy, - "all open staged files must be closed before committing", - )); - } + let commit = Commit::new(self)?; - let mut staged = StagedVolume::new(self); - let (dest, ranges) = staged.upload().await?; - staged.modify(dest, ranges)?; - staged.persist(version).await?; + // take a snapshot of the metadata and walk the volume to get a handle to each staged file. 
+ let mut snapshot = commit.snapshot()?; + let (dest, range) = commit.upload_files(snapshot.staged_files).await?; + apply_location_ranges(&mut snapshot.metadata, range, dest)?; + commit + .upload_metadata(&mut snapshot.metadata, version) + .await?; - Ok(()) + // reconcile any differences between what we just uploaded and any modifications that + // occurred since we took the snapshot. + commit.sync(snapshot.metadata) } } @@ -480,9 +596,63 @@ fn read_from_buf(from: &[u8], offset: u64, to: &mut [u8]) -> Result { Ok(amt) } +pub struct WalkVolume { + meta: Arc>, + stack: Vec>, +} + +impl WalkVolume { + fn new(meta: Arc>, ino: Ino) -> Result { + let entries = meta + .read() + .expect("lock was poisoned") + .readdir(ino, None)? + .map(|e| e.to_owned()) + .collect::>(); + Ok(Self { + meta, + stack: vec![entries.into_iter()], + }) + } + + fn push_dir_entries(&mut self, ino: Ino) -> Result<()> { + let entries = self + .meta + .read() + .expect("lock was poisoned") + .readdir(ino, None)? + .map(|e| e.to_owned()) + .collect::>(); + self.stack.push(entries.into_iter()); + Ok(()) + } +} + +impl Iterator for WalkVolume { + type Item = Result; + + fn next(&mut self) -> Option { + loop { + let iter = self.stack.last_mut()?; + if let Some(entry) = iter.next() { + if entry.attr().kind == FileType::Directory + && let Err(e) = self.push_dir_entries(entry.attr().ino) + { + return Some(Err(e)); + } + return Some(Ok(entry)); + } else { + self.stack.pop(); + } + } + } +} + impl Volume { - pub fn walk(&self, ino: Ino) -> Result>>> { - self.meta.walk(ino) + /// Walks the volume depth-first from `ino`. Each directory is listed under its own short-lived + /// read lock, so concurrent modifications may be visible partway through the walk. + pub fn walk(&self, ino: Ino) -> Result { + WalkVolume::new(self.meta.clone(), ino) } /// Pack a local directory into a Pond volume.
@@ -515,7 +685,10 @@ impl Volume { .components() .map(|c| c.as_os_str().to_string_lossy().to_string()) .collect(); - self.metadata_mut().mkdir_all(Ino::Root, dirs)?; + self.meta + .write() + .expect("lock was poisoned") + .mkdir_all(Ino::Root, dirs)?; } // for a file: // @@ -537,7 +710,11 @@ impl Volume { let dirs = dir .components() .map(|c| c.as_os_str().to_string_lossy().to_string()); - self.metadata_mut().mkdir_all(Ino::Root, dirs)?.ino + self.meta + .write() + .expect("lock was poisoned") + .mkdir_all(Ino::Root, dirs)? + .ino } else { Ino::Root }; @@ -559,11 +736,11 @@ impl Volume { ) })? .len(); - self.metadata_mut().create( + self.meta.write().expect("lock was poisoned").create( dir_ino, name.to_string_lossy().to_string(), true, - Location::staged(entry.path()), + Location::staged(entry.path(), self.generation.load(Ordering::SeqCst)), ByteRange { offset: 0, len }, )?; } @@ -587,11 +764,25 @@ macro_rules! try_sync { }; } -/// Opens a std::fs::File for reading (and writing if `write` is set). -async fn open_file(path: &Path, write: bool) -> Result { +enum OpenMode { + Read, + ReadWrite, + Create, +} + +/// Opens a std::fs::File for reading and optionally writing. +async fn open_file(path: &Path, mode: OpenMode) -> Result { let copy = path.to_path_buf(); + + let mut options = std::fs::File::options(); + match mode { + OpenMode::Read => options.read(true), + OpenMode::ReadWrite => options.read(true).write(true), + OpenMode::Create => options.read(true).write(true).create(true).truncate(true), + }; + try_sync!( - std::fs::File::options().read(true).write(write).open(copy), + options.open(copy), "std::fs::File::open", "failed to open staged file" ) @@ -661,13 +852,39 @@ macro_rules! 
try_mpu { }; } -struct StagedVolume<'a> { - inner: &'a mut Volume, +struct Snapshot { + metadata: VolumeMetadata, + staged_files: Vec<(FileAttr, Arc)>, +} + +struct Commit<'a> { + inner: &'a Volume, } -impl<'a> StagedVolume<'a> { - fn new(inner: &'a mut Volume) -> Self { - Self { inner } +impl<'a> Drop for Commit<'a> { + fn drop(&mut self) { + self.inner.commit.store(false, Ordering::SeqCst); + } +} + +impl<'a> Commit<'a> { + fn new(inner: &'a Volume) -> Result { + // only allow commits if another commit isn't already in-flight, set the commit bool if + // we're okay to proceed. + match inner + .commit + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + { + Ok(_) => (), + Err(_) => { + return Err(Error::new( + ErrorKind::ResourceBusy, + "a commit is already in progress", + )); + } + } + + Ok(Self { inner }) } /// The size of each part within the multipart upload (excluding the last which is allowed to @@ -675,12 +892,53 @@ impl<'a> StagedVolume<'a> { const MPU_UPLOAD_SIZE: usize = 32 * 1024 * 1024; const READ_BUF_SIZE: usize = 8 * 1024; + /// Takes a snapshot of the metadata and the files we need to upload as a part of this commit + /// while holding a lock for the metadata and file descriptor map. + fn snapshot(&self) -> Result { + // read lock on metadata, others can read during the snapshot process, but can't modify. + let metadata = self.inner.meta.read().expect("lock was poisoned"); + // write lock on the fds, no one is allowed to open new fds while we're taking a snapshot. + let fds = self.inner.fds.write().expect("lock was poisoned"); + + // don't allow open fds to staged files during the snapshot process. + if fds + .iter() + .any(|(_, desc)| matches!(desc, FileDescriptor::Staged { .. })) + { + return Err(Error::new( + ErrorKind::ResourceBusy, + "all open staged files must be closed before committing", + )); + } + + // bump the generation number for new staged files. 
commit will only upload and take + // snapshots of files with generation numbers less than this value. this draws a + // line in the sand on what will be included in the commit. if someone opens a staged file + // and sees that its generation number is less than the current generation, and commit is + // in progress, then we'll treat it similarly to opening a committed file (i.e. truncating + // it). + self.inner.generation.fetch_add(1, Ordering::SeqCst); + + let staged_files: Vec<_> = metadata + .iter_staged() + .map(|(attr, path)| (attr.clone(), path.clone())) + .collect(); + + Ok(Snapshot { + metadata: metadata.clone(), + staged_files, + }) + } + /// Upload all staged files into a single blob under base. /// /// Returns the location of the newly uploaded blob and a vector that maps each newly written /// Ino to its ByteRange within the new blob. Files are uploaded to the blob using multipart /// uploads. - async fn upload(&self) -> Result<(Location, Vec<(Ino, ByteRange)>)> { + async fn upload_files( + &self, + files: Vec<(FileAttr, Arc)>, + ) -> Result<(Location, Vec<(Ino, ByteRange)>)> { let (dest_name, dest) = self.inner.store.new_data_file(); let mut offset = 0; @@ -692,10 +950,10 @@ impl<'a> StagedVolume<'a> { let mut buf = BytesMut::with_capacity(Self::MPU_UPLOAD_SIZE); let mut read_buf = vec![0u8; Self::READ_BUF_SIZE]; - for (attr, path) in self.inner.meta.iter_staged() { + for (attr, path) in files { // don't actually upload anything for zero sized files if attr.size > 0 { - let file = open_file(path, false).await?; + let file = open_file(&path, OpenMode::Read).await?; let mut reader = BufReader::new(file).take(attr.size); loop { @@ -752,29 +1010,10 @@ impl<'a> StagedVolume<'a> { Ok((Location::committed(dest_name), staged)) } - /// Relocate all staged files to dest.
- fn modify(&mut self, dest: Location, ranges: Vec<(Ino, ByteRange)>) -> Result<()> { - let now = SystemTime::now(); - for (ino, byte_range) in ranges { - self.inner.meta.modify( - ino, - now, - Some(now), - Some(dest.clone()), - Some(Modify::Set(byte_range)), - )?; - } - - // deduplicate and clean up all hanging staged Locations - self.inner.meta.clean_staged_locations(dest); - - Ok(()) - } - /// Mint and upload a new version of Volume. - async fn persist(self, version: Version) -> Result<()> { + async fn upload_metadata(&self, metadata: &mut VolumeMetadata, version: Version) -> Result<()> { let meta_path = self.inner.store.metadata_path(&version); - let new_volume = bytes::Bytes::from(self.inner.to_bytes_with_version(&version)?); + let new_volume = bytes::Bytes::from(metadata.to_bytes_with_version(&version)?); let put_metadata = || async { self.inner @@ -798,7 +1037,7 @@ impl<'a> StagedVolume<'a> { match res { Ok(_) => { - self.inner.meta.set_version(version); + metadata.set_version(version); Ok(()) } // TODO: there's a scenario here where we get an AlreadyExists error returned to us, @@ -821,6 +1060,86 @@ impl<'a> StagedVolume<'a> { )), } } + + fn sync(self, snapshot: VolumeMetadata) -> Result<()> { + let mut metadata = self.inner.meta.write().expect("lock was poisoned"); + metadata.set_version(snapshot.version().clone()); + + let snapshot_generation = self.inner.generation.load(Ordering::SeqCst); + // walk the snapshot, updating the current view of metadata for the dir entries for staged + // regular files that we committed up to S3. only update them if they weren't updated since + // we took the snapshot. if they were updated, then any concurrent updates to the metadata + // while we were committing will take precedence. + for snapshot_entry in snapshot.walk(Ino::Root)?
{ + let snapshot_entry = snapshot_entry?; + if !snapshot_entry.is_regular() || !snapshot_entry.attr.is_file() { + continue; + } + + // fetch the equivalent entry in the current version of metadata. if it's missing, then + // someone deleted this ino while we were busy committing. + let ino = snapshot_entry.attr.ino; + let Some(attr) = metadata.getattr(ino).cloned() else { + continue; + }; + + // if the location is still staged and has an older generation, then we know that it + // wasn't opened for writing since the commit. we can proceed with replacing its + // location to the one we uploaded to S3. + if !matches!( + metadata.location(ino), + Some((Location::Staged { generation, .. }, _)) if generation < snapshot_generation + ) { + continue; + } + + // retain mtime and ctime changes (e.g. if the file was touched to update mtime or + // moved) + let new_mtime = std::cmp::max(attr.mtime, snapshot_entry.attr.mtime); + let new_ctime = std::cmp::max(attr.ctime, snapshot_entry.attr.ctime); + + let Some((snapshot_location, snapshot_byte_range)) = snapshot_entry.location() else { + continue; + }; + + metadata.modify( + ino, + new_mtime, + Some(new_ctime), + Some(snapshot_location.clone()), + Some(Modify::Set(snapshot_byte_range)), + )?; + } + + // the walk and location modification will result in some unreferenced staged locations. + // these need to be cleaned up. + metadata.prune_unreferenced_locations(); + + Ok(()) + } +} + +/// Updates the entries within `metadata` specified by the Inos in `ranges` to point to `dest` and +/// have the range in `ranges`. 
+fn apply_location_ranges( + metadata: &mut VolumeMetadata, + ranges: Vec<(Ino, ByteRange)>, + dest: Location, +) -> Result<()> { + let now = SystemTime::now(); + for (ino, byte_range) in ranges { + metadata.modify( + ino, + now, + Some(now), + Some(dest.clone()), + Some(Modify::Set(byte_range)), + )?; + } + + metadata.prune_unreferenced_locations(); + + Ok(()) } fn should_retry(e: &object_store::Error) -> bool { @@ -882,12 +1201,12 @@ mod tests { } } - async fn assert_write(volume: &mut Volume, fd: Fd, contents: &'static str) { + async fn assert_write(volume: &Volume, fd: Fd, contents: &'static str) { volume.write_at(fd, 0, contents.as_bytes()).await.unwrap(); volume.release(fd).await.unwrap(); } - async fn assert_read(volume: &mut Volume, name: &'static str, contents: &'static str) { + async fn assert_read(volume: &Volume, name: &'static str, contents: &'static str) { let attr = volume.lookup(Ino::Root, name).unwrap().unwrap(); let mut buf = vec![0u8; attr.size as usize]; let fd = volume.open_read(attr.ino).await.unwrap(); @@ -907,13 +1226,14 @@ mod tests { .unwrap(); // create a volume with three files, all of which are 5 bytes long. 
- let (mut volume, inos) = runtime.block_on(async { + let (volume, inos) = runtime.block_on(async { let mut client = Client::open(volume_path.to_str().unwrap()).unwrap(); - let mut volume = client.create_volume().await; + let volume = client.create_volume().await; - let mut create = async |name, bs| { + let create = async |name, bs| { let (attr, fd) = volume .create(Ino::Root, std::primitive::str::to_string(name), true) + .await .unwrap(); let ino = attr.ino; volume.write_at(fd, 0, bs).await.unwrap(); @@ -954,47 +1274,59 @@ mod tests { std::fs::create_dir_all(&volume_path).unwrap(); let mut client = Client::open(volume_path.to_str().unwrap()).unwrap(); - let mut volume = client.create_volume().await; + let volume = client.create_volume().await; // clean volume -- this is not staged - assert!(!volume.meta.is_staged()); + assert!(!volume.meta.read().expect("lock was poisoned").is_staged()); // creating two files, it should be a staged volume now. - let (attr1, fd1) = volume.create(Ino::Root, "hello.txt".into(), true).unwrap(); + let (attr1, fd1) = volume + .create(Ino::Root, "hello.txt".into(), true) + .await + .unwrap(); let attr1 = attr1.clone(); - assert_write(&mut volume, fd1, "hello").await; - let (attr2, fd2) = volume.create(Ino::Root, "world.txt".into(), true).unwrap(); + assert_write(&volume, fd1, "hello").await; + let (attr2, fd2) = volume + .create(Ino::Root, "world.txt".into(), true) + .await + .unwrap(); let attr2 = attr2.clone(); - assert_write(&mut volume, fd2, "world").await; + assert_write(&volume, fd2, "world").await; - assert!(volume.meta.is_staged()); - for attr in [&attr1, &attr2] { - assert!(matches!( - volume.meta.location(attr.ino), - Some((Location::Staged { .. }, _)) - )); + { + let meta = volume.meta.read().expect("lock was poisoned"); + assert!(meta.is_staged()); + for attr in [&attr1, &attr2] { + assert!(matches!( + meta.location(attr.ino), + Some((Location::Staged { .. }, _)) + )); + } } // commit!!! 
let commit_fd = volume.open_read_write(Ino::COMMIT).await.unwrap(); - assert_write(&mut volume, commit_fd, "next-version").await; + assert_write(&volume, commit_fd, "next-version").await; // after commit, both files are no longer staged - assert!(!volume.meta.is_staged()); - for attr in [&attr1, &attr2] { - assert!(matches!( - volume.meta.location(attr.ino), - Some((Location::Committed { .. }, _)) - )); + { + let meta = volume.meta.read().expect("lock was poisoned"); + assert!(!meta.is_staged()); + for attr in [&attr1, &attr2] { + assert!(matches!( + meta.location(attr.ino), + Some((Location::Committed { .. }, _)) + )); + } } // read the new volume, assert that the committed files have the // right contents, and check the version is bumped as expected - let mut next_volume = client.load_volume(&None).await.unwrap(); - let next_version: &str = next_volume.meta.version().as_ref(); - assert_eq!(next_version, "next-version",); - assert_read(&mut next_volume, "hello.txt", "hello").await; - assert_read(&mut next_volume, "world.txt", "world").await; + let next_volume = client.load_volume(&None).await.unwrap(); + let next_version = next_volume.version().to_string(); + assert_eq!(next_version, "next-version"); + assert_read(&next_volume, "hello.txt", "hello").await; + assert_read(&next_volume, "world.txt", "world").await; } #[tokio::test] @@ -1004,13 +1336,20 @@ mod tests { std::fs::create_dir_all(&volume_path).unwrap(); let mut client = Client::open(volume_path.to_str().unwrap()).unwrap(); - let mut volume = client.create_volume().await; + let volume = client.create_volume().await; // creating a file, but holding the fd (not releasing it) - let (attr, fd) = volume.create(Ino::Root, "hello.txt".into(), true).unwrap(); + let (attr, fd) = volume + .create(Ino::Root, "hello.txt".into(), true) + .await + .unwrap(); let attr = attr.clone(); assert!(matches!( - volume.meta.location(attr.ino), + volume + .meta + .read() + .expect("lock was poisoned") + .location(attr.ino), 
Some((Location::Staged { .. }, _)) )); @@ -1038,7 +1377,7 @@ mod tests { let _f3 = volume.open_read_write(Ino::COMMIT).await.unwrap(); let _f4 = volume.open_read(attr.ino).await.unwrap(); // this is ok, because none of them are staged - assert!(!volume.fds.is_empty()); + assert!(!volume.fds.read().expect("lock was poisoned").is_empty()); volume .commit(Version::from_static("next-version")) .await