refactor(pageserver) remove task_mgr for most global tasks (#8449)

## Motivation & Context

We want to move away from `task_mgr` towards explicit tracking of child
tasks.

This PR is extracted from https://github.com/neondatabase/neon/pull/8339
where I refactor `PageRequestHandler` to not depend on task_mgr anymore.

## Changes

This PR refactors all global tasks but `PageRequestHandler` to use some
combination of `JoinHandle`/`JoinSet` + `CancellationToken`.

The `task_mgr::spawn(.., shutdown_process_on_error)` functionality is
preserved through the new `exit_on_panic_or_error` wrapper.
Some global tasks were not using it before, but as of this PR, they are.
The rationale is that all global tasks are relevant for correct
operation of the overall Neon system in one way or another.

## Future Work

After #8339, we can make `task_mgr::spawn` require a `TenantId` instead
of an `Option<TenantId>` which concludes this step of cleanup work and
will help discourage future usage of task_mgr for global tasks.
This commit is contained in:
Christian Schwarz
2024-07-22 17:25:06 +02:00
committed by GitHub
parent 631a9c372f
commit e8523014d4
14 changed files with 429 additions and 297 deletions

View File

@@ -408,7 +408,6 @@ pub fn spawn<F>(
tenant_shard_id: Option<TenantShardId>,
timeline_id: Option<TimelineId>,
name: &str,
shutdown_process_on_error: bool,
future: F,
) -> PageserverTaskId
where
@@ -437,7 +436,6 @@ where
task_id,
task_cloned,
cancel,
shutdown_process_on_error,
future,
));
task_mut.join_handle = Some(join_handle);
@@ -454,82 +452,78 @@ async fn task_wrapper<F>(
task_id: u64,
task: Arc<PageServerTask>,
shutdown_token: CancellationToken,
shutdown_process_on_error: bool,
future: F,
) where
F: Future<Output = anyhow::Result<()>> + Send + 'static,
{
debug!("Starting task '{}'", task_name);
let result = SHUTDOWN_TOKEN
.scope(
shutdown_token,
CURRENT_TASK.scope(task, {
// We use AssertUnwindSafe here so that the payload function
// doesn't need to be UnwindSafe. We don't do anything after the
// unwinding that would expose us to unwind-unsafe behavior.
AssertUnwindSafe(future).catch_unwind()
}),
)
.await;
task_finish(result, task_name, task_id, shutdown_process_on_error).await;
}
async fn task_finish(
result: std::result::Result<
anyhow::Result<()>,
std::boxed::Box<dyn std::any::Any + std::marker::Send>,
>,
task_name: String,
task_id: u64,
shutdown_process_on_error: bool,
) {
// Remove our entry from the global hashmap.
let task = TASKS
.lock()
.unwrap()
.remove(&task_id)
.expect("no task in registry");
let mut shutdown_process = false;
{
// wrap the future so we log panics and errors
let tenant_shard_id = task.tenant_shard_id;
let timeline_id = task.timeline_id;
let fut = async move {
// We use AssertUnwindSafe here so that the payload function
// doesn't need to be UnwindSafe. We don't do anything after the
// unwinding that would expose us to unwind-unsafe behavior.
let result = AssertUnwindSafe(future).catch_unwind().await;
match result {
Ok(Ok(())) => {
debug!("Task '{}' exited normally", task_name);
}
Ok(Err(err)) => {
if shutdown_process_on_error {
error!(
"Shutting down: task '{}' tenant_shard_id: {:?}, timeline_id: {:?} exited with error: {:?}",
task_name, task.tenant_shard_id, task.timeline_id, err
);
shutdown_process = true;
} else {
error!(
"Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} exited with error: {:?}",
task_name, task.tenant_shard_id, task.timeline_id, err
);
}
error!(
"Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} exited with error: {:?}",
task_name, tenant_shard_id, timeline_id, err
);
}
Err(err) => {
if shutdown_process_on_error {
error!(
"Shutting down: task '{}' tenant_shard_id: {:?}, timeline_id: {:?} panicked: {:?}",
task_name, task.tenant_shard_id, task.timeline_id, err
);
shutdown_process = true;
} else {
error!(
"Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} panicked: {:?}",
task_name, task.tenant_shard_id, task.timeline_id, err
);
}
error!(
"Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} panicked: {:?}",
task_name, tenant_shard_id, timeline_id, err
);
}
}
}
};
if shutdown_process {
std::process::exit(1);
// add the task-locals
let fut = CURRENT_TASK.scope(task, fut);
let fut = SHUTDOWN_TOKEN.scope(shutdown_token, fut);
// poll future to completion
fut.await;
// Remove our entry from the global hashmap.
TASKS
.lock()
.unwrap()
.remove(&task_id)
.expect("no task in registry");
}
pub async fn exit_on_panic_or_error<T, E>(
task_name: &'static str,
future: impl Future<Output = Result<T, E>>,
) -> T
where
E: std::fmt::Debug,
{
// We use AssertUnwindSafe here so that the payload function
// doesn't need to be UnwindSafe. We don't do anything after the
// unwinding that would expose us to unwind-unsafe behavior.
let result = AssertUnwindSafe(future).catch_unwind().await;
match result {
Ok(Ok(val)) => val,
Ok(Err(err)) => {
error!(
task_name,
"Task exited with error, exiting process: {err:?}"
);
std::process::exit(1);
}
Err(panic_obj) => {
error!(task_name, "Task panicked, exiting process: {panic_obj:?}");
std::process::exit(1);
}
}
}