mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
pageserver: use tokio::time::timeout where possible (#4756)
Removes a bunch of cases which used `tokio::select` to emulate the `tokio::time::timeout` function. I've done an additional review on the cancellation safety of these futures, all of them seem to be cancellation safe (not that `select!` allows non-cancellation-safe futures, but as we touch them, such a review makes sense). Furthermore, I correct a few mentions of a non-existent `tokio::timeout!` macro in the docs to the `tokio::time::timeout` function.
This commit is contained in:
@@ -30,8 +30,8 @@ or similar, to wake up on shutdown.
|
||||
|
||||
In async Rust, futures can be "cancelled" at any await point, by
|
||||
dropping the Future. For example, `tokio::select!` returns as soon as
|
||||
one of the Futures returns, and drops the others. `tokio::timeout!` is
|
||||
another example. In the Rust ecosystem, some functions are
|
||||
one of the Futures returns, and drops the others. `tokio::time::timeout`
|
||||
is another example. In the Rust ecosystem, some functions are
|
||||
cancellation-safe, meaning they can be safely dropped without
|
||||
side-effects, while others are not. See documentation of
|
||||
`tokio::select!` for examples.
|
||||
@@ -42,9 +42,9 @@ function that you call cannot be assumed to be async
|
||||
cancellation-safe, and must be polled to completion.
|
||||
|
||||
The downside of non-cancellation safe code is that you have to be very
|
||||
careful when using `tokio::select!`, `tokio::timeout!`, and other such
|
||||
functions that can cause a Future to be dropped. They can only be used
|
||||
with functions that are explicitly documented to be cancellation-safe,
|
||||
careful when using `tokio::select!`, `tokio::time::timeout`, and other
|
||||
such functions that can cause a Future to be dropped. They can only be
|
||||
used with functions that are explicitly documented to be cancellation-safe,
|
||||
or you need to spawn a separate task to shield from the cancellation.
|
||||
|
||||
At the entry points to the code, we also take care to poll futures to
|
||||
|
||||
@@ -396,8 +396,8 @@ fn start_pageserver(
|
||||
|
||||
let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial logical sizes completed"));
|
||||
|
||||
let init_sizes_done = tokio::select! {
|
||||
_ = &mut init_sizes_done => {
|
||||
let init_sizes_done = match tokio::time::timeout(timeout, &mut init_sizes_done).await {
|
||||
Ok(_) => {
|
||||
let now = std::time::Instant::now();
|
||||
tracing::info!(
|
||||
from_init_done_millis = (now - init_done).as_millis(),
|
||||
@@ -406,7 +406,7 @@ fn start_pageserver(
|
||||
);
|
||||
None
|
||||
}
|
||||
_ = tokio::time::sleep(timeout) => {
|
||||
Err(_) => {
|
||||
tracing::info!(
|
||||
timeout_millis = timeout.as_millis(),
|
||||
"Initial logical size timeout elapsed; starting background jobs"
|
||||
|
||||
@@ -166,11 +166,11 @@ async fn disk_usage_eviction_task(
|
||||
.await;
|
||||
|
||||
let sleep_until = start + task_config.period;
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep_until(sleep_until) => {},
|
||||
_ = cancel.cancelled() => {
|
||||
break
|
||||
}
|
||||
if tokio::time::timeout_at(sleep_until, cancel.cancelled())
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -511,17 +511,13 @@ pub async fn shutdown_tasks(
|
||||
warn!(name = task.name, tenant_id = ?tenant_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
|
||||
}
|
||||
}
|
||||
let join_handle = tokio::select! {
|
||||
biased;
|
||||
_ = &mut join_handle => { None },
|
||||
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
|
||||
// allow some time to elapse before logging to cut down the number of log
|
||||
// lines.
|
||||
info!("waiting for {} to shut down", task.name);
|
||||
Some(join_handle)
|
||||
}
|
||||
};
|
||||
if let Some(join_handle) = join_handle {
|
||||
if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
// allow some time to elapse before logging to cut down the number of log
|
||||
// lines.
|
||||
info!("waiting for {} to shut down", task.name);
|
||||
// we never handled this return value, but:
|
||||
// - we don't deschedule which would lead to is_cancelled
|
||||
// - panics are already logged (is_panicked)
|
||||
|
||||
@@ -122,12 +122,12 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
warn_when_period_overrun(started_at.elapsed(), period, "compaction");
|
||||
|
||||
// Sleep
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
info!("received cancellation request during idling");
|
||||
break;
|
||||
},
|
||||
_ = tokio::time::sleep(sleep_duration) => {},
|
||||
if tokio::time::timeout(sleep_duration, cancel.cancelled())
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
info!("received cancellation request during idling");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -196,12 +196,12 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
warn_when_period_overrun(started_at.elapsed(), period, "gc");
|
||||
|
||||
// Sleep
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
info!("received cancellation request during idling");
|
||||
break;
|
||||
},
|
||||
_ = tokio::time::sleep(sleep_duration) => {},
|
||||
if tokio::time::timeout(sleep_duration, cancel.cancelled())
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
info!("received cancellation request during idling");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -263,9 +263,9 @@ pub(crate) async fn random_init_delay(
|
||||
rng.gen_range(Duration::ZERO..=period)
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => Err(Cancelled),
|
||||
_ = tokio::time::sleep(d) => Ok(()),
|
||||
match tokio::time::timeout(d, cancel.cancelled()).await {
|
||||
Ok(_) => Err(Cancelled),
|
||||
Err(_) => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -100,11 +100,11 @@ impl Timeline {
|
||||
match cf {
|
||||
ControlFlow::Break(()) => break,
|
||||
ControlFlow::Continue(sleep_until) => {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
break;
|
||||
}
|
||||
_ = tokio::time::sleep_until(sleep_until) => { }
|
||||
if tokio::time::timeout_at(sleep_until, cancel.cancelled())
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user