mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 03:20:36 +00:00
Implement a new `struct Layer` abstraction which manages downloadness internally, requiring no LayerMap locking or rewriting to download or evict, providing the property "you have a layer, you can read it". The new `struct Layer` provides the ability to keep the file resident via a RAII structure for new layers which still need to be uploaded. The previous solution solved this with `RemoteTimelineClient::wait_completion`, which led to bugs like #5639. Evicting or the final local deletion after garbage collection is done using Arc'd value `Drop`. With a single `struct Layer` the closed open ended `trait Layer`, `trait PersistentLayer` and `struct RemoteLayer` are removed, following the observation that compaction could be simplified by simply not using any of the traits in between: #4839. The new `struct Layer` is a preliminary to remove `Timeline::layer_removal_cs` documented in #4745. Preliminaries: #4936, #4937, #5013, #5014, #5022, #5033, #5044, #5058, #5059, #5061, #5074, #5103, epic #5172, #5645, #5649. Related split off: #5057, #5134.
85 lines
2.5 KiB
Rust
85 lines
2.5 KiB
Rust
use std::{
|
|
io,
|
|
sync::atomic::{AtomicUsize, Ordering},
|
|
};
|
|
|
|
use camino::{Utf8Path, Utf8PathBuf};
|
|
|
|
fn fsync_path(path: &Utf8Path) -> io::Result<()> {
|
|
// TODO use VirtualFile::fsync_all once we fully go async.
|
|
let file = std::fs::File::open(path)?;
|
|
file.sync_all()
|
|
}
|
|
|
|
fn parallel_worker(paths: &[Utf8PathBuf], next_path_idx: &AtomicUsize) -> io::Result<()> {
|
|
while let Some(path) = paths.get(next_path_idx.fetch_add(1, Ordering::Relaxed)) {
|
|
fsync_path(path)?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn fsync_in_thread_pool(paths: &[Utf8PathBuf]) -> io::Result<()> {
|
|
// TODO: remove this function in favor of `par_fsync_async` once we asyncify everything.
|
|
|
|
/// Use at most this number of threads.
|
|
/// Increasing this limit will
|
|
/// - use more memory
|
|
/// - increase the cost of spawn/join latency
|
|
const MAX_NUM_THREADS: usize = 64;
|
|
let num_threads = paths.len().min(MAX_NUM_THREADS);
|
|
let next_path_idx = AtomicUsize::new(0);
|
|
|
|
std::thread::scope(|s| -> io::Result<()> {
|
|
let mut handles = vec![];
|
|
// Spawn `num_threads - 1`, as the current thread is also a worker.
|
|
for _ in 1..num_threads {
|
|
handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx)));
|
|
}
|
|
|
|
parallel_worker(paths, &next_path_idx)?;
|
|
|
|
for handle in handles {
|
|
handle.join().unwrap()?;
|
|
}
|
|
|
|
Ok(())
|
|
})
|
|
}
|
|
|
|
/// Parallel fsync all files. Can be used in non-async context as it is using rayon thread pool.
|
|
pub fn par_fsync(paths: &[Utf8PathBuf]) -> io::Result<()> {
|
|
if paths.len() == 1 {
|
|
fsync_path(&paths[0])?;
|
|
return Ok(());
|
|
}
|
|
|
|
fsync_in_thread_pool(paths)
|
|
}
|
|
|
|
/// Parallel fsync asynchronously.
|
|
pub async fn par_fsync_async(paths: &[Utf8PathBuf]) -> io::Result<()> {
|
|
const MAX_CONCURRENT_FSYNC: usize = 64;
|
|
let mut next = paths.iter().peekable();
|
|
let mut js = tokio::task::JoinSet::new();
|
|
loop {
|
|
while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() {
|
|
let next = next.next().expect("just peeked");
|
|
let next = next.to_owned();
|
|
js.spawn_blocking(move || fsync_path(&next));
|
|
}
|
|
|
|
// now the joinset has been filled up, wait for next to complete
|
|
if let Some(res) = js.join_next().await {
|
|
res??;
|
|
} else {
|
|
// last item had already completed
|
|
assert!(
|
|
next.peek().is_none(),
|
|
"joinset emptied, we shouldn't have more work"
|
|
);
|
|
return Ok(());
|
|
}
|
|
}
|
|
}
|