mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
pageserver: implement batching in deletion queue
This commit is contained in:
@@ -169,16 +169,91 @@ impl BackendQueueWorker {
|
||||
// add an ephemeral part to BackendQueueMessage::Delete that tracks which keys
|
||||
// in the deletion list may not be deleted yet, with guards to block on while
|
||||
// we wait to proceed.
|
||||
|
||||
// From the S3 spec
|
||||
const MAX_KEYS_PER_DELETE: usize = 1024;
|
||||
|
||||
let mut accumulator = Vec::new();
|
||||
accumulator.reserve(MAX_KEYS_PER_DELETE);
|
||||
|
||||
// DeletionLists we have fully ingested but might still have
|
||||
// some keys in accumulator.
|
||||
let mut pending_lists = Vec::new();
|
||||
|
||||
// DeletionLists we have fully executed, which may be deleted
|
||||
// from remote storage.
|
||||
let mut executed_lists: Vec<DeletionList> = Vec::new();
|
||||
|
||||
while let Some(msg) = self.rx.recv().await {
|
||||
match msg {
|
||||
BackendQueueMessage::Delete(list) => {
|
||||
for key in list.objects {
|
||||
// TODO: batch into DeleteObjects calls (the DeletionList is not
|
||||
// necessarily one batch, it can be bigger or smaller)
|
||||
//
|
||||
// TODO: if the delete fails, push the DeletionList into a retry slot
|
||||
// so that we try it again rather than pulling more from the channel
|
||||
remote_storage.delete(&key).await.expect("TODO retry");
|
||||
BackendQueueMessage::Delete(mut list) => {
|
||||
if list.objects.is_empty() {
|
||||
// This shouldn't happen, but is harmless. warn so that
|
||||
// tests will fail if we have such a bug, but proceed with
|
||||
// processing subsequent messages.
|
||||
warn!("Empty DeletionList passed to deletion backend");
|
||||
executed_lists.push(list);
|
||||
continue;
|
||||
}
|
||||
|
||||
// This loop handles deletion lists that require multiple DeleteObjects requests,
|
||||
// and also handles retries if a deletion fails: we will keep going around until
|
||||
// we have either deleted everything, or we have a remainder in accumulator.
|
||||
while !list.objects.is_empty() || accumulator.len() == MAX_KEYS_PER_DELETE {
|
||||
let take_count = if accumulator.len() == MAX_KEYS_PER_DELETE {
|
||||
0
|
||||
} else {
|
||||
let available_slots = MAX_KEYS_PER_DELETE - accumulator.len();
|
||||
std::cmp::min(available_slots, list.objects.len())
|
||||
};
|
||||
|
||||
for object in list.objects.drain(list.objects.len() - take_count..) {
|
||||
accumulator.push(object);
|
||||
}
|
||||
|
||||
if accumulator.len() == MAX_KEYS_PER_DELETE {
|
||||
// Great, we got a full request: issue it.
|
||||
match remote_storage.delete_objects(&accumulator).await {
|
||||
Ok(()) => {
|
||||
accumulator.clear();
|
||||
executed_lists.append(&mut pending_lists);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Batch deletion failed: {e}, will retry");
|
||||
// TODO: increment error counter
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !accumulator.is_empty() {
|
||||
// We have a remainder, deletion list is not fully processed yet
|
||||
pending_lists.push(list);
|
||||
} else {
|
||||
// We fully processed this list, it is ready for purge
|
||||
executed_lists.push(list);
|
||||
}
|
||||
|
||||
let executed_keys: Vec<RemotePath> = executed_lists
|
||||
.iter()
|
||||
.take(MAX_KEYS_PER_DELETE)
|
||||
.map(|l| {
|
||||
RemotePath::new(&self.conf.remote_deletion_list_path(l.sequence))
|
||||
.expect("Failed to compose deletion list path")
|
||||
})
|
||||
.collect();
|
||||
match remote_storage.delete_objects(&executed_keys).await {
|
||||
Ok(()) => {
|
||||
executed_lists = executed_lists
|
||||
.into_iter()
|
||||
.skip(MAX_KEYS_PER_DELETE)
|
||||
.collect();
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to purge deletion lists: {e}");
|
||||
// Do nothing: the elements remain in executed_lists, and purge will be retried
|
||||
// next time we process some deletions and go around the loop.
|
||||
}
|
||||
}
|
||||
}
|
||||
BackendQueueMessage::Flush(op) => {
|
||||
|
||||
Reference in New Issue
Block a user