From 3c9b1f6d7f8b67d6f595cadf4ec6701849911952 Mon Sep 17 00:00:00 2001 From: Dustin Pho Date: Fri, 21 Nov 2025 03:38:34 -0500 Subject: [PATCH 01/17] parking_lot rwlocks --- Cargo.lock | 1 + Cargo.toml | 1 + pond-fs/src/fuse.rs | 10 +- pond-fs/src/lib.rs | 2 +- pond-fs/tests/fuzz.rs | 4 +- pond/Cargo.toml | 1 + pond/src/lib.rs | 47 ++++++ pond/src/volume.rs | 370 +++++++++++++++++++++++------------------- 8 files changed, 260 insertions(+), 176 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 87fa059..30cbc87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1787,6 +1787,7 @@ dependencies = [ "futures", "metrics", "object_store", + "parking_lot", "rand 0.9.2", "tempfile", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 19bb8d8..c4796a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ metrics-exporter-prometheus = "0.17.2" metrics-util = "0.20.0" nix = "0.29" # don't require a newer version than fuser if possible object_store = { version = "0.12.3", features = ["aws"] } +parking_lot = "0.12.5" rand = "0.9.2" signal-hook = "0.3" tempfile = "3" diff --git a/pond-fs/src/fuse.rs b/pond-fs/src/fuse.rs index d3ede9e..4b8aa32 100644 --- a/pond-fs/src/fuse.rs +++ b/pond-fs/src/fuse.rs @@ -88,7 +88,7 @@ impl fuser::Filesystem for Pond { match fs_try!(reply, self.volume.lookup(parent.into(), name)) { Some(attr) => reply.entry( &self.kernel_cache_timeout, - &fuse_attr(self.uid, self.gid, attr), + &fuse_attr(self.uid, self.gid, &attr), 0, ), None => reply.error(libc::ENOENT), @@ -106,7 +106,7 @@ impl fuser::Filesystem for Pond { let attr = fs_try!(reply, self.volume.getattr(ino.into())); reply.attr( &self.kernel_cache_timeout, - &fuse_attr(self.uid, self.gid, attr), + &fuse_attr(self.uid, self.gid, &attr), ); } @@ -147,7 +147,7 @@ impl fuser::Filesystem for Pond { let attr = fs_try!(reply, self.volume.mkdir(parent.into(), name.to_string())); reply.entry( &self.kernel_cache_timeout, - &fuse_attr(self.uid, self.gid, attr), + &fuse_attr(self.uid, self.gid, &attr), 0, ); } @@ -217,7 +217,7 @@ impl fuser::Filesystem for Pond { ); reply.created( &self.kernel_cache_timeout, - &fuse_attr(self.uid, self.gid, attr), + &fuse_attr(self.uid, self.gid, &attr), 0, fd.into(), 0, @@ -344,7 +344,7 @@ impl fuser::Filesystem for Pond { let attr = fs_try!(reply, self.volume.getattr(ino)); reply.attr( &self.kernel_cache_timeout, - &fuse_attr(self.uid, self.gid, attr), + &fuse_attr(self.uid, self.gid, &attr), ); } diff --git a/pond-fs/src/lib.rs b/pond-fs/src/lib.rs index 06781be..2b4284a 100644 --- a/pond-fs/src/lib.rs +++ b/pond-fs/src/lib.rs @@ -261,7 +261,7 @@ pub fn create( let version = Version::from_str(version.as_ref())?; runtime.block_on(async { - let mut volume = client.create_volume().await; + let volume = client.create_volume().await; volume.pack(dir, version).await?; Ok(()) }) diff --git a/pond-fs/tests/fuzz.rs b/pond-fs/tests/fuzz.rs index 5a4646f..a85fde6 100644 --- a/pond-fs/tests/fuzz.rs +++ b/pond-fs/tests/fuzz.rs @@ -137,7 +137,7 @@ fn test_pack(expected_dir: &Path, actual_dir: &Path, pack_dir: &Path, entries: V // pack it to the pack_dir runtime .block_on(async { - let mut volume = client.create_volume().await; + let volume = client.create_volume().await; volume.pack(expected_dir, version).await?; Ok::<_, pond::Error>(()) }) @@ -264,7 +264,7 @@ fn test_commit( // we have all the data from before the commit and nothing // from after it. let volume = runtime.block_on(client.load_volume(&None)).unwrap(); - assert_eq!(volume.version(), &Version::from_static("v1")); + assert_eq!(volume.version(), Version::from_static("v1")); let mount = spawn_mount(mount_dir, volume); let expected = read_entries(expected_dir); diff --git a/pond/Cargo.toml b/pond/Cargo.toml index 1107615..beb0879 100644 --- a/pond/Cargo.toml +++ b/pond/Cargo.toml @@ -13,6 +13,7 @@ foyer-memory.workspace = true futures.workspace = true metrics.workspace = true object_store.workspace = true +parking_lot.workspace = true rand.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["fs"] } diff --git a/pond/src/lib.rs b/pond/src/lib.rs index 3bff853..f33f743 100644 --- a/pond/src/lib.rs +++ b/pond/src/lib.rs @@ -187,6 +187,53 @@ impl<'a> DirEntry<'a> { } } +/// Owned equivalent of `DirEntry` that does not borrow from the underlying volume. +#[derive(Debug, Clone)] +pub struct OwnedDirEntry { + name: String, + parents: Vec, + attr: FileAttr, + location: Option<(Location, ByteRange)>, +} + +impl OwnedDirEntry { + pub fn name(&self) -> &str { + &self.name + } + + pub fn attr(&self) -> &FileAttr { + &self.attr + } + + pub fn location(&self) -> Option<(&Location, ByteRange)> { + self.location + .as_ref() + .map(|(loc, range)| (loc, *range)) + } + + pub fn path(&self) -> String { + let mut path = self.parents.clone(); + path.push(self.name.clone()); + path.join("/") + } + + pub fn is_regular(&self) -> bool { + self.attr.ino.is_regular() + } +} + +impl<'a> From> for OwnedDirEntry { + fn from(entry: DirEntry<'a>) -> Self { + let location = entry.location().map(|(loc, range)| (loc.clone(), range)); + Self { + name: entry.name().to_string(), + parents: entry.parents.iter().map(|p| p.to_string()).collect(), + attr: entry.attr().clone(), + location, + } + } +} + // TODO: add checksums/etags here? #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] diff --git a/pond/src/volume.rs b/pond/src/volume.rs index a7794fe..6a4aae4 100644 --- a/pond/src/volume.rs +++ b/pond/src/volume.rs @@ -1,5 +1,5 @@ use crate::{ - ByteRange, DirEntry, Error, FileAttr, Ino, Location, Result, + ByteRange, Error, FileAttr, Ino, Location, OwnedDirEntry, Result, cache::{CacheConfig, ChunkCache}, error::ErrorKind, metadata::{Modify, Version, VolumeMetadata}, @@ -10,6 +10,7 @@ use arc_swap::ArcSwap; use backon::{ExponentialBuilder, Retryable}; use bytes::{Bytes, BytesMut}; use object_store::{ObjectStore, PutMode, PutOptions, PutPayload}; +use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::{ collections::BTreeMap, io::{BufReader, Read}, @@ -53,7 +54,7 @@ impl From for u64 { } } -#[derive(Debug)] +#[derive(Debug, Clone)] enum FileDescriptor { Committed { key: Arc, @@ -74,9 +75,9 @@ enum FileDescriptor { } pub struct Volume { - meta: VolumeMetadata, + meta: Arc>, cache: ChunkCache, - fds: BTreeMap, + fds: Arc>>, store: crate::storage::Storage, metrics_snapshot: Option>>>, } @@ -95,24 +96,24 @@ impl Volume { }); Self { - meta, + meta: Arc::new(RwLock::new(meta)), cache, - fds: Default::default(), + fds: Arc::new(RwLock::new(BTreeMap::new())), store, metrics_snapshot, } } - pub(crate) fn metadata(&self) -> &VolumeMetadata { - &self.meta + pub(crate) fn metadata(&self) -> RwLockReadGuard<'_, VolumeMetadata> { + self.meta.read() } - pub(crate) fn metadata_mut(&mut self) -> &mut VolumeMetadata { - &mut self.meta + pub(crate) fn metadata_mut(&self) -> RwLockWriteGuard<'_, VolumeMetadata> { + self.meta.write() } pub(crate) fn modify( - &mut self, + &self, ino: Ino, mtime: SystemTime, ctime: Option, @@ -122,14 +123,16 @@ impl Volume { match ino { Ino::CLEAR_CACHE | Ino::COMMIT => Ok(()), ino => { - self.meta.modify(ino, mtime, ctime, location, range)?; + self.meta + .write() + .modify(ino, mtime, ctime, location, range)?; Ok(()) } } } - pub fn version(&self) -> &Version { - self.metadata().version() + pub fn version(&self) -> Version { + self.metadata().version().clone() } pub fn object_store_description(&self) -> String { @@ -145,91 +148,90 @@ impl Volume { } pub fn to_bytes(&self) -> Result> { - self.meta.to_bytes() + let meta = self.metadata(); + meta.to_bytes() } pub fn to_bytes_with_version(&self, version: &Version) -> Result> { - self.meta.to_bytes_with_version(version) + let meta = self.metadata(); + meta.to_bytes_with_version(version) } - pub fn getattr(&self, ino: Ino) -> Result<&FileAttr> { + pub fn getattr(&self, ino: Ino) -> Result { scoped_timer!("pond_volume_getattr_latency_secs"); - match self.meta.getattr(ino) { - Some(attr) => Ok(attr), + match self.metadata().getattr(ino) { + Some(attr) => Ok(attr.clone()), None => Err(ErrorKind::NotFound.into()), } } pub fn setattr( - &mut self, + &self, ino: Ino, mtime: Option, ctime: Option, - ) -> Result<&FileAttr> { - self.meta.setattr(ino, mtime, ctime) + ) -> Result { + let mut meta = self.metadata_mut(); + meta.setattr(ino, mtime, ctime).cloned() } - pub fn lookup(&self, parent: Ino, name: &str) -> Result> { + pub fn lookup(&self, parent: Ino, name: &str) -> Result> { scoped_timer!("pond_volume_lookup_latency_secs"); - let attr = self.meta.lookup(parent, name)?; - Ok(attr) + let meta = self.metadata(); + Ok(meta.lookup(parent, name)?.cloned()) } - pub fn mkdir(&mut self, parent: Ino, name: String) -> Result<&FileAttr> { - self.meta.mkdir(parent, name) + pub fn mkdir(&self, parent: Ino, name: String) -> Result { + let mut meta = self.metadata_mut(); + meta.mkdir(parent, name).cloned() } - pub fn rmdir(&mut self, parent: Ino, name: &str) -> Result<()> { - self.meta.rmdir(parent, name)?; + pub fn rmdir(&self, parent: Ino, name: &str) -> Result<()> { + self.metadata_mut().rmdir(parent, name)?; Ok(()) } - pub fn rename( - &mut self, - parent: Ino, - name: &str, - newparent: Ino, - newname: String, - ) -> Result<()> { - self.meta.rename(parent, name, newparent, newname)?; + pub fn rename(&self, parent: Ino, name: &str, newparent: Ino, newname: String) -> Result<()> { + self.metadata_mut() + .rename(parent, name, newparent, newname)?; Ok(()) } - pub fn readdir(&self, ino: Ino) -> Result>> { - let iter = self.meta.readdir(ino)?; - Ok(iter) + pub fn readdir(&self, ino: Ino) -> Result> { + let guard = self.metadata(); + let entries: Vec = guard.readdir(ino)?.map(OwnedDirEntry::from).collect(); + Ok(entries.into_iter()) } - pub fn create( - &mut self, - parent: Ino, - name: String, - exclusive: bool, - ) -> Result<(&FileAttr, Fd)> { + pub fn create(&self, parent: Ino, name: String, exclusive: bool) -> Result<(FileAttr, Fd)> { let (path, file) = self.store.tempfile()?; - let attr = self.meta.create( - parent, - name, - exclusive, - Location::Staged { path }, - ByteRange::empty(), - )?; + let attr = { + let mut meta = self.metadata_mut(); + meta.create( + parent, + name, + exclusive, + Location::Staged { path }, + ByteRange::empty(), + )? + .clone() + }; let fd = new_fd( - &mut self.fds, + &self.fds, attr.ino, FileDescriptor::Staged { file: file.into() }, )?; Ok((attr, fd)) } - pub fn delete(&mut self, parent: Ino, name: &str) -> Result<()> { - self.meta.delete(parent, name)?; + pub fn delete(&self, parent: Ino, name: &str) -> Result<()> { + self.metadata_mut().delete(parent, name)?; Ok(()) } - pub fn truncate(&mut self, ino: Ino, size: u64) -> Result<()> { + pub fn truncate(&self, ino: Ino, size: u64) -> Result<()> { self.modify( ino, SystemTime::now(), @@ -263,92 +265,98 @@ impl Volume { /// Open a Fd to a locally staged file for reading and writing. /// /// Opening a Fd with write permissions will always truncate the file. - pub async fn open_read_write(&mut self, ino: Ino) -> Result { + pub async fn open_read_write(&self, ino: Ino) -> Result { match ino { - Ino::COMMIT => new_fd(&mut self.fds, ino, FileDescriptor::Commit), - Ino::CLEAR_CACHE => new_fd(&mut self.fds, ino, FileDescriptor::ClearCache), + Ino::COMMIT => new_fd(&self.fds, ino, FileDescriptor::Commit), + Ino::CLEAR_CACHE => new_fd(&self.fds, ino, FileDescriptor::ClearCache), Ino::VERSION => Err(ErrorKind::PermissionDenied.into()), - ino => match self.meta.location(ino) { - Some((Location::Staged { path }, _)) => { - let file = open_file(path, true).await?; - - new_fd( - &mut self.fds, - ino, - FileDescriptor::Staged { file: file.into() }, - ) - } - Some((Location::Committed { .. }, ..)) => { - // truncate the file (by assigning it a brand new staged file) if it's - // committed. the alternative would be to keep a copy of the committed - // file locally as a staged file, which can be expensive if it's a large file. - let (path, file) = self.store.tempfile()?; - let staged = Location::Staged { path }; - // modify metadata next - self.modify( - ino, - SystemTime::now(), - None, - Some(staged), - Some(Modify::Set((0, 0).into())), - )?; - // only create the fd once the file is open and metadata is valid - new_fd( - &mut self.fds, - ino, - FileDescriptor::Staged { file: file.into() }, - ) + ino => { + let location = { + let meta = self.metadata(); + meta.location(ino) + .map(|(location, range)| (location.clone(), *range)) + }; + match location { + Some((Location::Staged { path }, _)) => { + let file = open_file(&path, true).await?; + + new_fd(&self.fds, ino, FileDescriptor::Staged { file: file.into() }) + } + Some((Location::Committed { .. }, ..)) => { + // truncate the file (by assigning it a brand new staged file) if it's + // committed. the alternative would be to keep a copy of the committed + // file locally as a staged file, which can be expensive if it's a large file. + let (path, file) = self.store.tempfile()?; + let staged = Location::Staged { path }; + // modify metadata next + self.modify( + ino, + SystemTime::now(), + None, + Some(staged), + Some(Modify::Set((0, 0).into())), + )?; + // only create the fd once the file is open and metadata is valid + new_fd(&self.fds, ino, FileDescriptor::Staged { file: file.into() }) + } + None => Err(ErrorKind::NotFound.into()), } - None => Err(ErrorKind::NotFound.into()), - }, + } } } - pub async fn open_read(&mut self, ino: Ino) -> Result { + pub async fn open_read(&self, ino: Ino) -> Result { match ino { - Ino::VERSION => new_fd(&mut self.fds, ino, FileDescriptor::Version), + Ino::VERSION => new_fd(&self.fds, ino, FileDescriptor::Version), Ino::PROM_METRICS => { let data = match &self.metrics_snapshot { Some(metrics) => metrics.load().clone(), None => Default::default(), }; new_fd( - &mut self.fds, + &self.fds, ino, FileDescriptor::PromMetrics { snapshot: data }, ) } Ino::COMMIT | Ino::CLEAR_CACHE => Err(ErrorKind::PermissionDenied.into()), - ino => match self.meta.location(ino) { - Some((Location::Staged { path }, _)) => { - let file = open_file(path, false).await?; - new_fd( - &mut self.fds, - ino, - FileDescriptor::Staged { file: file.into() }, - ) - } - Some((Location::Committed { key }, range)) => { - let key = Arc::new(self.store.child_path(key)); - new_fd( - &mut self.fds, - ino, - FileDescriptor::Committed { key, range: *range }, - ) + ino => { + let location = { + let meta = self.metadata(); + meta.location(ino) + .map(|(location, range)| (location.clone(), *range)) + }; + + match location { + Some((Location::Staged { path }, _)) => { + let file = open_file(&path, false).await?; + new_fd(&self.fds, ino, FileDescriptor::Staged { file: file.into() }) + } + Some((Location::Committed { key }, range)) => { + let key = Arc::new(self.store.child_path(key.as_ref())); + new_fd(&self.fds, ino, FileDescriptor::Committed { key, range }) + } + None => Err(ErrorKind::NotFound.into()), } - None => Err(ErrorKind::NotFound.into()), - }, + } } } pub async fn read_at(&self, fd: Fd, offset: u64, buf: &mut [u8]) -> Result { metrics::histogram!("pond_volume_read_buf_size_bytes").record(buf.len() as f64); - match self.fds.get(&fd) { + let descriptor = { self.fds.read().get(&fd).cloned() }; + + match descriptor { // reads of write-only special fds do nothing Some(FileDescriptor::ClearCache) | Some(FileDescriptor::Commit) => Ok(0), - Some(FileDescriptor::Version) => read_version(self.meta.version(), offset, buf), - Some(FileDescriptor::PromMetrics { snapshot }) => read_from_buf(snapshot, offset, buf), + Some(FileDescriptor::Version) => { + let version = self.version(); + read_version(&version, offset, buf) + } + Some(FileDescriptor::PromMetrics { snapshot }) => { + read_from_buf(snapshot.as_ref(), offset, buf) + } Some(FileDescriptor::Committed { key, range }) => { scoped_timer!("pond_volume_read_latency_secs", "type" => "committed"); // FIXME: readahead needs to know the extent of the location - @@ -361,10 +369,7 @@ impl Volume { return Ok(0); } let blob_offset = range.offset + offset; - let bytes: Vec = self - .cache - .get_at(key.clone(), blob_offset, read_len) - .await?; + let bytes: Vec = self.cache.get_at(key, blob_offset, read_len).await?; Ok(copy_into(buf, &bytes)) } Some(FileDescriptor::Staged { file, .. }) => { @@ -375,10 +380,12 @@ impl Volume { } } - pub async fn write_at(&mut self, fd: Fd, offset: u64, data: &[u8]) -> Result { + pub async fn write_at(&self, fd: Fd, offset: u64, data: &[u8]) -> Result { metrics::histogram!("pond_volume_write_buf_size_bytes").record(data.len() as f64); - match self.fds.get_mut(&fd) { + let descriptor = { self.fds.read().get(&fd).cloned() }; + + match descriptor { Some(FileDescriptor::ClearCache) => { self.cache.clear(); Ok(data.len()) @@ -427,14 +434,14 @@ impl Volume { } } - pub async fn release(&mut self, fd: Fd) -> Result<()> { - match self.fds.remove(&fd) { + pub async fn release(&self, fd: Fd) -> Result<()> { + match self.fds.write().remove(&fd) { Some(_) => Ok(()), None => Err(ErrorKind::NotFound.into()), } } - pub async fn commit(&mut self, version: Version) -> Result<()> { + pub async fn commit(&self, version: Version) -> Result<()> { if self.store.exists(&version).await? { return Err(Error::new( ErrorKind::AlreadyExists, @@ -445,6 +452,7 @@ impl Volume { // don't allow staged files to be open if self .fds + .read() .iter() .any(|(_, desc)| matches!(desc, FileDescriptor::Staged { .. })) { @@ -454,7 +462,7 @@ impl Volume { )); } - let mut staged = StagedVolume::new(self); + let staged = StagedVolume::new(self); let (dest, ranges) = staged.upload().await?; staged.modify(dest, ranges)?; staged.persist(version).await?; @@ -481,12 +489,17 @@ fn read_from_buf(from: &[u8], offset: u64, to: &mut [u8]) -> Result { } impl Volume { - pub fn walk(&self, ino: Ino) -> Result>>> { - self.meta.walk(ino) + pub fn walk(&self, ino: Ino) -> Result>> { + let guard = self.metadata(); + let entries: Vec> = guard + .walk(ino)? + .map(|res| res.map(OwnedDirEntry::from)) + .collect(); + Ok(entries.into_iter()) } /// Pack a local directory into a Pond volume. - pub async fn pack(&mut self, dir: impl AsRef, version: Version) -> crate::Result<()> { + pub async fn pack(&self, dir: impl AsRef, version: Version) -> crate::Result<()> { if self.store.exists(&version).await? { return Err(Error::new( ErrorKind::AlreadyExists, @@ -642,7 +655,12 @@ fn copy_into(mut buf: &mut [u8], bytes: &[Bytes]) -> usize { // FIXME: this needs to allocate and check for remaining fds instead of just // trying to increment every time and crashing. it's u64 so we probably won't // hit it for a while but that's jank -fn new_fd(fd_set: &mut BTreeMap, ino: Ino, d: FileDescriptor) -> Result { +fn new_fd( + fd_set: &RwLock>, + ino: Ino, + d: FileDescriptor, +) -> Result { + let mut fd_set = fd_set.write(); let next_fh = fd_set .keys() .last() @@ -662,11 +680,11 @@ macro_rules! try_mpu { } struct StagedVolume<'a> { - inner: &'a mut Volume, + inner: &'a Volume, } impl<'a> StagedVolume<'a> { - fn new(inner: &'a mut Volume) -> Self { + fn new(inner: &'a Volume) -> Self { Self { inner } } @@ -692,10 +710,17 @@ impl<'a> StagedVolume<'a> { let mut buf = BytesMut::with_capacity(Self::MPU_UPLOAD_SIZE); let mut read_buf = vec![0u8; Self::READ_BUF_SIZE]; - for (attr, path) in self.inner.meta.iter_staged() { + let staged_files: Vec<(FileAttr, std::path::PathBuf)> = { + let meta = self.inner.metadata(); + meta.iter_staged() + .map(|(attr, path)| (attr.clone(), path.clone())) + .collect() + }; + + for (attr, path) in staged_files { // don't actually upload anything for zero sized files if attr.size > 0 { - let file = open_file(path, false).await?; + let file = open_file(&path, false).await?; let mut reader = BufReader::new(file).take(attr.size); loop { @@ -753,10 +778,10 @@ impl<'a> StagedVolume<'a> { } /// Relocate all staged files to dest. - fn modify(&mut self, dest: Location, ranges: Vec<(Ino, ByteRange)>) -> Result<()> { + fn modify(&self, dest: Location, ranges: Vec<(Ino, ByteRange)>) -> Result<()> { let now = SystemTime::now(); for (ino, byte_range) in ranges { - self.inner.meta.modify( + self.inner.modify( ino, now, Some(now), @@ -766,7 +791,7 @@ impl<'a> StagedVolume<'a> { } // deduplicate and clean up all hanging staged Locations - self.inner.meta.clean_staged_locations(dest); + self.inner.metadata_mut().clean_staged_locations(dest); Ok(()) } @@ -798,7 +823,7 @@ impl<'a> StagedVolume<'a> { match res { Ok(_) => { - self.inner.meta.set_version(version); + self.inner.metadata_mut().set_version(version); Ok(()) } // TODO: there's a scenario here where we get an AlreadyExists error returned to us, @@ -882,12 +907,12 @@ mod tests { } } - async fn assert_write(volume: &mut Volume, fd: Fd, contents: &'static str) { + async fn assert_write(volume: &Volume, fd: Fd, contents: &'static str) { volume.write_at(fd, 0, contents.as_bytes()).await.unwrap(); volume.release(fd).await.unwrap(); } - async fn assert_read(volume: &mut Volume, name: &'static str, contents: &'static str) { + async fn assert_read(volume: &Volume, name: &'static str, contents: &'static str) { let attr = volume.lookup(Ino::Root, name).unwrap().unwrap(); let mut buf = vec![0u8; attr.size as usize]; let fd = volume.open_read(attr.ino).await.unwrap(); @@ -907,11 +932,11 @@ mod tests { .unwrap(); // create a volume with three files, all of which are 5 bytes long. - let (mut volume, inos) = runtime.block_on(async { + let (volume, inos) = runtime.block_on(async { let mut client = Client::open(volume_path.to_str().unwrap()).unwrap(); - let mut volume = client.create_volume().await; + let volume = client.create_volume().await; - let mut create = async |name, bs| { + let create = async |name, bs| { let (attr, fd) = volume .create(Ino::Root, std::primitive::str::to_string(name), true) .unwrap(); @@ -954,47 +979,56 @@ mod tests { std::fs::create_dir_all(&volume_path).unwrap(); let mut client = Client::open(volume_path.to_str().unwrap()).unwrap(); - let mut volume = client.create_volume().await; + let volume = client.create_volume().await; // clean volume -- this is not staged - assert!(!volume.meta.is_staged()); + { + let meta = volume.metadata(); + assert!(!meta.is_staged()); + } // creating two files, it should be a staged volume now. let (attr1, fd1) = volume.create(Ino::Root, "hello.txt".into(), true).unwrap(); let attr1 = attr1.clone(); - assert_write(&mut volume, fd1, "hello").await; + assert_write(&volume, fd1, "hello").await; let (attr2, fd2) = volume.create(Ino::Root, "world.txt".into(), true).unwrap(); let attr2 = attr2.clone(); - assert_write(&mut volume, fd2, "world").await; + assert_write(&volume, fd2, "world").await; - assert!(volume.meta.is_staged()); - for attr in [&attr1, &attr2] { - assert!(matches!( - volume.meta.location(attr.ino), - Some((Location::Staged { .. }, _)) - )); + { + let meta = volume.metadata(); + assert!(meta.is_staged()); + for attr in [&attr1, &attr2] { + assert!(matches!( + meta.location(attr.ino), + Some((Location::Staged { .. }, _)) + )); + } } // commit!!! let commit_fd = volume.open_read_write(Ino::COMMIT).await.unwrap(); - assert_write(&mut volume, commit_fd, "next-version").await; + assert_write(&volume, commit_fd, "next-version").await; // after commit, both files are no longer staged - assert!(!volume.meta.is_staged()); - for attr in [&attr1, &attr2] { - assert!(matches!( - volume.meta.location(attr.ino), - Some((Location::Committed { .. }, _)) - )); + { + let meta = volume.metadata(); + assert!(!meta.is_staged()); + for attr in [&attr1, &attr2] { + assert!(matches!( + meta.location(attr.ino), + Some((Location::Committed { .. }, _)) + )); + } } // read the new volume, assert that the committed files have the // right contents, and check the version is bumped as expected - let mut next_volume = client.load_volume(&None).await.unwrap(); - let next_version: &str = next_volume.meta.version().as_ref(); - assert_eq!(next_version, "next-version",); - assert_read(&mut next_volume, "hello.txt", "hello").await; - assert_read(&mut next_volume, "world.txt", "world").await; + let next_volume = client.load_volume(&None).await.unwrap(); + let next_version = next_volume.version().to_string(); + assert_eq!(next_version, "next-version"); + assert_read(&next_volume, "hello.txt", "hello").await; + assert_read(&next_volume, "world.txt", "world").await; } #[tokio::test] @@ -1004,13 +1038,13 @@ mod tests { std::fs::create_dir_all(&volume_path).unwrap(); let mut client = Client::open(volume_path.to_str().unwrap()).unwrap(); - let mut volume = client.create_volume().await; + let volume = client.create_volume().await; // creating a file, but holding the fd (not releasing it) let (attr, fd) = volume.create(Ino::Root, "hello.txt".into(), true).unwrap(); let attr = attr.clone(); assert!(matches!( - volume.meta.location(attr.ino), + volume.meta.read().location(attr.ino), Some((Location::Staged { .. }, _)) )); @@ -1038,7 +1072,7 @@ mod tests { let _f3 = volume.open_read_write(Ino::COMMIT).await.unwrap(); let _f4 = volume.open_read(attr.ino).await.unwrap(); // this is ok, because none of them are staged - assert!(!volume.fds.is_empty()); + assert!(!volume.fds.read().is_empty()); volume .commit(Version::from_static("next-version")) .await From d8db571799fc3d675518453b7de2060f7e25a9d0 Mon Sep 17 00:00:00 2001 From: Dustin Pho Date: Fri, 21 Nov 2025 11:26:08 -0500 Subject: [PATCH 02/17] walkdir takes a guard the entire time --- pond/src/volume.rs | 59 +++++++++++++++++++++++++++++++++++++++------- 1 file changed, 51 insertions(+), 8 deletions(-) diff --git a/pond/src/volume.rs b/pond/src/volume.rs index 6a4aae4..6cbdb60 100644 --- a/pond/src/volume.rs +++ b/pond/src/volume.rs @@ -1,5 +1,5 @@ use crate::{ - ByteRange, Error, FileAttr, Ino, Location, OwnedDirEntry, Result, + ByteRange, Error, FileAttr, FileType, Ino, Location, OwnedDirEntry, Result, cache::{CacheConfig, ChunkCache}, error::ErrorKind, metadata::{Modify, Version, VolumeMetadata}, @@ -488,14 +488,57 @@ fn read_from_buf(from: &[u8], offset: u64, to: &mut [u8]) -> Result { Ok(amt) } +pub struct WalkVolume<'a> { + guard: RwLockReadGuard<'a, VolumeMetadata>, + stack: Vec>, +} + +impl<'a> WalkVolume<'a> { + fn new(guard: RwLockReadGuard<'a, VolumeMetadata>, ino: Ino) -> Result { + let entries = guard + .readdir(ino)? + .map(OwnedDirEntry::from) + .collect::>(); + Ok(Self { + guard, + stack: vec![entries.into_iter()], + }) + } + + fn push_dir_entries(&mut self, ino: Ino) -> Result<()> { + let entries = self + .guard + .readdir(ino)? + .map(OwnedDirEntry::from) + .collect::>(); + self.stack.push(entries.into_iter()); + Ok(()) + } +} + +impl<'a> Iterator for WalkVolume<'a> { + type Item = Result; + + fn next(&mut self) -> Option { + loop { + let iter = self.stack.last_mut()?; + if let Some(entry) = iter.next() { + if entry.attr().kind == FileType::Directory + && let Err(e) = self.push_dir_entries(entry.attr().ino) + { + return Some(Err(e)); + } + return Some(Ok(entry)); + } else { + self.stack.pop(); + } + } + } +} + impl Volume { - pub fn walk(&self, ino: Ino) -> Result>> { - let guard = self.metadata(); - let entries: Vec> = guard - .walk(ino)? - .map(|res| res.map(OwnedDirEntry::from)) - .collect(); - Ok(entries.into_iter()) + pub fn walk(&self, ino: Ino) -> Result> { + WalkVolume::new(self.metadata(), ino) } /// Pack a local directory into a Pond volume. From 4ecaea6cb6c8fe604ccefa6c7fe900d660ad4faf Mon Sep 17 00:00:00 2001 From: Dustin Pho Date: Fri, 21 Nov 2025 16:56:46 +0000 Subject: [PATCH 03/17] inline some things --- pond/src/volume.rs | 46 ++++++++++++++++++---------------------------- 1 file changed, 18 insertions(+), 28 deletions(-) diff --git a/pond/src/volume.rs b/pond/src/volume.rs index 6cbdb60..b9937d7 100644 --- a/pond/src/volume.rs +++ b/pond/src/volume.rs @@ -123,8 +123,7 @@ impl Volume { match ino { Ino::CLEAR_CACHE | Ino::COMMIT => Ok(()), ino => { - self.meta - .write() + self.metadata_mut() .modify(ino, mtime, ctime, location, range)?; Ok(()) } @@ -148,13 +147,11 @@ impl Volume { } pub fn to_bytes(&self) -> Result> { - let meta = self.metadata(); - meta.to_bytes() + self.metadata().to_bytes() } pub fn to_bytes_with_version(&self, version: &Version) -> Result> { - let meta = self.metadata(); - meta.to_bytes_with_version(version) + self.metadata().to_bytes_with_version(version) } pub fn getattr(&self, ino: Ino) -> Result { @@ -171,19 +168,16 @@ impl Volume { mtime: Option, ctime: Option, ) -> Result { - let mut meta = self.metadata_mut(); - meta.setattr(ino, mtime, ctime).cloned() + self.metadata_mut().setattr(ino, mtime, ctime).cloned() } pub fn lookup(&self, parent: Ino, name: &str) -> Result> { scoped_timer!("pond_volume_lookup_latency_secs"); - let meta = self.metadata(); - Ok(meta.lookup(parent, name)?.cloned()) + Ok(self.metadata().lookup(parent, name)?.cloned()) } pub fn mkdir(&self, parent: Ino, name: String) -> Result { - let mut meta = self.metadata_mut(); - meta.mkdir(parent, name).cloned() + self.metadata_mut().mkdir(parent, name).cloned() } pub fn rmdir(&self, parent: Ino, name: &str) -> Result<()> { @@ -271,11 +265,10 @@ impl Volume { Ino::CLEAR_CACHE => new_fd(&self.fds, ino, FileDescriptor::ClearCache), Ino::VERSION => Err(ErrorKind::PermissionDenied.into()), ino => { - let location = { - let meta = self.metadata(); - meta.location(ino) - .map(|(location, range)| (location.clone(), *range)) - }; + let location = self + .metadata() + .location(ino) + .map(|(location, range)| (location.clone(), *range)); match location { Some((Location::Staged { path }, _)) => { let file = open_file(&path, true).await?; @@ -321,11 +314,10 @@ impl Volume { } Ino::COMMIT | Ino::CLEAR_CACHE => Err(ErrorKind::PermissionDenied.into()), ino => { - let location = { - let meta = self.metadata(); - meta.location(ino) - .map(|(location, range)| (location.clone(), *range)) - }; + let location = self + .metadata() + .location(ino) + .map(|(location, range)| (location.clone(), *range)); match location { Some((Location::Staged { path }, _)) => { @@ -754,8 +746,9 @@ impl<'a> StagedVolume<'a> { let mut buf = BytesMut::with_capacity(Self::MPU_UPLOAD_SIZE); let mut read_buf = vec![0u8; Self::READ_BUF_SIZE]; let staged_files: Vec<(FileAttr, std::path::PathBuf)> = { - let meta = self.inner.metadata(); - meta.iter_staged() + self.inner + .metadata() + .iter_staged() .map(|(attr, path)| (attr.clone(), path.clone())) .collect() }; @@ -1025,10 +1018,7 @@ mod tests { let volume = client.create_volume().await; // clean volume -- this is not staged - { - let meta = volume.metadata(); - assert!(!meta.is_staged()); - } + assert!(!volume.metadata().is_staged()); // creating two files, it should be a staged volume now. let (attr1, fd1) = volume.create(Ino::Root, "hello.txt".into(), true).unwrap(); From 44f1bb1778f6e106b84b09ded0a3b4af652d4f23 Mon Sep 17 00:00:00 2001 From: Dustin Pho Date: Fri, 21 Nov 2025 21:59:14 +0000 Subject: [PATCH 04/17] readdir returns copies of the dirents --- pond-fs/src/fuse.rs | 44 ++++++++++++++++++++++++++++++++++++++---- pond/src/lib.rs | 44 +++++++++++++++++++++--------------------- pond/src/metadata.rs | 46 +++++++++++++++++++++++++++++++++----------- pond/src/volume.rs | 24 ++++++++++++++--------- 4 files changed, 112 insertions(+), 46 deletions(-) diff --git a/pond-fs/src/fuse.rs b/pond-fs/src/fuse.rs index 4b8aa32..2cdb92c 100644 --- a/pond-fs/src/fuse.rs +++ b/pond-fs/src/fuse.rs @@ -1,5 +1,9 @@ use pond::{ErrorKind, Fd, Ino, Volume}; +use std::collections::HashMap; use std::ffi::OsStr; +use std::hash::DefaultHasher; +use std::hash::Hash; +use std::hash::Hasher; use std::time::Duration; use std::time::SystemTime; use tracing::instrument; @@ -10,6 +14,7 @@ pub struct Pond { uid: u32, gid: u32, kernel_cache_timeout: Duration, + readdir_offsets: HashMap<(Ino, i64), String>, } impl Pond { @@ -28,6 +33,7 @@ impl Pond { uid, gid, kernel_cache_timeout, + readdir_offsets: Default::default(), } } } @@ -119,16 +125,40 @@ impl fuser::Filesystem for Pond { offset: i64, mut reply: fuser::ReplyDirectory, ) { - let iter = fs_try!(reply, self.volume.readdir(ino.into())); - let offset = fs_try!(reply, offset.try_into().map_err(|_| ErrorKind::InvalidData)); + // translate the offset into the filename where the last readdir left off before its buffer + // was full. + let offset = { + if offset == 0 { + None + } else { + let fname = fs_try!( + reply, + self.readdir_offsets + // TODO: is it idempotent? should it be? + .remove(&(ino.into(), offset)) + .ok_or_else(|| pond::Error::new( + pond::ErrorKind::InvalidData, + format!("bad offset passed to readdir: {offset}"), + )) + ); + Some(fname) + } + }; - for (i, entry) in iter.enumerate().skip(offset) { + let iter = fs_try!(reply, self.volume.readdir(ino.into(), offset)).peekable(); + let mut last_offset = None; + for entry in iter { let attr = entry.attr(); let name = entry.name(); - let is_full = reply.add(attr.ino.into(), (i + 1) as i64, fuse_kind(attr.kind), name); + let hash = hash(name); + let is_full = reply.add(attr.ino.into(), hash, fuse_kind(attr.kind), name); if is_full { + if let Some((hash, name)) = last_offset { + self.readdir_offsets.insert((ino.into(), hash), name); + } break; } + last_offset = Some((hash, name.to_string())); } reply.ok(); } @@ -451,3 +481,9 @@ fn getuid() -> u32 { fn getgid() -> u32 { unsafe { libc::getgid() } } + +fn hash(s: &str) -> i64 { + let mut hasher = DefaultHasher::new(); + s.hash(&mut hasher); + hasher.finish() as i64 +} diff --git a/pond/src/lib.rs b/pond/src/lib.rs index f33f743..391255f 100644 --- a/pond/src/lib.rs +++ b/pond/src/lib.rs @@ -15,7 +15,7 @@ pub use location::Location; pub use metadata::Version; pub use volume::{Fd, Volume}; -use std::time::SystemTime; +use std::{path::PathBuf, time::SystemTime}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum FileType { @@ -146,7 +146,7 @@ impl FileAttr { } #[derive(Debug, Clone)] -pub struct DirEntry<'a> { +pub struct DirEntryRef<'a> { name: &'a str, parents: Vec<&'a str>, attr: &'a FileAttr, @@ -154,7 +154,7 @@ pub struct DirEntry<'a> { data: &'a metadata::EntryData, } -impl<'a> DirEntry<'a> { +impl<'a> DirEntryRef<'a> { pub fn name(&self) -> &str { self.name } @@ -187,18 +187,25 @@ impl<'a> DirEntry<'a> { } } -/// Owned equivalent of `DirEntry` that does not borrow from the underlying volume. +/// Owned equivalent of [`DirEntryRef`] that does not borrow from the underlying volume. #[derive(Debug, Clone)] -pub struct OwnedDirEntry { - name: String, - parents: Vec, +pub struct DirEntry { + // TODO: swap this out for camino Utf8PathBuf + path: PathBuf, attr: FileAttr, location: Option<(Location, ByteRange)>, } -impl OwnedDirEntry { +impl DirEntry { pub fn name(&self) -> &str { - &self.name + self.path + .file_name() + .and_then(|f| f.to_str()) + .expect("should be a valid utf-8 string") + } + + pub fn path(&self) -> &str { + self.path.to_str().expect("should be a valid utf-8 string") } pub fn attr(&self) -> &FileAttr { @@ -206,15 +213,7 @@ impl OwnedDirEntry { } pub fn location(&self) -> Option<(&Location, ByteRange)> { - self.location - .as_ref() - .map(|(loc, range)| (loc, *range)) - } - - pub fn path(&self) -> String { - let mut path = self.parents.clone(); - path.push(self.name.clone()); - path.join("/") + self.location.as_ref().map(|(loc, range)| (loc, *range)) } pub fn is_regular(&self) -> bool { @@ -222,12 +221,13 @@ impl OwnedDirEntry { } } -impl<'a> From> for OwnedDirEntry { - fn from(entry: DirEntry<'a>) -> Self { +impl<'a> From> for DirEntry { + fn from(entry: DirEntryRef<'a>) -> Self { let location = entry.location().map(|(loc, range)| (loc.clone(), range)); + let mut path: PathBuf = entry.parents.iter().collect(); + path.push(entry.name()); Self { - name: entry.name().to_string(), - parents: entry.parents.iter().map(|p| p.to_string()).collect(), + path, attr: entry.attr().clone(), location, } diff --git a/pond/src/metadata.rs b/pond/src/metadata.rs index a7915db..12da24b 100644 --- a/pond/src/metadata.rs +++ b/pond/src/metadata.rs @@ -7,7 +7,7 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; -use crate::{ByteRange, DirEntry, Error, FileAttr, FileType, Ino, Location, error::ErrorKind}; +use crate::{ByteRange, DirEntryRef, Error, FileAttr, FileType, Ino, Location, error::ErrorKind}; // TODO: we duplicate file/dir names as strings in data values and entry keys. // have to figure out how to intern somewhere if we want to stop, and probably @@ -754,12 +754,20 @@ impl VolumeMetadata { /// `(filename, attr)` pairs. /// /// Iterator order is not guaranteed to be stable. - pub(crate) fn readdir<'a>(&'a self, ino: Ino) -> crate::Result> { + pub(crate) fn readdir<'a>( + &'a self, + ino: Ino, + offset: Option, + ) -> crate::Result> { let parents = self.dir_path(ino)?; + let range = match offset { + Some(offset) => self.dirs.range(entry_range_with_offset(ino, offset)?), + None => self.dirs.range(entry_range(ino)?), + }; Ok(ReadDir { data: &self.data, locations: &self.locations, - range: self.dirs.range(entry_range(ino)?), + range, parents, }) } @@ -943,6 +951,18 @@ fn entry_range(ino: Ino) -> crate::Result>> { Ok(start..end) } +fn entry_range_with_offset( + ino: Ino, + offset: String, +) -> crate::Result>> { + let start: EntryKey = (ino, offset).into(); + let end: EntryKey = (ino.add(1)?, "").into(); + Ok(( + std::ops::Bound::Excluded(start), + std::ops::Bound::Excluded(end), + )) +} + /// The iterator returned from [readdir][Volume::readdir]. pub(crate) struct ReadDir<'a> { data: &'a BTreeMap, @@ -952,7 +972,7 @@ pub(crate) struct ReadDir<'a> { } impl<'a> Iterator for ReadDir<'a> { - type Item = DirEntry<'a>; + type Item = DirEntryRef<'a>; fn next(&mut self) -> Option { self.range.next().map(|(EntryKey(_), ino)| { @@ -961,7 +981,7 @@ impl<'a> Iterator for ReadDir<'a> { .get(ino) .unwrap_or_else(|| panic!("BUG: invalid dirent: ino={ino:?}")); - DirEntry { + DirEntryRef { name: &dent.name, parents: self.parents.clone(), attr: &dent.attr, @@ -987,7 +1007,7 @@ impl<'a> Iterator for WalkIter<'a> { // TODO: it's a big gnarly to be cloning and returning the ancestors path // every time but the lifetime on returning a slice referencing self // is a pain to express. - type Item = crate::Result>; + type Item = crate::Result>; fn next(&mut self) -> Option { while !self.readdirs.is_empty() { @@ -1191,8 +1211,8 @@ mod test { fn readdir_nospecial( v: &VolumeMetadata, ino: Ino, - ) -> crate::Result>> { - Ok(v.readdir(ino)?.filter(|e| e.attr.ino.is_regular())) + ) -> crate::Result>> { + Ok(v.readdir(ino, None)?.filter(|e| e.attr.ino.is_regular())) } #[test] @@ -1284,7 +1304,11 @@ mod test { .collect(); assert_eq!(vec!["a"], names); // should be empty - let names: Vec<_> = volume.readdir(a.ino).unwrap().map(|e| e.name).collect(); + let names: Vec<_> = volume + .readdir(a.ino, None) + .unwrap() + .map(|e| e.name) + .collect(); assert!(names.is_empty()); } @@ -1365,7 +1389,7 @@ mod test { ); assert_eq!( volume - .readdir(a.ino) + .readdir(a.ino, None) .unwrap() .map(|e| e.path()) .collect::>(), @@ -1373,7 +1397,7 @@ mod test { ); assert_eq!( volume - .readdir(b.ino) + .readdir(b.ino, None) .unwrap() .map(|e| e.path()) .collect::>(), diff --git a/pond/src/volume.rs b/pond/src/volume.rs index b9937d7..7558ea0 100644 --- a/pond/src/volume.rs +++ b/pond/src/volume.rs @@ -1,5 +1,5 @@ use crate::{ - ByteRange, Error, FileAttr, FileType, Ino, Location, OwnedDirEntry, Result, + ByteRange, DirEntry, Error, FileAttr, FileType, Ino, Location, Result, cache::{CacheConfig, ChunkCache}, error::ErrorKind, metadata::{Modify, Version, VolumeMetadata}, @@ -191,9 +191,13 @@ impl Volume { Ok(()) } - pub fn readdir(&self, ino: Ino) -> Result> { + pub fn readdir( + &self, + ino: Ino, + offset: Option, + ) -> Result> { let guard = self.metadata(); - let entries: Vec = guard.readdir(ino)?.map(OwnedDirEntry::from).collect(); + let entries: Vec = guard.readdir(ino, offset)?.map(DirEntry::from).collect(); Ok(entries.into_iter()) } @@ -482,14 +486,14 @@ fn read_from_buf(from: &[u8], offset: u64, to: &mut [u8]) -> Result { pub struct WalkVolume<'a> { guard: RwLockReadGuard<'a, VolumeMetadata>, - stack: Vec>, + stack: Vec>, } impl<'a> WalkVolume<'a> { fn new(guard: RwLockReadGuard<'a, VolumeMetadata>, ino: Ino) -> Result { let entries = guard - .readdir(ino)? - .map(OwnedDirEntry::from) + .readdir(ino, None)? + .map(DirEntry::from) .collect::>(); Ok(Self { guard, @@ -500,8 +504,8 @@ impl<'a> WalkVolume<'a> { fn push_dir_entries(&mut self, ino: Ino) -> Result<()> { let entries = self .guard - .readdir(ino)? - .map(OwnedDirEntry::from) + .readdir(ino, None)? + .map(DirEntry::from) .collect::>(); self.stack.push(entries.into_iter()); Ok(()) @@ -509,7 +513,7 @@ impl<'a> WalkVolume<'a> { } impl<'a> Iterator for WalkVolume<'a> { - type Item = Result; + type Item = Result; fn next(&mut self) -> Option { loop { @@ -529,6 +533,8 @@ impl<'a> Iterator for WalkVolume<'a> { } impl Volume { + /// Returns a guarded iterator over the entire volume. The guard ensures that iteration is done + /// over a consistent view of the Volume. pub fn walk(&self, ino: Ino) -> Result> { WalkVolume::new(self.metadata(), ino) } From ee720c303f73d312acc094e5f57dbe0aba221d24 Mon Sep 17 00:00:00 2001 From: Dustin Pho Date: Mon, 24 Nov 2025 06:17:07 +0000 Subject: [PATCH 05/17] readdir with better offsets --- pond-fs/src/fuse.rs | 84 ++++++++++++++++++++++++++++----------------- pond/src/volume.rs | 7 +++- 2 files changed, 59 insertions(+), 32 deletions(-) diff --git a/pond-fs/src/fuse.rs b/pond-fs/src/fuse.rs index 2cdb92c..cd4a098 100644 --- a/pond-fs/src/fuse.rs +++ b/pond-fs/src/fuse.rs @@ -8,13 +8,17 @@ use std::time::Duration; use std::time::SystemTime; use tracing::instrument; +type FilenameHash = i64; + +const READDIR_BATCH: usize = 64; + pub struct Pond { volume: Volume, runtime: tokio::runtime::Runtime, uid: u32, gid: u32, kernel_cache_timeout: Duration, - readdir_offsets: HashMap<(Ino, i64), String>, + readdir_offsets: HashMap<(Ino, FilenameHash), String>, } impl Pond { @@ -122,43 +126,61 @@ impl fuser::Filesystem for Pond { _req: &fuser::Request<'_>, ino: u64, _fh: u64, - offset: i64, + offset: FilenameHash, mut reply: fuser::ReplyDirectory, ) { // translate the offset into the filename where the last readdir left off before its buffer - // was full. - let offset = { - if offset == 0 { - None - } else { - let fname = fs_try!( - reply, - self.readdir_offsets - // TODO: is it idempotent? should it be? - .remove(&(ino.into(), offset)) - .ok_or_else(|| pond::Error::new( - pond::ErrorKind::InvalidData, - format!("bad offset passed to readdir: {offset}"), - )) - ); - Some(fname) - } + // was full. keep the cookie around so we can fall back to it if nothing new was returned. + let mut token: Option = if offset == 0 { + None + } else { + let name = fs_try!( + reply, + // note that we aren't removing it here. this means that the map grows over the + // lifetime of the mount. this allows uses to call readdir with the same offset + // multiple times, whereas removing it would cause subsequent calls to fail + // completely. if this becomes a problem, we can revisit removing the entry here or + // having a TTL map. + self.readdir_offsets + .get(&(ino.into(), offset)) + .ok_or_else(|| pond::Error::new( + pond::ErrorKind::InvalidData, + format!("bad offset passed to readdir: {offset}"), + )) + ); + Some(name.clone()) }; - let iter = fs_try!(reply, self.volume.readdir(ino.into(), offset)).peekable(); - let mut last_offset = None; - for entry in iter { - let attr = entry.attr(); - let name = entry.name(); - let hash = hash(name); - let is_full = reply.add(attr.ino.into(), hash, fuse_kind(attr.kind), name); - if is_full { - if let Some((hash, name)) = last_offset { - self.readdir_offsets.insert((ino.into(), hash), name); + 'outer: loop { + let chunk_iter = fs_try!( + reply, + self.volume + .readdir(ino.into(), token.clone(), READDIR_BATCH,) + ); + + let mut num_entries = 0; + for entry in chunk_iter { + num_entries += 1; + let name = entry.name(); + let attr = entry.attr(); + + let full_buffer = + reply.add(attr.ino.into(), hash(name), fuse_kind(attr.kind), name); + if full_buffer { + break 'outer; } + + token = Some(name.to_string()); + } + + // if this emits less than the READDIR_BATCH we asked for, we've reached EOF. + if num_entries < READDIR_BATCH { break; } - last_offset = Some((hash, name.to_string())); + } + + if let Some(name) = token { + self.readdir_offsets.insert((ino.into(), hash(&name)), name); } reply.ok(); } @@ -482,7 +504,7 @@ fn getgid() -> u32 { unsafe { libc::getgid() } } -fn hash(s: &str) -> i64 { +fn hash(s: &str) -> FilenameHash { let mut hasher = DefaultHasher::new(); s.hash(&mut hasher); hasher.finish() as i64 diff --git a/pond/src/volume.rs b/pond/src/volume.rs index 7558ea0..249d8b9 100644 --- a/pond/src/volume.rs +++ b/pond/src/volume.rs @@ -195,9 +195,14 @@ impl Volume { &self, ino: Ino, offset: Option, + size: usize, ) -> Result> { let guard = self.metadata(); - let entries: Vec = guard.readdir(ino, offset)?.map(DirEntry::from).collect(); + let entries: Vec = guard + .readdir(ino, offset)? + .take(size) + .map(DirEntry::from) + .collect(); Ok(entries.into_iter()) } From ba743b965b5e35aef47c2d3ae5a36d05de96aca2 Mon Sep 17 00:00:00 2001 From: Dustin Pho Date: Mon, 24 Nov 2025 16:43:43 +0000 Subject: [PATCH 06/17] use camino in DirEntry --- Cargo.lock | 7 +++++++ Cargo.toml | 1 + pond/Cargo.toml | 1 + pond/src/lib.rs | 15 ++++++--------- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 30cbc87..adcfa5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -283,6 +283,12 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f5c434ae3cf0089ca203e9019ebe529c47ff45cefe8af7c85ecb734ef541822f" +[[package]] +name = "camino" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "276a59bf2b2c967788139340c9f0c5b12d7fd6630315c15c217e559de85d2609" + [[package]] name = "cc" version = "1.2.43" @@ -1781,6 +1787,7 @@ dependencies = [ "arc-swap", "backon", "bytes", + "camino", "flatbuffers", "foyer", "foyer-memory", diff --git a/Cargo.toml b/Cargo.toml index c4796a0..273e71d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ arc-swap = "1.7.1" backon = { version = "1.6.0", default-features = false, features = ["tokio-sleep"] } bytes = "1.10" bytesize = "2.1.0" +camino = "1.2.1" clap = { version = "4.5.47", features = ["derive"] } flatbuffers = "25.2" foyer = "0.20.0" diff --git a/pond/Cargo.toml b/pond/Cargo.toml index beb0879..38339e3 100644 --- a/pond/Cargo.toml +++ b/pond/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" arc-swap.workspace = true backon.workspace = true bytes.workspace = true +camino.workspace = true flatbuffers.workspace = true foyer.workspace = true foyer-memory.workspace = true diff --git a/pond/src/lib.rs b/pond/src/lib.rs index 391255f..692e93a 100644 --- a/pond/src/lib.rs +++ b/pond/src/lib.rs @@ -9,13 +9,14 @@ mod metrics; mod storage; mod volume; +use camino::Utf8PathBuf; pub use client::Client; pub use error::{Error, ErrorKind, Result}; pub use location::Location; pub use metadata::Version; pub use volume::{Fd, Volume}; -use std::{path::PathBuf, time::SystemTime}; +use std::time::SystemTime; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum FileType { @@ -190,22 +191,18 @@ impl<'a> DirEntryRef<'a> { /// Owned equivalent of [`DirEntryRef`] that does not borrow from the underlying volume. #[derive(Debug, Clone)] pub struct DirEntry { - // TODO: swap this out for camino Utf8PathBuf - path: PathBuf, + path: Utf8PathBuf, attr: FileAttr, location: Option<(Location, ByteRange)>, } impl DirEntry { pub fn name(&self) -> &str { - self.path - .file_name() - .and_then(|f| f.to_str()) - .expect("should be a valid utf-8 string") + self.path.file_name().expect("BUG: path ends in '..'") } pub fn path(&self) -> &str { - self.path.to_str().expect("should be a valid utf-8 string") + self.path.as_str() } pub fn attr(&self) -> &FileAttr { @@ -224,7 +221,7 @@ impl DirEntry { impl<'a> From> for DirEntry { fn from(entry: DirEntryRef<'a>) -> Self { let location = entry.location().map(|(loc, range)| (loc.clone(), range)); - let mut path: PathBuf = entry.parents.iter().collect(); + let mut path: Utf8PathBuf = entry.parents.iter().collect(); path.push(entry.name()); Self { path, From 83c8409cc5947f06cb9f0aa5211afe0aedc3d2fe Mon Sep 17 00:00:00 2001 From: Dustin Pho Date: Mon, 24 Nov 2025 16:55:33 +0000 Subject: [PATCH 07/17] helper for get_fd, more explicit about guard naming --- pond/src/volume.rs | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/pond/src/volume.rs b/pond/src/volume.rs index 249d8b9..41b7e80 100644 --- a/pond/src/volume.rs +++ b/pond/src/volume.rs @@ -112,6 +112,10 @@ impl Volume { self.meta.write() } + fn get_fd(&self, fd: &Fd) -> Option { + self.fds.read().get(fd).cloned() + } + pub(crate) fn modify( &self, ino: Ino, @@ -210,15 +214,16 @@ impl Volume { let (path, file) = self.store.tempfile()?; let attr = { - let mut meta = self.metadata_mut(); - meta.create( - parent, - name, - exclusive, - Location::Staged { path }, - ByteRange::empty(), - )? - .clone() + let mut guard = self.metadata_mut(); + guard + .create( + parent, + name, + exclusive, + Location::Staged { path }, + ByteRange::empty(), + )? + .clone() }; let fd = new_fd( @@ -274,11 +279,11 @@ impl Volume { Ino::CLEAR_CACHE => new_fd(&self.fds, ino, FileDescriptor::ClearCache), Ino::VERSION => Err(ErrorKind::PermissionDenied.into()), ino => { - let location = self + let guard = self .metadata() .location(ino) .map(|(location, range)| (location.clone(), *range)); - match location { + match guard { Some((Location::Staged { path }, _)) => { let file = open_file(&path, true).await?; @@ -323,12 +328,12 @@ impl Volume { } Ino::COMMIT | Ino::CLEAR_CACHE => Err(ErrorKind::PermissionDenied.into()), ino => { - let location = self + let guard = self .metadata() .location(ino) .map(|(location, range)| (location.clone(), *range)); - match location { + match guard { Some((Location::Staged { path }, _)) => { let file = open_file(&path, false).await?; new_fd(&self.fds, ino, FileDescriptor::Staged { file: file.into() }) @@ -346,8 +351,7 @@ impl Volume { pub async fn read_at(&self, fd: Fd, offset: u64, buf: &mut [u8]) -> Result { metrics::histogram!("pond_volume_read_buf_size_bytes").record(buf.len() as f64); - let descriptor = { self.fds.read().get(&fd).cloned() }; - + let descriptor = self.get_fd(&fd); match descriptor { // reads of write-only special fds do nothing Some(FileDescriptor::ClearCache) | Some(FileDescriptor::Commit) => Ok(0), @@ -384,8 +388,7 @@ impl Volume { pub async fn write_at(&self, fd: Fd, offset: u64, data: &[u8]) -> Result { metrics::histogram!("pond_volume_write_buf_size_bytes").record(data.len() as f64); - let descriptor = { self.fds.read().get(&fd).cloned() }; - + let descriptor = self.get_fd(&fd); match descriptor { Some(FileDescriptor::ClearCache) => { self.cache.clear(); From 98985a487e61fa437b02582273c40b05f89cb160 Mon Sep 17 00:00:00 2001 From: Dustin Pho Date: Mon, 24 Nov 2025 20:10:13 +0000 Subject: [PATCH 08/17] be more explicit about locking, don't hide behind helpers --- pond/src/volume.rs | 139 +++++++++++++++++++++++---------------------- 1 file changed, 71 insertions(+), 68 deletions(-) diff --git a/pond/src/volume.rs b/pond/src/volume.rs index 41b7e80..d647c52 100644 --- a/pond/src/volume.rs +++ b/pond/src/volume.rs @@ -10,7 +10,7 @@ use arc_swap::ArcSwap; use backon::{ExponentialBuilder, Retryable}; use bytes::{Bytes, BytesMut}; use object_store::{ObjectStore, PutMode, PutOptions, PutPayload}; -use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use parking_lot::{RwLock, RwLockReadGuard}; use std::{ collections::BTreeMap, io::{BufReader, Read}, @@ -74,6 +74,8 @@ enum FileDescriptor { }, } +// NOTE: When locking both meta and fds for modification, meta should always be locked first to +// avoid deadlocks! pub struct Volume { meta: Arc>, cache: ChunkCache, @@ -104,18 +106,6 @@ impl Volume { } } - pub(crate) fn metadata(&self) -> RwLockReadGuard<'_, VolumeMetadata> { - self.meta.read() - } - - pub(crate) fn metadata_mut(&self) -> RwLockWriteGuard<'_, VolumeMetadata> { - self.meta.write() - } - - fn get_fd(&self, fd: &Fd) -> Option { - self.fds.read().get(fd).cloned() - } - pub(crate) fn modify( &self, ino: Ino, @@ -127,7 +117,8 @@ impl Volume { match ino { Ino::CLEAR_CACHE | Ino::COMMIT => Ok(()), ino => { - self.metadata_mut() + self.meta + .write() .modify(ino, mtime, ctime, location, range)?; Ok(()) } @@ -135,7 +126,7 @@ impl Volume { } pub fn version(&self) -> Version { - self.metadata().version().clone() + self.meta.read().version().clone() } pub fn object_store_description(&self) -> String { @@ -151,16 +142,16 @@ impl Volume { } pub fn to_bytes(&self) -> Result> { - self.metadata().to_bytes() + self.meta.read().to_bytes() } pub fn to_bytes_with_version(&self, version: &Version) -> Result> { - self.metadata().to_bytes_with_version(version) + self.meta.read().to_bytes_with_version(version) } pub fn getattr(&self, ino: Ino) -> Result { scoped_timer!("pond_volume_getattr_latency_secs"); - match self.metadata().getattr(ino) { + match self.meta.read().getattr(ino) { Some(attr) => Ok(attr.clone()), None => Err(ErrorKind::NotFound.into()), } @@ -172,26 +163,25 @@ impl Volume { mtime: Option, ctime: Option, ) -> Result { - self.metadata_mut().setattr(ino, mtime, ctime).cloned() + self.meta.write().setattr(ino, mtime, ctime).cloned() } pub fn lookup(&self, parent: Ino, name: &str) -> Result> { scoped_timer!("pond_volume_lookup_latency_secs"); - Ok(self.metadata().lookup(parent, name)?.cloned()) + Ok(self.meta.read().lookup(parent, name)?.cloned()) } pub fn mkdir(&self, parent: Ino, name: String) -> Result { - self.metadata_mut().mkdir(parent, name).cloned() + self.meta.write().mkdir(parent, name).cloned() } pub fn rmdir(&self, parent: Ino, name: &str) -> Result<()> { - self.metadata_mut().rmdir(parent, name)?; + self.meta.write().rmdir(parent, name)?; Ok(()) } pub fn rename(&self, parent: Ino, name: &str, newparent: Ino, newname: String) -> Result<()> { - self.metadata_mut() - .rename(parent, name, newparent, newname)?; + self.meta.write().rename(parent, name, newparent, newname)?; Ok(()) } @@ -201,8 +191,8 @@ impl Volume { offset: Option, size: usize, ) -> Result> { - let guard = self.metadata(); - let entries: Vec = guard + let metadata = self.meta.read(); + let entries: Vec = metadata .readdir(ino, offset)? .take(size) .map(DirEntry::from) @@ -214,8 +204,8 @@ impl Volume { let (path, file) = self.store.tempfile()?; let attr = { - let mut guard = self.metadata_mut(); - guard + let mut metadata = self.meta.write(); + metadata .create( parent, name, @@ -227,7 +217,7 @@ impl Volume { }; let fd = new_fd( - &self.fds, + &mut self.fds.write(), attr.ino, FileDescriptor::Staged { file: file.into() }, )?; @@ -235,7 +225,7 @@ impl Volume { } pub fn delete(&self, parent: Ino, name: &str) -> Result<()> { - self.metadata_mut().delete(parent, name)?; + self.meta.write().delete(parent, name)?; Ok(()) } @@ -273,21 +263,26 @@ impl Volume { /// Open a Fd to a locally staged file for reading and writing. /// /// Opening a Fd with write permissions will always truncate the file. + // known false positive: https://github.com/rust-lang/rust-clippy/issues/6446 + #[allow(clippy::await_holding_lock)] pub async fn open_read_write(&self, ino: Ino) -> Result { match ino { - Ino::COMMIT => new_fd(&self.fds, ino, FileDescriptor::Commit), - Ino::CLEAR_CACHE => new_fd(&self.fds, ino, FileDescriptor::ClearCache), + Ino::COMMIT => new_fd(&mut self.fds.write(), ino, FileDescriptor::Commit), + Ino::CLEAR_CACHE => new_fd(&mut self.fds.write(), ino, FileDescriptor::ClearCache), Ino::VERSION => Err(ErrorKind::PermissionDenied.into()), ino => { - let guard = self - .metadata() - .location(ino) - .map(|(location, range)| (location.clone(), *range)); - match guard { + let mut metadata_guard = self.meta.write(); + match metadata_guard.location(ino) { Some((Location::Staged { path }, _)) => { + let path = path.clone(); + std::mem::drop(metadata_guard); // guard is dropped here before the await let file = open_file(&path, true).await?; - new_fd(&self.fds, ino, FileDescriptor::Staged { file: file.into() }) + new_fd( + &mut self.fds.write(), + ino, + FileDescriptor::Staged { file: file.into() }, + ) } Some((Location::Committed { .. }, ..)) => { // truncate the file (by assigning it a brand new staged file) if it's @@ -296,7 +291,7 @@ impl Volume { let (path, file) = self.store.tempfile()?; let staged = Location::Staged { path }; // modify metadata next - self.modify( + metadata_guard.modify( ino, SystemTime::now(), None, @@ -304,7 +299,11 @@ impl Volume { Some(Modify::Set((0, 0).into())), )?; // only create the fd once the file is open and metadata is valid - new_fd(&self.fds, ino, FileDescriptor::Staged { file: file.into() }) + new_fd( + &mut self.fds.write(), + ino, + FileDescriptor::Staged { file: file.into() }, + ) } None => Err(ErrorKind::NotFound.into()), } @@ -312,35 +311,43 @@ impl Volume { } } + // known false positive: https://github.com/rust-lang/rust-clippy/issues/6446 + #[allow(clippy::await_holding_lock)] pub async fn open_read(&self, ino: Ino) -> Result { match ino { - Ino::VERSION => new_fd(&self.fds, ino, FileDescriptor::Version), + Ino::VERSION => new_fd(&mut self.fds.write(), ino, FileDescriptor::Version), Ino::PROM_METRICS => { let data = match &self.metrics_snapshot { Some(metrics) => metrics.load().clone(), None => Default::default(), }; new_fd( - &self.fds, + &mut self.fds.write(), ino, FileDescriptor::PromMetrics { snapshot: data }, ) } Ino::COMMIT | Ino::CLEAR_CACHE => Err(ErrorKind::PermissionDenied.into()), ino => { - let guard = self - .metadata() - .location(ino) - .map(|(location, range)| (location.clone(), *range)); - - match guard { + let metadata_guard = self.meta.read(); + match metadata_guard.location(ino) { Some((Location::Staged { path }, _)) => { + let path = path.clone(); + std::mem::drop(metadata_guard); // guard is dropped here before the await let file = open_file(&path, false).await?; - new_fd(&self.fds, ino, FileDescriptor::Staged { file: file.into() }) + new_fd( + &mut self.fds.write(), + ino, + FileDescriptor::Staged { file: file.into() }, + ) } Some((Location::Committed { key }, range)) => { let key = Arc::new(self.store.child_path(key.as_ref())); - new_fd(&self.fds, ino, FileDescriptor::Committed { key, range }) + new_fd( + &mut self.fds.write(), + ino, + FileDescriptor::Committed { key, range: *range }, + ) } None => Err(ErrorKind::NotFound.into()), } @@ -351,7 +358,7 @@ impl Volume { pub async fn read_at(&self, fd: Fd, offset: u64, buf: &mut [u8]) -> Result { metrics::histogram!("pond_volume_read_buf_size_bytes").record(buf.len() as f64); - let descriptor = self.get_fd(&fd); + let descriptor = self.fds.read().get(&fd).cloned(); match descriptor { // reads of write-only special fds do nothing Some(FileDescriptor::ClearCache) | Some(FileDescriptor::Commit) => Ok(0), @@ -388,7 +395,7 @@ impl Volume { pub async fn write_at(&self, fd: Fd, offset: u64, data: &[u8]) -> Result { metrics::histogram!("pond_volume_write_buf_size_bytes").record(data.len() as f64); - let descriptor = self.get_fd(&fd); + let descriptor = self.fds.read().get(&fd).cloned(); match descriptor { Some(FileDescriptor::ClearCache) => { self.cache.clear(); @@ -544,7 +551,7 @@ impl Volume { /// Returns a guarded iterator over the entire volume. The guard ensures that iteration is done /// over a consistent view of the Volume. pub fn walk(&self, ino: Ino) -> Result> { - WalkVolume::new(self.metadata(), ino) + WalkVolume::new(self.meta.read(), ino) } /// Pack a local directory into a Pond volume. @@ -577,7 +584,7 @@ impl Volume { .components() .map(|c| c.as_os_str().to_string_lossy().to_string()) .collect(); - self.metadata_mut().mkdir_all(Ino::Root, dirs)?; + self.meta.write().mkdir_all(Ino::Root, dirs)?; } // for a file: // @@ -599,7 +606,7 @@ impl Volume { let dirs = dir .components() .map(|c| c.as_os_str().to_string_lossy().to_string()); - self.metadata_mut().mkdir_all(Ino::Root, dirs)?.ino + self.meta.write().mkdir_all(Ino::Root, dirs)?.ino } else { Ino::Root }; @@ -621,7 +628,7 @@ impl Volume { ) })? .len(); - self.metadata_mut().create( + self.meta.write().create( dir_ino, name.to_string_lossy().to_string(), true, @@ -704,12 +711,7 @@ fn copy_into(mut buf: &mut [u8], bytes: &[Bytes]) -> usize { // FIXME: this needs to allocate and check for remaining fds instead of just // trying to increment every time and crashing. it's u64 so we probably won't // hit it for a while but that's jank -fn new_fd( - fd_set: &RwLock>, - ino: Ino, - d: FileDescriptor, -) -> Result { - let mut fd_set = fd_set.write(); +fn new_fd(fd_set: &mut BTreeMap, ino: Ino, d: FileDescriptor) -> Result { let next_fh = fd_set .keys() .last() @@ -761,7 +763,8 @@ impl<'a> StagedVolume<'a> { let mut read_buf = vec![0u8; Self::READ_BUF_SIZE]; let staged_files: Vec<(FileAttr, std::path::PathBuf)> = { self.inner - .metadata() + .meta + .read() .iter_staged() .map(|(attr, path)| (attr.clone(), path.clone())) .collect() @@ -841,7 +844,7 @@ impl<'a> StagedVolume<'a> { } // deduplicate and clean up all hanging staged Locations - self.inner.metadata_mut().clean_staged_locations(dest); + self.inner.meta.write().clean_staged_locations(dest); Ok(()) } @@ -873,7 +876,7 @@ impl<'a> StagedVolume<'a> { match res { Ok(_) => { - self.inner.metadata_mut().set_version(version); + self.inner.meta.write().set_version(version); Ok(()) } // TODO: there's a scenario here where we get an AlreadyExists error returned to us, @@ -1032,7 +1035,7 @@ mod tests { let volume = client.create_volume().await; // clean volume -- this is not staged - assert!(!volume.metadata().is_staged()); + assert!(!volume.meta.read().is_staged()); // creating two files, it should be a staged volume now. let (attr1, fd1) = volume.create(Ino::Root, "hello.txt".into(), true).unwrap(); @@ -1043,7 +1046,7 @@ mod tests { assert_write(&volume, fd2, "world").await; { - let meta = volume.metadata(); + let meta = volume.meta.read(); assert!(meta.is_staged()); for attr in [&attr1, &attr2] { assert!(matches!( @@ -1059,7 +1062,7 @@ mod tests { // after commit, both files are no longer staged { - let meta = volume.metadata(); + let meta = volume.meta.read(); assert!(!meta.is_staged()); for attr in [&attr1, &attr2] { assert!(matches!( From ea5df85e945d5b78dc5654e695f205263b1d7655 Mon Sep 17 00:00:00 2001 From: Dustin Pho Date: Mon, 24 Nov 2025 20:17:35 +0000 Subject: [PATCH 09/17] volume walk without the long-lived readlock --- pond/src/volume.rs | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/pond/src/volume.rs b/pond/src/volume.rs index d647c52..75356bf 100644 --- a/pond/src/volume.rs +++ b/pond/src/volume.rs @@ -10,7 +10,7 @@ use arc_swap::ArcSwap; use backon::{ExponentialBuilder, Retryable}; use bytes::{Bytes, BytesMut}; use object_store::{ObjectStore, PutMode, PutOptions, PutPayload}; -use parking_lot::{RwLock, RwLockReadGuard}; +use parking_lot::RwLock; use std::{ collections::BTreeMap, io::{BufReader, Read}, @@ -499,26 +499,28 @@ fn read_from_buf(from: &[u8], offset: u64, to: &mut [u8]) -> Result { Ok(amt) } -pub struct WalkVolume<'a> { - guard: RwLockReadGuard<'a, VolumeMetadata>, +pub struct WalkVolume { + meta: Arc>, stack: Vec>, } -impl<'a> WalkVolume<'a> { - fn new(guard: RwLockReadGuard<'a, VolumeMetadata>, ino: Ino) -> Result { - let entries = guard +impl WalkVolume { + fn new(meta: Arc>, ino: Ino) -> Result { + let entries = meta + .read() .readdir(ino, None)? .map(DirEntry::from) .collect::>(); Ok(Self { - guard, + meta, stack: vec![entries.into_iter()], }) } fn push_dir_entries(&mut self, ino: Ino) -> Result<()> { let entries = self - .guard + .meta + .read() .readdir(ino, None)? .map(DirEntry::from) .collect::>(); @@ -527,7 +529,7 @@ impl<'a> WalkVolume<'a> { } } -impl<'a> Iterator for WalkVolume<'a> { +impl Iterator for WalkVolume { type Item = Result; fn next(&mut self) -> Option { @@ -550,8 +552,8 @@ impl<'a> Iterator for WalkVolume<'a> { impl Volume { /// Returns a guarded iterator over the entire volume. The guard ensures that iteration is done /// over a consistent view of the Volume. - pub fn walk(&self, ino: Ino) -> Result> { - WalkVolume::new(self.meta.read(), ino) + pub fn walk(&self, ino: Ino) -> Result { + WalkVolume::new(self.meta.clone(), ino) } /// Pack a local directory into a Pond volume. From 9dac10b250a6666c70dd4e2fd23e211da92e1469 Mon Sep 17 00:00:00 2001 From: Dustin Pho Date: Tue, 25 Nov 2025 21:46:31 +0000 Subject: [PATCH 10/17] more aggressive guard scoping, &mut pack, to_owned impl --- pond-fs/src/lib.rs | 2 +- pond-fs/tests/fuzz.rs | 2 +- pond/src/lib.rs | 24 +++++++++++------------- pond/src/volume.rs | 38 +++++++++++++++++++++++++------------- 4 files changed, 38 insertions(+), 28 deletions(-) diff --git a/pond-fs/src/lib.rs b/pond-fs/src/lib.rs index 2b4284a..06781be 100644 --- a/pond-fs/src/lib.rs +++ b/pond-fs/src/lib.rs @@ -261,7 +261,7 @@ pub fn create( let version = Version::from_str(version.as_ref())?; runtime.block_on(async { - let volume = client.create_volume().await; + let mut volume = client.create_volume().await; volume.pack(dir, version).await?; Ok(()) }) diff --git a/pond-fs/tests/fuzz.rs b/pond-fs/tests/fuzz.rs index a85fde6..1d83303 100644 --- a/pond-fs/tests/fuzz.rs +++ b/pond-fs/tests/fuzz.rs @@ -137,7 +137,7 @@ fn test_pack(expected_dir: &Path, actual_dir: &Path, pack_dir: &Path, entries: V // pack it to the pack_dir runtime .block_on(async { - let volume = client.create_volume().await; + let mut volume = client.create_volume().await; volume.pack(expected_dir, version).await?; Ok::<_, pond::Error>(()) }) diff --git a/pond/src/lib.rs b/pond/src/lib.rs index 692e93a..ec06d0b 100644 --- a/pond/src/lib.rs +++ b/pond/src/lib.rs @@ -186,6 +186,17 @@ impl<'a> DirEntryRef<'a> { pub fn is_regular(&self) -> bool { self.attr.ino.is_regular() } + + pub(crate) fn to_owned(&self) -> DirEntry { + let location = self.location().map(|(loc, range)| (loc.clone(), range)); + let mut path: Utf8PathBuf = self.parents.iter().collect(); + path.push(self.name()); + DirEntry { + path, + attr: self.attr().clone(), + location, + } + } } /// Owned equivalent of [`DirEntryRef`] that does not borrow from the underlying volume. @@ -218,19 +229,6 @@ impl DirEntry { } } -impl<'a> From> for DirEntry { - fn from(entry: DirEntryRef<'a>) -> Self { - let location = entry.location().map(|(loc, range)| (loc.clone(), range)); - let mut path: Utf8PathBuf = entry.parents.iter().collect(); - path.push(entry.name()); - Self { - path, - attr: entry.attr().clone(), - location, - } - } -} - // TODO: add checksums/etags here? #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] diff --git a/pond/src/volume.rs b/pond/src/volume.rs index 75356bf..9172bc8 100644 --- a/pond/src/volume.rs +++ b/pond/src/volume.rs @@ -10,7 +10,7 @@ use arc_swap::ArcSwap; use backon::{ExponentialBuilder, Retryable}; use bytes::{Bytes, BytesMut}; use object_store::{ObjectStore, PutMode, PutOptions, PutPayload}; -use parking_lot::RwLock; +use parking_lot::{RwLock, RwLockUpgradableReadGuard}; use std::{ collections::BTreeMap, io::{BufReader, Read}, @@ -195,7 +195,7 @@ impl Volume { let entries: Vec = metadata .readdir(ino, offset)? .take(size) - .map(DirEntry::from) + .map(|e| e.to_owned()) .collect(); Ok(entries.into_iter()) } @@ -214,6 +214,11 @@ impl Volume { ByteRange::empty(), )? .clone() + // we drop the write lock on metadata here -- the metadata and fd updates are + // decoupled anyway, so right now a failed new_fd call doesn't cause us to undo the + // metadata change. if we implemented transaction-like behavior, then it would be more + // useful to hold both locks at the same time but as-is there's no reason to hold the + // metadata for longer. }; let fd = new_fd( @@ -263,15 +268,17 @@ impl Volume { /// Open a Fd to a locally staged file for reading and writing. /// /// Opening a Fd with write permissions will always truncate the file. - // known false positive: https://github.com/rust-lang/rust-clippy/issues/6446 - #[allow(clippy::await_holding_lock)] + #[allow( + clippy::await_holding_lock, + reason = "https://github.com/rust-lang/rust-clippy/issues/6446" + )] pub async fn open_read_write(&self, ino: Ino) -> Result { match ino { Ino::COMMIT => new_fd(&mut self.fds.write(), ino, FileDescriptor::Commit), Ino::CLEAR_CACHE => new_fd(&mut self.fds.write(), ino, FileDescriptor::ClearCache), Ino::VERSION => Err(ErrorKind::PermissionDenied.into()), ino => { - let mut metadata_guard = self.meta.write(); + let metadata_guard = self.meta.upgradable_read(); match metadata_guard.location(ino) { Some((Location::Staged { path }, _)) => { let path = path.clone(); @@ -290,8 +297,9 @@ impl Volume { // file locally as a staged file, which can be expensive if it's a large file. let (path, file) = self.store.tempfile()?; let staged = Location::Staged { path }; - // modify metadata next - metadata_guard.modify( + // upgrade the lock to a write lock and modify metadata next. the write + // guard gets dropped after we modify since we don't assign it to anything. + RwLockUpgradableReadGuard::upgrade(metadata_guard).modify( ino, SystemTime::now(), None, @@ -311,8 +319,10 @@ impl Volume { } } - // known false positive: https://github.com/rust-lang/rust-clippy/issues/6446 - #[allow(clippy::await_holding_lock)] + #[allow( + clippy::await_holding_lock, + reason = "https://github.com/rust-lang/rust-clippy/issues/6446" + )] pub async fn open_read(&self, ino: Ino) -> Result { match ino { Ino::VERSION => new_fd(&mut self.fds.write(), ino, FileDescriptor::Version), @@ -343,10 +353,12 @@ impl Volume { } Some((Location::Committed { key }, range)) => { let key = Arc::new(self.store.child_path(key.as_ref())); + let range = *range; + std::mem::drop(metadata_guard); new_fd( &mut self.fds.write(), ino, - FileDescriptor::Committed { key, range: *range }, + FileDescriptor::Committed { key, range }, ) } None => Err(ErrorKind::NotFound.into()), @@ -509,7 +521,7 @@ impl WalkVolume { let entries = meta .read() .readdir(ino, None)? - .map(DirEntry::from) + .map(|e| e.to_owned()) .collect::>(); Ok(Self { meta, @@ -522,7 +534,7 @@ impl WalkVolume { .meta .read() .readdir(ino, None)? - .map(DirEntry::from) + .map(|e| e.to_owned()) .collect::>(); self.stack.push(entries.into_iter()); Ok(()) @@ -557,7 +569,7 @@ impl Volume { } /// Pack a local directory into a Pond volume. - pub async fn pack(&self, dir: impl AsRef, version: Version) -> crate::Result<()> { + pub async fn pack(&mut self, dir: impl AsRef, version: Version) -> crate::Result<()> { if self.store.exists(&version).await? { return Err(Error::new( ErrorKind::AlreadyExists, From 5f9fd295bfdd0ae45579d99d70f05915c6fdc4bf Mon Sep 17 00:00:00 2001 From: Dustin Pho Date: Wed, 26 Nov 2025 18:10:19 +0000 Subject: [PATCH 11/17] fix tempfile being sync I/O --- pond-fs/src/fuse.rs | 3 ++- pond/src/storage.rs | 17 +++++++------ pond/src/volume.rs | 58 +++++++++++++++++++++++++++++++++++---------- 3 files changed, 55 insertions(+), 23 deletions(-) diff --git a/pond-fs/src/fuse.rs b/pond-fs/src/fuse.rs index cd4a098..f15db81 100644 --- a/pond-fs/src/fuse.rs +++ b/pond-fs/src/fuse.rs @@ -265,7 +265,8 @@ impl fuser::Filesystem for Pond { let name = fs_try!(reply, from_os_str(name)); let (attr, fd) = fs_try!( reply, - self.volume.create(parent.into(), name.to_string(), excl) + self.runtime + .block_on(self.volume.create(parent.into(), name.to_string(), excl)) ); reply.created( &self.kernel_cache_timeout, diff --git a/pond/src/storage.rs b/pond/src/storage.rs index 38cc787..0ac80b4 100644 --- a/pond/src/storage.rs +++ b/pond/src/storage.rs @@ -1,6 +1,7 @@ use futures::TryFutureExt; use object_store::{ObjectStore, aws::AmazonS3Builder, local::LocalFileSystem}; -use std::{fs::File, sync::Arc}; +use rand::distr::{Alphanumeric, SampleString}; +use std::sync::Arc; use tempfile::TempDir; use url::Url; @@ -182,14 +183,12 @@ impl Storage { } impl Storage { - pub(crate) fn tempfile(&self) -> Result<(std::path::PathBuf, File)> { - let f = tempfile::Builder::new() - .disable_cleanup(true) - .tempfile_in(&*self.temp_dir) - .map_err(|e| Error::with_source(e.kind().into(), "failed to create tempfile", e))?; - - let (file, path) = f.into_parts(); - Ok((path.to_path_buf(), file)) + pub(crate) fn new_staged_filepath(&self) -> Result { + // note: we don't check for collisions here. there's 62^64 possible filenames to be + // generated, and that number is large enough that we realistically should never have a + // collision ... + let filename = Alphanumeric.sample_string(&mut rand::rng(), 64); + Ok(self.temp_dir.path().join(filename)) } pub(crate) async fn list_versions(&self) -> Result> { diff --git a/pond/src/volume.rs b/pond/src/volume.rs index 9172bc8..874238c 100644 --- a/pond/src/volume.rs +++ b/pond/src/volume.rs @@ -200,8 +200,14 @@ impl Volume { Ok(entries.into_iter()) } - pub fn create(&self, parent: Ino, name: String, exclusive: bool) -> Result<(FileAttr, Fd)> { - let (path, file) = self.store.tempfile()?; + pub async fn create( + &self, + parent: Ino, + name: String, + exclusive: bool, + ) -> Result<(FileAttr, Fd)> { + let path = self.store.new_staged_filepath()?; + let file = open_file(path.as_path(), OpenMode::Create).await?; let attr = { let mut metadata = self.meta.write(); @@ -283,7 +289,7 @@ impl Volume { Some((Location::Staged { path }, _)) => { let path = path.clone(); std::mem::drop(metadata_guard); // guard is dropped here before the await - let file = open_file(&path, true).await?; + let file = open_file(&path, OpenMode::ReadWrite).await?; new_fd( &mut self.fds.write(), @@ -295,8 +301,8 @@ impl Volume { // truncate the file (by assigning it a brand new staged file) if it's // committed. the alternative would be to keep a copy of the committed // file locally as a staged file, which can be expensive if it's a large file. - let (path, file) = self.store.tempfile()?; - let staged = Location::Staged { path }; + let path = self.store.new_staged_filepath()?; + let staged = Location::Staged { path: path.clone() }; // upgrade the lock to a write lock and modify metadata next. the write // guard gets dropped after we modify since we don't assign it to anything. RwLockUpgradableReadGuard::upgrade(metadata_guard).modify( @@ -306,6 +312,7 @@ impl Volume { Some(staged), Some(Modify::Set((0, 0).into())), )?; + let file = open_file(&path, OpenMode::Create).await?; // only create the fd once the file is open and metadata is valid new_fd( &mut self.fds.write(), @@ -344,7 +351,7 @@ impl Volume { Some((Location::Staged { path }, _)) => { let path = path.clone(); std::mem::drop(metadata_guard); // guard is dropped here before the await - let file = open_file(&path, false).await?; + let file = open_file(&path, OpenMode::Read).await?; new_fd( &mut self.fds.write(), ino, @@ -670,11 +677,26 @@ macro_rules! try_sync { }; } -/// Opens a std::fs::File for reading (and writing if `write` is set). -async fn open_file(path: &Path, write: bool) -> Result { +#[derive(Copy, Clone)] +enum OpenMode { + Read, + ReadWrite, + Create, +} + +/// Opens a std::fs::File for reading and optionally writing. +async fn open_file(path: &Path, mode: OpenMode) -> Result { let copy = path.to_path_buf(); + + let mut options = std::fs::File::options(); + match mode { + OpenMode::Read => options.read(true), + OpenMode::ReadWrite => options.read(true).write(true), + OpenMode::Create => options.read(true).write(true).create(true).truncate(true), + }; + try_sync!( - std::fs::File::options().read(true).write(write).open(copy), + options.open(copy), "std::fs::File::open", "failed to open staged file" ) @@ -787,7 +809,7 @@ impl<'a> StagedVolume<'a> { for (attr, path) in staged_files { // don't actually upload anything for zero sized files if attr.size > 0 { - let file = open_file(&path, false).await?; + let file = open_file(&path, OpenMode::Read).await?; let mut reader = BufReader::new(file).take(attr.size); loop { @@ -1006,6 +1028,7 @@ mod tests { let create = async |name, bs| { let (attr, fd) = volume .create(Ino::Root, std::primitive::str::to_string(name), true) + .await .unwrap(); let ino = attr.ino; volume.write_at(fd, 0, bs).await.unwrap(); @@ -1052,10 +1075,16 @@ mod tests { assert!(!volume.meta.read().is_staged()); // creating two files, it should be a staged volume now. - let (attr1, fd1) = volume.create(Ino::Root, "hello.txt".into(), true).unwrap(); + let (attr1, fd1) = volume + .create(Ino::Root, "hello.txt".into(), true) + .await + .unwrap(); let attr1 = attr1.clone(); assert_write(&volume, fd1, "hello").await; - let (attr2, fd2) = volume.create(Ino::Root, "world.txt".into(), true).unwrap(); + let (attr2, fd2) = volume + .create(Ino::Root, "world.txt".into(), true) + .await + .unwrap(); let attr2 = attr2.clone(); assert_write(&volume, fd2, "world").await; @@ -1105,7 +1134,10 @@ mod tests { let volume = client.create_volume().await; // creating a file, but holding the fd (not releasing it) - let (attr, fd) = volume.create(Ino::Root, "hello.txt".into(), true).unwrap(); + let (attr, fd) = volume + .create(Ino::Root, "hello.txt".into(), true) + .await + .unwrap(); let attr = attr.clone(); assert!(matches!( volume.meta.read().location(attr.ino), From c96a0a0408e5c3202fe79afedbd915386cccef77 Mon Sep 17 00:00:00 2001 From: Dustin Pho Date: Fri, 28 Nov 2025 06:57:54 +0000 Subject: [PATCH 12/17] wip: commit --- pond/src/location.rs | 14 ++- pond/src/metadata.rs | 12 +-- pond/src/volume.rs | 205 +++++++++++++++++++++++++++++++------------ 3 files changed, 165 insertions(+), 66 deletions(-) diff --git a/pond/src/location.rs b/pond/src/location.rs index 441c707..678ea52 100644 --- a/pond/src/location.rs +++ b/pond/src/location.rs @@ -2,14 +2,19 @@ use std::{borrow::Cow, sync::Arc}; #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum Location { - Staged { path: std::path::PathBuf }, - Committed { key: Arc }, + Staged { + path: std::path::PathBuf, + generation: u64, + }, + Committed { + key: Arc, + }, } impl std::fmt::Display for Location { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Location::Staged { path } => write!(f, "{}", path.display()), + Location::Staged { path, .. } => write!(f, "{}", path.display()), Location::Committed { key } => write!(f, "{key}"), } } @@ -28,9 +33,10 @@ impl Location { Location::Committed { key } } - pub(crate) fn staged(path: impl AsRef) -> Self { + pub(crate) fn staged(path: impl AsRef, generation: u64) -> Self { Location::Staged { path: path.as_ref().to_path_buf(), + generation, } } } diff --git a/pond/src/metadata.rs b/pond/src/metadata.rs index 12da24b..6fbddff 100644 --- a/pond/src/metadata.rs +++ b/pond/src/metadata.rs @@ -116,7 +116,7 @@ impl Version { /// Because staged locations are effectively dangling pointers, volumes cannot /// be serialized while they're being staged. // # TODO: should we guarantee inodes are stable in the docs? -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct VolumeMetadata { version: Version, @@ -164,13 +164,13 @@ pub(crate) enum EntryData { Dynamic, } -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] struct EntryKeyRef<'a> { ino: Ino, name: Cow<'a, str>, } -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] struct EntryKey<'a>(EntryKeyRef<'a>); impl<'a> From<(Ino, &'a str)> for EntryKey<'a> { @@ -828,7 +828,7 @@ impl VolumeMetadata { self.data .iter() .filter_map(|(ino, entry)| match self.location(*ino) { - Some((Location::Staged { path }, _)) => Some((&entry.attr, path)), + Some((Location::Staged { path, .. }, _)) => Some((&entry.attr, path)), _ => None, }) } @@ -1350,7 +1350,7 @@ mod test { c.ino, "hi.txt".to_string(), true, - Location::staged("whatever"), + Location::staged("whatever", 0), ByteRange::empty(), ) .unwrap() @@ -1634,7 +1634,7 @@ mod test { Ino::Root, "grow".to_string(), false, - Location::staged("grow"), + Location::staged("grow", 0), ByteRange::empty(), ) .unwrap() diff --git a/pond/src/volume.rs b/pond/src/volume.rs index 874238c..6900e67 100644 --- a/pond/src/volume.rs +++ b/pond/src/volume.rs @@ -10,13 +10,16 @@ use arc_swap::ArcSwap; use backon::{ExponentialBuilder, Retryable}; use bytes::{Bytes, BytesMut}; use object_store::{ObjectStore, PutMode, PutOptions, PutPayload}; -use parking_lot::{RwLock, RwLockUpgradableReadGuard}; +use parking_lot::RwLock; use std::{ collections::BTreeMap, io::{BufReader, Read}, os::unix::fs::FileExt, - path::Path, - sync::Arc, + path::{Path, PathBuf}, + sync::{ + Arc, + atomic::{AtomicBool, AtomicU64, Ordering}, + }, time::{Duration, SystemTime}, }; @@ -82,6 +85,14 @@ pub struct Volume { fds: Arc>>, store: crate::storage::Storage, metrics_snapshot: Option>>>, + // file generation number for staged files. the generation gives us an approximation of file + // age relative to other files. the generation is bumped everytime we commit, so we're only at + // risk of an overflow if the user commits 2^64 times throughout the lifetime of this process. + // the generation number does not persist across multiple processes -- it is always reset back + // to 0 since it's attached to staged (ephemeral) files. + generation: AtomicU64, + // is a commit in progress? + commit: AtomicBool, } impl Volume { @@ -103,6 +114,8 @@ impl Volume { fds: Arc::new(RwLock::new(BTreeMap::new())), store, metrics_snapshot, + generation: AtomicU64::new(0), + commit: AtomicBool::new(false), } } @@ -216,7 +229,7 @@ impl Volume { parent, name, exclusive, - Location::Staged { path }, + Location::staged(path, self.generation.load(Ordering::SeqCst)), ByteRange::empty(), )? .clone() @@ -284,12 +297,37 @@ impl Volume { Ino::CLEAR_CACHE => new_fd(&mut self.fds.write(), ino, FileDescriptor::ClearCache), Ino::VERSION => Err(ErrorKind::PermissionDenied.into()), ino => { - let metadata_guard = self.meta.upgradable_read(); + // note, we can't use an upgradable read lock here because there might be a race + // condition?? TODO + let mut metadata_guard = self.meta.write(); match metadata_guard.location(ino) { - Some((Location::Staged { path }, _)) => { - let path = path.clone(); - std::mem::drop(metadata_guard); // guard is dropped here before the await - let file = open_file(&path, OpenMode::ReadWrite).await?; + Some((Location::Staged { path, generation }, _)) => { + let file = if *generation < self.generation.load(Ordering::SeqCst) { + // generation mismatch for staged files, which means we're opening this + // up while a commit is running and this staged file is part of the + // snapshot. we treat it as a committed file in this case. + let path = self.store.new_staged_filepath()?; + let staged = Location::staged( + path.clone(), + self.generation.load(Ordering::SeqCst), + ); + metadata_guard.modify( + ino, + SystemTime::now(), + None, + Some(staged), + Some(Modify::Set((0, 0).into())), + )?; + std::mem::drop(metadata_guard); // guard is dropped here before the await + open_file(&path, OpenMode::Create).await? + } else { + let path = path.clone(); + std::mem::drop(metadata_guard); // guard is dropped here before the await + open_file(&path, OpenMode::ReadWrite).await? + }; + + // TODO: if generation numbers don't match up, we're copying and creating + // a new staged file handle. new_fd( &mut self.fds.write(), @@ -302,16 +340,16 @@ impl Volume { // committed. the alternative would be to keep a copy of the committed // file locally as a staged file, which can be expensive if it's a large file. let path = self.store.new_staged_filepath()?; - let staged = Location::Staged { path: path.clone() }; - // upgrade the lock to a write lock and modify metadata next. the write - // guard gets dropped after we modify since we don't assign it to anything. - RwLockUpgradableReadGuard::upgrade(metadata_guard).modify( + let staged = + Location::staged(path.clone(), self.generation.load(Ordering::SeqCst)); + metadata_guard.modify( ino, SystemTime::now(), None, Some(staged), Some(Modify::Set((0, 0).into())), )?; + std::mem::drop(metadata_guard); // guard is dropped here before the await let file = open_file(&path, OpenMode::Create).await?; // only create the fd once the file is open and metadata is valid new_fd( @@ -348,7 +386,7 @@ impl Volume { ino => { let metadata_guard = self.meta.read(); match metadata_guard.location(ino) { - Some((Location::Staged { path }, _)) => { + Some((Location::Staged { path, .. }, _)) => { let path = path.clone(); std::mem::drop(metadata_guard); // guard is dropped here before the await let file = open_file(&path, OpenMode::Read).await?; @@ -479,25 +517,19 @@ impl Volume { )); } - // don't allow staged files to be open - if self - .fds - .read() - .iter() - .any(|(_, desc)| matches!(desc, FileDescriptor::Staged { .. })) - { - return Err(Error::new( - ErrorKind::ResourceBusy, - "all open staged files must be closed before committing", - )); - } + let commit = Commit::new(self)?; - let staged = StagedVolume::new(self); - let (dest, ranges) = staged.upload().await?; - staged.modify(dest, ranges)?; - staged.persist(version).await?; + // take a snapshot of the metadata and walk the volume to get a handle to each staged file. + let (metadata_snapshot, files) = commit.snapshot()?; + let (dest, range) = commit.upload_files(files).await?; + let mut metadata_snapshot = Commit::relocate_staged_files(metadata_snapshot, range, dest)?; + commit + .upload_metadata(&mut metadata_snapshot, version) + .await?; - Ok(()) + // reconcile any differences between what we just uploaded and any modifications that + // occurred since we took the snapshot. + commit.sync(metadata_snapshot) } } @@ -653,7 +685,7 @@ impl Volume { dir_ino, name.to_string_lossy().to_string(), true, - Location::staged(entry.path()), + Location::staged(entry.path(), self.generation.load(Ordering::SeqCst)), ByteRange { offset: 0, len }, )?; } @@ -677,7 +709,6 @@ macro_rules! try_sync { }; } -#[derive(Copy, Clone)] enum OpenMode { Read, ReadWrite, @@ -766,13 +797,34 @@ macro_rules! try_mpu { }; } -struct StagedVolume<'a> { +struct Commit<'a> { inner: &'a Volume, } -impl<'a> StagedVolume<'a> { - fn new(inner: &'a Volume) -> Self { - Self { inner } +impl<'a> Drop for Commit<'a> { + fn drop(&mut self) { + self.inner.commit.store(false, Ordering::SeqCst); + } +} + +impl<'a> Commit<'a> { + fn new(inner: &'a Volume) -> Result { + // only allow commits if another commit isn't already in-flight, set the commit bool if + // we're okay to proceed. + match inner + .commit + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + { + Ok(_) => (), + Err(_) => { + return Err(Error::new( + ErrorKind::ResourceBusy, + "a commit is already in progress", + )); + } + } + + Ok(Self { inner }) } /// The size of each part within the multipart upload (excluding the last which is allowed to @@ -780,12 +832,51 @@ impl<'a> StagedVolume<'a> { const MPU_UPLOAD_SIZE: usize = 32 * 1024 * 1024; const READ_BUF_SIZE: usize = 8 * 1024; + /// Takes a snapshot of the metadata and the files we need to upload as a part of this commit + /// while holding a lock for the metadata and file descriptor map. + fn snapshot(&self) -> Result<(VolumeMetadata, Vec<(FileAttr, PathBuf)>)> { + // read lock on metadata, others can read during the snapshot process, but can't modify. + let metadata = self.inner.meta.read(); + // write lock on the fds, no one is allowed to open new fds while we're taking a snapshot. + let fds = self.inner.fds.write(); + + // don't allow open fds to staged files during the snapshot process. + if fds + .iter() + .any(|(_, desc)| matches!(desc, FileDescriptor::Staged { .. })) + { + return Err(Error::new( + ErrorKind::ResourceBusy, + "all open staged files must be closed before committing", + )); + } + + // bump the generation number for new staged files. commit will only upload and take + // snapshots of files with generation numbers less than this value. this draws a + // line in the sand on what will be included in the commit. if someone opens a staged file + // and sees that its generation number is less than the current generation, and commit is + // inprogress, then we'll treat it similarly to opening a committed file (i.e. truncating + // it). + self.inner.generation.fetch_add(1, Ordering::SeqCst); + + let snapshot: Vec<_> = metadata + .iter_staged() + .map(|(attr, path)| (attr.clone(), path.clone())) + .collect(); + let copy = metadata.clone(); + + Ok((copy, snapshot)) + } + /// Upload all staged files into a single blob under base. /// /// Returns the location of the newly uploaded blob and a vector that maps each newly written /// Ino to its ByteRange within the new blob. Files are uploaded to the blob using multipart /// uploads. - async fn upload(&self) -> Result<(Location, Vec<(Ino, ByteRange)>)> { + async fn upload_files( + &self, + files: Vec<(FileAttr, PathBuf)>, + ) -> Result<(Location, Vec<(Ino, ByteRange)>)> { let (dest_name, dest) = self.inner.store.new_data_file(); let mut offset = 0; @@ -797,16 +888,7 @@ impl<'a> StagedVolume<'a> { let mut buf = BytesMut::with_capacity(Self::MPU_UPLOAD_SIZE); let mut read_buf = vec![0u8; Self::READ_BUF_SIZE]; - let staged_files: Vec<(FileAttr, std::path::PathBuf)> = { - self.inner - .meta - .read() - .iter_staged() - .map(|(attr, path)| (attr.clone(), path.clone())) - .collect() - }; - - for (attr, path) in staged_files { + for (attr, path) in files { // don't actually upload anything for zero sized files if attr.size > 0 { let file = open_file(&path, OpenMode::Read).await?; @@ -866,11 +948,16 @@ impl<'a> StagedVolume<'a> { Ok((Location::committed(dest_name), staged)) } - /// Relocate all staged files to dest. - fn modify(&self, dest: Location, ranges: Vec<(Ino, ByteRange)>) -> Result<()> { + /// Relocate the given VolumeMetadata such that all files present in `ranges` have their + /// location modified to `dest`. + fn relocate_staged_files( + mut metadata: VolumeMetadata, + ranges: Vec<(Ino, ByteRange)>, + dest: Location, + ) -> Result { let now = SystemTime::now(); for (ino, byte_range) in ranges { - self.inner.modify( + metadata.modify( ino, now, Some(now), @@ -880,15 +967,15 @@ impl<'a> StagedVolume<'a> { } // deduplicate and clean up all hanging staged Locations - self.inner.meta.write().clean_staged_locations(dest); + metadata.clean_staged_locations(dest); - Ok(()) + Ok(metadata) } /// Mint and upload a new version of Volume. - async fn persist(self, version: Version) -> Result<()> { + async fn upload_metadata(&self, metadata: &mut VolumeMetadata, version: Version) -> Result<()> { let meta_path = self.inner.store.metadata_path(&version); - let new_volume = bytes::Bytes::from(self.inner.to_bytes_with_version(&version)?); + let new_volume = bytes::Bytes::from(metadata.to_bytes_with_version(&version)?); let put_metadata = || async { self.inner @@ -912,7 +999,7 @@ impl<'a> StagedVolume<'a> { match res { Ok(_) => { - self.inner.meta.write().set_version(version); + metadata.set_version(version); Ok(()) } // TODO: there's a scenario here where we get an AlreadyExists error returned to us, @@ -935,6 +1022,12 @@ impl<'a> StagedVolume<'a> { )), } } + + fn sync(self, snapshot: VolumeMetadata) -> Result<()> { + let metadata = self.inner.meta.write(); + for entry in snapshot.walk(Ino::Root) {} + Ok(()) + } } fn should_retry(e: &object_store::Error) -> bool { From cb38346185bc1e1544262529b940a570a340441e Mon Sep 17 00:00:00 2001 From: Dustin Pho Date: Mon, 1 Dec 2025 18:57:08 +0000 Subject: [PATCH 13/17] syncing metadata after the commit upload --- pond/src/location.rs | 8 +- pond/src/metadata.rs | 196 +++++++++++++++++++++++++++++++++++-------- pond/src/volume.rs | 129 ++++++++++++++++++++-------- 3 files changed, 257 insertions(+), 76 deletions(-) diff --git a/pond/src/location.rs b/pond/src/location.rs index 678ea52..0f8aac2 100644 --- a/pond/src/location.rs +++ b/pond/src/location.rs @@ -3,7 +3,7 @@ use std::{borrow::Cow, sync::Arc}; #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum Location { Staged { - path: std::path::PathBuf, + path: Arc, generation: u64, }, Committed { @@ -21,10 +21,6 @@ impl std::fmt::Display for Location { } impl Location { - pub(crate) fn is_staged(&self) -> bool { - matches!(self, Location::Staged { .. }) - } - pub(crate) fn committed<'a>(key: impl Into>) -> Self { let key = match key.into() { Cow::Borrowed(str) => Arc::from(str), @@ -35,7 +31,7 @@ impl Location { pub(crate) fn staged(path: impl AsRef, generation: u64) -> Self { Location::Staged { - path: path.as_ref().to_path_buf(), + path: path.as_ref().to_path_buf().into(), generation, } } diff --git a/pond/src/metadata.rs b/pond/src/metadata.rs index 6fbddff..370f380 100644 --- a/pond/src/metadata.rs +++ b/pond/src/metadata.rs @@ -1,9 +1,10 @@ use std::{ borrow::{Borrow, Cow}, - collections::{BTreeMap, VecDeque, btree_map}, + collections::{BTreeMap, HashMap, VecDeque, btree_map}, fmt::Debug, path::PathBuf, str::FromStr, + sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -654,44 +655,171 @@ impl VolumeMetadata { Ok(()) } - /// Clean up all staged locations, replacing all internal state to instead point to dest. + /// Remove any locations that no longer have references while keeping metadata valid at all + /// times. /// - /// This attempts to keep Volume in a consistent state at every step of modification. - pub(crate) fn clean_staged_locations(&mut self, dest: Location) { - // find the first staged location within self.locations and use that as the final spot - // for dest. - let Some((idx, location)) = self - .locations - .iter_mut() - .enumerate() - .find(|(i, l)| l.is_staged()) - else { + /// This is less efficient because we keep this function cancel and panic safe. If the future + /// or thread running this is killed, the metadata will remain valid. + pub(crate) fn prune_unreferenced_locations(&mut self) { + if self.locations.is_empty() { return; - }; - *location = dest.clone(); + } - // we're making an assumption here that once you see the first staged location, - // every location after that must also be staged. so if we see a location_idx > idx, this - // means it's a staged location. - assert!( - &self - .locations - .iter() - .skip(idx + 1) - .all(|l| l.is_staged() || l == &dest) - ); + // count how many entries point to each location, and keep the references to the entries + // for each location + let mut refcounts = vec![0usize; self.locations.len()]; + let mut references = vec![vec![]; self.locations.len()]; + for entry in self.data.values() { + if let EntryData::File { location_idx, .. } = entry.data { + refcounts[location_idx] += 1; + references[location_idx].push(entry.attr.ino); + } + } - // update all relevant EntryData::File::location_idx to point to the same idx we found - // above! - for (_, Entry { data, .. }) in self.data.iter_mut() { - let EntryData::File { location_idx, .. } = data else { + if refcounts.iter().all(|count| *count > 0) { + return; + } + + // replacing any spots where we find unreferenced locations with a referenced location. + // doing it with two pointers to avoid shifting everything down one slot, so we're filling + // in gaps where we have unreferenced locations with locations (with references!) from the + // back. + let mut insert = 0; + let mut back = self.locations.len(); + while insert < back { + if refcounts[insert] > 0 { + // insert will keep searching for an unreferenced location. + insert += 1; continue; - }; - *location_idx = idx.min(*location_idx); + } else if refcounts[back - 1] == 0 { + // if the location at the back is unreferenced, we can just ignore it. at the end, + // this gets truncated away. + back -= 1; + continue; + } else { + // at this point, `insert` points to an unreferenced location and `back` points to a + // referenced location. + + // copy the referenced location into the unreferenced location's position in the + // locations vector, and update the references. this keeps it cancel and panic safe. + self.locations[insert] = self.locations[back - 1].clone(); + for ino in &references[back - 1] { + let Some(entry) = self.data.get_mut(ino) else { + continue; + }; + let EntryData::File { location_idx, .. } = &mut entry.data else { + // we loaded it as a file above. inos aren't reused so this would be a big bug. + panic!("BUG: non-file entry holds a location index."); + }; + *location_idx = insert; + } + insert += 1; + back -= 1; + } + } + self.locations.truncate(insert); + } + + /// Deduplicate committed locations while keeping staged entries at the tail. + /// + /// This builds a plan for the final layout (unique committed locations first, then staged + /// ones), appends that layout to the existing vector, and remaps entries as each new slot is + /// written. A final prune drops the old copies. Because nothing is overwritten before all + /// references move, metadata remains consistent even if the operation is interrupted. + pub(crate) fn compact_locations(&mut self) { + self.prune_unreferenced_locations(); + + if self.locations.len() <= 1 { + return; + } + + #[derive(Debug)] + struct LocationPlan { + location: Location, + sources: Vec<(usize, Vec)>, + } + + let references = self.collect_location_references(); + if references.iter().all(|inos| inos.is_empty()) { + self.locations.clear(); + return; + } + + let mut committed_plans: Vec = Vec::new(); + let mut canonical: HashMap = HashMap::new(); + let mut staged_plans: Vec = Vec::new(); + + for (idx, location) in self.locations.iter().cloned().enumerate() { + let inos = &references[idx]; + if inos.is_empty() { + continue; + } + + match location { + Location::Committed { .. } => { + if let Some(&plan_idx) = canonical.get(&location) { + committed_plans[plan_idx].sources.push((idx, inos.clone())); + } else { + let plan_idx = committed_plans.len(); + canonical.insert(location.clone(), plan_idx); + committed_plans.push(LocationPlan { + location, + sources: vec![(idx, inos.clone())], + }); + } + } + Location::Staged { .. } => staged_plans.push(LocationPlan { + location, + sources: vec![(idx, inos.clone())], + }), + } + } + + if committed_plans.is_empty() && staged_plans.is_empty() { + return; + } + + let mut plans = committed_plans; + plans.extend(staged_plans); + + for LocationPlan { location, sources } in plans { + let dest_idx = self.locations.len(); + self.locations.push(location); + for (src_idx, inos) in sources { + self.remap_location_indices(&inos, src_idx, dest_idx); + } + } + + self.prune_unreferenced_locations(); + } + + fn collect_location_references(&self) -> Vec> { + let mut references = vec![Vec::new(); self.locations.len()]; + for (ino, entry) in self.data.iter() { + if let EntryData::File { location_idx, .. } = entry.data { + references[location_idx].push(*ino); + } + } + references + } + + fn remap_location_indices(&mut self, inos: &[Ino], from_idx: usize, to_idx: usize) { + if from_idx == to_idx { + return; } - // lop off the tail - self.locations.truncate(idx + 1); + for ino in inos { + let entry = self + .data + .get_mut(ino) + .expect("inode missing while remapping locations"); + let EntryData::File { location_idx, .. } = &mut entry.data else { + debug_assert!(false, "non-file entry holds a location index"); + continue; + }; + debug_assert_eq!(*location_idx, from_idx); + *location_idx = to_idx; + } } /// Update a file's metadata to reflect that a file's data has been modified. @@ -824,11 +952,11 @@ impl VolumeMetadata { /// order. /// /// Returns an iterator over `(FileAttr, PathBuf)` tuples. - pub(crate) fn iter_staged(&self) -> impl Iterator { + pub(crate) fn iter_staged(&self) -> impl Iterator)> { self.data .iter() .filter_map(|(ino, entry)| match self.location(*ino) { - Some((Location::Staged { path, .. }, _)) => Some((&entry.attr, path)), + Some((Location::Staged { path, .. }, _)) => Some((&entry.attr, path.clone())), _ => None, }) } diff --git a/pond/src/volume.rs b/pond/src/volume.rs index 6900e67..78572e7 100644 --- a/pond/src/volume.rs +++ b/pond/src/volume.rs @@ -520,16 +520,16 @@ impl Volume { let commit = Commit::new(self)?; // take a snapshot of the metadata and walk the volume to get a handle to each staged file. - let (metadata_snapshot, files) = commit.snapshot()?; - let (dest, range) = commit.upload_files(files).await?; - let mut metadata_snapshot = Commit::relocate_staged_files(metadata_snapshot, range, dest)?; + let mut snapshot = commit.snapshot()?; + let (dest, range) = commit.upload_files(snapshot.staged_files).await?; + apply_location_ranges(&mut snapshot.metadata, range, dest)?; commit - .upload_metadata(&mut metadata_snapshot, version) + .upload_metadata(&mut snapshot.metadata, version) .await?; // reconcile any differences between what we just uploaded and any modifications that // occurred since we took the snapshot. - commit.sync(metadata_snapshot) + commit.sync(snapshot.metadata) } } @@ -797,6 +797,11 @@ macro_rules! try_mpu { }; } +struct Snapshot { + metadata: VolumeMetadata, + staged_files: Vec<(FileAttr, Arc)>, +} + struct Commit<'a> { inner: &'a Volume, } @@ -834,7 +839,7 @@ impl<'a> Commit<'a> { /// Takes a snapshot of the metadata and the files we need to upload as a part of this commit /// while holding a lock for the metadata and file descriptor map. - fn snapshot(&self) -> Result<(VolumeMetadata, Vec<(FileAttr, PathBuf)>)> { + fn snapshot(&self) -> Result { // read lock on metadata, others can read during the snapshot process, but can't modify. let metadata = self.inner.meta.read(); // write lock on the fds, no one is allowed to open new fds while we're taking a snapshot. @@ -859,13 +864,15 @@ impl<'a> Commit<'a> { // it). self.inner.generation.fetch_add(1, Ordering::SeqCst); - let snapshot: Vec<_> = metadata + let staged_files: Vec<_> = metadata .iter_staged() .map(|(attr, path)| (attr.clone(), path.clone())) .collect(); - let copy = metadata.clone(); - Ok((copy, snapshot)) + Ok(Snapshot { + metadata: metadata.clone(), + staged_files, + }) } /// Upload all staged files into a single blob under base. @@ -875,7 +882,7 @@ impl<'a> Commit<'a> { /// uploads. async fn upload_files( &self, - files: Vec<(FileAttr, PathBuf)>, + files: Vec<(FileAttr, Arc)>, ) -> Result<(Location, Vec<(Ino, ByteRange)>)> { let (dest_name, dest) = self.inner.store.new_data_file(); @@ -948,30 +955,6 @@ impl<'a> Commit<'a> { Ok((Location::committed(dest_name), staged)) } - /// Relocate the given VolumeMetadata such that all files present in `ranges` have their - /// location modified to `dest`. - fn relocate_staged_files( - mut metadata: VolumeMetadata, - ranges: Vec<(Ino, ByteRange)>, - dest: Location, - ) -> Result { - let now = SystemTime::now(); - for (ino, byte_range) in ranges { - metadata.modify( - ino, - now, - Some(now), - Some(dest.clone()), - Some(Modify::Set(byte_range)), - )?; - } - - // deduplicate and clean up all hanging staged Locations - metadata.clean_staged_locations(dest); - - Ok(metadata) - } - /// Mint and upload a new version of Volume. async fn upload_metadata(&self, metadata: &mut VolumeMetadata, version: Version) -> Result<()> { let meta_path = self.inner.store.metadata_path(&version); @@ -1024,12 +1007,86 @@ impl<'a> Commit<'a> { } fn sync(self, snapshot: VolumeMetadata) -> Result<()> { - let metadata = self.inner.meta.write(); - for entry in snapshot.walk(Ino::Root) {} + let mut metadata = self.inner.meta.write(); + metadata.set_version(snapshot.version().clone()); + + let snapshot_generation = self.inner.generation.load(Ordering::SeqCst); + // walk the snapshot, updating the current view of metadata for the dir entries for staged + // regular files that we committed up to S3. only update them if they weren't updated since + // we took the snapshot. if they were updated, then any concurrent updates to the metdata + // while we were committing will take precedence. + for snapshot_entry in snapshot.walk(Ino::Root)? { + let snapshot_entry = snapshot_entry?; + if !snapshot_entry.is_regular() || !snapshot_entry.attr.is_file() { + continue; + } + + // fetch the equivalent entry in the current version of metadata. if it's missing, then + // someone deleted this ino while we were busy committing. + let ino = snapshot_entry.attr.ino; + let Some(attr) = metadata.getattr(ino).cloned() else { + continue; + }; + + // if the location is still staged and has an older generation, then we know that it + // wasn't opened for writing since the commit. we can proceed with replacing its + // location to the one we uploaded to S3. + if !matches!( + metadata.location(ino), + Some((Location::Staged { generation, .. }, _)) if *generation < snapshot_generation + ) { + continue; + } + + // retain mtime and ctime changes (e.g. if the file was touched to update mtime or + // moved) + let new_mtime = std::cmp::max(attr.mtime, snapshot_entry.attr.mtime); + let new_ctime = std::cmp::max(attr.ctime, snapshot_entry.attr.ctime); + + let Some((snapshot_location, snapshot_byte_range)) = snapshot_entry.location() else { + continue; + }; + + metadata.modify( + ino, + new_mtime, + Some(new_ctime), + Some(snapshot_location.clone()), + Some(Modify::Set(snapshot_byte_range)), + )?; + } + + // the walk and location modification will result in some unreferenced staged locations. + // these need to be cleaned up. + metadata.compact_locations(); + Ok(()) } } +/// Updates the entries within `metadata` specified by the Inos in `ranges` to point to `dest` and +/// have the range in `ranges`. +fn apply_location_ranges( + metadata: &mut VolumeMetadata, + ranges: Vec<(Ino, ByteRange)>, + dest: Location, +) -> Result<()> { + let now = SystemTime::now(); + for (ino, byte_range) in ranges { + metadata.modify( + ino, + now, + Some(now), + Some(dest.clone()), + Some(Modify::Set(byte_range)), + )?; + } + + metadata.compact_locations(); + + Ok(()) +} + fn should_retry(e: &object_store::Error) -> bool { use std::error::Error; From b354f30368d16311074536057f95307b0faaf825 Mon Sep 17 00:00:00 2001 From: Dustin Pho Date: Mon, 1 Dec 2025 20:06:09 +0000 Subject: [PATCH 14/17] get rid of the intern list, make it a set and change serialization --- pond/src/lib.rs | 10 +- pond/src/location.rs | 3 + pond/src/metadata.rs | 288 +++++++++++-------------------------------- pond/src/volume.rs | 8 +- 4 files changed, 80 insertions(+), 229 deletions(-) diff --git a/pond/src/lib.rs b/pond/src/lib.rs index ec06d0b..29010d4 100644 --- a/pond/src/lib.rs +++ b/pond/src/lib.rs @@ -151,7 +151,6 @@ pub struct DirEntryRef<'a> { name: &'a str, parents: Vec<&'a str>, attr: &'a FileAttr, - locations: &'a [Location], data: &'a metadata::EntryData, } @@ -164,15 +163,12 @@ impl<'a> DirEntryRef<'a> { self.attr } - pub fn location(&self) -> Option<(&Location, ByteRange)> { + pub fn location(&self) -> Option<(Location, ByteRange)> { match self.data { metadata::EntryData::File { - location_idx, + location, byte_range, - } => { - let location = &self.locations[*location_idx]; - Some((location, *byte_range)) - } + } => Some((location.clone(), *byte_range)), _ => None, } } diff --git a/pond/src/location.rs b/pond/src/location.rs index 0f8aac2..6b55c0d 100644 --- a/pond/src/location.rs +++ b/pond/src/location.rs @@ -1,5 +1,8 @@ use std::{borrow::Cow, sync::Arc}; +/// Location is an enum that acts as a pointer to a blob of bytes. +/// +/// Location is a lightweight as each enum is basically just an Arc. Cheap to clone. #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum Location { Staged { diff --git a/pond/src/metadata.rs b/pond/src/metadata.rs index 370f380..9f18fa0 100644 --- a/pond/src/metadata.rs +++ b/pond/src/metadata.rs @@ -1,6 +1,6 @@ use std::{ borrow::{Borrow, Cow}, - collections::{BTreeMap, HashMap, VecDeque, btree_map}, + collections::{BTreeMap, BTreeSet, VecDeque, btree_map}, fmt::Debug, path::PathBuf, str::FromStr, @@ -125,10 +125,9 @@ pub(crate) struct VolumeMetadata { // volume. next_ino: Ino, - // interned list of locations that hold data blobs. this list must - // remain in a stable order. each Entry contains indexes into this - // vec, and re-ordering it invalidates those indexes. - locations: Vec, + // interned set of locations that hold data blobs. the locations themselves are cheap to clone + // as each enum is just an Arc. + locations: BTreeSet, // Ino -> Entry data: BTreeMap, @@ -155,11 +154,12 @@ impl Entry { } } -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Debug)] pub(crate) enum EntryData { Directory, File { - location_idx: usize, + /// Location is an enum over Arcs, it's cheap to copy. + location: Location, byte_range: ByteRange, }, Dynamic, @@ -233,7 +233,7 @@ impl VolumeMetadata { Self { version, next_ino: Ino::min_regular(), - locations: vec![], + locations: BTreeSet::new(), data, dirs, } @@ -528,7 +528,16 @@ impl VolumeMetadata { let ino = self.next_ino()?; let slot = self.data.entry(ino); - let location_idx = insert_unique(&mut self.locations, location); + // insert it into our self.location set if it doesn't exist, else grab a clone of the + // location inside of it (location is a bag of Arcs, so this keeps only 1 copy around). + let location = match self.locations.get(&location) { + // this gives us the interned location + Some(location) => location.clone(), + None => { + self.locations.insert(location.clone()); + location + } + }; let now = SystemTime::now(); let new_entry = Entry { name: name.clone().into(), @@ -541,7 +550,7 @@ impl VolumeMetadata { kind: FileType::Regular, }, data: EntryData::File { - location_idx, + location, byte_range, }, }; @@ -643,23 +652,7 @@ impl VolumeMetadata { Ok(&entry.attr) } - /// Change the physical location of a data blob. - /// - /// This changes the physical location for all files in the volume that - /// refer to this blob. To relocate an individual file, see `modify`. - pub(crate) fn relocate(&mut self, from: &Location, to: Location) -> crate::Result<()> { - let Some(from) = self.locations.iter_mut().find(|l| l == &from) else { - return Err(ErrorKind::NotFound.into()); - }; - *from = to; - Ok(()) - } - - /// Remove any locations that no longer have references while keeping metadata valid at all - /// times. - /// - /// This is less efficient because we keep this function cancel and panic safe. If the future - /// or thread running this is killed, the metadata will remain valid. + /// Remove any locations that no longer have references. pub(crate) fn prune_unreferenced_locations(&mut self) { if self.locations.is_empty() { return; @@ -667,159 +660,14 @@ impl VolumeMetadata { // count how many entries point to each location, and keep the references to the entries // for each location - let mut refcounts = vec![0usize; self.locations.len()]; - let mut references = vec![vec![]; self.locations.len()]; + let mut seen = BTreeSet::new(); for entry in self.data.values() { - if let EntryData::File { location_idx, .. } = entry.data { - refcounts[location_idx] += 1; - references[location_idx].push(entry.attr.ino); - } - } - - if refcounts.iter().all(|count| *count > 0) { - return; - } - - // replacing any spots where we find unreferenced locations with a referenced location. - // doing it with two pointers to avoid shifting everything down one slot, so we're filling - // in gaps where we have unreferenced locations with locations (with references!) from the - // back. - let mut insert = 0; - let mut back = self.locations.len(); - while insert < back { - if refcounts[insert] > 0 { - // insert will keep searching for an unreferenced location. - insert += 1; - continue; - } else if refcounts[back - 1] == 0 { - // if the location at the back is unreferenced, we can just ignore it. at the end, - // this gets truncated away. - back -= 1; - continue; - } else { - // at this point, `insert` points to an unreferenced location and `back` points to a - // referenced location. - - // copy the referenced location into the unreferenced location's position in the - // locations vector, and update the references. this keeps it cancel and panic safe. - self.locations[insert] = self.locations[back - 1].clone(); - for ino in &references[back - 1] { - let Some(entry) = self.data.get_mut(ino) else { - continue; - }; - let EntryData::File { location_idx, .. } = &mut entry.data else { - // we loaded it as a file above. inos aren't reused so this would be a big bug. - panic!("BUG: non-file entry holds a location index."); - }; - *location_idx = insert; - } - insert += 1; - back -= 1; - } - } - self.locations.truncate(insert); - } - - /// Deduplicate committed locations while keeping staged entries at the tail. - /// - /// This builds a plan for the final layout (unique committed locations first, then staged - /// ones), appends that layout to the existing vector, and remaps entries as each new slot is - /// written. A final prune drops the old copies. Because nothing is overwritten before all - /// references move, metadata remains consistent even if the operation is interrupted. - pub(crate) fn compact_locations(&mut self) { - self.prune_unreferenced_locations(); - - if self.locations.len() <= 1 { - return; - } - - #[derive(Debug)] - struct LocationPlan { - location: Location, - sources: Vec<(usize, Vec)>, - } - - let references = self.collect_location_references(); - if references.iter().all(|inos| inos.is_empty()) { - self.locations.clear(); - return; - } - - let mut committed_plans: Vec = Vec::new(); - let mut canonical: HashMap = HashMap::new(); - let mut staged_plans: Vec = Vec::new(); - - for (idx, location) in self.locations.iter().cloned().enumerate() { - let inos = &references[idx]; - if inos.is_empty() { - continue; - } - - match location { - Location::Committed { .. } => { - if let Some(&plan_idx) = canonical.get(&location) { - committed_plans[plan_idx].sources.push((idx, inos.clone())); - } else { - let plan_idx = committed_plans.len(); - canonical.insert(location.clone(), plan_idx); - committed_plans.push(LocationPlan { - location, - sources: vec![(idx, inos.clone())], - }); - } - } - Location::Staged { .. } => staged_plans.push(LocationPlan { - location, - sources: vec![(idx, inos.clone())], - }), + if let EntryData::File { location, .. } = &entry.data { + seen.insert(location); } } - if committed_plans.is_empty() && staged_plans.is_empty() { - return; - } - - let mut plans = committed_plans; - plans.extend(staged_plans); - - for LocationPlan { location, sources } in plans { - let dest_idx = self.locations.len(); - self.locations.push(location); - for (src_idx, inos) in sources { - self.remap_location_indices(&inos, src_idx, dest_idx); - } - } - - self.prune_unreferenced_locations(); - } - - fn collect_location_references(&self) -> Vec> { - let mut references = vec![Vec::new(); self.locations.len()]; - for (ino, entry) in self.data.iter() { - if let EntryData::File { location_idx, .. } = entry.data { - references[location_idx].push(*ino); - } - } - references - } - - fn remap_location_indices(&mut self, inos: &[Ino], from_idx: usize, to_idx: usize) { - if from_idx == to_idx { - return; - } - - for ino in inos { - let entry = self - .data - .get_mut(ino) - .expect("inode missing while remapping locations"); - let EntryData::File { location_idx, .. } = &mut entry.data else { - debug_assert!(false, "non-file entry holds a location index"); - continue; - }; - debug_assert_eq!(*location_idx, from_idx); - *location_idx = to_idx; - } + self.locations.retain(|e| seen.contains(e)); } /// Update a file's metadata to reflect that a file's data has been modified. @@ -838,7 +686,7 @@ impl VolumeMetadata { return Err(ErrorKind::NotFound.into()); }; let EntryData::File { - location_idx, + location: mut_location, byte_range, } = &mut entry.data else { @@ -846,8 +694,14 @@ impl VolumeMetadata { }; if let Some(location) = location { - let new_location = insert_unique(&mut self.locations, location); - *location_idx = new_location; + *mut_location = match self.locations.get(&location) { + // this gives us the interned location + Some(location) => location.clone(), + None => { + self.locations.insert(location.clone()); + location + } + }; } match range { Some(Modify::Set(range)) => { @@ -894,7 +748,6 @@ impl VolumeMetadata { }; Ok(ReadDir { data: &self.data, - locations: &self.locations, range, parents, }) @@ -907,7 +760,6 @@ impl VolumeMetadata { ) -> crate::Result> { Ok(ReadDir { data: &self.data, - locations: &self.locations, range: self.dirs.range(entry_range(ino)?), parents, }) @@ -936,7 +788,6 @@ impl VolumeMetadata { let root_dir = self.dir_entry(ino)?; let root_iter = ReadDir { data: &self.data, - locations: &self.locations, range: self.dirs.range(entry_range(ino)?), parents: parents.clone(), }; @@ -974,16 +825,13 @@ impl VolumeMetadata { /// /// Attempting to get the physical location of a directory or a symlink /// returns an error. - pub(crate) fn location(&self, ino: Ino) -> Option<(&Location, &ByteRange)> { + pub(crate) fn location(&self, ino: Ino) -> Option<(Location, &ByteRange)> { self.data.get(&ino).and_then(|entry| match &entry.data { // files need to map to blob list EntryData::File { - location_idx, + location, byte_range, - } => { - let location = self.locations.get(*location_idx)?; - Some((location, byte_range)) - } + } => Some((location.clone(), byte_range)), // no other file type has a location _ => None, }) @@ -999,10 +847,15 @@ impl VolumeMetadata { pub(crate) fn to_bytes_with_version(&self, version: &Version) -> crate::Result> { let mut fbb = FlatBufferBuilder::new(); + // flatten our interned location set into a vector. we'll serialize the locations as a + // vector, and each data entry will have an index into this vector to save on space. + let mut location_idx_map: BTreeMap = BTreeMap::new(); let locations = { let mut locations = Vec::with_capacity(self.locations.len()); - for location in &self.locations { - locations.push(to_fb_location(&mut fbb, location)?) + // stable iteration order since self.locations is a BTreeMap + for (i, location) in self.locations.iter().enumerate() { + locations.push(to_fb_location(&mut fbb, location)?); + location_idx_map.insert(location.clone(), i); } fbb.create_vector(&locations) }; @@ -1012,7 +865,7 @@ impl VolumeMetadata { if !ino.is_regular() { continue; } - dir_entries.push(to_fb_entry(&mut fbb, entry)?); + dir_entries.push(to_fb_entry(&mut fbb, &location_idx_map, entry)?); } fbb.create_vector(&dir_entries) }; @@ -1044,14 +897,15 @@ impl VolumeMetadata { let mut volume = Self::new(version); // set the locations - volume.locations = locations; + volume.locations = locations.into_iter().collect(); + let location_idx_map: Vec<_> = volume.locations.iter().cloned().collect(); // walk the serialized entries and insert both the entries // and their dir index entry for entry in fb_volume.entries().iter() { let parent_ino = entry.parent_ino().into(); let dir_key = (parent_ino, entry.name().to_string()).into(); - let entry = from_fb_entry(&entry)?; + let entry = from_fb_entry(&location_idx_map, &entry)?; max_ino = max_ino.max(entry.attr.ino); volume.dirs.insert(dir_key, entry.attr.ino); volume.data.insert(entry.attr.ino, entry); @@ -1062,17 +916,6 @@ impl VolumeMetadata { } } -fn insert_unique(xs: &mut Vec, x: T) -> usize { - match xs.iter().position(|e| e == &x) { - Some(idx) => idx, - None => { - let idx = xs.len(); - xs.push(x); - idx - } - } -} - fn entry_range(ino: Ino) -> crate::Result>> { let start: EntryKey = (ino, "").into(); let end: EntryKey = (ino.add(1)?, "").into(); @@ -1094,7 +937,6 @@ fn entry_range_with_offset( /// The iterator returned from [readdir][Volume::readdir]. pub(crate) struct ReadDir<'a> { data: &'a BTreeMap, - locations: &'a [Location], range: btree_map::Range<'a, EntryKey<'static>, Ino>, parents: Vec<&'a str>, } @@ -1113,7 +955,6 @@ impl<'a> Iterator for ReadDir<'a> { name: &dent.name, parents: self.parents.clone(), attr: &dent.attr, - locations: self.locations, data: &dent.data, } }) @@ -1167,6 +1008,7 @@ impl<'a> Iterator for WalkIter<'a> { fn to_fb_entry<'a>( fbb: &mut FlatBufferBuilder<'a>, + location_idx_map: &BTreeMap, entry: &Entry, ) -> crate::Result>> { let attrs = fb::FileAttrs::create( @@ -1181,14 +1023,17 @@ fn to_fb_entry<'a>( ); let location_ref = match &entry.data { EntryData::File { - location_idx, + location, byte_range, } => Some({ let byte_range = fb::ByteRange::new(byte_range.offset, byte_range.len); fb::LocationRef::create( fbb, &fb::LocationRefArgs { - location_index: *location_idx as u16, + location_index: *(location_idx_map + .get(location) + .expect("BUG: metadata had a dangling location")) + as u16, byte_range: Some(&byte_range), }, ) @@ -1208,7 +1053,7 @@ fn to_fb_entry<'a>( )) } -fn from_fb_entry(fb_entry: &fb::Entry) -> crate::Result { +fn from_fb_entry(location_idx_map: &[Location], fb_entry: &fb::Entry) -> crate::Result { let name = fb_entry.name().to_string().into(); let parent_ino = fb_entry.parent_ino(); let attr = { @@ -1236,8 +1081,15 @@ fn from_fb_entry(fb_entry: &fb::Entry) -> crate::Result { "missing file data range", )); }; + let Some(location) = location_idx_map.get(location_ref.location_index() as usize) + else { + return Err(Error::new( + ErrorKind::InvalidData, + format!("missing location at idx: {}", location_ref.location_index()), + )); + }; EntryData::File { - location_idx: location_ref.location_index() as usize, + location: location.clone(), byte_range: ByteRange { offset: byte_range.offset(), len: byte_range.length(), @@ -1361,7 +1213,7 @@ mod test { .clone(); // location should match what we just created with let (l1, _) = volume.location(f1.ino).unwrap(); - assert_eq!(l1, &Location::committed("zzzz")); + assert_eq!(l1, Location::committed("zzzz")); let f2 = volume .create( @@ -1379,10 +1231,10 @@ mod test { // old locations should be stable let (l1, _) = volume.location(f1.ino).unwrap(); - assert_eq!(l1, &Location::committed("zzzz")); + assert_eq!(l1, Location::committed("zzzz")); // location should match what we just created with let (l2, _) = volume.location(f2.ino).unwrap(); - assert_eq!(l2, &Location::committed("aaaa")); + assert_eq!(l2, Location::committed("aaaa")); } #[test] @@ -1410,15 +1262,15 @@ mod test { // location should match what we just created with let (l1, _) = volume.location(f1.ino).unwrap(); - assert_eq!(l1, &Location::committed("zzzz")); + assert_eq!(l1, Location::committed("zzzz")); let (l1, _) = volume.location(f2.ino).unwrap(); - assert_eq!(l1, &Location::committed("zzzz")); + assert_eq!(l1, Location::committed("zzzz")); // delete the first file volume.delete(Ino::Root, "zzzz").unwrap(); assert_eq!(volume.location(f1.ino), None); let (l1, _) = volume.location(f2.ino).unwrap(); - assert_eq!(l1, &Location::committed("zzzz")); + assert_eq!(l1, Location::committed("zzzz")); // delete both files volume.delete(a.ino, "zzzz").unwrap(); @@ -1713,7 +1565,7 @@ mod test { assert_eq!( volume.location(test_txt.ino).unwrap(), ( - &Location::committed("test-key.txt"), + Location::committed("test-key.txt"), &ByteRange { offset: 0, len: 64 }, ) ); @@ -1747,7 +1599,7 @@ mod test { .unwrap(); let (location, range) = meta.location(ino).unwrap(); - assert_eq!(location, &new_location); + assert_eq!(location, new_location); assert_eq!(*range, new_range); let attr = meta.getattr(ino).unwrap(); diff --git a/pond/src/volume.rs b/pond/src/volume.rs index 78572e7..794a0a9 100644 --- a/pond/src/volume.rs +++ b/pond/src/volume.rs @@ -302,7 +302,7 @@ impl Volume { let mut metadata_guard = self.meta.write(); match metadata_guard.location(ino) { Some((Location::Staged { path, generation }, _)) => { - let file = if *generation < self.generation.load(Ordering::SeqCst) { + let file = if generation < self.generation.load(Ordering::SeqCst) { // generation mismatch for staged files, which means we're opening this // up while a commit is running and this staged file is part of the // snapshot. we treat it as a committed file in this case. @@ -1033,7 +1033,7 @@ impl<'a> Commit<'a> { // location to the one we uploaded to S3. if !matches!( metadata.location(ino), - Some((Location::Staged { generation, .. }, _)) if *generation < snapshot_generation + Some((Location::Staged { generation, .. }, _)) if generation < snapshot_generation ) { continue; } @@ -1058,7 +1058,7 @@ impl<'a> Commit<'a> { // the walk and location modification will result in some unreferenced staged locations. // these need to be cleaned up. - metadata.compact_locations(); + metadata.prune_unreferenced_locations(); Ok(()) } @@ -1082,7 +1082,7 @@ fn apply_location_ranges( )?; } - metadata.compact_locations(); + metadata.prune_unreferenced_locations(); Ok(()) } From 17a826d3c89b46a7421fdbd082faa7c4f1408246 Mon Sep 17 00:00:00 2001 From: Dustin Pho Date: Mon, 1 Dec 2025 20:10:52 +0000 Subject: [PATCH 15/17] helper for the interning locations --- pond/src/metadata.rs | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/pond/src/metadata.rs b/pond/src/metadata.rs index 9f18fa0..b639780 100644 --- a/pond/src/metadata.rs +++ b/pond/src/metadata.rs @@ -528,16 +528,7 @@ impl VolumeMetadata { let ino = self.next_ino()?; let slot = self.data.entry(ino); - // insert it into our self.location set if it doesn't exist, else grab a clone of the - // location inside of it (location is a bag of Arcs, so this keeps only 1 copy around). - let location = match self.locations.get(&location) { - // this gives us the interned location - Some(location) => location.clone(), - None => { - self.locations.insert(location.clone()); - location - } - }; + let location = get_interned_or_insert(&mut self.locations, location); let now = SystemTime::now(); let new_entry = Entry { name: name.clone().into(), @@ -694,14 +685,7 @@ impl VolumeMetadata { }; if let Some(location) = location { - *mut_location = match self.locations.get(&location) { - // this gives us the interned location - Some(location) => location.clone(), - None => { - self.locations.insert(location.clone()); - location - } - }; + *mut_location = get_interned_or_insert(&mut self.locations, location); } match range { Some(Modify::Set(range)) => { @@ -916,6 +900,22 @@ impl VolumeMetadata { } } +/// Insert it into `locations` if it doesn't exist, else grab a clone of the +/// location inside of it (location is a bag of Arcs, so this keeps only 1 copy around). +pub(crate) fn get_interned_or_insert( + locations: &mut BTreeSet, + location: Location, +) -> Location { + match locations.get(&location) { + // this gives us the interned location + Some(location) => location.clone(), + None => { + locations.insert(location.clone()); + location + } + } +} + fn entry_range(ino: Ino) -> crate::Result>> { let start: EntryKey = (ino, "").into(); let end: EntryKey = (ino.add(1)?, "").into(); From 56aa12335061d87c4a87cf60f9f14cce4799c444 Mon Sep 17 00:00:00 2001 From: Dustin Pho Date: Mon, 1 Dec 2025 20:19:14 +0000 Subject: [PATCH 16/17] bye parking_lot? --- Cargo.lock | 1 - Cargo.toml | 1 - pond/Cargo.toml | 1 - pond/src/volume.rs | 142 ++++++++++++++++++++++++++++++++------------- 4 files changed, 103 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index adcfa5f..a7caf79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1794,7 +1794,6 @@ dependencies = [ "futures", "metrics", "object_store", - "parking_lot", "rand 0.9.2", "tempfile", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 273e71d..b0ab2cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,6 @@ metrics-exporter-prometheus = "0.17.2" metrics-util = "0.20.0" nix = "0.29" # don't require a newer version than fuser if possible object_store = { version = "0.12.3", features = ["aws"] } -parking_lot = "0.12.5" rand = "0.9.2" signal-hook = "0.3" tempfile = "3" diff --git a/pond/Cargo.toml b/pond/Cargo.toml index 38339e3..ed8229f 100644 --- a/pond/Cargo.toml +++ b/pond/Cargo.toml @@ -14,7 +14,6 @@ foyer-memory.workspace = true futures.workspace = true metrics.workspace = true object_store.workspace = true -parking_lot.workspace = true rand.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["fs"] } diff --git a/pond/src/volume.rs b/pond/src/volume.rs index 794a0a9..b6a6037 100644 --- a/pond/src/volume.rs +++ b/pond/src/volume.rs @@ -10,14 +10,13 @@ use arc_swap::ArcSwap; use backon::{ExponentialBuilder, Retryable}; use bytes::{Bytes, BytesMut}; use object_store::{ObjectStore, PutMode, PutOptions, PutPayload}; -use parking_lot::RwLock; use std::{ collections::BTreeMap, io::{BufReader, Read}, os::unix::fs::FileExt, path::{Path, PathBuf}, sync::{ - Arc, + Arc, RwLock, atomic::{AtomicBool, AtomicU64, Ordering}, }, time::{Duration, SystemTime}, @@ -132,6 +131,7 @@ impl Volume { ino => { self.meta .write() + .expect("lock was poisoned") .modify(ino, mtime, ctime, location, range)?; Ok(()) } @@ -139,7 +139,11 @@ impl Volume { } pub fn version(&self) -> Version { - self.meta.read().version().clone() + self.meta + .read() + .expect("lock was poisoned") + .version() + .clone() } pub fn object_store_description(&self) -> String { @@ -155,16 +159,19 @@ impl Volume { } pub fn to_bytes(&self) -> Result> { - self.meta.read().to_bytes() + self.meta.read().expect("lock was poisoned").to_bytes() } pub fn to_bytes_with_version(&self, version: &Version) -> Result> { - self.meta.read().to_bytes_with_version(version) + self.meta + .read() + .expect("lock was poisoned") + .to_bytes_with_version(version) } pub fn getattr(&self, ino: Ino) -> Result { scoped_timer!("pond_volume_getattr_latency_secs"); - match self.meta.read().getattr(ino) { + match self.meta.read().expect("lock was poisoned").getattr(ino) { Some(attr) => Ok(attr.clone()), None => Err(ErrorKind::NotFound.into()), } @@ -176,25 +183,44 @@ impl Volume { mtime: Option, ctime: Option, ) -> Result { - self.meta.write().setattr(ino, mtime, ctime).cloned() + self.meta + .write() + .expect("lock was poisoned") + .setattr(ino, mtime, ctime) + .cloned() } pub fn lookup(&self, parent: Ino, name: &str) -> Result> { scoped_timer!("pond_volume_lookup_latency_secs"); - Ok(self.meta.read().lookup(parent, name)?.cloned()) + Ok(self + .meta + .read() + .expect("lock was poisoned") + .lookup(parent, name)? + .cloned()) } pub fn mkdir(&self, parent: Ino, name: String) -> Result { - self.meta.write().mkdir(parent, name).cloned() + self.meta + .write() + .expect("lock was poisoned") + .mkdir(parent, name) + .cloned() } pub fn rmdir(&self, parent: Ino, name: &str) -> Result<()> { - self.meta.write().rmdir(parent, name)?; + self.meta + .write() + .expect("lock was poisoned") + .rmdir(parent, name)?; Ok(()) } pub fn rename(&self, parent: Ino, name: &str, newparent: Ino, newname: String) -> Result<()> { - self.meta.write().rename(parent, name, newparent, newname)?; + self.meta + .write() + .expect("lock was poisoned") + .rename(parent, name, newparent, newname)?; Ok(()) } @@ -204,7 +230,7 @@ impl Volume { offset: Option, size: usize, ) -> Result> { - let metadata = self.meta.read(); + let metadata = self.meta.read().expect("lock was poisoned"); let entries: Vec = metadata .readdir(ino, offset)? .take(size) @@ -223,7 +249,7 @@ impl Volume { let file = open_file(path.as_path(), OpenMode::Create).await?; let attr = { - let mut metadata = self.meta.write(); + let mut metadata = self.meta.write().expect("lock was poisoned"); metadata .create( parent, @@ -241,7 +267,7 @@ impl Volume { }; let fd = new_fd( - &mut self.fds.write(), + &mut self.fds.write().expect("lock was poisoned"), attr.ino, FileDescriptor::Staged { file: file.into() }, )?; @@ -249,7 +275,10 @@ impl Volume { } pub fn delete(&self, parent: Ino, name: &str) -> Result<()> { - self.meta.write().delete(parent, name)?; + self.meta + .write() + .expect("lock was poisoned") + .delete(parent, name)?; Ok(()) } @@ -293,13 +322,21 @@ impl Volume { )] pub async fn open_read_write(&self, ino: Ino) -> Result { match ino { - Ino::COMMIT => new_fd(&mut self.fds.write(), ino, FileDescriptor::Commit), - Ino::CLEAR_CACHE => new_fd(&mut self.fds.write(), ino, FileDescriptor::ClearCache), + Ino::COMMIT => new_fd( + &mut self.fds.write().expect("lock was poisoned"), + ino, + FileDescriptor::Commit, + ), + Ino::CLEAR_CACHE => new_fd( + &mut self.fds.write().expect("lock was poisoned"), + ino, + FileDescriptor::ClearCache, + ), Ino::VERSION => Err(ErrorKind::PermissionDenied.into()), ino => { // note, we can't use an upgradable read lock here because there might be a race // condition?? TODO - let mut metadata_guard = self.meta.write(); + let mut metadata_guard = self.meta.write().expect("lock was poisoned"); match metadata_guard.location(ino) { Some((Location::Staged { path, generation }, _)) => { let file = if generation < self.generation.load(Ordering::SeqCst) { @@ -330,7 +367,7 @@ impl Volume { // a new staged file handle. new_fd( - &mut self.fds.write(), + &mut self.fds.write().expect("lock was poisoned"), ino, FileDescriptor::Staged { file: file.into() }, ) @@ -353,7 +390,7 @@ impl Volume { let file = open_file(&path, OpenMode::Create).await?; // only create the fd once the file is open and metadata is valid new_fd( - &mut self.fds.write(), + &mut self.fds.write().expect("lock was poisoned"), ino, FileDescriptor::Staged { file: file.into() }, ) @@ -370,28 +407,32 @@ impl Volume { )] pub async fn open_read(&self, ino: Ino) -> Result { match ino { - Ino::VERSION => new_fd(&mut self.fds.write(), ino, FileDescriptor::Version), + Ino::VERSION => new_fd( + &mut self.fds.write().expect("lock was poisoned"), + ino, + FileDescriptor::Version, + ), Ino::PROM_METRICS => { let data = match &self.metrics_snapshot { Some(metrics) => metrics.load().clone(), None => Default::default(), }; new_fd( - &mut self.fds.write(), + &mut self.fds.write().expect("lock was poisoned"), ino, FileDescriptor::PromMetrics { snapshot: data }, ) } Ino::COMMIT | Ino::CLEAR_CACHE => Err(ErrorKind::PermissionDenied.into()), ino => { - let metadata_guard = self.meta.read(); + let metadata_guard = self.meta.read().expect("lock was poisoned"); match metadata_guard.location(ino) { Some((Location::Staged { path, .. }, _)) => { let path = path.clone(); std::mem::drop(metadata_guard); // guard is dropped here before the await let file = open_file(&path, OpenMode::Read).await?; new_fd( - &mut self.fds.write(), + &mut self.fds.write().expect("lock was poisoned"), ino, FileDescriptor::Staged { file: file.into() }, ) @@ -401,7 +442,7 @@ impl Volume { let range = *range; std::mem::drop(metadata_guard); new_fd( - &mut self.fds.write(), + &mut self.fds.write().expect("lock was poisoned"), ino, FileDescriptor::Committed { key, range }, ) @@ -415,7 +456,12 @@ impl Volume { pub async fn read_at(&self, fd: Fd, offset: u64, buf: &mut [u8]) -> Result { metrics::histogram!("pond_volume_read_buf_size_bytes").record(buf.len() as f64); - let descriptor = self.fds.read().get(&fd).cloned(); + let descriptor = self + .fds + .read() + .expect("lock was poisoned") + .get(&fd) + .cloned(); match descriptor { // reads of write-only special fds do nothing Some(FileDescriptor::ClearCache) | Some(FileDescriptor::Commit) => Ok(0), @@ -452,7 +498,12 @@ impl Volume { pub async fn write_at(&self, fd: Fd, offset: u64, data: &[u8]) -> Result { metrics::histogram!("pond_volume_write_buf_size_bytes").record(data.len() as f64); - let descriptor = self.fds.read().get(&fd).cloned(); + let descriptor = self + .fds + .read() + .expect("lock was poisoned") + .get(&fd) + .cloned(); match descriptor { Some(FileDescriptor::ClearCache) => { self.cache.clear(); @@ -503,7 +554,7 @@ impl Volume { } pub async fn release(&self, fd: Fd) -> Result<()> { - match self.fds.write().remove(&fd) { + match self.fds.write().expect("lock was poisoned").remove(&fd) { Some(_) => Ok(()), None => Err(ErrorKind::NotFound.into()), } @@ -559,6 +610,7 @@ impl WalkVolume { fn new(meta: Arc>, ino: Ino) -> Result { let entries = meta .read() + .expect("lock was poisoned") .readdir(ino, None)? .map(|e| e.to_owned()) .collect::>(); @@ -572,6 +624,7 @@ impl WalkVolume { let entries = self .meta .read() + .expect("lock was poisoned") .readdir(ino, None)? .map(|e| e.to_owned()) .collect::>(); @@ -637,7 +690,10 @@ impl Volume { .components() .map(|c| c.as_os_str().to_string_lossy().to_string()) .collect(); - self.meta.write().mkdir_all(Ino::Root, dirs)?; + self.meta + .write() + .expect("lock was poisoned") + .mkdir_all(Ino::Root, dirs)?; } // for a file: // @@ -659,7 +715,11 @@ impl Volume { let dirs = dir .components() .map(|c| c.as_os_str().to_string_lossy().to_string()); - self.meta.write().mkdir_all(Ino::Root, dirs)?.ino + self.meta + .write() + .expect("lock was poisoned") + .mkdir_all(Ino::Root, dirs)? + .ino } else { Ino::Root }; @@ -681,7 +741,7 @@ impl Volume { ) })? .len(); - self.meta.write().create( + self.meta.write().expect("lock was poisoned").create( dir_ino, name.to_string_lossy().to_string(), true, @@ -841,9 +901,9 @@ impl<'a> Commit<'a> { /// while holding a lock for the metadata and file descriptor map. fn snapshot(&self) -> Result { // read lock on metadata, others can read during the snapshot process, but can't modify. - let metadata = self.inner.meta.read(); + let metadata = self.inner.meta.read().expect("lock was poisoned"); // write lock on the fds, no one is allowed to open new fds while we're taking a snapshot. - let fds = self.inner.fds.write(); + let fds = self.inner.fds.write().expect("lock was poisoned"); // don't allow open fds to staged files during the snapshot process. if fds @@ -1007,7 +1067,7 @@ impl<'a> Commit<'a> { } fn sync(self, snapshot: VolumeMetadata) -> Result<()> { - let mut metadata = self.inner.meta.write(); + let mut metadata = self.inner.meta.write().expect("lock was poisoned"); metadata.set_version(snapshot.version().clone()); let snapshot_generation = self.inner.generation.load(Ordering::SeqCst); @@ -1222,7 +1282,7 @@ mod tests { let volume = client.create_volume().await; // clean volume -- this is not staged - assert!(!volume.meta.read().is_staged()); + assert!(!volume.meta.read().expect("lock was poisoned").is_staged()); // creating two files, it should be a staged volume now. let (attr1, fd1) = volume @@ -1239,7 +1299,7 @@ mod tests { assert_write(&volume, fd2, "world").await; { - let meta = volume.meta.read(); + let meta = volume.meta.read().expect("lock was poisoned"); assert!(meta.is_staged()); for attr in [&attr1, &attr2] { assert!(matches!( @@ -1255,7 +1315,7 @@ mod tests { // after commit, both files are no longer staged { - let meta = volume.meta.read(); + let meta = volume.meta.read().expect("lock was poisoned"); assert!(!meta.is_staged()); for attr in [&attr1, &attr2] { assert!(matches!( @@ -1290,7 +1350,11 @@ mod tests { .unwrap(); let attr = attr.clone(); assert!(matches!( - volume.meta.read().location(attr.ino), + volume + .meta + .read() + .expect("lock was poisoned") + .location(attr.ino), Some((Location::Staged { .. }, _)) )); @@ -1318,7 +1382,7 @@ mod tests { let _f3 = volume.open_read_write(Ino::COMMIT).await.unwrap(); let _f4 = volume.open_read(attr.ino).await.unwrap(); // this is ok, because none of them are staged - assert!(!volume.fds.read().is_empty()); + assert!(!volume.fds.read().expect("lock was poisoned").is_empty()); volume .commit(Version::from_static("next-version")) .await From b09f2f1f0d67381f923c7fbf5e87218949a8acd1 Mon Sep 17 00:00:00 2001 From: Dustin Pho Date: Mon, 1 Dec 2025 20:29:45 +0000 Subject: [PATCH 17/17] clean up leftover todos + comments --- pond/src/metadata.rs | 3 +-- pond/src/volume.rs | 5 ----- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/pond/src/metadata.rs b/pond/src/metadata.rs index b639780..c0e2d81 100644 --- a/pond/src/metadata.rs +++ b/pond/src/metadata.rs @@ -649,8 +649,7 @@ impl VolumeMetadata { return; } - // count how many entries point to each location, and keep the references to the entries - // for each location + // keep track of all referenced locations while we're walking the entries let mut seen = BTreeSet::new(); for entry in self.data.values() { if let EntryData::File { location, .. } = &entry.data { diff --git a/pond/src/volume.rs b/pond/src/volume.rs index b6a6037..7d34685 100644 --- a/pond/src/volume.rs +++ b/pond/src/volume.rs @@ -334,8 +334,6 @@ impl Volume { ), Ino::VERSION => Err(ErrorKind::PermissionDenied.into()), ino => { - // note, we can't use an upgradable read lock here because there might be a race - // condition?? TODO let mut metadata_guard = self.meta.write().expect("lock was poisoned"); match metadata_guard.location(ino) { Some((Location::Staged { path, generation }, _)) => { @@ -363,9 +361,6 @@ impl Volume { open_file(&path, OpenMode::ReadWrite).await? }; - // TODO: if generation numbers don't match up, we're copying and creating - // a new staged file handle. - new_fd( &mut self.fds.write().expect("lock was poisoned"), ino,