diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 2a98f17ec8..8561ab7055 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -169,16 +169,91 @@ impl BackendQueueWorker { // add an ephemeral part to BackendQueueMessage::Delete that tracks which keys // in the deletion list may not be deleted yet, with guards to block on while // we wait to proceed. + + // From the S3 spec + const MAX_KEYS_PER_DELETE: usize = 1024; + + let mut accumulator = Vec::new(); + accumulator.reserve(MAX_KEYS_PER_DELETE); + + // DeletionLists we have fully ingested but might still have + // some keys in accumulator. + let mut pending_lists = Vec::new(); + + // DeletionLists we have fully executed, which may be deleted + // from remote storage. + let mut executed_lists: Vec = Vec::new(); + while let Some(msg) = self.rx.recv().await { match msg { - BackendQueueMessage::Delete(list) => { - for key in list.objects { - // TODO: batch into DeleteObjects calls (the DeletionList is not - // necessarily one batch, it can be bigger or smaller) - // - // TODO: if the delete fails, push the DeletionList into a retry slot - // so that we try it again rather than pulling more from the channel - remote_storage.delete(&key).await.expect("TODO retry"); + BackendQueueMessage::Delete(mut list) => { + if list.objects.is_empty() { + // This shouldn't happen, but is harmless. warn so that + // tests will fail if we have such a bug, but proceed with + // processing subsequent messages. + warn!("Empty DeletionList passed to deletion backend"); + executed_lists.push(list); + continue; + } + + // This loop handles deletion lists that require multiple DeleteObjects requests, + // and also handles retries if a deletion fails: we will keep going around until + // we have either deleted everything, or we have a remainder in accumulator. + while !list.objects.is_empty() || accumulator.len() == MAX_KEYS_PER_DELETE { + let take_count = if accumulator.len() == MAX_KEYS_PER_DELETE { + 0 + } else { + let available_slots = MAX_KEYS_PER_DELETE - accumulator.len(); + std::cmp::min(available_slots, list.objects.len()) + }; + + for object in list.objects.drain(list.objects.len() - take_count..) { + accumulator.push(object); + } + + if accumulator.len() == MAX_KEYS_PER_DELETE { + // Great, we got a full request: issue it. + match remote_storage.delete_objects(&accumulator).await { + Ok(()) => { + accumulator.clear(); + executed_lists.append(&mut pending_lists); + } + Err(e) => { + warn!("Batch deletion failed: {e}, will retry"); + // TODO: increment error counter + } + } + } + } + + if !accumulator.is_empty() { + // We have a remainder, deletion list is not fully processed yet + pending_lists.push(list); + } else { + // We fully processed this list, it is ready for purge + executed_lists.push(list); + } + + let executed_keys: Vec = executed_lists + .iter() + .take(MAX_KEYS_PER_DELETE) + .map(|l| { + RemotePath::new(&self.conf.remote_deletion_list_path(l.sequence)) + .expect("Failed to compose deletion list path") + }) + .collect(); + match remote_storage.delete_objects(&executed_keys).await { + Ok(()) => { + executed_lists = executed_lists + .into_iter() + .skip(MAX_KEYS_PER_DELETE) + .collect(); + } + Err(e) => { + warn!("Failed to purge deletion lists: {e}"); + // Do nothing: the elements remain in executed_lists, and purge will be retried + // next time we process some deletions and go around the loop. + } } } BackendQueueMessage::Flush(op) => {