diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 234e984240..1c6d27da2c 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -384,8 +384,12 @@ impl DeletionQueueClient { } } - async fn do_push(&self, msg: FrontendQueueMessage) -> Result<(), DeletionQueueError> { - match self.tx.send(msg).await { + async fn do_push( + &self, + queue: &tokio::sync::mpsc::Sender, + msg: T, + ) -> Result<(), DeletionQueueError> { + match queue.send(msg).await { Ok(_) => Ok(()), Err(e) => { // This shouldn't happen, we should shut down all tenants before @@ -401,9 +405,10 @@ impl DeletionQueueClient { &self, attached_tenants: HashMap, ) -> Result<(), DeletionQueueError> { - self.do_push(FrontendQueueMessage::Recover(RecoverOp { - attached_tenants, - })) + self.do_push( + &self.tx, + FrontendQueueMessage::Recover(RecoverOp { attached_tenants }), + ) .await } @@ -481,22 +486,26 @@ impl DeletionQueueClient { } DELETION_QUEUE_SUBMITTED.inc_by(layers.len() as u64); - self.do_push(FrontendQueueMessage::Delete(DeletionOp { - tenant_id, - timeline_id, - layers, - generation: current_generation, - objects: Vec::new(), - })) + self.do_push( + &self.tx, + FrontendQueueMessage::Delete(DeletionOp { + tenant_id, + timeline_id, + layers, + generation: current_generation, + objects: Vec::new(), + }), + ) .await } - async fn do_flush( + async fn do_flush( &self, - msg: FrontendQueueMessage, + queue: &tokio::sync::mpsc::Sender, + msg: T, rx: tokio::sync::oneshot::Receiver<()>, ) -> Result<(), DeletionQueueError> { - self.do_push(msg).await?; + self.do_push(queue, msg).await?; if rx.await.is_err() { // This shouldn't happen if tenants are shut down before deletion queue. If we // encounter a bug like this, then a flusher will incorrectly believe it has flushed @@ -511,7 +520,7 @@ impl DeletionQueueClient { /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList) pub async fn flush(&self) -> Result<(), DeletionQueueError> { let (tx, rx) = tokio::sync::oneshot::channel::<()>(); - self.do_flush(FrontendQueueMessage::Flush(FlushOp { tx }), rx) + self.do_flush(&self.tx, FrontendQueueMessage::Flush(FlushOp { tx }), rx) .await } @@ -521,11 +530,27 @@ impl DeletionQueueClient { // Flush any buffered work to deletion lists self.flush().await?; - // Flush execution of deletion lists + // Flush the backend into the executor of deletion lists let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + debug!("flush_execute: flushing backend..."); + self.do_flush( + &self.tx, + FrontendQueueMessage::FlushExecute(FlushOp { tx }), + rx, + ) + .await?; + debug!("flush_execute: finished flushing backend..."); + + // Flush any immediate-mode deletions (the above backend flush will only flush + // the executor if deletions had flowed through the backend) debug!("flush_execute: flushing execution..."); - self.do_flush(FrontendQueueMessage::FlushExecute(FlushOp { tx }), rx) - .await?; + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + self.do_flush( + &self.executor_tx, + ExecutorMessage::Flush(FlushOp { tx }), + rx, + ) + .await?; debug!("flush_execute: finished flushing execution..."); Ok(()) } diff --git a/pageserver/src/deletion_queue/backend.rs b/pageserver/src/deletion_queue/backend.rs index a24811e0d5..f8410eb52e 100644 --- a/pageserver/src/deletion_queue/backend.rs +++ b/pageserver/src/deletion_queue/backend.rs @@ -241,54 +241,64 @@ impl BackendQueueWorker { Ok(()) } - pub async fn flush(&mut self) { + pub async fn flush(&mut self) -> Result<(), DeletionQueueError> { + tracing::debug!("Flushing with {} pending lists", self.pending_lists.len()); + // Issue any required generation validation calls to the control plane - if let Err(DeletionQueueError::ShuttingDown) = self.validate().await { - warn!("Shutting down"); - return; - } + self.validate().await?; // After successful validation, nothing is pending: any lists that // made it through validation will be in validated_lists. assert!(self.pending_lists.is_empty()); self.pending_key_count = 0; - // Return quickly if we have no validated lists to execute. + tracing::debug!( + "Validation complete, have {} validated lists", + self.validated_lists.len() + ); + + // Return quickly if we have no validated lists to execute. This avoids flushing the + // executor when an idle backend hits its autoflush interval if self.validated_lists.is_empty() { - return; + return Ok(()); } // Drain `validated_lists` into the executor let mut executing_lists = Vec::new(); for mut list in self.validated_lists.drain(..) { let objects = list.drain_paths(); - if let Err(_e) = self.tx.send(ExecutorMessage::Delete(objects)).await { - warn!("Shutting down"); - return; - }; - + self.tx + .send(ExecutorMessage::Delete(objects)) + .await + .map_err(|_| DeletionQueueError::ShuttingDown)?; executing_lists.push(list); } + self.flush_executor().await?; + + // Erase the deletion lists whose keys have all be deleted from remote storage + self.cleanup_lists(executing_lists).await; + + Ok(()) + } + + async fn flush_executor(&mut self) -> Result<(), DeletionQueueError> { // Flush the executor, so that all the keys referenced by these deletion lists // are actually removed from remote storage. This is a precondition to deleting // the deletion lists themselves. let (tx, rx) = tokio::sync::oneshot::channel::<()>(); let flush_op = FlushOp { tx }; - if let Err(_e) = self.tx.send(ExecutorMessage::Flush(flush_op)).await { - warn!("Shutting down"); - return; - }; - if rx.await.is_err() { - warn!("Shutting down"); - return; - } + self.tx + .send(ExecutorMessage::Flush(flush_op)) + .await + .map_err(|_| DeletionQueueError::ShuttingDown)?; - // Erase the deletion lists whose keys have all be deleted from remote storage - self.cleanup_lists(executing_lists).await; + rx.await.map_err(|_| DeletionQueueError::ShuttingDown) } pub async fn background(&mut self) { + tracing::info!("Started deletion backend worker"); + while !self.cancel.is_cancelled() { let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await { Ok(Some(m)) => m, @@ -299,8 +309,9 @@ impl BackendQueueWorker { } Err(_) => { // Timeout, we hit deadline to execute whatever we have in hand. These functions will - // return immediately if no work is pending - self.flush().await; + // return immediately if no work is pending. + // Drop result, because it' a background flush and we don't care whether it really worked. + drop(self.flush().await); continue; } @@ -316,13 +327,15 @@ impl BackendQueueWorker { } if self.pending_key_count > AUTOFLUSH_KEY_COUNT { - self.flush().await; + // Drop possible shutdown error, because we will just fall out of loop if that happens + drop(self.flush().await); } } BackendQueueMessage::Flush(op) => { - self.flush().await; - - op.fire(); + if let Ok(()) = self.flush().await { + // If we fail due to shutting down, we will just drop `op` to propagate that status. + op.fire(); + } } } }