From 4d69192ae5039cc62d0acb3a74708494af980eda Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 29 Aug 2023 11:33:28 +0200 Subject: [PATCH] QUICK HACK: rip out virtual file cache --- pageserver/ctl/src/layer_map_analyzer.rs | 1 - pageserver/ctl/src/layers.rs | 3 +- pageserver/ctl/src/main.rs | 2 - pageserver/src/bin/pageserver.rs | 2 - pageserver/src/tenant/ephemeral_file.rs | 3 +- pageserver/src/virtual_file.rs | 535 +---------------------- 6 files changed, 7 insertions(+), 539 deletions(-) diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index 29bd6ce598..daf529b4e7 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -136,7 +136,6 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> { let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES); // Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree. - pageserver::virtual_file::init(10); pageserver::page_cache::init(100); let mut total_delta_layers = 0usize; diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 2af54902f7..cdd6861afe 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -2,10 +2,10 @@ use std::path::{Path, PathBuf}; use anyhow::Result; use clap::Subcommand; +use pageserver::page_cache; use pageserver::tenant::block_io::BlockCursor; use pageserver::tenant::disk_btree::DiskBtreeReader; use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary}; -use pageserver::{page_cache, virtual_file}; use pageserver::{ repository::{Key, KEY_SIZE}, tenant::{ @@ -45,7 +45,6 @@ pub(crate) enum LayerCmd { async fn read_delta_file(path: impl AsRef) -> Result<()> { let path = path.as_ref(); - virtual_file::init(10); page_cache::init(100); let file = FileBlockReader::new(VirtualFile::open(path)?); let summary_blk = file.read_blk(0).await?; diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs index 0d154aca0c..515bb6eb8c 100644 --- a/pageserver/ctl/src/main.rs +++ b/pageserver/ctl/src/main.rs @@ -15,7 +15,6 @@ use pageserver::{ page_cache, task_mgr::TaskKind, tenant::{dump_layerfile_from_path, metadata::TimelineMetadata}, - virtual_file, }; use postgres_ffi::ControlFileData; use std::path::{Path, PathBuf}; @@ -116,7 +115,6 @@ fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> { async fn print_layerfile(path: &Path) -> anyhow::Result<()> { // Basic initialization of things that don't change after startup - virtual_file::init(10); page_cache::init(100); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); dump_layerfile_from_path(path, true, &ctx).await diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 71e3a0ff3f..e1e32315ec 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -24,7 +24,6 @@ use pageserver::{ task_mgr::TaskKind, task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME}, tenant::mgr, - virtual_file, }; use postgres_backend::AuthType; use utils::logging::TracingErrorLayerEnablement; @@ -125,7 +124,6 @@ fn main() -> anyhow::Result<()> { let scenario = pageserver::failpoint_support::init(); // Basic initialization of things that don't change after startup - virtual_file::init(conf.max_file_descriptors); page_cache::init(conf.page_cache_size); start_pageserver(launch_ts, conf).context("Failed to start pageserver")?; diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index fbb1b6a0bf..02ef7166e5 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -87,8 +87,7 @@ impl EphemeralFile { let buf: &mut [u8] = write_guard.deref_mut(); debug_assert_eq!(buf.len(), PAGE_SZ); self.file - .read_exact_at_async(&mut buf[..], blknum as u64 * PAGE_SZ as u64) - .await?; + .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?; write_guard.mark_valid(); // Swap for read lock diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index bce74d3872..7b12cc89ac 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -11,14 +11,12 @@ //! src/backend/storage/file/fd.c //! use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME}; -use once_cell::sync::OnceCell; + use std::fs::{self, File, OpenOptions}; use std::io::{Error, ErrorKind, Seek, SeekFrom, Write}; -use std::ops::Deref; + use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; /// /// A virtual file descriptor. You can use this just like std::fs::File, but internally @@ -40,7 +38,7 @@ pub struct VirtualFile { /// Lazy handle to the global file descriptor cache. The slot that this points to /// might contain our File, or it may be empty, or it may contain a File that /// belongs to a different VirtualFile. - handle: RwLock, + handle: File, /// Current file position pos: u64, @@ -52,7 +50,6 @@ pub struct VirtualFile { /// opened, in the VirtualFile::create() function, and strip the flag before /// storing it here. pub path: PathBuf, - open_options: OpenOptions, // These are strings becase we only use them for metrics, and those expect strings. // It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into @@ -61,118 +58,6 @@ pub struct VirtualFile { timeline_id: String, } -#[derive(Debug, PartialEq, Clone, Copy)] -struct SlotHandle { - /// Index into OPEN_FILES.slots - index: usize, - - /// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has - /// been recycled and no longer contains the FD for this virtual file. - tag: u64, -} - -/// OPEN_FILES is the global array that holds the physical file descriptors that -/// are currently open. Each slot in the array is protected by a separate lock, -/// so that different files can be accessed independently. The lock must be held -/// in write mode to replace the slot with a different file, but a read mode -/// is enough to operate on the file, whether you're reading or writing to it. -/// -/// OPEN_FILES starts in uninitialized state, and it's initialized by -/// the virtual_file::init() function. It must be called exactly once at page -/// server startup. -static OPEN_FILES: OnceCell = OnceCell::new(); - -struct OpenFiles { - slots: &'static [Slot], - - /// clock arm for the clock algorithm - next: AtomicUsize, -} - -struct Slot { - inner: tokio::sync::RwLock, - - /// has this file been used since last clock sweep? - recently_used: AtomicBool, -} - -struct SlotInner { - /// Counter that's incremented every time a different file is stored here. - /// To avoid the ABA problem. - tag: u64, - - /// the underlying file - file: Option, -} - -impl OpenFiles { - /// Find a slot to use, evicting an existing file descriptor if needed. - /// - /// On return, we hold a lock on the slot, and its 'tag' has been updated - /// recently_used has been set. It's all ready for reuse. - fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard) { - // - // Run the clock algorithm to find a slot to replace. - // - let num_slots = self.slots.len(); - let mut retries = 0; - let mut slot; - let mut slot_guard; - let index; - loop { - let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots; - slot = &self.slots[next]; - - // If the recently_used flag on this slot is set, continue the clock - // sweep. Otherwise try to use this slot. If we cannot acquire the - // lock, also continue the clock sweep. - // - // We only continue in this manner for a while, though. If we loop - // through the array twice without finding a victim, just pick the - // next slot and wait until we can reuse it. This way, we avoid - // spinning in the extreme case that all the slots are busy with an - // I/O operation. - if retries < num_slots * 2 { - if !slot.recently_used.swap(false, Ordering::Release) { - if let Ok(guard) = slot.inner.try_write() { - slot_guard = guard; - index = next; - break; - } - } - retries += 1; - } else { - slot_guard = slot.inner.write().unwrap(); - index = next; - break; - } - } - - // - // We now have the victim slot locked. If it was in use previously, close the - // old file. - // - if let Some(old_file) = slot_guard.file.take() { - // the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to - // distinguish the two. - STORAGE_IO_TIME - .with_label_values(&["close-by-replace"]) - .observe_closure_duration(|| drop(old_file)); - } - - // Prepare the slot for reuse and return it - slot_guard.tag += 1; - slot.recently_used.store(true, Ordering::Relaxed); - ( - SlotHandle { - index, - tag: slot_guard.tag, - }, - slot_guard, - ) - } -} - impl VirtualFile { /// Open a file in read-only mode. Like File::open. pub fn open(path: &Path) -> Result { @@ -208,7 +93,6 @@ impl VirtualFile { tenant_id = "*".to_string(); timeline_id = "*".to_string(); } - let (handle, mut slot_guard) = get_open_files().find_victim_slot(); let file = STORAGE_IO_TIME .with_label_values(&["open"]) .observe_closure_duration(|| open_options.open(path))?; @@ -224,16 +108,13 @@ impl VirtualFile { reopen_options.truncate(false); let vfile = VirtualFile { - handle: RwLock::new(handle), + handle: file, pos: 0, path: path.to_path_buf(), - open_options: reopen_options, tenant_id, timeline_id, }; - slot_guard.file.replace(file); - Ok(vfile) } @@ -247,23 +128,6 @@ impl VirtualFile { } } -/// The function that constructs the guard guarantees that the respective [`SlotInner::file`] is `Some`. -enum FileSlotGuard { - Write(tokio::sync::RwLockWriteGuard<'static, SlotInner>), - Read(tokio::sync::RwLockReadGuard<'static, SlotInner>), -} - -impl Deref for FileSlotGuard { - type Target = File; - - fn deref(&self) -> &Self::Target { - match self { - FileSlotGuard::Write(guard) => guard.file.as_ref().unwrap(), - FileSlotGuard::Read(guard) => guard.file.as_ref().unwrap(), - } - } -} - impl VirtualFile { /// Helper function that looks up the underlying File for this VirtualFile, /// opening it and evicting some other File if necessary. It calls 'func' @@ -272,85 +136,9 @@ impl VirtualFile { where F: FnMut(&File) -> R, { - let guard = self.get_file_guard()?; - return Ok(STORAGE_IO_TIME .with_label_values(&[op]) - .observe_closure_duration(|| func(&guard))); - } - - /// Like with_file, but, takes a func that returns a future. - async fn with_file_async(&self, op: &str, mut factory: F) -> Result - where - F: FnOnce(FileSlotGuard) -> Fut, - Fut: std::future::Future, - { - let guard = self.get_file_guard()?; - - let start = std::time::Instant::now(); - let res = factory(guard).await; - STORAGE_IO_TIME - .with_label_values(&[op]) - .observe(start.elapsed().as_secs_f64()); - Ok(res) - } - - fn get_file_guard(&self) -> Result { - let open_files = get_open_files(); - - let mut handle_guard = { - // Read the cached slot handle, and see if the slot that it points to still - // contains our File. - // - // We only need to hold the handle lock while we read the current handle. If - // another thread closes the file and recycles the slot for a different file, - // we will notice that the handle we read is no longer valid and retry. - let mut handle = *self.handle.read().unwrap(); - loop { - // Check if the slot contains our File - { - let slot = &open_files.slots[handle.index]; - let slot_guard = slot.inner.read().unwrap(); - if slot_guard.tag == handle.tag { - if let Some(_) = &slot_guard.file { - // Found a cached file descriptor. - slot.recently_used.store(true, Ordering::Relaxed); - return Ok(FileSlotGuard::Read(slot_guard)); - } - } - } - - // The slot didn't contain our File. We will have to open it ourselves, - // but before that, grab a write lock on handle in the VirtualFile, so - // that no other thread will try to concurrently open the same file. - let handle_guard = self.handle.write().unwrap(); - - // If another thread changed the handle while we were not holding the lock, - // then the handle might now be valid again. Loop back to retry. - if *handle_guard != handle { - handle = *handle_guard; - continue; - } - break handle_guard; - } - }; - - // We need to open the file ourselves. The handle in the VirtualFile is - // now locked in write-mode. Find a free slot to put it in. - let (handle, mut slot_guard) = open_files.find_victim_slot(); - - // Open the physical file - let file = STORAGE_IO_TIME - .with_label_values(&["open"]) - .observe_closure_duration(|| self.open_options.open(&self.path))?; - - // Store the File in the slot and update the handle in the VirtualFile - // to point to it. - slot_guard.file.replace(file); - - *handle_guard = handle; - - Ok(FileSlotGuard::Write(slot_guard)) + .observe_closure_duration(|| func(&self.handle))); } pub fn remove(self) { @@ -360,26 +148,6 @@ impl VirtualFile { } } -impl Drop for VirtualFile { - /// If a VirtualFile is dropped, close the underlying file if it was open. - fn drop(&mut self) { - let handle = self.handle.get_mut().unwrap(); - - // We could check with a read-lock first, to avoid waiting on an - // unrelated I/O. - let slot = &get_open_files().slots[handle.index]; - let mut slot_guard = slot.inner.write().unwrap(); - if slot_guard.tag == handle.tag { - slot.recently_used.store(false, Ordering::Relaxed); - // there is also operation "close-by-replace" for closes done on eviction for - // comparison. - STORAGE_IO_TIME - .with_label_values(&["close"]) - .observe_closure_duration(|| drop(slot_guard.file.take())); - } - } -} - impl Write for VirtualFile { fn write(&mut self, buf: &[u8]) -> Result { let pos = self.pos; @@ -442,31 +210,6 @@ impl VirtualFile { Ok(()) } - // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 - pub async fn read_exact_at_async( - &self, - mut buf: &mut [u8], - mut offset: u64, - ) -> Result<(), Error> { - while !buf.is_empty() { - match self.read_at_async(buf, offset).await { - Ok(0) => { - return Err(Error::new( - std::io::ErrorKind::UnexpectedEof, - "failed to fill whole buffer", - )) - } - Ok(n) => { - buf = &mut buf[n..]; - offset += n as u64; - } - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), - } - } - Ok(()) - } - // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 pub fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> { while !buf.is_empty() { @@ -498,21 +241,6 @@ impl VirtualFile { result } - async fn read_at_async(&self, buf: &mut [u8], offset: u64) -> Result { - let result = self - .with_file_async("read", |file| async move { - // todo: use async IO here - file.read_at(buf, offset) - }) - .await?; - if let Ok(size) = result { - STORAGE_IO_SIZE - .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) - .add(size as i64); - } - result - } - pub fn write_at(&self, buf: &[u8], offset: u64) -> Result { let result = self.with_file("write", |file| file.write_at(buf, offset))?; if let Ok(size) = result { @@ -523,256 +251,3 @@ impl VirtualFile { result } } - -impl OpenFiles { - fn new(num_slots: usize) -> OpenFiles { - let mut slots = Box::new(Vec::with_capacity(num_slots)); - for _ in 0..num_slots { - let slot = Slot { - recently_used: AtomicBool::new(false), - inner: RwLock::new(SlotInner { tag: 0, file: None }), - }; - slots.push(slot); - } - - OpenFiles { - next: AtomicUsize::new(0), - slots: Box::leak(slots), - } - } -} - -/// -/// Initialize the virtual file module. This must be called once at page -/// server startup. -/// -pub fn init(num_slots: usize) { - if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() { - panic!("virtual_file::init called twice"); - } -} - -const TEST_MAX_FILE_DESCRIPTORS: usize = 10; - -// Get a handle to the global slots array. -fn get_open_files() -> &'static OpenFiles { - // - // In unit tests, page server startup doesn't happen and no one calls - // virtual_file::init(). Initialize it here, with a small array. - // - // This applies to the virtual file tests below, but all other unit - // tests too, so the virtual file facility is always usable in - // unit tests. - // - if cfg!(test) { - OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS)) - } else { - OPEN_FILES.get().expect("virtual_file::init not called yet") - } -} - -#[cfg(test)] -mod tests { - use super::*; - use rand::seq::SliceRandom; - use rand::thread_rng; - use rand::Rng; - use std::sync::Arc; - use std::thread; - - // Helper function to slurp contents of a file, starting at the current position, - // into a string - fn read_string(vfile: &mut FD) -> Result - where - FD: Read, - { - let mut buf = String::new(); - vfile.read_to_string(&mut buf)?; - Ok(buf) - } - - // Helper function to slurp a portion of a file into a string - fn read_string_at(vfile: &mut FD, pos: u64, len: usize) -> Result - where - FD: FileExt, - { - let mut buf = Vec::new(); - buf.resize(len, 0); - vfile.read_exact_at(&mut buf, pos)?; - Ok(String::from_utf8(buf).unwrap()) - } - - #[test] - fn test_virtual_files() -> Result<(), Error> { - // The real work is done in the test_files() helper function. This - // allows us to run the same set of tests against a native File, and - // VirtualFile. We trust the native Files and wouldn't need to test them, - // but this allows us to verify that the operations return the same - // results with VirtualFiles as with native Files. (Except that with - // native files, you will run out of file descriptors if the ulimit - // is low enough.) - test_files("virtual_files", |path, open_options| { - VirtualFile::open_with_options(path, open_options) - }) - } - - #[test] - fn test_physical_files() -> Result<(), Error> { - test_files("physical_files", |path, open_options| { - open_options.open(path) - }) - } - - fn test_files(testname: &str, openfunc: OF) -> Result<(), Error> - where - FD: Read + Write + Seek + FileExt, - OF: Fn(&Path, &OpenOptions) -> Result, - { - let testdir = crate::config::PageServerConf::test_repo_dir(testname); - std::fs::create_dir_all(&testdir)?; - - let path_a = testdir.join("file_a"); - let mut file_a = openfunc( - &path_a, - OpenOptions::new().write(true).create(true).truncate(true), - )?; - file_a.write_all(b"foobar")?; - - // cannot read from a file opened in write-only mode - assert!(read_string(&mut file_a).is_err()); - - // Close the file and re-open for reading - let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?; - - // cannot write to a file opened in read-only mode - assert!(file_a.write(b"bar").is_err()); - - // Try simple read - assert_eq!("foobar", read_string(&mut file_a)?); - - // It's positioned at the EOF now. - assert_eq!("", read_string(&mut file_a)?); - - // Test seeks. - assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); - assert_eq!("oobar", read_string(&mut file_a)?); - - assert_eq!(file_a.seek(SeekFrom::End(-2))?, 4); - assert_eq!("ar", read_string(&mut file_a)?); - - assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); - assert_eq!(file_a.seek(SeekFrom::Current(2))?, 3); - assert_eq!("bar", read_string(&mut file_a)?); - - assert_eq!(file_a.seek(SeekFrom::Current(-5))?, 1); - assert_eq!("oobar", read_string(&mut file_a)?); - - // Test erroneous seeks to before byte 0 - assert!(file_a.seek(SeekFrom::End(-7)).is_err()); - assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); - assert!(file_a.seek(SeekFrom::Current(-2)).is_err()); - - // the erroneous seek should have left the position unchanged - assert_eq!("oobar", read_string(&mut file_a)?); - - // Create another test file, and try FileExt functions on it. - let path_b = testdir.join("file_b"); - let mut file_b = openfunc( - &path_b, - OpenOptions::new() - .read(true) - .write(true) - .create(true) - .truncate(true), - )?; - file_b.write_all_at(b"BAR", 3)?; - file_b.write_all_at(b"FOO", 0)?; - - assert_eq!(read_string_at(&mut file_b, 2, 3)?, "OBA"); - - // Open a lot of files, enough to cause some evictions. (Or to be precise, - // open the same file many times. The effect is the same.) - // - // leave file_a positioned at offset 1 before we start - assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); - - let mut vfiles = Vec::new(); - for _ in 0..100 { - let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?; - assert_eq!("FOOBAR", read_string(&mut vfile)?); - vfiles.push(vfile); - } - - // make sure we opened enough files to definitely cause evictions. - assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2); - - // The underlying file descriptor for 'file_a' should be closed now. Try to read - // from it again. We left the file positioned at offset 1 above. - assert_eq!("oobar", read_string(&mut file_a)?); - - // Check that all the other FDs still work too. Use them in random order for - // good measure. - vfiles.as_mut_slice().shuffle(&mut thread_rng()); - for vfile in vfiles.iter_mut() { - assert_eq!("OOBAR", read_string_at(vfile, 1, 5)?); - } - - Ok(()) - } - - /// Test using VirtualFiles from many threads concurrently. This tests both using - /// a lot of VirtualFiles concurrently, causing evictions, and also using the same - /// VirtualFile from multiple threads concurrently. - #[test] - fn test_vfile_concurrency() -> Result<(), Error> { - const SIZE: usize = 8 * 1024; - const VIRTUAL_FILES: usize = 100; - const THREADS: usize = 100; - const SAMPLE: [u8; SIZE] = [0xADu8; SIZE]; - - let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency"); - std::fs::create_dir_all(&testdir)?; - - // Create a test file. - let test_file_path = testdir.join("concurrency_test_file"); - { - let file = File::create(&test_file_path)?; - file.write_all_at(&SAMPLE, 0)?; - } - - // Open the file many times. - let mut files = Vec::new(); - for _ in 0..VIRTUAL_FILES { - let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))?; - files.push(f); - } - let files = Arc::new(files); - - // Launch many threads, and use the virtual files concurrently in random order. - let mut threads = Vec::new(); - for threadno in 0..THREADS { - let builder = - thread::Builder::new().name(format!("test_vfile_concurrency thread {}", threadno)); - - let files = files.clone(); - let thread = builder - .spawn(move || { - let mut buf = [0u8; SIZE]; - let mut rng = rand::thread_rng(); - for _ in 1..1000 { - let f = &files[rng.gen_range(0..files.len())]; - f.read_exact_at(&mut buf, 0).unwrap(); - assert!(buf == SAMPLE); - } - }) - .unwrap(); - threads.push(thread); - } - - for thread in threads { - thread.join().unwrap(); - } - - Ok(()) - } -}