refactor(VirtualFile): configurable IO engine

This commit is contained in:
Christian Schwarz
2024-01-12 10:32:33 +00:00
parent 971132d607
commit f63dced5cf
9 changed files with 195 additions and 50 deletions

View File

@@ -18,7 +18,7 @@ use pageserver::tenant::block_io::FileBlockReader;
use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection};
use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE};
use pageserver::tenant::storage_layer::range_overlaps;
use pageserver::virtual_file::VirtualFile;
use pageserver::virtual_file::{self, VirtualFile};
use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -142,7 +142,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
pageserver::virtual_file::init(10);
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
pageserver::page_cache::init(100);
let mut total_delta_layers = 0usize;

View File

@@ -59,7 +59,7 @@ pub(crate) enum LayerCmd {
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10);
virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
page_cache::init(100);
let file = FileBlockReader::new(VirtualFile::open(path).await?);
let summary_blk = file.read_blk(0, ctx).await?;
@@ -187,7 +187,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
new_tenant_id,
new_timeline_id,
} => {
pageserver::virtual_file::init(10);
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
pageserver::page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

View File

@@ -123,7 +123,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10);
virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx).await

View File

@@ -130,7 +130,7 @@ fn main() -> anyhow::Result<()> {
let scenario = failpoint_support::init();
// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors);
virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine);
page_cache::init(conf.page_cache_size);
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;

View File

@@ -36,7 +36,7 @@ use crate::tenant::config::TenantConfOpt;
use crate::tenant::{
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
};
use crate::virtual_file;
use crate::{
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_HEATMAP_BASENAME,
TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX,
@@ -44,6 +44,8 @@ use crate::{
use self::defaults::DEFAULT_CONCURRENT_TENANT_WARMUP;
use self::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE;
pub mod defaults {
use crate::tenant::config::defaults::*;
use const_format::formatcp;
@@ -80,6 +82,8 @@ pub mod defaults {
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs";
///
/// Default built-in configuration file.
///
@@ -115,6 +119,8 @@ pub mod defaults {
#ingest_batch_size = {DEFAULT_INGEST_BATCH_SIZE}
#virtual_file_io_engine = '{DEFAULT_VIRTUAL_FILE_IO_ENGINE}'
[tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
@@ -248,6 +254,8 @@ pub struct PageServerConf {
/// Maximum number of WAL records to be ingested and committed at the same time
pub ingest_batch_size: u64,
pub virtual_file_io_engine: virtual_file::IoEngineKind,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -332,6 +340,8 @@ struct PageServerConfigBuilder {
secondary_download_concurrency: BuilderValue<usize>,
ingest_batch_size: BuilderValue<u64>,
virtual_file_io_engine: BuilderValue<virtual_file::IoEngineKind>,
}
impl Default for PageServerConfigBuilder {
@@ -407,6 +417,8 @@ impl Default for PageServerConfigBuilder {
secondary_download_concurrency: Set(DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY),
ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE),
virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),
}
}
}
@@ -563,6 +575,10 @@ impl PageServerConfigBuilder {
self.ingest_batch_size = BuilderValue::Set(ingest_batch_size)
}
pub fn virtual_file_io_engine(&mut self, value: virtual_file::IoEngineKind) {
self.virtual_file_io_engine = BuilderValue::Set(value);
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let concurrent_tenant_warmup = self
.concurrent_tenant_warmup
@@ -670,6 +686,9 @@ impl PageServerConfigBuilder {
ingest_batch_size: self
.ingest_batch_size
.ok_or(anyhow!("missing ingest_batch_size"))?,
virtual_file_io_engine: self
.virtual_file_io_engine
.ok_or(anyhow!("missing virtual_file_io_engine"))?,
})
}
}
@@ -921,6 +940,9 @@ impl PageServerConf {
builder.secondary_download_concurrency(parse_toml_u64(key, item)? as usize)
},
"ingest_batch_size" => builder.ingest_batch_size(parse_toml_u64(key, item)?),
"virtual_file_io_engine" => {
builder.virtual_file_io_engine(parse_toml_from_str("virtual_file_io_engine", item)?)
}
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -994,6 +1016,7 @@ impl PageServerConf {
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
}
}
}
@@ -1225,6 +1248,7 @@ background_task_maximum_delay = '334 s'
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
},
"Correct defaults should be used when no config values are provided"
);
@@ -1288,6 +1312,7 @@ background_task_maximum_delay = '334 s'
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
ingest_batch_size: 100,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -941,6 +941,7 @@ pub(crate) static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
#[cfg(not(test))]
pub(crate) mod virtual_file_descriptor_cache {
use super::*;

View File

@@ -27,7 +27,9 @@ use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use utils::fs_ext;
mod io_engine;
mod open_options;
pub use io_engine::IoEngineKind;
pub(crate) use open_options::*;
///
@@ -633,41 +635,24 @@ impl VirtualFile {
Ok(n)
}
pub(crate) async fn read_at<B>(&self, mut buf: B, offset: u64) -> (B, Result<usize, Error>)
pub(crate) async fn read_at<B>(&self, buf: B, offset: u64) -> (B, Result<usize, Error>)
where
B: tokio_epoll_uring::BoundedBufMut + Send,
{
let (buf, result) = async move {
let file_guard = match self.lock_file().await {
Err(e) => return (buf, Err(e)),
Ok(file_guard) => file_guard,
};
observe_duration!(StorageIoOperation::Read, {
// SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
let dst = unsafe {
std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
};
let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset));
if let Ok(nbytes) = &res {
assert!(*nbytes <= buf.bytes_total());
// SAFETY: see above assertion
unsafe {
buf.set_init(*nbytes);
}
}
#[allow(dropping_references)]
drop(dst);
drop(file_guard);
(buf, res)
})
}
.await;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
.add(size as i64);
}
(buf, result)
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,
Err(e) => return (buf, Err(e)),
};
observe_duration!(StorageIoOperation::Read, {
let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
if let Ok(size) = res {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
.add(size as i64);
}
(buf, res)
})
}
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
@@ -819,10 +804,12 @@ impl OpenFiles {
/// Initialize the virtual file module. This must be called once at page
/// server startup.
///
pub fn init(num_slots: usize) {
#[cfg(not(test))]
pub fn init(num_slots: usize, engine: IoEngineKind) {
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
panic!("virtual_file::init called twice");
}
io_engine::init(engine);
crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
}

View File

@@ -0,0 +1,95 @@
//! [`super::VirtualFile`] supports different IO engines.
//!
//! The [`IoEngineKind`] enum identifies them.
//!
//! The choice of IO engine is global.
//! Initialize using [`init`].
//!
//! Then use [`get`] and [`super::OpenOptions`].
#[derive(
Copy,
Clone,
PartialEq,
Eq,
Hash,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Debug,
)]
#[strum(serialize_all = "kebab-case")]
pub enum IoEngineKind {
StdFs,
}
static IO_ENGINE: once_cell::sync::OnceCell<IoEngineKind> = once_cell::sync::OnceCell::new();
#[cfg(not(test))]
pub(super) fn init(engine: IoEngineKind) {
if IO_ENGINE.set(engine).is_err() {
panic!("called twice");
}
}
pub(super) fn get() -> &'static IoEngineKind {
#[cfg(test)]
{
let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
IO_ENGINE.get_or_init(|| match std::env::var(env_var_name) {
Ok(v) => match v.parse::<IoEngineKind>() {
Ok(engine_kind) => engine_kind,
Err(e) => {
panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
}
},
Err(std::env::VarError::NotPresent) => {
crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
.parse()
.unwrap()
}
Err(std::env::VarError::NotUnicode(_)) => {
panic!("env var {env_var_name} is not unicode");
}
})
}
#[cfg(not(test))]
IO_ENGINE.get().unwrap()
}
use std::os::unix::prelude::FileExt;
use super::FileGuard;
impl IoEngineKind {
pub(super) async fn read_at<B>(
&self,
file_guard: FileGuard,
offset: u64,
mut buf: B,
) -> ((FileGuard, B), std::io::Result<usize>)
where
B: tokio_epoll_uring::BoundedBufMut + Send,
{
match self {
IoEngineKind::StdFs => {
// SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
let dst = unsafe {
std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
};
let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset));
if let Ok(nbytes) = &res {
assert!(*nbytes <= buf.bytes_total());
// SAFETY: see above assertion
unsafe {
buf.set_init(*nbytes);
}
}
#[allow(dropping_references)]
drop(dst);
((file_guard, buf), res)
}
}
}
}

