Compare commits

...

1 Commits

Author SHA1 Message Date
Patrick Insinger
c491256f12 pageserver - use batch_fsync for historics 2021-10-09 11:25:42 -07:00
7 changed files with 56 additions and 7 deletions

1
Cargo.lock generated
View File

@@ -2592,6 +2592,7 @@ dependencies = [
"bincode", "bincode",
"byteorder", "byteorder",
"bytes", "bytes",
"crossbeam-utils",
"hex", "hex",
"hex-literal", "hex-literal",
"hyper", "hyper",

View File

@@ -18,6 +18,7 @@ use lazy_static::lazy_static;
use log::*; use log::*;
use postgres_ffi::pg_constants::BLCKSZ; use postgres_ffi::pg_constants::BLCKSZ;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use zenith_utils::batch_fsync::batch_fsync;
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
@@ -1442,9 +1443,9 @@ impl LayeredTimeline {
if created_historics { if created_historics {
// We must fsync the timeline dir to ensure the directory entries for // We must fsync the timeline dir to ensure the directory entries for
// new layer files are durable // new layer files are durable
let timeline_dir = layer_uploads.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));
File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?; batch_fsync(&layer_uploads)?;
timeline_dir.sync_all()?; layer_uploads.pop().unwrap();
} }
// Save the metadata, with updated 'disk_consistent_lsn', to a // Save the metadata, with updated 'disk_consistent_lsn', to a

View File

@@ -431,8 +431,7 @@ impl DeltaLayer {
let book = chapter.close()?; let book = chapter.close()?;
// This flushes the underlying 'buf_writer'. // This flushes the underlying 'buf_writer'.
let writer = book.close()?; book.close()?;
writer.get_ref().sync_all()?;
trace!("saved {}", &path.display()); trace!("saved {}", &path.display());

View File

@@ -315,8 +315,7 @@ impl ImageLayer {
let book = chapter.close()?; let book = chapter.close()?;
// This flushes the underlying 'buf_writer'. // This flushes the underlying 'buf_writer'.
let writer = book.close()?; book.close()?;
writer.get_ref().sync_all()?;
trace!("saved {}", path.display()); trace!("saved {}", path.display());

View File

@@ -18,6 +18,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1" serde_json = "1"
thiserror = "1.0" thiserror = "1.0"
tokio = "1.11" tokio = "1.11"
crossbeam-utils = "0.8.5"
slog-async = "2.6.0" slog-async = "2.6.0"
slog-stdlog = "4.1.0" slog-stdlog = "4.1.0"

View File

@@ -0,0 +1,45 @@
use crossbeam_utils::thread;
use std::{
fs::File,
io,
path::PathBuf,
sync::atomic::{AtomicUsize, Ordering},
};
pub fn batch_fsync(paths: &[PathBuf]) -> std::io::Result<()> {
let next = AtomicUsize::new(0);
let num_threads = std::cmp::min(paths.len() / 2, 256);
if num_threads <= 1 {
for path in paths {
let file = File::open(&path)?;
file.sync_all()?;
}
return Ok(());
}
thread::scope(|s| -> io::Result<()> {
let mut handles = Vec::new();
for _ in 0..num_threads {
handles.push(s.spawn(|_| loop {
let idx = next.fetch_add(1, Ordering::Relaxed);
if idx >= paths.len() {
return io::Result::Ok(());
}
let file = File::open(&paths[idx])?;
file.sync_all()?;
}));
}
for handle in handles {
handle.join().unwrap()?;
}
Ok(())
})
.unwrap()
}

View File

@@ -40,3 +40,6 @@ pub mod logging;
// Misc // Misc
pub mod accum; pub mod accum;
/// Utility for quickly fsyncing many files at once
pub mod batch_fsync;