diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 18822072ff..8f2cc60a47 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -45,6 +45,9 @@ pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option = None; const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/'; +// From the S3 spec +pub const MAX_KEYS_PER_DELETE: usize = 1000; + /// Path on the remote storage, relative to some inner prefix. /// The prefix is an implementation detail, that allows representing local paths /// as the remote ones, stripping the local storage prefix away. diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 43e8d47222..1f707c316c 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -353,7 +353,7 @@ fn start_pageserver( // Set up deletion queue let deletion_queue_cancel = tokio_util::sync::CancellationToken::new(); - let (deletion_queue, deletion_frontend, deletion_backend) = + let (deletion_queue, deletion_frontend, deletion_backend, deletion_executor) = DeletionQueue::new(remote_storage.clone(), conf, deletion_queue_cancel.clone()); if let Some(mut deletion_frontend) = deletion_frontend { BACKGROUND_RUNTIME.spawn(async move { @@ -371,6 +371,14 @@ fn start_pageserver( .await }); } + if let Some(mut deletion_executor) = deletion_executor { + BACKGROUND_RUNTIME.spawn(async move { + deletion_executor + .background() + .instrument(info_span!(parent: None, "deletion executor")) + .await + }); + } // Up to this point no significant I/O has been done: this should have been fast. Record // duration prior to starting I/O intensive phase of startup. 
diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index e4af32125f..752040e1ef 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -1,4 +1,5 @@ mod backend; +mod executor; mod frontend; use crate::metrics::DELETION_QUEUE_SUBMITTED; @@ -13,9 +14,11 @@ use tracing::{self, debug, error}; use utils::id::{TenantId, TimelineId}; pub(crate) use self::backend::BackendQueueWorker; +use self::executor::ExecutorWorker; use self::frontend::DeletionOp; pub(crate) use self::frontend::FrontendQueueWorker; use backend::BackendQueueMessage; +use executor::ExecutorMessage; use frontend::FrontendQueueMessage; use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName}; @@ -55,7 +58,7 @@ const FAILED_REMOTE_OP_RETRIES: u32 = 10; /// a DeletionHeader #[derive(Clone)] pub struct DeletionQueue { - tx: tokio::sync::mpsc::Sender<FrontendQueueMessage>, + client: DeletionQueueClient, } #[derive(Debug)] @@ -75,6 +78,7 @@ impl FlushOp { #[derive(Clone)] pub struct DeletionQueueClient { tx: tokio::sync::mpsc::Sender<FrontendQueueMessage>, + executor_tx: tokio::sync::mpsc::Sender<ExecutorMessage>, } #[serde_as] @@ -168,23 +172,6 @@ impl DeletionQueueClient { .await } - /// Just like push_layers, but using some already-known remote paths, instead of abstract layer names - pub(crate) async fn push_objects( - &self, - tenant_id: TenantId, - timeline_id: TimelineId, - objects: Vec<RemotePath>, - ) -> Result<(), DeletionQueueError> { - DELETION_QUEUE_SUBMITTED.inc_by(objects.len() as u64); - self.do_push(FrontendQueueMessage::Delete(DeletionOp { - tenant_id, - timeline_id, - layers: Vec::new(), - objects, - })) - .await - } - async fn do_flush( &self, msg: FrontendQueueMessage, @@ -223,13 +210,41 @@ impl DeletionQueueClient { debug!("flush_execute: finished flushing execution..."); Ok(()) } + + /// This interface bypasses the persistent deletion queue, and any validation + /// that this pageserver is still eligible to execute the deletions.
It is for + /// use in timeline deletions, where the control plane is telling us we may + /// delete everything in the timeline. + /// + /// DO NOT USE THIS FROM GC OR COMPACTION CODE. Use the regular `push_layers`. + pub(crate) async fn push_immediate( + &self, + objects: Vec<RemotePath>, + ) -> Result<(), DeletionQueueError> { + // Count these as submitted: the executor counts them as executed once they are deleted. + DELETION_QUEUE_SUBMITTED.inc_by(objects.len() as u64); + self.executor_tx + .send(ExecutorMessage::Delete(objects)) + .await + .map_err(|_| DeletionQueueError::ShuttingDown) + } + + /// Companion to push_immediate. When this returns Ok, all prior objects sent + /// into push_immediate have been deleted from remote storage. + pub(crate) async fn flush_immediate(&self) -> Result<(), DeletionQueueError> { + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + self.executor_tx + .send(ExecutorMessage::Flush(FlushOp { tx })) + .await + .map_err(|_| DeletionQueueError::ShuttingDown)?; + + rx.await.map_err(|_| DeletionQueueError::ShuttingDown) + } } impl DeletionQueue { pub fn new_client(&self) -> DeletionQueueClient { - DeletionQueueClient { - tx: self.tx.clone(), - } + self.client.clone() } /// Caller may use the returned object to construct clients with new_client. @@ -245,26 +260,57 @@ impl DeletionQueue { Self, Option<FrontendQueueWorker>, Option<BackendQueueWorker>, + Option<ExecutorWorker>, ) { + // Deep channel: it consumes deletions from all timelines and we do not want to block them let (tx, rx) = tokio::sync::mpsc::channel(16384); + // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions + let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16); + + // Shallow channel: it carries lists of paths, and we expect the main queueing to + // happen in the backend (persistent), not in this queue.
+ let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16); + let remote_storage = match remote_storage { - None => return (Self { tx }, None, None), + None => { + return ( + Self { + client: DeletionQueueClient { tx, executor_tx }, + }, + None, + None, + None, + ) + } Some(r) => r, }; - let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16384); - ( - Self { tx }, + Self { + client: DeletionQueueClient { + tx, + executor_tx: executor_tx.clone(), + }, + }, Some(FrontendQueueWorker::new( remote_storage.clone(), conf, rx, backend_tx, - cancel, + cancel.clone(), + )), + Some(BackendQueueWorker::new( + remote_storage.clone(), + conf, + backend_rx, + executor_tx, + )), + Some(ExecutorWorker::new( + remote_storage, + executor_rx, + cancel.clone(), )), - Some(BackendQueueWorker::new(remote_storage, conf, backend_rx)), ) } } @@ -296,12 +340,13 @@ mod test { deletion_queue: DeletionQueue, fe_worker: JoinHandle<()>, be_worker: JoinHandle<()>, + ex_worker: JoinHandle<()>, } impl TestSetup { /// Simulate a pageserver restart by destroying and recreating the deletion queue fn restart(&mut self) { - let (deletion_queue, fe_worker, be_worker) = DeletionQueue::new( + let (deletion_queue, fe_worker, be_worker, ex_worker) = DeletionQueue::new( Some(self.storage.clone()), self.harness.conf, CancellationToken::new(), @@ -311,18 +356,24 @@ mod test { let mut fe_worker = fe_worker.unwrap(); let mut be_worker = be_worker.unwrap(); + let mut ex_worker = ex_worker.unwrap(); let mut fe_worker = self .runtime .spawn(async move { fe_worker.background().await }); let mut be_worker = self .runtime .spawn(async move { be_worker.background().await }); + let mut ex_worker = self.runtime.spawn(async move { + drop(ex_worker.background().await); + }); std::mem::swap(&mut self.fe_worker, &mut fe_worker); std::mem::swap(&mut self.be_worker, &mut be_worker); + std::mem::swap(&mut self.ex_worker, &mut ex_worker); // Join the old workers self.runtime.block_on(fe_worker).unwrap(); 
self.runtime.block_on(be_worker).unwrap(); + self.runtime.block_on(ex_worker).unwrap(); } } @@ -356,7 +407,7 @@ mod test { )); let entered_runtime = runtime.enter(); - let (deletion_queue, fe_worker, be_worker) = DeletionQueue::new( + let (deletion_queue, fe_worker, be_worker, ex_worker) = DeletionQueue::new( Some(storage.clone()), harness.conf, CancellationToken::new(), @@ -364,8 +415,12 @@ mod test { let mut fe_worker = fe_worker.unwrap(); let mut be_worker = be_worker.unwrap(); + let mut ex_worker = ex_worker.unwrap(); let fe_worker_join = runtime.spawn(async move { fe_worker.background().await }); let be_worker_join = runtime.spawn(async move { be_worker.background().await }); + let ex_worker_join = runtime.spawn(async move { + drop(ex_worker.background().await); + }); Ok(TestSetup { runtime, @@ -376,6 +431,7 @@ mod test { deletion_queue, fe_worker: fe_worker_join, be_worker: be_worker_join, + ex_worker: ex_worker_join, }) } @@ -542,6 +598,7 @@ pub mod mock { pub struct MockDeletionQueue { tx: tokio::sync::mpsc::Sender, + executor_tx: tokio::sync::mpsc::Sender, tx_pump: tokio::sync::mpsc::Sender, executed: Arc, } @@ -553,6 +610,7 @@ pub mod mock { ) -> Self { let (tx, mut rx) = tokio::sync::mpsc::channel(16384); let (tx_pump, mut rx_pump) = tokio::sync::mpsc::channel::(1); + let (executor_tx, mut executor_rx) = tokio::sync::mpsc::channel(16384); let executed = Arc::new(AtomicUsize::new(0)); let executed_bg = executed.clone(); @@ -569,6 +627,31 @@ pub mod mock { // Each time we are asked to pump, drain the queue of deletions while let Some(flush_op) = rx_pump.recv().await { info!("Executing all pending deletions"); + + // Transform all executor messages to generic frontend messages + while let Ok(msg) = executor_rx.try_recv() { + match msg { + ExecutorMessage::Delete(objects) => { + for path in objects { + match remote_storage.delete(&path).await { + Ok(_) => { + debug!("Deleted {path}"); + } + Err(e) => { + error!( + "Failed to delete {path}, leaking object! 
({e})" + ); + } + } + executed_bg.fetch_add(1, Ordering::Relaxed); + } + } + ExecutorMessage::Flush(flush_op) => { + flush_op.fire(); + } + } + } + while let Ok(msg) = rx.try_recv() { match msg { FrontendQueueMessage::Delete(op) => { @@ -622,6 +705,7 @@ pub mod mock { Self { tx, tx_pump, + executor_tx, executed, } } @@ -643,6 +727,7 @@ pub mod mock { pub(crate) fn new_client(&self) -> DeletionQueueClient { DeletionQueueClient { tx: self.tx.clone(), + executor_tx: self.executor_tx.clone(), } } } diff --git a/pageserver/src/deletion_queue/backend.rs b/pageserver/src/deletion_queue/backend.rs index 9ec4be3b54..933c4da1af 100644 --- a/pageserver/src/deletion_queue/backend.rs +++ b/pageserver/src/deletion_queue/backend.rs @@ -2,14 +2,15 @@ use std::time::Duration; use remote_storage::GenericRemoteStorage; use remote_storage::RemotePath; +use remote_storage::MAX_KEYS_PER_DELETE; use tracing::debug; use tracing::info; use tracing::warn; use crate::config::PageServerConf; use crate::metrics::DELETION_QUEUE_ERRORS; -use crate::metrics::DELETION_QUEUE_EXECUTED; +use super::executor::ExecutorMessage; use super::DeletionHeader; use super::DeletionList; use super::FlushOp; @@ -18,12 +19,8 @@ use super::FlushOp; // even if we haven't accumulated enough for a full-sized DeleteObjects const EXECUTE_IDLE_DEADLINE: Duration = Duration::from_secs(60); -// If the last attempt to execute failed, wait only this long before -// trying again. 
-const EXECUTE_RETRY_DEADLINE: Duration = Duration::from_millis(100); - -// From the S3 spec -const MAX_KEYS_PER_DELETE: usize = 1000; +// If we have received this number of keys, proceed with attempting to execute +const AUTOFLUSH_KEY_COUNT: usize = 16384; #[derive(Debug)] pub(super) enum BackendQueueMessage { @@ -34,23 +31,20 @@ pub struct BackendQueueWorker { remote_storage: GenericRemoteStorage, conf: &'static PageServerConf, rx: tokio::sync::mpsc::Receiver, + tx: tokio::sync::mpsc::Sender, - // Accumulate up to 1000 keys for the next deletion operation - accumulator: Vec, - - // DeletionLists we have fully ingested but might still have - // some keys in accumulator. + // Accumulate some lists to execute in a batch. + // The purpose of this accumulation is to implement batched validation of + // attachment generations, when split-brain protection is implemented. + // (see https://github.com/neondatabase/neon/pull/4919) pending_lists: Vec, + // Sum of all the lengths of lists in pending_lists + pending_key_count: usize, + // DeletionLists we have fully executed, which may be deleted // from remote storage. 
executed_lists: Vec, - - // These FlushOps should fire the next time we flush - pending_flushes: Vec, - - // How long to wait for a message before executing anyway - timeout: Duration, } impl BackendQueueWorker { @@ -58,67 +52,16 @@ impl BackendQueueWorker { remote_storage: GenericRemoteStorage, conf: &'static PageServerConf, rx: tokio::sync::mpsc::Receiver, + tx: tokio::sync::mpsc::Sender, ) -> Self { Self { remote_storage, conf, rx, - accumulator: Vec::new(), + tx, pending_lists: Vec::new(), + pending_key_count: 0, executed_lists: Vec::new(), - timeout: EXECUTE_IDLE_DEADLINE, - pending_flushes: Vec::new(), - } - } - - async fn maybe_execute(&mut self) -> bool { - fail::fail_point!("deletion-queue-before-execute", |_| { - info!("Skipping execution, failpoint set"); - DELETION_QUEUE_ERRORS - .with_label_values(&["failpoint"]) - .inc(); - - // Retry fast when failpoint is active, so that when it is disabled we resume promptly - self.timeout = EXECUTE_RETRY_DEADLINE; - false - }); - - if self.accumulator.is_empty() { - for f in self.pending_flushes.drain(..) { - f.fire(); - } - return true; - } - - match self.remote_storage.delete_objects(&self.accumulator).await { - Ok(()) => { - // Note: we assume that the remote storage layer returns Ok(()) if some - // or all of the deleted objects were already gone. - DELETION_QUEUE_EXECUTED.inc_by(self.accumulator.len() as u64); - info!( - "Executed deletion batch {}..{}", - self.accumulator - .first() - .expect("accumulator should be non-empty"), - self.accumulator - .last() - .expect("accumulator should be non-empty"), - ); - self.accumulator.clear(); - self.executed_lists.append(&mut self.pending_lists); - - for f in self.pending_flushes.drain(..) 
{ - f.fire(); - } - self.timeout = EXECUTE_IDLE_DEADLINE; - true - } - Err(e) => { - warn!("DeleteObjects request failed: {e:#}, will retry"); - DELETION_QUEUE_ERRORS.with_label_values(&["execute"]).inc(); - self.timeout = EXECUTE_RETRY_DEADLINE; - false - } } } @@ -187,6 +130,34 @@ impl BackendQueueWorker { } } + /// Forward all pending lists to the executor, wait for it to flush, then clean up. + pub async fn flush(&mut self) { + self.pending_key_count = 0; // reset, or autoflush would fire on every later message + for list in std::mem::take(&mut self.pending_lists) { + let objects = list.objects.clone(); + // TODO: a take_objects method + self.executed_lists.push(list); + if let Err(_e) = self.tx.send(ExecutorMessage::Delete(objects)).await { + warn!("Shutting down"); + return; + }; + } + + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + let flush_op = FlushOp { tx }; + if let Err(_e) = self.tx.send(ExecutorMessage::Flush(flush_op)).await { + warn!("Shutting down"); + return; + }; + + if rx.await.is_err() { + warn!("Shutting down"); + return; + } + + self.cleanup_lists().await; + } + pub async fn background(&mut self) { // TODO: if we would like to be able to defer deletions while a Layer still has // refs (but it will be elegible for deletion after process ends), then we may @@ -194,10 +165,8 @@ impl BackendQueueWorker { // in the deletion list may not be deleted yet, with guards to block on while // we wait to proceed. - self.accumulator.reserve(MAX_KEYS_PER_DELETE); - loop { - let msg = match tokio::time::timeout(self.timeout, self.rx.recv()).await { + let msg = match tokio::time::timeout(EXECUTE_IDLE_DEADLINE, self.rx.recv()).await { Ok(Some(m)) => m, Ok(None) => { // All queue senders closed @@ -207,15 +176,14 @@ impl BackendQueueWorker { Err(_) => { // Timeout, we hit deadline to execute whatever we have in hand.
These functions will // return immediately if no work is pending - self.maybe_execute().await; - self.cleanup_lists().await; + self.flush().await; continue; } }; match msg { - BackendQueueMessage::Delete(mut list) => { + BackendQueueMessage::Delete(list) => { if list.objects.is_empty() { // This shouldn't happen, but is harmless. warn so that // tests will fail if we have such a bug, but proceed with @@ -225,58 +193,16 @@ impl BackendQueueWorker { continue; } - // This loop handles deletion lists that require multiple DeleteObjects requests, - // and also handles retries if a deletion fails: we will keep going around until - // we have either deleted everything, or we have a remainder in accumulator. - while !list.objects.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE - { - let take_count = if self.accumulator.len() == MAX_KEYS_PER_DELETE { - 0 - } else { - let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len(); - std::cmp::min(available_slots, list.objects.len()) - }; + self.pending_key_count += list.objects.len(); + self.pending_lists.push(list); - for object in list.objects.drain(list.objects.len() - take_count..) { - self.accumulator.push(object); - } - - if self.accumulator.len() == MAX_KEYS_PER_DELETE { - // Great, we got a full request: issue it. - if !self.maybe_execute().await { - // Failed to execute: retry delay - tokio::time::sleep(EXECUTE_RETRY_DEADLINE).await; - }; - } + if self.pending_key_count > AUTOFLUSH_KEY_COUNT { + self.flush().await; } - - if !self.accumulator.is_empty() { - // We have a remainder, `list` not fully executed yet - self.pending_lists.push(list); - } else { - // We fully processed this list, it is ready for purge - self.executed_lists.push(list); - } - - self.cleanup_lists().await; } BackendQueueMessage::Flush(op) => { - if self.accumulator.is_empty() { - op.fire(); - continue; - } - - self.maybe_execute().await; - - if self.accumulator.is_empty() { - // Successful flush. 
Clean up lists before firing, for the benefit of tests that would - // like to have a deterministic state post-flush. - self.cleanup_lists().await; - op.fire(); - } else { - // We didn't flush inline: defer until next time we successfully drain accumulatorr - self.pending_flushes.push(op); - } + self.flush().await; + op.fire(); } } } diff --git a/pageserver/src/deletion_queue/executor.rs b/pageserver/src/deletion_queue/executor.rs new file mode 100644 index 0000000000..8e28f0d589 --- /dev/null +++ b/pageserver/src/deletion_queue/executor.rs @@ -0,0 +1,143 @@ +use remote_storage::GenericRemoteStorage; +use remote_storage::RemotePath; +use remote_storage::MAX_KEYS_PER_DELETE; +use std::time::Duration; +use tokio_util::sync::CancellationToken; +use tracing::info; +use tracing::warn; + +use crate::metrics::DELETION_QUEUE_ERRORS; +use crate::metrics::DELETION_QUEUE_EXECUTED; + +use super::DeletionQueueError; +use super::FlushOp; + +const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10); + +pub(super) enum ExecutorMessage { + Delete(Vec), + Flush(FlushOp), +} + +/// Non-persistent deletion queue, for coalescing multiple object deletes into +/// larger DeleteObjects requests. 
+pub struct ExecutorWorker { + // Accumulate up to 1000 keys for the next deletion operation + accumulator: Vec, + + rx: tokio::sync::mpsc::Receiver, + + cancel: CancellationToken, + remote_storage: GenericRemoteStorage, +} + +impl ExecutorWorker { + pub(super) fn new( + remote_storage: GenericRemoteStorage, + rx: tokio::sync::mpsc::Receiver, + cancel: CancellationToken, + ) -> Self { + Self { + remote_storage, + rx, + cancel, + accumulator: Vec::new(), + } + } + + /// Wrap the remote `delete_objects` with a failpoint + pub async fn remote_delete(&self) -> Result<(), anyhow::Error> { + fail::fail_point!("deletion-queue-before-execute", |_| { + info!("Skipping execution, failpoint set"); + DELETION_QUEUE_ERRORS + .with_label_values(&["failpoint"]) + .inc(); + return Err(anyhow::anyhow!("failpoint hit")); + }); + + self.remote_storage.delete_objects(&self.accumulator).await + } + + /// Block until everything in accumulator has been executed + pub async fn flush(&mut self) -> Result<(), DeletionQueueError> { + while !self.accumulator.is_empty() && !self.cancel.is_cancelled() { + match self.remote_delete().await { + Ok(()) => { + // Note: we assume that the remote storage layer returns Ok(()) if some + // or all of the deleted objects were already gone. 
+ DELETION_QUEUE_EXECUTED.inc_by(self.accumulator.len() as u64); + info!( + "Executed deletion batch {}..{}", + self.accumulator + .first() + .expect("accumulator should be non-empty"), + self.accumulator + .last() + .expect("accumulator should be non-empty"), + ); + self.accumulator.clear(); + } + Err(e) => { + warn!("DeleteObjects request failed: {e:#}, will retry"); + DELETION_QUEUE_ERRORS.with_label_values(&["execute"]).inc(); + // Pause before retrying so a persistent failure cannot spin this loop + tokio::time::sleep(Duration::from_millis(100)).await; + } + }; + } + if self.cancel.is_cancelled() { + // Expose an error because we may not have actually flushed everything + Err(DeletionQueueError::ShuttingDown) + } else { + Ok(()) + } + } + + pub async fn background(&mut self) -> Result<(), DeletionQueueError> { + self.accumulator.reserve(MAX_KEYS_PER_DELETE); + + loop { + if self.cancel.is_cancelled() { + return Err(DeletionQueueError::ShuttingDown); + } + + let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await { + Ok(Some(m)) => m, + Ok(None) => { + // All queue senders closed + info!("Shutting down"); + return Err(DeletionQueueError::ShuttingDown); + } + Err(_) => { + // Timeout: flush what we hold; returns immediately if nothing is pending + self.flush().await?; + continue; + } + }; + + match msg { + ExecutorMessage::Delete(mut list) => { + while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE { + if self.accumulator.len() == MAX_KEYS_PER_DELETE { + self.flush().await?; + // flush() returned Ok, so the accumulator has been drained + assert_eq!(self.accumulator.len(), 0); + } + + let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len(); + let take_count = std::cmp::min(available_slots, list.len()); + for path in list.drain(list.len() - take_count..) { + self.accumulator.push(path); + } + } + } + ExecutorMessage::Flush(flush_op) => { + // If flush() errors, we drop the flush_op and the caller will get + // an error recv()'ing their oneshot channel.
+ self.flush().await?; + flush_op.fire(); + } + } + } + } +} diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 947679c47d..1960183285 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -821,9 +821,19 @@ impl RemoteTimelineClient { let layer_deletion_count = layers.len(); - deletion_queue - .push_layers(self.tenant_id, self.timeline_id, layers) - .await?; + let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id); + let layer_paths = layers + .into_iter() + .map(|l| { + let local_path = timeline_path.join(l.file_name()); + let remote_path = self + .conf + .remote_path(&local_path) + .expect("Timeline path should always convert to remote"); + remote_path + }) + .collect(); + deletion_queue.push_immediate(layer_paths).await?; // Do not delete index part yet, it is needed for possible retry. If we remove it first // and retry will arrive to different pageserver there wont be any traces of it on remote storage @@ -832,7 +842,7 @@ impl RemoteTimelineClient { // Execute all pending deletions, so that when we prroceed to do a list_prefixes below, we aren't // taking the burden of listing all the layers that we already know we should delete. 
- deletion_queue.flush_execute().await?; + deletion_queue.flush_immediate().await?; let remaining = backoff::retry( || async { @@ -863,9 +873,7 @@ impl RemoteTimelineClient { let not_referenced_count = remaining.len(); if !remaining.is_empty() { - deletion_queue - .push_objects(self.tenant_id, self.timeline_id, remaining) - .await?; + deletion_queue.push_immediate(remaining).await?; } fail::fail_point!("timeline-delete-before-index-delete", |_| { @@ -878,12 +886,12 @@ impl RemoteTimelineClient { debug!("enqueuing index part deletion"); deletion_queue - .push_objects(self.tenant_id, self.timeline_id, [index_file_path].to_vec()) + .push_immediate([index_file_path].to_vec()) .await?; - // Timeline deletion is rare and we have probably emitted a reasonably number of objects: wait - // for a flush to a persistent deletion list so that we may be sure deletion will occur. - deletion_queue.flush_execute().await?; + // Timeline deletion is rare and we have probably emitted a reasonable number of objects: wait + // for the executor to flush so that we may be sure the deletions have been executed. + deletion_queue.flush_immediate().await?; fail::fail_point!("timeline-delete-after-index-delete", |_| { Err(anyhow::anyhow!( diff --git a/test_runner/regress/test_tenant_delete.py b/test_runner/regress/test_tenant_delete.py index cf5465be3f..bb6014ede4 100644 --- a/test_runner/regress/test_tenant_delete.py +++ b/test_runner/regress/test_tenant_delete.py @@ -52,7 +52,7 @@ def test_tenant_delete_smoke( # The deletion queue will complain when it encounters simulated S3 errors ".*deletion frontend: Failed to write deletion list.*", ".*deletion backend: Failed to delete deletion list.*", - ".*deletion backend: DeleteObjects request failed.*", + ".*deletion executor: DeleteObjects request failed.*", ".*deletion backend: Failed to upload deletion queue header.*", ] ) @@ -218,7 +218,7 @@ def test_delete_tenant_exercise_crash_safety_failpoints( # The deletion queue will complain when it encounters simulated S3 errors ".*deletion frontend: Failed to write deletion list.*", ".*deletion backend: Failed to delete deletion list.*", - ".*deletion backend: DeleteObjects request
failed.*", + ".*deletion executor: DeleteObjects request failed.*", ".*deletion backend: Failed to upload deletion queue header.*", ] )