Files
neon/pageserver/src/task_mgr.rs
Christian Schwarz 3220f830b7 pageserver: use a single tokio runtime (#6555)
Before this PR, each core had 3 executor threads from 3 different
runtimes. With this PR, we just have one runtime, with one thread per
core. Switching to a single tokio runtime should reduce that effective
over-commit of CPU and in theory help with tail latencies -- iff all
tokio tasks are well-behaved and yield to the runtime regularly.

Are All Tasks Well-Behaved? Are We Ready?
-----------------------------------------

Sadly there doesn't seem to be good out-of-the box tokio tooling to
answer this question.

We *believe* all tasks are well behaved in today's code base, as of the
switch to `virtual_file_io_engine = "tokio-epoll-uring"` in production
(https://github.com/neondatabase/aws/pull/1121).

The only remaining executor-thread-blocking code is walredo and some
filesystem namespace operations.

Filesystem namespace operations work is being tracked in #6663 and not
considered likely to actually block at this time.

Regarding walredo, it currently does a blocking `poll` for read/write to
the pipe file descriptors we use for IPC with the walredo process.
There is an ongoing experiment to make walredo async (#6628), but it
needs more time because there are surprisingly tricky trade-offs that
are articulated in that PR's description (which itself is still WIP).
What's relevant for *this* PR is that
1. walredo is always CPU-bound
2. production tail latencies for walredo request-response
(`pageserver_wal_redo_seconds_bucket`) are
  - p90: with few exceptions, low hundreds of micro-seconds
  - p95: except on very packed pageservers, below 1ms
  - p99: all below 50ms, vast majority below 1ms
  - p99.9: almost all around 50ms, rarely at >= 70ms
- [Dashboard
Link](https://neonprod.grafana.net/d/edgggcrmki3uof/2024-03-walredo-latency?orgId=1&var-ds=ZNX49CDVz&var-pXX_by_instance=0.9&var-pXX_by_instance=0.99&var-pXX_by_instance=0.95&var-adhoc=instance%7C%21%3D%7Cpageserver-30.us-west-2.aws.neon.tech&var-per_instance_pXX_max_seconds=0.0005&from=1711049688777&to=1711136088777)

The ones below 1ms are below our current threshold for when we start
thinking about yielding to the executor.
The tens of milliseconds stalls aren't great, but, not least because of
the implicit overcommit of CPU by the three runtimes, we can't be sure
whether these tens of milliseconds are inherently necessary to do the
walredo work or whether we could be faster if there was less contention
for CPU.

On the first item (walredo being always CPU-bound work): it means that
walredo processes will always compete with the executor threads.
We could yield, using async walredo, but then we hit the trade-offs
explained in that PR.

tl;dr: the risk of stalling executor threads through blocking walredo
seems low, and switching to one runtime cleans up one potential source
for higher-than-necessary stall times (explained in the previous
paragraphs).


Code Changes
------------

- Remove the 3 different runtime definitions.
- Add a new definition called `THE_RUNTIME`.
- Use it in all places that previously used one of the 3 removed
runtimes.
- Remove the argument from `task_mgr`.
- Fix failpoint usage where `pausable_failpoint!` should have been used.
We encountered some actual failures because of this, e.g., hung
`get_metric()` calls during test teardown that would client-timeout
after 300s.

As indicated by the comment above `THE_RUNTIME`, we could take this
clean-up further.
But before we create so much churn, let's first validate that there's no
perf regression.


Performance
-----------

We will test this in staging using the various nightly benchmark runs.

However, the worst-case impact of this change is likely compaction
(=>image layer creation) competing with compute requests.
Image layer creation work can't be easily generated & repeated quickly
by pagebench.
So, we'll simply watch getpage & basebackup tail latencies in staging.

Additionally, I have done manual benchmarking using pagebench.
Report:
https://neondatabase.notion.site/2024-03-23-oneruntime-change-benchmarking-22a399c411e24399a73311115fb703ec?pvs=4
Tail latencies and throughput are marginally better (no regression =
good).
Except in a workload with 128 clients against one tenant.
There, the p99.9 and p99.99 getpage latency is about 2x worse (at
slightly lower throughput).
A dip in throughput every 20s (compaction_period_ is clearly visible,
and probably responsible for that worse tail latency.
This has potential to improve with async walredo, and is an edge case
workload anyway.


Future Work
-----------

1. Once this change has shown satisfying results in production, change
the codebase to use the ambient runtime instead of explicitly
referencing `THE_RUNTIME`.
2. Have a mode where we run with a single-threaded runtime, so we
uncover executor stalls more quickly.
3. Switch or write our own failpoints library that is async-native:
https://github.com/neondatabase/neon/issues/7216
2024-03-23 19:25:11 +01:00

558 lines
20 KiB
Rust

//!
//! This module provides centralized handling of tokio tasks in the Page Server.
//!
//! We provide a few basic facilities:
//! - A global registry of tasks that lists what kind of tasks they are, and
//! which tenant or timeline they are working on
//!
//! - The ability to request a task to shut down.
//!
//!
//! # How it works?
//!
//! There is a global hashmap of all the tasks (`TASKS`). Whenever a new
//! task is spawned, a PageServerTask entry is added there, and when a
//! task dies, it removes itself from the hashmap. If you want to kill a
//! task, you can scan the hashmap to find it.
//!
//! # Task shutdown
//!
//! To kill a task, we rely on co-operation from the victim. Each task is
//! expected to periodically call the `is_shutdown_requested()` function, and
//! if it returns true, exit gracefully. In addition to that, when waiting for
//! the network or other long-running operation, you can use
//! `shutdown_watcher()` function to get a Future that will become ready if
//! the current task has been requested to shut down. You can use that with
//! Tokio select!().
//!
//! TODO: This would be a good place to also handle panics in a somewhat sane way.
//! Depending on what task panics, we might want to kill the whole server, or
//! only a single tenant or timeline.
//!
use std::collections::HashMap;
use std::fmt;
use std::future::Future;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use futures::FutureExt;
use pageserver_api::shard::TenantShardId;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
use tokio::task_local;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use once_cell::sync::Lazy;
use utils::id::TimelineId;
//
// There are four runtimes:
//
// Compute request runtime
// - used to handle connections from compute nodes. Any tasks related to satisfying
// GetPage requests, base backups, import, and other such compute node operations
// are handled by the Compute request runtime
// - page_service.rs
// - this includes layer downloads from remote storage, if a layer is needed to
// satisfy a GetPage request
//
// Management request runtime
// - used to handle HTTP API requests
//
// WAL receiver runtime:
// - used to handle WAL receiver connections.
// - and to receiver updates from storage_broker
//
// Background runtime
// - layer flushing
// - garbage collection
// - compaction
// - remote storage uploads
// - initial tenant loading
//
// Everything runs in a tokio task. If you spawn new tasks, spawn it using the correct
// runtime.
//
// There might be situations when one task needs to wait for a task running in another
// Runtime to finish. For example, if a background operation needs a layer from remote
// storage, it will start to download it. If a background operation needs a remote layer,
// and the download was already initiated by a GetPage request, the background task
// will wait for the download - running in the Page server runtime - to finish.
// Another example: the initial tenant loading tasks are launched in the background ops
// runtime. If a GetPage request comes in before the load of a tenant has finished, the
// GetPage request will wait for the tenant load to finish.
//
// The core Timeline code is synchronous, and uses a bunch of std Mutexes and RWLocks to
// protect data structures. Let's keep it that way. Synchronous code is easier to debug
// and analyze, and there's a lot of hairy, low-level, performance critical code there.
//
// It's nice to have different runtimes, so that you can quickly eyeball how much CPU
// time each class of operations is taking, with 'top -H' or similar.
//
// It's also good to avoid hogging all threads that would be needed to process
// other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
// happen, but still.
//
/// The single tokio runtime used by all pageserver code.
/// In the past, we had multiple runtimes, and in the future we should weed out
/// remaining references to this global field and rely on ambient runtime instead,
/// i.e., use `tokio::spawn` instead of `THE_RUNTIME.spawn()`, etc.
pub static THE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
// if you change the number of worker threads please change the constant below
.enable_all()
.build()
.expect("Failed to create background op runtime")
});
pub(crate) static THE_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
// force init and thus panics
let _ = THE_RUNTIME.handle();
// replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
// tokio would had already panicked for parsing errors or NotUnicode
//
// this will be wrong if any of the runtimes gets their worker threads configured to something
// else, but that has not been needed in a long time.
std::env::var("TOKIO_WORKER_THREADS")
.map(|s| s.parse::<usize>().unwrap())
.unwrap_or_else(|_e| usize::max(2, num_cpus::get()))
});
#[derive(Debug, Clone, Copy)]
pub struct PageserverTaskId(u64);
impl fmt::Display for PageserverTaskId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
/// Each task that we track is associated with a "task ID". It's just an
/// increasing number that we assign. Note that it is different from tokio::task::Id.
static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);
/// Global registry of tasks
static TASKS: Lazy<Mutex<HashMap<u64, Arc<PageServerTask>>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
task_local! {
// This is a cancellation token which will be cancelled when a task needs to shut down. The
// root token is kept in the global registry, so that anyone can send the signal to request
// task shutdown.
static SHUTDOWN_TOKEN: CancellationToken;
// Each task holds reference to its own PageServerTask here.
static CURRENT_TASK: Arc<PageServerTask>;
}
///
/// There are many kinds of tasks in the system. Some are associated with a particular
/// tenant or timeline, while others are global.
///
/// Note that we don't try to limit how many task of a certain kind can be running
/// at the same time.
///
#[derive(
Debug,
// NB: enumset::EnumSetType derives PartialEq, Eq, Clone, Copy
enumset::EnumSetType,
enum_map::Enum,
serde::Serialize,
serde::Deserialize,
strum_macros::IntoStaticStr,
strum_macros::EnumString,
)]
pub enum TaskKind {
// Pageserver startup, i.e., `main`
Startup,
// libpq listener task. It just accepts connection and spawns a
// PageRequestHandler task for each connection.
LibpqEndpointListener,
// HTTP endpoint listener.
HttpEndpointListener,
// Task that handles a single connection. A PageRequestHandler task
// starts detached from any particular tenant or timeline, but it can be
// associated with one later, after receiving a command from the client.
PageRequestHandler,
/// Manages the WAL receiver connection for one timeline.
/// It subscribes to events from storage_broker and decides which safekeeper to connect to.
/// Once the decision has been made, it establishes the connection using the `tokio-postgres` library.
/// There is at most one connection at any given time.
///
/// That `tokio-postgres` library represents a connection as two objects: a `Client` and a `Connection`.
/// The `Client` object is what library users use to make requests & get responses.
/// Internally, `Client` hands over requests to the `Connection` object.
/// The `Connection` object is responsible for speaking the wire protocol.
///
/// Walreceiver uses its own abstraction called `TaskHandle` to represent the activity of establishing and handling a connection.
/// That abstraction doesn't use `task_mgr`.
/// The `WalReceiverManager` task ensures that this `TaskHandle` task does not outlive the `WalReceiverManager` task.
/// For the `RequestContext` that we hand to the TaskHandle, we use the [`WalReceiverConnectionHandler`] task kind.
///
/// Once the connection is established, the `TaskHandle` task creates a
/// [`WalReceiverConnectionPoller`] task_mgr task that is responsible for polling
/// the `Connection` object.
/// A `CancellationToken` created by the `TaskHandle` task ensures
/// that the [`WalReceiverConnectionPoller`] task will cancel soon after as the `TaskHandle` is dropped.
///
/// [`WalReceiverConnectionHandler`]: Self::WalReceiverConnectionHandler
/// [`WalReceiverConnectionPoller`]: Self::WalReceiverConnectionPoller
WalReceiverManager,
/// The `TaskHandle` task that executes `handle_walreceiver_connection`.
/// Not a `task_mgr` task, but we use this `TaskKind` for its `RequestContext`.
/// See the comment on [`WalReceiverManager`].
///
/// [`WalReceiverManager`]: Self::WalReceiverManager
WalReceiverConnectionHandler,
/// The task that polls the `tokio-postgres::Connection` object.
/// Spawned by task [`WalReceiverConnectionHandler`](Self::WalReceiverConnectionHandler).
/// See the comment on [`WalReceiverManager`](Self::WalReceiverManager).
WalReceiverConnectionPoller,
// Garbage collection worker. One per tenant
GarbageCollector,
// Compaction. One per tenant.
Compaction,
// Eviction. One per timeline.
Eviction,
/// See [`crate::disk_usage_eviction_task`].
DiskUsageEviction,
/// See [`crate::tenant::secondary`].
SecondaryDownloads,
/// See [`crate::tenant::secondary`].
SecondaryUploads,
// Initial logical size calculation
InitialLogicalSizeCalculation,
OndemandLogicalSizeCalculation,
// Task that flushes frozen in-memory layers to disk
LayerFlushTask,
// Task that uploads a file to remote storage
RemoteUploadTask,
// task that handles the initial downloading of all tenants
InitialLoad,
// task that handles attaching a tenant
Attach,
// Used mostly for background deletion from s3
TimelineDeletionWorker,
// task that handhes metrics collection
MetricsCollection,
// task that drives downloading layers
DownloadAllRemoteLayers,
// Task that calculates synthetis size for all active tenants
CalculateSyntheticSize,
// A request that comes in via the pageserver HTTP API.
MgmtRequest,
DebugTool,
#[cfg(test)]
UnitTest,
}
#[derive(Default)]
struct MutableTaskState {
/// Handle for waiting for the task to exit. It can be None, if the
/// the task has already exited.
join_handle: Option<JoinHandle<()>>,
}
struct PageServerTask {
task_id: PageserverTaskId,
kind: TaskKind,
name: String,
// To request task shutdown, just cancel this token.
cancel: CancellationToken,
/// Tasks may optionally be launched for a particular tenant/timeline, enabling
/// later cancelling tasks for that tenant/timeline in [`shutdown_tasks`]
tenant_shard_id: Option<TenantShardId>,
timeline_id: Option<TimelineId>,
mutable: Mutex<MutableTaskState>,
}
/// Launch a new task
/// Note: if shutdown_process_on_error is set to true failure
/// of the task will lead to shutdown of entire process
pub fn spawn<F>(
kind: TaskKind,
tenant_shard_id: Option<TenantShardId>,
timeline_id: Option<TimelineId>,
name: &str,
shutdown_process_on_error: bool,
future: F,
) -> PageserverTaskId
where
F: Future<Output = anyhow::Result<()>> + Send + 'static,
{
let cancel = CancellationToken::new();
let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
let task = Arc::new(PageServerTask {
task_id: PageserverTaskId(task_id),
kind,
name: name.to_string(),
cancel: cancel.clone(),
tenant_shard_id,
timeline_id,
mutable: Mutex::new(MutableTaskState { join_handle: None }),
});
TASKS.lock().unwrap().insert(task_id, Arc::clone(&task));
let mut task_mut = task.mutable.lock().unwrap();
let task_name = name.to_string();
let task_cloned = Arc::clone(&task);
let join_handle = THE_RUNTIME.spawn(task_wrapper(
task_name,
task_id,
task_cloned,
cancel,
shutdown_process_on_error,
future,
));
task_mut.join_handle = Some(join_handle);
drop(task_mut);
// The task is now running. Nothing more to do here
PageserverTaskId(task_id)
}
/// This wrapper function runs in a newly-spawned task. It initializes the
/// task-local variables and calls the payload function.
async fn task_wrapper<F>(
task_name: String,
task_id: u64,
task: Arc<PageServerTask>,
shutdown_token: CancellationToken,
shutdown_process_on_error: bool,
future: F,
) where
F: Future<Output = anyhow::Result<()>> + Send + 'static,
{
debug!("Starting task '{}'", task_name);
let result = SHUTDOWN_TOKEN
.scope(
shutdown_token,
CURRENT_TASK.scope(task, {
// We use AssertUnwindSafe here so that the payload function
// doesn't need to be UnwindSafe. We don't do anything after the
// unwinding that would expose us to unwind-unsafe behavior.
AssertUnwindSafe(future).catch_unwind()
}),
)
.await;
task_finish(result, task_name, task_id, shutdown_process_on_error).await;
}
async fn task_finish(
result: std::result::Result<
anyhow::Result<()>,
std::boxed::Box<dyn std::any::Any + std::marker::Send>,
>,
task_name: String,
task_id: u64,
shutdown_process_on_error: bool,
) {
// Remove our entry from the global hashmap.
let task = TASKS
.lock()
.unwrap()
.remove(&task_id)
.expect("no task in registry");
let mut shutdown_process = false;
{
match result {
Ok(Ok(())) => {
debug!("Task '{}' exited normally", task_name);
}
Ok(Err(err)) => {
if shutdown_process_on_error {
error!(
"Shutting down: task '{}' tenant_shard_id: {:?}, timeline_id: {:?} exited with error: {:?}",
task_name, task.tenant_shard_id, task.timeline_id, err
);
shutdown_process = true;
} else {
error!(
"Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} exited with error: {:?}",
task_name, task.tenant_shard_id, task.timeline_id, err
);
}
}
Err(err) => {
if shutdown_process_on_error {
error!(
"Shutting down: task '{}' tenant_shard_id: {:?}, timeline_id: {:?} panicked: {:?}",
task_name, task.tenant_shard_id, task.timeline_id, err
);
shutdown_process = true;
} else {
error!(
"Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} panicked: {:?}",
task_name, task.tenant_shard_id, task.timeline_id, err
);
}
}
}
}
if shutdown_process {
std::process::exit(1);
}
}
/// Signal and wait for tasks to shut down.
///
///
/// The arguments are used to select the tasks to kill. Any None arguments are
/// ignored. For example, to shut down all WalReceiver tasks:
///
/// shutdown_tasks(Some(TaskKind::WalReceiver), None, None)
///
/// Or to shut down all tasks for given timeline:
///
/// shutdown_tasks(None, Some(tenant_shard_id), Some(timeline_id))
///
pub async fn shutdown_tasks(
kind: Option<TaskKind>,
tenant_shard_id: Option<TenantShardId>,
timeline_id: Option<TimelineId>,
) {
let mut victim_tasks = Vec::new();
{
let tasks = TASKS.lock().unwrap();
for task in tasks.values() {
if (kind.is_none() || Some(task.kind) == kind)
&& (tenant_shard_id.is_none() || task.tenant_shard_id == tenant_shard_id)
&& (timeline_id.is_none() || task.timeline_id == timeline_id)
{
task.cancel.cancel();
victim_tasks.push((
Arc::clone(task),
task.kind,
task.tenant_shard_id,
task.timeline_id,
));
}
}
}
let log_all = kind.is_none() && tenant_shard_id.is_none() && timeline_id.is_none();
for (task, task_kind, tenant_shard_id, timeline_id) in victim_tasks {
let join_handle = {
let mut task_mut = task.mutable.lock().unwrap();
task_mut.join_handle.take()
};
if let Some(mut join_handle) = join_handle {
if log_all {
if tenant_shard_id.is_none() {
// there are quite few of these
info!(name = task.name, kind = ?task_kind, "stopping global task");
} else {
// warn to catch these in tests; there shouldn't be any
warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
}
}
if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
.await
.is_err()
{
// allow some time to elapse before logging to cut down the number of log
// lines.
info!("waiting for task {} to shut down", task.name);
// we never handled this return value, but:
// - we don't deschedule which would lead to is_cancelled
// - panics are already logged (is_panicked)
// - task errors are already logged in the wrapper
let _ = join_handle.await;
info!("task {} completed", task.name);
}
} else {
// Possibly one of:
// * The task had not even fully started yet.
// * It was shut down concurrently and already exited
}
}
}
pub fn current_task_kind() -> Option<TaskKind> {
CURRENT_TASK.try_with(|ct| ct.kind).ok()
}
pub fn current_task_id() -> Option<PageserverTaskId> {
CURRENT_TASK.try_with(|ct| ct.task_id).ok()
}
/// A Future that can be used to check if the current task has been requested to
/// shut down.
pub async fn shutdown_watcher() {
let token = SHUTDOWN_TOKEN
.try_with(|t| t.clone())
.expect("shutdown_watcher() called in an unexpected task or thread");
token.cancelled().await;
}
/// Clone the current task's cancellation token, which can be moved across tasks.
///
/// When the task which is currently executing is shutdown, the cancellation token will be
/// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or
/// `tokio::task::JoinSet::spawn`.
pub fn shutdown_token() -> CancellationToken {
let res = SHUTDOWN_TOKEN.try_with(|t| t.clone());
if cfg!(test) {
// in tests this method is called from non-taskmgr spawned tasks, and that is all ok.
res.unwrap_or_default()
} else {
res.expect("shutdown_token() called in an unexpected task or thread")
}
}
/// Has the current task been requested to shut down?
pub fn is_shutdown_requested() -> bool {
if let Ok(true_or_false) = SHUTDOWN_TOKEN.try_with(|t| t.is_cancelled()) {
true_or_false
} else {
if !cfg!(test) {
warn!("is_shutdown_requested() called in an unexpected task or thread");
}
false
}
}