tokio-epoll-uring: make io engine configurable (#6118)

- intoduce concept of IoEngineKind to VirtualFile
- introduce a global OnceCell that contains the IoEngineKind
- introduce OpenOptions that enum-dispatches to the respective
  IoEngineKind's `OpenOptions` type.
- dispatch the currently supported VirtualFile operations
  (open, read) through the IoEngineKind
- add a pageserver config file variable, defaulting to the current StdFs
  => tokio-epoll-uring is off by default
- cover both ioengines in CI `cargo test` runs
- run all the regression tests with both engine kinds (doubling the
amount of regression tests per run in the interest of maximizing
tokio-epoll-uring usage before it hits production)

---------

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
This commit is contained in:
Christian Schwarz
2023-12-20 18:12:26 +01:00
committed by GitHub
parent 41387136e4
commit 48e058da3f
19 changed files with 387 additions and 102 deletions

View File

@@ -199,6 +199,10 @@ jobs:
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
for r in 14 15 16; do
git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r"
git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r"
done
- name: Checkout
uses: actions/checkout@v3
@@ -330,7 +334,9 @@ jobs:
- name: Run cargo test
run: |
${cov_prefix} cargo test $CARGO_FLAGS $CARGO_FEATURES
for io_engine in std-fs tokio-epoll-uring ; do
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo test $CARGO_FLAGS $CARGO_FEATURES
done
# Run separate tests for real S3
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
@@ -415,6 +421,7 @@ jobs:
matrix:
build_type: [ debug, release ]
pg_version: [ v14, v15, v16 ]
pageserver_virtual_file_io_engine: [ std-fs, tokio-epoll-uring ]
steps:
- name: Checkout
uses: actions/checkout@v3
@@ -437,6 +444,7 @@ jobs:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: ${{ matrix.pageserver_virtual_file_io_engine }}
- name: Merge and upload coverage data
if: matrix.build_type == 'debug' && matrix.pg_version == 'v14'
@@ -455,6 +463,7 @@ jobs:
matrix:
pytest_split_group: [ 1, 2, 3, 4 ]
build_type: [ release ]
pageserver_virtual_file_io_engine: [ std-fs, tokio-epoll-uring ]
steps:
- name: Checkout
uses: actions/checkout@v3
@@ -471,6 +480,7 @@ jobs:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}"
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: "${{ matrix.pageserver_virtual_file_io_engine }}"
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones
@@ -1097,6 +1107,10 @@ jobs:
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
for r in 14 15 16; do
git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r"
git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r"
done
- name: Checkout
uses: actions/checkout@v3

View File

@@ -142,6 +142,10 @@ jobs:
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
for r in 14 15 16; do
git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r"
git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r"
done
- name: Checkout
uses: actions/checkout@v4
@@ -238,6 +242,20 @@ jobs:
options: --init
steps:
- name: Fix git ownership
run: |
# Workaround for `fatal: detected dubious ownership in repository at ...`
#
# Use both ${{ github.workspace }} and ${GITHUB_WORKSPACE} because they're different on host and in containers
# Ref https://github.com/actions/checkout/issues/785
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
for r in 14 15 16; do
git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r"
git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r"
done
- name: Checkout
uses: actions/checkout@v4
with:

5
Cargo.lock generated
View File

