diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 8bd8f4eeb3..0c43dc9787 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -1,4 +1,4 @@ -use crate::metrics::{DELETION_QUEUE_EXECUTED, DELETION_QUEUE_SUBMITTED}; +use crate::metrics::{DELETION_QUEUE_ERRORS, DELETION_QUEUE_EXECUTED, DELETION_QUEUE_SUBMITTED}; use remote_storage::{GenericRemoteStorage, RemotePath}; use serde::Deserialize; use serde::Serialize; @@ -24,13 +24,15 @@ const FLUSH_DEFAULT_DEADLINE: Duration = Duration::from_millis(10000); // more objects before doing the flush. const FLUSH_EXPLICIT_DEADLINE: Duration = Duration::from_millis(100); -// TODO: metrics for queue length, deletions executed, deletion errors +// After this length of time, execute deletions which are elegible to run, +// even if we haven't accumulated enough for a full-sized DeleteObjects +const EXECUTE_IDLE_DEADLINE: Duration = Duration::from_secs(60); + +// If the last attempt to execute failed, wait only this long before +// trying again. +const EXECUTE_RETRY_DEADLINE: Duration = Duration::from_secs(1); // TODO: adminstrative "panic button" config property to disable all deletions - -// TODO: implement admin API hook to flush deletion queue, for use in integration tests -// that would like to assert deleted objects are gone - // TODO: configurable for how long to wait before executing deletions /// We aggregate object deletions from many tenants in one place, for several reasons: @@ -40,14 +42,11 @@ const FLUSH_EXPLICIT_DEADLINE: Duration = Duration::from_millis(100); /// - Globally control throughput of deletions, as these are a low priority task: do /// not compete with the same S3 clients/connections used for higher priority uploads. /// -/// There are two parts ot this, frontend and backend, joined by channels: +/// There are two parts to this, frontend and backend, joined by channels: /// - DeletionQueueWorker consumes the frontend queue: the "DeletionQueue" that makes up /// the public interface and accepts deletion requests. /// - BackendQueueWorker consumes the backend queue: a queue of DeletionList that have /// already been written to S3 and are now eligible for final deletion. -/// -/// -/// /// /// There are three queues internally: /// - Incoming deletes (the DeletionQueue that the outside world sees) @@ -139,6 +138,7 @@ impl DeletionQueueClient { layers: Vec, ) { DELETION_QUEUE_SUBMITTED.inc_by(layers.len() as u64); + info!("pushed!"); self.do_push(FrontendQueueMessage::Delete(DeletionOp { tenant_id, timeline_id, @@ -166,6 +166,10 @@ impl DeletionQueueClient { // Wait until all previous deletions are executed pub async fn flush_execute(&self) { + // Flush any buffered work to deletion lists + self.flush().await; + + // Flush execution of deletion lists let (tx, rx) = tokio::sync::oneshot::channel::<()>(); self.do_flush(FrontendQueueMessage::FlushExecute(FlushOp { tx }), rx) .await @@ -187,30 +191,52 @@ pub struct BackendQueueWorker { // DeletionLists we have fully executed, which may be deleted // from remote storage. executed_lists: Vec, + + // How long to wait for a message before executing anyway + timeout: Duration, } impl BackendQueueWorker { - async fn maybe_execute(&mut self) { + async fn maybe_execute(&mut self) -> bool { fail::fail_point!("deletion-queue-before-execute", |_| { - return; + info!("Skipping execution, failpoint set"); + DELETION_QUEUE_ERRORS + .with_label_values(&["failpoint"]) + .inc(); + return false; }); + if self.accumulator.is_empty() { + return true; + } + match self.remote_storage.delete_objects(&self.accumulator).await { Ok(()) => { DELETION_QUEUE_EXECUTED.inc_by(self.accumulator.len() as u64); + info!( + "Executed deletion batch {}..{}", + self.accumulator + .first() + .expect("accumulator should be non-empty"), + self.accumulator + .last() + .expect("accumulator should be non-empty"), + ); self.accumulator.clear(); self.executed_lists.append(&mut self.pending_lists); + self.timeout = EXECUTE_IDLE_DEADLINE; + true } Err(e) => { - warn!("Batch deletion failed: {e}, will retry"); - // TODO: increment error counter + warn!("DeleteObjects request failed: {e}, will retry"); + DELETION_QUEUE_ERRORS.with_label_values(&["execute"]).inc(); + self.timeout = EXECUTE_RETRY_DEADLINE; + false } } } pub async fn background(&mut self) { - let _span = tracing::info_span!("deletion_backend"); - // TODO: if we would like to be able to defer deletions while a Layer still has // refs (but it will be elegible for deletion after process ends), then we may // add an ephemeral part to BackendQueueMessage::Delete that tracks which keys @@ -222,7 +248,22 @@ impl BackendQueueWorker { self.accumulator.reserve(MAX_KEYS_PER_DELETE); - while let Some(msg) = self.rx.recv().await { + loop { + let msg = match tokio::time::timeout(self.timeout, self.rx.recv()).await { + Ok(Some(m)) => m, + Ok(None) => { + // All queue senders closed + info!("Shutting down"); + break; + } + Err(_) => { + // Timeout, we hit deadline to execute whatever we have in hand + self.maybe_execute().await; + + continue; + } + }; + match msg { BackendQueueMessage::Delete(mut list) => { if list.objects.is_empty() { @@ -282,16 +323,17 @@ impl BackendQueueWorker { .truncate(self.executed_lists.len() - executed_keys.len()); } Err(e) => { - warn!("Failed to purge deletion lists: {e}"); + warn!("Failed to delete deletion list(s): {e}"); // Do nothing: the elements remain in executed_lists, and purge will be retried // next time we process some deletions and go around the loop. + DELETION_QUEUE_ERRORS + .with_label_values(&["delete_list"]) + .inc(); } } } BackendQueueMessage::Flush(op) => { - while !self.accumulator.is_empty() { - self.maybe_execute().await; - } + self.maybe_execute().await; op.fire(); } @@ -341,8 +383,29 @@ impl FrontendQueueWorker { let size = bytes.len(); let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes)); + if self.pending.objects.is_empty() { + // We do not expect to be called in this state, but handle it so that later + // logging code can be assured that therre is always a first+last key to print + for f in self.pending_flushes.drain(..) { + f.fire(); + } + return; + } + match self.remote_storage.upload(source, size, &key, None).await { Ok(_) => { + info!( + "Stored deletion list {key} ({0}..{1})", + self.pending + .objects + .first() + .expect("list should be non-empty"), + self.pending + .objects + .last() + .expect("list should be non-empty"), + ); + for f in self.pending_flushes.drain(..) { f.fire(); } @@ -361,9 +424,10 @@ impl FrontendQueueWorker { } } Err(e) => { + DELETION_QUEUE_ERRORS.with_label_values(&["put_list"]).inc(); warn!( sequence = self.pending.sequence, - "Failed to flush deletion list, will retry later ({e})" + "Failed to write deletion list to remote storage, will retry later ({e})" ) } } @@ -372,6 +436,7 @@ impl FrontendQueueWorker { /// This is the front-end ingest, where we bundle up deletion requests into DeletionList /// and write them out, for later pub async fn background(&mut self) { + info!("Started deletion frontend worker"); loop { let flush_delay = self.deadline.duration_since(Instant::now()); @@ -400,7 +465,7 @@ impl FrontendQueueWorker { let timeline_path = self.conf.timeline_path(&op.tenant_id, &op.timeline_id); let _span = tracing::info_span!( - "execute_deletion", + "deletion_frontend_enqueue", tenant_id = %op.tenant_id, timeline_id = %op.timeline_id, ); @@ -498,6 +563,7 @@ impl DeletionQueue { accumulator: Vec::new(), pending_lists: Vec::new(), executed_lists: Vec::new(), + timeout: EXECUTE_IDLE_DEADLINE, }), ) } diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 4735ead418..2bdf880373 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -660,7 +660,7 @@ static REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST: Lazy = Lazy::new pub(crate) static DELETION_QUEUE_SUBMITTED: Lazy = Lazy::new(|| { register_int_counter!( - "pageserver_deletion_queue_submitted", + "pageserver_deletion_queue_submitted_total", "Number of objects submitted for deletion" ) .expect("failed to define a metric") @@ -668,12 +668,21 @@ pub(crate) static DELETION_QUEUE_SUBMITTED: Lazy = Lazy::new(|| { pub(crate) static DELETION_QUEUE_EXECUTED: Lazy = Lazy::new(|| { register_int_counter!( - "pageserver_deletion_queue_executed", + "pageserver_deletion_queue_executed_total", "Number of objects deleted" ) .expect("failed to define a metric") }); +pub(crate) static DELETION_QUEUE_ERRORS: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "pageserver_deletion_queue_errors_total", + "Incremented on retryable remote I/O errors writing deletion lists or executing deletions.", + &["op_kind"], + ) + .expect("failed to define a metric") +}); + static REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER: Lazy = Lazy::new(|| { register_int_counter_vec!( "pageserver_remote_timeline_client_bytes_started",