diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 5f7ace0a32..9ec72c40cc 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -225,38 +225,42 @@ impl VirtualFile { { let open_files = get_open_files(); - // Read the cached slot handle, and see if the slot that it points to still - // contains our File. - // - // We only need to hold the lock while we read the current handle. If - // another thread closes the file and recycles the slot for a different file, - // we will notice that the handle we read is no longer valid and retry. - let mut handle_guard; - let mut handle = *self.handle.read().unwrap(); - loop { - let slot = &open_files.slots[handle.index]; - let slot_guard = slot.inner.read().unwrap(); - if slot_guard.tag == handle.tag { - if let Some(file) = &slot_guard.file { - // Found a cached file descriptor. - slot.recently_used.store(true, Ordering::Relaxed); - return Ok(func(file)); + let mut handle_guard = { + // Read the cached slot handle, and see if the slot that it points to still + // contains our File. + // + // We only need to hold the handle lock while we read the current handle. If + // another thread closes the file and recycles the slot for a different file, + // we will notice that the handle we read is no longer valid and retry. + let mut handle = *self.handle.read().unwrap(); + loop { + // Check if the slot contains our File + { + let slot = &open_files.slots[handle.index]; + let slot_guard = slot.inner.read().unwrap(); + if slot_guard.tag == handle.tag { + if let Some(file) = &slot_guard.file { + // Found a cached file descriptor. + slot.recently_used.store(true, Ordering::Relaxed); + return Ok(func(file)); + } + } } - } - // The slot didn't contain our File. Grab a write lock on handle in - // the VirtualFile, so that no other thread will try to concurrently - // open the same file. - handle_guard = self.handle.write().unwrap(); + // The slot didn't contain our File. We will have to open it ourselves, + // but before that, grab a write lock on handle in the VirtualFile, so + // that no other thread will try to concurrently open the same file. + let handle_guard = self.handle.write().unwrap(); - // Check if some other thread already did it while we were not - // holding the lock. - if *handle_guard != handle { - handle = *handle_guard; - continue; + // If another thread changed the handle while we were not holding the lock, + // then the handle might now be valid again. Loop back to retry. + if *handle_guard != handle { + handle = *handle_guard; + continue; + } + break handle_guard; } - break; - } + }; // We need to open the file ourselves. The handle in the VirtualFile is // now locked in write-mode. Find a free slot to put it in. @@ -413,6 +417,9 @@ mod tests { use super::*; use rand::seq::SliceRandom; use rand::thread_rng; + use rand::Rng; + use std::sync::Arc; + use std::thread; // Helper function to slurp contents of a file, starting at the current position, // into a string @@ -553,4 +560,60 @@ mod tests { Ok(()) } + + /// Test using VirtualFiles from many threads concurrently. This tests both using + /// a lot of VirtualFiles concurrently, causing evictions, and also using the same + /// VirtualFile from multiple threads concurrently. + #[test] + fn test_vfile_concurrency() -> Result<(), Error> { + const SIZE: usize = 8 * 1024; + const VIRTUAL_FILES: usize = 100; + const THREADS: usize = 100; + const SAMPLE: [u8; SIZE] = [0xADu8; SIZE]; + + let testdir = crate::PageServerConf::test_repo_dir("vfile_concurrency"); + std::fs::create_dir_all(&testdir)?; + + // Create a test file. + let test_file_path = testdir.join("concurrency_test_file"); + { + let file = File::create(&test_file_path)?; + file.write_all_at(&SAMPLE, 0)?; + } + + // Open the file many times. + let mut files = Vec::new(); + for _ in 0..VIRTUAL_FILES { + let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))?; + files.push(f); + } + let files = Arc::new(files); + + // Launch many threads, and use the virtual files concurrently in random order. + let mut threads = Vec::new(); + for threadno in 0..THREADS { + let builder = + thread::Builder::new().name(format!("test_vfile_concurrency thread {}", threadno)); + + let files = files.clone(); + let thread = builder + .spawn(move || { + let mut buf = [0u8; SIZE]; + let mut rng = rand::thread_rng(); + for _ in 1..1000 { + let f = &files[rng.gen_range(0..files.len())]; + f.read_exact_at(&mut buf, 0).unwrap(); + assert!(buf == SAMPLE); + } + }) + .unwrap(); + threads.push(thread); + } + + for thread in threads { + thread.join().unwrap(); + } + + Ok(()) + } }