diff --git a/libs/postgres_backend/src/lib.rs b/libs/postgres_backend/src/lib.rs index ad3af4e794..455fe7a481 100644 --- a/libs/postgres_backend/src/lib.rs +++ b/libs/postgres_backend/src/lib.rs @@ -728,12 +728,17 @@ impl PostgresBackend { trace!("got query {query_string:?}"); if let Err(e) = handler.process_query(self, query_string).await { - log_query_error(query_string, &e); - let short_error = short_error(&e); - self.write_message_noflush(&BeMessage::ErrorResponse( - &short_error, - Some(e.pg_error_code()), - ))?; + match e { + QueryError::Shutdown => return Ok(ProcessMsgResult::Break), + e => { + log_query_error(query_string, &e); + let short_error = short_error(&e); + self.write_message_noflush(&BeMessage::ErrorResponse( + &short_error, + Some(e.pg_error_code()), + ))?; + } + } } self.write_message_noflush(&BeMessage::ReadyForQuery)?; } diff --git a/libs/utils/src/sync.rs b/libs/utils/src/sync.rs index 125eeca129..2ee8f35449 100644 --- a/libs/utils/src/sync.rs +++ b/libs/utils/src/sync.rs @@ -1 +1,3 @@ pub mod heavier_once_cell; + +pub mod gate; diff --git a/libs/utils/src/sync/gate.rs b/libs/utils/src/sync/gate.rs new file mode 100644 index 0000000000..1391d238e6 --- /dev/null +++ b/libs/utils/src/sync/gate.rs @@ -0,0 +1,151 @@ +use std::{sync::Arc, time::Duration}; + +/// Gates are a concurrency helper, primarily used for implementing safe shutdown. +/// +/// Users of a resource call `enter()` to acquire a GateGuard, and the owner of +/// the resource calls `close()` when they want to ensure that all holders of guards +/// have released them, and that no future guards will be issued. +pub struct Gate { + /// Each caller of enter() takes one unit from the semaphore. In close(), we + /// take all the units to ensure all GateGuards are destroyed. + sem: Arc, + + /// For observability only: a name that will be used to log warnings if a particular + /// gate is holding up shutdown + name: String, +} + +/// RAII guard for a [`Gate`]: as long as this exists, calls to [`Gate::close`] will +/// not complete. +#[derive(Debug)] +pub struct GateGuard(tokio::sync::OwnedSemaphorePermit); + +/// Observability helper: every `warn_period`, emit a log warning that we're still waiting on this gate +async fn warn_if_stuck( + fut: Fut, + name: &str, + warn_period: std::time::Duration, +) -> ::Output { + let started = std::time::Instant::now(); + + let mut fut = std::pin::pin!(fut); + + loop { + match tokio::time::timeout(warn_period, &mut fut).await { + Ok(ret) => return ret, + Err(_) => { + tracing::warn!( + gate = name, + elapsed_ms = started.elapsed().as_millis(), + "still waiting, taking longer than expected..." + ); + } + } + } +} + +#[derive(Debug)] +pub enum GateError { + GateClosed, +} + +impl Gate { + const MAX_UNITS: u32 = u32::MAX; + + pub fn new(name: String) -> Self { + Self { + sem: Arc::new(tokio::sync::Semaphore::new(Self::MAX_UNITS as usize)), + name, + } + } + + /// Acquire a guard that will prevent close() calls from completing. If close() + /// was already called, this will return an error which should be interpreted + /// as "shutting down". + /// + /// This function would typically be used from e.g. request handlers. While holding + /// the guard returned from this function, it is important to respect a CancellationToken + /// to avoid blocking close() indefinitely: typically types that contain a Gate will + /// also contain a CancellationToken. 
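The doc comments above describe how a `Gate` is meant to be paired with a `CancellationToken`. A minimal sketch of that usage pattern, not taken from the patch (the `Service`, `handle_request` and `do_work` names are made up, and the `utils::sync::gate` import assumes this workspace's `utils` crate):

```rust
use tokio_util::sync::CancellationToken;
use utils::sync::gate::{Gate, GateError};

struct Service {
    gate: Gate,
    cancel: CancellationToken,
}

impl Service {
    async fn handle_request(&self) -> Result<(), GateError> {
        // Refuse new work once close() has started; hold the guard while working.
        let _guard = self.gate.enter()?;
        // Respect the cancellation token so a pending close() is not blocked forever.
        tokio::select! {
            _ = self.cancel.cancelled() => {}
            _ = do_work() => {}
        }
        Ok(())
    }

    async fn shutdown(&self) {
        // Cancel first so guard holders drop out, then wait for them to finish.
        self.cancel.cancel();
        self.gate.close().await;
    }
}

async fn do_work() { /* elided */ }

#[tokio::main]
async fn main() {
    let svc = Service {
        gate: Gate::new("example".to_string()),
        cancel: CancellationToken::new(),
    };
    svc.handle_request().await.unwrap();
    svc.shutdown().await;
}
```

Cancelling before closing matters: `close()` only completes once every guard is dropped, and guards held inside long-running work are released promptly only if that work watches the token.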
+ pub fn enter(&self) -> Result { + self.sem + .clone() + .try_acquire_owned() + .map(GateGuard) + .map_err(|_| GateError::GateClosed) + } + + /// Types with a shutdown() method and a gate should call this method at the + /// end of shutdown, to ensure that all GateGuard holders are done. + /// + /// This will wait for all guards to be destroyed. For this to complete promptly, it is + /// important that the holders of such guards are respecting a CancellationToken which has + /// been cancelled before entering this function. + pub async fn close(&self) { + warn_if_stuck(self.do_close(), &self.name, Duration::from_millis(1000)).await + } + + async fn do_close(&self) { + tracing::debug!(gate = self.name, "Closing Gate..."); + match self.sem.acquire_many(Self::MAX_UNITS).await { + Ok(_units) => { + // While holding all units, close the semaphore. All subsequent calls to enter() will fail. + self.sem.close(); + } + Err(_) => { + // Semaphore closed: we are the only function that can do this, so it indicates a double-call. + // This is legal. Timeline::shutdown for example is not protected from being called more than + // once. + tracing::debug!(gate = self.name, "Double close") + } + } + tracing::debug!(gate = self.name, "Closed Gate.") + } +} + +#[cfg(test)] +mod tests { + use futures::FutureExt; + + use super::*; + + #[tokio::test] + async fn test_idle_gate() { + // Having taken no gates, we should not be blocked in close + let gate = Gate::new("test".to_string()); + gate.close().await; + + // If a guard is dropped before entering, close should not be blocked + let gate = Gate::new("test".to_string()); + let guard = gate.enter().unwrap(); + drop(guard); + gate.close().await; + + // Entering a closed guard fails + gate.enter().expect_err("enter should fail after close"); + } + + #[tokio::test] + async fn test_busy_gate() { + let gate = Gate::new("test".to_string()); + + let guard = gate.enter().unwrap(); + + let mut close_fut = std::pin::pin!(gate.close()); + + // Close should be blocked + assert!(close_fut.as_mut().now_or_never().is_none()); + + // Attempting to enter() should fail, even though close isn't done yet. 
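For readers unfamiliar with the underlying primitive, the scheme in `do_close` can be reduced to plain `tokio::sync::Semaphore` calls. A stand-alone sketch of that mechanism, assuming only tokio (not part of the patch):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

const MAX_UNITS: u32 = u32::MAX;

#[tokio::main]
async fn main() {
    let sem = Arc::new(Semaphore::new(MAX_UNITS as usize));

    // enter(): each guard is one unit taken out of the pool.
    let guard = sem.clone().try_acquire_owned().unwrap();

    // close(): completes only once every unit is back, i.e. all guards dropped.
    let closer = {
        let sem = sem.clone();
        tokio::spawn(async move {
            let _all = sem.acquire_many(MAX_UNITS).await.unwrap();
            // Closing the semaphore makes any later try_acquire fail,
            // like enter() after close().
            sem.close();
        })
    };

    drop(guard); // the last outstanding unit is returned
    closer.await.unwrap();
    assert!(sem.try_acquire().is_err());
}
```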
+ gate.enter() + .expect_err("enter should fail after entering close"); + + drop(guard); + + // Guard is gone, close should finish + assert!(close_fut.as_mut().now_or_never().is_some()); + + // Attempting to enter() is still forbidden + gate.enter().expect_err("enter should fail finishing close"); + } +} diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index 413c941bc4..bc4bf51862 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -403,7 +403,7 @@ pub async fn disk_usage_eviction_task_iteration_impl( return (evicted_bytes, evictions_failed); }; - let results = timeline.evict_layers(&batch, &cancel).await; + let results = timeline.evict_layers(&batch).await; match results { Ok(results) => { @@ -554,6 +554,11 @@ async fn collect_eviction_candidates( } }; + if tenant.cancel.is_cancelled() { + info!(%tenant_id, "Skipping tenant for eviction, it is shutting down"); + continue; + } + // collect layers from all timelines in this tenant // // If one of the timelines becomes `!is_active()` during the iteration, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index a6fb26b298..db728f243e 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -396,6 +396,9 @@ async fn timeline_create_handler( Err(e @ tenant::CreateTimelineError::AncestorNotActive) => { json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg(e.to_string())) } + Err(tenant::CreateTimelineError::ShuttingDown) => { + json_response(StatusCode::SERVICE_UNAVAILABLE,HttpErrorBody::from_msg("tenant shutting down".to_string())) + } Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)), } } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index e71130e8af..04c3ae9746 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -61,14 +61,6 @@ pub async fn shutdown_pageserver(deletion_queue: Option, exit_cod ) .await; - // Shut down any page service tasks. - timed( - task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None), - "shutdown PageRequestHandlers", - Duration::from_secs(1), - ) - .await; - // Shut down all the tenants. This flushes everything to disk and kills // the checkpoint and GC tasks. timed( @@ -78,6 +70,15 @@ pub async fn shutdown_pageserver(deletion_queue: Option, exit_cod ) .await; + // Shut down any page service tasks: any in-progress work for particular timelines or tenants + // should already have been canclled via mgr::shutdown_all_tenants + timed( + task_mgr::shutdown_tasks(Some(TaskKind::PageRequestHandler), None, None), + "shutdown PageRequestHandlers", + Duration::from_secs(1), + ) + .await; + // Best effort to persist any outstanding deletions, to avoid leaking objects if let Some(mut deletion_queue) = deletion_queue { deletion_queue.shutdown(Duration::from_secs(5)).await; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 536334d051..334aee3dd1 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -223,13 +223,7 @@ async fn page_service_conn_main( // and create a child per-query context when it invokes process_query. // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler // and create the per-query context in process_query ourselves. 
- let mut conn_handler = PageServerHandler::new( - conf, - broker_client, - auth, - connection_ctx, - task_mgr::shutdown_token(), - ); + let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx); let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?; match pgbackend @@ -263,10 +257,6 @@ struct PageServerHandler { /// For each query received over the connection, /// `process_query` creates a child context from this one. connection_ctx: RequestContext, - - /// A token that should fire when the tenant transitions from - /// attached state, or when the pageserver is shutting down. - cancel: CancellationToken, } impl PageServerHandler { @@ -275,7 +265,6 @@ impl PageServerHandler { broker_client: storage_broker::BrokerClientChannel, auth: Option>, connection_ctx: RequestContext, - cancel: CancellationToken, ) -> Self { PageServerHandler { _conf: conf, @@ -283,7 +272,6 @@ impl PageServerHandler { auth, claims: None, connection_ctx, - cancel, } } @@ -291,7 +279,11 @@ impl PageServerHandler { /// this rather than naked flush() in order to shut down promptly. Without this, we would /// block shutdown of a tenant if a postgres client was failing to consume bytes we send /// in the flush. - async fn flush_cancellable(&self, pgb: &mut PostgresBackend) -> Result<(), QueryError> + async fn flush_cancellable( + &self, + pgb: &mut PostgresBackend, + cancel: &CancellationToken, + ) -> Result<(), QueryError> where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, { @@ -299,7 +291,7 @@ impl PageServerHandler { flush_r = pgb.flush() => { Ok(flush_r?) }, - _ = self.cancel.cancelled() => { + _ = cancel.cancelled() => { Err(QueryError::Shutdown) } ) @@ -308,6 +300,7 @@ impl PageServerHandler { fn copyin_stream<'a, IO>( &'a self, pgb: &'a mut PostgresBackend, + cancel: &'a CancellationToken, ) -> impl Stream> + 'a where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, @@ -317,7 +310,7 @@ impl PageServerHandler { let msg = tokio::select! { biased; - _ = self.cancel.cancelled() => { + _ = cancel.cancelled() => { // We were requested to shut down. 
let msg = "pageserver is shutting down"; let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None)); @@ -357,7 +350,7 @@ impl PageServerHandler { let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg))); // error can't happen here, ErrorResponse serialization should be always ok pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?; - self.flush_cancellable(pgb).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + self.flush_cancellable(pgb, cancel).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?; } Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => { @@ -384,10 +377,6 @@ impl PageServerHandler { { debug_assert_current_span_has_tenant_and_timeline_id(); - // NOTE: pagerequests handler exits when connection is closed, - // so there is no need to reset the association - task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); - // Make request tracer if needed let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; let mut tracer = if tenant.get_trace_read_requests() { @@ -405,9 +394,14 @@ impl PageServerHandler { .get_timeline(timeline_id, true) .map_err(|e| anyhow::anyhow!(e))?; + // Avoid starting new requests if the timeline has already started shutting down, + // and block timeline shutdown until this request is complete, or drops out due + // to cancellation. + let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?; + // switch client to COPYBOTH pgb.write_message_noflush(&BeMessage::CopyBothResponse)?; - self.flush_cancellable(pgb).await?; + self.flush_cancellable(pgb, &timeline.cancel).await?; let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id); @@ -415,7 +409,7 @@ impl PageServerHandler { let msg = tokio::select! { biased; - _ = self.cancel.cancelled() => { + _ = timeline.cancel.cancelled() => { // We were requested to shut down. info!("shutdown request received in page handler"); return Err(QueryError::Shutdown) @@ -490,9 +484,20 @@ impl PageServerHandler { } }; + if let Err(e) = &response { + if timeline.cancel.is_cancelled() { + // If we fail to fulfil a request during shutdown, which may be _because_ of + // shutdown, then do not send the error to the client. Instead just drop the + // connection. + span.in_scope(|| info!("dropped response during shutdown: {e:#}")); + return Err(QueryError::Shutdown); + } + } + let response = response.unwrap_or_else(|e| { // print the all details to the log with {:#}, but for the client the - // error message is enough + // error message is enough. Do not log if shutting down, as the anyhow::Error + // here includes cancellation which is not an error. 
span.in_scope(|| error!("error reading relation or page version: {:#}", e)); PagestreamBeMessage::Error(PagestreamErrorResponse { message: e.to_string(), @@ -500,7 +505,7 @@ impl PageServerHandler { }); pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?; - self.flush_cancellable(pgb).await?; + self.flush_cancellable(pgb, &timeline.cancel).await?; } Ok(()) } @@ -522,7 +527,6 @@ impl PageServerHandler { { debug_assert_current_span_has_tenant_and_timeline_id(); - task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); // Create empty timeline info!("creating new timeline"); let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; @@ -543,9 +547,9 @@ impl PageServerHandler { // Import basebackup provided via CopyData info!("importing basebackup"); pgb.write_message_noflush(&BeMessage::CopyInResponse)?; - self.flush_cancellable(pgb).await?; + self.flush_cancellable(pgb, &tenant.cancel).await?; - let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb))); + let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &tenant.cancel))); timeline .import_basebackup_from_tar( &mut copyin_reader, @@ -582,7 +586,6 @@ impl PageServerHandler { IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, { debug_assert_current_span_has_tenant_and_timeline_id(); - task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?; let last_record_lsn = timeline.get_last_record_lsn(); @@ -598,8 +601,8 @@ impl PageServerHandler { // Import wal provided via CopyData info!("importing wal"); pgb.write_message_noflush(&BeMessage::CopyInResponse)?; - self.flush_cancellable(pgb).await?; - let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb))); + self.flush_cancellable(pgb, &timeline.cancel).await?; + let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &timeline.cancel))); import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?; info!("wal import complete"); @@ -807,7 +810,7 @@ impl PageServerHandler { // switch client to COPYOUT pgb.write_message_noflush(&BeMessage::CopyOutResponse)?; - self.flush_cancellable(pgb).await?; + self.flush_cancellable(pgb, &timeline.cancel).await?; // Send a tarball of the latest layer on the timeline. Compress if not // fullbackup. 
TODO Compress in that case too (tests need to be updated) @@ -859,7 +862,7 @@ impl PageServerHandler { } pgb.write_message_noflush(&BeMessage::CopyDone)?; - self.flush_cancellable(pgb).await?; + self.flush_cancellable(pgb, &timeline.cancel).await?; let basebackup_after = started .elapsed() diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index daadf6abd4..88974588d4 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -44,6 +44,17 @@ pub enum CalculateLogicalSizeError { Other(#[from] anyhow::Error), } +impl From for CalculateLogicalSizeError { + fn from(pre: PageReconstructError) -> Self { + match pre { + PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => { + Self::Cancelled + } + _ => Self::Other(pre.into()), + } + } +} + #[derive(Debug, thiserror::Error)] pub enum RelationError { #[error("Relation Already Exists")] @@ -573,7 +584,7 @@ impl Timeline { crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id(); // Fetch list of database dirs and iterate them - let buf = self.get(DBDIR_KEY, lsn, ctx).await.context("read dbdir")?; + let buf = self.get(DBDIR_KEY, lsn, ctx).await?; let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?; let mut total_size: u64 = 0; @@ -587,10 +598,7 @@ impl Timeline { return Err(CalculateLogicalSizeError::Cancelled); } let relsize_key = rel_size_to_key(rel); - let mut buf = self - .get(relsize_key, lsn, ctx) - .await - .with_context(|| format!("read relation size of {rel:?}"))?; + let mut buf = self.get(relsize_key, lsn, ctx).await?; let relsize = buf.get_u32_le(); total_size += relsize as u64; diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 017322ffb2..4270b6edb0 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -299,10 +299,6 @@ pub enum TaskKind { #[derive(Default)] struct MutableTaskState { - /// Tenant and timeline that this task is associated with. - tenant_id: Option, - timeline_id: Option, - /// Handle for waiting for the task to exit. It can be None, if the /// the task has already exited. join_handle: Option>, @@ -319,6 +315,11 @@ struct PageServerTask { // To request task shutdown, just cancel this token. 
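The `.context(...)` calls removed from the logical size calculation above matter: wrapping the `PageReconstructError` in `anyhow::Error` before `?` would push it into `CalculateLogicalSizeError::Other`, defeating the new `From` impl that preserves `Cancelled`. A self-contained sketch of that distinction, using simplified stand-in enums rather than the pageserver's real types:

```rust
#[derive(Debug, thiserror::Error)]
enum LowLevelError {
    #[error("cancelled")]
    Cancelled,
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

#[derive(Debug, thiserror::Error)]
enum HighLevelError {
    #[error("cancelled")]
    Cancelled,
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

impl From<LowLevelError> for HighLevelError {
    fn from(e: LowLevelError) -> Self {
        match e {
            LowLevelError::Cancelled => Self::Cancelled,
            other => Self::Other(other.into()),
        }
    }
}

fn read_key() -> Result<Vec<u8>, LowLevelError> {
    // Pretend the underlying read was interrupted by shutdown.
    Err(LowLevelError::Cancelled)
}

fn logical_size() -> Result<u64, HighLevelError> {
    // Plain `?` routes through From<LowLevelError>, so Cancelled stays Cancelled.
    // With `read_key().context("read dbdir")?` the error would become an
    // anyhow::Error first and land in HighLevelError::Other instead.
    let _buf = read_key()?;
    Ok(0)
}

fn main() {
    assert!(matches!(logical_size(), Err(HighLevelError::Cancelled)));
}
```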
cancel: CancellationToken, + /// Tasks may optionally be launched for a particular tenant/timeline, enabling + /// later cancelling tasks for that tenant/timeline in [`shutdown_tasks`] + tenant_id: Option, + timeline_id: Option, + mutable: Mutex, } @@ -344,11 +345,9 @@ where kind, name: name.to_string(), cancel: cancel.clone(), - mutable: Mutex::new(MutableTaskState { - tenant_id, - timeline_id, - join_handle: None, - }), + tenant_id, + timeline_id, + mutable: Mutex::new(MutableTaskState { join_handle: None }), }); TASKS.lock().unwrap().insert(task_id, Arc::clone(&task)); @@ -418,8 +417,6 @@ async fn task_finish( let mut shutdown_process = false; { - let task_mut = task.mutable.lock().unwrap(); - match result { Ok(Ok(())) => { debug!("Task '{}' exited normally", task_name); @@ -428,13 +425,13 @@ async fn task_finish( if shutdown_process_on_error { error!( "Shutting down: task '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}", - task_name, task_mut.tenant_id, task_mut.timeline_id, err + task_name, task.tenant_id, task.timeline_id, err ); shutdown_process = true; } else { error!( "Task '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}", - task_name, task_mut.tenant_id, task_mut.timeline_id, err + task_name, task.tenant_id, task.timeline_id, err ); } } @@ -442,13 +439,13 @@ async fn task_finish( if shutdown_process_on_error { error!( "Shutting down: task '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}", - task_name, task_mut.tenant_id, task_mut.timeline_id, err + task_name, task.tenant_id, task.timeline_id, err ); shutdown_process = true; } else { error!( "Task '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}", - task_name, task_mut.tenant_id, task_mut.timeline_id, err + task_name, task.tenant_id, task.timeline_id, err ); } } @@ -460,17 +457,6 @@ async fn task_finish( } } -// expected to be called from the task of the given id. -pub fn associate_with(tenant_id: Option, timeline_id: Option) { - CURRENT_TASK.with(|ct| { - let mut task_mut = ct.mutable.lock().unwrap(); - task_mut.tenant_id = tenant_id; - task_mut.timeline_id = timeline_id; - }); -} - -/// Is there a task running that matches the criteria - /// Signal and wait for tasks to shut down. /// /// @@ -493,17 +479,16 @@ pub async fn shutdown_tasks( { let tasks = TASKS.lock().unwrap(); for task in tasks.values() { - let task_mut = task.mutable.lock().unwrap(); if (kind.is_none() || Some(task.kind) == kind) - && (tenant_id.is_none() || task_mut.tenant_id == tenant_id) - && (timeline_id.is_none() || task_mut.timeline_id == timeline_id) + && (tenant_id.is_none() || task.tenant_id == tenant_id) + && (timeline_id.is_none() || task.timeline_id == timeline_id) { task.cancel.cancel(); victim_tasks.push(( Arc::clone(task), task.kind, - task_mut.tenant_id, - task_mut.timeline_id, + task.tenant_id, + task.timeline_id, )); } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 3a426ac87b..55815a4956 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -26,6 +26,7 @@ use tracing::*; use utils::completion; use utils::crashsafe::path_with_suffix_extension; use utils::fs_ext; +use utils::sync::gate::Gate; use std::cmp::min; use std::collections::hash_map::Entry; @@ -252,6 +253,14 @@ pub struct Tenant { eviction_task_tenant_state: tokio::sync::Mutex, pub(crate) delete_progress: Arc>, + + // Cancellation token fires when we have entered shutdown(). This is a parent of + // Timelines' cancellation token. 
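The parent/child relationship noted in the comment above is standard `tokio_util` behaviour: cancelling a parent token fires every child token, while cancelling a child leaves the parent and its siblings untouched. A small standalone illustration (not part of the patch):

```rust
use tokio_util::sync::CancellationToken;

fn main() {
    let tenant_cancel = CancellationToken::new();

    // Each timeline gets its own child token (cf. `self.cancel.child_token()` below).
    let timeline_a = tenant_cancel.child_token();
    let timeline_b = tenant_cancel.child_token();

    // Cancelling one timeline's token does not affect the tenant or its siblings.
    timeline_a.cancel();
    assert!(!tenant_cancel.is_cancelled());
    assert!(!timeline_b.is_cancelled());

    // Cancelling the tenant token (in Tenant::shutdown) fires every remaining child.
    tenant_cancel.cancel();
    assert!(timeline_b.is_cancelled());
}
```

In the patch, each `Timeline` receives `self.cancel.child_token()`, so a tenant-level cancel reaches every timeline even if that timeline has not been individually cancelled yet.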
+ pub(crate) cancel: CancellationToken, + + // Users of the Tenant such as the page service must take this Gate to avoid + // trying to use a Tenant which is shutting down. + pub(crate) gate: Gate, } pub(crate) enum WalRedoManager { @@ -395,6 +404,8 @@ pub enum CreateTimelineError { AncestorLsn(anyhow::Error), #[error("ancestor timeline is not active")] AncestorNotActive, + #[error("tenant shutting down")] + ShuttingDown, #[error(transparent)] Other(#[from] anyhow::Error), } @@ -1524,6 +1535,11 @@ impl Tenant { ))); } + let _gate = self + .gate + .enter() + .map_err(|_| CreateTimelineError::ShuttingDown)?; + if let Ok(existing) = self.get_timeline(new_timeline_id, false) { debug!("timeline {new_timeline_id} already exists"); @@ -1808,6 +1824,7 @@ impl Tenant { freeze_and_flush: bool, ) -> Result<(), completion::Barrier> { span::debug_assert_current_span_has_tenant_id(); + // Set tenant (and its timlines) to Stoppping state. // // Since we can only transition into Stopping state after activation is complete, @@ -1846,6 +1863,7 @@ impl Tenant { js.spawn(async move { timeline.shutdown(freeze_and_flush).instrument(span).await }); }) }; + tracing::info!("Waiting for timelines..."); while let Some(res) = js.join_next().await { match res { Ok(()) => {} @@ -1855,12 +1873,21 @@ impl Tenant { } } + // We cancel the Tenant's cancellation token _after_ the timelines have all shut down. This permits + // them to continue to do work during their shutdown methods, e.g. flushing data. + tracing::debug!("Cancelling CancellationToken"); + self.cancel.cancel(); + // shutdown all tenant and timeline tasks: gc, compaction, page service // No new tasks will be started for this tenant because it's in `Stopping` state. // // this will additionally shutdown and await all timeline tasks. 
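The per-timeline shutdowns above are fanned out through a `tokio::task::JoinSet` and awaited before the tenant-wide token is cancelled. The shape of that fan-out, reduced to plain tokio with placeholder timeline IDs and placeholder work:

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut js = JoinSet::new();
    for timeline_id in 0..3 {
        js.spawn(async move {
            // Stand-in for timeline.shutdown(freeze_and_flush).await
            println!("shutting down timeline {timeline_id}");
        });
    }
    // Wait for every timeline to finish; a JoinError here means the task panicked.
    while let Some(res) = js.join_next().await {
        res.expect("timeline shutdown task panicked or was cancelled");
    }
}
```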
+ tracing::debug!("Waiting for tasks..."); task_mgr::shutdown_tasks(None, Some(self.tenant_id), None).await; + // Wait for any in-flight operations to complete + self.gate.close().await; + Ok(()) } @@ -2267,6 +2294,7 @@ impl Tenant { initial_logical_size_can_start.cloned(), initial_logical_size_attempt.cloned().flatten(), state, + self.cancel.child_token(), ); Ok(timeline) @@ -2356,6 +2384,8 @@ impl Tenant { cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)), eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()), delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())), + cancel: CancellationToken::default(), + gate: Gate::new(format!("Tenant<{tenant_id}>")), } } diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index 5e8ee2b99e..e4df94b8e9 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -406,10 +406,12 @@ async fn fill_logical_sizes( have_any_error = true; } Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => { - warn!( - timeline_id=%timeline.timeline_id, - "failed to calculate logical size at {lsn}: {error:#}" - ); + if !matches!(error, CalculateLogicalSizeError::Cancelled) { + warn!( + timeline_id=%timeline.timeline_id, + "failed to calculate logical size at {lsn}: {error:#}" + ); + } have_any_error = true; } Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index baa97b6cf9..36629e0655 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -23,7 +23,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::*; -use utils::id::TenantTimelineId; +use utils::{id::TenantTimelineId, sync::gate::Gate}; use std::cmp::{max, min, Ordering}; use std::collections::{BinaryHeap, HashMap, HashSet}; @@ -310,6 +310,13 @@ pub struct Timeline { /// Load or creation time information about the disk_consistent_lsn and when the loading /// happened. Used for consumption metrics. pub(crate) loaded_at: (Lsn, SystemTime), + + /// Gate to prevent shutdown completing while I/O is still happening to this timeline's data + pub(crate) gate: Gate, + + /// Cancellation token scoped to this timeline: anything doing long-running work relating + /// to the timeline should drop out when this token fires. + pub(crate) cancel: CancellationToken, } pub struct WalReceiverInfo { @@ -786,7 +793,11 @@ impl Timeline { // as an empty timeline. Also in unit tests, when we use the timeline // as a simple key-value store, ignoring the datadir layout. Log the // error but continue. 
- error!("could not compact, repartitioning keyspace failed: {err:?}"); + // + // Suppress error when it's due to cancellation + if !self.cancel.is_cancelled() { + error!("could not compact, repartitioning keyspace failed: {err:?}"); + } } }; @@ -884,7 +895,12 @@ impl Timeline { pub async fn shutdown(self: &Arc, freeze_and_flush: bool) { debug_assert_current_span_has_tenant_and_timeline_id(); + // Signal any subscribers to our cancellation token to drop out + tracing::debug!("Cancelling CancellationToken"); + self.cancel.cancel(); + // prevent writes to the InMemoryLayer + tracing::debug!("Waiting for WalReceiverManager..."); task_mgr::shutdown_tasks( Some(TaskKind::WalReceiverManager), Some(self.tenant_id), @@ -920,6 +936,16 @@ impl Timeline { warn!("failed to await for frozen and flushed uploads: {e:#}"); } } + + // Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel + // while doing so. + self.last_record_lsn.shutdown(); + + tracing::debug!("Waiting for tasks..."); + task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(self.timeline_id)).await; + + // Finally wait until any gate-holders are complete + self.gate.close().await; } pub fn set_state(&self, new_state: TimelineState) { @@ -1048,6 +1074,11 @@ impl Timeline { /// Like [`evict_layer_batch`](Self::evict_layer_batch), but for just one layer. /// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`. pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result> { + let _gate = self + .gate + .enter() + .map_err(|_| anyhow::anyhow!("Shutting down"))?; + let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None); }; @@ -1063,9 +1094,8 @@ impl Timeline { .as_ref() .ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?; - let cancel = CancellationToken::new(); let results = self - .evict_layer_batch(remote_client, &[local_layer], &cancel) + .evict_layer_batch(remote_client, &[local_layer]) .await?; assert_eq!(results.len(), 1); let result: Option> = results.into_iter().next().unwrap(); @@ -1080,15 +1110,18 @@ impl Timeline { pub(crate) async fn evict_layers( &self, layers_to_evict: &[Layer], - cancel: &CancellationToken, ) -> anyhow::Result>>> { + let _gate = self + .gate + .enter() + .map_err(|_| anyhow::anyhow!("Shutting down"))?; + let remote_client = self .remote_client .as_ref() .context("timeline must have RemoteTimelineClient")?; - self.evict_layer_batch(remote_client, layers_to_evict, cancel) - .await + self.evict_layer_batch(remote_client, layers_to_evict).await } /// Evict multiple layers at once, continuing through errors. @@ -1109,7 +1142,6 @@ impl Timeline { &self, remote_client: &Arc, layers_to_evict: &[Layer], - cancel: &CancellationToken, ) -> anyhow::Result>>> { // ensure that the layers have finished uploading // (don't hold the layer_removal_cs while we do it, we're not removing anything yet) @@ -1157,7 +1189,7 @@ impl Timeline { }; tokio::select! 
{ - _ = cancel.cancelled() => {}, + _ = self.cancel.cancelled() => {}, _ = join => {} } @@ -1267,6 +1299,7 @@ impl Timeline { initial_logical_size_can_start: Option, initial_logical_size_attempt: Option, state: TimelineState, + cancel: CancellationToken, ) -> Arc { let disk_consistent_lsn = metadata.disk_consistent_lsn(); let (state, _) = watch::channel(state); @@ -1367,6 +1400,8 @@ impl Timeline { initial_logical_size_can_start, initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt), + cancel, + gate: Gate::new(format!("Timeline<{tenant_id}/{timeline_id}>")), }; result.repartition_threshold = result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE; @@ -2030,6 +2065,10 @@ impl Timeline { let mut cont_lsn = Lsn(request_lsn.0 + 1); 'outer: loop { + if self.cancel.is_cancelled() { + return Err(PageReconstructError::Cancelled); + } + // The function should have updated 'state' //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn); match result { @@ -4366,25 +4405,10 @@ mod tests { .expect("should had been resident") .drop_eviction_guard(); - let cancel = tokio_util::sync::CancellationToken::new(); let batch = [layer]; - let first = { - let cancel = cancel.child_token(); - async { - let cancel = cancel; - timeline - .evict_layer_batch(&rc, &batch, &cancel) - .await - .unwrap() - } - }; - let second = async { - timeline - .evict_layer_batch(&rc, &batch, &cancel) - .await - .unwrap() - }; + let first = async { timeline.evict_layer_batch(&rc, &batch).await.unwrap() }; + let second = async { timeline.evict_layer_batch(&rc, &batch).await.unwrap() }; let (first, second) = tokio::join!(first, second); diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index 6d30664515..56a99a25cf 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -17,6 +17,7 @@ use crate::{ deletion_queue::DeletionQueueClient, task_mgr::{self, TaskKind}, tenant::{ + debug_assert_current_span_has_tenant_and_timeline_id, metadata::TimelineMetadata, remote_timeline_client::{ self, PersistIndexPartWithDeletedFlagError, RemoteTimelineClient, @@ -30,6 +31,11 @@ use super::{Timeline, TimelineResources}; /// Now that the Timeline is in Stopping state, request all the related tasks to shut down. async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> { + debug_assert_current_span_has_tenant_and_timeline_id(); + // Notify any timeline work to drop out of loops/requests + tracing::debug!("Cancelling CancellationToken"); + timeline.cancel.cancel(); + // Stop the walreceiver first. debug!("waiting for wal receiver to shutdown"); let maybe_started_walreceiver = { timeline.walreceiver.lock().unwrap().take() }; @@ -74,6 +80,11 @@ async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> { "failpoint: timeline-delete-before-index-deleted-at" ))? 
}); + + tracing::debug!("Waiting for gate..."); + timeline.gate.close().await; + tracing::debug!("Shutdown complete"); + Ok(()) } diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index dc5c71bbe1..52c53a5c3b 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -277,10 +277,7 @@ impl Timeline { Some(c) => c, }; - let results = match self - .evict_layer_batch(remote_client, &candidates, cancel) - .await - { + let results = match self.evict_layer_batch(remote_client, &candidates).await { Err(pre_err) => { stats.errors += candidates.len(); error!("could not do any evictions: {pre_err:#}"); diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index e28c4a5f69..3077712445 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -426,7 +426,7 @@ impl ConnectionManagerState { timeline, new_sk.wal_source_connconf, events_sender, - cancellation, + cancellation.clone(), connect_timeout, ctx, node_id, @@ -447,7 +447,14 @@ impl ConnectionManagerState { } WalReceiverError::Other(e) => { // give out an error to have task_mgr give it a really verbose logging - Err(e).context("walreceiver connection handling failure") + if cancellation.is_cancelled() { + // Ideally we would learn about this via some path other than Other, but + // that requires refactoring all the intermediate layers of ingest code + // that only emit anyhow::Error + Ok(()) + } else { + Err(e).context("walreceiver connection handling failure") + } } } } diff --git a/test_runner/regress/test_pageserver_restarts_under_workload.py b/test_runner/regress/test_pageserver_restarts_under_workload.py index 65569f3bac..d07b8dbe6b 100644 --- a/test_runner/regress/test_pageserver_restarts_under_workload.py +++ b/test_runner/regress/test_pageserver_restarts_under_workload.py @@ -17,6 +17,10 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB n_restarts = 10 scale = 10 + # Pageserver currently logs requests on non-active tenants at error level + # https://github.com/neondatabase/neon/issues/5784 + env.pageserver.allowed_errors.append(".* will not become active. Current state: Stopping.*") + def run_pgbench(connstr: str): log.info(f"Start a pgbench workload on pg {connstr}") pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
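The `connection_manager.rs` change above swallows `WalReceiverError::Other` once cancellation has been requested, because the ingest layers underneath only surface `anyhow::Error` and cannot signal shutdown explicitly. The same decision expressed as a small helper; the `finalize` name and signature are hypothetical, only the cancellation check mirrors the patch:

```rust
use anyhow::Context;
use tokio_util::sync::CancellationToken;

// Hypothetical helper: decide whether a walreceiver-style failure is a real error
// or just the expected fallout of a shutdown that is already in progress.
fn finalize(res: anyhow::Result<()>, cancel: &CancellationToken) -> anyhow::Result<()> {
    match res {
        Ok(()) => Ok(()),
        // Errors observed after cancellation are treated as a clean exit.
        Err(_) if cancel.is_cancelled() => Ok(()),
        Err(e) => Err(e).context("walreceiver connection handling failure"),
    }
}

fn main() {
    let cancel = CancellationToken::new();
    cancel.cancel();
    assert!(finalize(Err(anyhow::anyhow!("broken pipe")), &cancel).is_ok());
}
```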