mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-13 00:12:54 +00:00
## Problem

Various places in remote storage were not subject to a timeout (so stuck TCP connections could hold things up), and did not respect a cancellation token (so things like timeline deletion or tenant detach would have to wait arbitrarily long).

## Summary of changes

- Add `download_cancellable` and `upload_cancellable` helpers, and use them in all the places we wait for remote storage operations (with the exception of initdb downloads, where it would not have been safe).
- Add a cancellation token arg to `download_retry`.
- Use cancellation token args in various places that were missing one, per #5066.

Closes: #5066

Why is this only "basic" handling?

- Doesn't express the difference between shutdown and errors in return types, to avoid refactoring all the places that use an `anyhow::Error` (these should all eventually return a more structured error type).
- Implements timeouts on top of remote storage, rather than within it: this means that operations hitting their timeout will lose their semaphore permit and thereby go to the back of the queue for their retry.
- Doing a nicer job is tracked in https://github.com/neondatabase/neon/issues/6096
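The commit message does not show `download_retry` itself, so the following is only a minimal synchronous sketch of how a retry loop might treat the two outcomes differently: retry after a timeout, but stop immediately once cancellation is requested. Everything here is hypothetical: `AttemptError`, `retry_until_cancelled`, and the use of an `AtomicBool` as a stand-in for a `CancellationToken` are assumptions for illustration, not the code from this change.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-in for the two failure modes discussed above.
#[derive(Debug, PartialEq)]
pub enum AttemptError {
    Timeout,
    Cancelled,
}

/// Hypothetical retry loop: a timed-out attempt is retried (up to
/// `max_attempts` in total), while cancellation ends the loop at once.
/// `cancel` is an `AtomicBool` used here in place of a real cancellation token.
pub fn retry_until_cancelled<T>(
    max_attempts: u32,
    cancel: &AtomicBool,
    mut attempt: impl FnMut(u32) -> Result<T, AttemptError>,
) -> Result<T, AttemptError> {
    for n in 0..max_attempts {
        // Check for cancellation before starting another attempt.
        if cancel.load(Ordering::Relaxed) {
            return Err(AttemptError::Cancelled);
        }
        match attempt(n) {
            Ok(value) => return Ok(value),
            // Shutdown was requested mid-attempt: do not retry.
            Err(AttemptError::Cancelled) => return Err(AttemptError::Cancelled),
            // A timeout is treated as transient: fall through and retry.
            Err(AttemptError::Timeout) => {}
        }
    }
    // Every attempt timed out (or `max_attempts` was zero).
    Err(AttemptError::Timeout)
}
```

The point of the sketch is the asymmetry: a caller that cannot tell `Timeout` from `Cancelled` has no way to decide whether retrying is worthwhile.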
41 lines
1.1 KiB
Rust
use std::time::Duration;

use tokio_util::sync::CancellationToken;

/// Reasons [`timeout_cancellable`] can return without the future's output.
#[derive(thiserror::Error, Debug)]
pub enum TimeoutCancellableError {
    /// The duration elapsed while the future was still pending.
    #[error("Timed out")]
    Timeout,
    /// The cancellation token fired while the future was still pending.
    #[error("Cancelled")]
    Cancelled,
}

/// Wrap [`tokio::time::timeout`] with a CancellationToken.
///
/// This wrapper is appropriate for any long running operation in a task
/// that ought to respect a CancellationToken (which means most tasks).
///
/// The only time you should use a bare [`tokio::time::timeout`] is when the
/// future `F` itself respects a CancellationToken: otherwise, always use this
/// wrapper with your CancellationToken to ensure that your task does not hold
/// up graceful shutdown.
pub async fn timeout_cancellable<F>(
    duration: Duration,
    cancel: &CancellationToken,
    future: F,
) -> Result<F::Output, TimeoutCancellableError>
where
    F: std::future::Future,
{
    tokio::select!(
        r = tokio::time::timeout(duration, future) => {
            r.map_err(|_| TimeoutCancellableError::Timeout)
        },
        _ = cancel.cancelled() => {
            Err(TimeoutCancellableError::Cancelled)
        }
    )
}