From 48e058da3fb6cec6a6396b509d87319d392a5608 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 20 Dec 2023 18:12:26 +0100 Subject: [PATCH] tokio-epoll-uring: make io engine configurable (#6118) - introduce concept of IoEngineKind to VirtualFile - introduce a global OnceCell that contains the IoEngineKind - introduce OpenOptions that enum-dispatches to the respective IoEngineKind's `OpenOptions` type. - dispatch the currently supported VirtualFile operations (open, read) through the IoEngineKind - add a pageserver config file variable, defaulting to the current StdFs => tokio-epoll-uring is off by default - cover both ioengines in CI `cargo test` runs - run all the regression tests with both engine kinds (doubling the amount of regression tests per run in the interest of maximizing tokio-epoll-uring usage before it hits production) --------- Co-authored-by: Alexander Bayandin --- .github/workflows/build_and_test.yml | 16 ++- .github/workflows/neon_extra_builds.yml | 18 +++ Cargo.lock | 5 +- Cargo.toml | 2 + pageserver/Cargo.toml | 3 +- pageserver/ctl/src/layer_map_analyzer.rs | 4 +- pageserver/ctl/src/layers.rs | 4 +- pageserver/ctl/src/main.rs | 2 +- pageserver/src/bin/pageserver.rs | 2 +- pageserver/src/config.rs | 31 ++++- pageserver/src/metrics.rs | 1 + pageserver/src/tenant/ephemeral_file.rs | 4 +- .../src/tenant/storage_layer/delta_layer.rs | 4 +- .../src/tenant/storage_layer/image_layer.rs | 15 +- pageserver/src/virtual_file.rs | 102 ++++---------- pageserver/src/virtual_file/io_engine.rs | 112 +++++++++++++++ .../virtual_file/io_engine/open_options.rs | 129 ++++++++++++++++++ test_runner/fixtures/neon_fixtures.py | 15 ++ test_runner/fixtures/parametrize.py | 20 ++- 19 files changed, 387 insertions(+), 102 deletions(-) create mode 100644 pageserver/src/virtual_file/io_engine.rs create mode 100644 pageserver/src/virtual_file/io_engine/open_options.rs diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index
820848b4fb..4283330128 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -199,6 +199,10 @@ jobs: # git config --global --add safe.directory ${{ github.workspace }} git config --global --add safe.directory ${GITHUB_WORKSPACE} + for r in 14 15 16; do + git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r" + git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r" + done - name: Checkout uses: actions/checkout@v3 @@ -330,7 +334,9 @@ jobs: - name: Run cargo test run: | - ${cov_prefix} cargo test $CARGO_FLAGS $CARGO_FEATURES + for io_engine in std-fs tokio-epoll-uring ; do + NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo test $CARGO_FLAGS $CARGO_FEATURES + done # Run separate tests for real S3 export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty @@ -415,6 +421,7 @@ jobs: matrix: build_type: [ debug, release ] pg_version: [ v14, v15, v16 ] + pageserver_virtual_file_io_engine: [ std-fs, tokio-epoll-uring ] steps: - name: Checkout uses: actions/checkout@v3 @@ -437,6 +444,7 @@ jobs: TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }} CHECK_ONDISK_DATA_COMPATIBILITY: nonempty BUILD_TAG: ${{ needs.tag.outputs.build-tag }} + PAGESERVER_VIRTUAL_FILE_IO_ENGINE: ${{ matrix.pageserver_virtual_file_io_engine }} - name: Merge and upload coverage data if: matrix.build_type == 'debug' && matrix.pg_version == 'v14' @@ -455,6 +463,7 @@ jobs: matrix: pytest_split_group: [ 1, 2, 3, 4 ] build_type: [ release ] + pageserver_virtual_file_io_engine: [ std-fs, tokio-epoll-uring ] steps: - name: Checkout uses: actions/checkout@v3 @@ -471,6 +480,7 @@ jobs: VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}" PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}" TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}" + PAGESERVER_VIRTUAL_FILE_IO_ENGINE: "${{ matrix.pageserver_virtual_file_io_engine }}" # XXX: no 
coverage data handling here, since benchmarks are run on release builds, # while coverage is currently collected for the debug ones @@ -1097,6 +1107,10 @@ jobs: # git config --global --add safe.directory ${{ github.workspace }} git config --global --add safe.directory ${GITHUB_WORKSPACE} + for r in 14 15 16; do + git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r" + git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r" + done - name: Checkout uses: actions/checkout@v3 diff --git a/.github/workflows/neon_extra_builds.yml b/.github/workflows/neon_extra_builds.yml index 0d7db8dfbc..b1ea5e4f74 100644 --- a/.github/workflows/neon_extra_builds.yml +++ b/.github/workflows/neon_extra_builds.yml @@ -142,6 +142,10 @@ jobs: # git config --global --add safe.directory ${{ github.workspace }} git config --global --add safe.directory ${GITHUB_WORKSPACE} + for r in 14 15 16; do + git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r" + git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r" + done - name: Checkout uses: actions/checkout@v4 @@ -238,6 +242,20 @@ jobs: options: --init steps: + - name: Fix git ownership + run: | + # Workaround for `fatal: detected dubious ownership in repository at ...` + # + # Use both ${{ github.workspace }} and ${GITHUB_WORKSPACE} because they're different on host and in containers + # Ref https://github.com/actions/checkout/issues/785 + # + git config --global --add safe.directory ${{ github.workspace }} + git config --global --add safe.directory ${GITHUB_WORKSPACE} + for r in 14 15 16; do + git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r" + git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r" + done + - name: Checkout uses: actions/checkout@v4 with: diff --git a/Cargo.lock b/Cargo.lock index 7efc4a0b0a..e82c8641c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ 
-5147,7 +5147,6 @@ dependencies = [ "libc", "mio", "num_cpus", - "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", "socket2 0.5.5", @@ -5158,7 +5157,7 @@ dependencies = [ [[package]] name = "tokio-epoll-uring" version = "0.1.0" -source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#82d74064c5019b0e9a8ae1bcdc75b0345d41bba9" +source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#9c5ea716add1357dc8c180167e43a2ae596eba68" dependencies = [ "futures", "once_cell", @@ -5714,7 +5713,7 @@ dependencies = [ [[package]] name = "uring-common" version = "0.1.0" -source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#82d74064c5019b0e9a8ae1bcdc75b0345d41bba9" +source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#9c5ea716add1357dc8c180167e43a2ae596eba68" dependencies = [ "io-uring", "libc", diff --git a/Cargo.toml b/Cargo.toml index b5eece5e35..4673c00213 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -144,6 +144,8 @@ test-context = "0.1" thiserror = "1.0" tls-listener = { version = "0.7", features = ["rustls", "hyper-h1"] } tokio = { version = "1.17", features = ["macros"] } +#tokio-epoll-uring = { path = "../tokio-epoll-uring/tokio-epoll-uring" } +tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" } tokio-io-timeout = "1.2.0" tokio-postgres-rustls = "0.10.0" tokio-rustls = "0.24" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index a83bf833a1..26460d3926 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -60,6 +60,7 @@ sync_wrapper.workspace = true tokio-tar.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] } +tokio-epoll-uring.workspace = true tokio-io-timeout.workspace = true tokio-postgres.workspace = true tokio-util.workspace = true @@ -83,8 +84,6 @@ enum-map.workspace = true enumset.workspace = true 
strum.workspace = true strum_macros.workspace = true -#tokio-epoll-uring = { path = "../../tokio-epoll-uring/tokio-epoll-uring" } -tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" } [dev-dependencies] criterion.workspace = true diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index 15d4eb09e0..eb5c3f15cf 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -18,7 +18,7 @@ use pageserver::tenant::block_io::FileBlockReader; use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection}; use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE}; use pageserver::tenant::storage_layer::range_overlaps; -use pageserver::virtual_file::VirtualFile; +use pageserver::virtual_file::{self, VirtualFile}; use utils::{bin_ser::BeSer, lsn::Lsn}; @@ -142,7 +142,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> { let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); // Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree. 
- pageserver::virtual_file::init(10); + pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs); pageserver::page_cache::init(100); let mut total_delta_layers = 0usize; diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index ebf4a4bec3..dbbcfedac0 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -59,7 +59,7 @@ pub(crate) enum LayerCmd { async fn read_delta_file(path: impl AsRef, ctx: &RequestContext) -> Result<()> { let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path"); - virtual_file::init(10); + virtual_file::init(10, virtual_file::IoEngineKind::StdFs); page_cache::init(100); let file = FileBlockReader::new(VirtualFile::open(path).await?); let summary_blk = file.read_blk(0, ctx).await?; @@ -187,7 +187,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { new_tenant_id, new_timeline_id, } => { - pageserver::virtual_file::init(10); + pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs); pageserver::page_cache::init(100); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs index fb42d6d2f1..3c90933fe9 100644 --- a/pageserver/ctl/src/main.rs +++ b/pageserver/ctl/src/main.rs @@ -123,7 +123,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> { async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> { // Basic initialization of things that don't change after startup - virtual_file::init(10); + virtual_file::init(10, virtual_file::IoEngineKind::StdFs); page_cache::init(100); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); dump_layerfile_from_path(path, true, &ctx).await diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 7607119dda..8da1d858e4 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -129,7 +129,7 @@ fn main() -> 
anyhow::Result<()> { let scenario = pageserver::failpoint_support::init(); // Basic initialization of things that don't change after startup - virtual_file::init(conf.max_file_descriptors); + virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine); page_cache::init(conf.page_cache_size); start_pageserver(launch_ts, conf).context("Failed to start pageserver")?; diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 13d1fc775b..8675dfcfee 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -36,11 +36,14 @@ use crate::tenant::config::TenantConfOpt; use crate::tenant::{ TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME, }; +use crate::virtual_file; use crate::{ IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX, }; +use self::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE; + pub mod defaults { use crate::tenant::config::defaults::*; use const_format::formatcp; @@ -70,6 +73,8 @@ pub mod defaults { pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min"; pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s"; + pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs"; + /// /// Default built-in configuration file. /// @@ -101,6 +106,8 @@ pub mod defaults { #background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}' +#virtual_file_io_engine = '{DEFAULT_VIRTUAL_FILE_IO_ENGINE}' + [tenant_config] #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT} @@ -215,6 +222,8 @@ pub struct PageServerConf { /// If true, pageserver will make best-effort to operate without a control plane: only /// for use in major incidents. 
pub control_plane_emergency_mode: bool, + + pub virtual_file_io_engine: virtual_file::IoEngineKind, } /// We do not want to store this in a PageServerConf because the latter may be logged @@ -293,6 +302,8 @@ struct PageServerConfigBuilder { control_plane_api: BuilderValue>, control_plane_api_token: BuilderValue>, control_plane_emergency_mode: BuilderValue, + + virtual_file_io_engine: BuilderValue, } impl Default for PageServerConfigBuilder { @@ -361,6 +372,8 @@ impl Default for PageServerConfigBuilder { control_plane_api: Set(None), control_plane_api_token: Set(None), control_plane_emergency_mode: Set(false), + + virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()), } } } @@ -501,6 +514,10 @@ impl PageServerConfigBuilder { self.control_plane_emergency_mode = BuilderValue::Set(enabled) } + pub fn virtual_file_io_engine(&mut self, value: virtual_file::IoEngineKind) { + self.virtual_file_io_engine = BuilderValue::Set(value); + } + pub fn build(self) -> anyhow::Result { let concurrent_tenant_size_logical_size_queries = self .concurrent_tenant_size_logical_size_queries @@ -595,6 +612,9 @@ impl PageServerConfigBuilder { control_plane_emergency_mode: self .control_plane_emergency_mode .ok_or(anyhow!("missing control_plane_emergency_mode"))?, + virtual_file_io_engine: self + .virtual_file_io_engine + .ok_or(anyhow!("missing virtual_file_io_engine"))?, }) } } @@ -828,8 +848,10 @@ impl PageServerConf { }, "control_plane_emergency_mode" => { builder.control_plane_emergency_mode(parse_toml_bool(key, item)?) - }, + "virtual_file_io_engine" => { + builder.virtual_file_io_engine(parse_toml_from_str("virtual_file_io_engine", item)?) 
+ } _ => bail!("unrecognized pageserver option '{key}'"), } } @@ -896,6 +918,7 @@ impl PageServerConf { control_plane_api: None, control_plane_api_token: None, control_plane_emergency_mode: false, + virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(), } } } @@ -1120,7 +1143,8 @@ background_task_maximum_delay = '334 s' )?, control_plane_api: None, control_plane_api_token: None, - control_plane_emergency_mode: false + control_plane_emergency_mode: false, + virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(), }, "Correct defaults should be used when no config values are provided" ); @@ -1177,7 +1201,8 @@ background_task_maximum_delay = '334 s' background_task_maximum_delay: Duration::from_secs(334), control_plane_api: None, control_plane_api_token: None, - control_plane_emergency_mode: false + control_plane_emergency_mode: false, + virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(), }, "Should be able to parse all basic config values correctly" ); diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 67d798c1d4..f2a5d01902 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -892,6 +892,7 @@ pub(crate) static STORAGE_IO_SIZE: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); +#[cfg(not(test))] pub(crate) mod virtual_file_descriptor_cache { use super::*; diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 5c465e0655..731493696e 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -5,7 +5,7 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::page_cache::{self, PAGE_SZ}; use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader}; -use crate::virtual_file::VirtualFile; +use crate::virtual_file::{self, VirtualFile}; use camino::Utf8PathBuf; use pageserver_api::shard::TenantShardId; use std::cmp::min; @@ -46,7 +46,7 @@ impl 
EphemeralFile { let file = VirtualFile::open_with_options( &filename, - tokio_epoll_uring::ops::open_at::OpenOptions::new() + virtual_file::OpenOptions::new() .read(true) .write(true) .create(true) diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 869759431a..d0822d220f 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -36,7 +36,7 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; use crate::tenant::Timeline; -use crate::virtual_file::VirtualFile; +use crate::virtual_file::{self, VirtualFile}; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; use anyhow::{bail, ensure, Context, Result}; @@ -649,7 +649,7 @@ impl DeltaLayer { { let file = VirtualFile::open_with_options( path, - tokio_epoll_uring::ops::open_at::OpenOptions::new() + virtual_file::OpenOptions::new() .read(true) .write(true) .to_owned(), diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 42127b17b0..75174f4745 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -34,7 +34,7 @@ use crate::tenant::storage_layer::{ LayerAccessStats, ValueReconstructResult, ValueReconstructState, }; use crate::tenant::Timeline; -use crate::virtual_file::VirtualFile; +use crate::virtual_file::{self, VirtualFile}; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; use anyhow::{bail, ensure, Context, Result}; use bytes::Bytes; @@ -327,7 +327,7 @@ impl ImageLayer { { let file = VirtualFile::open_with_options( path, - tokio_epoll_uring::ops::open_at::OpenOptions::new() + 
virtual_file::OpenOptions::new() .read(true) .write(true) .to_owned(), @@ -496,9 +496,14 @@ impl ImageLayerWriterInner { ); info!("new image layer {path}"); let mut file = { - let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); - options.write(true).create_new(true); - VirtualFile::open_with_options(&path, options).await? + VirtualFile::open_with_options( + &path, + virtual_file::OpenOptions::new() + .write(true) + .create_new(true) + .to_owned(), + ) + .await? }; // make room for the header block file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index b06dbba8f2..0b28b98513 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -24,6 +24,10 @@ use tokio::time::Instant; use tokio_epoll_uring::IoBufMut; use utils::fs_ext; +mod io_engine; +pub use io_engine::IoEngineKind; +pub(crate) use io_engine::*; + /// /// A virtual file descriptor. You can use this just like std::fs::File, but internally /// the underlying file is closed if the system is low on file descriptors, @@ -56,7 +60,7 @@ pub struct VirtualFile { /// opened, in the VirtualFile::create() function, and strip the flag before /// storing it here. pub path: Utf8PathBuf, - open_options: tokio_epoll_uring::ops::open_at::OpenOptions, + open_options: OpenOptions, // These are strings becase we only use them for metrics, and those expect strings. // It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into @@ -314,13 +318,7 @@ macro_rules! with_file { impl VirtualFile { /// Open a file in read-only mode. Like File::open. pub async fn open(path: &Utf8Path) -> Result { - Self::open_with_options( - path, - tokio_epoll_uring::ops::open_at::OpenOptions::new() - .read(true) - .to_owned(), - ) - .await + Self::open_with_options(path, OpenOptions::new().read(true).to_owned()).await } /// Create a new file for writing. If the file exists, it will be truncated. 
@@ -328,7 +326,7 @@ impl VirtualFile { pub async fn create(path: &Utf8Path) -> Result { Self::open_with_options( path, - tokio_epoll_uring::ops::open_at::OpenOptions::new() + OpenOptions::new() .write(true) .create(true) .truncate(true) @@ -344,7 +342,7 @@ impl VirtualFile { /// on the first time. Make sure that's sane! pub async fn open_with_options( path: &Utf8Path, - open_options: tokio_epoll_uring::ops::open_at::OpenOptions, + open_options: OpenOptions, ) -> Result { let path_str = path.to_string(); let parts = path_str.split('/').collect::>(); @@ -360,17 +358,7 @@ impl VirtualFile { let (handle, mut slot_guard) = get_open_files().find_victim_slot().await; slot_guard.file = Some(observe_duration!(StorageIoOperation::Open, { - let system = tokio_epoll_uring::thread_local_system().await; - let file: OwnedFd = system - .open(path, &open_options) - .await - .map_err(|e| match e { - tokio_epoll_uring::Error::Op(e) => e, - tokio_epoll_uring::Error::System(system) => { - std::io::Error::new(std::io::ErrorKind::Other, system) - } - })?; - file + open_options.clone().open(path.as_std_path()).await? })); // Strip all options other than read and write. @@ -414,7 +402,7 @@ impl VirtualFile { std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?; let mut file = Self::open_with_options( tmp_path, - tokio_epoll_uring::ops::open_at::OpenOptions::new() + OpenOptions::new() .write(true) // Use `create_new` so that, if we race with ourselves or something else, // we bail out instead of causing damage. @@ -432,13 +420,9 @@ impl VirtualFile { // the current `find_victim_slot` impl might pick the same slot for both // VirtualFile., and it eventually does a blocking write lock instead of // try_lock. 
- let final_parent_dirfd = Self::open_with_options( - final_path_parent, - tokio_epoll_uring::ops::open_at::OpenOptions::new() - .read(true) - .to_owned(), - ) - .await?; + let final_parent_dirfd = + Self::open_with_options(final_path_parent, OpenOptions::new().read(true).to_owned()) + .await?; final_parent_dirfd.sync_all().await?; Ok(()) } @@ -507,17 +491,10 @@ impl VirtualFile { // case from StorageIoOperation::Open. This helps with identifying thrashing // of the virtual file descriptor cache. let file = observe_duration!(StorageIoOperation::OpenAfterReplace, { - let system = tokio_epoll_uring::thread_local_system().await; - let file: OwnedFd = system - .open(&self.path, &self.open_options) - .await - .map_err(|e| match e { - tokio_epoll_uring::Error::Op(e) => e, - tokio_epoll_uring::Error::System(system) => { - std::io::Error::new(std::io::ErrorKind::Other, system) - } - })?; - file + self.open_options + .clone() + .open(self.path.as_std_path()) + .await? }); // Store the File in the slot and update the handle in the VirtualFile @@ -677,8 +654,7 @@ impl VirtualFile { where B: tokio_epoll_uring::BoundedBufMut + Send, { - let system = tokio_epoll_uring::thread_local_system().await; - let ((_file_guard, buf), res) = system.read(file_guard, offset, buf).await; + let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await; if let Ok(size) = res { // TODO: don't use with_label_values on hot path // https://github.com/neondatabase/neon/issues/6107 @@ -847,10 +823,12 @@ impl OpenFiles { /// Initialize the virtual file module. This must be called once at page /// server startup. 
/// -pub fn init(num_slots: usize) { +#[cfg(not(test))] +pub fn init(num_slots: usize, engine: IoEngineKind) { if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() { panic!("virtual_file::init called twice"); } + io_engine::init(engine); crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64); } @@ -967,16 +945,7 @@ mod tests { async fn test_physical_files() -> anyhow::Result<()> { test_files("physical_files", |path, open_options| async move { Ok(MaybeVirtualFile::File({ - let system = tokio_epoll_uring::thread_local_system().await; - let owned_fd = system - .open(path, &open_options) - .await - .map_err(|e| match e { - tokio_epoll_uring::Error::Op(e) => e, - tokio_epoll_uring::Error::System(system) => { - std::io::Error::new(std::io::ErrorKind::Other, system) - } - })?; + let owned_fd = open_options.open(path.as_std_path()).await?; File::from(owned_fd) })) }) @@ -985,7 +954,7 @@ mod tests { async fn test_files(testname: &str, openfunc: OF) -> anyhow::Result<()> where - OF: Fn(Utf8PathBuf, tokio_epoll_uring::ops::open_at::OpenOptions) -> FT, + OF: Fn(Utf8PathBuf, OpenOptions) -> FT, FT: Future>, { let testdir = crate::config::PageServerConf::test_repo_dir(testname); @@ -994,7 +963,7 @@ mod tests { let path_a = testdir.join("file_a"); let mut file_a = openfunc( path_a.clone(), - tokio_epoll_uring::ops::open_at::OpenOptions::new() + OpenOptions::new() .write(true) .create(true) .truncate(true) @@ -1007,13 +976,7 @@ mod tests { let _ = file_a.read_string().await.unwrap_err(); // Close the file and re-open for reading - let mut file_a = openfunc( - path_a, - tokio_epoll_uring::ops::open_at::OpenOptions::new() - .read(true) - .to_owned(), - ) - .await?; + let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?; // cannot write to a file opened in read-only mode let _ = file_a.write_all(b"bar").await.unwrap_err(); @@ -1050,7 +1013,7 @@ mod tests { let path_b = testdir.join("file_b"); let mut file_b = openfunc( 
path_b.clone(), - tokio_epoll_uring::ops::open_at::OpenOptions::new() + OpenOptions::new() .read(true) .write(true) .create(true) @@ -1071,13 +1034,8 @@ mod tests { let mut vfiles = Vec::new(); for _ in 0..100 { - let mut vfile = openfunc( - path_b.clone(), - tokio_epoll_uring::ops::open_at::OpenOptions::new() - .read(true) - .to_owned(), - ) - .await?; + let mut vfile = + openfunc(path_b.clone(), OpenOptions::new().read(true).to_owned()).await?; assert_eq!("FOOBAR", vfile.read_string().await?); vfiles.push(vfile); } @@ -1124,9 +1082,7 @@ mod tests { for _ in 0..VIRTUAL_FILES { let f = VirtualFile::open_with_options( &test_file_path, - tokio_epoll_uring::ops::open_at::OpenOptions::new() - .read(true) - .to_owned(), + OpenOptions::new().read(true).to_owned(), ) .await?; files.push(f); diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs new file mode 100644 index 0000000000..8075cc3130 --- /dev/null +++ b/pageserver/src/virtual_file/io_engine.rs @@ -0,0 +1,112 @@ +//! [`super::VirtualFile`] supports different IO engines. +//! +//! The [`IoEngineKind`] enum identifies them. +//! +//! The choice of IO engine is global. +//! Initialize using [`init`]. +//! +//! Then use [`get`] and [`OpenOptions`]. 
+ +#[derive( + Copy, + Clone, + PartialEq, + Eq, + Hash, + strum_macros::EnumString, + strum_macros::Display, + serde_with::DeserializeFromStr, + serde_with::SerializeDisplay, + Debug, +)] +#[strum(serialize_all = "kebab-case")] +pub enum IoEngineKind { + StdFs, + TokioEpollUring, +} + +static IO_ENGINE: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); + +#[cfg(not(test))] +pub(super) fn init(engine: IoEngineKind) { + if IO_ENGINE.set(engine).is_err() { + panic!("called twice"); + } +} + +pub(super) fn get() -> &'static IoEngineKind { + #[cfg(test)] + { + let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE"; + IO_ENGINE.get_or_init(|| match std::env::var(env_var_name) { + Ok(v) => match v.parse::() { + Ok(engine_kind) => engine_kind, + Err(e) => { + panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}") + } + }, + Err(std::env::VarError::NotPresent) => { + crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE + .parse() + .unwrap() + } + Err(std::env::VarError::NotUnicode(_)) => { + panic!("env var {env_var_name} is not unicode"); + } + }) + } + #[cfg(not(test))] + IO_ENGINE.get().unwrap() +} + +mod open_options; +use std::os::unix::prelude::FileExt; + +pub(crate) use open_options::OpenOptions; + +use super::FileGuard; + +impl IoEngineKind { + pub(super) async fn read_at( + &self, + file_guard: FileGuard, + offset: u64, + mut buf: B, + ) -> ((FileGuard, B), std::io::Result) + where + B: tokio_epoll_uring::BoundedBufMut + Send, + { + match self { + IoEngineKind::StdFs => { + // SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory. 
+ let dst = unsafe { + std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total()) + }; + let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset)); + if let Ok(nbytes) = &res { + assert!(*nbytes <= buf.bytes_total()); + // SAFETY: see above assertion + unsafe { + buf.set_init(*nbytes); + } + } + #[allow(dropping_references)] + drop(dst); + ((file_guard, buf), res) + } + IoEngineKind::TokioEpollUring => { + let system = tokio_epoll_uring::thread_local_system().await; + let (resources, res) = system.read(file_guard, offset, buf).await; + ( + resources, + res.map_err(|e| match e { + tokio_epoll_uring::Error::Op(e) => e, + tokio_epoll_uring::Error::System(system) => { + std::io::Error::new(std::io::ErrorKind::Other, system) + } + }), + ) + } + } + } +} diff --git a/pageserver/src/virtual_file/io_engine/open_options.rs b/pageserver/src/virtual_file/io_engine/open_options.rs new file mode 100644 index 0000000000..c73dc38990 --- /dev/null +++ b/pageserver/src/virtual_file/io_engine/open_options.rs @@ -0,0 +1,129 @@ +//! 
Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`]; + +use std::{os::fd::OwnedFd, path::Path}; + +use super::IoEngineKind; + +#[derive(Debug, Clone)] +pub enum OpenOptions { + StdFs(std::fs::OpenOptions), + TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions), +} + +impl Default for OpenOptions { + fn default() -> Self { + match super::get() { + IoEngineKind::StdFs => Self::StdFs(std::fs::OpenOptions::new()), + IoEngineKind::TokioEpollUring => { + Self::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new()) + } + } + } +} + +impl OpenOptions { + pub fn new() -> OpenOptions { + Self::default() + } + + pub fn read(&mut self, read: bool) -> &mut OpenOptions { + match self { + OpenOptions::StdFs(x) => { + let _ = x.read(read); + } + OpenOptions::TokioEpollUring(x) => { + let _ = x.read(read); + } + } + self + } + + pub fn write(&mut self, write: bool) -> &mut OpenOptions { + match self { + OpenOptions::StdFs(x) => { + let _ = x.write(write); + } + OpenOptions::TokioEpollUring(x) => { + let _ = x.write(write); + } + } + self + } + + pub fn create(&mut self, create: bool) -> &mut OpenOptions { + match self { + OpenOptions::StdFs(x) => { + let _ = x.create(create); + } + OpenOptions::TokioEpollUring(x) => { + let _ = x.create(create); + } + } + self + } + + pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions { + match self { + OpenOptions::StdFs(x) => { + let _ = x.create_new(create_new); + } + OpenOptions::TokioEpollUring(x) => { + let _ = x.create_new(create_new); + } + } + self + } + + pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions { + match self { + OpenOptions::StdFs(x) => { + let _ = x.truncate(truncate); + } + OpenOptions::TokioEpollUring(x) => { + let _ = x.truncate(truncate); + } + } + self + } + + pub(in crate::virtual_file) async fn open(self, path: &Path) -> std::io::Result { + match self { + OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()), + 
OpenOptions::TokioEpollUring(x) => { + let system = tokio_epoll_uring::thread_local_system().await; + system.open(path, &x).await.map_err(|e| match e { + tokio_epoll_uring::Error::Op(e) => e, + tokio_epoll_uring::Error::System(system) => { + std::io::Error::new(std::io::ErrorKind::Other, system) + } + }) + } + } + } +} + +impl std::os::unix::prelude::OpenOptionsExt for OpenOptions { + fn mode(&mut self, mode: u32) -> &mut OpenOptions { + match self { + OpenOptions::StdFs(x) => { + let _ = x.mode(mode); + } + OpenOptions::TokioEpollUring(x) => { + let _ = x.mode(mode); + } + } + self + } + + fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions { + match self { + OpenOptions::StdFs(x) => { + let _ = x.custom_flags(flags); + } + OpenOptions::TokioEpollUring(x) => { + let _ = x.custom_flags(flags); + } + } + self + } +} diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 9e0beeb4d1..75dc0d9f69 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -428,6 +428,7 @@ class NeonEnvBuilder: preserve_database_files: bool = False, initial_tenant: Optional[TenantId] = None, initial_timeline: Optional[TimelineId] = None, + pageserver_virtual_file_io_engine: Optional[str] = None, ): self.repo_dir = repo_dir self.rust_log_override = rust_log_override @@ -460,6 +461,8 @@ class NeonEnvBuilder: self.scrub_on_exit = False self.test_output_dir = test_output_dir + self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine + assert test_name.startswith( "test_" ), "Unexpectedly instantiated from outside a test function" @@ -717,6 +720,8 @@ class NeonEnv: self.control_plane_api = None self.attachment_service = None + self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine + # Create a config file corresponding to the options toml = textwrap.dedent( f""" @@ -759,6 +764,12 @@ class NeonEnv: http_auth_type = '{http_auth_type}' """ ) + if 
self.pageserver_virtual_file_io_engine is not None: + toml += textwrap.dedent( + f""" + virtual_file_io_engine = '{self.pageserver_virtual_file_io_engine}' + """ + ) # Create a corresponding NeonPageserver object self.pageservers.append( @@ -908,6 +919,7 @@ def _shared_simple_env( neon_binpath: Path, pg_distrib_dir: Path, pg_version: PgVersion, + pageserver_virtual_file_io_engine: str, ) -> Iterator[NeonEnv]: """ # Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES @@ -936,6 +948,7 @@ def _shared_simple_env( preserve_database_files=pytestconfig.getoption("--preserve-database-files"), test_name=request.node.name, test_output_dir=test_output_dir, + pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine, ) as builder: env = builder.init_start() @@ -972,6 +985,7 @@ def neon_env_builder( default_broker: NeonBroker, run_id: uuid.UUID, request: FixtureRequest, + pageserver_virtual_file_io_engine: str, ) -> Iterator[NeonEnvBuilder]: """ Fixture to create a Neon environment for test. 
@@ -1000,6 +1014,7 @@ def neon_env_builder( broker=default_broker, run_id=run_id, preserve_database_files=pytestconfig.getoption("--preserve-database-files"), + pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine, test_name=request.node.name, test_output_dir=test_output_dir, ) as builder: diff --git a/test_runner/fixtures/parametrize.py b/test_runner/fixtures/parametrize.py index 53350138dd..398d702c30 100644 --- a/test_runner/fixtures/parametrize.py +++ b/test_runner/fixtures/parametrize.py @@ -8,7 +8,7 @@ from _pytest.python import Metafunc from fixtures.pg_version import PgVersion """ -Dynamically parametrize tests by Postgres version and build type (debug/release/remote) +Dynamically parametrize tests by Postgres version, build type (debug/release/remote), and possibly by other parameters """ @@ -31,11 +31,12 @@ def build_type(request: FixtureRequest) -> Optional[str]: return None -def pytest_generate_tests(metafunc: Metafunc): - # Do not parametrize performance tests yet, we need to prepare grafana charts first - if "test_runner/performance" in metafunc.definition._nodeid: - return +@pytest.fixture(scope="function", autouse=True) +def pageserver_virtual_file_io_engine(request: FixtureRequest) -> Optional[str]: + return None + +def pytest_generate_tests(metafunc: Metafunc): if (v := os.environ.get("DEFAULT_PG_VERSION")) is None: pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET] else: @@ -46,5 +47,14 @@ def pytest_generate_tests(metafunc: Metafunc): else: build_types = [bt.lower()] + # A hacky way to parametrize performance tests only for `pageserver_virtual_file_io_engine=tokio-epoll-uring` + # And do not change test name for default `pageserver_virtual_file_io_engine=std-fs` to keep perf tests statistics + if (io_engine := os.environ.get("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in ("", "std-fs"): + metafunc.parametrize("pageserver_virtual_file_io_engine", [io_engine]) + + # Do not parametrize performance 
tests yet by Postgres version or build type, we need to prepare grafana charts first + if "test_runner/performance" in metafunc.definition._nodeid: + return + metafunc.parametrize("build_type", build_types) metafunc.parametrize("pg_version", pg_versions, ids=map(lambda v: f"pg{v}", pg_versions))