From 0694ee9531ffa2f4391cf0f27c89b5afab1ba052 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 15 Mar 2024 20:46:15 +0100 Subject: [PATCH] tokio-epoll-uring: retry on launch failures due to locked memory (#7141) refs https://github.com/neondatabase/neon/issues/7136 Problem ------- Before this PR, we were using `tokio_epoll_uring::thread_local_system()`, which panics on tokio_epoll_uring::System::launch() failure. As we've learned in [the past](https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391), some older Linux kernels account io_uring instances as locked memory. And while we've raised the limit in prod considerably, we did hit it once on 2024-03-11 16:30 UTC. That was after we enabled tokio-epoll-uring fleet-wide, but before we had shipped release-5090 (c6ed86d3d0690b52e7014b6a696effa95714e8cb) which did away with the last mass-creation of tokio-epoll-uring instances as per commit 3da410c8fee05b0cd65a5c0b83fffa3d5680cd77 Author: Christian Schwarz Date: Tue Mar 5 10:03:54 2024 +0100 tokio-epoll-uring: use it on the layer-creating code paths (#6378) Nonetheless, it highlighted that panicking in this situation is probably not ideal, as it can leave the pageserver process in a semi-broken state. Further, due to the low sampling rate of Prometheus metrics, we don't know much about the circumstances of this failure instance. Solution -------- This PR implements a custom thread_local_system() that is pageserver-aware and will do the following on failure: - dump relevant stats to `tracing!`, hopefully they will be useful to understand the circumstances better - if it's the locked memory failure (or any other ENOMEM): retry with exponential back-off, capped at 3s. - if it's any other launch failure: abort() the process - add metric counters so we can create an alert This makes sense in the production environment where we know that _usually_, there's ample locked memory allowance available, and we know such failures are rare.
--- Cargo.lock | 2 + clippy.toml | 2 + pageserver/Cargo.toml | 1 + pageserver/src/metrics.rs | 27 ++- pageserver/src/virtual_file/io_engine.rs | 14 +- .../io_engine/tokio_epoll_uring_ext.rs | 194 ++++++++++++++++++ pageserver/src/virtual_file/open_options.rs | 2 +- workspace_hack/Cargo.toml | 2 + 8 files changed, 234 insertions(+), 10 deletions(-) create mode 100644 pageserver/src/virtual_file/io_engine/tokio_epoll_uring_ext.rs diff --git a/Cargo.lock b/Cargo.lock index 99ba8b1cb3..022dc11f07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3530,6 +3530,7 @@ dependencies = [ "postgres_connection", "postgres_ffi", "pq_proto", + "procfs", "rand 0.8.5", "regex", "remote_storage", @@ -6987,6 +6988,7 @@ dependencies = [ "axum", "base64 0.21.1", "base64ct", + "byteorder", "bytes", "cc", "chrono", diff --git a/clippy.toml b/clippy.toml index 5f7dc66152..4c0c04f9a1 100644 --- a/clippy.toml +++ b/clippy.toml @@ -2,6 +2,8 @@ disallowed-methods = [ "tokio::task::block_in_place", # Allow this for now, to deny it later once we stop using Handle::block_on completely # "tokio::runtime::Handle::block_on", + # use tokio_epoll_uring_ext instead + "tokio_epoll_uring::thread_local_system", ] disallowed-macros = [ diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 5adeaffe1a..2702a2040a 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -48,6 +48,7 @@ postgres.workspace = true postgres_backend.workspace = true postgres-protocol.workspace = true postgres-types.workspace = true +procfs.workspace = true rand.workspace = true regex.workspace = true scopeguard.workspace = true diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 03537ddb05..075bb76a1b 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -2465,7 +2465,8 @@ impl>, O, E> Future for MeasuredRemoteOp { } pub mod tokio_epoll_uring { - use metrics::UIntGauge; + use metrics::{register_int_counter, UIntGauge}; + use once_cell::sync::Lazy; pub struct Collector { descs: 
Vec, @@ -2473,15 +2474,13 @@ pub mod tokio_epoll_uring { systems_destroyed: UIntGauge, } - const NMETRICS: usize = 2; - impl metrics::core::Collector for Collector { fn desc(&self) -> Vec<&metrics::core::Desc> { self.descs.iter().collect() } fn collect(&self) -> Vec { - let mut mfs = Vec::with_capacity(NMETRICS); + let mut mfs = Vec::with_capacity(Self::NMETRICS); let tokio_epoll_uring::metrics::Metrics { systems_created, systems_destroyed, @@ -2495,6 +2494,8 @@ pub mod tokio_epoll_uring { } impl Collector { + const NMETRICS: usize = 2; + #[allow(clippy::new_without_default)] pub fn new() -> Self { let mut descs = Vec::new(); @@ -2528,6 +2529,22 @@ pub mod tokio_epoll_uring { } } } + + pub(crate) static THREAD_LOCAL_LAUNCH_SUCCESSES: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_tokio_epoll_uring_pageserver_thread_local_launch_success_count", + "Number of times thread_local_system creation succeeded.", + ) + .unwrap() + }); + + pub(crate) static THREAD_LOCAL_LAUNCH_FAILURES: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_tokio_epoll_uring_pageserver_thread_local_launch_failures_count", + "Number of times thread_local_system creation failed and was retried after back-off.", + ) + .unwrap() + }); } pub(crate) mod tenant_throttling { @@ -2656,6 +2673,8 @@ pub fn preinitialize_metrics() { &WALRECEIVER_BROKER_UPDATES, &WALRECEIVER_CANDIDATES_ADDED, &WALRECEIVER_CANDIDATES_REMOVED, + &tokio_epoll_uring::THREAD_LOCAL_LAUNCH_FAILURES, + &tokio_epoll_uring::THREAD_LOCAL_LAUNCH_SUCCESSES, ] .into_iter() .for_each(|c| { diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index 55fa59e53b..2dd0ce64d6 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -6,6 +6,10 @@ //! Initialize using [`init`]. //! //! Then use [`get`] and [`super::OpenOptions`]. +//! +//!
+ +pub(super) mod tokio_epoll_uring_ext; use tokio_epoll_uring::{IoBuf, Slice}; use tracing::Instrument; @@ -145,7 +149,7 @@ impl IoEngine { } #[cfg(target_os = "linux")] IoEngine::TokioEpollUring => { - let system = tokio_epoll_uring::thread_local_system().await; + let system = tokio_epoll_uring_ext::thread_local_system().await; let (resources, res) = system.read(file_guard, offset, buf).await; (resources, res.map_err(epoll_uring_error_to_std)) } @@ -160,7 +164,7 @@ impl IoEngine { } #[cfg(target_os = "linux")] IoEngine::TokioEpollUring => { - let system = tokio_epoll_uring::thread_local_system().await; + let system = tokio_epoll_uring_ext::thread_local_system().await; let (resources, res) = system.fsync(file_guard).await; (resources, res.map_err(epoll_uring_error_to_std)) } @@ -178,7 +182,7 @@ impl IoEngine { } #[cfg(target_os = "linux")] IoEngine::TokioEpollUring => { - let system = tokio_epoll_uring::thread_local_system().await; + let system = tokio_epoll_uring_ext::thread_local_system().await; let (resources, res) = system.fdatasync(file_guard).await; (resources, res.map_err(epoll_uring_error_to_std)) } @@ -197,7 +201,7 @@ impl IoEngine { } #[cfg(target_os = "linux")] IoEngine::TokioEpollUring => { - let system = tokio_epoll_uring::thread_local_system().await; + let system = tokio_epoll_uring_ext::thread_local_system().await; let (resources, res) = system.statx(file_guard).await; ( resources, @@ -220,7 +224,7 @@ impl IoEngine { } #[cfg(target_os = "linux")] IoEngine::TokioEpollUring => { - let system = tokio_epoll_uring::thread_local_system().await; + let system = tokio_epoll_uring_ext::thread_local_system().await; let (resources, res) = system.write(file_guard, offset, buf).await; (resources, res.map_err(epoll_uring_error_to_std)) } diff --git a/pageserver/src/virtual_file/io_engine/tokio_epoll_uring_ext.rs b/pageserver/src/virtual_file/io_engine/tokio_epoll_uring_ext.rs new file mode 100644 index 0000000000..c4b10f3a24 --- /dev/null +++ 
b/pageserver/src/virtual_file/io_engine/tokio_epoll_uring_ext.rs
@@ -0,0 +1,194 @@
+//! Like [`::tokio_epoll_uring::thread_local_system()`], but with pageserver-specific
+//! handling in case the instance can't be launched.
+//!
+//! This is primarily necessary due to ENOMEM aka OutOfMemory errors during io_uring creation
+//! on older kernels, such as some (but not all) older kernels in the Linux 5.10 series.
+//! See <https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391> for more details.
+
+use std::sync::atomic::AtomicU32;
+use std::sync::Arc;
+
+use tokio_util::sync::CancellationToken;
+use tracing::{error, info, info_span, warn, Instrument};
+use utils::backoff::{DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
+
+use tokio_epoll_uring::{System, SystemHandle};
+
+use crate::virtual_file::on_fatal_io_error;
+
+use crate::metrics::tokio_epoll_uring as metrics;
+
+#[derive(Clone)]
+struct ThreadLocalState(Arc<ThreadLocalStateInner>); // cheap-to-clone reference to one thread's state
+
+struct ThreadLocalStateInner {
+    cell: tokio::sync::OnceCell<SystemHandle>, // set once a system was launched successfully
+    launch_attempts: AtomicU32, // launch attempts so far; drives the retry back-off
+}
+
+impl ThreadLocalState {
+    pub fn new() -> Self {
+        Self(Arc::new(ThreadLocalStateInner {
+            cell: tokio::sync::OnceCell::default(),
+            launch_attempts: AtomicU32::new(0),
+        }))
+    }
+    pub fn make_id_string(&self) -> String { // address of the shared inner state
+        format!("{:p}", Arc::as_ptr(&self.0)) // `{:p}` already prints the `0x` prefix
+    }
+}
+
+impl Drop for ThreadLocalStateInner { // not on the `Clone` wrapper: that would log on every `Handle` drop
+    fn drop(&mut self) {
+        info!(parent: None, id=%format!("{:p}", self), "tokio-epoll-uring_ext: ThreadLocalState is being dropped and id might be re-used in the future");
+    }
+}
+
+thread_local! {
+    static THREAD_LOCAL: ThreadLocalState = ThreadLocalState::new();
+}
+
+/// Retries [`System::launch`] with back-off on ENOMEM; any other launch error aborts the process (no panic).
+pub async fn thread_local_system() -> Handle {
+    let fake_cancel = CancellationToken::new(); // never cancelled; exponential_backoff just needs a token
+    loop {
+        let thread_local_state = THREAD_LOCAL.with(|arc| arc.clone());
+        let inner = &thread_local_state.0;
+        let get_or_init_res = inner
+            .cell
+            .get_or_try_init(|| async {
+                let attempt_no = inner
+                    .launch_attempts
+                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+                let span = info_span!("tokio_epoll_uring_ext::thread_local_system", thread_local=%thread_local_state.make_id_string(), %attempt_no);
+                async {
+                    // Rate-limit retries per thread-local.
+                    // NB: doesn't yield to executor at attempt_no=0.
+                    utils::backoff::exponential_backoff(
+                        attempt_no,
+                        DEFAULT_BASE_BACKOFF_SECONDS,
+                        DEFAULT_MAX_BACKOFF_SECONDS,
+                        &fake_cancel,
+                    )
+                    .await;
+                    let res = System::launch()
+                        // this might move us to another executor thread => loop outside the get_or_try_init, not inside it
+                        .await;
+                    match res {
+                        Ok(system) => {
+                            info!("successfully launched system");
+                            metrics::THREAD_LOCAL_LAUNCH_SUCCESSES.inc();
+                            Ok(system)
+                        }
+                        Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => {
+                            warn!("not enough locked memory to tokio-epoll-uring, will retry");
+                            info_span!("stats").in_scope(|| {
+                                emit_launch_failure_process_stats();
+                            });
+                            metrics::THREAD_LOCAL_LAUNCH_FAILURES.inc();
+                            Err(())
+                        }
+                        // abort the process instead of panicking because pageserver usually becomes half-broken if we panic somewhere.
+                        // This is equivalent to a fatal IO error.
+                        Err(ref e @ tokio_epoll_uring::LaunchResult::IoUringBuild(ref io_err)) => {
+                            error!(error=%e, "failed to launch thread-local tokio-epoll-uring, this should not happen, aborting process");
+                            info_span!("stats").in_scope(|| {
+                                emit_launch_failure_process_stats();
+                            });
+                            on_fatal_io_error(io_err, "launch thread-local tokio-epoll-uring");
+                        },
+                    }
+                }
+                .instrument(span)
+                .await
+            })
+            .await;
+        if get_or_init_res.is_ok() {
+            return Handle(thread_local_state);
+        }
+    }
+}
+
+fn emit_launch_failure_process_stats() {
+    // Diagnostics for a failed launch, all logged at info level:
+    // the tokio-epoll-uring system counters, the locked-memory limit from /proc/self/limits,
+    // and memory usage / thread count from /proc/self/status (vmlck, rss, ...).
+    // Errors while reading procfs are logged as well and otherwise ignored.
+
+    let tokio_epoll_uring::metrics::Metrics {
+        systems_created,
+        systems_destroyed,
+    } = tokio_epoll_uring::metrics::global();
+    info!(systems_created, systems_destroyed, "tokio-epoll-uring");
+
+    match procfs::process::Process::myself() {
+        Ok(myself) => {
+            match myself.limits() {
+                Ok(limits) => {
+                    info!(?limits.max_locked_memory, "/proc/self/limits");
+                }
+                Err(error) => {
+                    info!(%error, "no limit stats due to error");
+                }
+            }
+
+            match myself.status() {
+                Ok(status) => {
+                    let procfs::process::Status {
+                        vmsize,
+                        vmlck,
+                        vmpin,
+                        vmrss,
+                        rssanon,
+                        rssfile,
+                        rssshmem,
+                        vmdata,
+                        vmstk,
+                        vmexe,
+                        vmlib,
+                        vmpte,
+                        threads,
+                        ..
+                    } = status;
+                    info!(
+                        vmsize,
+                        vmlck,
+                        vmpin,
+                        vmrss,
+                        rssanon,
+                        rssfile,
+                        rssshmem,
+                        vmdata,
+                        vmstk,
+                        vmexe,
+                        vmlib,
+                        vmpte,
+                        threads,
+                        "/proc/self/status"
+                    );
+                }
+                Err(error) => {
+                    info!(%error, "no status stats due to error");
+                }
+            }
+        }
+        Err(error) => {
+            info!(%error, "no process stats due to error");
+        }
+    };
+}
+
+#[derive(Clone)]
+pub struct Handle(ThreadLocalState); // derefs to the launched `SystemHandle`
+
+impl std::ops::Deref for Handle {
+    type Target = SystemHandle;
+
+    fn deref(&self) -> &Self::Target {
+        self.0
+            .0
+            .cell
+            .get()
+            .expect("must be already initialized when using this")
+    }
+}
diff --git a/pageserver/src/virtual_file/open_options.rs b/pageserver/src/virtual_file/open_options.rs
index f75edb0bac..7f951270d1 100644
--- a/pageserver/src/virtual_file/open_options.rs
+++ b/pageserver/src/virtual_file/open_options.rs
@@ -98,7 +98,7 @@ impl OpenOptions {
             OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),
             #[cfg(target_os = "linux")]
             OpenOptions::TokioEpollUring(x) => {
-                let system = tokio_epoll_uring::thread_local_system().await;
+                let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await;
                 system.open(path, x).await.map_err(|e| match e {
                     tokio_epoll_uring::Error::Op(e) => e,
                     tokio_epoll_uring::Error::System(system) => {
diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml
index 8593b752c2..0646091006 100644
--- a/workspace_hack/Cargo.toml
+++ b/workspace_hack/Cargo.toml
@@ -24,6 +24,7 @@ aws-smithy-types = { version = "1", default-features = false, features = ["byte-
 axum = { version = "0.6", features = ["ws"] }
 base64 = { version = "0.21", features = ["alloc"] }
 base64ct = { version = "1", default-features = false, features = ["std"] }
+byteorder = { version = "1", features = ["i128"] }
 bytes = { version = "1", features = ["serde"] }
 chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
 clap = { version = "4", features = ["derive", "string"] }
@@ -86,6 +87,7 @@
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std [build-dependencies] anyhow = { version = "1", features = ["backtrace"] } +byteorder = { version = "1", features = ["i128"] } bytes = { version = "1", features = ["serde"] } cc = { version = "1", default-features = false, features = ["parallel"] } chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }