diff --git a/Cargo.lock b/Cargo.lock index e36b462d8e..46c2c9d032 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2592,6 +2592,7 @@ dependencies = [ "bincode", "byteorder", "bytes", + "crossbeam-utils", "hex", "hex-literal", "hyper", diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index bb9d8af112..5d36c7873a 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -18,6 +18,7 @@ use lazy_static::lazy_static; use log::*; use postgres_ffi::pg_constants::BLCKSZ; use serde::{Deserialize, Serialize}; +use zenith_utils::batch_fsync::batch_fsync; use std::collections::hash_map::Entry; use std::collections::HashMap; @@ -1442,9 +1443,9 @@ impl LayeredTimeline { if created_historics { // We must fsync the timeline dir to ensure the directory entries for // new layer files are durable - let timeline_dir = - File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?; - timeline_dir.sync_all()?; + layer_uploads.push(self.conf.timeline_path(&self.timelineid, &self.tenantid)); + batch_fsync(&layer_uploads)?; + layer_uploads.pop().unwrap(); } // Save the metadata, with updated 'disk_consistent_lsn', to a diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index e93eddb7e6..e84fef2033 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -431,8 +431,7 @@ impl DeltaLayer { let book = chapter.close()?; // This flushes the underlying 'buf_writer'. - let writer = book.close()?; - writer.get_ref().sync_all()?; + book.close()?; trace!("saved {}", &path.display()); diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs index 744f793558..72ac7248ff 100644 --- a/pageserver/src/layered_repository/image_layer.rs +++ b/pageserver/src/layered_repository/image_layer.rs @@ -315,8 +315,7 @@ impl ImageLayer { let book = chapter.close()?; // This flushes the underlying 'buf_writer'. - let writer = book.close()?; - writer.get_ref().sync_all()?; + book.close()?; trace!("saved {}", path.display()); diff --git a/zenith_utils/Cargo.toml b/zenith_utils/Cargo.toml index 22c1c9bab6..86ad3d538a 100644 --- a/zenith_utils/Cargo.toml +++ b/zenith_utils/Cargo.toml @@ -18,6 +18,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1" thiserror = "1.0" tokio = "1.11" +crossbeam-utils = "0.8.5" slog-async = "2.6.0" slog-stdlog = "4.1.0" diff --git a/zenith_utils/src/batch_fsync.rs b/zenith_utils/src/batch_fsync.rs new file mode 100644 index 0000000000..c81ef008d5 --- /dev/null +++ b/zenith_utils/src/batch_fsync.rs @@ -0,0 +1,45 @@ +use crossbeam_utils::thread; + +use std::{ + fs::File, + io, + path::PathBuf, + sync::atomic::{AtomicUsize, Ordering}, +}; + +pub fn batch_fsync(paths: &[PathBuf]) -> std::io::Result<()> { + let next = AtomicUsize::new(0); + + let num_threads = std::cmp::min(paths.len() / 2, 256); + + if num_threads <= 1 { + for path in paths { + let file = File::open(&path)?; + file.sync_all()?; + } + + return Ok(()); + } + + thread::scope(|s| -> io::Result<()> { + let mut handles = Vec::new(); + for _ in 0..num_threads { + handles.push(s.spawn(|_| loop { + let idx = next.fetch_add(1, Ordering::Relaxed); + if idx >= paths.len() { + return io::Result::Ok(()); + } + + let file = File::open(&paths[idx])?; + file.sync_all()?; + })); + } + + for handle in handles { + handle.join().unwrap()?; + } + + Ok(()) + }) + .unwrap() +} diff --git a/zenith_utils/src/lib.rs b/zenith_utils/src/lib.rs index 96b3cf5066..9522278f3d 100644 --- a/zenith_utils/src/lib.rs +++ b/zenith_utils/src/lib.rs @@ -40,3 +40,6 @@ pub mod logging; // Misc pub mod accum; + +/// Utility for quickly fsyncing many files at once +pub mod batch_fsync;