diff --git a/libs/utils/src/seqwait.rs b/libs/utils/src/seqwait.rs
index 5bc7ca91d6..effc9c67b5 100644
--- a/libs/utils/src/seqwait.rs
+++ b/libs/utils/src/seqwait.rs
@@ -125,6 +125,9 @@ where
         // Wake everyone with an error.
         let mut internal = self.internal.lock().unwrap();
 
+        // Block any future waiters from starting
+        internal.shutdown = true;
+
         // This will steal the entire waiters map.
         // When we drop it all waiters will be woken.
         mem::take(&mut internal.waiters)
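// ---------------------------------------------------------------------------
// Editor's note: an illustrative sketch, not part of the patch. It shows the
// idea behind the seqwait.rs hunk above: on shutdown we (1) set a flag so that
// no new waiters can register, and (2) steal the whole waiters map so that
// dropping the stolen senders wakes every existing waiter with an error.
// Names (`ToySeqWait`, `wait_for`) are invented for the example; the real
// SeqWait in `libs/utils` is async and keyed by LSN-like sequence numbers.
// ---------------------------------------------------------------------------
use std::collections::BTreeMap;
use std::mem;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

struct Internal {
    current: u64,
    shutdown: bool,
    waiters: BTreeMap<u64, Vec<Sender<()>>>,
}

struct ToySeqWait {
    internal: Mutex<Internal>,
}

impl ToySeqWait {
    fn new(current: u64) -> Self {
        Self {
            internal: Mutex::new(Internal {
                current,
                shutdown: false,
                waiters: BTreeMap::new(),
            }),
        }
    }

    /// Register a waiter for `num`, or fail immediately if already shut down.
    fn wait_for(&self, num: u64) -> Result<Receiver<()>, &'static str> {
        let mut internal = self.internal.lock().unwrap();
        if internal.shutdown {
            return Err("shutting down");
        }
        let (tx, rx) = channel();
        if internal.current >= num {
            tx.send(()).unwrap(); // already reached: complete right away
        } else {
            internal.waiters.entry(num).or_default().push(tx);
        }
        Ok(rx)
    }

    fn shutdown(&self) {
        let mut internal = self.internal.lock().unwrap();
        // Block any future waiters from starting.
        internal.shutdown = true;
        // Steal the waiters map; dropping it drops all Senders, which makes
        // every pending Receiver return an error ("woken with an error").
        drop(mem::take(&mut internal.waiters));
    }
}

fn main() {
    let seqwait = ToySeqWait::new(5);
    let pending = seqwait.wait_for(10).unwrap();
    seqwait.shutdown();
    assert!(pending.recv().is_err()); // existing waiter woken with an error
    assert!(seqwait.wait_for(11).is_err()); // new waiters rejected
    println!("toy seqwait shutdown behaved as expected");
}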
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 7a8f37f923..63016042cf 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -303,11 +303,7 @@ async fn build_timeline_info(
         // we're executing this function, we will outlive the timeline on-disk state.
         info.current_logical_size_non_incremental = Some(
             timeline
-                .get_current_logical_size_non_incremental(
-                    info.last_record_lsn,
-                    CancellationToken::new(),
-                    ctx,
-                )
+                .get_current_logical_size_non_incremental(info.last_record_lsn, ctx)
                 .await?,
         );
     }
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 2201d6c86b..ee5f1732e4 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -512,7 +512,11 @@ impl PageServerHandler {
             };
 
             if let Err(e) = &response {
-                if timeline.cancel.is_cancelled() {
+                // Requests may fail as soon as we are Stopping, even if the Timeline's cancellation
+                // token wasn't fired yet, because wait_lsn etc. will drop out.
+                // is_stopping(): [`Timeline::flush_and_shutdown`] has entered
+                // is_cancelled(): [`Timeline::shutdown`] has entered
+                if timeline.cancel.is_cancelled() || timeline.is_stopping() {
                     // If we fail to fulfil a request during shutdown, which may be _because_ of
                     // shutdown, then do not send the error to the client. Instead just drop the
                     // connection.
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index aa4d155bcc..9e8a6b02cc 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -21,7 +21,6 @@ use serde::{Deserialize, Serialize};
 use std::collections::{hash_map, HashMap, HashSet};
 use std::ops::ControlFlow;
 use std::ops::Range;
-use tokio_util::sync::CancellationToken;
 use tracing::{debug, trace, warn};
 use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -578,7 +577,6 @@ impl Timeline {
     pub async fn get_current_logical_size_non_incremental(
         &self,
         lsn: Lsn,
-        cancel: CancellationToken,
         ctx: &RequestContext,
     ) -> Result<u64, CalculateLogicalSizeError> {
         crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
@@ -590,7 +588,7 @@ impl Timeline {
         let mut total_size: u64 = 0;
         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
             for rel in self.list_rels(*spcnode, *dbnode, lsn, ctx).await? {
-                if cancel.is_cancelled() {
+                if self.cancel.is_cancelled() {
                     return Err(CalculateLogicalSizeError::Cancelled);
                 }
                 let relsize_key = rel_size_to_key(rel);
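// ---------------------------------------------------------------------------
// Editor's note: an illustrative sketch, not part of the patch. The
// pgdatadir_mapping.rs hunk above switches the logical-size walk from a
// per-call CancellationToken to the timeline's own `self.cancel`, checked
// between items of a long loop. The general shape of such a cancellation
// checkpoint looks like this; the names (`SizeError`, `relation_size`) are
// invented, and it assumes the tokio, tokio-util and thiserror crates the
// patch already depends on.
// ---------------------------------------------------------------------------
use tokio_util::sync::CancellationToken;

#[derive(thiserror::Error, Debug)]
enum SizeError {
    #[error("calculation cancelled")]
    Cancelled,
}

async fn relation_size(rel: u32) -> u64 {
    // stand-in for an await point that may take a while (e.g. a disk read)
    tokio::task::yield_now().await;
    u64::from(rel) * 8192
}

async fn total_size(rels: &[u32], cancel: &CancellationToken) -> Result<u64, SizeError> {
    let mut total = 0u64;
    for rel in rels {
        // Checkpoint: bail out promptly once shutdown fires, instead of
        // finishing a potentially long walk that nobody is waiting for.
        if cancel.is_cancelled() {
            return Err(SizeError::Cancelled);
        }
        total += relation_size(*rel).await;
    }
    Ok(total)
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    println!("{:?}", total_size(&[1, 2, 3], &cancel).await); // Ok(49152)
    cancel.cancel();
    println!("{:?}", total_size(&[1, 2, 3], &cancel).await); // Err(Cancelled)
}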
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index dc07ea7346..758f8b15a1 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -1841,7 +1841,13 @@ impl Tenant {
             timelines.values().for_each(|timeline| {
                 let timeline = Arc::clone(timeline);
                 let span = Span::current();
-                js.spawn(async move { timeline.shutdown(freeze_and_flush).instrument(span).await });
+                js.spawn(async move {
+                    if freeze_and_flush {
+                        timeline.flush_and_shutdown().instrument(span).await
+                    } else {
+                        timeline.shutdown().instrument(span).await
+                    }
+                });
             })
         };
         tracing::info!("Waiting for timelines...");
@@ -4727,7 +4733,7 @@ mod tests {
         // Keeps uninit mark in place
         let raw_tline = tline.raw_timeline().unwrap();
         raw_tline
-            .shutdown(false)
+            .shutdown()
             .instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_id))
             .await;
         std::mem::forget(tline);
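// ---------------------------------------------------------------------------
// Editor's note: an illustrative sketch, not part of the patch. The tenant.rs
// hunk above now picks between two shutdown paths per timeline. The shape of
// that fan-out looks roughly like this: spawn one task per item onto a
// JoinSet and drain it, so one slow timeline does not serialize the others.
// The stand-in functions and ids are invented for the example.
// ---------------------------------------------------------------------------
use std::time::Duration;
use tokio::task::JoinSet;

async fn flush_and_shutdown(id: u32) {
    // stand-in for Timeline::flush_and_shutdown (graceful, may do lots of I/O)
    tokio::time::sleep(Duration::from_millis(50)).await;
    println!("timeline {id}: flushed and shut down");
}

async fn shutdown(id: u32) {
    // stand-in for Timeline::shutdown (immediate)
    println!("timeline {id}: shut down immediately");
}

#[tokio::main]
async fn main() {
    let freeze_and_flush = true; // corresponds to the `freeze_and_flush` flag
    let timeline_ids = vec![1, 2, 3];

    let mut js = JoinSet::new();
    for id in timeline_ids {
        js.spawn(async move {
            if freeze_and_flush {
                flush_and_shutdown(id).await
            } else {
                shutdown(id).await
            }
        });
    }
    // Wait for every per-timeline shutdown task to finish.
    while let Some(res) = js.join_next().await {
        res.expect("shutdown task panicked");
    }
    println!("all timelines shut down");
}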
diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs
index e4df94b8e9..a85dc9231c 100644
--- a/pageserver/src/tenant/size.rs
+++ b/pageserver/src/tenant/size.rs
@@ -6,7 +6,6 @@ use std::sync::Arc;
 use anyhow::{bail, Context};
 use tokio::sync::oneshot::error::RecvError;
 use tokio::sync::Semaphore;
-use tokio_util::sync::CancellationToken;
 
 use crate::context::RequestContext;
 use crate::pgdatadir_mapping::CalculateLogicalSizeError;
@@ -350,10 +349,6 @@ async fn fill_logical_sizes(
     // our advantage with `?` error handling.
     let mut joinset = tokio::task::JoinSet::new();
 
-    let cancel = tokio_util::sync::CancellationToken::new();
-    // be sure to cancel all spawned tasks if we are dropped
-    let _dg = cancel.clone().drop_guard();
-
     // For each point that would benefit from having a logical size available,
     // spawn a Task to fetch it, unless we have it cached already.
     for seg in segments.iter() {
@@ -371,15 +366,8 @@ async fn fill_logical_sizes(
                 let parallel_size_calcs = Arc::clone(limit);
                 let ctx = ctx.attached_child();
                 joinset.spawn(
-                    calculate_logical_size(
-                        parallel_size_calcs,
-                        timeline,
-                        lsn,
-                        cause,
-                        ctx,
-                        cancel.child_token(),
-                    )
-                    .in_current_span(),
+                    calculate_logical_size(parallel_size_calcs, timeline, lsn, cause, ctx)
+                        .in_current_span(),
                 );
             }
             e.insert(cached_size);
@@ -487,14 +475,13 @@ async fn calculate_logical_size(
     lsn: utils::lsn::Lsn,
     cause: LogicalSizeCalculationCause,
     ctx: RequestContext,
-    cancel: CancellationToken,
 ) -> Result<TimelineAtLsnSizeResult, RecvError> {
     let _permit = tokio::sync::Semaphore::acquire_owned(limit)
         .await
        .expect("global semaphore should not had been closed");
 
     let size_res = timeline
-        .spawn_ondemand_logical_size_calculation(lsn, cause, ctx, cancel)
+        .spawn_ondemand_logical_size_calculation(lsn, cause, ctx)
         .instrument(info_span!("spawn_ondemand_logical_size_calculation"))
         .await?;
     Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
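// ---------------------------------------------------------------------------
// Editor's note: an illustrative sketch, not part of the patch.
// fill_logical_sizes (tenant/size.rs above) bounds how many logical-size
// calculations run at once by making each spawned task acquire an owned
// permit from a shared Semaphore before doing work; the JoinSet then collects
// the results. The concrete numbers and the `calculate` function here are
// invented for the example.
// ---------------------------------------------------------------------------
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;

async fn calculate(limit: Arc<Semaphore>, lsn: u64) -> u64 {
    // Each task holds a permit for as long as it works, bounding concurrency.
    let _permit = limit
        .acquire_owned()
        .await
        .expect("semaphore should not be closed");
    tokio::time::sleep(Duration::from_millis(10)).await; // pretend to work
    lsn * 1000 // pretend "logical size at this LSN"
}

#[tokio::main]
async fn main() {
    // At most 2 calculations in flight, no matter how many are requested.
    let limit = Arc::new(Semaphore::new(2));
    let mut joinset = JoinSet::new();

    for lsn in 1..=8u64 {
        joinset.spawn(calculate(Arc::clone(&limit), lsn));
    }

    while let Some(res) = joinset.join_next().await {
        let size = res.expect("task panicked");
        println!("calculated size: {size}");
    }
}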
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 36629e0655..bbb96cb172 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -36,7 +36,6 @@ use std::time::{Duration, Instant, SystemTime};
 use crate::context::{
     AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
 };
-use crate::deletion_queue::DeletionQueueClient;
 use crate::tenant::storage_layer::delta_layer::DeltaEntry;
 use crate::tenant::storage_layer::{
     AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
@@ -50,6 +49,7 @@ use crate::tenant::{
     metadata::{save_metadata, TimelineMetadata},
     par_fsync,
 };
+use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
 
 use crate::config::PageServerConf;
 use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
@@ -247,7 +247,7 @@ pub struct Timeline {
     /// the flush finishes. You can use that to wait for the flush to finish.
     layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
     /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
-    layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>,
+    layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
 
     /// Layer removal lock.
     /// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
@@ -374,6 +374,19 @@ pub enum PageReconstructError {
     WalRedo(anyhow::Error),
 }
 
+#[derive(thiserror::Error, Debug)]
+enum FlushLayerError {
+    /// Timeline cancellation token was cancelled
+    #[error("timeline shutting down")]
+    Cancelled,
+
+    #[error(transparent)]
+    PageReconstructError(#[from] PageReconstructError),
+
+    #[error(transparent)]
+    Other(#[from] anyhow::Error),
+}
+
 impl std::fmt::Debug for PageReconstructError {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
         match self {
@@ -891,15 +904,16 @@ impl Timeline {
         self.launch_eviction_task(background_jobs_can_start);
     }
 
+    /// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
+    /// also to remote storage. This method can easily take multiple seconds for a busy timeline.
+    ///
+    /// While we are flushing, we continue to accept read I/O.
     #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
-    pub async fn shutdown(self: &Arc<Self>, freeze_and_flush: bool) {
+    pub(crate) async fn flush_and_shutdown(&self) {
         debug_assert_current_span_has_tenant_and_timeline_id();
 
-        // Signal any subscribers to our cancellation token to drop out
-        tracing::debug!("Cancelling CancellationToken");
-        self.cancel.cancel();
-
-        // prevent writes to the InMemoryLayer
+        // Stop ingesting data, so that we are not still writing to an InMemoryLayer while
+        // trying to flush
         tracing::debug!("Waiting for WalReceiverManager...");
         task_mgr::shutdown_tasks(
             Some(TaskKind::WalReceiverManager),
         )
         .await;
@@ -908,40 +922,70 @@ impl Timeline {
+        // Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
+        self.last_record_lsn.shutdown();
+
         // now all writers to InMemory layer are gone, do the final flush if requested
-        if freeze_and_flush {
-            match self.freeze_and_flush().await {
-                Ok(()) => {}
-                Err(e) => {
-                    warn!("failed to freeze and flush: {e:#}");
-                    return; // TODO: should probably drain remote timeline client anyways?
+        match self.freeze_and_flush().await {
+            Ok(_) => {
+                // drain the upload queue
+                if let Some(client) = self.remote_client.as_ref() {
+                    // if we did not wait for completion here, it might be our shutdown process
+                    // didn't wait for remote uploads to complete at all, as new tasks can forever
+                    // be spawned.
+                    //
+                    // what is problematic is the shutting down of RemoteTimelineClient, because
+                    // obviously it does not make sense to stop while we wait for it, but what
+                    // about corner cases like s3 suddenly hanging up?
+                    if let Err(e) = client.wait_completion().await {
+                        // Non-fatal. Shutdown is infallible. Failures to flush just mean that
+                        // we have some extra WAL replay to do next time the timeline starts.
+                        warn!("failed to flush to remote storage: {e:#}");
+                    }
+                }
+            }
-
-            // drain the upload queue
-            let res = if let Some(client) = self.remote_client.as_ref() {
-                // if we did not wait for completion here, it might be our shutdown process
-                // didn't wait for remote uploads to complete at all, as new tasks can forever
-                // be spawned.
-                //
-                // what is problematic is the shutting down of RemoteTimelineClient, because
-                // obviously it does not make sense to stop while we wait for it, but what
-                // about corner cases like s3 suddenly hanging up?
-                client.wait_completion().await
-            } else {
-                Ok(())
-            };
-
-            if let Err(e) = res {
-                warn!("failed to await for frozen and flushed uploads: {e:#}");
+            Err(e) => {
+                // Non-fatal. Shutdown is infallible. Failures to flush just mean that
+                // we have some extra WAL replay to do next time the timeline starts.
+                warn!("failed to freeze and flush: {e:#}");
             }
         }
 
+        self.shutdown().await;
+    }
+
+    /// Shut down immediately, without waiting for any open layers to flush to disk. This is a subset of
+    /// the graceful [`Timeline::flush_and_shutdown`] function.
+    pub(crate) async fn shutdown(&self) {
+        // Signal any subscribers to our cancellation token to drop out
+        tracing::debug!("Cancelling CancellationToken");
+        self.cancel.cancel();
+
+        // Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
+        // while doing so.
+        self.last_record_lsn.shutdown();
+
+        // Shut down the layer flush task before the remote client, as one depends on the other
+        task_mgr::shutdown_tasks(
+            Some(TaskKind::LayerFlushTask),
+            Some(self.tenant_id),
+            Some(self.timeline_id),
+        )
+        .await;
+
+        // Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
+        // case our caller wants to use that for a deletion
+        if let Some(remote_client) = self.remote_client.as_ref() {
+            match remote_client.stop() {
+                Ok(()) => {}
+                Err(StopError::QueueUninitialized) => {
+                    // Shutting down during initialization is legal
+                }
+            }
+        }
+
         tracing::debug!("Waiting for tasks...");
         task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(self.timeline_id)).await;
 
         // Finally wait until any gate-holders are complete
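// ---------------------------------------------------------------------------
// Editor's note: an illustrative sketch, not part of the patch. shutdown()
// above ends by waiting "until any gate-holders are complete". A gate can be
// built from an mpsc channel: every in-flight operation holds a clone of the
// Sender, and closing the gate means dropping our own Sender and waiting for
// recv() to return None (all clones dropped). This is only a rough stand-in
// for the pageserver's own gate utility, not its actual API.
// ---------------------------------------------------------------------------
use std::time::Duration;
use tokio::sync::mpsc;

struct ToyGate {
    holder: mpsc::Sender<()>,
    closed: mpsc::Receiver<()>,
}

impl ToyGate {
    fn new() -> Self {
        let (holder, closed) = mpsc::channel(1);
        Self { holder, closed }
    }

    /// Returns a guard; the gate cannot finish closing while guards are alive.
    fn enter(&self) -> mpsc::Sender<()> {
        self.holder.clone()
    }

    /// Waits until every guard handed out by `enter` has been dropped.
    async fn close(mut self) {
        drop(self.holder);
        // `recv` yields None once all Senders (= all guards) are gone.
        assert!(self.closed.recv().await.is_none());
        println!("gate closed: no more in-flight operations");
    }
}

#[tokio::main]
async fn main() {
    let gate = ToyGate::new();

    let guard = gate.enter();
    tokio::spawn(async move {
        let _guard = guard; // held for the duration of the "request"
        tokio::time::sleep(Duration::from_millis(50)).await;
        println!("request handler done");
    });

    gate.close().await; // returns only after the spawned handler finished
}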
@@ -985,7 +1029,12 @@ impl Timeline {
             reason,
             backtrace: backtrace_str,
         };
-        self.set_state(broken_state)
+        self.set_state(broken_state);
+
+        // Although the Broken state is not equivalent to shutdown() (shutdown will be called
+        // later when this tenant is detached or the process shuts down), firing the cancellation token
+        // here avoids the need for other tasks to watch for the Broken state explicitly.
+        self.cancel.cancel();
     }
 
     pub fn current_state(&self) -> TimelineState {
@@ -1741,12 +1790,8 @@ impl Timeline {
                 // delay will be terminated by a timeout regardless.
                 let _completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
 
-                // no extra cancellation here, because nothing really waits for this to complete compared
-                // to spawn_ondemand_logical_size_calculation.
-                let cancel = CancellationToken::new();
-
                 let calculated_size = match self_clone
-                    .logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
+                    .logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx)
                     .await
                 {
                     Ok(s) => s,
@@ -1815,7 +1860,6 @@ impl Timeline {
         lsn: Lsn,
         cause: LogicalSizeCalculationCause,
         ctx: RequestContext,
-        cancel: CancellationToken,
     ) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
         let (sender, receiver) = oneshot::channel();
         let self_clone = Arc::clone(self);
@@ -1836,7 +1880,7 @@ impl Timeline {
             false,
             async move {
                 let res = self_clone
-                    .logical_size_calculation_task(lsn, cause, &ctx, cancel)
+                    .logical_size_calculation_task(lsn, cause, &ctx)
                     .await;
                 let _ = sender.send(res).ok();
                 Ok(()) // Receiver is responsible for handling errors
@@ -1852,58 +1896,28 @@ impl Timeline {
         lsn: Lsn,
         cause: LogicalSizeCalculationCause,
         ctx: &RequestContext,
-        cancel: CancellationToken,
     ) -> Result<u64, CalculateLogicalSizeError> {
         span::debug_assert_current_span_has_tenant_and_timeline_id();
 
-        let mut timeline_state_updates = self.subscribe_for_state_updates();
+        let _guard = self.gate.enter();
+
         let self_calculation = Arc::clone(self);
 
         let mut calculation = pin!(async {
-            let cancel = cancel.child_token();
             let ctx = ctx.attached_child();
             self_calculation
-                .calculate_logical_size(lsn, cause, cancel, &ctx)
+                .calculate_logical_size(lsn, cause, &ctx)
                 .await
         });
 
-        let timeline_state_cancellation = async {
-            loop {
-                match timeline_state_updates.changed().await {
-                    Ok(()) => {
-                        let new_state = timeline_state_updates.borrow().clone();
-                        match new_state {
-                            // we're running this job for active timelines only
-                            TimelineState::Active => continue,
-                            TimelineState::Broken { .. }
-                            | TimelineState::Stopping
-                            | TimelineState::Loading => {
-                                break format!("aborted because timeline became inactive (new state: {new_state:?})")
-                            }
-                        }
-                    }
-                    Err(_sender_dropped_error) => {
-                        // can't happen, the sender is not dropped as long as the Timeline exists
-                        break "aborted because state watch was dropped".to_string();
-                    }
-                }
-            }
-        };
-
-        let taskmgr_shutdown_cancellation = async {
-            task_mgr::shutdown_watcher().await;
-            "aborted because task_mgr shutdown requested".to_string()
-        };
 
         tokio::select! {
             res = &mut calculation => { res }
-            reason = timeline_state_cancellation => {
-                debug!(reason = reason, "cancelling calculation");
-                cancel.cancel();
+            _ = self.cancel.cancelled() => {
+                debug!("cancelling logical size calculation for timeline shutdown");
                 calculation.await
             }
-            reason = taskmgr_shutdown_cancellation => {
-                debug!(reason = reason, "cancelling calculation");
-                cancel.cancel();
+            _ = task_mgr::shutdown_watcher() => {
+                debug!("cancelling logical size calculation for task shutdown");
                 calculation.await
            }
         }
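// ---------------------------------------------------------------------------
// Editor's note: an illustrative sketch, not part of the patch. The
// logical_size_calculation_task hunk above replaces the old state-watch loop
// with a plain select! on the timeline's CancellationToken. Note the shape:
// when the token fires we do NOT abandon the work, we await the same (pinned)
// future again, because the calculation checks the token internally and will
// come back quickly with a "cancelled" result. All names here are invented;
// it assumes the tokio and tokio-util crates used elsewhere in the patch.
// ---------------------------------------------------------------------------
use std::pin::pin;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

async fn calculate_size(cancel: &CancellationToken) -> Result<u64, &'static str> {
    let mut total = 0u64;
    for chunk in 0..100u64 {
        if cancel.is_cancelled() {
            // The worker observes the same token, so it terminates promptly.
            return Err("cancelled");
        }
        tokio::time::sleep(Duration::from_millis(5)).await;
        total += chunk;
    }
    Ok(total)
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();

    // Fire the token shortly after starting, as a timeline shutdown would.
    let trigger = cancel.clone();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(20)).await;
        trigger.cancel();
    });

    let mut calculation = pin!(calculate_size(&cancel));
    let res = tokio::select! {
        res = &mut calculation => res,
        _ = cancel.cancelled() => {
            println!("cancelling size calculation for shutdown");
            // Cancellation-safe: finish the already-started future instead of
            // dropping it mid-step.
            calculation.await
        }
    };
    println!("calculation result: {res:?}");
}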
@@ -1917,7 +1931,6 @@ impl Timeline {
         &self,
         up_to_lsn: Lsn,
         cause: LogicalSizeCalculationCause,
-        cancel: CancellationToken,
         ctx: &RequestContext,
     ) -> Result<u64, CalculateLogicalSizeError> {
         info!(
@@ -1960,7 +1973,7 @@ impl Timeline {
         };
         let timer = storage_time_metrics.start_timer();
         let logical_size = self
-            .get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
+            .get_current_logical_size_non_incremental(up_to_lsn, ctx)
             .await?;
         debug!("calculated logical size: {logical_size}");
         timer.stop_and_record();
@@ -2373,6 +2386,10 @@ impl Timeline {
         info!("started flush loop");
         loop {
             tokio::select! {
+                _ = self.cancel.cancelled() => {
+                    info!("shutting down layer flush task");
+                    break;
+                },
                 _ = task_mgr::shutdown_watcher() => {
                     info!("shutting down layer flush task");
                     break;
@@ -2384,6 +2401,14 @@ impl Timeline {
             let timer = self.metrics.flush_time_histo.start_timer();
             let flush_counter = *layer_flush_start_rx.borrow();
             let result = loop {
+                if self.cancel.is_cancelled() {
+                    info!("dropping out of flush loop for timeline shutdown");
+                    // Note: we do not bother transmitting into [`layer_flush_done_tx`], because
+                    // anyone waiting on that will respect self.cancel as well: they will stop
+                    // waiting at the same time as we drop out of this loop.
+                    return;
+                }
+
                 let layer_to_flush = {
                     let guard = self.layers.read().await;
                     guard.layer_map().frozen_layers.front().cloned()
                 };
@@ -2392,9 +2417,18 @@ impl Timeline {
                 let Some(layer_to_flush) = layer_to_flush else {
                     break Ok(());
                 };
-                if let Err(err) = self.flush_frozen_layer(layer_to_flush, ctx).await {
-                    error!("could not flush frozen layer: {err:?}");
-                    break Err(err);
+                match self.flush_frozen_layer(layer_to_flush, ctx).await {
+                    Ok(()) => {}
+                    Err(FlushLayerError::Cancelled) => {
+                        info!("dropping out of flush loop for timeline shutdown");
+                        return;
+                    }
+                    err @ Err(
+                        FlushLayerError::Other(_) | FlushLayerError::PageReconstructError(_),
+                    ) => {
+                        error!("could not flush frozen layer: {err:?}");
+                        break err;
+                    }
                 }
             };
             // Notify any listeners that we're done
@@ -2443,7 +2477,17 @@ impl Timeline {
             }
         }
         trace!("waiting for flush to complete");
-        rx.changed().await?;
+        tokio::select! {
+            rx_e = rx.changed() => {
+                rx_e?;
+            },
+            // Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
+            // the notification from [`flush_loop`] that it completed.
+            _ = self.cancel.cancelled() => {
+                tracing::info!("Cancelled layer flush due to timeline shutdown");
+                return Ok(())
+            }
+        };
         trace!("done")
     }
 }
@@ -2458,7 +2502,7 @@ impl Timeline {
         self: &Arc<Self>,
         frozen_layer: Arc<InMemoryLayer>,
         ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), FlushLayerError> {
         // As a special case, when we have just imported an image into the repository,
         // instead of writing out a L0 delta layer, we directly write out image layer
         // files instead. This is possible as long as *all* the data imported into the
@@ -2483,6 +2527,11 @@ impl Timeline {
             let (partitioning, _lsn) = self
                 .repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
                 .await?;
+
+            if self.cancel.is_cancelled() {
+                return Err(FlushLayerError::Cancelled);
+            }
+
             // For image layers, we add them immediately into the layer map.
             (
                 self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
                     .await?,
@@ -2514,6 +2563,10 @@ impl Timeline {
             )
         };
 
+        if self.cancel.is_cancelled() {
+            return Err(FlushLayerError::Cancelled);
+        }
+
         let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
         let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
 
@@ -2523,6 +2576,10 @@ impl Timeline {
         let metadata = {
             let mut guard = self.layers.write().await;
 
+            if self.cancel.is_cancelled() {
+                return Err(FlushLayerError::Cancelled);
+            }
+
             guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
 
             if disk_consistent_lsn != old_disk_consistent_lsn {
diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs
index e3aad22e40..183fcb872f 100644
--- a/pageserver/src/tenant/timeline/eviction_task.rs
+++ b/pageserver/src/tenant/timeline/eviction_task.rs
@@ -326,8 +326,7 @@ impl Timeline {
             match state.last_layer_access_imitation {
                 Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
                 _ => {
-                    self.imitate_timeline_cached_layer_accesses(cancel, ctx)
-                        .await;
+                    self.imitate_timeline_cached_layer_accesses(ctx).await;
                     state.last_layer_access_imitation = Some(tokio::time::Instant::now())
                 }
             }
@@ -367,21 +366,12 @@ impl Timeline {
 
     /// Recompute the values which would cause on-demand downloads during restart.
     #[instrument(skip_all)]
-    async fn imitate_timeline_cached_layer_accesses(
-        &self,
-        cancel: &CancellationToken,
-        ctx: &RequestContext,
-    ) {
+    async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) {
         let lsn = self.get_last_record_lsn();
 
         // imitiate on-restart initial logical size
         let size = self
-            .calculate_logical_size(
-                lsn,
-                LogicalSizeCalculationCause::EvictionTaskImitation,
-                cancel.clone(),
-                ctx,
-            )
+            .calculate_logical_size(lsn, LogicalSizeCalculationCause::EvictionTaskImitation, ctx)
             .instrument(info_span!("calculate_logical_size"))
             .await;
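// ---------------------------------------------------------------------------
// Editor's note: an illustrative sketch, not part of the patch. It shows the
// waiter side of the layer-flush handshake as changed above: the flush loop
// publishes (counter, result) on a watch channel, and a waiter loops on
// `changed()` until the counter catches up, with a cancellation arm so that
// shutdown does not leave it waiting forever. Names and the error type are
// simplified stand-ins for the pageserver's own types.
// ---------------------------------------------------------------------------
use std::time::Duration;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;

type FlushResult = Result<(), String>;

async fn wait_flush_completion(
    mut rx: watch::Receiver<(u64, FlushResult)>,
    target: u64,
    cancel: CancellationToken,
) -> FlushResult {
    loop {
        {
            let snapshot = rx.borrow();
            let (done_counter, result) = &*snapshot;
            if *done_counter >= target {
                return result.clone();
            }
        }
        tokio::select! {
            changed = rx.changed() => {
                // Sender dropped means the flush loop is gone; treat as error.
                if changed.is_err() {
                    return Err("flush loop exited".to_string());
                }
            }
            _ = cancel.cancelled() => {
                // Nothing is left in flight on our side; we just stop waiting.
                return Ok(());
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let init: (u64, FlushResult) = (0, Ok(()));
    let (tx, rx) = watch::channel(init);
    let cancel = CancellationToken::new();

    // Pretend flush loop: report completion of flush #1 after a short delay.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(20)).await;
        tx.send((1, Ok(()))).ok();
    });

    let res = wait_flush_completion(rx, 1, cancel).await;
    println!("flush #1 result: {res:?}");
}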