From 3edd7ece40c087fff1ef286ee0499a1893bb0fea Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 15 Aug 2023 18:06:11 +0100 Subject: [PATCH] deletion queue: improve frontend retry --- pageserver/src/deletion_queue.rs | 58 ++++++++++++++------------------ 1 file changed, 25 insertions(+), 33 deletions(-) diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 446e89d152..f35d07cbf7 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -4,7 +4,7 @@ use serde::Deserialize; use serde::Serialize; use serde_with::serde_as; use tokio; -use tokio::time::{Duration, Instant}; +use tokio::time::Duration; use tracing::{self, debug, error, info, warn}; use utils::id::{TenantId, TimelineId}; @@ -249,7 +249,7 @@ impl BackendQueueWorker { true } Err(e) => { - warn!("DeleteObjects request failed: {e}, will retry"); + warn!("DeleteObjects request failed: {e:#}, will retry"); DELETION_QUEUE_ERRORS.with_label_values(&["execute"]).inc(); self.timeout = EXECUTE_RETRY_DEADLINE; false @@ -344,7 +344,7 @@ impl BackendQueueWorker { .truncate(self.executed_lists.len() - executed_keys.len()); } Err(e) => { - warn!("Failed to delete deletion list(s): {e}"); + warn!("Failed to delete deletion list(s): {e:#}"); // Do nothing: the elements remain in executed_lists, and purge will be retried // next time we process some deletions and go around the loop. DELETION_QUEUE_ERRORS @@ -383,12 +383,13 @@ pub struct FrontendQueueWorker { // and our next sequence number pending: DeletionList, - // When we should next proactively flush if we have pending deletions, even if - // the target deletion list size has not been reached. - deadline: Instant, - // These FlushOps should fire the next time we flush pending_flushes: Vec, + + // After how long will we flush a DeletionList without reaching the target size: + // this is lazy usually, but after a failed flush it is set to a smaller time + // period to drive retries + timeout: Duration, } impl FrontendQueueWorker { @@ -441,15 +442,18 @@ impl FrontendQueueWorker { if let Err(e) = self.tx.send(BackendQueueMessage::Delete(onward_list)).await { // This is allowed to fail: it will only happen if the backend worker is shut down, // so we can just drop this on the floor. - info!("Deletion list dropped, this is normal during shutdown ({e})"); + info!("Deletion list dropped, this is normal during shutdown ({e:#})"); } + + self.timeout = FLUSH_DEFAULT_DEADLINE; } Err(e) => { DELETION_QUEUE_ERRORS.with_label_values(&["put_list"]).inc(); warn!( sequence = self.pending.sequence, - "Failed to write deletion list to remote storage, will retry later ({e})" - ) + "Failed to write deletion list to remote storage, will retry later ({e:#})" + ); + self.timeout = FLUSH_EXPLICIT_DEADLINE; } } } @@ -459,24 +463,15 @@ impl FrontendQueueWorker { pub async fn background(&mut self) { info!("Started deletion frontend worker"); loop { - let flush_delay = self.deadline.duration_since(Instant::now()); - - // Wait for the next message, or to hit self.deadline - let msg = tokio::select! { - msg_opt = self.rx.recv() => { - match msg_opt { - None => { - break; - }, - Some(msg)=> {msg} - } - }, - _ = tokio::time::sleep(flush_delay) => { - self.deadline = Instant::now() + FLUSH_DEFAULT_DEADLINE; - if !self.pending.objects.is_empty() { - debug!("Flushing for deadline"); - self.flush().await; - } + let msg = match tokio::time::timeout(self.timeout, self.rx.recv()).await { + Ok(Some(msg)) => msg, + Ok(None) => { + // Queue sender destroyed, shutting down + break; + } + Err(_) => { + // Hit deadline, flush. + self.flush().await; continue; } }; @@ -514,10 +509,7 @@ impl FrontendQueueWorker { self.pending_flushes.push(op); // Move up the deadline since we have been explicitly asked to flush - let flush_delay = self.deadline.duration_since(Instant::now()); - if flush_delay > FLUSH_EXPLICIT_DEADLINE { - self.deadline = Instant::now() + FLUSH_EXPLICIT_DEADLINE; - } + self.timeout = FLUSH_EXPLICIT_DEADLINE; } } FrontendQueueMessage::FlushExecute(op) => { @@ -576,7 +568,7 @@ impl DeletionQueue { conf, rx, tx: backend_tx, - deadline: Instant::now() + FLUSH_DEFAULT_DEADLINE, + timeout: FLUSH_DEFAULT_DEADLINE, pending_flushes: Vec::new(), }), Some(BackendQueueWorker {