mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
pageserver - parallelize checkpoint fsyncs
This commit is contained in:
committed by
Patrick Insinger
parent
55a4cf64a1
commit
24c8dab86f
11
Cargo.lock
generated
11
Cargo.lock
generated
@@ -389,6 +389,16 @@ dependencies = [
|
||||
"rustc_version",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-utils"
|
||||
version = "0.8.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"lazy_static",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crypto-mac"
|
||||
version = "0.10.1"
|
||||
@@ -1170,6 +1180,7 @@ dependencies = [
|
||||
"clap",
|
||||
"const_format",
|
||||
"crc32c",
|
||||
"crossbeam-utils",
|
||||
"daemonize",
|
||||
"futures",
|
||||
"hex",
|
||||
|
||||
@@ -43,6 +43,7 @@ url = "2"
|
||||
nix = "0.23"
|
||||
once_cell = "1.8.0"
|
||||
parking_lot = "0.11.2"
|
||||
crossbeam-utils = "0.8.5"
|
||||
|
||||
rust-s3 = { version = "0.28", default-features = false, features = ["no-verify-ssl", "tokio-rustls-tls"] }
|
||||
async-compression = {version = "0.3", features = ["zstd", "tokio"]}
|
||||
|
||||
@@ -65,6 +65,7 @@ mod interval_tree;
|
||||
mod layer_map;
|
||||
pub mod metadata;
|
||||
mod page_versions;
|
||||
mod par_fsync;
|
||||
mod storage_layer;
|
||||
|
||||
use delta_layer::DeltaLayer;
|
||||
@@ -1459,7 +1460,7 @@ impl LayeredTimeline {
|
||||
// a lot of memory and/or aren't receiving much updates anymore.
|
||||
let mut disk_consistent_lsn = last_record_lsn;
|
||||
|
||||
let mut layer_uploads = Vec::new();
|
||||
let mut layer_paths = Vec::new();
|
||||
while let Some((oldest_layer_id, oldest_layer, oldest_generation)) =
|
||||
layers.peek_oldest_open()
|
||||
{
|
||||
@@ -1490,8 +1491,8 @@ impl LayeredTimeline {
|
||||
drop(layers);
|
||||
drop(write_guard);
|
||||
|
||||
let mut this_layer_uploads = self.evict_layer(oldest_layer_id, reconstruct_pages)?;
|
||||
layer_uploads.append(&mut this_layer_uploads);
|
||||
let mut this_layer_paths = self.evict_layer(oldest_layer_id, reconstruct_pages)?;
|
||||
layer_paths.append(&mut this_layer_paths);
|
||||
|
||||
write_guard = self.write_lock.lock().unwrap();
|
||||
layers = self.layers.lock().unwrap();
|
||||
@@ -1507,12 +1508,16 @@ impl LayeredTimeline {
|
||||
drop(layers);
|
||||
drop(write_guard);
|
||||
|
||||
if !layer_uploads.is_empty() {
|
||||
if !layer_paths.is_empty() {
|
||||
// We must fsync the timeline dir to ensure the directory entries for
|
||||
// new layer files are durable
|
||||
let timeline_dir =
|
||||
File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;
|
||||
timeline_dir.sync_all()?;
|
||||
layer_paths.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));
|
||||
|
||||
// Fsync all the layer files and directory using multiple threads to
|
||||
// minimize latency.
|
||||
par_fsync::par_fsync(&layer_paths)?;
|
||||
|
||||
layer_paths.pop().unwrap();
|
||||
}
|
||||
|
||||
// If we were able to advance 'disk_consistent_lsn', save it the metadata file.
|
||||
@@ -1558,7 +1563,7 @@ impl LayeredTimeline {
|
||||
schedule_timeline_checkpoint_upload(
|
||||
self.tenantid,
|
||||
self.timelineid,
|
||||
layer_uploads,
|
||||
layer_paths,
|
||||
metadata,
|
||||
);
|
||||
}
|
||||
@@ -1579,7 +1584,7 @@ impl LayeredTimeline {
|
||||
let mut write_guard = self.write_lock.lock().unwrap();
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
|
||||
let mut layer_uploads = Vec::new();
|
||||
let mut layer_paths = Vec::new();
|
||||
|
||||
let global_layer_map = GLOBAL_LAYER_MAP.read().unwrap();
|
||||
if let Some(oldest_layer) = global_layer_map.get(&layer_id) {
|
||||
@@ -1605,18 +1610,18 @@ impl LayeredTimeline {
|
||||
|
||||
// Add the historics to the LayerMap
|
||||
for delta_layer in new_historics.delta_layers {
|
||||
layer_uploads.push(delta_layer.path());
|
||||
layer_paths.push(delta_layer.path());
|
||||
layers.insert_historic(Arc::new(delta_layer));
|
||||
}
|
||||
for image_layer in new_historics.image_layers {
|
||||
layer_uploads.push(image_layer.path());
|
||||
layer_paths.push(image_layer.path());
|
||||
layers.insert_historic(Arc::new(image_layer));
|
||||
}
|
||||
}
|
||||
drop(layers);
|
||||
drop(write_guard);
|
||||
|
||||
Ok(layer_uploads)
|
||||
Ok(layer_paths)
|
||||
}
|
||||
|
||||
///
|
||||
|
||||
@@ -677,8 +677,7 @@ impl DeltaLayerWriter {
|
||||
let book = chapter.close()?;
|
||||
|
||||
// This flushes the underlying 'buf_writer'.
|
||||
let writer = book.close()?;
|
||||
writer.get_ref().sync_all()?;
|
||||
book.close()?;
|
||||
|
||||
// Note: Because we opened the file in write-only mode, we cannot
|
||||
// reuse the same VirtualFile for reading later. That's why we don't
|
||||
|
||||
@@ -503,8 +503,7 @@ impl ImageLayerWriter {
|
||||
let book = chapter.close()?;
|
||||
|
||||
// This flushes the underlying 'buf_writer'.
|
||||
let writer = book.close()?;
|
||||
writer.get_ref().sync_all()?;
|
||||
book.close()?;
|
||||
|
||||
// Note: Because we open the file in write-only mode, we cannot
|
||||
// reuse the same VirtualFile for reading later. That's why we don't
|
||||
|
||||
55
pageserver/src/layered_repository/par_fsync.rs
Normal file
55
pageserver/src/layered_repository/par_fsync.rs
Normal file
@@ -0,0 +1,55 @@
|
||||
use std::{
|
||||
fs::File,
|
||||
io,
|
||||
path::{Path, PathBuf},
|
||||
sync::atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
/// Open `path` read-only and fsync it, flushing both file data and
/// metadata to the underlying storage device.
fn fsync_path(path: &Path) -> io::Result<()> {
    File::open(path)?.sync_all()
}
|
||||
|
||||
fn parallel_worker(paths: &[PathBuf], next_path_idx: &AtomicUsize) -> io::Result<()> {
|
||||
while let Some(path) = paths.get(next_path_idx.fetch_add(1, Ordering::Relaxed)) {
|
||||
fsync_path(path)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> {
|
||||
const PARALLEL_PATH_THRESHOLD: usize = 1;
|
||||
if paths.len() <= PARALLEL_PATH_THRESHOLD {
|
||||
for path in paths {
|
||||
fsync_path(path)?;
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
/// Use at most this number of threads.
|
||||
/// Increasing this limit will
|
||||
/// - use more memory
|
||||
/// - increase the cost of spawn/join latency
|
||||
/// - increase the peak # of file descriptors
|
||||
const MAX_NUM_THREADS: usize = 64;
|
||||
let num_threads = paths.len().min(MAX_NUM_THREADS);
|
||||
let next_path_idx = AtomicUsize::new(0);
|
||||
|
||||
crossbeam_utils::thread::scope(|s| -> io::Result<()> {
|
||||
let mut handles = vec![];
|
||||
// Spawn `num_threads - 1`, as the current thread is also a worker.
|
||||
for _ in 1..num_threads {
|
||||
handles.push(s.spawn(|_| parallel_worker(paths, &next_path_idx)));
|
||||
}
|
||||
|
||||
parallel_worker(paths, &next_path_idx)?;
|
||||
|
||||
for handle in handles {
|
||||
handle.join().unwrap()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
Reference in New Issue
Block a user