View File

@@ -1,11 +1,18 @@
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
use super::IoEngineKind;
use std::{os::fd::OwnedFd, path::Path};
#[derive(Debug, Clone)]
pub struct OpenOptions(std::fs::OpenOptions);
pub enum OpenOptions {
StdFs(std::fs::OpenOptions),
}
impl Default for OpenOptions {
fn default() -> Self {
Self(std::fs::OpenOptions::new())
match super::io_engine::get() {
IoEngineKind::StdFs => Self::StdFs(std::fs::OpenOptions::new()),
}
}
}
@@ -15,43 +22,73 @@ impl OpenOptions {
}
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
let _ = self.0.read(read);
match self {
OpenOptions::StdFs(x) => {
let _ = x.read(read);
}
}
self
}
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
let _ = self.0.write(write);
match self {
OpenOptions::StdFs(x) => {
let _ = x.write(write);
}
}
self
}
pub fn create(&mut self, create: bool) -> &mut OpenOptions {
let _ = self.0.create(create);
match self {
OpenOptions::StdFs(x) => {
let _ = x.create(create);
}
}
self
}
pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
let _ = self.0.create_new(create_new);
match self {
OpenOptions::StdFs(x) => {
let _ = x.create_new(create_new);
}
}
self
}
pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
let _ = self.0.truncate(truncate);
match self {
OpenOptions::StdFs(x) => {
let _ = x.truncate(truncate);
}
}
self
}
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
self.0.open(path).map(|file| file.into())
match self {
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),
}
}
}
impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
fn mode(&mut self, mode: u32) -> &mut OpenOptions {
let _ = self.0.mode(mode);
match self {
OpenOptions::StdFs(x) => {
let _ = x.mode(mode);
}
}
self
}
fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
let _ = self.0.custom_flags(flags);
match self {
OpenOptions::StdFs(x) => {
let _ = x.custom_flags(flags);
}
}
self
}
}