diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 3325ce01d4..91719fb3af 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -25,7 +25,6 @@ //! the current task has been requested to shut down. You can use that with //! Tokio select!(). //! -//! //! TODO: This would be a good place to also handle panics in a somewhat sane way. //! Depending on what task panics, we might want to kill the whole server, or //! only a single tenant or timeline. @@ -43,9 +42,9 @@ use std::sync::{Arc, Mutex}; use futures::FutureExt; use tokio::runtime::Runtime; -use tokio::sync::watch; use tokio::task::JoinHandle; use tokio::task_local; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; @@ -146,11 +145,10 @@ static TASKS: Lazy>>> = Lazy::new(|| Mutex::new(HashMap::new())); task_local! { - // There is a Tokio watch channel for each task, which can be used to signal the - // task that it needs to shut down. This task local variable holds the receiving - // end of the channel. The sender is kept in the global registry, so that anyone - // can send the signal to request task shutdown. - static SHUTDOWN_RX: watch::Receiver; + // This is a cancellation token which will be cancelled when a task needs to shut down. The + // root token is kept in the global registry, so that anyone can send the signal to request + // task shutdown. + static SHUTDOWN_TOKEN: CancellationToken; // Each task holds reference to its own PageServerTask here. static CURRENT_TASK: Arc; @@ -226,8 +224,8 @@ struct PageServerTask { name: String, - // To request task shutdown, send 'true' to the channel to notify the task. - shutdown_tx: watch::Sender, + // To request task shutdown, just cancel this token. 
+ cancel: CancellationToken, mutable: Mutex, } @@ -247,13 +245,13 @@ pub fn spawn( where F: Future> + Send + 'static, { - let (shutdown_tx, shutdown_rx) = watch::channel(false); + let cancel = CancellationToken::new(); let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed); let task = Arc::new(PageServerTask { task_id: PageserverTaskId(task_id), kind, name: name.to_string(), - shutdown_tx, + cancel: cancel.clone(), mutable: Mutex::new(MutableTaskState { tenant_id, timeline_id, @@ -271,7 +269,7 @@ where task_name, task_id, task_cloned, - shutdown_rx, + cancel, shutdown_process_on_error, future, )); @@ -288,7 +286,7 @@ async fn task_wrapper( task_name: String, task_id: u64, task: Arc, - shutdown_rx: watch::Receiver, + shutdown_token: CancellationToken, shutdown_process_on_error: bool, future: F, ) where @@ -296,9 +294,9 @@ async fn task_wrapper( { debug!("Starting task '{}'", task_name); - let result = SHUTDOWN_RX + let result = SHUTDOWN_TOKEN .scope( - shutdown_rx, + shutdown_token, CURRENT_TASK.scope(task, { // We use AssertUnwindSafe here so that the payload function // doesn't need to be UnwindSafe. We don't do anything after the @@ -408,7 +406,7 @@ pub async fn shutdown_tasks( && (tenant_id.is_none() || task_mut.tenant_id == tenant_id) && (timeline_id.is_none() || task_mut.timeline_id == timeline_id) { - let _ = task.shutdown_tx.send_replace(true); + task.cancel.cancel(); victim_tasks.push(Arc::clone(task)); } } @@ -439,21 +437,28 @@ pub fn current_task_kind() -> Option { /// A Future that can be used to check if the current task has been requested to /// shut down. 
pub async fn shutdown_watcher() { - let mut shutdown_rx = SHUTDOWN_RX - .try_with(|rx| rx.clone()) + let token = SHUTDOWN_TOKEN + .try_with(|t| t.clone()) .expect("shutdown_requested() called in an unexpected task or thread"); - while !*shutdown_rx.borrow() { - if shutdown_rx.changed().await.is_err() { - break; - } - } + token.cancelled().await; +} + +/// Clone the current task's cancellation token, which can be moved across tasks. +/// +/// When the task which is currently executing is shut down, the cancellation token will be +/// cancelled. It can, however, be moved to other tasks, such as `tokio::task::spawn_blocking` or +/// `tokio::task::JoinSet::spawn`. +pub fn shutdown_token() -> CancellationToken { + SHUTDOWN_TOKEN + .try_with(|t| t.clone()) + .expect("shutdown_token() called in an unexpected task or thread") } /// Has the current task been requested to shut down? pub fn is_shutdown_requested() -> bool { - if let Ok(shutdown_rx) = SHUTDOWN_RX.try_with(|rx| rx.clone()) { - *shutdown_rx.borrow() + if let Ok(cancel) = SHUTDOWN_TOKEN.try_with(|t| t.clone()) { + cancel.is_cancelled() } else { if !cfg!(test) { warn!("is_shutdown_requested() called in an unexpected task or thread");