@@ -5147,7 +5147,6 @@ dependencies = [
"libc",
"mio",
"num_cpus",
"parking_lot 0.12.1",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.5.5",
@@ -5158,7 +5157,7 @@ dependencies = [
[[package]]
name = "tokio-epoll-uring"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#82d74064c5019b0e9a8ae1bcdc75b0345d41bba9"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#9c5ea716add1357dc8c180167e43a2ae596eba68"
dependencies = [
"futures",
"once_cell",
@@ -5714,7 +5713,7 @@ dependencies = [
[[package]]
name = "uring-common"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#82d74064c5019b0e9a8ae1bcdc75b0345d41bba9"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#9c5ea716add1357dc8c180167e43a2ae596eba68"
dependencies = [
"io-uring",
"libc",

View File

@@ -144,6 +144,8 @@ test-context = "0.1"
thiserror = "1.0"
tls-listener = { version = "0.7", features = ["rustls", "hyper-h1"] }
tokio = { version = "1.17", features = ["macros"] }
#tokio-epoll-uring = { path = "../tokio-epoll-uring/tokio-epoll-uring" }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.10.0"
tokio-rustls = "0.24"

View File

@@ -60,6 +60,7 @@ sync_wrapper.workspace = true
tokio-tar.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
tokio-epoll-uring.workspace = true
tokio-io-timeout.workspace = true
tokio-postgres.workspace = true
tokio-util.workspace = true
@@ -83,8 +84,6 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
#tokio-epoll-uring = { path = "../../tokio-epoll-uring/tokio-epoll-uring" }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
[dev-dependencies]
criterion.workspace = true

View File

@@ -18,7 +18,7 @@ use pageserver::tenant::block_io::FileBlockReader;
use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection};
use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE};
use pageserver::tenant::storage_layer::range_overlaps;
use pageserver::virtual_file::VirtualFile;
use pageserver::virtual_file::{self, VirtualFile};
use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -142,7 +142,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
pageserver::virtual_file::init(10);
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
pageserver::page_cache::init(100);
let mut total_delta_layers = 0usize;

View File

@@ -59,7 +59,7 @@ pub(crate) enum LayerCmd {
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10);
virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
page_cache::init(100);
let file = FileBlockReader::new(VirtualFile::open(path).await?);
let summary_blk = file.read_blk(0, ctx).await?;
@@ -187,7 +187,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
new_tenant_id,
new_timeline_id,
} => {
pageserver::virtual_file::init(10);
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
pageserver::page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

View File

@@ -123,7 +123,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10);
virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx).await

View File

@@ -129,7 +129,7 @@ fn main() -> anyhow::Result<()> {
let scenario = pageserver::failpoint_support::init();
// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors);
virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine);
page_cache::init(conf.page_cache_size);
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;

View File

@@ -36,11 +36,14 @@ use crate::tenant::config::TenantConfOpt;
use crate::tenant::{
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
};
use crate::virtual_file;
use crate::{
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_LOCATION_CONFIG_NAME,
TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX,
};
use self::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE;
pub mod defaults {
use crate::tenant::config::defaults::*;
use const_format::formatcp;
@@ -70,6 +73,8 @@ pub mod defaults {
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s";
pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs";
///
/// Default built-in configuration file.
///
@@ -101,6 +106,8 @@ pub mod defaults {
#background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}'
#virtual_file_io_engine = '{DEFAULT_VIRTUAL_FILE_IO_ENGINE}'
[tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
@@ -215,6 +222,8 @@ pub struct PageServerConf {
/// If true, pageserver will make best-effort to operate without a control plane: only
/// for use in major incidents.
pub control_plane_emergency_mode: bool,
pub virtual_file_io_engine: virtual_file::IoEngineKind,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -293,6 +302,8 @@ struct PageServerConfigBuilder {
control_plane_api: BuilderValue<Option<Url>>,
control_plane_api_token: BuilderValue<Option<SecretString>>,
control_plane_emergency_mode: BuilderValue<bool>,
virtual_file_io_engine: BuilderValue<virtual_file::IoEngineKind>,
}
impl Default for PageServerConfigBuilder {
@@ -361,6 +372,8 @@ impl Default for PageServerConfigBuilder {
control_plane_api: Set(None),
control_plane_api_token: Set(None),
control_plane_emergency_mode: Set(false),
virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),
}
}
}
@@ -501,6 +514,10 @@ impl PageServerConfigBuilder {
self.control_plane_emergency_mode = BuilderValue::Set(enabled)
}
pub fn virtual_file_io_engine(&mut self, value: virtual_file::IoEngineKind) {
self.virtual_file_io_engine = BuilderValue::Set(value);
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let concurrent_tenant_size_logical_size_queries = self
.concurrent_tenant_size_logical_size_queries
@@ -595,6 +612,9 @@ impl PageServerConfigBuilder {
control_plane_emergency_mode: self
.control_plane_emergency_mode
.ok_or(anyhow!("missing control_plane_emergency_mode"))?,
virtual_file_io_engine: self
.virtual_file_io_engine
.ok_or(anyhow!("missing virtual_file_io_engine"))?,
})
}
}
@@ -828,8 +848,10 @@ impl PageServerConf {
},
"control_plane_emergency_mode" => {
builder.control_plane_emergency_mode(parse_toml_bool(key, item)?)
},
"virtual_file_io_engine" => {
builder.virtual_file_io_engine(parse_toml_from_str("virtual_file_io_engine", item)?)
}
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -896,6 +918,7 @@ impl PageServerConf {
control_plane_api: None,
control_plane_api_token: None,
control_plane_emergency_mode: false,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
}
}
}
@@ -1120,7 +1143,8 @@ background_task_maximum_delay = '334 s'
)?,
control_plane_api: None,
control_plane_api_token: None,
control_plane_emergency_mode: false
control_plane_emergency_mode: false,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
},
"Correct defaults should be used when no config values are provided"
);
@@ -1177,7 +1201,8 @@ background_task_maximum_delay = '334 s'
background_task_maximum_delay: Duration::from_secs(334),
control_plane_api: None,
control_plane_api_token: None,
control_plane_emergency_mode: false
control_plane_emergency_mode: false,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -892,6 +892,7 @@ pub(crate) static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
#[cfg(not(test))]
pub(crate) mod virtual_file_descriptor_cache {
use super::*;

View File

@@ -5,7 +5,7 @@ use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::VirtualFile;
use crate::virtual_file::{self, VirtualFile};
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use std::cmp::min;
@@ -46,7 +46,7 @@ impl EphemeralFile {
let file = VirtualFile::open_with_options(
&filename,
tokio_epoll_uring::ops::open_at::OpenOptions::new()
virtual_file::OpenOptions::new()
.read(true)
.write(true)
.create(true)

View File

@@ -36,7 +36,7 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::Timeline;
use crate::virtual_file::VirtualFile;
use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
@@ -649,7 +649,7 @@ impl DeltaLayer {
{
let file = VirtualFile::open_with_options(
path,
tokio_epoll_uring::ops::open_at::OpenOptions::new()
virtual_file::OpenOptions::new()
.read(true)
.write(true)
.to_owned(),

View File

@@ -34,7 +34,7 @@ use crate::tenant::storage_layer::{
LayerAccessStats, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::Timeline;
use crate::virtual_file::VirtualFile;
use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
@@ -327,7 +327,7 @@ impl ImageLayer {
{
let file = VirtualFile::open_with_options(
path,
tokio_epoll_uring::ops::open_at::OpenOptions::new()
virtual_file::OpenOptions::new()
.read(true)
.write(true)
.to_owned(),
@@ -496,9 +496,14 @@ impl ImageLayerWriterInner {
);
info!("new image layer {path}");
let mut file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.write(true).create_new(true);
VirtualFile::open_with_options(&path, options).await?
VirtualFile::open_with_options(
&path,
virtual_file::OpenOptions::new()
.write(true)
.create_new(true)
.to_owned(),
)
.await?
};
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;

View File

@@ -24,6 +24,10 @@ use tokio::time::Instant;
use tokio_epoll_uring::IoBufMut;
use utils::fs_ext;
mod io_engine;
pub use io_engine::IoEngineKind;
pub(crate) use io_engine::*;
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
/// the underlying file is closed if the system is low on file descriptors,
@@ -56,7 +60,7 @@ pub struct VirtualFile {
/// opened, in the VirtualFile::create() function, and strip the flag before
/// storing it here.
pub path: Utf8PathBuf,
open_options: tokio_epoll_uring::ops::open_at::OpenOptions,
open_options: OpenOptions,
// These are strings becase we only use them for metrics, and those expect strings.
// It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
@@ -314,13 +318,7 @@ macro_rules! with_file {
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(
path,
tokio_epoll_uring::ops::open_at::OpenOptions::new()
.read(true)
.to_owned(),
)
.await
Self::open_with_options(path, OpenOptions::new().read(true).to_owned()).await
}
/// Create a new file for writing. If the file exists, it will be truncated.
@@ -328,7 +326,7 @@ impl VirtualFile {
pub async fn create(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(
path,
tokio_epoll_uring::ops::open_at::OpenOptions::new()
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
@@ -344,7 +342,7 @@ impl VirtualFile {
/// on the first time. Make sure that's sane!
pub async fn open_with_options(
path: &Utf8Path,
open_options: tokio_epoll_uring::ops::open_at::OpenOptions,
open_options: OpenOptions,
) -> Result<VirtualFile, std::io::Error> {
let path_str = path.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
@@ -360,17 +358,7 @@ impl VirtualFile {
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
slot_guard.file = Some(observe_duration!(StorageIoOperation::Open, {
let system = tokio_epoll_uring::thread_local_system().await;
let file: OwnedFd = system
.open(path, &open_options)
.await
.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})?;
file
open_options.clone().open(path.as_std_path()).await?
}));
// Strip all options other than read and write.
@@ -414,7 +402,7 @@ impl VirtualFile {
std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?;
let mut file = Self::open_with_options(
tmp_path,
tokio_epoll_uring::ops::open_at::OpenOptions::new()
OpenOptions::new()
.write(true)
// Use `create_new` so that, if we race with ourselves or something else,
// we bail out instead of causing damage.
@@ -432,13 +420,9 @@ impl VirtualFile {
// the current `find_victim_slot` impl might pick the same slot for both
// VirtualFile., and it eventually does a blocking write lock instead of
// try_lock.
let final_parent_dirfd = Self::open_with_options(
final_path_parent,
tokio_epoll_uring::ops::open_at::OpenOptions::new()
.read(true)
.to_owned(),
)
.await?;
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true).to_owned())
.await?;
final_parent_dirfd.sync_all().await?;
Ok(())
}
@@ -507,17 +491,10 @@ impl VirtualFile {
// case from StorageIoOperation::Open. This helps with identifying thrashing
// of the virtual file descriptor cache.
let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
let system = tokio_epoll_uring::thread_local_system().await;
let file: OwnedFd = system
.open(&self.path, &self.open_options)
.await
.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})?;
file
self.open_options
.clone()
.open(self.path.as_std_path())
.await?
});
// Store the File in the slot and update the handle in the VirtualFile
@@ -677,8 +654,7 @@ impl VirtualFile {
where
B: tokio_epoll_uring::BoundedBufMut + Send,
{
let system = tokio_epoll_uring::thread_local_system().await;
let ((_file_guard, buf), res) = system.read(file_guard, offset, buf).await;
let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
if let Ok(size) = res {
// TODO: don't use with_label_values on hot path
// https://github.com/neondatabase/neon/issues/6107
@@ -847,10 +823,12 @@ impl OpenFiles {
/// Initialize the virtual file module. This must be called once at page
/// server startup.
///
pub fn init(num_slots: usize) {
#[cfg(not(test))]
pub fn init(num_slots: usize, engine: IoEngineKind) {
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
panic!("virtual_file::init called twice");
}
io_engine::init(engine);
crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
}
@@ -967,16 +945,7 @@ mod tests {
async fn test_physical_files() -> anyhow::Result<()> {
test_files("physical_files", |path, open_options| async move {
Ok(MaybeVirtualFile::File({
let system = tokio_epoll_uring::thread_local_system().await;
let owned_fd = system
.open(path, &open_options)
.await
.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})?;
let owned_fd = open_options.open(path.as_std_path()).await?;
File::from(owned_fd)
}))
})
@@ -985,7 +954,7 @@ mod tests {
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> anyhow::Result<()>
where
OF: Fn(Utf8PathBuf, tokio_epoll_uring::ops::open_at::OpenOptions) -> FT,
OF: Fn(Utf8PathBuf, OpenOptions) -> FT,
FT: Future<Output = Result<MaybeVirtualFile, std::io::Error>>,
{
let testdir = crate::config::PageServerConf::test_repo_dir(testname);
@@ -994,7 +963,7 @@ mod tests {
let path_a = testdir.join("file_a");
let mut file_a = openfunc(
path_a.clone(),
tokio_epoll_uring::ops::open_at::OpenOptions::new()
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
@@ -1007,13 +976,7 @@ mod tests {
let _ = file_a.read_string().await.unwrap_err();
// Close the file and re-open for reading
let mut file_a = openfunc(
path_a,
tokio_epoll_uring::ops::open_at::OpenOptions::new()
.read(true)
.to_owned(),
)
.await?;
let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?;
// cannot write to a file opened in read-only mode
let _ = file_a.write_all(b"bar").await.unwrap_err();
@@ -1050,7 +1013,7 @@ mod tests {
let path_b = testdir.join("file_b");
let mut file_b = openfunc(
path_b.clone(),
tokio_epoll_uring::ops::open_at::OpenOptions::new()
OpenOptions::new()
.read(true)
.write(true)
.create(true)
@@ -1071,13 +1034,8 @@ mod tests {
let mut vfiles = Vec::new();
for _ in 0..100 {
let mut vfile = openfunc(
path_b.clone(),
tokio_epoll_uring::ops::open_at::OpenOptions::new()
.read(true)
.to_owned(),
)
.await?;
let mut vfile =
openfunc(path_b.clone(), OpenOptions::new().read(true).to_owned()).await?;
assert_eq!("FOOBAR", vfile.read_string().await?);
vfiles.push(vfile);
}
@@ -1124,9 +1082,7 @@ mod tests {
for _ in 0..VIRTUAL_FILES {
let f = VirtualFile::open_with_options(
&test_file_path,
tokio_epoll_uring::ops::open_at::OpenOptions::new()
.read(true)
.to_owned(),
OpenOptions::new().read(true).to_owned(),
)
.await?;
files.push(f);

View File

@@ -0,0 +1,112 @@
//! [`super::VirtualFile`] supports different IO engines.
//!
//! The [`IoEngineKind`] enum identifies them.
//!
//! The choice of IO engine is global.
//! Initialize using [`init`].
//!
//! Then use [`get`] and [`OpenOptions`].
#[derive(
Copy,
Clone,
PartialEq,
Eq,
Hash,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Debug,
)]
#[strum(serialize_all = "kebab-case")]
pub enum IoEngineKind {
StdFs,
TokioEpollUring,
}
static IO_ENGINE: once_cell::sync::OnceCell<IoEngineKind> = once_cell::sync::OnceCell::new();
#[cfg(not(test))]
pub(super) fn init(engine: IoEngineKind) {
if IO_ENGINE.set(engine).is_err() {
panic!("called twice");
}
}
pub(super) fn get() -> &'static IoEngineKind {
#[cfg(test)]
{
let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
IO_ENGINE.get_or_init(|| match std::env::var(env_var_name) {
Ok(v) => match v.parse::<IoEngineKind>() {
Ok(engine_kind) => engine_kind,
Err(e) => {
panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
}
},
Err(std::env::VarError::NotPresent) => {
crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
.parse()
.unwrap()
}
Err(std::env::VarError::NotUnicode(_)) => {
panic!("env var {env_var_name} is not unicode");
}
})
}
#[cfg(not(test))]
IO_ENGINE.get().unwrap()
}
mod open_options;
use std::os::unix::prelude::FileExt;
pub(crate) use open_options::OpenOptions;
use super::FileGuard;
impl IoEngineKind {
pub(super) async fn read_at<B>(
&self,
file_guard: FileGuard,
offset: u64,
mut buf: B,
) -> ((FileGuard, B), std::io::Result<usize>)
where
B: tokio_epoll_uring::BoundedBufMut + Send,
{
match self {
IoEngineKind::StdFs => {
// SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
let dst = unsafe {
std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
};
let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset));
if let Ok(nbytes) = &res {
assert!(*nbytes <= buf.bytes_total());
// SAFETY: see above assertion
unsafe {
buf.set_init(*nbytes);
}
}
#[allow(dropping_references)]
drop(dst);
((file_guard, buf), res)
}
IoEngineKind::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let (resources, res) = system.read(file_guard, offset, buf).await;
(
resources,
res.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
}),
)
}
}
}
}

View File

@@ -0,0 +1,129 @@
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
use std::{os::fd::OwnedFd, path::Path};
use super::IoEngineKind;
#[derive(Debug, Clone)]
pub enum OpenOptions {
StdFs(std::fs::OpenOptions),
TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions),
}
impl Default for OpenOptions {
fn default() -> Self {
match super::get() {
IoEngineKind::StdFs => Self::StdFs(std::fs::OpenOptions::new()),
IoEngineKind::TokioEpollUring => {
Self::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new())
}
}
}
}
impl OpenOptions {
pub fn new() -> OpenOptions {
Self::default()
}
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
let _ = x.read(read);
}
OpenOptions::TokioEpollUring(x) => {
let _ = x.read(read);
}
}
self
}
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
let _ = x.write(write);
}
OpenOptions::TokioEpollUring(x) => {
let _ = x.write(write);
}
}
self
}
pub fn create(&mut self, create: bool) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
let _ = x.create(create);
}
OpenOptions::TokioEpollUring(x) => {
let _ = x.create(create);
}
}
self
}
pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
let _ = x.create_new(create_new);
}
OpenOptions::TokioEpollUring(x) => {
let _ = x.create_new(create_new);
}
}
self
}
pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
let _ = x.truncate(truncate);
}
OpenOptions::TokioEpollUring(x) => {
let _ = x.truncate(truncate);
}
}
self
}
pub(in crate::virtual_file) async fn open(self, path: &Path) -> std::io::Result<OwnedFd> {
match self {
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),
OpenOptions::TokioEpollUring(x) => {
let system = tokio_epoll_uring::thread_local_system().await;
system.open(path, &x).await.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})
}
}
}
}
impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
fn mode(&mut self, mode: u32) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
let _ = x.mode(mode);
}
OpenOptions::TokioEpollUring(x) => {
let _ = x.mode(mode);
}
}
self
}
fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
let _ = x.custom_flags(flags);
}
OpenOptions::TokioEpollUring(x) => {
let _ = x.custom_flags(flags);
}
}
self
}
}

