mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 22:20:37 +00:00
pageserver - use batch_fsync for historics
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2592,6 +2592,7 @@ dependencies = [
|
||||
"bincode",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"crossbeam-utils",
|
||||
"hex",
|
||||
"hex-literal",
|
||||
"hyper",
|
||||
|
||||
@@ -18,6 +18,7 @@ use lazy_static::lazy_static;
|
||||
use log::*;
|
||||
use postgres_ffi::pg_constants::BLCKSZ;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use zenith_utils::batch_fsync::batch_fsync;
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
@@ -1442,9 +1443,9 @@ impl LayeredTimeline {
|
||||
if created_historics {
|
||||
// We must fsync the timeline dir to ensure the directory entries for
|
||||
// new layer files are durable
|
||||
let timeline_dir =
|
||||
File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;
|
||||
timeline_dir.sync_all()?;
|
||||
layer_uploads.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));
|
||||
batch_fsync(&layer_uploads)?;
|
||||
layer_uploads.pop().unwrap();
|
||||
}
|
||||
|
||||
// Save the metadata, with updated 'disk_consistent_lsn', to a
|
||||
|
||||
@@ -431,8 +431,7 @@ impl DeltaLayer {
|
||||
let book = chapter.close()?;
|
||||
|
||||
// This flushes the underlying 'buf_writer'.
|
||||
let writer = book.close()?;
|
||||
writer.get_ref().sync_all()?;
|
||||
book.close()?;
|
||||
|
||||
trace!("saved {}", &path.display());
|
||||
|
||||
|
||||
@@ -315,8 +315,7 @@ impl ImageLayer {
|
||||
let book = chapter.close()?;
|
||||
|
||||
// This flushes the underlying 'buf_writer'.
|
||||
let writer = book.close()?;
|
||||
writer.get_ref().sync_all()?;
|
||||
book.close()?;
|
||||
|
||||
trace!("saved {}", path.display());
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
thiserror = "1.0"
|
||||
tokio = "1.11"
|
||||
crossbeam-utils = "0.8.5"
|
||||
|
||||
slog-async = "2.6.0"
|
||||
slog-stdlog = "4.1.0"
|
||||
|
||||
45
zenith_utils/src/batch_fsync.rs
Normal file
45
zenith_utils/src/batch_fsync.rs
Normal file
@@ -0,0 +1,45 @@
|
||||
use crossbeam_utils::thread;
|
||||
|
||||
use std::{
|
||||
fs::File,
|
||||
io,
|
||||
path::PathBuf,
|
||||
sync::atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
pub fn batch_fsync(paths: &[PathBuf]) -> std::io::Result<()> {
|
||||
let next = AtomicUsize::new(0);
|
||||
|
||||
let num_threads = std::cmp::min(paths.len() / 2, 256);
|
||||
|
||||
if num_threads <= 1 {
|
||||
for path in paths {
|
||||
let file = File::open(&path)?;
|
||||
file.sync_all()?;
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
thread::scope(|s| -> io::Result<()> {
|
||||
let mut handles = Vec::new();
|
||||
for _ in 0..num_threads {
|
||||
handles.push(s.spawn(|_| loop {
|
||||
let idx = next.fetch_add(1, Ordering::Relaxed);
|
||||
if idx >= paths.len() {
|
||||
return io::Result::Ok(());
|
||||
}
|
||||
|
||||
let file = File::open(&paths[idx])?;
|
||||
file.sync_all()?;
|
||||
}));
|
||||
}
|
||||
|
||||
for handle in handles {
|
||||
handle.join().unwrap()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
@@ -40,3 +40,6 @@ pub mod logging;
|
||||
|
||||
// Misc
|
||||
pub mod accum;
|
||||
|
||||
/// Utility for quickly fsyncing many files at once
|
||||
pub mod batch_fsync;
|
||||
|
||||
Reference in New Issue
Block a user