deletion queue: improve frontend retry

John Spray
2023-08-15 18:06:11 +01:00
parent 504fe9c2b0
commit 3edd7ece40

@@ -4,7 +4,7 @@ use serde::Deserialize;
use serde::Serialize;
use serde_with::serde_as;
use tokio;
-use tokio::time::{Duration, Instant};
+use tokio::time::Duration;
use tracing::{self, debug, error, info, warn};
use utils::id::{TenantId, TimelineId};
@@ -249,7 +249,7 @@ impl BackendQueueWorker {
true
}
Err(e) => {
warn!("DeleteObjects request failed: {e}, will retry");
warn!("DeleteObjects request failed: {e:#}, will retry");
DELETION_QUEUE_ERRORS.with_label_values(&["execute"]).inc();
self.timeout = EXECUTE_RETRY_DEADLINE;
false
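
Both this hunk and the next change `{e}` to `{e:#}`, i.e. plain Display formatting to the alternate form. Assuming these errors are `anyhow::Error`s (the error type is not visible in the diff), `{e:#}` prints the whole context chain on one line rather than only the outermost message, which is what makes the retry warnings actionable. A minimal sketch of the difference:

```rust
use anyhow::{Context, Result};

// Hypothetical failure standing in for a failed S3 DeleteObjects call.
fn delete_objects() -> Result<()> {
    Err(std::io::Error::new(
        std::io::ErrorKind::ConnectionReset,
        "connection reset by peer",
    ))
    .context("sending DeleteObjects request")
}

fn main() {
    let e = delete_objects().unwrap_err();
    println!("{e}");   // sending DeleteObjects request
    println!("{e:#}"); // sending DeleteObjects request: connection reset by peer
}
```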
@@ -344,7 +344,7 @@ impl BackendQueueWorker {
.truncate(self.executed_lists.len() - executed_keys.len());
}
Err(e) => {
warn!("Failed to delete deletion list(s): {e}");
warn!("Failed to delete deletion list(s): {e:#}");
// Do nothing: the elements remain in executed_lists, and purge will be retried
// next time we process some deletions and go around the loop.
DELETION_QUEUE_ERRORS
@@ -383,12 +383,13 @@ pub struct FrontendQueueWorker {
// and our next sequence number
pending: DeletionList,
-// When we should next proactively flush if we have pending deletions, even if
-// the target deletion list size has not been reached.
-deadline: Instant,
// These FlushOps should fire the next time we flush
pending_flushes: Vec<FlushOp>,
+// After how long will we flush a DeletionList without reaching the target size:
+// this is lazy usually, but after a failed flush it is set to a smaller time
+// period to drive retries
+timeout: Duration,
}
impl FrontendQueueWorker {
@@ -441,15 +442,18 @@ impl FrontendQueueWorker {
if let Err(e) = self.tx.send(BackendQueueMessage::Delete(onward_list)).await {
// This is allowed to fail: it will only happen if the backend worker is shut down,
// so we can just drop this on the floor.
info!("Deletion list dropped, this is normal during shutdown ({e})");
info!("Deletion list dropped, this is normal during shutdown ({e:#})");
}
+self.timeout = FLUSH_DEFAULT_DEADLINE;
}
Err(e) => {
DELETION_QUEUE_ERRORS.with_label_values(&["put_list"]).inc();
warn!(
sequence = self.pending.sequence,
"Failed to write deletion list to remote storage, will retry later ({e})"
)
"Failed to write deletion list to remote storage, will retry later ({e:#})"
);
self.timeout = FLUSH_EXPLICIT_DEADLINE;
}
}
}
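
The retry behaviour in the hunk above hinges on the new `timeout` field: a successful flush resets it to the long default deadline, while a failed write of the deletion list shortens it so the background loop retries soon. A self-contained sketch of that state change, with hypothetical constant values and a simulated remote-storage call (the real `FLUSH_DEFAULT_DEADLINE` and `FLUSH_EXPLICIT_DEADLINE` are defined elsewhere in this file):

```rust
use std::time::Duration;

// Hypothetical values; the real FLUSH_* constants live elsewhere in this file.
const FLUSH_DEFAULT_DEADLINE: Duration = Duration::from_secs(10);
const FLUSH_EXPLICIT_DEADLINE: Duration = Duration::from_millis(100);

struct FrontendSketch {
    timeout: Duration,
}

impl FrontendSketch {
    // Stand-in for writing the pending DeletionList to remote storage.
    fn put_list(&self, remote_up: bool) -> Result<(), &'static str> {
        if remote_up {
            Ok(())
        } else {
            Err("remote storage unavailable")
        }
    }

    fn flush(&mut self, remote_up: bool) {
        match self.put_list(remote_up) {
            // Success: fall back to lazy flushing on the long default deadline.
            Ok(()) => self.timeout = FLUSH_DEFAULT_DEADLINE,
            // Failure: shorten the timeout so the receive loop retries soon.
            Err(e) => {
                eprintln!("Failed to write deletion list, will retry later ({e})");
                self.timeout = FLUSH_EXPLICIT_DEADLINE;
            }
        }
    }
}

fn main() {
    let mut worker = FrontendSketch { timeout: FLUSH_DEFAULT_DEADLINE };
    worker.flush(false);
    assert_eq!(worker.timeout, FLUSH_EXPLICIT_DEADLINE); // retry soon
    worker.flush(true);
    assert_eq!(worker.timeout, FLUSH_DEFAULT_DEADLINE); // back to lazy flushing
}
```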
@@ -459,24 +463,15 @@ impl FrontendQueueWorker {
pub async fn background(&mut self) {
info!("Started deletion frontend worker");
loop {
-let flush_delay = self.deadline.duration_since(Instant::now());
-// Wait for the next message, or to hit self.deadline
-let msg = tokio::select! {
-msg_opt = self.rx.recv() => {
-match msg_opt {
-None => {
-break;
-},
-Some(msg)=> {msg}
-}
-},
-_ = tokio::time::sleep(flush_delay) => {
-self.deadline = Instant::now() + FLUSH_DEFAULT_DEADLINE;
-if !self.pending.objects.is_empty() {
-debug!("Flushing for deadline");
-self.flush().await;
-}
+let msg = match tokio::time::timeout(self.timeout, self.rx.recv()).await {
+Ok(Some(msg)) => msg,
+Ok(None) => {
+// Queue sender destroyed, shutting down
+break;
+}
+Err(_) => {
+// Hit deadline, flush.
+self.flush().await;
+continue;
+}
+};
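
The rewritten loop above replaces the `tokio::select! { recv, sleep }` pair and the manually tracked `deadline: Instant` with a single `tokio::time::timeout()` around `recv()`: `Ok(Some(_))` is a message, `Ok(None)` means the sender side is gone, and `Err(_)` means the flush deadline elapsed. A runnable sketch of the same pattern, using a hypothetical `String` message type and made-up durations in place of the queue's real types and constants:

```rust
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Stand-ins for FrontendQueueMessage and the FLUSH_* deadlines.
    let mut timeout = Duration::from_millis(100);
    let (tx, mut rx) = mpsc::channel::<String>(16);

    tokio::spawn(async move {
        // Arrive after the first deadline so the Err(_) branch fires at least once.
        tokio::time::sleep(Duration::from_millis(250)).await;
        let _ = tx.send("some-layer-file-name".to_string()).await;
        // tx is dropped here, which ends the loop below via Ok(None).
    });

    loop {
        // One awaited future gives both "next message" and "hit deadline".
        match tokio::time::timeout(timeout, rx.recv()).await {
            Ok(Some(msg)) => println!("queued for deletion: {msg}"),
            Ok(None) => {
                // Queue sender destroyed, shutting down.
                break;
            }
            Err(_) => {
                // Deadline elapsed with nothing received: flush pending deletions.
                // A failed flush would shorten `timeout` here to drive retries.
                println!("deadline hit, flushing");
                timeout = Duration::from_millis(100);
            }
        }
    }
}
```

One behavioural difference from the old code: the `timeout()` window restarts after every received message, whereas the old `deadline` was an absolute `Instant` that kept ticking while messages arrived.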
@@ -514,10 +509,7 @@ impl FrontendQueueWorker {
self.pending_flushes.push(op);
// Move up the deadline since we have been explicitly asked to flush
-let flush_delay = self.deadline.duration_since(Instant::now());
-if flush_delay > FLUSH_EXPLICIT_DEADLINE {
-self.deadline = Instant::now() + FLUSH_EXPLICIT_DEADLINE;
-}
+self.timeout = FLUSH_EXPLICIT_DEADLINE;
}
}
FrontendQueueMessage::FlushExecute(op) => {
@@ -576,7 +568,7 @@ impl DeletionQueue {
conf,
rx,
tx: backend_tx,
-deadline: Instant::now() + FLUSH_DEFAULT_DEADLINE,
+timeout: FLUSH_DEFAULT_DEADLINE,
pending_flushes: Vec::new(),
}),
Some(BackendQueueWorker {