diff --git a/libs/utils/src/rate_limit.rs b/libs/utils/src/rate_limit.rs index 945f710b1d..700cd5792b 100644 --- a/libs/utils/src/rate_limit.rs +++ b/libs/utils/src/rate_limit.rs @@ -17,7 +17,7 @@ impl std::fmt::Display for RateLimitStats { } impl RateLimit { - pub fn new(interval: Duration) -> Self { + pub const fn new(interval: Duration) -> Self { Self { last: None, interval, diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index dd04fb561a..d8eb803335 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -13,7 +13,7 @@ pub(super) mod tokio_epoll_uring_ext; use tokio_epoll_uring::IoBuf; -use tracing::Instrument; +use tracing::{Instrument, info}; pub(crate) use super::api::IoEngineKind; #[derive(Clone, Copy)] @@ -111,13 +111,16 @@ pub(crate) fn get() -> IoEngine { use std::os::unix::prelude::FileExt; use std::sync::atomic::{AtomicU8, Ordering}; +use std::time::Duration; use super::owned_buffers_io::io_buf_ext::FullSlice; use super::owned_buffers_io::slice::SliceMutExt; use super::{FileGuard, Metadata}; #[cfg(target_os = "linux")] -fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error) -> std::io::Error { +pub(super) fn epoll_uring_error_to_std( + e: tokio_epoll_uring::Error, +) -> std::io::Error { match e { tokio_epoll_uring::Error::Op(e) => e, tokio_epoll_uring::Error::System(system) => { @@ -149,7 +152,11 @@ impl IoEngine { #[cfg(target_os = "linux")] IoEngine::TokioEpollUring => { let system = tokio_epoll_uring_ext::thread_local_system().await; - let (resources, res) = system.read(file_guard, offset, slice).await; + let (resources, res) = + retry_ecanceled_once((file_guard, slice), |(file_guard, slice)| async { + system.read(file_guard, offset, slice).await + }) + .await; (resources, res.map_err(epoll_uring_error_to_std)) } } @@ -164,7 +171,10 @@ impl IoEngine { #[cfg(target_os = "linux")] IoEngine::TokioEpollUring => { let system = tokio_epoll_uring_ext::thread_local_system().await; - let (resources, res) = system.fsync(file_guard).await; + let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async { + system.fsync(file_guard).await + }) + .await; (resources, res.map_err(epoll_uring_error_to_std)) } } @@ -182,7 +192,10 @@ impl IoEngine { #[cfg(target_os = "linux")] IoEngine::TokioEpollUring => { let system = tokio_epoll_uring_ext::thread_local_system().await; - let (resources, res) = system.fdatasync(file_guard).await; + let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async { + system.fdatasync(file_guard).await + }) + .await; (resources, res.map_err(epoll_uring_error_to_std)) } } @@ -201,7 +214,10 @@ impl IoEngine { #[cfg(target_os = "linux")] IoEngine::TokioEpollUring => { let system = tokio_epoll_uring_ext::thread_local_system().await; - let (resources, res) = system.statx(file_guard).await; + let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async { + system.statx(file_guard).await + }) + .await; ( resources, res.map_err(epoll_uring_error_to_std).map(Metadata::from), @@ -224,6 +240,7 @@ impl IoEngine { #[cfg(target_os = "linux")] IoEngine::TokioEpollUring => { // TODO: ftruncate op for tokio-epoll-uring + // Don't forget to use retry_ecanceled_once let res = file_guard.with_std_file(|std_file| std_file.set_len(len)); (file_guard, res) } @@ -245,8 +262,11 @@ impl IoEngine { #[cfg(target_os = "linux")] IoEngine::TokioEpollUring => { let system = tokio_epoll_uring_ext::thread_local_system().await; - let ((file_guard, slice), res) = - system.write(file_guard, offset, buf.into_raw_slice()).await; + let ((file_guard, slice), res) = retry_ecanceled_once( + (file_guard, buf.into_raw_slice()), + async |(file_guard, buf)| system.write(file_guard, offset, buf).await, + ) + .await; ( (file_guard, FullSlice::must_new(slice)), res.map_err(epoll_uring_error_to_std), @@ -282,6 +302,55 @@ impl IoEngine { } } +/// We observe in tests that stop pageserver with SIGTERM immediately after it was ingesting data, +/// occasionally buffered writers fail (and get retried by BufferedWriter) with ECANCELED. +/// The problem is believed to be a race condition in how io_uring handles punted async work (io-wq) and signals. +/// Investigation ticket: +/// +/// This function retries the operation once if it fails with ECANCELED. +/// ONLY USE FOR IDEMPOTENT [`super::VirtualFile`] operations. +pub(super) async fn retry_ecanceled_once( + resources: T, + f: F, +) -> (T, Result>) +where + F: Fn(T) -> Fut, + Fut: std::future::Future>)>, + T: Send, + V: Send, +{ + let (resources, res) = f(resources).await; + let Err(e) = res else { + return (resources, res); + }; + let tokio_epoll_uring::Error::Op(err) = e else { + return (resources, Err(e)); + }; + if err.raw_os_error() != Some(nix::libc::ECANCELED) { + return (resources, Err(tokio_epoll_uring::Error::Op(err))); + } + { + static RATE_LIMIT: std::sync::Mutex = + std::sync::Mutex::new(utils::rate_limit::RateLimit::new(Duration::from_secs(1))); + let mut guard = RATE_LIMIT.lock().unwrap(); + guard.call2(|rate_limit_stats| { + info!( + %rate_limit_stats, "ECANCELED observed, assuming it is due to a signal being received by the submitting thread, retrying after a delay; this message is rate-limited" + ); + }); + drop(guard); + } + tokio::time::sleep(Duration::from_millis(100)).await; // something big enough to beat even heavily overcommitted CI runners + let (resources, res) = f(resources).await; + (resources, res) +} + +pub(super) fn panic_operation_must_be_idempotent() { + panic!( + "unsupported; io_engine may retry operations internally and thus needs them to be idempotent (retry_ecanceled_once)" + ) +} + pub enum FeatureTestResult { PlatformPreferred(IoEngineKind), Worse { diff --git a/pageserver/src/virtual_file/open_options.rs b/pageserver/src/virtual_file/open_options.rs index 2a7bb693f2..a40dfed4a4 100644 --- a/pageserver/src/virtual_file/open_options.rs +++ b/pageserver/src/virtual_file/open_options.rs @@ -110,18 +110,23 @@ impl OpenOptions { self } + /// Don't use, `O_APPEND` is not supported. + pub fn append(&mut self, _append: bool) { + super::io_engine::panic_operation_must_be_idempotent(); + } + pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result { match &self.inner { Inner::StdFs(x) => x.open(path).map(|file| file.into()), #[cfg(target_os = "linux")] Inner::TokioEpollUring(x) => { let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await; - system.open(path, x).await.map_err(|e| match e { - tokio_epoll_uring::Error::Op(e) => e, - tokio_epoll_uring::Error::System(system) => { - std::io::Error::new(std::io::ErrorKind::Other, system) - } + let (_, res) = super::io_engine::retry_ecanceled_once((), |()| async { + let res = system.open(path, x).await; + ((), res) }) + .await; + res.map_err(super::io_engine::epoll_uring_error_to_std) } } } @@ -140,6 +145,9 @@ impl OpenOptions { } pub fn custom_flags(mut self, flags: i32) -> Self { + if flags & nix::libc::O_APPEND != 0 { + super::io_engine::panic_operation_must_be_idempotent(); + } match &mut self.inner { Inner::StdFs(x) => { let _ = x.custom_flags(flags);