From 416026381fa25f709fe5bfb25b20354a8cac31c7 Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 22 Aug 2023 16:38:13 +0100 Subject: [PATCH] deletion queue: refactor into frontend/backend modules --- pageserver/src/deletion_queue.rs | 671 +--------------------- pageserver/src/deletion_queue/backend.rs | 284 +++++++++ pageserver/src/deletion_queue/frontend.rs | 408 +++++++++++++ 3 files changed, 717 insertions(+), 646 deletions(-) create mode 100644 pageserver/src/deletion_queue/backend.rs create mode 100644 pageserver/src/deletion_queue/frontend.rs diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index d6fc80f567..e4af32125f 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -1,52 +1,32 @@ -use crate::metrics::{DELETION_QUEUE_ERRORS, DELETION_QUEUE_EXECUTED, DELETION_QUEUE_SUBMITTED}; -use regex::Regex; -use remote_storage::DownloadError; +mod backend; +mod frontend; + +use crate::metrics::DELETION_QUEUE_SUBMITTED; use remote_storage::{GenericRemoteStorage, RemotePath}; use serde::Deserialize; use serde::Serialize; use serde_with::serde_as; use thiserror::Error; use tokio; -use tokio::time::Duration; use tokio_util::sync::CancellationToken; -use tracing::{self, debug, error, info, warn}; -use utils::backoff; +use tracing::{self, debug, error}; use utils::id::{TenantId, TimelineId}; +pub(crate) use self::backend::BackendQueueWorker; +use self::frontend::DeletionOp; +pub(crate) use self::frontend::FrontendQueueWorker; +use backend::BackendQueueMessage; +use frontend::FrontendQueueMessage; + use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName}; -// The number of keys in a DeletionList before we will proactively persist it -// (without reaching a flush deadline). This aims to deliver objects of the order -// of magnitude 1MB when we are under heavy delete load. -const DELETION_LIST_TARGET_SIZE: usize = 16384; - -// Ordinarily, we only flush to DeletionList periodically, to bound the window during -// which we might leak objects from not flushing a DeletionList after -// the objects are already unlinked from timeline metadata. -const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000); - -// If someone is waiting for a flush to DeletionList, only delay a little to accumulate -// more objects before doing the flush. -const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100); - -// After this length of time, execute deletions which are elegible to run, -// even if we haven't accumulated enough for a full-sized DeleteObjects -const EXECUTE_IDLE_DEADLINE: Duration = Duration::from_secs(60); - -// If the last attempt to execute failed, wait only this long before -// trying again. -const EXECUTE_RETRY_DEADLINE: Duration = Duration::from_millis(100); - -// From the S3 spec -const MAX_KEYS_PER_DELETE: usize = 1000; - // Arbitrary thresholds for retries: we do not depend on success // within OP_RETRIES, as workers will just go around their consume loop: // the purpose of the backoff::retries with these constants are to // retry _sooner_ than we would if going around the whole loop. -pub(crate) const FAILED_REMOTE_OP_WARN_THRESHOLD: u32 = 3; +const FAILED_REMOTE_OP_WARN_THRESHOLD: u32 = 3; -pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10; +const FAILED_REMOTE_OP_RETRIES: u32 = 10; // TODO: adminstrative "panic button" config property to disable all deletions // TODO: configurable for how long to wait before executing deletions @@ -78,26 +58,6 @@ pub struct DeletionQueue { tx: tokio::sync::mpsc::Sender, } -#[derive(Debug)] -enum FrontendQueueMessage { - Delete(DeletionOp), - // Wait until all prior deletions make it into a persistent DeletionList - Flush(FlushOp), - // Wait until all prior deletions have been executed (i.e. objects are actually deleted) - FlushExecute(FlushOp), -} - -#[derive(Debug)] -struct DeletionOp { - tenant_id: TenantId, - timeline_id: TimelineId, - // `layers` and `objects` are both just lists of objects. `layers` is used if you do not - // have a config object handy to project it to a remote key, and need the consuming worker - // to do it for you. - layers: Vec, - objects: Vec, -} - #[derive(Debug)] struct FlushOp { tx: tokio::sync::oneshot::Sender<()>, @@ -192,7 +152,7 @@ impl DeletionQueueClient { /// persistent, but it may be executed at any time after this function enters: do not push /// layers until you're sure they can be deleted safely (i.e. remote metadata no longer /// references them). - pub async fn push_layers( + pub(crate) async fn push_layers( &self, tenant_id: TenantId, timeline_id: TimelineId, @@ -209,7 +169,7 @@ impl DeletionQueueClient { } /// Just like push_layers, but using some already-known remote paths, instead of abstract layer names - pub async fn push_objects( + pub(crate) async fn push_objects( &self, tenant_id: TenantId, timeline_id: TimelineId, @@ -250,7 +210,7 @@ impl DeletionQueueClient { } // Wait until all previous deletions are executed - pub async fn flush_execute(&self) -> Result<(), DeletionQueueError> { + pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> { debug!("flush_execute: flushing to deletion lists..."); // Flush any buffered work to deletion lists self.flush().await?; @@ -265,579 +225,6 @@ impl DeletionQueueClient { } } -pub struct BackendQueueWorker { - remote_storage: GenericRemoteStorage, - conf: &'static PageServerConf, - rx: tokio::sync::mpsc::Receiver, - - // Accumulate up to 1000 keys for the next deletion operation - accumulator: Vec, - - // DeletionLists we have fully ingested but might still have - // some keys in accumulator. - pending_lists: Vec, - - // DeletionLists we have fully executed, which may be deleted - // from remote storage. - executed_lists: Vec, - - // These FlushOps should fire the next time we flush - pending_flushes: Vec, - - // How long to wait for a message before executing anyway - timeout: Duration, -} - -impl BackendQueueWorker { - async fn maybe_execute(&mut self) -> bool { - fail::fail_point!("deletion-queue-before-execute", |_| { - info!("Skipping execution, failpoint set"); - DELETION_QUEUE_ERRORS - .with_label_values(&["failpoint"]) - .inc(); - - // Retry fast when failpoint is active, so that when it is disabled we resume promptly - self.timeout = EXECUTE_RETRY_DEADLINE; - false - }); - - if self.accumulator.is_empty() { - for f in self.pending_flushes.drain(..) { - f.fire(); - } - return true; - } - - match self.remote_storage.delete_objects(&self.accumulator).await { - Ok(()) => { - // Note: we assume that the remote storage layer returns Ok(()) if some - // or all of the deleted objects were already gone. - DELETION_QUEUE_EXECUTED.inc_by(self.accumulator.len() as u64); - info!( - "Executed deletion batch {}..{}", - self.accumulator - .first() - .expect("accumulator should be non-empty"), - self.accumulator - .last() - .expect("accumulator should be non-empty"), - ); - self.accumulator.clear(); - self.executed_lists.append(&mut self.pending_lists); - - for f in self.pending_flushes.drain(..) { - f.fire(); - } - self.timeout = EXECUTE_IDLE_DEADLINE; - true - } - Err(e) => { - warn!("DeleteObjects request failed: {e:#}, will retry"); - DELETION_QUEUE_ERRORS.with_label_values(&["execute"]).inc(); - self.timeout = EXECUTE_RETRY_DEADLINE; - false - } - } - } - - async fn cleanup_lists(&mut self) { - debug!( - "cleanup_lists: {0} executed lists, {1} pending lists", - self.executed_lists.len(), - self.pending_lists.len() - ); - - // Lists are always pushed into the queues + executed list in sequence order, so - // no sort is required: can find the highest sequence number by peeking at last element - let max_executed_seq = match self.executed_lists.last() { - Some(v) => v.sequence, - None => { - // No executed lists, nothing to clean up. - return; - } - }; - - // In case this is the last list, write a header out first so that - // we don't risk losing our knowledge of the sequence number (on replay, our - // next sequence number is the highest list seen + 1, or read from the header - // if there are no lists) - let header = DeletionHeader::new(max_executed_seq); - debug!("Writing header {:?}", header); - let bytes = serde_json::to_vec(&header).expect("Failed to serialize deletion header"); - let size = bytes.len(); - let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes)); - let header_key = self.conf.remote_deletion_header_path(); - - if let Err(e) = self - .remote_storage - .upload(source, size, &header_key, None) - .await - { - warn!("Failed to upload deletion queue header: {e:#}"); - DELETION_QUEUE_ERRORS - .with_label_values(&["put_header"]) - .inc(); - return; - } - - let executed_keys: Vec = self - .executed_lists - .iter() - .rev() - .take(MAX_KEYS_PER_DELETE) - .map(|l| self.conf.remote_deletion_list_path(l.sequence)) - .collect(); - - match self.remote_storage.delete_objects(&executed_keys).await { - Ok(()) => { - // Retain any lists that couldn't be deleted in that request - self.executed_lists - .truncate(self.executed_lists.len() - executed_keys.len()); - } - Err(e) => { - warn!("Failed to delete deletion list(s): {e:#}"); - // Do nothing: the elements remain in executed_lists, and purge will be retried - // next time we process some deletions and go around the loop. - DELETION_QUEUE_ERRORS - .with_label_values(&["delete_list"]) - .inc(); - } - } - } - - pub async fn background(&mut self) { - // TODO: if we would like to be able to defer deletions while a Layer still has - // refs (but it will be elegible for deletion after process ends), then we may - // add an ephemeral part to BackendQueueMessage::Delete that tracks which keys - // in the deletion list may not be deleted yet, with guards to block on while - // we wait to proceed. - - self.accumulator.reserve(MAX_KEYS_PER_DELETE); - - loop { - let msg = match tokio::time::timeout(self.timeout, self.rx.recv()).await { - Ok(Some(m)) => m, - Ok(None) => { - // All queue senders closed - info!("Shutting down"); - break; - } - Err(_) => { - // Timeout, we hit deadline to execute whatever we have in hand. These functions will - // return immediately if no work is pending - self.maybe_execute().await; - self.cleanup_lists().await; - - continue; - } - }; - - match msg { - BackendQueueMessage::Delete(mut list) => { - if list.objects.is_empty() { - // This shouldn't happen, but is harmless. warn so that - // tests will fail if we have such a bug, but proceed with - // processing subsequent messages. - warn!("Empty DeletionList passed to deletion backend"); - self.executed_lists.push(list); - continue; - } - - // This loop handles deletion lists that require multiple DeleteObjects requests, - // and also handles retries if a deletion fails: we will keep going around until - // we have either deleted everything, or we have a remainder in accumulator. - while !list.objects.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE - { - let take_count = if self.accumulator.len() == MAX_KEYS_PER_DELETE { - 0 - } else { - let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len(); - std::cmp::min(available_slots, list.objects.len()) - }; - - for object in list.objects.drain(list.objects.len() - take_count..) { - self.accumulator.push(object); - } - - if self.accumulator.len() == MAX_KEYS_PER_DELETE { - // Great, we got a full request: issue it. - if !self.maybe_execute().await { - // Failed to execute: retry delay - tokio::time::sleep(EXECUTE_RETRY_DEADLINE).await; - }; - } - } - - if !self.accumulator.is_empty() { - // We have a remainder, `list` not fully executed yet - self.pending_lists.push(list); - } else { - // We fully processed this list, it is ready for purge - self.executed_lists.push(list); - } - - self.cleanup_lists().await; - } - BackendQueueMessage::Flush(op) => { - if self.accumulator.is_empty() { - op.fire(); - continue; - } - - self.maybe_execute().await; - - if self.accumulator.is_empty() { - // Successful flush. Clean up lists before firing, for the benefit of tests that would - // like to have a deterministic state post-flush. - self.cleanup_lists().await; - op.fire(); - } else { - // We didn't flush inline: defer until next time we successfully drain accumulatorr - self.pending_flushes.push(op); - } - } - } - } - } -} - -#[derive(Debug)] -enum BackendQueueMessage { - Delete(DeletionList), - Flush(FlushOp), -} - -pub struct FrontendQueueWorker { - remote_storage: GenericRemoteStorage, - conf: &'static PageServerConf, - - // Incoming frontend requests to delete some keys - rx: tokio::sync::mpsc::Receiver, - - // Outbound requests to the backend to execute deletion lists we have composed. - tx: tokio::sync::mpsc::Sender, - - // The list we are currently building, contains a buffer of keys to delete - // and our next sequence number - pending: DeletionList, - - // These FlushOps should fire the next time we flush - pending_flushes: Vec, - - // Worker loop is torn down when this fires. - cancel: CancellationToken, -} - -impl FrontendQueueWorker { - async fn upload_pending_list(&mut self) -> anyhow::Result<()> { - let key = &self.conf.remote_deletion_list_path(self.pending.sequence); - - backoff::retry( - || { - let bytes = - serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list"); - let size = bytes.len(); - let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes)); - self.remote_storage.upload(source, size, key, None) - }, - |_| false, - FAILED_REMOTE_OP_WARN_THRESHOLD, - FAILED_REMOTE_OP_RETRIES, - "upload deletion list", - backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")), - ) - .await - } - - /// Try to flush `list` to persistent storage - /// - /// This does not return errors, because on failure to flush we do not lose - /// any state: flushing will be retried implicitly on the next deadline - async fn flush(&mut self) { - if self.pending.objects.is_empty() { - // We do not expect to be called in this state, but handle it so that later - // logging code can be assured that therre is always a first+last key to print - for f in self.pending_flushes.drain(..) { - f.fire(); - } - return; - } - - match self.upload_pending_list().await { - Ok(_) => { - info!( - sequence = self.pending.sequence, - "Stored deletion list ({0}..{1})", - self.pending - .objects - .first() - .expect("list should be non-empty"), - self.pending - .objects - .last() - .expect("list should be non-empty"), - ); - - for f in self.pending_flushes.drain(..) { - f.fire(); - } - - let mut onward_list = DeletionList::new(self.pending.sequence); - std::mem::swap(&mut onward_list.objects, &mut self.pending.objects); - - // We have consumed out of pending: reset it for the next incoming deletions to accumulate there - self.pending = DeletionList::new(self.pending.sequence + 1); - - if let Err(e) = self.tx.send(BackendQueueMessage::Delete(onward_list)).await { - // This is allowed to fail: it will only happen if the backend worker is shut down, - // so we can just drop this on the floor. - info!("Deletion list dropped, this is normal during shutdown ({e:#})"); - } - } - Err(e) => { - DELETION_QUEUE_ERRORS.with_label_values(&["put_list"]).inc(); - warn!( - sequence = self.pending.sequence, - "Failed to write deletion list to remote storage, will retry later ({e:#})" - ); - } - } - } - - async fn recover(&mut self) -> Result<(), anyhow::Error> { - // Load header: this is not required to be present, e.g. when a pageserver first runs - let header_path = self.conf.remote_deletion_header_path(); - let header_bytes = match backoff::retry( - || self.remote_storage.download_all(&header_path), - |e| matches!(e, DownloadError::NotFound), - FAILED_REMOTE_OP_WARN_THRESHOLD, - u32::MAX, - "Reading deletion queue header", - backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown), - ) - .await - { - Ok(h) => Ok(Some(h)), - Err(e) => { - if let DownloadError::NotFound = e { - debug!("Deletion header {header_path} not found, first start?"); - Ok(None) - } else { - Err(e) - } - } - }?; - - if let Some(header_bytes) = header_bytes { - if let Some(header) = match serde_json::from_slice::(&header_bytes) { - Ok(h) => Some(h), - Err(e) => { - warn!("Failed to deserialize deletion header, ignoring {header_path}: {e:#}"); - // This should never happen unless we make a mistake with our serialization. - // Ignoring a deletion header is not consequential for correctnes because all deletions - // are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up. - None - } - } { - self.pending.sequence = - std::cmp::max(self.pending.sequence, header.last_deleted_list_seq + 1); - }; - }; - - let prefix = RemotePath::new(&self.conf.remote_deletion_node_prefix()) - .expect("Failed to compose path"); - let lists = backoff::retry( - || async { self.remote_storage.list_prefixes(Some(&prefix)).await }, - |_| false, - FAILED_REMOTE_OP_WARN_THRESHOLD, - u32::MAX, // There's no point giving up, since once we do that the deletion queue is stuck - "Recovering deletion lists", - backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown), - ) - .await?; - - debug!("Loaded {} keys in deletion prefix {}", lists.len(), prefix); - let list_name_pattern = - Regex::new("([a-zA-Z0-9]{16})-([a-zA-Z0-9]{8})-([a-zA-Z0-9]{2}).list").unwrap(); - - let mut seqs: Vec = Vec::new(); - for l in &lists { - if l == &header_path { - // Don't try and parse the header key as a list key - continue; - } - - let basename = l - .strip_prefix(&prefix) - .expect("Stripping prefix frrom a prefix listobjects should always work"); - let basename = match basename.to_str() { - Some(s) => s, - None => { - // Should never happen, we are the only ones writing objects here - warn!("Unexpected key encoding in deletion queue object"); - continue; - } - }; - - let seq_part = if let Some(m) = list_name_pattern.captures(basename) { - m.get(1) - .expect("Non optional group should be present") - .as_str() - } else { - warn!("Unexpected key in deletion queue: {basename}"); - continue; - }; - - let seq: u64 = match u64::from_str_radix(seq_part, 16) { - Ok(s) => s, - Err(e) => { - warn!("Malformed key '{basename}': {e}"); - continue; - } - }; - seqs.push(seq); - } - - seqs.sort(); - - // Initialize the next sequence number in the frontend based on the maximum of the highest list we see, - // and the last list that was deleted according to the header. Combined with writing out the header - // prior to deletions, this guarnatees no re-use of sequence numbers. - if let Some(max_list_seq) = seqs.last() { - self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1); - } - - for s in seqs { - let list_path = self.conf.remote_deletion_list_path(s); - let lists_body = backoff::retry( - || self.remote_storage.download_all(&list_path), - |_| false, - FAILED_REMOTE_OP_WARN_THRESHOLD, - u32::MAX, - "Reading a deletion list", - backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown), - ) - .await?; - - let deletion_list = match serde_json::from_slice::(lists_body.as_slice()) - { - Ok(l) => l, - Err(e) => { - // Drop the list on the floor: any objects it referenced will be left behind - // for scrubbing to clean up. This should never happen unless we have a serialization bug. - warn!(sequence = s, "Failed to deserialize deletion list: {e}"); - continue; - } - }; - - // We will drop out of recovery if this fails: it indicates that we are shutting down - // or the backend has panicked - DELETION_QUEUE_SUBMITTED.inc_by(deletion_list.objects.len() as u64); - self.tx - .send(BackendQueueMessage::Delete(deletion_list)) - .await?; - } - - info!(next_sequence = self.pending.sequence, "Replay complete"); - - Ok(()) - } - - /// This is the front-end ingest, where we bundle up deletion requests into DeletionList - /// and write them out, for later - pub async fn background(&mut self) { - info!("Started deletion frontend worker"); - - let mut recovered: bool = false; - - loop { - let timeout = if self.pending_flushes.is_empty() { - FRONTEND_DEFAULT_TIMEOUT - } else { - FRONTEND_FLUSHING_TIMEOUT - }; - - let msg = match tokio::time::timeout(timeout, self.rx.recv()).await { - Ok(Some(msg)) => msg, - Ok(None) => { - // Queue sender destroyed, shutting down - break; - } - Err(_) => { - // Hit deadline, flush. - self.flush().await; - continue; - } - }; - - // On first message, do recovery. This avoids unnecessary recovery very - // early in startup, and simplifies testing by avoiding a 404 reading the - // header on every first pageserver startup. - if !recovered { - // Before accepting any input from this pageserver lifetime, recover all deletion lists that are in S3 - if let Err(e) = self.recover().await { - // This should only happen in truly unrecoverable cases, like the recovery finding that the backend - // queue receiver has been dropped. - info!( - "Deletion queue recover aborted, deletion queue will not proceed ({e:#})" - ); - return; - } else { - recovered = true; - } - } - - match msg { - FrontendQueueMessage::Delete(op) => { - debug!( - "Deletion enqueue {0} layers, {1} other objects", - op.layers.len(), - op.objects.len() - ); - - let timeline_path = self.conf.timeline_path(&op.tenant_id, &op.timeline_id); - for layer in op.layers { - // TODO go directly to remote path without composing local path - let local_path = timeline_path.join(layer.file_name()); - let path = match self.conf.remote_path(&local_path) { - Ok(p) => p, - Err(e) => { - panic!("Can't make a timeline path! {e}"); - } - }; - self.pending.objects.push(path); - } - - self.pending.objects.extend(op.objects.into_iter()) - } - FrontendQueueMessage::Flush(op) => { - if self.pending.objects.is_empty() { - // Execute immediately - debug!("No pending objects, flushing immediately"); - op.fire() - } else { - // Execute next time we flush - self.pending_flushes.push(op); - } - } - FrontendQueueMessage::FlushExecute(op) => { - // We do not flush to a deletion list here: the client sends a Flush before the FlushExecute - if let Err(e) = self.tx.send(BackendQueueMessage::Flush(op)).await { - info!("Can't flush, shutting down ({e})"); - // Caller will get error when their oneshot sender was dropped. - } - } - } - - if self.pending.objects.len() > DELETION_LIST_TARGET_SIZE - || !self.pending_flushes.is_empty() - { - self.flush().await; - } - } - info!("Deletion queue shut down."); - } -} impl DeletionQueue { pub fn new_client(&self) -> DeletionQueueClient { DeletionQueueClient { @@ -870,25 +257,14 @@ impl DeletionQueue { ( Self { tx }, - Some(FrontendQueueWorker { - pending: DeletionList::new(1), - remote_storage: remote_storage.clone(), + Some(FrontendQueueWorker::new( + remote_storage.clone(), conf, rx, - tx: backend_tx, - pending_flushes: Vec::new(), + backend_tx, cancel, - }), - Some(BackendQueueWorker { - remote_storage, - conf, - rx: backend_rx, - accumulator: Vec::new(), - pending_lists: Vec::new(), - executed_lists: Vec::new(), - timeout: EXECUTE_IDLE_DEADLINE, - pending_flushes: Vec::new(), - }), + )), + Some(BackendQueueWorker::new(remote_storage, conf, backend_rx)), ) } } @@ -900,6 +276,7 @@ mod test { io::ErrorKind, path::{Path, PathBuf}, }; + use tracing::info; use remote_storage::{RemoteStorageConfig, RemoteStorageKind}; use tokio::{runtime::EnterGuard, task::JoinHandle}; @@ -1155,6 +532,8 @@ mod test { /// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it. #[cfg(test)] pub mod mock { + use tracing::info; + use super::*; use std::sync::{ atomic::{AtomicUsize, Ordering}, @@ -1261,7 +640,7 @@ pub mod mock { .expect("Mock delete queue shutdown while waiting to pump"); } - pub fn new_client(&self) -> DeletionQueueClient { + pub(crate) fn new_client(&self) -> DeletionQueueClient { DeletionQueueClient { tx: self.tx.clone(), } diff --git a/pageserver/src/deletion_queue/backend.rs b/pageserver/src/deletion_queue/backend.rs new file mode 100644 index 0000000000..9ec4be3b54 --- /dev/null +++ b/pageserver/src/deletion_queue/backend.rs @@ -0,0 +1,284 @@ +use std::time::Duration; + +use remote_storage::GenericRemoteStorage; +use remote_storage::RemotePath; +use tracing::debug; +use tracing::info; +use tracing::warn; + +use crate::config::PageServerConf; +use crate::metrics::DELETION_QUEUE_ERRORS; +use crate::metrics::DELETION_QUEUE_EXECUTED; + +use super::DeletionHeader; +use super::DeletionList; +use super::FlushOp; + +// After this length of time, execute deletions which are elegible to run, +// even if we haven't accumulated enough for a full-sized DeleteObjects +const EXECUTE_IDLE_DEADLINE: Duration = Duration::from_secs(60); + +// If the last attempt to execute failed, wait only this long before +// trying again. +const EXECUTE_RETRY_DEADLINE: Duration = Duration::from_millis(100); + +// From the S3 spec +const MAX_KEYS_PER_DELETE: usize = 1000; + +#[derive(Debug)] +pub(super) enum BackendQueueMessage { + Delete(DeletionList), + Flush(FlushOp), +} +pub struct BackendQueueWorker { + remote_storage: GenericRemoteStorage, + conf: &'static PageServerConf, + rx: tokio::sync::mpsc::Receiver, + + // Accumulate up to 1000 keys for the next deletion operation + accumulator: Vec, + + // DeletionLists we have fully ingested but might still have + // some keys in accumulator. + pending_lists: Vec, + + // DeletionLists we have fully executed, which may be deleted + // from remote storage. + executed_lists: Vec, + + // These FlushOps should fire the next time we flush + pending_flushes: Vec, + + // How long to wait for a message before executing anyway + timeout: Duration, +} + +impl BackendQueueWorker { + pub(super) fn new( + remote_storage: GenericRemoteStorage, + conf: &'static PageServerConf, + rx: tokio::sync::mpsc::Receiver, + ) -> Self { + Self { + remote_storage, + conf, + rx, + accumulator: Vec::new(), + pending_lists: Vec::new(), + executed_lists: Vec::new(), + timeout: EXECUTE_IDLE_DEADLINE, + pending_flushes: Vec::new(), + } + } + + async fn maybe_execute(&mut self) -> bool { + fail::fail_point!("deletion-queue-before-execute", |_| { + info!("Skipping execution, failpoint set"); + DELETION_QUEUE_ERRORS + .with_label_values(&["failpoint"]) + .inc(); + + // Retry fast when failpoint is active, so that when it is disabled we resume promptly + self.timeout = EXECUTE_RETRY_DEADLINE; + false + }); + + if self.accumulator.is_empty() { + for f in self.pending_flushes.drain(..) { + f.fire(); + } + return true; + } + + match self.remote_storage.delete_objects(&self.accumulator).await { + Ok(()) => { + // Note: we assume that the remote storage layer returns Ok(()) if some + // or all of the deleted objects were already gone. + DELETION_QUEUE_EXECUTED.inc_by(self.accumulator.len() as u64); + info!( + "Executed deletion batch {}..{}", + self.accumulator + .first() + .expect("accumulator should be non-empty"), + self.accumulator + .last() + .expect("accumulator should be non-empty"), + ); + self.accumulator.clear(); + self.executed_lists.append(&mut self.pending_lists); + + for f in self.pending_flushes.drain(..) { + f.fire(); + } + self.timeout = EXECUTE_IDLE_DEADLINE; + true + } + Err(e) => { + warn!("DeleteObjects request failed: {e:#}, will retry"); + DELETION_QUEUE_ERRORS.with_label_values(&["execute"]).inc(); + self.timeout = EXECUTE_RETRY_DEADLINE; + false + } + } + } + + async fn cleanup_lists(&mut self) { + debug!( + "cleanup_lists: {0} executed lists, {1} pending lists", + self.executed_lists.len(), + self.pending_lists.len() + ); + + // Lists are always pushed into the queues + executed list in sequence order, so + // no sort is required: can find the highest sequence number by peeking at last element + let max_executed_seq = match self.executed_lists.last() { + Some(v) => v.sequence, + None => { + // No executed lists, nothing to clean up. + return; + } + }; + + // In case this is the last list, write a header out first so that + // we don't risk losing our knowledge of the sequence number (on replay, our + // next sequence number is the highest list seen + 1, or read from the header + // if there are no lists) + let header = DeletionHeader::new(max_executed_seq); + debug!("Writing header {:?}", header); + let bytes = serde_json::to_vec(&header).expect("Failed to serialize deletion header"); + let size = bytes.len(); + let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes)); + let header_key = self.conf.remote_deletion_header_path(); + + if let Err(e) = self + .remote_storage + .upload(source, size, &header_key, None) + .await + { + warn!("Failed to upload deletion queue header: {e:#}"); + DELETION_QUEUE_ERRORS + .with_label_values(&["put_header"]) + .inc(); + return; + } + + let executed_keys: Vec = self + .executed_lists + .iter() + .rev() + .take(MAX_KEYS_PER_DELETE) + .map(|l| self.conf.remote_deletion_list_path(l.sequence)) + .collect(); + + match self.remote_storage.delete_objects(&executed_keys).await { + Ok(()) => { + // Retain any lists that couldn't be deleted in that request + self.executed_lists + .truncate(self.executed_lists.len() - executed_keys.len()); + } + Err(e) => { + warn!("Failed to delete deletion list(s): {e:#}"); + // Do nothing: the elements remain in executed_lists, and purge will be retried + // next time we process some deletions and go around the loop. + DELETION_QUEUE_ERRORS + .with_label_values(&["delete_list"]) + .inc(); + } + } + } + + pub async fn background(&mut self) { + // TODO: if we would like to be able to defer deletions while a Layer still has + // refs (but it will be elegible for deletion after process ends), then we may + // add an ephemeral part to BackendQueueMessage::Delete that tracks which keys + // in the deletion list may not be deleted yet, with guards to block on while + // we wait to proceed. + + self.accumulator.reserve(MAX_KEYS_PER_DELETE); + + loop { + let msg = match tokio::time::timeout(self.timeout, self.rx.recv()).await { + Ok(Some(m)) => m, + Ok(None) => { + // All queue senders closed + info!("Shutting down"); + break; + } + Err(_) => { + // Timeout, we hit deadline to execute whatever we have in hand. These functions will + // return immediately if no work is pending + self.maybe_execute().await; + self.cleanup_lists().await; + + continue; + } + }; + + match msg { + BackendQueueMessage::Delete(mut list) => { + if list.objects.is_empty() { + // This shouldn't happen, but is harmless. warn so that + // tests will fail if we have such a bug, but proceed with + // processing subsequent messages. + warn!("Empty DeletionList passed to deletion backend"); + self.executed_lists.push(list); + continue; + } + + // This loop handles deletion lists that require multiple DeleteObjects requests, + // and also handles retries if a deletion fails: we will keep going around until + // we have either deleted everything, or we have a remainder in accumulator. + while !list.objects.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE + { + let take_count = if self.accumulator.len() == MAX_KEYS_PER_DELETE { + 0 + } else { + let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len(); + std::cmp::min(available_slots, list.objects.len()) + }; + + for object in list.objects.drain(list.objects.len() - take_count..) { + self.accumulator.push(object); + } + + if self.accumulator.len() == MAX_KEYS_PER_DELETE { + // Great, we got a full request: issue it. + if !self.maybe_execute().await { + // Failed to execute: retry delay + tokio::time::sleep(EXECUTE_RETRY_DEADLINE).await; + }; + } + } + + if !self.accumulator.is_empty() { + // We have a remainder, `list` not fully executed yet + self.pending_lists.push(list); + } else { + // We fully processed this list, it is ready for purge + self.executed_lists.push(list); + } + + self.cleanup_lists().await; + } + BackendQueueMessage::Flush(op) => { + if self.accumulator.is_empty() { + op.fire(); + continue; + } + + self.maybe_execute().await; + + if self.accumulator.is_empty() { + // Successful flush. Clean up lists before firing, for the benefit of tests that would + // like to have a deterministic state post-flush. + self.cleanup_lists().await; + op.fire(); + } else { + // We didn't flush inline: defer until next time we successfully drain accumulatorr + self.pending_flushes.push(op); + } + } + } + } + } +} diff --git a/pageserver/src/deletion_queue/frontend.rs b/pageserver/src/deletion_queue/frontend.rs new file mode 100644 index 0000000000..3cc603880d --- /dev/null +++ b/pageserver/src/deletion_queue/frontend.rs @@ -0,0 +1,408 @@ +use super::BackendQueueMessage; +use super::DeletionHeader; +use super::DeletionList; +use super::FlushOp; +use super::FAILED_REMOTE_OP_RETRIES; +use super::FAILED_REMOTE_OP_WARN_THRESHOLD; + +use std::time::Duration; + +use regex::Regex; +use remote_storage::DownloadError; +use remote_storage::GenericRemoteStorage; +use remote_storage::RemotePath; +use tokio_util::sync::CancellationToken; +use tracing::debug; +use tracing::info; +use tracing::warn; +use utils::backoff; +use utils::id::TenantId; +use utils::id::TimelineId; + +use crate::config::PageServerConf; +use crate::metrics::DELETION_QUEUE_ERRORS; +use crate::metrics::DELETION_QUEUE_SUBMITTED; +use crate::tenant::storage_layer::LayerFileName; + +// The number of keys in a DeletionList before we will proactively persist it +// (without reaching a flush deadline). This aims to deliver objects of the order +// of magnitude 1MB when we are under heavy delete load. +const DELETION_LIST_TARGET_SIZE: usize = 16384; + +// Ordinarily, we only flush to DeletionList periodically, to bound the window during +// which we might leak objects from not flushing a DeletionList after +// the objects are already unlinked from timeline metadata. +const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000); + +// If someone is waiting for a flush to DeletionList, only delay a little to accumulate +// more objects before doing the flush. +const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100); + +#[derive(Debug)] +pub(super) struct DeletionOp { + pub(super) tenant_id: TenantId, + pub(super) timeline_id: TimelineId, + // `layers` and `objects` are both just lists of objects. `layers` is used if you do not + // have a config object handy to project it to a remote key, and need the consuming worker + // to do it for you. + pub(super) layers: Vec, + pub(super) objects: Vec, +} + +#[derive(Debug)] +pub(super) enum FrontendQueueMessage { + Delete(DeletionOp), + // Wait until all prior deletions make it into a persistent DeletionList + Flush(FlushOp), + // Wait until all prior deletions have been executed (i.e. objects are actually deleted) + FlushExecute(FlushOp), +} + +pub struct FrontendQueueWorker { + remote_storage: GenericRemoteStorage, + conf: &'static PageServerConf, + + // Incoming frontend requests to delete some keys + rx: tokio::sync::mpsc::Receiver, + + // Outbound requests to the backend to execute deletion lists we have composed. + tx: tokio::sync::mpsc::Sender, + + // The list we are currently building, contains a buffer of keys to delete + // and our next sequence number + pending: DeletionList, + + // These FlushOps should fire the next time we flush + pending_flushes: Vec, + + // Worker loop is torn down when this fires. + cancel: CancellationToken, +} + +impl FrontendQueueWorker { + pub(super) fn new( + remote_storage: GenericRemoteStorage, + conf: &'static PageServerConf, + rx: tokio::sync::mpsc::Receiver, + tx: tokio::sync::mpsc::Sender, + cancel: CancellationToken, + ) -> Self { + Self { + pending: DeletionList::new(1), + remote_storage, + conf, + rx, + tx, + pending_flushes: Vec::new(), + cancel, + } + } + async fn upload_pending_list(&mut self) -> anyhow::Result<()> { + let key = &self.conf.remote_deletion_list_path(self.pending.sequence); + + backoff::retry( + || { + let bytes = + serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list"); + let size = bytes.len(); + let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes)); + self.remote_storage.upload(source, size, key, None) + }, + |_| false, + FAILED_REMOTE_OP_WARN_THRESHOLD, + FAILED_REMOTE_OP_RETRIES, + "upload deletion list", + backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")), + ) + .await + } + + /// Try to flush `list` to persistent storage + /// + /// This does not return errors, because on failure to flush we do not lose + /// any state: flushing will be retried implicitly on the next deadline + async fn flush(&mut self) { + if self.pending.objects.is_empty() { + // We do not expect to be called in this state, but handle it so that later + // logging code can be assured that therre is always a first+last key to print + for f in self.pending_flushes.drain(..) { + f.fire(); + } + return; + } + + match self.upload_pending_list().await { + Ok(_) => { + info!( + sequence = self.pending.sequence, + "Stored deletion list ({0}..{1})", + self.pending + .objects + .first() + .expect("list should be non-empty"), + self.pending + .objects + .last() + .expect("list should be non-empty"), + ); + + for f in self.pending_flushes.drain(..) { + f.fire(); + } + + let mut onward_list = DeletionList::new(self.pending.sequence); + std::mem::swap(&mut onward_list.objects, &mut self.pending.objects); + + // We have consumed out of pending: reset it for the next incoming deletions to accumulate there + self.pending = DeletionList::new(self.pending.sequence + 1); + + if let Err(e) = self.tx.send(BackendQueueMessage::Delete(onward_list)).await { + // This is allowed to fail: it will only happen if the backend worker is shut down, + // so we can just drop this on the floor. + info!("Deletion list dropped, this is normal during shutdown ({e:#})"); + } + } + Err(e) => { + DELETION_QUEUE_ERRORS.with_label_values(&["put_list"]).inc(); + warn!( + sequence = self.pending.sequence, + "Failed to write deletion list to remote storage, will retry later ({e:#})" + ); + } + } + } + + async fn recover(&mut self) -> Result<(), anyhow::Error> { + // Load header: this is not required to be present, e.g. when a pageserver first runs + let header_path = self.conf.remote_deletion_header_path(); + let header_bytes = match backoff::retry( + || self.remote_storage.download_all(&header_path), + |e| matches!(e, DownloadError::NotFound), + FAILED_REMOTE_OP_WARN_THRESHOLD, + u32::MAX, + "Reading deletion queue header", + backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown), + ) + .await + { + Ok(h) => Ok(Some(h)), + Err(e) => { + if let DownloadError::NotFound = e { + debug!("Deletion header {header_path} not found, first start?"); + Ok(None) + } else { + Err(e) + } + } + }?; + + if let Some(header_bytes) = header_bytes { + if let Some(header) = match serde_json::from_slice::(&header_bytes) { + Ok(h) => Some(h), + Err(e) => { + warn!("Failed to deserialize deletion header, ignoring {header_path}: {e:#}"); + // This should never happen unless we make a mistake with our serialization. + // Ignoring a deletion header is not consequential for correctnes because all deletions + // are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up. + None + } + } { + self.pending.sequence = + std::cmp::max(self.pending.sequence, header.last_deleted_list_seq + 1); + }; + }; + + let prefix = RemotePath::new(&self.conf.remote_deletion_node_prefix()) + .expect("Failed to compose path"); + let lists = backoff::retry( + || async { self.remote_storage.list_prefixes(Some(&prefix)).await }, + |_| false, + FAILED_REMOTE_OP_WARN_THRESHOLD, + u32::MAX, // There's no point giving up, since once we do that the deletion queue is stuck + "Recovering deletion lists", + backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown), + ) + .await?; + + debug!("Loaded {} keys in deletion prefix {}", lists.len(), prefix); + let list_name_pattern = + Regex::new("([a-zA-Z0-9]{16})-([a-zA-Z0-9]{8})-([a-zA-Z0-9]{2}).list").unwrap(); + + let mut seqs: Vec = Vec::new(); + for l in &lists { + if l == &header_path { + // Don't try and parse the header key as a list key + continue; + } + + let basename = l + .strip_prefix(&prefix) + .expect("Stripping prefix frrom a prefix listobjects should always work"); + let basename = match basename.to_str() { + Some(s) => s, + None => { + // Should never happen, we are the only ones writing objects here + warn!("Unexpected key encoding in deletion queue object"); + continue; + } + }; + + let seq_part = if let Some(m) = list_name_pattern.captures(basename) { + m.get(1) + .expect("Non optional group should be present") + .as_str() + } else { + warn!("Unexpected key in deletion queue: {basename}"); + continue; + }; + + let seq: u64 = match u64::from_str_radix(seq_part, 16) { + Ok(s) => s, + Err(e) => { + warn!("Malformed key '{basename}': {e}"); + continue; + } + }; + seqs.push(seq); + } + + seqs.sort(); + + // Initialize the next sequence number in the frontend based on the maximum of the highest list we see, + // and the last list that was deleted according to the header. Combined with writing out the header + // prior to deletions, this guarnatees no re-use of sequence numbers. + if let Some(max_list_seq) = seqs.last() { + self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1); + } + + for s in seqs { + let list_path = self.conf.remote_deletion_list_path(s); + let lists_body = backoff::retry( + || self.remote_storage.download_all(&list_path), + |_| false, + FAILED_REMOTE_OP_WARN_THRESHOLD, + u32::MAX, + "Reading a deletion list", + backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown), + ) + .await?; + + let deletion_list = match serde_json::from_slice::(lists_body.as_slice()) + { + Ok(l) => l, + Err(e) => { + // Drop the list on the floor: any objects it referenced will be left behind + // for scrubbing to clean up. This should never happen unless we have a serialization bug. + warn!(sequence = s, "Failed to deserialize deletion list: {e}"); + continue; + } + }; + + // We will drop out of recovery if this fails: it indicates that we are shutting down + // or the backend has panicked + DELETION_QUEUE_SUBMITTED.inc_by(deletion_list.objects.len() as u64); + self.tx + .send(BackendQueueMessage::Delete(deletion_list)) + .await?; + } + + info!(next_sequence = self.pending.sequence, "Replay complete"); + + Ok(()) + } + + /// This is the front-end ingest, where we bundle up deletion requests into DeletionList + /// and write them out, for later + pub async fn background(&mut self) { + info!("Started deletion frontend worker"); + + let mut recovered: bool = false; + + loop { + let timeout = if self.pending_flushes.is_empty() { + FRONTEND_DEFAULT_TIMEOUT + } else { + FRONTEND_FLUSHING_TIMEOUT + }; + + let msg = match tokio::time::timeout(timeout, self.rx.recv()).await { + Ok(Some(msg)) => msg, + Ok(None) => { + // Queue sender destroyed, shutting down + break; + } + Err(_) => { + // Hit deadline, flush. + self.flush().await; + continue; + } + }; + + // On first message, do recovery. This avoids unnecessary recovery very + // early in startup, and simplifies testing by avoiding a 404 reading the + // header on every first pageserver startup. + if !recovered { + // Before accepting any input from this pageserver lifetime, recover all deletion lists that are in S3 + if let Err(e) = self.recover().await { + // This should only happen in truly unrecoverable cases, like the recovery finding that the backend + // queue receiver has been dropped. + info!( + "Deletion queue recover aborted, deletion queue will not proceed ({e:#})" + ); + return; + } else { + recovered = true; + } + } + + match msg { + FrontendQueueMessage::Delete(op) => { + debug!( + "Deletion enqueue {0} layers, {1} other objects", + op.layers.len(), + op.objects.len() + ); + + let timeline_path = self.conf.timeline_path(&op.tenant_id, &op.timeline_id); + for layer in op.layers { + // TODO go directly to remote path without composing local path + let local_path = timeline_path.join(layer.file_name()); + let path = match self.conf.remote_path(&local_path) { + Ok(p) => p, + Err(e) => { + panic!("Can't make a timeline path! {e}"); + } + }; + self.pending.objects.push(path); + } + + self.pending.objects.extend(op.objects.into_iter()) + } + FrontendQueueMessage::Flush(op) => { + if self.pending.objects.is_empty() { + // Execute immediately + debug!("No pending objects, flushing immediately"); + op.fire() + } else { + // Execute next time we flush + self.pending_flushes.push(op); + } + } + FrontendQueueMessage::FlushExecute(op) => { + // We do not flush to a deletion list here: the client sends a Flush before the FlushExecute + if let Err(e) = self.tx.send(BackendQueueMessage::Flush(op)).await { + info!("Can't flush, shutting down ({e})"); + // Caller will get error when their oneshot sender was dropped. + } + } + } + + if self.pending.objects.len() > DELETION_LIST_TARGET_SIZE + || !self.pending_flushes.is_empty() + { + self.flush().await; + } + } + info!("Deletion queue shut down."); + } +}