mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 04:50:38 +00:00
layer file creation: fsync timeline directories using VirtualFile::sync_all() (#6986)
Except for the involvement of the VirtualFile fd cache, this is equivalent to what happened before at runtime. Future PR https://github.com/neondatabase/neon/pull/6378 will implement `VirtualFile::sync_all()` using tokio-epoll-uring if that's configured as the io engine. This PR is preliminary work for that. part of https://github.com/neondatabase/neon/issues/6663
This commit is contained in:
committed by
GitHub
parent
e1c032fb3c
commit
944cac950d
@@ -151,7 +151,6 @@ pub(crate) mod ephemeral_file;
|
||||
pub mod layer_map;
|
||||
|
||||
pub mod metadata;
|
||||
mod par_fsync;
|
||||
pub mod remote_timeline_client;
|
||||
pub mod storage_layer;
|
||||
|
||||
|
||||
@@ -1,84 +0,0 @@
|
||||
use std::{
|
||||
io,
|
||||
sync::atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
|
||||
fn fsync_path(path: &Utf8Path) -> io::Result<()> {
|
||||
// TODO use VirtualFile::fsync_all once we fully go async.
|
||||
let file = std::fs::File::open(path)?;
|
||||
file.sync_all()
|
||||
}
|
||||
|
||||
fn parallel_worker(paths: &[Utf8PathBuf], next_path_idx: &AtomicUsize) -> io::Result<()> {
|
||||
while let Some(path) = paths.get(next_path_idx.fetch_add(1, Ordering::Relaxed)) {
|
||||
fsync_path(path)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn fsync_in_thread_pool(paths: &[Utf8PathBuf]) -> io::Result<()> {
|
||||
// TODO: remove this function in favor of `par_fsync_async` once we asyncify everything.
|
||||
|
||||
/// Use at most this number of threads.
|
||||
/// Increasing this limit will
|
||||
/// - use more memory
|
||||
/// - increase the cost of spawn/join latency
|
||||
const MAX_NUM_THREADS: usize = 64;
|
||||
let num_threads = paths.len().min(MAX_NUM_THREADS);
|
||||
let next_path_idx = AtomicUsize::new(0);
|
||||
|
||||
std::thread::scope(|s| -> io::Result<()> {
|
||||
let mut handles = vec![];
|
||||
// Spawn `num_threads - 1`, as the current thread is also a worker.
|
||||
for _ in 1..num_threads {
|
||||
handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx)));
|
||||
}
|
||||
|
||||
parallel_worker(paths, &next_path_idx)?;
|
||||
|
||||
for handle in handles {
|
||||
handle.join().unwrap()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
/// Parallel fsync all files. Can be used in non-async context as it is using rayon thread pool.
|
||||
pub fn par_fsync(paths: &[Utf8PathBuf]) -> io::Result<()> {
|
||||
if paths.len() == 1 {
|
||||
fsync_path(&paths[0])?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
fsync_in_thread_pool(paths)
|
||||
}
|
||||
|
||||
/// Parallel fsync asynchronously.
|
||||
pub async fn par_fsync_async(paths: &[Utf8PathBuf]) -> io::Result<()> {
|
||||
const MAX_CONCURRENT_FSYNC: usize = 64;
|
||||
let mut next = paths.iter().peekable();
|
||||
let mut js = tokio::task::JoinSet::new();
|
||||
loop {
|
||||
while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() {
|
||||
let next = next.next().expect("just peeked");
|
||||
let next = next.to_owned();
|
||||
js.spawn_blocking(move || fsync_path(&next));
|
||||
}
|
||||
|
||||
// now the joinset has been filled up, wait for next to complete
|
||||
if let Some(res) = js.join_next().await {
|
||||
res??;
|
||||
} else {
|
||||
// last item had already completed
|
||||
assert!(
|
||||
next.peek().is_none(),
|
||||
"joinset emptied, we shouldn't have more work"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -54,7 +54,6 @@ use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
use crate::tenant::{
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
metadata::TimelineMetadata,
|
||||
par_fsync,
|
||||
};
|
||||
use crate::{
|
||||
context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
|
||||
@@ -76,7 +75,7 @@ use crate::{
|
||||
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
|
||||
use crate::{
|
||||
pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
|
||||
virtual_file::MaybeFatalIo,
|
||||
virtual_file::{MaybeFatalIo, VirtualFile},
|
||||
};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
@@ -3417,28 +3416,31 @@ impl Timeline {
|
||||
let frozen_layer = Arc::clone(frozen_layer);
|
||||
let ctx = ctx.attached_child();
|
||||
move || {
|
||||
// Write it out
|
||||
// Keep this inside `spawn_blocking` and `Handle::current`
|
||||
// as long as the write path is still sync and the read impl
|
||||
// is still not fully async. Otherwise executor threads would
|
||||
// be blocked.
|
||||
let _g = span.entered();
|
||||
let new_delta =
|
||||
Handle::current().block_on(frozen_layer.write_to_disk(&self_clone, &ctx))?;
|
||||
|
||||
// The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
|
||||
// We just need to fsync the directory in which these inodes are linked,
|
||||
// which we know to be the timeline directory.
|
||||
//
|
||||
// We use fatal_err() below because the after write_to_disk returns with success,
|
||||
// the in-memory state of the filesystem already has the layer file in its final place,
|
||||
// and subsequent pageserver code could think it's durable while it really isn't.
|
||||
par_fsync::par_fsync(&[self_clone
|
||||
.conf
|
||||
.timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id)])
|
||||
.fatal_err("fsync of timeline dir");
|
||||
|
||||
anyhow::Ok(new_delta)
|
||||
Handle::current().block_on(
|
||||
async move {
|
||||
let new_delta = frozen_layer.write_to_disk(&self_clone, &ctx).await?;
|
||||
// The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
|
||||
// We just need to fsync the directory in which these inodes are linked,
|
||||
// which we know to be the timeline directory.
|
||||
//
|
||||
// We use fatal_err() below because the after write_to_disk returns with success,
|
||||
// the in-memory state of the filesystem already has the layer file in its final place,
|
||||
// and subsequent pageserver code could think it's durable while it really isn't.
|
||||
let timeline_dir =
|
||||
VirtualFile::open(&self_clone.conf.timeline_path(
|
||||
&self_clone.tenant_shard_id,
|
||||
&self_clone.timeline_id,
|
||||
))
|
||||
.await
|
||||
.fatal_err("VirtualFile::open for timeline dir fsync");
|
||||
timeline_dir
|
||||
.sync_all()
|
||||
.await
|
||||
.fatal_err("VirtualFile::sync_all timeline dir");
|
||||
anyhow::Ok(new_delta)
|
||||
}
|
||||
.instrument(span),
|
||||
)
|
||||
}
|
||||
})
|
||||
.await
|
||||
@@ -3672,11 +3674,17 @@ impl Timeline {
|
||||
// We use fatal_err() below because the after writer.finish() returns with success,
|
||||
// the in-memory state of the filesystem already has the layer file in its final place,
|
||||
// and subsequent pageserver code could think it's durable while it really isn't.
|
||||
par_fsync::par_fsync_async(&[self
|
||||
.conf
|
||||
.timeline_path(&self.tenant_shard_id, &self.timeline_id)])
|
||||
let timeline_dir = VirtualFile::open(
|
||||
&self
|
||||
.conf
|
||||
.timeline_path(&self.tenant_shard_id, &self.timeline_id),
|
||||
)
|
||||
.await
|
||||
.fatal_err("fsync of timeline dir");
|
||||
.fatal_err("VirtualFile::open for timeline dir fsync");
|
||||
timeline_dir
|
||||
.sync_all()
|
||||
.await
|
||||
.fatal_err("VirtualFile::sync_all timeline dir");
|
||||
}
|
||||
|
||||
let mut guard = self.layers.write().await;
|
||||
@@ -4265,12 +4273,17 @@ impl Timeline {
|
||||
// We use fatal_err() below because the after writer.finish() returns with success,
|
||||
// the in-memory state of the filesystem already has the layer file in its final place,
|
||||
// and subsequent pageserver code could think it's durable while it really isn't.
|
||||
let timeline_dir = self
|
||||
.conf
|
||||
.timeline_path(&self.tenant_shard_id, &self.timeline_id);
|
||||
par_fsync::par_fsync_async(&[timeline_dir])
|
||||
let timeline_dir = VirtualFile::open(
|
||||
&self
|
||||
.conf
|
||||
.timeline_path(&self.tenant_shard_id, &self.timeline_id),
|
||||
)
|
||||
.await
|
||||
.fatal_err("VirtualFile::open for timeline dir fsync");
|
||||
timeline_dir
|
||||
.sync_all()
|
||||
.await
|
||||
.fatal_err("fsync of timeline dir");
|
||||
.fatal_err("VirtualFile::sync_all timeline dir");
|
||||
}
|
||||
|
||||
stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
|
||||
|
||||
Reference in New Issue
Block a user