diff --git a/Cargo.lock b/Cargo.lock index 84e91449a3..a7b90e18b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -208,8 +208,7 @@ dependencies = [ [[package]] name = "bookfile" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efa3e2086414e1bbecbc10730f265e5b079ab4ea0b830e7219a70dab6471e753" +source = "git+https://github.com/zenithdb/bookfile.git?branch=generic-readext#d51a99c7a0be48c3d9cc7cb85c9b7fb05ce1100c" dependencies = [ "aversion", "byteorder", @@ -1218,6 +1217,7 @@ dependencies = [ "lazy_static", "log", "nix", + "once_cell", "postgres", "postgres-protocol", "postgres-types", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index f0f1230dfa..2e4cbd9877 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Stas Kelvich "] edition = "2018" [dependencies] -bookfile = "^0.3" +bookfile = { git = "https://github.com/zenithdb/bookfile.git", branch="generic-readext" } chrono = "0.4.19" rand = "0.8.3" regex = "1.4.5" @@ -39,6 +39,7 @@ tracing = "0.1.27" signal-hook = {version = "0.3.10", features = ["extended-siginfo"] } url = "2" nix = "0.23" +once_cell = "1.8.0" postgres_ffi = { path = "../postgres_ffi" } zenith_metrics = { path = "../zenith_metrics" } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 55f3258f6c..eb9706ba9e 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -26,8 +26,8 @@ use clap::{App, Arg, ArgMatches}; use daemonize::Daemonize; use pageserver::{ - branches, defaults::*, http, page_service, remote_storage, tenant_mgr, PageServerConf, - RemoteStorageConfig, RemoteStorageKind, S3Config, LOG_FILE_NAME, + branches, defaults::*, http, page_service, remote_storage, tenant_mgr, virtual_file, + PageServerConf, RemoteStorageConfig, RemoteStorageKind, S3Config, LOG_FILE_NAME, }; use zenith_utils::http::endpoint; use zenith_utils::postgres_backend; @@ -44,6 +44,7 @@ struct CfgFileParams { gc_horizon: Option, gc_period: Option, open_mem_limit: Option, + max_file_descriptors: Option, pg_distrib_dir: Option, auth_validation_public_key_path: Option, auth_type: Option, @@ -106,6 +107,7 @@ impl CfgFileParams { gc_horizon: get_arg("gc_horizon"), gc_period: get_arg("gc_period"), open_mem_limit: get_arg("open_mem_limit"), + max_file_descriptors: get_arg("max_file_descriptors"), pg_distrib_dir: get_arg("postgres-distrib"), auth_validation_public_key_path: get_arg("auth-validation-public-key-path"), auth_type: get_arg("auth-type"), @@ -125,6 +127,7 @@ impl CfgFileParams { gc_horizon: self.gc_horizon.or(other.gc_horizon), gc_period: self.gc_period.or(other.gc_period), open_mem_limit: self.open_mem_limit.or(other.open_mem_limit), + max_file_descriptors: self.max_file_descriptors.or(other.max_file_descriptors), pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir), auth_validation_public_key_path: self .auth_validation_public_key_path @@ -174,6 +177,11 @@ impl CfgFileParams { None => DEFAULT_OPEN_MEM_LIMIT, }; + let max_file_descriptors: usize = match self.max_file_descriptors.as_ref() { + Some(max_file_descriptors_str) => max_file_descriptors_str.parse()?, + None => DEFAULT_MAX_FILE_DESCRIPTORS, + }; + let pg_distrib_dir = match self.pg_distrib_dir.as_ref() { Some(pg_distrib_dir_str) => PathBuf::from(pg_distrib_dir_str), None => env::current_dir()?.join("tmp_install"), @@ -244,6 +252,7 @@ impl CfgFileParams { gc_horizon, gc_period, open_mem_limit, + max_file_descriptors, superuser: String::from(DEFAULT_SUPERUSER), @@ -321,6 +330,12 @@ fn main() -> Result<()> { .takes_value(true) .help("Amount of memory reserved for buffering incoming WAL"), ) + .arg( + Arg::with_name("max_file_descriptors") + .long("max_file_descriptors") + .takes_value(true) + .help("Max number of file descriptors to keep open for files"), + ) .arg( Arg::with_name("workdir") .short("D") @@ -452,6 +467,9 @@ fn main() -> Result<()> { // as a ref. let conf: &'static PageServerConf = Box::leak(Box::new(conf)); + // Basic initialization of things that don't change after startup + virtual_file::init(conf.max_file_descriptors); + // Create repo and exit if init was requested if init { branches::init_pageserver(conf, create_tenant).context("Failed to init pageserver")?; @@ -618,6 +636,7 @@ mod tests { gc_horizon: Some("gc_horizon_VALUE".to_string()), gc_period: Some("gc_period_VALUE".to_string()), open_mem_limit: Some("open_mem_limit_VALUE".to_string()), + max_file_descriptors: Some("max_file_descriptors_VALUE".to_string()), pg_distrib_dir: Some("pg_distrib_dir_VALUE".to_string()), auth_validation_public_key_path: Some( "auth_validation_public_key_path_VALUE".to_string(), @@ -642,6 +661,7 @@ checkpoint_period = 'checkpoint_period_VALUE' gc_horizon = 'gc_horizon_VALUE' gc_period = 'gc_period_VALUE' open_mem_limit = 'open_mem_limit_VALUE' +max_file_descriptors = 'max_file_descriptors_VALUE' pg_distrib_dir = 'pg_distrib_dir_VALUE' auth_validation_public_key_path = 'auth_validation_public_key_path_VALUE' auth_type = 'auth_type_VALUE' @@ -677,6 +697,7 @@ local_path = 'remote_storage_local_VALUE' gc_horizon: Some("gc_horizon_VALUE".to_string()), gc_period: Some("gc_period_VALUE".to_string()), open_mem_limit: Some("open_mem_limit_VALUE".to_string()), + max_file_descriptors: Some("max_file_descriptors_VALUE".to_string()), pg_distrib_dir: Some("pg_distrib_dir_VALUE".to_string()), auth_validation_public_key_path: Some( "auth_validation_public_key_path_VALUE".to_string(), @@ -704,6 +725,7 @@ checkpoint_period = 'checkpoint_period_VALUE' gc_horizon = 'gc_horizon_VALUE' gc_period = 'gc_period_VALUE' open_mem_limit = 'open_mem_limit_VALUE' +max_file_descriptors = 'max_file_descriptors_VALUE' pg_distrib_dir = 'pg_distrib_dir_VALUE' auth_validation_public_key_path = 'auth_validation_public_key_path_VALUE' auth_type = 'auth_type_VALUE' diff --git a/pageserver/src/layered_repository/blob.rs b/pageserver/src/layered_repository/blob.rs index b7c7c3f460..592e279f4f 100644 --- a/pageserver/src/layered_repository/blob.rs +++ b/pageserver/src/layered_repository/blob.rs @@ -1,4 +1,5 @@ -use std::{fs::File, io::Write}; +use std::io::Write; +use std::os::unix::prelude::FileExt; use anyhow::Result; use bookfile::{BookWriter, BoundedReader, ChapterId, ChapterWriter}; @@ -10,7 +11,7 @@ pub struct BlobRange { size: usize, } -pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result> { +pub fn read_blob(reader: &BoundedReader<&'_ F>, range: &BlobRange) -> Result> { let mut buf = vec![0u8; range.size]; reader.read_exact_at(&mut buf, range.offset)?; Ok(buf) diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 4e10a284ac..89a338ccbe 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -42,6 +42,7 @@ use crate::layered_repository::filename::{DeltaFileName, PathOrConf}; use crate::layered_repository::storage_layer::{ Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag, }; +use crate::virtual_file::VirtualFile; use crate::waldecoder; use crate::PageServerConf; use crate::{ZTenantId, ZTimelineId}; @@ -53,7 +54,6 @@ use zenith_utils::vec_map::VecMap; // while being able to use std::fmt::Write's methods use std::fmt::Write as _; use std::fs; -use std::fs::File; use std::io::{BufWriter, Write}; use std::ops::Bound::Included; use std::path::{Path, PathBuf}; @@ -139,6 +139,8 @@ pub struct DeltaLayerInner { /// loaded into memory yet. loaded: bool, + book: Option>, + /// All versions of all pages in the file are are kept here. /// Indexed by block number and LSN. page_version_metas: VecMap<(u32, Lsn), BlobRange>, @@ -189,10 +191,12 @@ impl Layer for DeltaLayer { { // Open the file and lock the metadata in memory - // TODO: avoid opening the file for each read - let (_path, book) = self.open_book()?; - let page_version_reader = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?; let inner = self.load()?; + let page_version_reader = inner + .book + .as_ref() + .unwrap() + .chapter_reader(PAGE_VERSIONS_CHAPTER)?; // Scan the metadata BTreeMap backwards, starting from the given entry. let minkey = (blknum, Lsn(0)); @@ -303,7 +307,11 @@ impl Layer for DeltaLayer { println!(" {}: {}", k, v); } println!("--- page versions ---"); - let (_path, book) = self.open_book()?; + + let path = self.path(); + let file = std::fs::File::open(&path)?; + let book = Book::new(file)?; + let chapter = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?; for ((blk, lsn), blob_range) in inner.page_version_metas.as_slice() { let mut desc = String::new(); @@ -382,18 +390,23 @@ impl DeltaLayer { dropped, inner: Mutex::new(DeltaLayerInner { loaded: true, + book: None, page_version_metas: VecMap::default(), relsizes, }), }; let mut inner = delta_layer.inner.lock().unwrap(); - // Write the in-memory btreemaps into a file - let path = delta_layer.path(); - + // Write the data into a file + // + // Note: Because we open the file in write-only mode, we cannot + // reuse the same VirtualFile for reading later. That's why we don't + // set inner.book here. The first read will have to re-open it. + // // Note: This overwrites any existing file. There shouldn't be any. // FIXME: throw an error instead? - let file = File::create(&path)?; + let path = delta_layer.path(); + let file = VirtualFile::create(&path)?; let buf_writer = BufWriter::new(file); let book = BookWriter::new(buf_writer, DELTA_FILE_MAGIC)?; @@ -448,15 +461,6 @@ impl DeltaLayer { Ok(delta_layer) } - fn open_book(&self) -> Result<(PathBuf, Book)> { - let path = self.path(); - - let file = File::open(&path)?; - let book = Book::new(file)?; - - Ok((path, book)) - } - /// /// Load the contents of the file into memory /// @@ -468,7 +472,9 @@ impl DeltaLayer { return Ok(inner); } - let (path, book) = self.open_book()?; + let path = self.path(); + let file = VirtualFile::open(&path)?; + let book = Book::new(file)?; match &self.path_or_conf { PathOrConf::Conf(_) => { @@ -505,6 +511,7 @@ impl DeltaLayer { *inner = DeltaLayerInner { loaded: true, + book: None, page_version_metas, relsizes, }; @@ -529,6 +536,7 @@ impl DeltaLayer { dropped: filename.dropped, inner: Mutex::new(DeltaLayerInner { loaded: false, + book: None, page_version_metas: VecMap::default(), relsizes: VecMap::default(), }), @@ -538,7 +546,10 @@ impl DeltaLayer { /// Create a DeltaLayer struct representing an existing file on disk. /// /// This variant is only used for debugging purposes, by the 'dump_layerfile' binary. - pub fn new_for_path(path: &Path, book: &Book) -> Result { + pub fn new_for_path(path: &Path, book: &Book) -> Result + where + F: std::os::unix::prelude::FileExt, + { let chapter = book.read_chapter(SUMMARY_CHAPTER)?; let summary = Summary::des(&chapter)?; @@ -552,6 +563,7 @@ impl DeltaLayer { dropped: summary.dropped, inner: Mutex::new(DeltaLayerInner { loaded: false, + book: None, page_version_metas: VecMap::default(), relsizes: VecMap::default(), }), diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs index 179b66853e..f59e87a2ba 100644 --- a/pageserver/src/layered_repository/image_layer.rs +++ b/pageserver/src/layered_repository/image_layer.rs @@ -27,6 +27,7 @@ use crate::layered_repository::storage_layer::{ }; use crate::layered_repository::LayeredTimeline; use crate::layered_repository::RELISH_SEG_SIZE; +use crate::virtual_file::VirtualFile; use crate::PageServerConf; use crate::{ZTenantId, ZTimelineId}; use anyhow::{anyhow, bail, ensure, Result}; @@ -35,7 +36,6 @@ use log::*; use serde::{Deserialize, Serialize}; use std::convert::TryInto; use std::fs; -use std::fs::File; use std::io::{BufWriter, Write}; use std::path::{Path, PathBuf}; use std::sync::{Mutex, MutexGuard}; @@ -104,9 +104,8 @@ enum ImageType { } pub struct ImageLayerInner { - /// If false, the 'image_type' has not been - /// loaded into memory yet. - loaded: bool, + /// If None, the 'image_type' has not been loaded into memory yet. + book: Option>, /// Derived from filename and bookfile chapter metadata image_type: ImageType, @@ -155,8 +154,6 @@ impl Layer for ImageLayer { let base_blknum = blknum % RELISH_SEG_SIZE; - let (_path, book) = self.open_book()?; - let buf = match &inner.image_type { ImageType::Blocky { num_blocks } => { if base_blknum >= *num_blocks { @@ -166,14 +163,23 @@ impl Layer for ImageLayer { let mut buf = vec![0u8; BLOCK_SIZE]; let offset = BLOCK_SIZE as u64 * base_blknum as u64; - let chapter = book.chapter_reader(BLOCKY_IMAGES_CHAPTER)?; + let chapter = inner + .book + .as_ref() + .unwrap() + .chapter_reader(BLOCKY_IMAGES_CHAPTER)?; chapter.read_exact_at(&mut buf, offset)?; buf } ImageType::NonBlocky => { ensure!(base_blknum == 0); - book.read_chapter(NONBLOCKY_IMAGE_CHAPTER)?.into_vec() + inner + .book + .as_ref() + .unwrap() + .read_chapter(NONBLOCKY_IMAGE_CHAPTER)? + .into_vec() } }; @@ -195,14 +201,7 @@ impl Layer for ImageLayer { Ok(true) } - /// - /// Release most of the memory used by this layer. If it's accessed again later, - /// it will need to be loaded back. - /// fn unload(&self) -> Result<()> { - let mut inner = self.inner.lock().unwrap(); - inner.image_type = ImageType::Blocky { num_blocks: 0 }; - inner.loaded = false; Ok(()) } @@ -228,8 +227,11 @@ impl Layer for ImageLayer { match inner.image_type { ImageType::Blocky { num_blocks } => println!("({}) blocks ", num_blocks), ImageType::NonBlocky => { - let (_path, book) = self.open_book()?; - let chapter = book.read_chapter(NONBLOCKY_IMAGE_CHAPTER)?; + let chapter = inner + .book + .as_ref() + .unwrap() + .read_chapter(NONBLOCKY_IMAGE_CHAPTER)?; println!("non-blocky ({} bytes)", chapter.len()); } } @@ -277,17 +279,22 @@ impl ImageLayer { seg, lsn, inner: Mutex::new(ImageLayerInner { - loaded: true, + book: None, image_type: image_type.clone(), }), }; let inner = layer.inner.lock().unwrap(); // Write the images into a file - let path = layer.path(); + // + // Note: Because we open the file in write-only mode, we cannot + // reuse the same VirtualFile for reading later. That's why we don't + // set inner.book here. The first read will have to re-open it. + // // Note: This overwrites any existing file. There shouldn't be any. // FIXME: throw an error instead? - let file = File::create(&path)?; + let path = layer.path(); + let file = VirtualFile::create(&path)?; let buf_writer = BufWriter::new(file); let book = BookWriter::new(buf_writer, IMAGE_FILE_MAGIC)?; @@ -374,11 +381,13 @@ impl ImageLayer { // quick exit if already loaded let mut inner = self.inner.lock().unwrap(); - if inner.loaded { + if inner.book.is_some() { return Ok(inner); } - let (path, book) = self.open_book()?; + let path = self.path(); + let file = VirtualFile::open(&path)?; + let book = Book::new(file)?; match &self.path_or_conf { PathOrConf::Conf(_) => { @@ -419,22 +428,13 @@ impl ImageLayer { debug!("loaded from {}", &path.display()); *inner = ImageLayerInner { - loaded: true, + book: Some(book), image_type, }; Ok(inner) } - fn open_book(&self) -> Result<(PathBuf, Book)> { - let path = self.path(); - - let file = File::open(&path)?; - let book = Book::new(file)?; - - Ok((path, book)) - } - /// Create an ImageLayer struct representing an existing file on disk pub fn new( conf: &'static PageServerConf, @@ -449,7 +449,7 @@ impl ImageLayer { seg: filename.seg, lsn: filename.lsn, inner: Mutex::new(ImageLayerInner { - loaded: false, + book: None, image_type: ImageType::Blocky { num_blocks: 0 }, }), } @@ -458,7 +458,10 @@ impl ImageLayer { /// Create an ImageLayer struct representing an existing file on disk. /// /// This variant is only used for debugging purposes, by the 'dump_layerfile' binary. - pub fn new_for_path(path: &Path, book: &Book) -> Result { + pub fn new_for_path(path: &Path, book: &Book) -> Result + where + F: std::os::unix::prelude::FileExt, + { let chapter = book.read_chapter(SUMMARY_CHAPTER)?; let summary = Summary::des(&chapter)?; @@ -469,7 +472,7 @@ impl ImageLayer { seg: summary.seg, lsn: summary.lsn, inner: Mutex::new(ImageLayerInner { - loaded: false, + book: None, image_type: ImageType::Blocky { num_blocks: 0 }, }), }) diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 8315ecc8c1..68e8e214bf 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -19,6 +19,7 @@ pub mod repository; pub mod restore_local_repo; pub mod tenant_mgr; pub mod tenant_threads; +pub mod virtual_file; pub mod waldecoder; pub mod walreceiver; pub mod walredo; @@ -45,6 +46,7 @@ pub mod defaults { pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100; pub const DEFAULT_OPEN_MEM_LIMIT: usize = 128 * 1024 * 1024; + pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100; } lazy_static! { @@ -74,6 +76,7 @@ pub struct PageServerConf { pub superuser: String, pub open_mem_limit: usize, + pub max_file_descriptors: usize, // Repository directory, relative to current working directory. // Normally, the page server changes the current working directory @@ -158,6 +161,7 @@ impl PageServerConf { gc_horizon: defaults::DEFAULT_GC_HORIZON, gc_period: Duration::from_secs(10), open_mem_limit: defaults::DEFAULT_OPEN_MEM_LIMIT, + max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS, listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(), superuser: "zenith_admin".to_string(), diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs new file mode 100644 index 0000000000..5f7ace0a32 --- /dev/null +++ b/pageserver/src/virtual_file.rs @@ -0,0 +1,556 @@ +//! +//! VirtualFile is like a normal File, but it's not bound directly to +//! a file descriptor. Instead, the file is opened when it's read from, +//! and if too many files are open globally in the system, least-recently +//! used ones are closed. +//! +//! To track which files have been recently used, we use the clock algorithm +//! with a 'recently_used' flag on each slot. +//! +//! This is similar to PostgreSQL's virtual file descriptor facility in +//! src/backend/storage/file/fd.c +//! +use std::fs::{File, OpenOptions}; +use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Write}; +use std::os::unix::fs::FileExt; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::{RwLock, RwLockWriteGuard}; + +use once_cell::sync::OnceCell; + +/// +/// A virtual file descriptor. You can use this just like std::fs::File, but internally +/// the underlying file is closed if the system is low on file descriptors, +/// and re-opened when it's accessed again. +/// +/// Like with std::fs::File, multiple threads can read/write the file concurrently, +/// holding just a shared reference the same VirtualFile, using the read_at() / write_at() +/// functions from the FileExt trait. But the functions from the Read/Write/Seek traits +/// require a mutable reference, because they modify the "current position". +/// +/// Each VirtualFile has a physical file descriptor in the global OPEN_FILES array, at the +/// slot that 'handle points to, if the underlying file is currently open. If it's not +/// currently open, the 'handle' can still point to the slot where it was last kept. The +/// 'tag' field is used to detect whether the handle still is valid or not. +/// +pub struct VirtualFile { + /// Lazy handle to the global file descriptor cache. The slot that this points to + /// might contain our File, or it may be empty, or it may contain a File that + /// belongs to a different VirtualFile. + handle: RwLock, + + /// Current file position + pos: u64, + + /// File path and options to use to open it. + /// + /// Note: this only contains the options needed to re-open it. For example, + /// if a new file is created, we only pass the create flag when it's initially + /// opened, in the VirtualFile::create() function, and strip the flag before + /// storing it here. + path: PathBuf, + open_options: OpenOptions, +} + +#[derive(PartialEq, Clone, Copy)] +struct SlotHandle { + /// Index into OPEN_FILES.slots + index: usize, + + /// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has + /// been recycled and no longer contains the FD for this virtual file. + tag: u64, +} + +/// OPEN_FILES is the global array that holds the physical file descriptors that +/// are currently open. Each slot in the array is protected by a separate lock, +/// so that different files can be accessed independently. The lock must be held +/// in write mode to replace the slot with a different file, but a read mode +/// is enough to operate on the file, whether you're reading or writing to it. +/// +/// OPEN_FILES starts in uninitialized state, and it's initialized by +/// the virtual_file::init() function. It must be called exactly once at page +/// server startup. +static OPEN_FILES: OnceCell = OnceCell::new(); + +struct OpenFiles { + slots: &'static [Slot], + + /// clock arm for the clock algorithm + next: AtomicUsize, +} + +struct Slot { + inner: RwLock, + + /// has this file been used since last clock sweep? + recently_used: AtomicBool, +} + +struct SlotInner { + /// Counter that's incremented every time a different file is stored here. + /// To avoid the ABA problem. + tag: u64, + + /// the underlying file + file: Option, +} + +impl OpenFiles { + /// Find a slot to use, evicting an existing file descriptor if needed. + /// + /// On return, we hold a lock on the slot, and its 'tag' has been updated + /// recently_used has been set. It's all ready for reuse. + fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard) { + // + // Run the clock algorithm to find a slot to replace. + // + let num_slots = self.slots.len(); + let mut retries = 0; + let mut slot; + let mut slot_guard; + let index; + loop { + let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots; + slot = &self.slots[next]; + + // If the recently_used flag on this slot is set, continue the clock + // sweep. Otherwise try to use this slot. If we cannot acquire the + // lock, also continue the clock sweep. + // + // We only continue in this manner for a while, though. If we loop + // through the array twice without finding a victim, just pick the + // next slot and wait until we can reuse it. This way, we avoid + // spinning in the extreme case that all the slots are busy with an + // I/O operation. + if retries < num_slots * 2 { + if !slot.recently_used.swap(false, Ordering::Release) { + if let Ok(guard) = slot.inner.try_write() { + slot_guard = guard; + index = next; + break; + } + } + retries += 1; + } else { + slot_guard = slot.inner.write().unwrap(); + index = next; + break; + } + } + + // + // We now have the victim slot locked. If it was in use previously, close the + // old file. + // + if let Some(old_file) = slot_guard.file.take() { + drop(old_file); + } + + // Prepare the slot for reuse and return it + slot_guard.tag += 1; + slot.recently_used.store(true, Ordering::Relaxed); + ( + SlotHandle { + index, + tag: slot_guard.tag, + }, + slot_guard, + ) + } +} + +impl VirtualFile { + /// Open a file in read-only mode. Like File::open. + pub fn open(path: &Path) -> Result { + Self::open_with_options(path, OpenOptions::new().read(true)) + } + + /// Create a new file for writing. If the file exists, it will be truncated. + /// Like File::create. + pub fn create(path: &Path) -> Result { + Self::open_with_options( + path, + OpenOptions::new().write(true).create(true).truncate(true), + ) + } + + /// Open a file with given options. + /// + /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt, + /// they will be applied also when the file is subsequently re-opened, not only + /// on the first time. Make sure that's sane! + pub fn open_with_options( + path: &Path, + open_options: &OpenOptions, + ) -> Result { + let (handle, mut slot_guard) = get_open_files().find_victim_slot(); + + let file = open_options.open(path)?; + + // Strip all options other than read and write. + // + // It would perhaps be nicer to check just for the read and write flags + // explicitly, but OpenOptions doesn't contain any functions to read flags, + // only to set them. + let mut reopen_options = open_options.clone(); + reopen_options.create(false); + reopen_options.create_new(false); + reopen_options.truncate(false); + + let vfile = VirtualFile { + handle: RwLock::new(handle), + pos: 0, + path: path.to_path_buf(), + open_options: reopen_options, + }; + + slot_guard.file.replace(file); + + Ok(vfile) + } + + /// Call File::sync_all() on the underlying File. + pub fn sync_all(&self) -> Result<(), Error> { + self.with_file(|file| file.sync_all())? + } + + /// Helper function that looks up the underlying File for this VirtualFile, + /// opening it and evicting some other File if necessary. It calls 'func' + /// with the physical File. + fn with_file(&self, mut func: F) -> Result + where + F: FnMut(&File) -> R, + { + let open_files = get_open_files(); + + // Read the cached slot handle, and see if the slot that it points to still + // contains our File. + // + // We only need to hold the lock while we read the current handle. If + // another thread closes the file and recycles the slot for a different file, + // we will notice that the handle we read is no longer valid and retry. + let mut handle_guard; + let mut handle = *self.handle.read().unwrap(); + loop { + let slot = &open_files.slots[handle.index]; + let slot_guard = slot.inner.read().unwrap(); + if slot_guard.tag == handle.tag { + if let Some(file) = &slot_guard.file { + // Found a cached file descriptor. + slot.recently_used.store(true, Ordering::Relaxed); + return Ok(func(file)); + } + } + + // The slot didn't contain our File. Grab a write lock on handle in + // the VirtualFile, so that no other thread will try to concurrently + // open the same file. + handle_guard = self.handle.write().unwrap(); + + // Check if some other thread already did it while we were not + // holding the lock. + if *handle_guard != handle { + handle = *handle_guard; + continue; + } + break; + } + + // We need to open the file ourselves. The handle in the VirtualFile is + // now locked in write-mode. Find a free slot to put it in. + let (handle, mut slot_guard) = open_files.find_victim_slot(); + + // Open the physical file + let file = self.open_options.open(&self.path)?; + + // Perform the requested operation on it + // + // TODO: We could downgrade the locks to read mode before calling + // 'func', to allow a little bit more concurrency, but the standard + // library RwLock doesn't allow downgrading without releasing the lock, + // and that doesn't seem worth the trouble. (parking_lot RwLock would + // allow it) + let result = func(&file); + + // Store the File in the slot and update the handle in the VirtualFile + // to point to it. + slot_guard.file.replace(file); + + *handle_guard = handle; + + Ok(result) + } +} + +impl Drop for VirtualFile { + /// If a VirtualFile is dropped, close the underlying file if it was open. + fn drop(&mut self) { + let handle = self.handle.get_mut().unwrap(); + + // We could check with a read-lock first, to avoid waiting on an + // unrelated I/O. + let slot = &get_open_files().slots[handle.index]; + let mut slot_guard = slot.inner.write().unwrap(); + if slot_guard.tag == handle.tag { + slot.recently_used.store(false, Ordering::Relaxed); + slot_guard.file.take(); + } + } +} + +impl Read for VirtualFile { + fn read(&mut self, buf: &mut [u8]) -> Result { + let pos = self.pos; + let n = self.read_at(buf, pos)?; + self.pos += n as u64; + Ok(n) + } +} + +impl Write for VirtualFile { + fn write(&mut self, buf: &[u8]) -> Result { + let pos = self.pos; + let n = self.write_at(buf, pos)?; + self.pos += n as u64; + Ok(n) + } + + fn flush(&mut self) -> Result<(), std::io::Error> { + // flush is no-op for File (at least on unix), so we don't need to do + // anything here either. + Ok(()) + } +} + +impl Seek for VirtualFile { + fn seek(&mut self, pos: SeekFrom) -> Result { + match pos { + SeekFrom::Start(offset) => { + self.pos = offset; + } + SeekFrom::End(offset) => { + self.pos = self.with_file(|mut file| file.seek(SeekFrom::End(offset)))?? + } + SeekFrom::Current(offset) => { + let pos = self.pos as i128 + offset as i128; + if pos < 0 { + return Err(Error::new( + ErrorKind::InvalidInput, + "offset would be negative", + )); + } + if pos > u64::MAX as i128 { + return Err(Error::new(ErrorKind::InvalidInput, "offset overflow")); + } + self.pos = pos as u64; + } + } + Ok(self.pos) + } +} + +impl FileExt for VirtualFile { + fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { + self.with_file(|file| file.read_at(buf, offset))? + } + + fn write_at(&self, buf: &[u8], offset: u64) -> Result { + self.with_file(|file| file.write_at(buf, offset))? + } +} + +impl OpenFiles { + fn new(num_slots: usize) -> OpenFiles { + let mut slots = Box::new(Vec::with_capacity(num_slots)); + for _ in 0..num_slots { + let slot = Slot { + recently_used: AtomicBool::new(false), + inner: RwLock::new(SlotInner { tag: 0, file: None }), + }; + slots.push(slot); + } + + OpenFiles { + next: AtomicUsize::new(0), + slots: Box::leak(slots), + } + } +} + +/// +/// Initialize the virtual file module. This must be called once at page +/// server startup. +/// +pub fn init(num_slots: usize) { + if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() { + panic!("virtual_file::init called twice"); + } +} + +const TEST_MAX_FILE_DESCRIPTORS: usize = 10; + +// Get a handle to the global slots array. +fn get_open_files() -> &'static OpenFiles { + // + // In unit tests, page server startup doesn't happen and no one calls + // virtual_file::init(). Initialize it here, with a small array. + // + // This applies to the virtual file tests below, but all other unit + // tests too, so the virtual file facility is always usable in + // unit tests. + // + if cfg!(test) { + OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS)) + } else { + OPEN_FILES.get().expect("virtual_file::init not called yet") + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::seq::SliceRandom; + use rand::thread_rng; + + // Helper function to slurp contents of a file, starting at the current position, + // into a string + fn read_string(vfile: &mut FD) -> Result + where + FD: Read, + { + let mut buf = String::new(); + vfile.read_to_string(&mut buf)?; + Ok(buf) + } + + // Helper function to slurp a portion of a file into a string + fn read_string_at(vfile: &mut FD, pos: u64, len: usize) -> Result + where + FD: FileExt, + { + let mut buf = Vec::new(); + buf.resize(len, 0); + vfile.read_exact_at(&mut buf, pos)?; + Ok(String::from_utf8(buf).unwrap()) + } + + #[test] + fn test_virtual_files() -> Result<(), Error> { + // The real work is done in the test_files() helper function. This + // allows us to run the same set of tests against a native File, and + // VirtualFile. We trust the native Files and wouldn't need to test them, + // but this allows us to verify that the operations return the same + // results with VirtualFiles as with native Files. (Except that with + // native files, you will run out of file descriptors if the ulimit + // is low enough.) + test_files("virtual_files", |path, open_options| { + VirtualFile::open_with_options(path, open_options) + }) + } + + #[test] + fn test_physical_files() -> Result<(), Error> { + test_files("physical_files", |path, open_options| { + open_options.open(path) + }) + } + + fn test_files(testname: &str, openfunc: OF) -> Result<(), Error> + where + FD: Read + Write + Seek + FileExt, + OF: Fn(&Path, &OpenOptions) -> Result, + { + let testdir = crate::PageServerConf::test_repo_dir(testname); + std::fs::create_dir_all(&testdir)?; + + let path_a = testdir.join("file_a"); + let mut file_a = openfunc( + &path_a, + OpenOptions::new().write(true).create(true).truncate(true), + )?; + file_a.write_all(b"foobar")?; + + // cannot read from a file opened in write-only mode + assert!(read_string(&mut file_a).is_err()); + + // Close the file and re-open for reading + let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?; + + // cannot write to a file opened in read-only mode + assert!(file_a.write(b"bar").is_err()); + + // Try simple read + assert_eq!("foobar", read_string(&mut file_a)?); + + // It's positioned at the EOF now. + assert_eq!("", read_string(&mut file_a)?); + + // Test seeks. + assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); + assert_eq!("oobar", read_string(&mut file_a)?); + + assert_eq!(file_a.seek(SeekFrom::End(-2))?, 4); + assert_eq!("ar", read_string(&mut file_a)?); + + assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); + assert_eq!(file_a.seek(SeekFrom::Current(2))?, 3); + assert_eq!("bar", read_string(&mut file_a)?); + + assert_eq!(file_a.seek(SeekFrom::Current(-5))?, 1); + assert_eq!("oobar", read_string(&mut file_a)?); + + // Test erroneous seeks to before byte 0 + assert!(file_a.seek(SeekFrom::End(-7)).is_err()); + assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); + assert!(file_a.seek(SeekFrom::Current(-2)).is_err()); + + // the erroneous seek should have left the position unchanged + assert_eq!("oobar", read_string(&mut file_a)?); + + // Create another test file, and try FileExt functions on it. + let path_b = testdir.join("file_b"); + let mut file_b = openfunc( + &path_b, + OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true), + )?; + file_b.write_all_at(b"BAR", 3)?; + file_b.write_all_at(b"FOO", 0)?; + + assert_eq!(read_string_at(&mut file_b, 2, 3)?, "OBA"); + + // Open a lot of files, enough to cause some evictions. (Or to be precise, + // open the same file many times. The effect is the same.) + // + // leave file_a positioned at offset 1 before we start + assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); + + let mut vfiles = Vec::new(); + for _ in 0..100 { + let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?; + assert_eq!("FOOBAR", read_string(&mut vfile)?); + vfiles.push(vfile); + } + + // make sure we opened enough files to definitely cause evictions. + assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2); + + // The underlying file descriptor for 'file_a' should be closed now. Try to read + // from it again. We left the file positioned at offset 1 above. + assert_eq!("oobar", read_string(&mut file_a)?); + + // Check that all the other FDs still work too. Use them in random order for + // good measure. + vfiles.as_mut_slice().shuffle(&mut thread_rng()); + for vfile in vfiles.iter_mut() { + assert_eq!("OOBAR", read_string_at(vfile, 1, 5)?); + } + + Ok(()) + } +}