diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index 5b00715090..16fa7fdd34 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -9,7 +9,7 @@ use anyhow::{anyhow, Context};
 use clap::{Arg, ArgAction, Command};
 use fail::FailScenario;
 use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
-use pageserver::deletion_queue::DeletionQueue;
+use pageserver::deletion_queue::{DeletionQueue, DeletionQueueError};
 use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
 use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
 use pageserver::task_mgr::WALRECEIVER_RUNTIME;
@@ -633,8 +633,21 @@ fn start_pageserver(
             let dq = deletion_queue.clone();
             BACKGROUND_RUNTIME.block_on(async move {
                 match tokio::time::timeout(Duration::from_secs(5), dq.new_client().flush()).await {
-                    Ok(()) => {
-                        info!("Deletion queue flushed successfully on shutdown");
+                    Ok(flush_r) => {
+                        match flush_r {
+                            Ok(()) => {
+                                info!("Deletion queue flushed successfully on shutdown")
+                            }
+                            Err(e) => {
+                                match e {
+                                    DeletionQueueError::ShuttingDown => {
+                                        // This is not harmful for correctness, but is unexpected: the deletion
+                                        // queue's workers should stay alive as long as there are any client handles instantiated.
+                                        warn!("Deletion queue stopped prematurely");
+                                    }
+                                }
+                            }
+                        }
                     }
                     Err(e) => {
                         warn!("Timed out flushing deletion queue on shutdown ({e})")
diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs
index 82ff7ca4d6..11cf417ade 100644
--- a/pageserver/src/deletion_queue.rs
+++ b/pageserver/src/deletion_queue.rs
@@ -5,6 +5,7 @@ use remote_storage::{GenericRemoteStorage, RemotePath};
 use serde::Deserialize;
 use serde::Serialize;
 use serde_with::serde_as;
+use thiserror::Error;
 use tokio;
 use tokio::time::Duration;
 use tracing::{self, debug, error, info, warn};
@@ -158,15 +159,22 @@ impl DeletionList {
     }
 }

+#[derive(Error, Debug)]
+pub enum DeletionQueueError {
+    #[error("Deletion queue unavailable during shutdown")]
+    ShuttingDown,
+}
+
 impl DeletionQueueClient {
-    async fn do_push(&self, msg: FrontendQueueMessage) {
+    async fn do_push(&self, msg: FrontendQueueMessage) -> Result<(), DeletionQueueError> {
         match self.tx.send(msg).await {
-            Ok(_) => {}
+            Ok(_) => Ok(()),
             Err(e) => {
                 // This shouldn't happen, we should shut down all tenants before
                 // we shut down the global delete queue. If we encounter a bug like this,
                 // we may leak objects as deletions won't be processed.
                 error!("Deletion queue closed while pushing, shutting down? ({e})");
+                Err(DeletionQueueError::ShuttingDown)
             }
         }
     }
@@ -180,7 +188,7 @@ impl DeletionQueueClient {
         tenant_id: TenantId,
         timeline_id: TimelineId,
         layers: Vec<LayerFileName>,
-    ) {
+    ) -> Result<(), DeletionQueueError> {
         DELETION_QUEUE_SUBMITTED.inc_by(layers.len() as u64);
         self.do_push(FrontendQueueMessage::Delete(DeletionOp {
             tenant_id,
@@ -188,7 +196,7 @@ impl DeletionQueueClient {
             timeline_id,
             layers,
             objects: Vec::new(),
         }))
-        .await;
+        .await
     }

     /// Just like push_layers, but using some already-known remote paths, instead of abstract layer names
     pub async fn push_objects(
         &self,
@@ -197,7 +205,7 @@ impl DeletionQueueClient {
         tenant_id: TenantId,
         timeline_id: TimelineId,
         objects: Vec<RemotePath>,
-    ) {
+    ) -> Result<(), DeletionQueueError> {
         DELETION_QUEUE_SUBMITTED.inc_by(objects.len() as u64);
         self.do_push(FrontendQueueMessage::Delete(DeletionOp {
             tenant_id,
@@ -205,38 +213,46 @@ impl DeletionQueueClient {
             timeline_id,
             layers: Vec::new(),
             objects,
         }))
-        .await;
+        .await
     }

-    async fn do_flush(&self, msg: FrontendQueueMessage, rx: tokio::sync::oneshot::Receiver<()>) {
-        self.do_push(msg).await;
+    async fn do_flush(
+        &self,
+        msg: FrontendQueueMessage,
+        rx: tokio::sync::oneshot::Receiver<()>,
+    ) -> Result<(), DeletionQueueError> {
+        self.do_push(msg).await?;
         if rx.await.is_err() {
             // This shouldn't happen if tenants are shut down before deletion queue. If we
             // encounter a bug like this, then a flusher will incorrectly believe it has flushed
             // when it hasn't, possibly leading to leaking objects.
             error!("Deletion queue dropped flush op while client was still waiting");
+            Err(DeletionQueueError::ShuttingDown)
+        } else {
+            Ok(())
         }
     }

     /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
-    pub async fn flush(&self) {
+    pub async fn flush(&self) -> Result<(), DeletionQueueError> {
         let (tx, rx) = tokio::sync::oneshot::channel::<()>();
         self.do_flush(FrontendQueueMessage::Flush(FlushOp { tx }), rx)
             .await
     }

     // Wait until all previous deletions are executed
-    pub async fn flush_execute(&self) {
+    pub async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
         debug!("flush_execute: flushing to deletion lists...");
         // Flush any buffered work to deletion lists
-        self.flush().await;
+        self.flush().await?;

         // Flush execution of deletion lists
         let (tx, rx) = tokio::sync::oneshot::channel::<()>();
         debug!("flush_execute: flushing execution...");
         self.do_flush(FrontendQueueMessage::FlushExecute(FlushOp { tx }), rx)
-            .await;
+            .await?;
         debug!("flush_execute: finished flushing execution...");
+        Ok(())
     }
 }
@@ -1029,13 +1045,13 @@ mod test {
             tenant_id,
             TIMELINE_ID,
             [layer_file_name_1.clone()].to_vec(),
-        ));
+        ))?;
         assert_remote_files(&[&layer_file_name_1.file_name()], &remote_timeline_path);
         assert_remote_files(&[], &remote_deletion_prefix);

         // File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
         info!("Flushing");
-        ctx.runtime.block_on(client.flush());
+        ctx.runtime.block_on(client.flush())?;
         assert_remote_files(&[&layer_file_name_1.file_name()], &remote_timeline_path);
         assert_remote_files(
             &["0000000000000001-00000000-01.list"],
@@ -1044,13 +1060,13 @@ mod test {

         // File should go away when we execute
         info!("Flush-executing");
-        ctx.runtime.block_on(client.flush_execute());
+        ctx.runtime.block_on(client.flush_execute())?;
         assert_remote_files(&[], &remote_timeline_path);
         assert_remote_files(&["header-00000000-01"], &remote_deletion_prefix);

         // Flushing on an empty queue should succeed immediately, and not write any lists
         info!("Flush-executing on empty");
-        ctx.runtime.block_on(client.flush_execute());
+        ctx.runtime.block_on(client.flush_execute())?;
         assert_remote_files(&["header-00000000-01"], &remote_deletion_prefix);

         Ok(())
@@ -1086,8 +1102,8 @@ mod test {
             tenant_id,
             TIMELINE_ID,
             [layer_file_name_1.clone()].to_vec(),
-        ));
-        ctx.runtime.block_on(client.flush());
+        ))?;
+        ctx.runtime.block_on(client.flush())?;
         assert_remote_files(
             &["0000000000000001-00000000-01.list"],
             &remote_deletion_prefix,
@@ -1100,7 +1116,7 @@ mod test {

         // If we have recovered the deletion list properly, then executing after restart should purge it
         info!("Flush-executing");
-        ctx.runtime.block_on(client.flush_execute());
+        ctx.runtime.block_on(client.flush_execute())?;
         assert_remote_files(&[], &remote_timeline_path);
         assert_remote_files(&["header-00000000-01"], &remote_deletion_prefix);
         Ok(())
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index d0e37b67f3..34924e3746 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -23,7 +23,7 @@ use super::models::{
     TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
 };
 use crate::context::{DownloadBehavior, RequestContext};
-use crate::deletion_queue::DeletionQueue;
+use crate::deletion_queue::{DeletionQueue, DeletionQueueError};
 use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
 use crate::pgdatadir_mapping::LsnForTimestamp;
 use crate::task_mgr::TaskKind;
@@ -1148,14 +1148,25 @@ async fn deletion_queue_flush(
     let queue_client = state.deletion_queue.new_client();

     tokio::select! {
-        _ = async {
+        flush_result = async {
             if execute {
-                queue_client.flush_execute().await;
+                queue_client.flush_execute().await
             } else {
-                queue_client.flush().await;
+                queue_client.flush().await
             }
         } => {
-            json_response(StatusCode::OK, ())
+            match flush_result {
+                Ok(()) => {
+                    json_response(StatusCode::OK, ())
+                },
+                Err(e) => {
+                    match e {
+                        DeletionQueueError::ShuttingDown => {
+                            Err(ApiError::ShuttingDown)
+                        }
+                    }
+                }
+            }
         },
         _ = cancel.cancelled() => {
             Err(ApiError::ShuttingDown)
diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index 1b3514500d..6abbb4aee1 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -664,7 +664,7 @@ impl RemoteTimelineClient {
         // Enqueue deletions
         deletion_queue_client
             .push_layers(self.tenant_id, self.timeline_id, names.to_vec())
-            .await;
+            .await?;

         Ok(())
     }
@@ -817,7 +817,7 @@ impl RemoteTimelineClient {

         deletion_queue
             .push_layers(self.tenant_id, self.timeline_id, layers)
-            .await;
+            .await?;

         // Do not delete index part yet, it is needed for possible retry. If we remove it first
         // and retry will arrive to different pageserver there wont be any traces of it on remote storage
@@ -826,7 +826,7 @@ impl RemoteTimelineClient {

         // Execute all pending deletions, so that when we prroceed to do a list_prefixes below, we aren't
         // taking the burden of listing all the layers that we already know we should delete.
-        deletion_queue.flush_execute().await;
+        deletion_queue.flush_execute().await?;

         let remaining = backoff::retry(
             || async {
@@ -858,7 +858,7 @@ impl RemoteTimelineClient {
         if !remaining.is_empty() {
             deletion_queue
                 .push_objects(self.tenant_id, self.timeline_id, remaining)
-                .await;
+                .await?;
         }

         fail::fail_point!("timeline-delete-before-index-delete", |_| {
@@ -872,11 +872,11 @@ impl RemoteTimelineClient {
         debug!("enqueuing index part deletion");
         deletion_queue
             .push_objects(self.tenant_id, self.timeline_id, [index_file_path].to_vec())
-            .await;
+            .await?;

         // Timeline deletion is rare and we have probably emitted a reasonably number of objects: wait
         // for a flush to a persistent deletion list so that we may be sure deletion will occur.
-        deletion_queue.flush_execute().await;
+        deletion_queue.flush_execute().await?;

         fail::fail_point!("timeline-delete-after-index-delete", |_| {
             Err(anyhow::anyhow!(