From d98cb399787f76d61985b58648c56c050d82f206 Mon Sep 17 00:00:00 2001 From: arpad-m Date: Thu, 20 Jul 2023 16:19:38 +0200 Subject: [PATCH] pageserver: use tokio::time::timeout where possible (#4756) Removes a bunch of cases which used `tokio::select!` to emulate the `tokio::time::timeout` function. I've done an additional review of the cancellation safety of these futures; all of them seem to be cancellation-safe (`select!` is no more tolerant of non-cancellation-safe futures than `timeout` is, but since we are touching these call sites, such a review makes sense). Furthermore, I correct a few mentions of a non-existent `tokio::timeout!` macro in the docs to the `tokio::time::timeout` function. --- docs/pageserver-thread-mgmt.md | 10 +++---- pageserver/src/bin/pageserver.rs | 6 ++-- pageserver/src/disk_usage_eviction_task.rs | 10 +++---- pageserver/src/task_mgr.rs | 18 +++++------ pageserver/src/tenant/tasks.rs | 30 +++++++++---------- .../src/tenant/timeline/eviction_task.rs | 10 +++---- 6 files changed, 40 insertions(+), 44 deletions(-) diff --git a/docs/pageserver-thread-mgmt.md b/docs/pageserver-thread-mgmt.md index b911933528..c911d2c53d 100644 --- a/docs/pageserver-thread-mgmt.md +++ b/docs/pageserver-thread-mgmt.md @@ -30,8 +30,8 @@ or similar, to wake up on shutdown. In async Rust, futures can be "cancelled" at any await point, by dropping the Future. For example, `tokio::select!` returns as soon as -one of the Futures returns, and drops the others. `tokio::timeout!` is -another example. In the Rust ecosystem, some functions are +one of the Futures returns, and drops the others. `tokio::time::timeout` +is another example. In the Rust ecosystem, some functions are cancellation-safe, meaning they can be safely dropped without side-effects, while others are not. See documentation of `tokio::select!` for examples. @@ -42,9 +42,9 @@ function that you call cannot be assumed to be async cancellation-safe, and must be polled to completion. 
The downside of non-cancellation safe code is that you have to be very -careful when using `tokio::select!`, `tokio::timeout!`, and other such -functions that can cause a Future to be dropped. They can only be used -with functions that are explicitly documented to be cancellation-safe, +careful when using `tokio::select!`, `tokio::time::timeout`, and other +such functions that can cause a Future to be dropped. They can only be +used with functions that are explicitly documented to be cancellation-safe, or you need to spawn a separate task to shield from the cancellation. At the entry points to the code, we also take care to poll futures to diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index b01ace63e4..b247fdf0ab 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -396,8 +396,8 @@ fn start_pageserver( let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial logical sizes completed")); - let init_sizes_done = tokio::select! { - _ = &mut init_sizes_done => { + let init_sizes_done = match tokio::time::timeout(timeout, &mut init_sizes_done).await { + Ok(_) => { let now = std::time::Instant::now(); tracing::info!( from_init_done_millis = (now - init_done).as_millis(), @@ -406,7 +406,7 @@ fn start_pageserver( ); None } - _ = tokio::time::sleep(timeout) => { + Err(_) => { tracing::info!( timeout_millis = timeout.as_millis(), "Initial logical size timeout elapsed; starting background jobs" diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index b2ca9ab0bb..37c52c5423 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -166,11 +166,11 @@ async fn disk_usage_eviction_task( .await; let sleep_until = start + task_config.period; - tokio::select! 
{ - _ = tokio::time::sleep_until(sleep_until) => {}, - _ = cancel.cancelled() => { - break - } + if tokio::time::timeout_at(sleep_until, cancel.cancelled()) + .await + .is_ok() + { + break; } } } diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 9c6851bc71..f3a4ce6db7 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -511,17 +511,13 @@ pub async fn shutdown_tasks( warn!(name = task.name, tenant_id = ?tenant_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over"); } } - let join_handle = tokio::select! { - biased; - _ = &mut join_handle => { None }, - _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => { - // allow some time to elapse before logging to cut down the number of log - // lines. - info!("waiting for {} to shut down", task.name); - Some(join_handle) - } - }; - if let Some(join_handle) = join_handle { + if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle) + .await + .is_err() + { + // allow some time to elapse before logging to cut down the number of log + // lines. + info!("waiting for {} to shut down", task.name); // we never handled this return value, but: // - we don't deschedule which would lead to is_cancelled // - panics are already logged (is_panicked) diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 360818b5a7..622ae371a4 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -122,12 +122,12 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { warn_when_period_overrun(started_at.elapsed(), period, "compaction"); // Sleep - tokio::select! 
{ - _ = cancel.cancelled() => { - info!("received cancellation request during idling"); - break; - }, - _ = tokio::time::sleep(sleep_duration) => {}, + if tokio::time::timeout(sleep_duration, cancel.cancelled()) + .await + .is_ok() + { + info!("received cancellation request during idling"); + break; } } } @@ -196,12 +196,12 @@ async fn gc_loop(tenant: Arc, cancel: CancellationToken) { warn_when_period_overrun(started_at.elapsed(), period, "gc"); // Sleep - tokio::select! { - _ = cancel.cancelled() => { - info!("received cancellation request during idling"); - break; - }, - _ = tokio::time::sleep(sleep_duration) => {}, + if tokio::time::timeout(sleep_duration, cancel.cancelled()) + .await + .is_ok() + { + info!("received cancellation request during idling"); + break; } } } @@ -263,9 +263,9 @@ pub(crate) async fn random_init_delay( rng.gen_range(Duration::ZERO..=period) }; - tokio::select! { - _ = cancel.cancelled() => Err(Cancelled), - _ = tokio::time::sleep(d) => Ok(()), + match tokio::time::timeout(d, cancel.cancelled()).await { + Ok(_) => Err(Cancelled), + Err(_) => Ok(()), } } diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 80146419df..a5f570d942 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -100,11 +100,11 @@ impl Timeline { match cf { ControlFlow::Break(()) => break, ControlFlow::Continue(sleep_until) => { - tokio::select! { - _ = cancel.cancelled() => { - break; - } - _ = tokio::time::sleep_until(sleep_until) => { } + if tokio::time::timeout_at(sleep_until, cancel.cancelled()) + .await + .is_ok() + { + break; } } }