diff --git a/Cargo.lock b/Cargo.lock
index 5fd6076af2..f92bcb16d3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -389,6 +389,16 @@ dependencies = [
  "rustc_version",
 ]
 
+[[package]]
+name = "crossbeam-utils"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
+dependencies = [
+ "cfg-if",
+ "lazy_static",
+]
+
 [[package]]
 name = "crypto-mac"
 version = "0.10.1"
@@ -1170,6 +1180,7 @@ dependencies = [
  "clap",
  "const_format",
  "crc32c",
+ "crossbeam-utils",
  "daemonize",
  "futures",
  "hex",
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index 4782884363..39d4fc918a 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -43,6 +43,7 @@ url = "2"
 nix = "0.23"
 once_cell = "1.8.0"
 parking_lot = "0.11.2"
+crossbeam-utils = "0.8.5"
 rust-s3 = { version = "0.28", default-features = false, features = ["no-verify-ssl", "tokio-rustls-tls"] }
 async-compression = {version = "0.3", features = ["zstd", "tokio"]}
 
diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index c5ccb5a40f..c6e694f607 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -65,6 +65,7 @@ mod interval_tree;
 mod layer_map;
 pub mod metadata;
 mod page_versions;
+mod par_fsync;
 mod storage_layer;
 
 use delta_layer::DeltaLayer;
@@ -1459,7 +1460,7 @@ impl LayeredTimeline {
         // a lot of memory and/or aren't receiving much updates anymore.
 
         let mut disk_consistent_lsn = last_record_lsn;
-        let mut layer_uploads = Vec::new();
+        let mut layer_paths = Vec::new();
         while let Some((oldest_layer_id, oldest_layer, oldest_generation)) =
             layers.peek_oldest_open()
         {
@@ -1490,8 +1491,8 @@
             drop(layers);
             drop(write_guard);
 
-            let mut this_layer_uploads = self.evict_layer(oldest_layer_id, reconstruct_pages)?;
-            layer_uploads.append(&mut this_layer_uploads);
+            let mut this_layer_paths = self.evict_layer(oldest_layer_id, reconstruct_pages)?;
+            layer_paths.append(&mut this_layer_paths);
 
             write_guard = self.write_lock.lock().unwrap();
             layers = self.layers.lock().unwrap();
@@ -1507,12 +1508,16 @@
         drop(layers);
         drop(write_guard);
 
-        if !layer_uploads.is_empty() {
+        if !layer_paths.is_empty() {
             // We must fsync the timeline dir to ensure the directory entries for
             // new layer files are durable
-            let timeline_dir =
-                File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;
-            timeline_dir.sync_all()?;
+            layer_paths.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));
+
+            // Fsync all the layer files and directory using multiple threads to
+            // minimize latency.
+            par_fsync::par_fsync(&layer_paths)?;
+
+            layer_paths.pop().unwrap();
         }
 
         // If we were able to advance 'disk_consistent_lsn', save it the metadata file.
@@ -1558,7 +1563,7 @@ impl LayeredTimeline {
             schedule_timeline_checkpoint_upload(
                 self.tenantid,
                 self.timelineid,
-                layer_uploads,
+                layer_paths,
                 metadata,
             );
         }
@@ -1579,7 +1584,7 @@
         let mut write_guard = self.write_lock.lock().unwrap();
         let mut layers = self.layers.lock().unwrap();
 
-        let mut layer_uploads = Vec::new();
+        let mut layer_paths = Vec::new();
 
         let global_layer_map = GLOBAL_LAYER_MAP.read().unwrap();
         if let Some(oldest_layer) = global_layer_map.get(&layer_id) {
@@ -1605,18 +1610,18 @@
             // Add the historics to the LayerMap
             for delta_layer in new_historics.delta_layers {
-                layer_uploads.push(delta_layer.path());
+                layer_paths.push(delta_layer.path());
                 layers.insert_historic(Arc::new(delta_layer));
             }
 
             for image_layer in new_historics.image_layers {
-                layer_uploads.push(image_layer.path());
+                layer_paths.push(image_layer.path());
                 layers.insert_historic(Arc::new(image_layer));
             }
         }
 
         drop(layers);
         drop(write_guard);
-        Ok(layer_uploads)
+        Ok(layer_paths)
     }
 
     ///
diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs
index 32393b23af..41692fcac6 100644
--- a/pageserver/src/layered_repository/delta_layer.rs
+++ b/pageserver/src/layered_repository/delta_layer.rs
@@ -677,8 +677,7 @@ impl DeltaLayerWriter {
         let book = chapter.close()?;
 
         // This flushes the underlying 'buf_writer'.
-        let writer = book.close()?;
-        writer.get_ref().sync_all()?;
+        book.close()?;
 
         // Note: Because we opened the file in write-only mode, we cannot
         // reuse the same VirtualFile for reading later. That's why we don't
diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs
index c4565797d6..ecfb8c73b0 100644
--- a/pageserver/src/layered_repository/image_layer.rs
+++ b/pageserver/src/layered_repository/image_layer.rs
@@ -503,8 +503,7 @@ impl ImageLayerWriter {
         let book = chapter.close()?;
 
         // This flushes the underlying 'buf_writer'.
-        let writer = book.close()?;
-        writer.get_ref().sync_all()?;
+        book.close()?;
 
         // Note: Because we open the file in write-only mode, we cannot
         // reuse the same VirtualFile for reading later. That's why we don't
diff --git a/pageserver/src/layered_repository/par_fsync.rs b/pageserver/src/layered_repository/par_fsync.rs
new file mode 100644
index 0000000000..c2500d2c85
--- /dev/null
+++ b/pageserver/src/layered_repository/par_fsync.rs
@@ -0,0 +1,55 @@
+use std::{
+    fs::File,
+    io,
+    path::{Path, PathBuf},
+    sync::atomic::{AtomicUsize, Ordering},
+};
+
+fn fsync_path(path: &Path) -> io::Result<()> {
+    let file = File::open(path)?;
+    file.sync_all()
+}
+
+fn parallel_worker(paths: &[PathBuf], next_path_idx: &AtomicUsize) -> io::Result<()> {
+    while let Some(path) = paths.get(next_path_idx.fetch_add(1, Ordering::Relaxed)) {
+        fsync_path(path)?;
+    }
+
+    Ok(())
+}
+
+pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> {
+    const PARALLEL_PATH_THRESHOLD: usize = 1;
+    if paths.len() <= PARALLEL_PATH_THRESHOLD {
+        for path in paths {
+            fsync_path(path)?;
+        }
+        return Ok(());
+    }
+
+    /// Use at most this number of threads.
+    /// Increasing this limit will
+    /// - use more memory
+    /// - increase the cost of spawn/join latency
+    /// - increase the peak # of file descriptors
+    const MAX_NUM_THREADS: usize = 64;
+    let num_threads = paths.len().min(MAX_NUM_THREADS);
+    let next_path_idx = AtomicUsize::new(0);
+
+    crossbeam_utils::thread::scope(|s| -> io::Result<()> {
+        let mut handles = vec![];
+        // Spawn `num_threads - 1`, as the current thread is also a worker.
+        for _ in 1..num_threads {
+            handles.push(s.spawn(|_| parallel_worker(paths, &next_path_idx)));
+        }
+
+        parallel_worker(paths, &next_path_idx)?;
+
+        for handle in handles {
+            handle.join().unwrap()?;
+        }
+
+        Ok(())
+    })
+    .unwrap()
+}
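
A minimal sketch of how the new helper is meant to be called, written as if it sat next to the checkpoint code in layered_repository.rs where the private `par_fsync` module is visible; the `sync_layer_files` function and its arguments are hypothetical and only mirror the call pattern introduced above:

    use std::io;
    use std::path::PathBuf;

    // Hypothetical caller: fsync a batch of freshly written layer files plus the
    // timeline directory itself, so the new directory entries are durable too.
    fn sync_layer_files(mut layer_paths: Vec<PathBuf>, timeline_dir: PathBuf) -> io::Result<()> {
        // The directory is just one more path in the batch.
        layer_paths.push(timeline_dir);

        // Batches of at most one path are fsynced sequentially on the calling
        // thread; larger batches are spread over up to 64 scoped worker threads.
        par_fsync::par_fsync(&layer_paths)
    }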