task_mgr: use CancellationToken instead of shutdown_rx (#3124)

this should help us in the future to have more freedom with spawning
tasks and cancelling things, most importantly blocking tasks (assuming
the CancellationToken::is_cancelled is performant enough).
CancellationToken allows creation of hierarchical cancellations, which
would also simplify the task_mgr shutdown operation, rendering it
unnecessary.
This commit is contained in:
Joonas Koivunen
2022-12-16 17:19:47 +02:00
committed by GitHub
parent 8d39fcdf72
commit c86c0c08ef

View File

@@ -25,7 +25,6 @@
//! the current task has been requested to shut down. You can use that with
//! Tokio select!().
//!
//!
//! TODO: This would be a good place to also handle panics in a somewhat sane way.
//! Depending on what task panics, we might want to kill the whole server, or
//! only a single tenant or timeline.
@@ -43,9 +42,9 @@ use std::sync::{Arc, Mutex};
use futures::FutureExt;
use tokio::runtime::Runtime;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::task_local;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
@@ -146,11 +145,10 @@ static TASKS: Lazy<Mutex<HashMap<u64, Arc<PageServerTask>>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
task_local! {
// There is a Tokio watch channel for each task, which can be used to signal the
// task that it needs to shut down. This task local variable holds the receiving
// end of the channel. The sender is kept in the global registry, so that anyone
// can send the signal to request task shutdown.
static SHUTDOWN_RX: watch::Receiver<bool>;
// This is a cancellation token which will be cancelled when a task needs to shut down. The
// root token is kept in the global registry, so that anyone can send the signal to request
// task shutdown.
static SHUTDOWN_TOKEN: CancellationToken;
// Each task holds reference to its own PageServerTask here.
static CURRENT_TASK: Arc<PageServerTask>;
@@ -226,8 +224,8 @@ struct PageServerTask {
name: String,
// To request task shutdown, send 'true' to the channel to notify the task.
shutdown_tx: watch::Sender<bool>,
// To request task shutdown, just cancel this token.
cancel: CancellationToken,
mutable: Mutex<MutableTaskState>,
}
@@ -247,13 +245,13 @@ pub fn spawn<F>(
where
F: Future<Output = anyhow::Result<()>> + Send + 'static,
{
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let cancel = CancellationToken::new();
let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
let task = Arc::new(PageServerTask {
task_id: PageserverTaskId(task_id),
kind,
name: name.to_string(),
shutdown_tx,
cancel: cancel.clone(),
mutable: Mutex::new(MutableTaskState {
tenant_id,
timeline_id,
@@ -271,7 +269,7 @@ where
task_name,
task_id,
task_cloned,
shutdown_rx,
cancel,
shutdown_process_on_error,
future,
));
@@ -288,7 +286,7 @@ async fn task_wrapper<F>(
task_name: String,
task_id: u64,
task: Arc<PageServerTask>,
shutdown_rx: watch::Receiver<bool>,
shutdown_token: CancellationToken,
shutdown_process_on_error: bool,
future: F,
) where
@@ -296,9 +294,9 @@ async fn task_wrapper<F>(
{
debug!("Starting task '{}'", task_name);
let result = SHUTDOWN_RX
let result = SHUTDOWN_TOKEN
.scope(
shutdown_rx,
shutdown_token,
CURRENT_TASK.scope(task, {
// We use AssertUnwindSafe here so that the payload function
// doesn't need to be UnwindSafe. We don't do anything after the
@@ -408,7 +406,7 @@ pub async fn shutdown_tasks(
&& (tenant_id.is_none() || task_mut.tenant_id == tenant_id)
&& (timeline_id.is_none() || task_mut.timeline_id == timeline_id)
{
let _ = task.shutdown_tx.send_replace(true);
task.cancel.cancel();
victim_tasks.push(Arc::clone(task));
}
}
@@ -439,21 +437,28 @@ pub fn current_task_kind() -> Option<TaskKind> {
/// A Future that can be used to check if the current task has been requested to
/// shut down.
pub async fn shutdown_watcher() {
let mut shutdown_rx = SHUTDOWN_RX
.try_with(|rx| rx.clone())
let token = SHUTDOWN_TOKEN
.try_with(|t| t.clone())
.expect("shutdown_requested() called in an unexpected task or thread");
while !*shutdown_rx.borrow() {
if shutdown_rx.changed().await.is_err() {
break;
}
}
token.cancelled().await;
}
/// Clone the current task's cancellation token, which can be moved across tasks.
///
/// When the task which is currently executing is shutdown, the cancellation token will be
/// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or
/// `tokio::task::JoinSet::spawn`.
pub fn shutdown_token() -> CancellationToken {
SHUTDOWN_TOKEN
.try_with(|t| t.clone())
.expect("shutdown_token() called in an unexpected task or thread")
}
/// Has the current task been requested to shut down?
pub fn is_shutdown_requested() -> bool {
if let Ok(shutdown_rx) = SHUTDOWN_RX.try_with(|rx| rx.clone()) {
*shutdown_rx.borrow()
if let Ok(cancel) = SHUTDOWN_TOKEN.try_with(|t| t.clone()) {
cancel.is_cancelled()
} else {
if !cfg!(test) {
warn!("is_shutdown_requested() called in an unexpected task or thread");