flush tweaks

This commit is contained in:
John Spray
2023-08-22 13:17:57 +01:00
parent 7c4d79f4db
commit 6efddbf526

View File

@@ -23,11 +23,11 @@ const DELETION_LIST_TARGET_SIZE: usize = 16384;
// Ordinarily, we only flush to DeletionList periodically, to bound the window during
// which we might leak objects from not flushing a DeletionList after
// the objects are already unlinked from timeline metadata.
const FLUSH_DEFAULT_DEADLINE: Duration = Duration::from_millis(10000);
const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000);
// If someone is waiting for a flush to DeletionList, only delay a little to accumulate
// more objects before doing the flush.
const FLUSH_EXPLICIT_DEADLINE: Duration = Duration::from_millis(100);
const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100);
// After this length of time, execute deletions which are elegible to run,
// even if we haven't accumulated enough for a full-sized DeleteObjects
@@ -514,11 +514,6 @@ pub struct FrontendQueueWorker {
// These FlushOps should fire the next time we flush
pending_flushes: Vec<FlushOp>,
// After how long will we flush a DeletionList without reaching the target size:
// this is lazy usually, but after a failed flush it is set to a smaller time
// period to drive retries
timeout: Duration,
// Worker loop is torn down when this fires.
cancel: CancellationToken,
}
@@ -573,8 +568,6 @@ impl FrontendQueueWorker {
// so we can just drop this on the floor.
info!("Deletion list dropped, this is normal during shutdown ({e:#})");
}
self.timeout = FLUSH_DEFAULT_DEADLINE;
}
Err(e) => {
DELETION_QUEUE_ERRORS.with_label_values(&["put_list"]).inc();
@@ -582,7 +575,6 @@ impl FrontendQueueWorker {
sequence = self.pending.sequence,
"Failed to write deletion list to remote storage, will retry later ({e:#})"
);
self.timeout = FLUSH_EXPLICIT_DEADLINE;
}
}
}
@@ -734,7 +726,13 @@ impl FrontendQueueWorker {
let mut recovered: bool = false;
loop {
let msg = match tokio::time::timeout(self.timeout, self.rx.recv()).await {
let timeout = if self.pending_flushes.is_empty() {
FRONTEND_DEFAULT_TIMEOUT
} else {
FRONTEND_FLUSHING_TIMEOUT
};
let msg = match tokio::time::timeout(timeout, self.rx.recv()).await {
Ok(Some(msg)) => msg,
Ok(None) => {
// Queue sender destroyed, shutting down
@@ -795,9 +793,6 @@ impl FrontendQueueWorker {
} else {
// Execute next time we flush
self.pending_flushes.push(op);
// Move up the deadline since we have been explicitly asked to flush
self.timeout = FLUSH_EXPLICIT_DEADLINE;
}
}
FrontendQueueMessage::FlushExecute(op) => {
@@ -809,8 +804,9 @@ impl FrontendQueueWorker {
}
}
if self.pending.objects.len() > DELETION_LIST_TARGET_SIZE {
debug!(sequence = self.pending.sequence, "Flushing for deadline");
if self.pending.objects.len() > DELETION_LIST_TARGET_SIZE
|| !self.pending_flushes.is_empty()
{
self.flush().await;
}
}
@@ -855,7 +851,6 @@ impl DeletionQueue {
conf,
rx,
tx: backend_tx,
timeout: FLUSH_DEFAULT_DEADLINE,
pending_flushes: Vec::new(),
cancel,
}),