mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 17:10:38 +00:00
## Problem

Once we use async file system APIs for `VirtualFile`, these functions will also need to be `async fn`.

## Summary of changes

Makes the functions `open`, `open_with_options`, `create`, `sync_all`, and `with_file` of `VirtualFile` `async fn`, along with all functions that call them. As in the prior PRs, the actual I/O operations do not use async APIs yet, as requested in the #4743 epic.

We switch away from using `VirtualFile` in the par_fsync module; hopefully this is only temporary until we can actually do fully async I/O in `VirtualFile`. This might cause us to exhaust fd limits in the tests, but it should only be an issue for local developers, as we have high ulimits in prod.

This PR is a follow-up to #5189, #5190, #5195, and #5203. Part of #4743.
85 lines
2.6 KiB
Rust
85 lines
2.6 KiB
Rust
use std::{
|
|
io,
|
|
path::{Path, PathBuf},
|
|
sync::atomic::{AtomicUsize, Ordering},
|
|
};
|
|
|
|
/// Opens the file at `path` and flushes its data and metadata to disk.
///
/// # Errors
///
/// Returns the underlying I/O error if the file cannot be opened or if
/// the sync itself fails.
fn fsync_path(path: &Path) -> io::Result<()> {
    // TODO use VirtualFile::fsync_all once we fully go async.
    std::fs::File::open(path).and_then(|opened| opened.sync_all())
}
|
|
|
|
fn parallel_worker(paths: &[PathBuf], next_path_idx: &AtomicUsize) -> io::Result<()> {
|
|
while let Some(path) = paths.get(next_path_idx.fetch_add(1, Ordering::Relaxed)) {
|
|
fsync_path(path)?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn fsync_in_thread_pool(paths: &[PathBuf]) -> io::Result<()> {
|
|
// TODO: remove this function in favor of `par_fsync_async` once we asyncify everything.
|
|
|
|
/// Use at most this number of threads.
|
|
/// Increasing this limit will
|
|
/// - use more memory
|
|
/// - increase the cost of spawn/join latency
|
|
const MAX_NUM_THREADS: usize = 64;
|
|
let num_threads = paths.len().min(MAX_NUM_THREADS);
|
|
let next_path_idx = AtomicUsize::new(0);
|
|
|
|
std::thread::scope(|s| -> io::Result<()> {
|
|
let mut handles = vec![];
|
|
// Spawn `num_threads - 1`, as the current thread is also a worker.
|
|
for _ in 1..num_threads {
|
|
handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx)));
|
|
}
|
|
|
|
parallel_worker(paths, &next_path_idx)?;
|
|
|
|
for handle in handles {
|
|
handle.join().unwrap()?;
|
|
}
|
|
|
|
Ok(())
|
|
})
|
|
}
|
|
|
|
/// Parallel fsync all files. Can be used in non-async context as it is using rayon thread pool.
|
|
pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> {
|
|
if paths.len() == 1 {
|
|
fsync_path(&paths[0])?;
|
|
return Ok(());
|
|
}
|
|
|
|
fsync_in_thread_pool(paths)
|
|
}
|
|
|
|
/// Parallel fsync asynchronously. If number of files are less than PARALLEL_PATH_THRESHOLD, fsync is done in the current
|
|
/// execution thread. Otherwise, we will spawn_blocking and run it in tokio.
|
|
pub async fn par_fsync_async(paths: &[PathBuf]) -> io::Result<()> {
|
|
const MAX_CONCURRENT_FSYNC: usize = 64;
|
|
let mut next = paths.iter().peekable();
|
|
let mut js = tokio::task::JoinSet::new();
|
|
loop {
|
|
while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() {
|
|
let next = next.next().expect("just peeked");
|
|
let next = next.to_owned();
|
|
js.spawn_blocking(move || fsync_path(&next));
|
|
}
|
|
|
|
// now the joinset has been filled up, wait for next to complete
|
|
if let Some(res) = js.join_next().await {
|
|
res??;
|
|
} else {
|
|
// last item had already completed
|
|
assert!(
|
|
next.peek().is_none(),
|
|
"joinset emptied, we shouldn't have more work"
|
|
);
|
|
return Ok(());
|
|
}
|
|
}
|
|
}
|