Compare commits

...

1 Commits

Author SHA1 Message Date
Patrick Insinger
c491256f12 pageserver - use batch_fsync for historics 2021-10-09 11:25:42 -07:00
7 changed files with 56 additions and 7 deletions

1
Cargo.lock generated
View File

@@ -2592,6 +2592,7 @@ dependencies = [
"bincode",
"byteorder",
"bytes",
"crossbeam-utils",
"hex",
"hex-literal",
"hyper",

View File

@@ -18,6 +18,7 @@ use lazy_static::lazy_static;
use log::*;
use postgres_ffi::pg_constants::BLCKSZ;
use serde::{Deserialize, Serialize};
use zenith_utils::batch_fsync::batch_fsync;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
@@ -1442,9 +1443,9 @@ impl LayeredTimeline {
if created_historics {
// We must fsync the timeline dir to ensure the directory entries for
// new layer files are durable
let timeline_dir =
File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;
timeline_dir.sync_all()?;
layer_uploads.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));
batch_fsync(&layer_uploads)?;
layer_uploads.pop().unwrap();
}
// Save the metadata, with updated 'disk_consistent_lsn', to a

View File

@@ -431,8 +431,7 @@ impl DeltaLayer {
let book = chapter.close()?;
// This flushes the underlying 'buf_writer'.
let writer = book.close()?;
writer.get_ref().sync_all()?;
book.close()?;
trace!("saved {}", &path.display());

View File

@@ -315,8 +315,7 @@ impl ImageLayer {
let book = chapter.close()?;
// This flushes the underlying 'buf_writer'.
let writer = book.close()?;
writer.get_ref().sync_all()?;
book.close()?;
trace!("saved {}", path.display());

View File

@@ -18,6 +18,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
thiserror = "1.0"
tokio = "1.11"
crossbeam-utils = "0.8.5"
slog-async = "2.6.0"
slog-stdlog = "4.1.0"

View File

@@ -0,0 +1,45 @@
use crossbeam_utils::thread;
use std::{
fs::File,
io,
path::PathBuf,
sync::atomic::{AtomicUsize, Ordering},
};
pub fn batch_fsync(paths: &[PathBuf]) -> std::io::Result<()> {
let next = AtomicUsize::new(0);
let num_threads = std::cmp::min(paths.len() / 2, 256);
if num_threads <= 1 {
for path in paths {
let file = File::open(&path)?;
file.sync_all()?;
}
return Ok(());
}
thread::scope(|s| -> io::Result<()> {
let mut handles = Vec::new();
for _ in 0..num_threads {
handles.push(s.spawn(|_| loop {
let idx = next.fetch_add(1, Ordering::Relaxed);
if idx >= paths.len() {
return io::Result::Ok(());
}
let file = File::open(&paths[idx])?;
file.sync_all()?;
}));
}
for handle in handles {
handle.join().unwrap()?;
}
Ok(())
})
.unwrap()
}

View File

@@ -40,3 +40,6 @@ pub mod logging;
// Misc
pub mod accum;
/// Utility for quickly fsyncing many files at once
pub mod batch_fsync;