View File

@@ -428,6 +428,7 @@ class NeonEnvBuilder:
preserve_database_files: bool = False,
initial_tenant: Optional[TenantId] = None,
initial_timeline: Optional[TimelineId] = None,
pageserver_virtual_file_io_engine: Optional[str] = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -460,6 +461,8 @@ class NeonEnvBuilder:
self.scrub_on_exit = False
self.test_output_dir = test_output_dir
self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine
assert test_name.startswith(
"test_"
), "Unexpectedly instantiated from outside a test function"
@@ -717,6 +720,8 @@ class NeonEnv:
self.control_plane_api = None
self.attachment_service = None
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
# Create a config file corresponding to the options
toml = textwrap.dedent(
f"""
@@ -759,6 +764,12 @@ class NeonEnv:
http_auth_type = '{http_auth_type}'
"""
)
if self.pageserver_virtual_file_io_engine is not None:
toml += textwrap.dedent(
f"""
virtual_file_io_engine = '{self.pageserver_virtual_file_io_engine}'
"""
)
# Create a corresponding NeonPageserver object
self.pageservers.append(
@@ -908,6 +919,7 @@ def _shared_simple_env(
neon_binpath: Path,
pg_distrib_dir: Path,
pg_version: PgVersion,
pageserver_virtual_file_io_engine: str,
) -> Iterator[NeonEnv]:
"""
# Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES
@@ -936,6 +948,7 @@ def _shared_simple_env(
preserve_database_files=pytestconfig.getoption("--preserve-database-files"),
test_name=request.node.name,
test_output_dir=test_output_dir,
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
) as builder:
env = builder.init_start()
@@ -972,6 +985,7 @@ def neon_env_builder(
default_broker: NeonBroker,
run_id: uuid.UUID,
request: FixtureRequest,
pageserver_virtual_file_io_engine: str,
) -> Iterator[NeonEnvBuilder]:
"""
Fixture to create a Neon environment for test.
@@ -1000,6 +1014,7 @@ def neon_env_builder(
broker=default_broker,
run_id=run_id,
preserve_database_files=pytestconfig.getoption("--preserve-database-files"),
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
test_name=request.node.name,
test_output_dir=test_output_dir,
) as builder:

View File

@@ -8,7 +8,7 @@ from _pytest.python import Metafunc
from fixtures.pg_version import PgVersion
"""
Dynamically parametrize tests by Postgres version and build type (debug/release/remote)
Dynamically parametrize tests by Postgres version, build type (debug/release/remote), and possibly by other parameters
"""
@@ -31,11 +31,12 @@ def build_type(request: FixtureRequest) -> Optional[str]:
return None
def pytest_generate_tests(metafunc: Metafunc):
# Do not parametrize performance tests yet, we need to prepare grafana charts first
if "test_runner/performance" in metafunc.definition._nodeid:
return
@pytest.fixture(scope="function", autouse=True)
def pageserver_virtual_file_io_engine(request: FixtureRequest) -> Optional[str]:
return None
def pytest_generate_tests(metafunc: Metafunc):
if (v := os.environ.get("DEFAULT_PG_VERSION")) is None:
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
else:
@@ -46,5 +47,14 @@ def pytest_generate_tests(metafunc: Metafunc):
else:
build_types = [bt.lower()]
# A hacky way to parametrize performance tests only for `pageserver_virtual_file_io_engine=tokio-epoll-uring`
# And do not change test name for default `pageserver_virtual_file_io_engine=std-fs` to keep perf tests statistics
if (io_engine := os.environ.get("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in ("", "std-fs"):
metafunc.parametrize("pageserver_virtual_file_io_engine", [io_engine])
# Do not parametrize performance tests yet by Postgres version or build type, we need to prepare grafana charts first
if "test_runner/performance" in metafunc.definition._nodeid:
return
metafunc.parametrize("build_type", build_types)
metafunc.parametrize("pg_version", pg_versions, ids=map(lambda v: f"pg{v}", pg_versions))