diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 553e3f2eb0..efa5cc4f99 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -23,11 +23,11 @@ const DELETION_LIST_TARGET_SIZE: usize = 16384; // Ordinarily, we only flush to DeletionList periodically, to bound the window during // which we might leak objects from not flushing a DeletionList after // the objects are already unlinked from timeline metadata. -const FLUSH_DEFAULT_DEADLINE: Duration = Duration::from_millis(10000); +const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000); // If someone is waiting for a flush to DeletionList, only delay a little to accumulate // more objects before doing the flush. -const FLUSH_EXPLICIT_DEADLINE: Duration = Duration::from_millis(100); +const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100); // After this length of time, execute deletions which are elegible to run, // even if we haven't accumulated enough for a full-sized DeleteObjects @@ -514,11 +514,6 @@ pub struct FrontendQueueWorker { // These FlushOps should fire the next time we flush pending_flushes: Vec, - // After how long will we flush a DeletionList without reaching the target size: - // this is lazy usually, but after a failed flush it is set to a smaller time - // period to drive retries - timeout: Duration, - // Worker loop is torn down when this fires. cancel: CancellationToken, } @@ -573,8 +568,6 @@ impl FrontendQueueWorker { // so we can just drop this on the floor. info!("Deletion list dropped, this is normal during shutdown ({e:#})"); } - - self.timeout = FLUSH_DEFAULT_DEADLINE; } Err(e) => { DELETION_QUEUE_ERRORS.with_label_values(&["put_list"]).inc(); @@ -582,7 +575,6 @@ impl FrontendQueueWorker { sequence = self.pending.sequence, "Failed to write deletion list to remote storage, will retry later ({e:#})" ); - self.timeout = FLUSH_EXPLICIT_DEADLINE; } } } @@ -734,7 +726,13 @@ impl FrontendQueueWorker { let mut recovered: bool = false; loop { - let msg = match tokio::time::timeout(self.timeout, self.rx.recv()).await { + let timeout = if self.pending_flushes.is_empty() { + FRONTEND_DEFAULT_TIMEOUT + } else { + FRONTEND_FLUSHING_TIMEOUT + }; + + let msg = match tokio::time::timeout(timeout, self.rx.recv()).await { Ok(Some(msg)) => msg, Ok(None) => { // Queue sender destroyed, shutting down @@ -795,9 +793,6 @@ impl FrontendQueueWorker { } else { // Execute next time we flush self.pending_flushes.push(op); - - // Move up the deadline since we have been explicitly asked to flush - self.timeout = FLUSH_EXPLICIT_DEADLINE; } } FrontendQueueMessage::FlushExecute(op) => { @@ -809,8 +804,9 @@ impl FrontendQueueWorker { } } - if self.pending.objects.len() > DELETION_LIST_TARGET_SIZE { - debug!(sequence = self.pending.sequence, "Flushing for deadline"); + if self.pending.objects.len() > DELETION_LIST_TARGET_SIZE + || !self.pending_flushes.is_empty() + { self.flush().await; } } @@ -855,7 +851,6 @@ impl DeletionQueue { conf, rx, tx: backend_tx, - timeout: FLUSH_DEFAULT_DEADLINE, pending_flushes: Vec::new(), cancel, }),