From 537eca489e086b6045a4d1d6f7fab8232b5cba11 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Thu, 10 Aug 2023 13:40:56 +0100
Subject: [PATCH] Implement flush_execute() in deletion queue

---
 pageserver/src/deletion_queue.rs | 160 ++++++++++++++++++++-----------
 1 file changed, 105 insertions(+), 55 deletions(-)

diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs
index 8561ab7055..93abcb4bc7 100644
--- a/pageserver/src/deletion_queue.rs
+++ b/pageserver/src/deletion_queue.rs
@@ -52,13 +52,16 @@ const FLUSH_EXPLICIT_DEADLINE: Duration = Duration::from_millis(100);
 /// of 1024 for execution via a DeleteObjects call.
 #[derive(Clone)]
 pub struct DeletionQueue {
-    tx: tokio::sync::mpsc::Sender<QueueMessage>,
+    tx: tokio::sync::mpsc::Sender<FrontendQueueMessage>,
 }
 
 #[derive(Debug)]
-enum QueueMessage {
+enum FrontendQueueMessage {
     Delete(DeletionOp),
+    // Wait until all prior deletions make it into a persistent DeletionList
     Flush(FlushOp),
+    // Wait until all prior deletions have been executed (i.e. objects are actually deleted)
+    FlushExecute(FlushOp),
 }
 
 #[derive(Debug)]
@@ -84,7 +87,7 @@ impl FlushOp {
 
 #[derive(Clone)]
 pub struct DeletionQueueClient {
-    tx: tokio::sync::mpsc::Sender<QueueMessage>,
+    tx: tokio::sync::mpsc::Sender<FrontendQueueMessage>,
 }
 
 #[serde_as]
@@ -108,7 +111,7 @@ impl DeletionList {
 }
 
 impl DeletionQueueClient {
-    async fn do_push(&self, msg: QueueMessage) {
+    async fn do_push(&self, msg: FrontendQueueMessage) {
         match self.tx.send(msg).await {
             Ok(_) => {}
             Err(e) => {
@@ -120,13 +123,17 @@ impl DeletionQueueClient {
         }
     }
 
+    /// Submit a list of layers for deletion: this function will return before the deletion is
+    /// persistent, but it may be executed at any time after this function enters: do not push
+    /// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
+    /// references them).
     pub async fn push(
         &self,
         tenant_id: TenantId,
         timeline_id: TimelineId,
         layers: Vec<LayerFileName>,
     ) {
-        self.do_push(QueueMessage::Delete(DeletionOp {
+        self.do_push(FrontendQueueMessage::Delete(DeletionOp {
             tenant_id,
             timeline_id,
             layers,
@@ -134,9 +141,8 @@ impl DeletionQueueClient {
         .await;
     }
 
-    pub async fn flush(&self) {
-        let (tx, rx) = tokio::sync::oneshot::channel();
-        self.do_push(QueueMessage::Flush(FlushOp { tx })).await;
+    async fn do_flush(&self, msg: FrontendQueueMessage, rx: tokio::sync::oneshot::Receiver<()>) {
+        self.do_push(msg).await;
         if let Err(_) = rx.await {
             // This shouldn't happen if tenants are shut down before deletion queue. If we
             // encounter a bug like this, then a flusher will incorrectly believe it has flushed
@@ -144,16 +150,42 @@
             error!("Deletion queue dropped flush op while client was still waiting");
         }
     }
+
+    /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
+    pub async fn flush(&self) {
+        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
+        self.do_flush(FrontendQueueMessage::Flush(FlushOp { tx }), rx)
+            .await
+    }
+
+    // Wait until all previous deletions are executed
+    pub async fn flush_execute(&self) {
+        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
+        self.do_flush(FrontendQueueMessage::FlushExecute(FlushOp { tx }), rx)
+            .await
+    }
 }
 
 pub struct BackendQueueWorker {
     remote_storage: Option<GenericRemoteStorage>,
     conf: &'static PageServerConf,
     rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
+
+    // Accumulate up to 1024 keys for the next deletion operation
+    accumulator: Vec<RemotePath>,
+
+    // DeletionLists we have fully ingested but might still have
+    // some keys in accumulator.
+    pending_lists: Vec<DeletionList>,
+
+    // DeletionLists we have fully executed, which may be deleted
+    // from remote storage.
+    executed_lists: Vec<DeletionList>,
 }
 
 impl BackendQueueWorker {
-    pub async fn background(&mut self) {
+    async fn maybe_execute(&mut self) {
+        // TODO: refactor so that worker is just not constructed if there is no remote
         let remote_storage = match &self.remote_storage {
             Some(rs) => rs,
             None => {
@@ -162,6 +194,19 @@
             }
         };
 
+        match remote_storage.delete_objects(&self.accumulator).await {
+            Ok(()) => {
+                self.accumulator.clear();
+                self.executed_lists.append(&mut self.pending_lists);
+            }
+            Err(e) => {
+                warn!("Batch deletion failed: {e}, will retry");
+                // TODO: increment error counter
+            }
+        }
+    }
+
+    pub async fn background(&mut self) {
         let _span = tracing::info_span!("deletion_backend");
 
         // TODO: if we would like to be able to defer deletions while a Layer still has
@@ -173,16 +218,7 @@
         // From the S3 spec
         const MAX_KEYS_PER_DELETE: usize = 1024;
 
-        let mut accumulator = Vec::new();
-        accumulator.reserve(MAX_KEYS_PER_DELETE);
-
-        // DeletionLists we have fully ingested but might still have
-        // some keys in accumulator.
-        let mut pending_lists = Vec::new();
-
-        // DeletionLists we have fully executed, which may be deleted
-        // from remote storage.
-        let mut executed_lists: Vec<DeletionList> = Vec::new();
+        self.accumulator.reserve(MAX_KEYS_PER_DELETE);
 
         while let Some(msg) = self.rx.recv().await {
             match msg {
@@ -192,49 +228,42 @@
                         // tests will fail if we have such a bug, but proceed with
                         // processing subsequent messages.
                         warn!("Empty DeletionList passed to deletion backend");
-                        executed_lists.push(list);
+                        self.executed_lists.push(list);
                         continue;
                     }
 
                     // This loop handles deletion lists that require multiple DeleteObjects requests,
                     // and also handles retries if a deletion fails: we will keep going around until
                     // we have either deleted everything, or we have a remainder in accumulator.
-                    while !list.objects.is_empty() || accumulator.len() == MAX_KEYS_PER_DELETE {
-                        let take_count = if accumulator.len() == MAX_KEYS_PER_DELETE {
+                    while !list.objects.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE
+                    {
+                        let take_count = if self.accumulator.len() == MAX_KEYS_PER_DELETE {
                             0
                         } else {
-                            let available_slots = MAX_KEYS_PER_DELETE - accumulator.len();
+                            let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
                             std::cmp::min(available_slots, list.objects.len())
                         };
 
                         for object in list.objects.drain(list.objects.len() - take_count..) {
-                            accumulator.push(object);
+                            self.accumulator.push(object);
                        }
 
-                        if accumulator.len() == MAX_KEYS_PER_DELETE {
+                        if self.accumulator.len() == MAX_KEYS_PER_DELETE {
                             // Great, we got a full request: issue it.
-                            match remote_storage.delete_objects(&accumulator).await {
-                                Ok(()) => {
-                                    accumulator.clear();
-                                    executed_lists.append(&mut pending_lists);
-                                }
-                                Err(e) => {
-                                    warn!("Batch deletion failed: {e}, will retry");
-                                    // TODO: increment error counter
-                                }
-                            }
+                            self.maybe_execute().await;
                         }
                     }
 
-                    if !accumulator.is_empty() {
+                    if !self.accumulator.is_empty() {
                         // We have a remainder, deletion list is not fully processed yet
-                        pending_lists.push(list);
+                        self.pending_lists.push(list);
                     } else {
                         // We fully processed this list, it is ready for purge
-                        executed_lists.push(list);
+                        self.executed_lists.push(list);
                     }
 
-                    let executed_keys: Vec<RemotePath> = executed_lists
+                    let executed_keys: Vec<RemotePath> = self
+                        .executed_lists
                         .iter()
                         .take(MAX_KEYS_PER_DELETE)
                         .map(|l| {
@@ -242,12 +271,20 @@
                             .expect("Failed to compose deletion list path")
                         })
                         .collect();
+
+                    // TODO: refactor so that worker is just not constructed if there is no remote
+                    let remote_storage = match &mut self.remote_storage {
+                        Some(rs) => rs,
+                        None => {
+                            info!("No remote storage configured, deletion queue will not run");
+                            return;
+                        }
+                    };
                     match remote_storage.delete_objects(&executed_keys).await {
                         Ok(()) => {
-                            executed_lists = executed_lists
-                                .into_iter()
-                                .skip(MAX_KEYS_PER_DELETE)
-                                .collect();
+                            // Retain any lists that couldn't be deleted in that request
+                            self.executed_lists =
+                                self.executed_lists.split_off(MAX_KEYS_PER_DELETE);
                         }
                         Err(e) => {
                             warn!("Failed to purge deletion lists: {e}");
@@ -257,8 +294,10 @@
                     }
                 }
                 BackendQueueMessage::Flush(op) => {
-                    // TODO: add an extra frrontend flush type that passes through to this flush
-                    // We have implicitly already processed preceeding deletions
+                    while !self.accumulator.is_empty() {
+                        self.maybe_execute().await;
+                    }
+
                     op.fire();
                 }
             }
@@ -277,7 +316,7 @@ pub struct FrontendQueueWorker {
     conf: &'static PageServerConf,
 
     // Incoming frontend requests to delete some keys
-    rx: tokio::sync::mpsc::Receiver<QueueMessage>,
+    rx: tokio::sync::mpsc::Receiver<FrontendQueueMessage>,
 
     // Outbound requests to the backend to execute deletion lists we have composed.
     tx: tokio::sync::mpsc::Sender<BackendQueueMessage>,
@@ -364,7 +403,7 @@ impl FrontendQueueWorker {
         };
 
         match msg {
-            QueueMessage::Delete(op) => {
+            FrontendQueueMessage::Delete(op) => {
                 let timeline_path = self.conf.timeline_path(&op.tenant_id, &op.timeline_id);
 
                 let _span = tracing::info_span!(
@@ -385,7 +424,7 @@
                         self.pending.objects.push(path);
                     }
                 }
-            QueueMessage::Flush(op) => {
+            FrontendQueueMessage::Flush(op) => {
                 if self.pending.objects.is_empty() {
                     // Execute immediately
                     op.fire()
@@ -400,6 +439,13 @@
                     }
                 }
             }
+            FrontendQueueMessage::FlushExecute(op) => {
+                // We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
+                if let Err(e) = self.tx.send(BackendQueueMessage::Flush(op)).await {
+                    info!("Can't flush, shutting down ({e})");
+                    // Caller will get error when their oneshot sender was dropped.
+                }
+            }
         }
 
         if self.pending.objects.len() > DELETION_LIST_TARGET_SIZE {
@@ -442,6 +488,9 @@ impl DeletionQueue {
                 remote_storage,
                 conf,
                 rx: backend_rx,
+                accumulator: Vec::new(),
+                pending_lists: Vec::new(),
+                executed_lists: Vec::new(),
             },
         )
     }
@@ -458,7 +507,7 @@ pub mod mock {
     };
 
     pub struct MockDeletionQueue {
-        tx: tokio::sync::mpsc::Sender<QueueMessage>,
+        tx: tokio::sync::mpsc::Sender<FrontendQueueMessage>,
         tx_pump: tokio::sync::mpsc::Sender<FlushOp>,
         executed: Arc<AtomicUsize>,
     }
@@ -489,7 +538,7 @@ pub mod mock {
                     info!("Executing all pending deletions");
                     while let Ok(msg) = rx.try_recv() {
                         match msg {
-                            QueueMessage::Delete(op) => {
+                            FrontendQueueMessage::Delete(op) => {
                                 let timeline_path =
                                     conf.timeline_path(&op.tenant_id, &op.timeline_id);
 
@@ -521,11 +570,12 @@ pub mod mock {
                                     executed_bg.fetch_add(1, Ordering::Relaxed);
                                 }
                             }
-                            QueueMessage::Flush(op) => {
-                                if let Err(_) = op.tx.send(()) {
-                                    // oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.
-                                    debug!("deletion queue flush from dropped client");
-                                };
+                            FrontendQueueMessage::Flush(op) => {
+                                op.fire();
+                            }
+                            FrontendQueueMessage::FlushExecute(op) => {
+                                // We have already executed all prior deletions because mock does them inline
+                                op.fire();
                             }
                         }
                         info!("All pending deletions have been executed");
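For illustration only (not part of the patch above): a minimal sketch of the call sequence the new API expects, assuming the pageserver crate's DeletionQueueClient, TenantId, TimelineId and LayerFileName types are in scope; the function name example_delete_layers is hypothetical.

    // Illustrative sketch: enqueue deletions, make them persistent, then wait for execution.
    async fn example_delete_layers(
        client: &DeletionQueueClient,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        layers: Vec<LayerFileName>,
    ) {
        // Returns before anything is persistent; per the doc comment on push(), the layers
        // must already be unreferenced by remote metadata.
        client.push(tenant_id, timeline_id, layers).await;

        // Wait until the pushed deletions are persistent (executed or written to a DeletionList).
        client.flush().await;

        // Wait until the objects have actually been deleted from remote storage; per the
        // FlushExecute handler's comment, the client sends a Flush before the FlushExecute.
        client.flush_execute().await;
    }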