mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-05 06:20:37 +00:00
live-reconfigurable virtual_file::IoEngine (#6552)
This PR adds an API to live-reconfigure the VirtualFile io engine. It also adds a flag to `pagebench get-page-latest-lsn`, which is where I found this functionality to be useful: it helps compare the io engines in a benchmark without re-compiling a release build, which took ~50s on the i3en.3xlarge where I was doing the benchmark. Switching the IO engine is completely safe at runtime.
This commit is contained in:
committed by
GitHub
parent
7b49e5e5c3
commit
51f9385b1b
@@ -649,6 +649,27 @@ pub struct WalRedoManagerStatus {
|
||||
pub pid: Option<u32>,
|
||||
}
|
||||
|
||||
pub mod virtual_file {
|
||||
#[derive(
|
||||
Copy,
|
||||
Clone,
|
||||
PartialEq,
|
||||
Eq,
|
||||
Hash,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::Display,
|
||||
serde_with::DeserializeFromStr,
|
||||
serde_with::SerializeDisplay,
|
||||
Debug,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
pub enum IoEngineKind {
|
||||
StdFs,
|
||||
#[cfg(target_os = "linux")]
|
||||
TokioEpollUring,
|
||||
}
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub enum PagestreamFeMessage {
|
||||
|
||||
@@ -339,4 +339,16 @@ impl Client {
|
||||
.await
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn put_io_engine(
|
||||
&self,
|
||||
engine: &pageserver_api::models::virtual_file::IoEngineKind,
|
||||
) -> Result<()> {
|
||||
let uri = format!("{}/v1/io_engine", self.mgmt_api_endpoint);
|
||||
self.request(Method::PUT, uri, engine)
|
||||
.await?
|
||||
.json()
|
||||
.await
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,7 +142,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
|
||||
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
|
||||
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let mut total_delta_layers = 0usize;
|
||||
|
||||
@@ -59,7 +59,7 @@ pub(crate) enum LayerCmd {
|
||||
|
||||
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
|
||||
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
|
||||
virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
|
||||
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
page_cache::init(100);
|
||||
let file = FileBlockReader::new(VirtualFile::open(path).await?);
|
||||
let summary_blk = file.read_blk(0, ctx).await?;
|
||||
@@ -187,7 +187,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
new_tenant_id,
|
||||
new_timeline_id,
|
||||
} => {
|
||||
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
|
||||
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
@@ -123,7 +123,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
|
||||
|
||||
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
|
||||
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
|
||||
page_cache::init(100);
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
dump_layerfile_from_path(path, true, &ctx).await
|
||||
|
||||
@@ -51,6 +51,10 @@ pub(crate) struct Args {
|
||||
/// It doesn't get invalidated if the keyspace changes under the hood, e.g., due to new ingested data or compaction.
|
||||
#[clap(long)]
|
||||
keyspace_cache: Option<Utf8PathBuf>,
|
||||
/// Before starting the benchmark, live-reconfigure the pageserver to use the given
|
||||
/// [`pageserver_api::models::virtual_file::IoEngineKind`].
|
||||
#[clap(long)]
|
||||
set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
@@ -109,6 +113,10 @@ async fn main_impl(
|
||||
args.pageserver_jwt.as_deref(),
|
||||
));
|
||||
|
||||
if let Some(engine_str) = &args.set_io_engine {
|
||||
mgmt_api_client.put_io_engine(engine_str).await?;
|
||||
}
|
||||
|
||||
// discover targets
|
||||
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
|
||||
&mgmt_api_client,
|
||||
|
||||
@@ -1908,6 +1908,15 @@ async fn post_tracing_event_handler(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn put_io_engine_handler(
|
||||
mut r: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let kind: crate::virtual_file::IoEngineKind = json_request(&mut r).await?;
|
||||
crate::virtual_file::io_engine::set(kind);
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
/// Common functionality of all the HTTP API handlers.
|
||||
///
|
||||
/// - Adds a tracing span to each request (by `request_span`)
|
||||
@@ -2165,5 +2174,6 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
|
||||
|r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
|
||||
)
|
||||
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
|
||||
.any(handler_404))
|
||||
}
|
||||
|
||||
@@ -28,9 +28,10 @@ use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tokio::time::Instant;
|
||||
use utils::fs_ext;
|
||||
|
||||
mod io_engine;
|
||||
pub use pageserver_api::models::virtual_file as api;
|
||||
pub(crate) mod io_engine;
|
||||
mod open_options;
|
||||
pub use io_engine::IoEngineKind;
|
||||
pub(crate) use io_engine::IoEngineKind;
|
||||
pub(crate) use open_options::*;
|
||||
|
||||
///
|
||||
|
||||
@@ -7,67 +7,100 @@
|
||||
//!
|
||||
//! Then use [`get`] and [`super::OpenOptions`].
|
||||
|
||||
#[derive(
|
||||
Copy,
|
||||
Clone,
|
||||
PartialEq,
|
||||
Eq,
|
||||
Hash,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::Display,
|
||||
serde_with::DeserializeFromStr,
|
||||
serde_with::SerializeDisplay,
|
||||
Debug,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
pub enum IoEngineKind {
|
||||
pub(crate) use super::api::IoEngineKind;
|
||||
#[derive(Clone, Copy)]
|
||||
#[repr(u8)]
|
||||
pub(crate) enum IoEngine {
|
||||
NotSet,
|
||||
StdFs,
|
||||
#[cfg(target_os = "linux")]
|
||||
TokioEpollUring,
|
||||
}
|
||||
|
||||
static IO_ENGINE: once_cell::sync::OnceCell<IoEngineKind> = once_cell::sync::OnceCell::new();
|
||||
|
||||
#[cfg(not(test))]
|
||||
pub(super) fn init(engine: IoEngineKind) {
|
||||
if IO_ENGINE.set(engine).is_err() {
|
||||
panic!("called twice");
|
||||
impl From<IoEngineKind> for IoEngine {
|
||||
fn from(value: IoEngineKind) -> Self {
|
||||
match value {
|
||||
IoEngineKind::StdFs => IoEngine::StdFs,
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring,
|
||||
}
|
||||
}
|
||||
crate::metrics::virtual_file_io_engine::KIND
|
||||
.with_label_values(&[&format!("{engine}")])
|
||||
.set(1);
|
||||
}
|
||||
|
||||
pub(super) fn get() -> &'static IoEngineKind {
|
||||
#[cfg(test)]
|
||||
{
|
||||
let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
|
||||
IO_ENGINE.get_or_init(|| match std::env::var(env_var_name) {
|
||||
Ok(v) => match v.parse::<IoEngineKind>() {
|
||||
Ok(engine_kind) => engine_kind,
|
||||
Err(e) => {
|
||||
panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
|
||||
}
|
||||
},
|
||||
Err(std::env::VarError::NotPresent) => {
|
||||
crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
|
||||
.parse()
|
||||
.unwrap()
|
||||
}
|
||||
Err(std::env::VarError::NotUnicode(_)) => {
|
||||
panic!("env var {env_var_name} is not unicode");
|
||||
}
|
||||
impl TryFrom<u8> for IoEngine {
|
||||
type Error = u8;
|
||||
|
||||
fn try_from(value: u8) -> Result<Self, Self::Error> {
|
||||
Ok(match value {
|
||||
v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet,
|
||||
v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs,
|
||||
#[cfg(target_os = "linux")]
|
||||
v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring,
|
||||
x => return Err(x),
|
||||
})
|
||||
}
|
||||
#[cfg(not(test))]
|
||||
IO_ENGINE.get().unwrap()
|
||||
}
|
||||
|
||||
use std::os::unix::prelude::FileExt;
|
||||
static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
|
||||
|
||||
pub(crate) fn set(engine_kind: IoEngineKind) {
|
||||
let engine: IoEngine = engine_kind.into();
|
||||
IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
|
||||
#[cfg(not(test))]
|
||||
{
|
||||
let metric = &crate::metrics::virtual_file_io_engine::KIND;
|
||||
metric.reset();
|
||||
metric
|
||||
.with_label_values(&[&format!("{engine_kind}")])
|
||||
.set(1);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
pub(super) fn init(engine_kind: IoEngineKind) {
|
||||
set(engine_kind);
|
||||
}
|
||||
|
||||
pub(super) fn get() -> IoEngine {
|
||||
let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
|
||||
if cfg!(test) {
|
||||
let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
|
||||
match cur {
|
||||
IoEngine::NotSet => {
|
||||
let kind = match std::env::var(env_var_name) {
|
||||
Ok(v) => match v.parse::<IoEngineKind>() {
|
||||
Ok(engine_kind) => engine_kind,
|
||||
Err(e) => {
|
||||
panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
|
||||
}
|
||||
},
|
||||
Err(std::env::VarError::NotPresent) => {
|
||||
crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
|
||||
.parse()
|
||||
.unwrap()
|
||||
}
|
||||
Err(std::env::VarError::NotUnicode(_)) => {
|
||||
panic!("env var {env_var_name} is not unicode");
|
||||
}
|
||||
};
|
||||
self::set(kind);
|
||||
self::get()
|
||||
}
|
||||
x => x,
|
||||
}
|
||||
} else {
|
||||
cur
|
||||
}
|
||||
}
|
||||
|
||||
use std::{
|
||||
os::unix::prelude::FileExt,
|
||||
sync::atomic::{AtomicU8, Ordering},
|
||||
};
|
||||
|
||||
use super::FileGuard;
|
||||
|
||||
impl IoEngineKind {
|
||||
impl IoEngine {
|
||||
pub(super) async fn read_at<B>(
|
||||
&self,
|
||||
file_guard: FileGuard,
|
||||
@@ -78,7 +111,8 @@ impl IoEngineKind {
|
||||
B: tokio_epoll_uring::BoundedBufMut + Send,
|
||||
{
|
||||
match self {
|
||||
IoEngineKind::StdFs => {
|
||||
IoEngine::NotSet => panic!("not initialized"),
|
||||
IoEngine::StdFs => {
|
||||
// SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
|
||||
let dst = unsafe {
|
||||
std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
|
||||
@@ -96,7 +130,7 @@ impl IoEngineKind {
|
||||
((file_guard, buf), res)
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngineKind::TokioEpollUring => {
|
||||
IoEngine::TokioEpollUring => {
|
||||
let system = tokio_epoll_uring::thread_local_system().await;
|
||||
let (resources, res) = system.read(file_guard, offset, buf).await;
|
||||
(
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
|
||||
|
||||
use super::IoEngineKind;
|
||||
use super::io_engine::IoEngine;
|
||||
use std::{os::fd::OwnedFd, path::Path};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -13,9 +13,10 @@ pub enum OpenOptions {
|
||||
impl Default for OpenOptions {
|
||||
fn default() -> Self {
|
||||
match super::io_engine::get() {
|
||||
IoEngineKind::StdFs => Self::StdFs(std::fs::OpenOptions::new()),
|
||||
IoEngine::NotSet => panic!("io engine not set"),
|
||||
IoEngine::StdFs => Self::StdFs(std::fs::OpenOptions::new()),
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngineKind::TokioEpollUring => {
|
||||
IoEngine::TokioEpollUring => {
|
||||
Self::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user