From 51f9385b1bd60f3152a580332ba4b19ec131f89a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 7 Feb 2024 18:47:55 +0100 Subject: [PATCH] live-reconfigurable virtual_file::IoEngine (#6552) This PR adds an API to live-reconfigure the VirtualFile io engine. It also adds a flag to `pagebench get-page-latest-lsn`, which is where I found this functionality to be useful: it helps compare the io engines in a benchmark without re-compiling a release build, which took ~50s on the i3en.3xlarge where I was doing the benchmark. Switching the IO engine is completely safe at runtime. --- libs/pageserver_api/src/models.rs | 21 +++ pageserver/client/src/mgmt_api.rs | 12 ++ pageserver/ctl/src/layer_map_analyzer.rs | 2 +- pageserver/ctl/src/layers.rs | 4 +- pageserver/ctl/src/main.rs | 2 +- .../pagebench/src/cmd/getpage_latest_lsn.rs | 8 ++ pageserver/src/http/routes.rs | 10 ++ pageserver/src/virtual_file.rs | 5 +- pageserver/src/virtual_file/io_engine.rs | 130 +++++++++++------- pageserver/src/virtual_file/open_options.rs | 7 +- 10 files changed, 144 insertions(+), 57 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 5a638df9cc..c08cacb822 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -649,6 +649,27 @@ pub struct WalRedoManagerStatus { pub pid: Option, } +pub mod virtual_file { + #[derive( + Copy, + Clone, + PartialEq, + Eq, + Hash, + strum_macros::EnumString, + strum_macros::Display, + serde_with::DeserializeFromStr, + serde_with::SerializeDisplay, + Debug, + )] + #[strum(serialize_all = "kebab-case")] + pub enum IoEngineKind { + StdFs, + #[cfg(target_os = "linux")] + TokioEpollUring, + } +} + // Wrapped in libpq CopyData #[derive(PartialEq, Eq, Debug)] pub enum PagestreamFeMessage { diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index 91b9afa026..8abe58e1a2 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -339,4 +339,16 @@ impl Client { .await .map_err(Error::ReceiveBody) } + + pub async fn put_io_engine( + &self, + engine: &pageserver_api::models::virtual_file::IoEngineKind, + ) -> Result<()> { + let uri = format!("{}/v1/io_engine", self.mgmt_api_endpoint); + self.request(Method::PUT, uri, engine) + .await? + .json() + .await + .map_err(Error::ReceiveBody) + } } diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index eb5c3f15cf..42c4e9ff48 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -142,7 +142,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> { let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); // Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree. - pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs); + pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); pageserver::page_cache::init(100); let mut total_delta_layers = 0usize; diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index dbbcfedac0..27efa6d028 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -59,7 +59,7 @@ pub(crate) enum LayerCmd { async fn read_delta_file(path: impl AsRef, ctx: &RequestContext) -> Result<()> { let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path"); - virtual_file::init(10, virtual_file::IoEngineKind::StdFs); + virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); page_cache::init(100); let file = FileBlockReader::new(VirtualFile::open(path).await?); let summary_blk = file.read_blk(0, ctx).await?; @@ -187,7 +187,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { new_tenant_id, new_timeline_id, } => { - pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs); + pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); pageserver::page_cache::init(100); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs index 3c90933fe9..e73d961e36 100644 --- a/pageserver/ctl/src/main.rs +++ b/pageserver/ctl/src/main.rs @@ -123,7 +123,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> { async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> { // Basic initialization of things that don't change after startup - virtual_file::init(10, virtual_file::IoEngineKind::StdFs); + virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); page_cache::init(100); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); dump_layerfile_from_path(path, true, &ctx).await diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index aa809d8d26..647f571e59 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -51,6 +51,10 @@ pub(crate) struct Args { /// It doesn't get invalidated if the keyspace changes under the hood, e.g., due to new ingested data or compaction. #[clap(long)] keyspace_cache: Option, + /// Before starting the benchmark, live-reconfigure the pageserver to use the given + /// [`pageserver_api::models::virtual_file::IoEngineKind`]. + #[clap(long)] + set_io_engine: Option, targets: Option>, } @@ -109,6 +113,10 @@ async fn main_impl( args.pageserver_jwt.as_deref(), )); + if let Some(engine_str) = &args.set_io_engine { + mgmt_api_client.put_io_engine(engine_str).await?; + } + // discover targets let timelines: Vec = crate::util::cli::targets::discover( &mgmt_api_client, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 792089ebe7..ebcb27fa08 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1908,6 +1908,15 @@ async fn post_tracing_event_handler( json_response(StatusCode::OK, ()) } +async fn put_io_engine_handler( + mut r: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + let kind: crate::virtual_file::IoEngineKind = json_request(&mut r).await?; + crate::virtual_file::io_engine::set(kind); + json_response(StatusCode::OK, ()) +} + /// Common functionality of all the HTTP API handlers. /// /// - Adds a tracing span to each request (by `request_span`) @@ -2165,5 +2174,6 @@ pub fn make_router( "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace", |r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace), ) + .put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler)) .any(handler_404)) } diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 066f06c88f..059a6596d3 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -28,9 +28,10 @@ use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::time::Instant; use utils::fs_ext; -mod io_engine; +pub use pageserver_api::models::virtual_file as api; +pub(crate) mod io_engine; mod open_options; -pub use io_engine::IoEngineKind; +pub(crate) use io_engine::IoEngineKind; pub(crate) use open_options::*; /// diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index f7b46fe653..892affa326 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -7,67 +7,100 @@ //! //! Then use [`get`] and [`super::OpenOptions`]. -#[derive( - Copy, - Clone, - PartialEq, - Eq, - Hash, - strum_macros::EnumString, - strum_macros::Display, - serde_with::DeserializeFromStr, - serde_with::SerializeDisplay, - Debug, -)] -#[strum(serialize_all = "kebab-case")] -pub enum IoEngineKind { +pub(crate) use super::api::IoEngineKind; +#[derive(Clone, Copy)] +#[repr(u8)] +pub(crate) enum IoEngine { + NotSet, StdFs, #[cfg(target_os = "linux")] TokioEpollUring, } -static IO_ENGINE: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); - -#[cfg(not(test))] -pub(super) fn init(engine: IoEngineKind) { - if IO_ENGINE.set(engine).is_err() { - panic!("called twice"); +impl From for IoEngine { + fn from(value: IoEngineKind) -> Self { + match value { + IoEngineKind::StdFs => IoEngine::StdFs, + #[cfg(target_os = "linux")] + IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring, + } } - crate::metrics::virtual_file_io_engine::KIND - .with_label_values(&[&format!("{engine}")]) - .set(1); } -pub(super) fn get() -> &'static IoEngineKind { - #[cfg(test)] - { - let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE"; - IO_ENGINE.get_or_init(|| match std::env::var(env_var_name) { - Ok(v) => match v.parse::() { - Ok(engine_kind) => engine_kind, - Err(e) => { - panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}") - } - }, - Err(std::env::VarError::NotPresent) => { - crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE - .parse() - .unwrap() - } - Err(std::env::VarError::NotUnicode(_)) => { - panic!("env var {env_var_name} is not unicode"); - } +impl TryFrom for IoEngine { + type Error = u8; + + fn try_from(value: u8) -> Result { + Ok(match value { + v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet, + v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs, + #[cfg(target_os = "linux")] + v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring, + x => return Err(x), }) } - #[cfg(not(test))] - IO_ENGINE.get().unwrap() } -use std::os::unix::prelude::FileExt; +static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8); + +pub(crate) fn set(engine_kind: IoEngineKind) { + let engine: IoEngine = engine_kind.into(); + IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed); + #[cfg(not(test))] + { + let metric = &crate::metrics::virtual_file_io_engine::KIND; + metric.reset(); + metric + .with_label_values(&[&format!("{engine_kind}")]) + .set(1); + } +} + +#[cfg(not(test))] +pub(super) fn init(engine_kind: IoEngineKind) { + set(engine_kind); +} + +pub(super) fn get() -> IoEngine { + let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap(); + if cfg!(test) { + let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE"; + match cur { + IoEngine::NotSet => { + let kind = match std::env::var(env_var_name) { + Ok(v) => match v.parse::() { + Ok(engine_kind) => engine_kind, + Err(e) => { + panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}") + } + }, + Err(std::env::VarError::NotPresent) => { + crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE + .parse() + .unwrap() + } + Err(std::env::VarError::NotUnicode(_)) => { + panic!("env var {env_var_name} is not unicode"); + } + }; + self::set(kind); + self::get() + } + x => x, + } + } else { + cur + } +} + +use std::{ + os::unix::prelude::FileExt, + sync::atomic::{AtomicU8, Ordering}, +}; use super::FileGuard; -impl IoEngineKind { +impl IoEngine { pub(super) async fn read_at( &self, file_guard: FileGuard, @@ -78,7 +111,8 @@ impl IoEngineKind { B: tokio_epoll_uring::BoundedBufMut + Send, { match self { - IoEngineKind::StdFs => { + IoEngine::NotSet => panic!("not initialized"), + IoEngine::StdFs => { // SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory. let dst = unsafe { std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total()) @@ -96,7 +130,7 @@ impl IoEngineKind { ((file_guard, buf), res) } #[cfg(target_os = "linux")] - IoEngineKind::TokioEpollUring => { + IoEngine::TokioEpollUring => { let system = tokio_epoll_uring::thread_local_system().await; let (resources, res) = system.read(file_guard, offset, buf).await; ( diff --git a/pageserver/src/virtual_file/open_options.rs b/pageserver/src/virtual_file/open_options.rs index 1e5ffe15cc..f75edb0bac 100644 --- a/pageserver/src/virtual_file/open_options.rs +++ b/pageserver/src/virtual_file/open_options.rs @@ -1,6 +1,6 @@ //! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`]; -use super::IoEngineKind; +use super::io_engine::IoEngine; use std::{os::fd::OwnedFd, path::Path}; #[derive(Debug, Clone)] @@ -13,9 +13,10 @@ pub enum OpenOptions { impl Default for OpenOptions { fn default() -> Self { match super::io_engine::get() { - IoEngineKind::StdFs => Self::StdFs(std::fs::OpenOptions::new()), + IoEngine::NotSet => panic!("io engine not set"), + IoEngine::StdFs => Self::StdFs(std::fs::OpenOptions::new()), #[cfg(target_os = "linux")] - IoEngineKind::TokioEpollUring => { + IoEngine::TokioEpollUring => { Self::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new()) } }