diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index 15d4eb09e0..eb5c3f15cf 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -18,7 +18,7 @@ use pageserver::tenant::block_io::FileBlockReader; use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection}; use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE}; use pageserver::tenant::storage_layer::range_overlaps; -use pageserver::virtual_file::VirtualFile; +use pageserver::virtual_file::{self, VirtualFile}; use utils::{bin_ser::BeSer, lsn::Lsn}; @@ -142,7 +142,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> { let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); // Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree. - pageserver::virtual_file::init(10); + pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs); pageserver::page_cache::init(100); let mut total_delta_layers = 0usize; diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index ebf4a4bec3..dbbcfedac0 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -59,7 +59,7 @@ pub(crate) enum LayerCmd { async fn read_delta_file(path: impl AsRef, ctx: &RequestContext) -> Result<()> { let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path"); - virtual_file::init(10); + virtual_file::init(10, virtual_file::IoEngineKind::StdFs); page_cache::init(100); let file = FileBlockReader::new(VirtualFile::open(path).await?); let summary_blk = file.read_blk(0, ctx).await?; @@ -187,7 +187,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { new_tenant_id, new_timeline_id, } => { - pageserver::virtual_file::init(10); + pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs); pageserver::page_cache::init(100); let ctx = 
RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs index fb42d6d2f1..3c90933fe9 100644 --- a/pageserver/ctl/src/main.rs +++ b/pageserver/ctl/src/main.rs @@ -123,7 +123,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> { async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> { // Basic initialization of things that don't change after startup - virtual_file::init(10); + virtual_file::init(10, virtual_file::IoEngineKind::StdFs); page_cache::init(100); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); dump_layerfile_from_path(path, true, &ctx).await diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 621ad050f4..49e646dd71 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -130,7 +130,7 @@ fn main() -> anyhow::Result<()> { let scenario = failpoint_support::init(); // Basic initialization of things that don't change after startup - virtual_file::init(conf.max_file_descriptors); + virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine); page_cache::init(conf.page_cache_size); start_pageserver(launch_ts, conf).context("Failed to start pageserver")?; diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index c1a0b1aeb5..7f68492f72 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -36,7 +36,7 @@ use crate::tenant::config::TenantConfOpt; use crate::tenant::{ TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME, }; - +use crate::virtual_file; use crate::{ IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_HEATMAP_BASENAME, TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX, @@ -44,6 +44,8 @@ use crate::{ use self::defaults::DEFAULT_CONCURRENT_TENANT_WARMUP; +use self::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE; + pub mod 
defaults { use crate::tenant::config::defaults::*; use const_format::formatcp; @@ -80,6 +82,8 @@ pub mod defaults { pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100; + pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs"; + /// /// Default built-in configuration file. /// @@ -115,6 +119,8 @@ pub mod defaults { #ingest_batch_size = {DEFAULT_INGEST_BATCH_SIZE} +#virtual_file_io_engine = '{DEFAULT_VIRTUAL_FILE_IO_ENGINE}' + [tenant_config] #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT} @@ -248,6 +254,8 @@ pub struct PageServerConf { /// Maximum number of WAL records to be ingested and committed at the same time pub ingest_batch_size: u64, + + pub virtual_file_io_engine: virtual_file::IoEngineKind, } /// We do not want to store this in a PageServerConf because the latter may be logged @@ -332,6 +340,8 @@ struct PageServerConfigBuilder { secondary_download_concurrency: BuilderValue, ingest_batch_size: BuilderValue, + + virtual_file_io_engine: BuilderValue, } impl Default for PageServerConfigBuilder { @@ -407,6 +417,8 @@ impl Default for PageServerConfigBuilder { secondary_download_concurrency: Set(DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY), ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE), + + virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()), } } } @@ -563,6 +575,10 @@ impl PageServerConfigBuilder { self.ingest_batch_size = BuilderValue::Set(ingest_batch_size) } + pub fn virtual_file_io_engine(&mut self, value: virtual_file::IoEngineKind) { + self.virtual_file_io_engine = BuilderValue::Set(value); + } + pub fn build(self) -> anyhow::Result { let concurrent_tenant_warmup = self .concurrent_tenant_warmup @@ -670,6 +686,9 @@ impl PageServerConfigBuilder { ingest_batch_size: self .ingest_batch_size .ok_or(anyhow!("missing ingest_batch_size"))?, + virtual_file_io_engine: self + .virtual_file_io_engine + .ok_or(anyhow!("missing virtual_file_io_engine"))?, }) } } @@ -921,6 +940,9 
@@ impl PageServerConf { builder.secondary_download_concurrency(parse_toml_u64(key, item)? as usize) }, "ingest_batch_size" => builder.ingest_batch_size(parse_toml_u64(key, item)?), + "virtual_file_io_engine" => { + builder.virtual_file_io_engine(parse_toml_from_str("virtual_file_io_engine", item)?) + } _ => bail!("unrecognized pageserver option '{key}'"), } } @@ -994,6 +1016,7 @@ impl PageServerConf { heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY, secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY, ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE, + virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(), } } } @@ -1225,6 +1248,7 @@ background_task_maximum_delay = '334 s' heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY, secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY, ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE, + virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(), }, "Correct defaults should be used when no config values are provided" ); @@ -1288,6 +1312,7 @@ background_task_maximum_delay = '334 s' heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY, secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY, ingest_batch_size: 100, + virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(), }, "Should be able to parse all basic config values correctly" ); diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 6f4431c3cf..6ed9d2ad0b 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -941,6 +941,7 @@ pub(crate) static STORAGE_IO_SIZE: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); +#[cfg(not(test))] pub(crate) mod virtual_file_descriptor_cache { use super::*; diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index af757c385d..f2fe617b10 100644 --- 
a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -27,7 +27,9 @@ use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::time::Instant; use utils::fs_ext; +mod io_engine; mod open_options; +pub use io_engine::IoEngineKind; pub(crate) use open_options::*; /// @@ -633,41 +635,24 @@ impl VirtualFile { Ok(n) } - pub(crate) async fn read_at(&self, mut buf: B, offset: u64) -> (B, Result) + pub(crate) async fn read_at(&self, buf: B, offset: u64) -> (B, Result) where B: tokio_epoll_uring::BoundedBufMut + Send, { - let (buf, result) = async move { - let file_guard = match self.lock_file().await { - Err(e) => return (buf, Err(e)), - Ok(file_guard) => file_guard, - }; - observe_duration!(StorageIoOperation::Read, { - // SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory. - let dst = unsafe { - std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total()) - }; - let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset)); - if let Ok(nbytes) = &res { - assert!(*nbytes <= buf.bytes_total()); - // SAFETY: see above assertion - unsafe { - buf.set_init(*nbytes); - } - } - #[allow(dropping_references)] - drop(dst); - drop(file_guard); - (buf, res) - }) - } - .await; - if let Ok(size) = result { - STORAGE_IO_SIZE - .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) - .add(size as i64); - } - (buf, result) + let file_guard = match self.lock_file().await { + Ok(file_guard) => file_guard, + Err(e) => return (buf, Err(e)), + }; + + observe_duration!(StorageIoOperation::Read, { + let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await; + if let Ok(size) = res { + STORAGE_IO_SIZE + .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) + .add(size as i64); + } + (buf, res) + }) } async fn write_at(&self, buf: &[u8], offset: u64) -> Result { @@ -819,10 +804,12 @@ impl OpenFiles { /// Initialize the virtual file module. 
This must be called once at page /// server startup. /// -pub fn init(num_slots: usize) { +#[cfg(not(test))] +pub fn init(num_slots: usize, engine: IoEngineKind) { if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() { panic!("virtual_file::init called twice"); } + io_engine::init(engine); crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64); } diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs new file mode 100644 index 0000000000..91c94ce969 --- /dev/null +++ b/pageserver/src/virtual_file/io_engine.rs @@ -0,0 +1,95 @@ +//! [`super::VirtualFile`] supports different IO engines. +//! +//! The [`IoEngineKind`] enum identifies them. +//! +//! The choice of IO engine is global. +//! Initialize using [`init`]. +//! +//! Then use [`get`] and [`super::OpenOptions`]. + +#[derive( + Copy, + Clone, + PartialEq, + Eq, + Hash, + strum_macros::EnumString, + strum_macros::Display, + serde_with::DeserializeFromStr, + serde_with::SerializeDisplay, + Debug, +)] +#[strum(serialize_all = "kebab-case")] +pub enum IoEngineKind { + StdFs, +} + +static IO_ENGINE: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); + +#[cfg(not(test))] +pub(super) fn init(engine: IoEngineKind) { + if IO_ENGINE.set(engine).is_err() { + panic!("called twice"); + } +} + +pub(super) fn get() -> &'static IoEngineKind { + #[cfg(test)] + { + let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE"; + IO_ENGINE.get_or_init(|| match std::env::var(env_var_name) { + Ok(v) => match v.parse::() { + Ok(engine_kind) => engine_kind, + Err(e) => { + panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}") + } + }, + Err(std::env::VarError::NotPresent) => { + crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE + .parse() + .unwrap() + } + Err(std::env::VarError::NotUnicode(_)) => { + panic!("env var {env_var_name} is not unicode"); + } + }) + } + #[cfg(not(test))] + IO_ENGINE.get().unwrap() +} + +use 
std::os::unix::prelude::FileExt; + +use super::FileGuard; + +impl IoEngineKind { + pub(super) async fn read_at( + &self, + file_guard: FileGuard, + offset: u64, + mut buf: B, + ) -> ((FileGuard, B), std::io::Result) + where + B: tokio_epoll_uring::BoundedBufMut + Send, + { + match self { + IoEngineKind::StdFs => { + // SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory. + let dst = unsafe { + std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total()) + }; + let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset)); + if let Ok(nbytes) = &res { + assert!(*nbytes <= buf.bytes_total()); + // SAFETY: see above assertion + unsafe { + buf.set_init(*nbytes); + } + } + #[allow(dropping_references)] + drop(dst); + ((file_guard, buf), res) + } + } + } +} diff --git a/pageserver/src/virtual_file/open_options.rs b/pageserver/src/virtual_file/open_options.rs index ca9f8cff55..4c2148602a 100644 --- a/pageserver/src/virtual_file/open_options.rs +++ b/pageserver/src/virtual_file/open_options.rs @@ -1,11 +1,18 @@ +//! 
Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`]. + +use super::IoEngineKind; use std::{os::fd::OwnedFd, path::Path}; #[derive(Debug, Clone)] -pub struct OpenOptions(std::fs::OpenOptions); +pub enum OpenOptions { + StdFs(std::fs::OpenOptions), +} impl Default for OpenOptions { fn default() -> Self { - Self(std::fs::OpenOptions::new()) + match super::io_engine::get() { + IoEngineKind::StdFs => Self::StdFs(std::fs::OpenOptions::new()), + } } } @@ -15,43 +22,73 @@ impl OpenOptions { } pub fn read(&mut self, read: bool) -> &mut OpenOptions { - let _ = self.0.read(read); + match self { + OpenOptions::StdFs(x) => { + let _ = x.read(read); + } + } self } pub fn write(&mut self, write: bool) -> &mut OpenOptions { - let _ = self.0.write(write); + match self { + OpenOptions::StdFs(x) => { + let _ = x.write(write); + } + } self } pub fn create(&mut self, create: bool) -> &mut OpenOptions { - let _ = self.0.create(create); + match self { + OpenOptions::StdFs(x) => { + let _ = x.create(create); + } + } self } pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions { - let _ = self.0.create_new(create_new); + match self { + OpenOptions::StdFs(x) => { + let _ = x.create_new(create_new); + } + } self } pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions { - let _ = self.0.truncate(truncate); + match self { + OpenOptions::StdFs(x) => { + let _ = x.truncate(truncate); + } + } self } pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result { - self.0.open(path).map(|file| file.into()) + match self { + OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()), + } } } impl std::os::unix::prelude::OpenOptionsExt for OpenOptions { fn mode(&mut self, mode: u32) -> &mut OpenOptions { - let _ = self.0.mode(mode); + match self { + OpenOptions::StdFs(x) => { + let _ = x.mode(mode); + } + } self } fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions { - let _ = self.0.custom_flags(flags); + match self { + 
OpenOptions::StdFs(x) => { + let _ = x.custom_flags(flags); + } + } self } }