diff --git a/docker-compose/pageserver_config/pageserver.toml b/docker-compose/pageserver_config/pageserver.toml index 7d603b6c65..81445ed412 100644 --- a/docker-compose/pageserver_config/pageserver.toml +++ b/docker-compose/pageserver_config/pageserver.toml @@ -5,3 +5,4 @@ listen_http_addr='0.0.0.0:9898' remote_storage={ endpoint='http://minio:9000', bucket_name='neon', bucket_region='eu-north-1', prefix_in_bucket='/pageserver' } control_plane_api='http://0.0.0.0:6666' # No storage controller in docker compose, specify a junk address control_plane_emergency_mode=true +virtual_file_io_mode="buffered" # the CI runners where we run the docker compose tests have slow disks diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index ff911499ab..5fcdefba66 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1832,6 +1832,7 @@ pub mod virtual_file { Eq, Hash, strum_macros::EnumString, + strum_macros::EnumIter, strum_macros::Display, serde_with::DeserializeFromStr, serde_with::SerializeDisplay, @@ -1843,10 +1844,8 @@ pub mod virtual_file { /// Uses buffered IO. Buffered, /// Uses direct IO for reads only. - #[cfg(target_os = "linux")] Direct, /// Use direct IO for reads and writes. - #[cfg(target_os = "linux")] DirectRw, } @@ -1854,26 +1853,13 @@ pub mod virtual_file { pub fn preferred() -> Self { // The default behavior when running Rust unit tests without any further // flags is to use the newest behavior (DirectRw). - // The CI uses the following environment variable to unit tests for all - // different modes. + // The CI uses the environment variable to unit tests for all different modes. // NB: the Python regression & perf tests have their own defaults management // that writes pageserver.toml; they do not use this variable. - if cfg!(test) { - static CACHED: LazyLock = LazyLock::new(|| { - utils::env::var_serde_json_string( - "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE", - ) - .unwrap_or( - #[cfg(target_os = "linux")] - IoMode::DirectRw, - #[cfg(not(target_os = "linux"))] - IoMode::Buffered, - ) - }); - *CACHED - } else { - IoMode::Buffered - } + static ENV_OVERRIDE: LazyLock> = LazyLock::new(|| { + utils::env::var_serde_json_string("NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE") + }); + ENV_OVERRIDE.unwrap_or(IoMode::DirectRw) } } @@ -1883,9 +1869,7 @@ pub mod virtual_file { fn try_from(value: u8) -> Result { Ok(match value { v if v == (IoMode::Buffered as u8) => IoMode::Buffered, - #[cfg(target_os = "linux")] v if v == (IoMode::Direct as u8) => IoMode::Direct, - #[cfg(target_os = "linux")] v if v == (IoMode::DirectRw as u8) => IoMode::DirectRw, x => return Err(x), }) diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index 2836450a0e..eaadfe14ae 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -14,6 +14,7 @@ use pageserver_api::key::Key; use pageserver_api::models::virtual_file::IoMode; use pageserver_api::shard::TenantShardId; use pageserver_api::value::Value; +use strum::IntoEnumIterator; use tokio_util::sync::CancellationToken; use utils::bin_ser::BeSer; use utils::id::{TenantId, TimelineId}; @@ -244,13 +245,7 @@ fn criterion_benchmark(c: &mut Criterion) { ]; let exploded_parameters = { let mut out = Vec::new(); - for io_mode in [ - IoMode::Buffered, - #[cfg(target_os = "linux")] - IoMode::Direct, - #[cfg(target_os = "linux")] - IoMode::DirectRw, - ] { + for io_mode in IoMode::iter() { for param in expect.clone() { let HandPickedParameters { volume_mib, diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index f429e59ef3..c707d35114 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -74,6 +74,8 @@ pub struct VirtualFile { impl VirtualFile { /// Open a file in read-only mode. Like File::open. + /// + /// Insensitive to `virtual_file_io_mode` setting. pub async fn open>( path: P, ctx: &RequestContext, @@ -95,31 +97,20 @@ impl VirtualFile { Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await } + /// `O_DIRECT` will be enabled base on `virtual_file_io_mode`. pub async fn open_with_options_v2>( path: P, - #[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut open_options: OpenOptions, + mut open_options: OpenOptions, ctx: &RequestContext, ) -> Result { let mode = get_io_mode(); - let set_o_direct = match (mode, open_options.is_write()) { + let direct = match (mode, open_options.is_write()) { (IoMode::Buffered, _) => false, - #[cfg(target_os = "linux")] (IoMode::Direct, false) => true, - #[cfg(target_os = "linux")] (IoMode::Direct, true) => false, - #[cfg(target_os = "linux")] (IoMode::DirectRw, _) => true, }; - if set_o_direct { - #[cfg(target_os = "linux")] - { - open_options = open_options.custom_flags(nix::libc::O_DIRECT); - } - #[cfg(not(target_os = "linux"))] - unreachable!( - "O_DIRECT is not supported on this platform, IoMode's that result in set_o_direct=true shouldn't even be defined" - ); - } + open_options = open_options.direct(direct); let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?; Ok(VirtualFile { inner, _mode: mode }) } @@ -791,6 +782,12 @@ impl VirtualFileInner { where Buf: tokio_epoll_uring::IoBufMut + Send, { + self.validate_direct_io( + Slice::stable_ptr(&buf).addr(), + Slice::bytes_total(&buf), + offset, + ); + let file_guard = match self .lock_file() .await @@ -816,6 +813,8 @@ impl VirtualFileInner { offset: u64, ctx: &RequestContext, ) -> (FullSlice, Result) { + self.validate_direct_io(buf.as_ptr().addr(), buf.len(), offset); + let file_guard = match self.lock_file().await { Ok(file_guard) => file_guard, Err(e) => return (buf, Err(e)), @@ -830,6 +829,64 @@ impl VirtualFileInner { (buf, result) }) } + + /// Validate all reads and writes to adhere to the O_DIRECT requirements of our production systems. + /// + /// Validating it iin userspace sets a consistent bar, independent of what actual OS/filesystem/block device is in use. + fn validate_direct_io(&self, addr: usize, size: usize, offset: u64) { + // TODO: eventually enable validation in the builds we use in real environments like staging, preprod, and prod. + if !(cfg!(feature = "testing") || cfg!(test)) { + return; + } + if !self.open_options.is_direct() { + return; + } + + // Validate buffer memory alignment. + // + // What practically matters as of Linux 6.1 is bdev_dma_alignment() + // which is practically between 512 and 4096. + // On our production systems, the value is 512. + // The IoBuffer/IoBufferMut hard-code that value. + // + // Because the alloctor might return _more_ aligned addresses than requested, + // there is a chance that testing would not catch violations of a runtime requirement stricter than 512. + { + let requirement = 512; + let remainder = addr % requirement; + assert!( + remainder == 0, + "Direct I/O buffer must be aligned: buffer_addr=0x{addr:x} % 0x{requirement:x} = 0x{remainder:x}" + ); + } + + // Validate offset alignment. + // + // We hard-code 512 throughout the code base. + // So enforce just that and not anything more restrictive. + // Even the shallowest testing will expose more restrictive requirements if those ever arise. + { + let requirement = 512; + let remainder = offset % requirement; + assert!( + remainder == 0, + "Direct I/O offset must be aligned: offset=0x{offset:x} % 0x{requirement:x} = 0x{remainder:x}" + ); + } + + // Validate buffer size multiple requirement. + // + // The requirement in Linux 6.1 is bdev_logical_block_size(). + // On our production systems, that is 512. + { + let requirement = 512; + let remainder = size % requirement; + assert!( + remainder == 0, + "Direct I/O buffer size must be a multiple of {requirement}: size=0x{size:x} % 0x{requirement:x} = 0x{remainder:x}" + ); + } + } } // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 @@ -1218,7 +1275,6 @@ mod tests { use std::sync::Arc; use owned_buffers_io::io_buf_ext::IoBufExt; - use owned_buffers_io::slice::SliceMutExt; use rand::seq::SliceRandom; use rand::{Rng, thread_rng}; @@ -1226,162 +1282,38 @@ mod tests { use crate::context::DownloadBehavior; use crate::task_mgr::TaskKind; - enum MaybeVirtualFile { - VirtualFile(VirtualFile), - File(File), - } - - impl From for MaybeVirtualFile { - fn from(vf: VirtualFile) -> Self { - MaybeVirtualFile::VirtualFile(vf) - } - } - - impl MaybeVirtualFile { - async fn read_exact_at( - &self, - mut slice: tokio_epoll_uring::Slice, - offset: u64, - ctx: &RequestContext, - ) -> Result, Error> { - match self { - MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await, - MaybeVirtualFile::File(file) => { - let rust_slice: &mut [u8] = slice.as_mut_rust_slice_full_zeroed(); - file.read_exact_at(rust_slice, offset).map(|()| slice) - } - } - } - async fn write_all_at( - &self, - buf: FullSlice, - offset: u64, - ctx: &RequestContext, - ) -> Result<(), Error> { - match self { - MaybeVirtualFile::VirtualFile(file) => { - let (_buf, res) = file.write_all_at(buf, offset, ctx).await; - res - } - MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset), - } - } - - // Helper function to slurp a portion of a file into a string - async fn read_string_at( - &mut self, - pos: u64, - len: usize, - ctx: &RequestContext, - ) -> Result { - let slice = IoBufferMut::with_capacity(len).slice_full(); - assert_eq!(slice.bytes_total(), len); - let slice = self.read_exact_at(slice, pos, ctx).await?; - let buf = slice.into_inner(); - assert_eq!(buf.len(), len); - - Ok(String::from_utf8(buf.to_vec()).unwrap()) - } - } - #[tokio::test] async fn test_virtual_files() -> anyhow::Result<()> { - // The real work is done in the test_files() helper function. This - // allows us to run the same set of tests against a native File, and - // VirtualFile. We trust the native Files and wouldn't need to test them, - // but this allows us to verify that the operations return the same - // results with VirtualFiles as with native Files. (Except that with - // native files, you will run out of file descriptors if the ulimit - // is low enough.) - struct A; - - impl Adapter for A { - async fn open( - path: Utf8PathBuf, - opts: OpenOptions, - ctx: &RequestContext, - ) -> Result { - let vf = VirtualFile::open_with_options_v2(&path, opts, ctx).await?; - Ok(MaybeVirtualFile::VirtualFile(vf)) - } - } - test_files::("virtual_files").await - } - - #[tokio::test] - async fn test_physical_files() -> anyhow::Result<()> { - struct B; - - impl Adapter for B { - async fn open( - path: Utf8PathBuf, - opts: OpenOptions, - _ctx: &RequestContext, - ) -> Result { - Ok(MaybeVirtualFile::File({ - let owned_fd = opts.open(path.as_std_path()).await?; - File::from(owned_fd) - })) - } - } - - test_files::("physical_files").await - } - - /// This is essentially a closure which returns a MaybeVirtualFile, but because rust edition - /// 2024 is not yet out with new lifetime capture or outlives rules, this is a async function - /// in trait which benefits from the new lifetime capture rules already. - trait Adapter { - async fn open( - path: Utf8PathBuf, - opts: OpenOptions, - ctx: &RequestContext, - ) -> Result; - } - - async fn test_files(testname: &str) -> anyhow::Result<()> - where - A: Adapter, - { let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); - let testdir = crate::config::PageServerConf::test_repo_dir(testname); + let testdir = crate::config::PageServerConf::test_repo_dir("test_virtual_files"); std::fs::create_dir_all(&testdir)?; + let zeropad512 = |content: &[u8]| { + let mut buf = IoBufferMut::with_capacity_zeroed(512); + buf[..content.len()].copy_from_slice(content); + buf.freeze().slice_len() + }; + let path_a = testdir.join("file_a"); - let mut file_a = A::open( + let file_a = VirtualFile::open_with_options_v2( path_a.clone(), OpenOptions::new() + .read(true) .write(true) + // set create & truncate flags to ensure when we trigger a reopen later in this test, + // the reopen_options must have masked out those flags; if they don't, then + // the after reopen we will fail to read the `content_a` that we write here. .create(true) - .truncate(true) - .to_owned(), + .truncate(true), &ctx, ) .await?; + let (_, res) = file_a.write_all_at(zeropad512(b"content_a"), 0, &ctx).await; + res?; - file_a - .write_all_at(IoBuffer::from(b"foobar").slice_len(), 0, &ctx) - .await?; - - // cannot read from a file opened in write-only mode - let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err(); - - // Close the file and re-open for reading - let mut file_a = A::open(path_a, OpenOptions::new().read(true), &ctx).await?; - - // cannot write to a file opened in read-only mode - let _ = file_a - .write_all_at(IoBuffer::from(b"bar").slice_len(), 0, &ctx) - .await - .unwrap_err(); - - // Try simple read - assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?); - - // Create another test file, and try FileExt functions on it. let path_b = testdir.join("file_b"); - let mut file_b = A::open( + let file_b = VirtualFile::open_with_options_v2( path_b.clone(), OpenOptions::new() .read(true) @@ -1391,37 +1323,44 @@ mod tests { &ctx, ) .await?; - file_b - .write_all_at(IoBuffer::from(b"BAR").slice_len(), 3, &ctx) - .await?; - file_b - .write_all_at(IoBuffer::from(b"FOO").slice_len(), 0, &ctx) - .await?; + let (_, res) = file_b.write_all_at(zeropad512(b"content_b"), 0, &ctx).await; + res?; - assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA"); + let assert_first_512_eq = async |vfile: &VirtualFile, expect: &[u8]| { + let buf = vfile + .read_exact_at(IoBufferMut::with_capacity_zeroed(512).slice_full(), 0, &ctx) + .await + .unwrap(); + assert_eq!(&buf[..], &zeropad512(expect)[..]); + }; - // Open a lot of files, enough to cause some evictions. (Or to be precise, - // open the same file many times. The effect is the same.) + // Open a lot of file descriptors / VirtualFile instances. + // Enough to cause some evictions in the fd cache. - let mut vfiles = Vec::new(); + let mut file_b_dupes = Vec::new(); for _ in 0..100 { - let mut vfile = A::open(path_b.clone(), OpenOptions::new().read(true), &ctx).await?; - assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?); - vfiles.push(vfile); + let vfile = VirtualFile::open_with_options_v2( + path_b.clone(), + OpenOptions::new().read(true), + &ctx, + ) + .await?; + assert_first_512_eq(&vfile, b"content_b").await; + file_b_dupes.push(vfile); } // make sure we opened enough files to definitely cause evictions. - assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2); + assert!(file_b_dupes.len() > TEST_MAX_FILE_DESCRIPTORS * 2); // The underlying file descriptor for 'file_a' should be closed now. Try to read - // from it again. - assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?); + // from it again. The VirtualFile reopens the file internally. + assert_first_512_eq(&file_a, b"content_a").await; // Check that all the other FDs still work too. Use them in random order for // good measure. - vfiles.as_mut_slice().shuffle(&mut thread_rng()); - for vfile in vfiles.iter_mut() { - assert_eq!("OOBAR", vfile.read_string_at(1, 5, &ctx).await?); + file_b_dupes.as_mut_slice().shuffle(&mut thread_rng()); + for vfile in file_b_dupes.iter_mut() { + assert_first_512_eq(vfile, b"content_b").await; } Ok(()) @@ -1452,7 +1391,7 @@ mod tests { // Open the file many times. let mut files = Vec::new(); for _ in 0..VIRTUAL_FILES { - let f = VirtualFileInner::open_with_options( + let f = VirtualFile::open_with_options_v2( &test_file_path, OpenOptions::new().read(true), &ctx, @@ -1497,8 +1436,6 @@ mod tests { #[tokio::test] async fn test_atomic_overwrite_basic() { - let ctx = - RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic"); std::fs::create_dir_all(&testdir).unwrap(); @@ -1508,26 +1445,22 @@ mod tests { VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec()) .await .unwrap(); - let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); - let post = file.read_string_at(0, 3, &ctx).await.unwrap(); + + let post = std::fs::read_to_string(&path).unwrap(); assert_eq!(post, "foo"); assert!(!tmp_path.exists()); - drop(file); VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec()) .await .unwrap(); - let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); - let post = file.read_string_at(0, 3, &ctx).await.unwrap(); + + let post = std::fs::read_to_string(&path).unwrap(); assert_eq!(post, "bar"); assert!(!tmp_path.exists()); - drop(file); } #[tokio::test] async fn test_atomic_overwrite_preexisting_tmp() { - let ctx = - RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp"); std::fs::create_dir_all(&testdir).unwrap(); @@ -1542,10 +1475,8 @@ mod tests { .await .unwrap(); - let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); - let post = file.read_string_at(0, 3, &ctx).await.unwrap(); + let post = std::fs::read_to_string(&path).unwrap(); assert_eq!(post, "foo"); assert!(!tmp_path.exists()); - drop(file); } } diff --git a/pageserver/src/virtual_file/open_options.rs b/pageserver/src/virtual_file/open_options.rs index a40dfed4a4..7d478f3600 100644 --- a/pageserver/src/virtual_file/open_options.rs +++ b/pageserver/src/virtual_file/open_options.rs @@ -8,7 +8,13 @@ use super::io_engine::IoEngine; #[derive(Debug, Clone)] pub struct OpenOptions { + /// We keep a copy of the write() flag we pass to the `inner`` `OptionOptions` + /// to support [`Self::is_write`]. write: bool, + /// We don't expose + pass through a raw `custom_flags()` style API. + /// The only custom flag we support is `O_DIRECT`, which we track here + /// and map to `custom_flags()` in the [`Self::open`] method. + direct: bool, inner: Inner, } #[derive(Debug, Clone)] @@ -30,6 +36,7 @@ impl Default for OpenOptions { }; Self { write: false, + direct: false, inner, } } @@ -44,6 +51,10 @@ impl OpenOptions { self.write } + pub(super) fn is_direct(&self) -> bool { + self.direct + } + pub fn read(mut self, read: bool) -> Self { match &mut self.inner { Inner::StdFs(x) => { @@ -116,13 +127,38 @@ impl OpenOptions { } pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result { - match &self.inner { - Inner::StdFs(x) => x.open(path).map(|file| file.into()), + #[cfg_attr(not(target_os = "linux"), allow(unused_mut))] + let mut custom_flags = 0; + if self.direct { #[cfg(target_os = "linux")] - Inner::TokioEpollUring(x) => { + { + custom_flags |= nix::libc::O_DIRECT; + } + #[cfg(not(target_os = "linux"))] + { + // Other platforms may be used for development but don't necessarily have a 1:1 equivalent to Linux's O_DIRECT (macOS!). + // Just don't set the flag; to catch alignment bugs typical for O_DIRECT, + // we have a runtime validation layer inside `VirtualFile::write_at` and `VirtualFile::read_at`. + static WARNING: std::sync::Once = std::sync::Once::new(); + WARNING.call_once(|| { + let span = tracing::info_span!(parent: None, "open_options"); + let _enter = span.enter(); + tracing::warn!("your platform is not a supported production platform, ignoing request for O_DIRECT; this could hide alignment bugs; this warning is logged once per process"); + }); + } + } + + match self.inner.clone() { + Inner::StdFs(mut x) => x + .custom_flags(custom_flags) + .open(path) + .map(|file| file.into()), + #[cfg(target_os = "linux")] + Inner::TokioEpollUring(mut x) => { + x.custom_flags(custom_flags); let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await; let (_, res) = super::io_engine::retry_ecanceled_once((), |()| async { - let res = system.open(path, x).await; + let res = system.open(path, &x).await; ((), res) }) .await; @@ -144,19 +180,8 @@ impl OpenOptions { self } - pub fn custom_flags(mut self, flags: i32) -> Self { - if flags & nix::libc::O_APPEND != 0 { - super::io_engine::panic_operation_must_be_idempotent(); - } - match &mut self.inner { - Inner::StdFs(x) => { - let _ = x.custom_flags(flags); - } - #[cfg(target_os = "linux")] - Inner::TokioEpollUring(x) => { - let _ = x.custom_flags(flags); - } - } + pub fn direct(mut self, direct: bool) -> Self { + self.direct = direct; self } } diff --git a/test_runner/fixtures/pageserver/allowed_errors.py b/test_runner/fixtures/pageserver/allowed_errors.py index 43bffd919c..9b564f0a60 100755 --- a/test_runner/fixtures/pageserver/allowed_errors.py +++ b/test_runner/fixtures/pageserver/allowed_errors.py @@ -111,6 +111,13 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = ( ".*stalling layer flushes for compaction backpressure.*", ".*layer roll waiting for flush due to compaction backpressure.*", ".*BatchSpanProcessor.*", + *( + [ + r".*your platform is not a supported production platform, ignoing request for O_DIRECT; this could hide alignment bugs.*" + ] + if sys.platform != "linux" + else [] + ), )