pageserver: add Gate as a partner to CancellationToken for safe shutdown of Tenant & Timeline (#5711)

## Problem

When shutting down a Tenant, it isn't enough to cause background
tasks to stop: it's also important to wait until they have actually
stopped before declaring shutdown complete, because we may re-use the
tenant's local storage for something else, such as running in secondary
mode or creating a new tenant with the same ID.

## Summary of changes

A `Gate` type is added, inspired by
[seastar::gate](https://docs.seastar.io/master/classseastar_1_1gate.html).
For types whose lifetime corresponds to some physical resource, a Gate
combined with a CancellationToken provides a robust pattern for async
requests & shutdown:
- Requests must hold the gate for as long as they are using the object.
- Shutdown must cancel the cancellation token, then `close()` the gate
to wait for in-progress requests before returning.

This is not for memory safety: it's for expressing the difference
between "an `Arc<Tenant>` exists" and "this tenant's files on disk are
eligible to be read/written". A sketch of the pattern is shown below.
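
As a minimal sketch of the pattern (not code from this PR; `Resource`,
`handle_request`, and `do_io` are hypothetical names), a request holds a
gate guard and watches the token, while shutdown cancels first and
closes second:

```rust
use tokio_util::sync::CancellationToken;
use utils::sync::gate::{Gate, GateError};

struct Resource {
    gate: Gate,
    cancel: CancellationToken,
}

impl Resource {
    /// Request path: refuse new work once close() has been called, and
    /// hold the guard so that close() waits for us.
    async fn handle_request(&self) -> Result<(), GateError> {
        let _guard = self.gate.enter()?; // Err(GateClosed) means "shutting down"
        tokio::select! {
            _ = self.cancel.cancelled() => { /* drop out promptly */ }
            _ = do_io() => { /* request completed normally */ }
        }
        Ok(())
    }

    /// Shutdown path: cancel first so guard holders drop out, then wait
    /// for all guards to be released.
    async fn shutdown(&self) {
        self.cancel.cancel();
        self.gate.close().await;
    }
}

async fn do_io() { /* stand-in for the real work */ }
```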

- Both Tenant and Timeline get a Gate & CancellationToken.
- The Timeline gate is held during eviction of layers, and during
page_service requests.
- Existing cancellation support in page_service is refined to use the
timeline-scoped cancellation token instead of a process-scoped
cancellation token. This replaces the use of `task_mgr::associate_with`:
tasks no longer change their tenant/timeline identity after being
spawned.

The Tenant's Gate is not yet used, but will be important for
Tenant-scoped operations in secondary mode, where we must ensure that
our secondary-mode downloads for a tenant are gated with respect to the
activity of an attached Tenant.

This is part of a broader move away from the global-state-driven
`task_mgr` shutdown tokens (see the sketch after this list):
- Less global state: instead of relying on implicit knowledge of which
task a given function is running in, functions hold explicit references
to the cancellation token that they will respect, making shutdown
easier to reason about.
- Eventually, avoid the big global `TASKS` mutex.
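
For instance, a minimal sketch of explicit token plumbing (illustrative
only, not code from this PR; `background_loop` and `spawn_for_timeline`
are hypothetical names): each task is handed a child of its timeline's
token rather than consulting global `task_mgr` state.

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

// The task respects exactly the token it was handed; nothing is looked
// up from global state based on which task is currently running.
async fn background_loop(cancel: CancellationToken) {
    loop {
        tokio::select! {
            _ = cancel.cancelled() => break,
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                // periodic work goes here
            }
        }
    }
}

fn spawn_for_timeline(timeline_cancel: &CancellationToken) {
    // A child token means cancelling the timeline cancels this task,
    // and nothing else.
    tokio::spawn(background_loop(timeline_cancel.child_token()));
}
```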

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
Author: John Spray
Committed by GitHub: 2023-11-06 12:39:20 +00:00
commit 6defa2b5d5 (parent b3d3a2587d)
16 changed files with 358 additions and 120 deletions


@@ -728,12 +728,17 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
trace!("got query {query_string:?}");
if let Err(e) = handler.process_query(self, query_string).await {
log_query_error(query_string, &e);
let short_error = short_error(&e);
self.write_message_noflush(&BeMessage::ErrorResponse(
&short_error,
Some(e.pg_error_code()),
))?;
match e {
QueryError::Shutdown => return Ok(ProcessMsgResult::Break),
e => {
log_query_error(query_string, &e);
let short_error = short_error(&e);
self.write_message_noflush(&BeMessage::ErrorResponse(
&short_error,
Some(e.pg_error_code()),
))?;
}
}
}
self.write_message_noflush(&BeMessage::ReadyForQuery)?;
}


@@ -1 +1,3 @@
pub mod heavier_once_cell;
pub mod gate;

libs/utils/src/sync/gate.rs (new file, 151 lines)

@@ -0,0 +1,151 @@
use std::{sync::Arc, time::Duration};
/// Gates are a concurrency helper, primarily used for implementing safe shutdown.
///
/// Users of a resource call `enter()` to acquire a GateGuard, and the owner of
/// the resource calls `close()` when they want to ensure that all holders of guards
/// have released them, and that no future guards will be issued.
pub struct Gate {
/// Each caller of enter() takes one unit from the semaphore. In close(), we
/// take all the units to ensure all GateGuards are destroyed.
sem: Arc<tokio::sync::Semaphore>,
/// For observability only: a name that will be used to log warnings if a particular
/// gate is holding up shutdown
name: String,
}
/// RAII guard for a [`Gate`]: as long as this exists, calls to [`Gate::close`] will
/// not complete.
#[derive(Debug)]
pub struct GateGuard(tokio::sync::OwnedSemaphorePermit);
/// Observability helper: every `warn_period`, emit a log warning that we're still waiting on this gate
async fn warn_if_stuck<Fut: std::future::Future>(
fut: Fut,
name: &str,
warn_period: std::time::Duration,
) -> <Fut as std::future::Future>::Output {
let started = std::time::Instant::now();
let mut fut = std::pin::pin!(fut);
loop {
match tokio::time::timeout(warn_period, &mut fut).await {
Ok(ret) => return ret,
Err(_) => {
tracing::warn!(
gate = name,
elapsed_ms = started.elapsed().as_millis(),
"still waiting, taking longer than expected..."
);
}
}
}
}
#[derive(Debug)]
pub enum GateError {
GateClosed,
}
impl Gate {
const MAX_UNITS: u32 = u32::MAX;
pub fn new(name: String) -> Self {
Self {
sem: Arc::new(tokio::sync::Semaphore::new(Self::MAX_UNITS as usize)),
name,
}
}
/// Acquire a guard that will prevent close() calls from completing. If close()
/// was already called, this will return an error which should be interpreted
/// as "shutting down".
///
/// This function would typically be used from e.g. request handlers. While holding
/// the guard returned from this function, it is important to respect a CancellationToken
/// to avoid blocking close() indefinitely: typically types that contain a Gate will
/// also contain a CancellationToken.
pub fn enter(&self) -> Result<GateGuard, GateError> {
self.sem
.clone()
.try_acquire_owned()
.map(GateGuard)
.map_err(|_| GateError::GateClosed)
}
/// Types with a shutdown() method and a gate should call this method at the
/// end of shutdown, to ensure that all GateGuard holders are done.
///
/// This will wait for all guards to be destroyed. For this to complete promptly, it is
/// important that the holders of such guards are respecting a CancellationToken which has
/// been cancelled before entering this function.
pub async fn close(&self) {
warn_if_stuck(self.do_close(), &self.name, Duration::from_millis(1000)).await
}
async fn do_close(&self) {
tracing::debug!(gate = self.name, "Closing Gate...");
match self.sem.acquire_many(Self::MAX_UNITS).await {
Ok(_units) => {
// While holding all units, close the semaphore. All subsequent calls to enter() will fail.
self.sem.close();
}
Err(_) => {
// Semaphore closed: we are the only function that can do this, so it indicates a double-call.
// This is legal. Timeline::shutdown for example is not protected from being called more than
// once.
tracing::debug!(gate = self.name, "Double close")
}
}
tracing::debug!(gate = self.name, "Closed Gate.")
}
}
#[cfg(test)]
mod tests {
use futures::FutureExt;
use super::*;
#[tokio::test]
async fn test_idle_gate() {
// Having taken no gates, we should not be blocked in close
let gate = Gate::new("test".to_string());
gate.close().await;
// If a guard is dropped before entering, close should not be blocked
let gate = Gate::new("test".to_string());
let guard = gate.enter().unwrap();
drop(guard);
gate.close().await;
// Entering a closed guard fails
gate.enter().expect_err("enter should fail after close");
}
#[tokio::test]
async fn test_busy_gate() {
let gate = Gate::new("test".to_string());
let guard = gate.enter().unwrap();
let mut close_fut = std::pin::pin!(gate.close());
// Close should be blocked
assert!(close_fut.as_mut().now_or_never().is_none());
// Attempting to enter() should fail, even though close isn't done yet.
gate.enter()
.expect_err("enter should fail after entering close");
drop(guard);
// Guard is gone, close should finish
assert!(close_fut.as_mut().now_or_never().is_some());
// Attempting to enter() is still forbidden
gate.enter().expect_err("enter should fail finishing close");
}
}
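
To make the semaphore mechanism above concrete, here is a standalone
sketch of the same idea using only tokio (illustrative, not pageserver
code): each guard holds one permit, closing means acquiring all
`u32::MAX` permits, and closing the semaphore is what makes later
`enter()` calls fail.

```rust
use std::sync::Arc;

#[tokio::main]
async fn main() {
    const UNITS: u32 = u32::MAX;
    let sem = Arc::new(tokio::sync::Semaphore::new(UNITS as usize));

    let guard = sem.clone().try_acquire_owned().unwrap(); // like Gate::enter()
    assert!(sem.try_acquire_many(UNITS).is_err()); // close() would have to wait

    drop(guard); // last guard released
    let all = sem.acquire_many(UNITS).await.unwrap(); // close() proceeds
    sem.close(); // subsequent enter() fails...
    drop(all);
    assert!(sem.try_acquire().is_err()); // ...even after permits are returned
}
```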


@@ -403,7 +403,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
return (evicted_bytes, evictions_failed);
};
let results = timeline.evict_layers(&batch, &cancel).await;
let results = timeline.evict_layers(&batch).await;
match results {
Ok(results) => {
@@ -554,6 +554,11 @@ async fn collect_eviction_candidates(
}
};
if tenant.cancel.is_cancelled() {
info!(%tenant_id, "Skipping tenant for eviction, it is shutting down");
continue;
}
// collect layers from all timelines in this tenant
//
// If one of the timelines becomes `!is_active()` during the iteration,


@@ -396,6 +396,9 @@ async fn timeline_create_handler(
Err(e @ tenant::CreateTimelineError::AncestorNotActive) => {
json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg(e.to_string()))
}
Err(tenant::CreateTimelineError::ShuttingDown) => {
json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg("tenant shutting down".to_string()))
}
Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
}
}


@@ -61,14 +61,6 @@ pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_cod
)
.await;
// Shut down any page service tasks.
timed(
task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None),
"shutdown PageRequestHandlers",
Duration::from_secs(1),
)
.await;
// Shut down all the tenants. This flushes everything to disk and kills
// the checkpoint and GC tasks.
timed(
@@ -78,6 +70,15 @@ pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_cod
)
.await;
// Shut down any page service tasks: any in-progress work for particular timelines or tenants
// should already have been cancelled via mgr::shutdown_all_tenants
timed(
task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None),
"shutdown PageRequestHandlers",
Duration::from_secs(1),
)
.await;
// Best effort to persist any outstanding deletions, to avoid leaking objects
if let Some(mut deletion_queue) = deletion_queue {
deletion_queue.shutdown(Duration::from_secs(5)).await;


@@ -223,13 +223,7 @@ async fn page_service_conn_main(
// and create a child per-query context when it invokes process_query.
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(
conf,
broker_client,
auth,
connection_ctx,
task_mgr::shutdown_token(),
);
let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
match pgbackend
@@ -263,10 +257,6 @@ struct PageServerHandler {
/// For each query received over the connection,
/// `process_query` creates a child context from this one.
connection_ctx: RequestContext,
/// A token that should fire when the tenant transitions from
/// attached state, or when the pageserver is shutting down.
cancel: CancellationToken,
}
impl PageServerHandler {
@@ -275,7 +265,6 @@ impl PageServerHandler {
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
connection_ctx: RequestContext,
cancel: CancellationToken,
) -> Self {
PageServerHandler {
_conf: conf,
@@ -283,7 +272,6 @@ impl PageServerHandler {
auth,
claims: None,
connection_ctx,
cancel,
}
}
@@ -291,7 +279,11 @@ impl PageServerHandler {
/// this rather than naked flush() in order to shut down promptly. Without this, we would
/// block shutdown of a tenant if a postgres client was failing to consume bytes we send
/// in the flush.
async fn flush_cancellable<IO>(&self, pgb: &mut PostgresBackend<IO>) -> Result<(), QueryError>
async fn flush_cancellable<IO>(
&self,
pgb: &mut PostgresBackend<IO>,
cancel: &CancellationToken,
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
@@ -299,7 +291,7 @@ impl PageServerHandler {
flush_r = pgb.flush() => {
Ok(flush_r?)
},
_ = self.cancel.cancelled() => {
_ = cancel.cancelled() => {
Err(QueryError::Shutdown)
}
)
@@ -308,6 +300,7 @@ impl PageServerHandler {
fn copyin_stream<'a, IO>(
&'a self,
pgb: &'a mut PostgresBackend<IO>,
cancel: &'a CancellationToken,
) -> impl Stream<Item = io::Result<Bytes>> + 'a
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
@@ -317,7 +310,7 @@ impl PageServerHandler {
let msg = tokio::select! {
biased;
_ = self.cancel.cancelled() => {
_ = cancel.cancelled() => {
// We were requested to shut down.
let msg = "pageserver is shutting down";
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
@@ -357,7 +350,7 @@ impl PageServerHandler {
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
self.flush_cancellable(pgb).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
self.flush_cancellable(pgb, cancel).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
}
Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
@@ -384,10 +377,6 @@ impl PageServerHandler {
{
debug_assert_current_span_has_tenant_and_timeline_id();
// NOTE: pagerequests handler exits when connection is closed,
// so there is no need to reset the association
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Make request tracer if needed
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let mut tracer = if tenant.get_trace_read_requests() {
@@ -405,9 +394,14 @@ impl PageServerHandler {
.get_timeline(timeline_id, true)
.map_err(|e| anyhow::anyhow!(e))?;
// Avoid starting new requests if the timeline has already started shutting down,
// and block timeline shutdown until this request is complete, or drops out due
// to cancellation.
let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?;
// switch client to COPYBOTH
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
self.flush_cancellable(pgb).await?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id);
@@ -415,7 +409,7 @@ impl PageServerHandler {
let msg = tokio::select! {
biased;
_ = self.cancel.cancelled() => {
_ = timeline.cancel.cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown)
@@ -490,9 +484,20 @@ impl PageServerHandler {
}
};
if let Err(e) = &response {
if timeline.cancel.is_cancelled() {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropped response during shutdown: {e:#}"));
return Err(QueryError::Shutdown);
}
}
let response = response.unwrap_or_else(|e| {
// print all the details to the log with {:#}, but for the client the
// error message is enough
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
span.in_scope(|| error!("error reading relation or page version: {:#}", e));
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
@@ -500,7 +505,7 @@ impl PageServerHandler {
});
pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
self.flush_cancellable(pgb).await?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
}
Ok(())
}
@@ -522,7 +527,6 @@ impl PageServerHandler {
{
debug_assert_current_span_has_tenant_and_timeline_id();
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Create empty timeline
info!("creating new timeline");
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
@@ -543,9 +547,9 @@ impl PageServerHandler {
// Import basebackup provided via CopyData
info!("importing basebackup");
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
self.flush_cancellable(pgb).await?;
self.flush_cancellable(pgb, &tenant.cancel).await?;
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &tenant.cancel)));
timeline
.import_basebackup_from_tar(
&mut copyin_reader,
@@ -582,7 +586,6 @@ impl PageServerHandler {
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
debug_assert_current_span_has_tenant_and_timeline_id();
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
let last_record_lsn = timeline.get_last_record_lsn();
@@ -598,8 +601,8 @@ impl PageServerHandler {
// Import wal provided via CopyData
info!("importing wal");
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
self.flush_cancellable(pgb).await?;
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
self.flush_cancellable(pgb, &timeline.cancel).await?;
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &timeline.cancel)));
import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
info!("wal import complete");
@@ -807,7 +810,7 @@ impl PageServerHandler {
// switch client to COPYOUT
pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
self.flush_cancellable(pgb).await?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
// Send a tarball of the latest layer on the timeline. Compress if not
// fullbackup. TODO Compress in that case too (tests need to be updated)
@@ -859,7 +862,7 @@ impl PageServerHandler {
}
pgb.write_message_noflush(&BeMessage::CopyDone)?;
self.flush_cancellable(pgb).await?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
let basebackup_after = started
.elapsed()


@@ -44,6 +44,17 @@ pub enum CalculateLogicalSizeError {
Other(#[from] anyhow::Error),
}
impl From<PageReconstructError> for CalculateLogicalSizeError {
fn from(pre: PageReconstructError) -> Self {
match pre {
PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
Self::Cancelled
}
_ => Self::Other(pre.into()),
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum RelationError {
#[error("Relation Already Exists")]
@@ -573,7 +584,7 @@ impl Timeline {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
// Fetch list of database dirs and iterate them
let buf = self.get(DBDIR_KEY, lsn, ctx).await.context("read dbdir")?;
let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
let mut total_size: u64 = 0;
@@ -587,10 +598,7 @@ impl Timeline {
return Err(CalculateLogicalSizeError::Cancelled);
}
let relsize_key = rel_size_to_key(rel);
let mut buf = self
.get(relsize_key, lsn, ctx)
.await
.with_context(|| format!("read relation size of {rel:?}"))?;
let mut buf = self.get(relsize_key, lsn, ctx).await?;
let relsize = buf.get_u32_le();
total_size += relsize as u64;


@@ -299,10 +299,6 @@ pub enum TaskKind {
#[derive(Default)]
struct MutableTaskState {
/// Tenant and timeline that this task is associated with.
tenant_id: Option<TenantId>,
timeline_id: Option<TimelineId>,
/// Handle for waiting for the task to exit. It can be None, if the
/// task has already exited.
join_handle: Option<JoinHandle<()>>,
@@ -319,6 +315,11 @@ struct PageServerTask {
// To request task shutdown, just cancel this token.
cancel: CancellationToken,
/// Tasks may optionally be launched for a particular tenant/timeline, enabling
/// tasks for that tenant/timeline to be cancelled later in [`shutdown_tasks`]
tenant_id: Option<TenantId>,
timeline_id: Option<TimelineId>,
mutable: Mutex<MutableTaskState>,
}
@@ -344,11 +345,9 @@ where
kind,
name: name.to_string(),
cancel: cancel.clone(),
mutable: Mutex::new(MutableTaskState {
tenant_id,
timeline_id,
join_handle: None,
}),
tenant_id,
timeline_id,
mutable: Mutex::new(MutableTaskState { join_handle: None }),
});
TASKS.lock().unwrap().insert(task_id, Arc::clone(&task));
@@ -418,8 +417,6 @@ async fn task_finish(
let mut shutdown_process = false;
{
let task_mut = task.mutable.lock().unwrap();
match result {
Ok(Ok(())) => {
debug!("Task '{}' exited normally", task_name);
@@ -428,13 +425,13 @@ async fn task_finish(
if shutdown_process_on_error {
error!(
"Shutting down: task '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
task_name, task_mut.tenant_id, task_mut.timeline_id, err
task_name, task.tenant_id, task.timeline_id, err
);
shutdown_process = true;
} else {
error!(
"Task '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
task_name, task_mut.tenant_id, task_mut.timeline_id, err
task_name, task.tenant_id, task.timeline_id, err
);
}
}
@@ -442,13 +439,13 @@ async fn task_finish(
if shutdown_process_on_error {
error!(
"Shutting down: task '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
task_name, task_mut.tenant_id, task_mut.timeline_id, err
task_name, task.tenant_id, task.timeline_id, err
);
shutdown_process = true;
} else {
error!(
"Task '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
task_name, task_mut.tenant_id, task_mut.timeline_id, err
task_name, task.tenant_id, task.timeline_id, err
);
}
}
@@ -460,17 +457,6 @@ async fn task_finish(
}
}
// expected to be called from the task of the given id.
pub fn associate_with(tenant_id: Option<TenantId>, timeline_id: Option<TimelineId>) {
CURRENT_TASK.with(|ct| {
let mut task_mut = ct.mutable.lock().unwrap();
task_mut.tenant_id = tenant_id;
task_mut.timeline_id = timeline_id;
});
}
/// Is there a task running that matches the criteria
/// Signal and wait for tasks to shut down.
///
///
@@ -493,17 +479,16 @@ pub async fn shutdown_tasks(
{
let tasks = TASKS.lock().unwrap();
for task in tasks.values() {
let task_mut = task.mutable.lock().unwrap();
if (kind.is_none() || Some(task.kind) == kind)
&& (tenant_id.is_none() || task_mut.tenant_id == tenant_id)
&& (timeline_id.is_none() || task_mut.timeline_id == timeline_id)
&& (tenant_id.is_none() || task.tenant_id == tenant_id)
&& (timeline_id.is_none() || task.timeline_id == timeline_id)
{
task.cancel.cancel();
victim_tasks.push((
Arc::clone(task),
task.kind,
task_mut.tenant_id,
task_mut.timeline_id,
task.tenant_id,
task.timeline_id,
));
}
}


@@ -26,6 +26,7 @@ use tracing::*;
use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
use utils::fs_ext;
use utils::sync::gate::Gate;
use std::cmp::min;
use std::collections::hash_map::Entry;
@@ -252,6 +253,14 @@ pub struct Tenant {
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
// Cancellation token fires when we have entered shutdown(). This is a parent of
// Timelines' cancellation token.
pub(crate) cancel: CancellationToken,
// Users of the Tenant such as the page service must take this Gate to avoid
// trying to use a Tenant which is shutting down.
pub(crate) gate: Gate,
}
pub(crate) enum WalRedoManager {
@@ -395,6 +404,8 @@ pub enum CreateTimelineError {
AncestorLsn(anyhow::Error),
#[error("ancestor timeline is not active")]
AncestorNotActive,
#[error("tenant shutting down")]
ShuttingDown,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
@@ -1524,6 +1535,11 @@ impl Tenant {
)));
}
let _gate = self
.gate
.enter()
.map_err(|_| CreateTimelineError::ShuttingDown)?;
if let Ok(existing) = self.get_timeline(new_timeline_id, false) {
debug!("timeline {new_timeline_id} already exists");
@@ -1808,6 +1824,7 @@ impl Tenant {
freeze_and_flush: bool,
) -> Result<(), completion::Barrier> {
span::debug_assert_current_span_has_tenant_id();
// Set tenant (and its timelines) to Stopping state.
//
// Since we can only transition into Stopping state after activation is complete,
@@ -1846,6 +1863,7 @@ impl Tenant {
js.spawn(async move { timeline.shutdown(freeze_and_flush).instrument(span).await });
})
};
tracing::info!("Waiting for timelines...");
while let Some(res) = js.join_next().await {
match res {
Ok(()) => {}
@@ -1855,12 +1873,21 @@ impl Tenant {
}
}
// We cancel the Tenant's cancellation token _after_ the timelines have all shut down. This permits
// them to continue to do work during their shutdown methods, e.g. flushing data.
tracing::debug!("Cancelling CancellationToken");
self.cancel.cancel();
// shutdown all tenant and timeline tasks: gc, compaction, page service
// No new tasks will be started for this tenant because it's in `Stopping` state.
//
// this will additionally shutdown and await all timeline tasks.
tracing::debug!("Waiting for tasks...");
task_mgr::shutdown_tasks(None, Some(self.tenant_id), None).await;
// Wait for any in-flight operations to complete
self.gate.close().await;
Ok(())
}
@@ -2267,6 +2294,7 @@ impl Tenant {
initial_logical_size_can_start.cloned(),
initial_logical_size_attempt.cloned().flatten(),
state,
self.cancel.child_token(),
);
Ok(timeline)
@@ -2356,6 +2384,8 @@ impl Tenant {
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
cancel: CancellationToken::default(),
gate: Gate::new(format!("Tenant<{tenant_id}>")),
}
}


@@ -406,10 +406,12 @@ async fn fill_logical_sizes(
have_any_error = true;
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
warn!(
timeline_id=%timeline.timeline_id,
"failed to calculate logical size at {lsn}: {error:#}"
);
if !matches!(error, CalculateLogicalSizeError::Cancelled) {
warn!(
timeline_id=%timeline.timeline_id,
"failed to calculate logical size at {lsn}: {error:#}"
);
}
have_any_error = true;
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {


@@ -23,7 +23,7 @@ use tokio::{
};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::TenantTimelineId;
use utils::{id::TenantTimelineId, sync::gate::Gate};
use std::cmp::{max, min, Ordering};
use std::collections::{BinaryHeap, HashMap, HashSet};
@@ -310,6 +310,13 @@ pub struct Timeline {
/// Load or creation time information about the disk_consistent_lsn and when the loading
/// happened. Used for consumption metrics.
pub(crate) loaded_at: (Lsn, SystemTime),
/// Gate to prevent shutdown completing while I/O is still happening to this timeline's data
pub(crate) gate: Gate,
/// Cancellation token scoped to this timeline: anything doing long-running work relating
/// to the timeline should drop out when this token fires.
pub(crate) cancel: CancellationToken,
}
pub struct WalReceiverInfo {
@@ -786,7 +793,11 @@ impl Timeline {
// as an empty timeline. Also in unit tests, when we use the timeline
// as a simple key-value store, ignoring the datadir layout. Log the
// error but continue.
error!("could not compact, repartitioning keyspace failed: {err:?}");
//
// Suppress error when it's due to cancellation
if !self.cancel.is_cancelled() {
error!("could not compact, repartitioning keyspace failed: {err:?}");
}
}
};
@@ -884,7 +895,12 @@ impl Timeline {
pub async fn shutdown(self: &Arc<Self>, freeze_and_flush: bool) {
debug_assert_current_span_has_tenant_and_timeline_id();
// Signal any subscribers to our cancellation token to drop out
tracing::debug!("Cancelling CancellationToken");
self.cancel.cancel();
// prevent writes to the InMemoryLayer
tracing::debug!("Waiting for WalReceiverManager...");
task_mgr::shutdown_tasks(
Some(TaskKind::WalReceiverManager),
Some(self.tenant_id),
@@ -920,6 +936,16 @@ impl Timeline {
warn!("failed to await for frozen and flushed uploads: {e:#}");
}
}
// Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
// while doing so.
self.last_record_lsn.shutdown();
tracing::debug!("Waiting for tasks...");
task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(self.timeline_id)).await;
// Finally wait until any gate-holders are complete
self.gate.close().await;
}
pub fn set_state(&self, new_state: TimelineState) {
@@ -1048,6 +1074,11 @@ impl Timeline {
/// Like [`evict_layer_batch`](Self::evict_layer_batch), but for just one layer.
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let _gate = self
.gate
.enter()
.map_err(|_| anyhow::anyhow!("Shutting down"))?;
let Some(local_layer) = self.find_layer(layer_file_name).await else {
return Ok(None);
};
@@ -1063,9 +1094,8 @@ impl Timeline {
.as_ref()
.ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?;
let cancel = CancellationToken::new();
let results = self
.evict_layer_batch(remote_client, &[local_layer], &cancel)
.evict_layer_batch(remote_client, &[local_layer])
.await?;
assert_eq!(results.len(), 1);
let result: Option<Result<(), EvictionError>> = results.into_iter().next().unwrap();
@@ -1080,15 +1110,18 @@ impl Timeline {
pub(crate) async fn evict_layers(
&self,
layers_to_evict: &[Layer],
cancel: &CancellationToken,
) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
let _gate = self
.gate
.enter()
.map_err(|_| anyhow::anyhow!("Shutting down"))?;
let remote_client = self
.remote_client
.as_ref()
.context("timeline must have RemoteTimelineClient")?;
self.evict_layer_batch(remote_client, layers_to_evict, cancel)
.await
self.evict_layer_batch(remote_client, layers_to_evict).await
}
/// Evict multiple layers at once, continuing through errors.
@@ -1109,7 +1142,6 @@ impl Timeline {
&self,
remote_client: &Arc<RemoteTimelineClient>,
layers_to_evict: &[Layer],
cancel: &CancellationToken,
) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
// ensure that the layers have finished uploading
// (don't hold the layer_removal_cs while we do it, we're not removing anything yet)
@@ -1157,7 +1189,7 @@ impl Timeline {
};
tokio::select! {
_ = cancel.cancelled() => {},
_ = self.cancel.cancelled() => {},
_ = join => {}
}
@@ -1267,6 +1299,7 @@ impl Timeline {
initial_logical_size_can_start: Option<completion::Barrier>,
initial_logical_size_attempt: Option<completion::Completion>,
state: TimelineState,
cancel: CancellationToken,
) -> Arc<Self> {
let disk_consistent_lsn = metadata.disk_consistent_lsn();
let (state, _) = watch::channel(state);
@@ -1367,6 +1400,8 @@ impl Timeline {
initial_logical_size_can_start,
initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt),
cancel,
gate: Gate::new(format!("Timeline<{tenant_id}/{timeline_id}>")),
};
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
@@ -2030,6 +2065,10 @@ impl Timeline {
let mut cont_lsn = Lsn(request_lsn.0 + 1);
'outer: loop {
if self.cancel.is_cancelled() {
return Err(PageReconstructError::Cancelled);
}
// The function should have updated 'state'
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
match result {
@@ -4366,25 +4405,10 @@ mod tests {
.expect("should had been resident")
.drop_eviction_guard();
let cancel = tokio_util::sync::CancellationToken::new();
let batch = [layer];
let first = {
let cancel = cancel.child_token();
async {
let cancel = cancel;
timeline
.evict_layer_batch(&rc, &batch, &cancel)
.await
.unwrap()
}
};
let second = async {
timeline
.evict_layer_batch(&rc, &batch, &cancel)
.await
.unwrap()
};
let first = async { timeline.evict_layer_batch(&rc, &batch).await.unwrap() };
let second = async { timeline.evict_layer_batch(&rc, &batch).await.unwrap() };
let (first, second) = tokio::join!(first, second);


@@ -17,6 +17,7 @@ use crate::{
deletion_queue::DeletionQueueClient,
task_mgr::{self, TaskKind},
tenant::{
debug_assert_current_span_has_tenant_and_timeline_id,
metadata::TimelineMetadata,
remote_timeline_client::{
self, PersistIndexPartWithDeletedFlagError, RemoteTimelineClient,
@@ -30,6 +31,11 @@ use super::{Timeline, TimelineResources};
/// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
debug_assert_current_span_has_tenant_and_timeline_id();
// Notify any timeline work to drop out of loops/requests
tracing::debug!("Cancelling CancellationToken");
timeline.cancel.cancel();
// Stop the walreceiver first.
debug!("waiting for wal receiver to shutdown");
let maybe_started_walreceiver = { timeline.walreceiver.lock().unwrap().take() };
@@ -74,6 +80,11 @@ async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
"failpoint: timeline-delete-before-index-deleted-at"
))?
});
tracing::debug!("Waiting for gate...");
timeline.gate.close().await;
tracing::debug!("Shutdown complete");
Ok(())
}


@@ -277,10 +277,7 @@ impl Timeline {
Some(c) => c,
};
let results = match self
.evict_layer_batch(remote_client, &candidates, cancel)
.await
{
let results = match self.evict_layer_batch(remote_client, &candidates).await {
Err(pre_err) => {
stats.errors += candidates.len();
error!("could not do any evictions: {pre_err:#}");


@@ -426,7 +426,7 @@ impl ConnectionManagerState {
timeline,
new_sk.wal_source_connconf,
events_sender,
cancellation,
cancellation.clone(),
connect_timeout,
ctx,
node_id,
@@ -447,7 +447,14 @@ impl ConnectionManagerState {
}
WalReceiverError::Other(e) => {
// give out an error so that task_mgr gives it really verbose logging
Err(e).context("walreceiver connection handling failure")
if cancellation.is_cancelled() {
// Ideally we would learn about this via some path other than Other, but
// that requires refactoring all the intermediate layers of ingest code
// that only emit anyhow::Error
Ok(())
} else {
Err(e).context("walreceiver connection handling failure")
}
}
}
}


@@ -17,6 +17,10 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB
n_restarts = 10
scale = 10
# Pageserver currently logs requests on non-active tenants at error level
# https://github.com/neondatabase/neon/issues/5784
env.pageserver.allowed_errors.append(".* will not become active. Current state: Stopping.*")
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])