deletion queue: refactor into frontend/backend modules

This commit is contained in:
John Spray
2023-08-22 16:38:13 +01:00
parent d9755becab
commit 416026381f
3 changed files with 717 additions and 646 deletions

View File

@@ -1,52 +1,32 @@
use crate::metrics::{DELETION_QUEUE_ERRORS, DELETION_QUEUE_EXECUTED, DELETION_QUEUE_SUBMITTED};
use regex::Regex;
use remote_storage::DownloadError;
mod backend;
mod frontend;
use crate::metrics::DELETION_QUEUE_SUBMITTED;
use remote_storage::{GenericRemoteStorage, RemotePath};
use serde::Deserialize;
use serde::Serialize;
use serde_with::serde_as;
use thiserror::Error;
use tokio;
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::{self, debug, error, info, warn};
use utils::backoff;
use tracing::{self, debug, error};
use utils::id::{TenantId, TimelineId};
pub(crate) use self::backend::BackendQueueWorker;
use self::frontend::DeletionOp;
pub(crate) use self::frontend::FrontendQueueWorker;
use backend::BackendQueueMessage;
use frontend::FrontendQueueMessage;
use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName};
// The number of keys in a DeletionList before we will proactively persist it
// (without reaching a flush deadline). This aims to deliver objects of the order
// of magnitude 1MB when we are under heavy delete load.
const DELETION_LIST_TARGET_SIZE: usize = 16384;
// Ordinarily, we only flush to DeletionList periodically, to bound the window during
// which we might leak objects from not flushing a DeletionList after
// the objects are already unlinked from timeline metadata.
const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000);
// If someone is waiting for a flush to DeletionList, only delay a little to accumulate
// more objects before doing the flush.
const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100);
// After this length of time, execute deletions which are elegible to run,
// even if we haven't accumulated enough for a full-sized DeleteObjects
const EXECUTE_IDLE_DEADLINE: Duration = Duration::from_secs(60);
// If the last attempt to execute failed, wait only this long before
// trying again.
const EXECUTE_RETRY_DEADLINE: Duration = Duration::from_millis(100);
// From the S3 spec
const MAX_KEYS_PER_DELETE: usize = 1000;
// Arbitrary thresholds for retries: we do not depend on success
// within OP_RETRIES, as workers will just go around their consume loop:
// the purpose of the backoff::retries with these constants are to
// retry _sooner_ than we would if going around the whole loop.
pub(crate) const FAILED_REMOTE_OP_WARN_THRESHOLD: u32 = 3;
const FAILED_REMOTE_OP_WARN_THRESHOLD: u32 = 3;
pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
const FAILED_REMOTE_OP_RETRIES: u32 = 10;
// TODO: adminstrative "panic button" config property to disable all deletions
// TODO: configurable for how long to wait before executing deletions
@@ -78,26 +58,6 @@ pub struct DeletionQueue {
tx: tokio::sync::mpsc::Sender<FrontendQueueMessage>,
}
#[derive(Debug)]
enum FrontendQueueMessage {
Delete(DeletionOp),
// Wait until all prior deletions make it into a persistent DeletionList
Flush(FlushOp),
// Wait until all prior deletions have been executed (i.e. objects are actually deleted)
FlushExecute(FlushOp),
}
#[derive(Debug)]
struct DeletionOp {
tenant_id: TenantId,
timeline_id: TimelineId,
// `layers` and `objects` are both just lists of objects. `layers` is used if you do not
// have a config object handy to project it to a remote key, and need the consuming worker
// to do it for you.
layers: Vec<LayerFileName>,
objects: Vec<RemotePath>,
}
#[derive(Debug)]
struct FlushOp {
tx: tokio::sync::oneshot::Sender<()>,
@@ -192,7 +152,7 @@ impl DeletionQueueClient {
/// persistent, but it may be executed at any time after this function enters: do not push
/// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
/// references them).
pub async fn push_layers(
pub(crate) async fn push_layers(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -209,7 +169,7 @@ impl DeletionQueueClient {
}
/// Just like push_layers, but using some already-known remote paths, instead of abstract layer names
pub async fn push_objects(
pub(crate) async fn push_objects(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -250,7 +210,7 @@ impl DeletionQueueClient {
}
// Wait until all previous deletions are executed
pub async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
debug!("flush_execute: flushing to deletion lists...");
// Flush any buffered work to deletion lists
self.flush().await?;
@@ -265,579 +225,6 @@ impl DeletionQueueClient {
}
}
pub struct BackendQueueWorker {
remote_storage: GenericRemoteStorage,
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
// Accumulate up to 1000 keys for the next deletion operation
accumulator: Vec<RemotePath>,
// DeletionLists we have fully ingested but might still have
// some keys in accumulator.
pending_lists: Vec<DeletionList>,
// DeletionLists we have fully executed, which may be deleted
// from remote storage.
executed_lists: Vec<DeletionList>,
// These FlushOps should fire the next time we flush
pending_flushes: Vec<FlushOp>,
// How long to wait for a message before executing anyway
timeout: Duration,
}
impl BackendQueueWorker {
async fn maybe_execute(&mut self) -> bool {
fail::fail_point!("deletion-queue-before-execute", |_| {
info!("Skipping execution, failpoint set");
DELETION_QUEUE_ERRORS
.with_label_values(&["failpoint"])
.inc();
// Retry fast when failpoint is active, so that when it is disabled we resume promptly
self.timeout = EXECUTE_RETRY_DEADLINE;
false
});
if self.accumulator.is_empty() {
for f in self.pending_flushes.drain(..) {
f.fire();
}
return true;
}
match self.remote_storage.delete_objects(&self.accumulator).await {
Ok(()) => {
// Note: we assume that the remote storage layer returns Ok(()) if some
// or all of the deleted objects were already gone.
DELETION_QUEUE_EXECUTED.inc_by(self.accumulator.len() as u64);
info!(
"Executed deletion batch {}..{}",
self.accumulator
.first()
.expect("accumulator should be non-empty"),
self.accumulator
.last()
.expect("accumulator should be non-empty"),
);
self.accumulator.clear();
self.executed_lists.append(&mut self.pending_lists);
for f in self.pending_flushes.drain(..) {
f.fire();
}
self.timeout = EXECUTE_IDLE_DEADLINE;
true
}
Err(e) => {
warn!("DeleteObjects request failed: {e:#}, will retry");
DELETION_QUEUE_ERRORS.with_label_values(&["execute"]).inc();
self.timeout = EXECUTE_RETRY_DEADLINE;
false
}
}
}
async fn cleanup_lists(&mut self) {
debug!(
"cleanup_lists: {0} executed lists, {1} pending lists",
self.executed_lists.len(),
self.pending_lists.len()
);
// Lists are always pushed into the queues + executed list in sequence order, so
// no sort is required: can find the highest sequence number by peeking at last element
let max_executed_seq = match self.executed_lists.last() {
Some(v) => v.sequence,
None => {
// No executed lists, nothing to clean up.
return;
}
};
// In case this is the last list, write a header out first so that
// we don't risk losing our knowledge of the sequence number (on replay, our
// next sequence number is the highest list seen + 1, or read from the header
// if there are no lists)
let header = DeletionHeader::new(max_executed_seq);
debug!("Writing header {:?}", header);
let bytes = serde_json::to_vec(&header).expect("Failed to serialize deletion header");
let size = bytes.len();
let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes));
let header_key = self.conf.remote_deletion_header_path();
if let Err(e) = self
.remote_storage
.upload(source, size, &header_key, None)
.await
{
warn!("Failed to upload deletion queue header: {e:#}");
DELETION_QUEUE_ERRORS
.with_label_values(&["put_header"])
.inc();
return;
}
let executed_keys: Vec<RemotePath> = self
.executed_lists
.iter()
.rev()
.take(MAX_KEYS_PER_DELETE)
.map(|l| self.conf.remote_deletion_list_path(l.sequence))
.collect();
match self.remote_storage.delete_objects(&executed_keys).await {
Ok(()) => {
// Retain any lists that couldn't be deleted in that request
self.executed_lists
.truncate(self.executed_lists.len() - executed_keys.len());
}
Err(e) => {
warn!("Failed to delete deletion list(s): {e:#}");
// Do nothing: the elements remain in executed_lists, and purge will be retried
// next time we process some deletions and go around the loop.
DELETION_QUEUE_ERRORS
.with_label_values(&["delete_list"])
.inc();
}
}
}
pub async fn background(&mut self) {
// TODO: if we would like to be able to defer deletions while a Layer still has
// refs (but it will be elegible for deletion after process ends), then we may
// add an ephemeral part to BackendQueueMessage::Delete that tracks which keys
// in the deletion list may not be deleted yet, with guards to block on while
// we wait to proceed.
self.accumulator.reserve(MAX_KEYS_PER_DELETE);
loop {
let msg = match tokio::time::timeout(self.timeout, self.rx.recv()).await {
Ok(Some(m)) => m,
Ok(None) => {
// All queue senders closed
info!("Shutting down");
break;
}
Err(_) => {
// Timeout, we hit deadline to execute whatever we have in hand. These functions will
// return immediately if no work is pending
self.maybe_execute().await;
self.cleanup_lists().await;
continue;
}
};
match msg {
BackendQueueMessage::Delete(mut list) => {
if list.objects.is_empty() {
// This shouldn't happen, but is harmless. warn so that
// tests will fail if we have such a bug, but proceed with
// processing subsequent messages.
warn!("Empty DeletionList passed to deletion backend");
self.executed_lists.push(list);
continue;
}
// This loop handles deletion lists that require multiple DeleteObjects requests,
// and also handles retries if a deletion fails: we will keep going around until
// we have either deleted everything, or we have a remainder in accumulator.
while !list.objects.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE
{
let take_count = if self.accumulator.len() == MAX_KEYS_PER_DELETE {
0
} else {
let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
std::cmp::min(available_slots, list.objects.len())
};
for object in list.objects.drain(list.objects.len() - take_count..) {
self.accumulator.push(object);
}
if self.accumulator.len() == MAX_KEYS_PER_DELETE {
// Great, we got a full request: issue it.
if !self.maybe_execute().await {
// Failed to execute: retry delay
tokio::time::sleep(EXECUTE_RETRY_DEADLINE).await;
};
}
}
if !self.accumulator.is_empty() {
// We have a remainder, `list` not fully executed yet
self.pending_lists.push(list);
} else {
// We fully processed this list, it is ready for purge
self.executed_lists.push(list);
}
self.cleanup_lists().await;
}
BackendQueueMessage::Flush(op) => {
if self.accumulator.is_empty() {
op.fire();
continue;
}
self.maybe_execute().await;
if self.accumulator.is_empty() {
// Successful flush. Clean up lists before firing, for the benefit of tests that would
// like to have a deterministic state post-flush.
self.cleanup_lists().await;
op.fire();
} else {
// We didn't flush inline: defer until next time we successfully drain accumulatorr
self.pending_flushes.push(op);
}
}
}
}
}
}
#[derive(Debug)]
enum BackendQueueMessage {
Delete(DeletionList),
Flush(FlushOp),
}
pub struct FrontendQueueWorker {
remote_storage: GenericRemoteStorage,
conf: &'static PageServerConf,
// Incoming frontend requests to delete some keys
rx: tokio::sync::mpsc::Receiver<FrontendQueueMessage>,
// Outbound requests to the backend to execute deletion lists we have composed.
tx: tokio::sync::mpsc::Sender<BackendQueueMessage>,
// The list we are currently building, contains a buffer of keys to delete
// and our next sequence number
pending: DeletionList,
// These FlushOps should fire the next time we flush
pending_flushes: Vec<FlushOp>,
// Worker loop is torn down when this fires.
cancel: CancellationToken,
}
impl FrontendQueueWorker {
async fn upload_pending_list(&mut self) -> anyhow::Result<()> {
let key = &self.conf.remote_deletion_list_path(self.pending.sequence);
backoff::retry(
|| {
let bytes =
serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list");
let size = bytes.len();
let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes));
self.remote_storage.upload(source, size, key, None)
},
|_| false,
FAILED_REMOTE_OP_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"upload deletion list",
backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")),
)
.await
}
/// Try to flush `list` to persistent storage
///
/// This does not return errors, because on failure to flush we do not lose
/// any state: flushing will be retried implicitly on the next deadline
async fn flush(&mut self) {
if self.pending.objects.is_empty() {
// We do not expect to be called in this state, but handle it so that later
// logging code can be assured that therre is always a first+last key to print
for f in self.pending_flushes.drain(..) {
f.fire();
}
return;
}
match self.upload_pending_list().await {
Ok(_) => {
info!(
sequence = self.pending.sequence,
"Stored deletion list ({0}..{1})",
self.pending
.objects
.first()
.expect("list should be non-empty"),
self.pending
.objects
.last()
.expect("list should be non-empty"),
);
for f in self.pending_flushes.drain(..) {
f.fire();
}
let mut onward_list = DeletionList::new(self.pending.sequence);
std::mem::swap(&mut onward_list.objects, &mut self.pending.objects);
// We have consumed out of pending: reset it for the next incoming deletions to accumulate there
self.pending = DeletionList::new(self.pending.sequence + 1);
if let Err(e) = self.tx.send(BackendQueueMessage::Delete(onward_list)).await {
// This is allowed to fail: it will only happen if the backend worker is shut down,
// so we can just drop this on the floor.
info!("Deletion list dropped, this is normal during shutdown ({e:#})");
}
}
Err(e) => {
DELETION_QUEUE_ERRORS.with_label_values(&["put_list"]).inc();
warn!(
sequence = self.pending.sequence,
"Failed to write deletion list to remote storage, will retry later ({e:#})"
);
}
}
}
async fn recover(&mut self) -> Result<(), anyhow::Error> {
// Load header: this is not required to be present, e.g. when a pageserver first runs
let header_path = self.conf.remote_deletion_header_path();
let header_bytes = match backoff::retry(
|| self.remote_storage.download_all(&header_path),
|e| matches!(e, DownloadError::NotFound),
FAILED_REMOTE_OP_WARN_THRESHOLD,
u32::MAX,
"Reading deletion queue header",
backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown),
)
.await
{
Ok(h) => Ok(Some(h)),
Err(e) => {
if let DownloadError::NotFound = e {
debug!("Deletion header {header_path} not found, first start?");
Ok(None)
} else {
Err(e)
}
}
}?;
if let Some(header_bytes) = header_bytes {
if let Some(header) = match serde_json::from_slice::<DeletionHeader>(&header_bytes) {
Ok(h) => Some(h),
Err(e) => {
warn!("Failed to deserialize deletion header, ignoring {header_path}: {e:#}");
// This should never happen unless we make a mistake with our serialization.
// Ignoring a deletion header is not consequential for correctnes because all deletions
// are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up.
None
}
} {
self.pending.sequence =
std::cmp::max(self.pending.sequence, header.last_deleted_list_seq + 1);
};
};
let prefix = RemotePath::new(&self.conf.remote_deletion_node_prefix())
.expect("Failed to compose path");
let lists = backoff::retry(
|| async { self.remote_storage.list_prefixes(Some(&prefix)).await },
|_| false,
FAILED_REMOTE_OP_WARN_THRESHOLD,
u32::MAX, // There's no point giving up, since once we do that the deletion queue is stuck
"Recovering deletion lists",
backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown),
)
.await?;
debug!("Loaded {} keys in deletion prefix {}", lists.len(), prefix);
let list_name_pattern =
Regex::new("([a-zA-Z0-9]{16})-([a-zA-Z0-9]{8})-([a-zA-Z0-9]{2}).list").unwrap();
let mut seqs: Vec<u64> = Vec::new();
for l in &lists {
if l == &header_path {
// Don't try and parse the header key as a list key
continue;
}
let basename = l
.strip_prefix(&prefix)
.expect("Stripping prefix frrom a prefix listobjects should always work");
let basename = match basename.to_str() {
Some(s) => s,
None => {
// Should never happen, we are the only ones writing objects here
warn!("Unexpected key encoding in deletion queue object");
continue;
}
};
let seq_part = if let Some(m) = list_name_pattern.captures(basename) {
m.get(1)
.expect("Non optional group should be present")
.as_str()
} else {
warn!("Unexpected key in deletion queue: {basename}");
continue;
};
let seq: u64 = match u64::from_str_radix(seq_part, 16) {
Ok(s) => s,
Err(e) => {
warn!("Malformed key '{basename}': {e}");
continue;
}
};
seqs.push(seq);
}
seqs.sort();
// Initialize the next sequence number in the frontend based on the maximum of the highest list we see,
// and the last list that was deleted according to the header. Combined with writing out the header
// prior to deletions, this guarnatees no re-use of sequence numbers.
if let Some(max_list_seq) = seqs.last() {
self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1);
}
for s in seqs {
let list_path = self.conf.remote_deletion_list_path(s);
let lists_body = backoff::retry(
|| self.remote_storage.download_all(&list_path),
|_| false,
FAILED_REMOTE_OP_WARN_THRESHOLD,
u32::MAX,
"Reading a deletion list",
backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown),
)
.await?;
let deletion_list = match serde_json::from_slice::<DeletionList>(lists_body.as_slice())
{
Ok(l) => l,
Err(e) => {
// Drop the list on the floor: any objects it referenced will be left behind
// for scrubbing to clean up. This should never happen unless we have a serialization bug.
warn!(sequence = s, "Failed to deserialize deletion list: {e}");
continue;
}
};
// We will drop out of recovery if this fails: it indicates that we are shutting down
// or the backend has panicked
DELETION_QUEUE_SUBMITTED.inc_by(deletion_list.objects.len() as u64);
self.tx
.send(BackendQueueMessage::Delete(deletion_list))
.await?;
}
info!(next_sequence = self.pending.sequence, "Replay complete");
Ok(())
}
/// This is the front-end ingest, where we bundle up deletion requests into DeletionList
/// and write them out, for later
pub async fn background(&mut self) {
info!("Started deletion frontend worker");
let mut recovered: bool = false;
loop {
let timeout = if self.pending_flushes.is_empty() {
FRONTEND_DEFAULT_TIMEOUT
} else {
FRONTEND_FLUSHING_TIMEOUT
};
let msg = match tokio::time::timeout(timeout, self.rx.recv()).await {
Ok(Some(msg)) => msg,
Ok(None) => {
// Queue sender destroyed, shutting down
break;
}
Err(_) => {
// Hit deadline, flush.
self.flush().await;
continue;
}
};
// On first message, do recovery. This avoids unnecessary recovery very
// early in startup, and simplifies testing by avoiding a 404 reading the
// header on every first pageserver startup.
if !recovered {
// Before accepting any input from this pageserver lifetime, recover all deletion lists that are in S3
if let Err(e) = self.recover().await {
// This should only happen in truly unrecoverable cases, like the recovery finding that the backend
// queue receiver has been dropped.
info!(
"Deletion queue recover aborted, deletion queue will not proceed ({e:#})"
);
return;
} else {
recovered = true;
}
}
match msg {
FrontendQueueMessage::Delete(op) => {
debug!(
"Deletion enqueue {0} layers, {1} other objects",
op.layers.len(),
op.objects.len()
);
let timeline_path = self.conf.timeline_path(&op.tenant_id, &op.timeline_id);
for layer in op.layers {
// TODO go directly to remote path without composing local path
let local_path = timeline_path.join(layer.file_name());
let path = match self.conf.remote_path(&local_path) {
Ok(p) => p,
Err(e) => {
panic!("Can't make a timeline path! {e}");
}
};
self.pending.objects.push(path);
}
self.pending.objects.extend(op.objects.into_iter())
}
FrontendQueueMessage::Flush(op) => {
if self.pending.objects.is_empty() {
// Execute immediately
debug!("No pending objects, flushing immediately");
op.fire()
} else {
// Execute next time we flush
self.pending_flushes.push(op);
}
}
FrontendQueueMessage::FlushExecute(op) => {
// We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
if let Err(e) = self.tx.send(BackendQueueMessage::Flush(op)).await {
info!("Can't flush, shutting down ({e})");
// Caller will get error when their oneshot sender was dropped.
}
}
}
if self.pending.objects.len() > DELETION_LIST_TARGET_SIZE
|| !self.pending_flushes.is_empty()
{
self.flush().await;
}
}
info!("Deletion queue shut down.");
}
}
impl DeletionQueue {
pub fn new_client(&self) -> DeletionQueueClient {
DeletionQueueClient {
@@ -870,25 +257,14 @@ impl DeletionQueue {
(
Self { tx },
Some(FrontendQueueWorker {
pending: DeletionList::new(1),
remote_storage: remote_storage.clone(),
Some(FrontendQueueWorker::new(
remote_storage.clone(),
conf,
rx,
tx: backend_tx,
pending_flushes: Vec::new(),
backend_tx,
cancel,
}),
Some(BackendQueueWorker {
remote_storage,
conf,
rx: backend_rx,
accumulator: Vec::new(),
pending_lists: Vec::new(),
executed_lists: Vec::new(),
timeout: EXECUTE_IDLE_DEADLINE,
pending_flushes: Vec::new(),
}),
)),
Some(BackendQueueWorker::new(remote_storage, conf, backend_rx)),
)
}
}
@@ -900,6 +276,7 @@ mod test {
io::ErrorKind,
path::{Path, PathBuf},
};
use tracing::info;
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
use tokio::{runtime::EnterGuard, task::JoinHandle};
@@ -1155,6 +532,8 @@ mod test {
/// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it.
#[cfg(test)]
pub mod mock {
use tracing::info;
use super::*;
use std::sync::{
atomic::{AtomicUsize, Ordering},
@@ -1261,7 +640,7 @@ pub mod mock {
.expect("Mock delete queue shutdown while waiting to pump");
}
pub fn new_client(&self) -> DeletionQueueClient {
pub(crate) fn new_client(&self) -> DeletionQueueClient {
DeletionQueueClient {
tx: self.tx.clone(),
}

View File

@@ -0,0 +1,284 @@
use std::time::Duration;
use remote_storage::GenericRemoteStorage;
use remote_storage::RemotePath;
use tracing::debug;
use tracing::info;
use tracing::warn;
use crate::config::PageServerConf;
use crate::metrics::DELETION_QUEUE_ERRORS;
use crate::metrics::DELETION_QUEUE_EXECUTED;
use super::DeletionHeader;
use super::DeletionList;
use super::FlushOp;
// After this length of time, execute deletions which are elegible to run,
// even if we haven't accumulated enough for a full-sized DeleteObjects
const EXECUTE_IDLE_DEADLINE: Duration = Duration::from_secs(60);
// If the last attempt to execute failed, wait only this long before
// trying again.
const EXECUTE_RETRY_DEADLINE: Duration = Duration::from_millis(100);
// From the S3 spec
const MAX_KEYS_PER_DELETE: usize = 1000;
#[derive(Debug)]
pub(super) enum BackendQueueMessage {
Delete(DeletionList),
Flush(FlushOp),
}
pub struct BackendQueueWorker {
remote_storage: GenericRemoteStorage,
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
// Accumulate up to 1000 keys for the next deletion operation
accumulator: Vec<RemotePath>,
// DeletionLists we have fully ingested but might still have
// some keys in accumulator.
pending_lists: Vec<DeletionList>,
// DeletionLists we have fully executed, which may be deleted
// from remote storage.
executed_lists: Vec<DeletionList>,
// These FlushOps should fire the next time we flush
pending_flushes: Vec<FlushOp>,
// How long to wait for a message before executing anyway
timeout: Duration,
}
impl BackendQueueWorker {
pub(super) fn new(
remote_storage: GenericRemoteStorage,
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
) -> Self {
Self {
remote_storage,
conf,
rx,
accumulator: Vec::new(),
pending_lists: Vec::new(),
executed_lists: Vec::new(),
timeout: EXECUTE_IDLE_DEADLINE,
pending_flushes: Vec::new(),
}
}
async fn maybe_execute(&mut self) -> bool {
fail::fail_point!("deletion-queue-before-execute", |_| {
info!("Skipping execution, failpoint set");
DELETION_QUEUE_ERRORS
.with_label_values(&["failpoint"])
.inc();
// Retry fast when failpoint is active, so that when it is disabled we resume promptly
self.timeout = EXECUTE_RETRY_DEADLINE;
false
});
if self.accumulator.is_empty() {
for f in self.pending_flushes.drain(..) {
f.fire();
}
return true;
}
match self.remote_storage.delete_objects(&self.accumulator).await {
Ok(()) => {
// Note: we assume that the remote storage layer returns Ok(()) if some
// or all of the deleted objects were already gone.
DELETION_QUEUE_EXECUTED.inc_by(self.accumulator.len() as u64);
info!(
"Executed deletion batch {}..{}",
self.accumulator
.first()
.expect("accumulator should be non-empty"),
self.accumulator
.last()
.expect("accumulator should be non-empty"),
);
self.accumulator.clear();
self.executed_lists.append(&mut self.pending_lists);
for f in self.pending_flushes.drain(..) {
f.fire();
}
self.timeout = EXECUTE_IDLE_DEADLINE;
true
}
Err(e) => {
warn!("DeleteObjects request failed: {e:#}, will retry");
DELETION_QUEUE_ERRORS.with_label_values(&["execute"]).inc();
self.timeout = EXECUTE_RETRY_DEADLINE;
false
}
}
}
async fn cleanup_lists(&mut self) {
debug!(
"cleanup_lists: {0} executed lists, {1} pending lists",
self.executed_lists.len(),
self.pending_lists.len()
);
// Lists are always pushed into the queues + executed list in sequence order, so
// no sort is required: can find the highest sequence number by peeking at last element
let max_executed_seq = match self.executed_lists.last() {
Some(v) => v.sequence,
None => {
// No executed lists, nothing to clean up.
return;
}
};
// In case this is the last list, write a header out first so that
// we don't risk losing our knowledge of the sequence number (on replay, our
// next sequence number is the highest list seen + 1, or read from the header
// if there are no lists)
let header = DeletionHeader::new(max_executed_seq);
debug!("Writing header {:?}", header);
let bytes = serde_json::to_vec(&header).expect("Failed to serialize deletion header");
let size = bytes.len();
let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes));
let header_key = self.conf.remote_deletion_header_path();
if let Err(e) = self
.remote_storage
.upload(source, size, &header_key, None)
.await
{
warn!("Failed to upload deletion queue header: {e:#}");
DELETION_QUEUE_ERRORS
.with_label_values(&["put_header"])
.inc();
return;
}
let executed_keys: Vec<RemotePath> = self
.executed_lists
.iter()
.rev()
.take(MAX_KEYS_PER_DELETE)
.map(|l| self.conf.remote_deletion_list_path(l.sequence))
.collect();
match self.remote_storage.delete_objects(&executed_keys).await {
Ok(()) => {
// Retain any lists that couldn't be deleted in that request
self.executed_lists
.truncate(self.executed_lists.len() - executed_keys.len());
}
Err(e) => {
warn!("Failed to delete deletion list(s): {e:#}");
// Do nothing: the elements remain in executed_lists, and purge will be retried
// next time we process some deletions and go around the loop.
DELETION_QUEUE_ERRORS
.with_label_values(&["delete_list"])
.inc();
}
}
}
pub async fn background(&mut self) {
// TODO: if we would like to be able to defer deletions while a Layer still has
// refs (but it will be elegible for deletion after process ends), then we may
// add an ephemeral part to BackendQueueMessage::Delete that tracks which keys
// in the deletion list may not be deleted yet, with guards to block on while
// we wait to proceed.
self.accumulator.reserve(MAX_KEYS_PER_DELETE);
loop {
let msg = match tokio::time::timeout(self.timeout, self.rx.recv()).await {
Ok(Some(m)) => m,
Ok(None) => {
// All queue senders closed
info!("Shutting down");
break;
}
Err(_) => {
// Timeout, we hit deadline to execute whatever we have in hand. These functions will
// return immediately if no work is pending
self.maybe_execute().await;
self.cleanup_lists().await;
continue;
}
};
match msg {
BackendQueueMessage::Delete(mut list) => {
if list.objects.is_empty() {
// This shouldn't happen, but is harmless. warn so that
// tests will fail if we have such a bug, but proceed with
// processing subsequent messages.
warn!("Empty DeletionList passed to deletion backend");
self.executed_lists.push(list);
continue;
}
// This loop handles deletion lists that require multiple DeleteObjects requests,
// and also handles retries if a deletion fails: we will keep going around until
// we have either deleted everything, or we have a remainder in accumulator.
while !list.objects.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE
{
let take_count = if self.accumulator.len() == MAX_KEYS_PER_DELETE {
0
} else {
let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
std::cmp::min(available_slots, list.objects.len())
};
for object in list.objects.drain(list.objects.len() - take_count..) {
self.accumulator.push(object);
}
if self.accumulator.len() == MAX_KEYS_PER_DELETE {
// Great, we got a full request: issue it.
if !self.maybe_execute().await {
// Failed to execute: retry delay
tokio::time::sleep(EXECUTE_RETRY_DEADLINE).await;
};
}
}
if !self.accumulator.is_empty() {
// We have a remainder, `list` not fully executed yet
self.pending_lists.push(list);
} else {
// We fully processed this list, it is ready for purge
self.executed_lists.push(list);
}
self.cleanup_lists().await;
}
BackendQueueMessage::Flush(op) => {
if self.accumulator.is_empty() {
op.fire();
continue;
}
self.maybe_execute().await;
if self.accumulator.is_empty() {
// Successful flush. Clean up lists before firing, for the benefit of tests that would
// like to have a deterministic state post-flush.
self.cleanup_lists().await;
op.fire();
} else {
// We didn't flush inline: defer until next time we successfully drain accumulatorr
self.pending_flushes.push(op);
}
}
}
}
}
}

View File

@@ -0,0 +1,408 @@
use super::BackendQueueMessage;
use super::DeletionHeader;
use super::DeletionList;
use super::FlushOp;
use super::FAILED_REMOTE_OP_RETRIES;
use super::FAILED_REMOTE_OP_WARN_THRESHOLD;
use std::time::Duration;
use regex::Regex;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use remote_storage::RemotePath;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::info;
use tracing::warn;
use utils::backoff;
use utils::id::TenantId;
use utils::id::TimelineId;
use crate::config::PageServerConf;
use crate::metrics::DELETION_QUEUE_ERRORS;
use crate::metrics::DELETION_QUEUE_SUBMITTED;
use crate::tenant::storage_layer::LayerFileName;
// The number of keys in a DeletionList before we will proactively persist it
// (without reaching a flush deadline). This aims to deliver objects of the order
// of magnitude 1MB when we are under heavy delete load.
const DELETION_LIST_TARGET_SIZE: usize = 16384;
// Ordinarily, we only flush to DeletionList periodically, to bound the window during
// which we might leak objects from not flushing a DeletionList after
// the objects are already unlinked from timeline metadata.
const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000);
// If someone is waiting for a flush to DeletionList, only delay a little to accumulate
// more objects before doing the flush.
const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100);
#[derive(Debug)]
pub(super) struct DeletionOp {
pub(super) tenant_id: TenantId,
pub(super) timeline_id: TimelineId,
// `layers` and `objects` are both just lists of objects. `layers` is used if you do not
// have a config object handy to project it to a remote key, and need the consuming worker
// to do it for you.
pub(super) layers: Vec<LayerFileName>,
pub(super) objects: Vec<RemotePath>,
}
#[derive(Debug)]
pub(super) enum FrontendQueueMessage {
Delete(DeletionOp),
// Wait until all prior deletions make it into a persistent DeletionList
Flush(FlushOp),
// Wait until all prior deletions have been executed (i.e. objects are actually deleted)
FlushExecute(FlushOp),
}
pub struct FrontendQueueWorker {
remote_storage: GenericRemoteStorage,
conf: &'static PageServerConf,
// Incoming frontend requests to delete some keys
rx: tokio::sync::mpsc::Receiver<FrontendQueueMessage>,
// Outbound requests to the backend to execute deletion lists we have composed.
tx: tokio::sync::mpsc::Sender<BackendQueueMessage>,
// The list we are currently building, contains a buffer of keys to delete
// and our next sequence number
pending: DeletionList,
// These FlushOps should fire the next time we flush
pending_flushes: Vec<FlushOp>,
// Worker loop is torn down when this fires.
cancel: CancellationToken,
}
impl FrontendQueueWorker {
pub(super) fn new(
remote_storage: GenericRemoteStorage,
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<FrontendQueueMessage>,
tx: tokio::sync::mpsc::Sender<BackendQueueMessage>,
cancel: CancellationToken,
) -> Self {
Self {
pending: DeletionList::new(1),
remote_storage,
conf,
rx,
tx,
pending_flushes: Vec::new(),
cancel,
}
}
async fn upload_pending_list(&mut self) -> anyhow::Result<()> {
let key = &self.conf.remote_deletion_list_path(self.pending.sequence);
backoff::retry(
|| {
let bytes =
serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list");
let size = bytes.len();
let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes));
self.remote_storage.upload(source, size, key, None)
},
|_| false,
FAILED_REMOTE_OP_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"upload deletion list",
backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")),
)
.await
}
/// Try to flush `list` to persistent storage
///
/// This does not return errors, because on failure to flush we do not lose
/// any state: flushing will be retried implicitly on the next deadline
async fn flush(&mut self) {
if self.pending.objects.is_empty() {
// We do not expect to be called in this state, but handle it so that later
// logging code can be assured that therre is always a first+last key to print
for f in self.pending_flushes.drain(..) {
f.fire();
}
return;
}
match self.upload_pending_list().await {
Ok(_) => {
info!(
sequence = self.pending.sequence,
"Stored deletion list ({0}..{1})",
self.pending
.objects
.first()
.expect("list should be non-empty"),
self.pending
.objects
.last()
.expect("list should be non-empty"),
);
for f in self.pending_flushes.drain(..) {
f.fire();
}
let mut onward_list = DeletionList::new(self.pending.sequence);
std::mem::swap(&mut onward_list.objects, &mut self.pending.objects);
// We have consumed out of pending: reset it for the next incoming deletions to accumulate there
self.pending = DeletionList::new(self.pending.sequence + 1);
if let Err(e) = self.tx.send(BackendQueueMessage::Delete(onward_list)).await {
// This is allowed to fail: it will only happen if the backend worker is shut down,
// so we can just drop this on the floor.
info!("Deletion list dropped, this is normal during shutdown ({e:#})");
}
}
Err(e) => {
DELETION_QUEUE_ERRORS.with_label_values(&["put_list"]).inc();
warn!(
sequence = self.pending.sequence,
"Failed to write deletion list to remote storage, will retry later ({e:#})"
);
}
}
}
async fn recover(&mut self) -> Result<(), anyhow::Error> {
// Load header: this is not required to be present, e.g. when a pageserver first runs
let header_path = self.conf.remote_deletion_header_path();
let header_bytes = match backoff::retry(
|| self.remote_storage.download_all(&header_path),
|e| matches!(e, DownloadError::NotFound),
FAILED_REMOTE_OP_WARN_THRESHOLD,
u32::MAX,
"Reading deletion queue header",
backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown),
)
.await
{
Ok(h) => Ok(Some(h)),
Err(e) => {
if let DownloadError::NotFound = e {
debug!("Deletion header {header_path} not found, first start?");
Ok(None)
} else {
Err(e)
}
}
}?;
if let Some(header_bytes) = header_bytes {
if let Some(header) = match serde_json::from_slice::<DeletionHeader>(&header_bytes) {
Ok(h) => Some(h),
Err(e) => {
warn!("Failed to deserialize deletion header, ignoring {header_path}: {e:#}");
// This should never happen unless we make a mistake with our serialization.
// Ignoring a deletion header is not consequential for correctnes because all deletions
// are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up.
None
}
} {
self.pending.sequence =
std::cmp::max(self.pending.sequence, header.last_deleted_list_seq + 1);
};
};
let prefix = RemotePath::new(&self.conf.remote_deletion_node_prefix())
.expect("Failed to compose path");
let lists = backoff::retry(
|| async { self.remote_storage.list_prefixes(Some(&prefix)).await },
|_| false,
FAILED_REMOTE_OP_WARN_THRESHOLD,
u32::MAX, // There's no point giving up, since once we do that the deletion queue is stuck
"Recovering deletion lists",
backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown),
)
.await?;
debug!("Loaded {} keys in deletion prefix {}", lists.len(), prefix);
let list_name_pattern =
Regex::new("([a-zA-Z0-9]{16})-([a-zA-Z0-9]{8})-([a-zA-Z0-9]{2}).list").unwrap();
let mut seqs: Vec<u64> = Vec::new();
for l in &lists {
if l == &header_path {
// Don't try and parse the header key as a list key
continue;
}
let basename = l
.strip_prefix(&prefix)
.expect("Stripping prefix frrom a prefix listobjects should always work");
let basename = match basename.to_str() {
Some(s) => s,
None => {
// Should never happen, we are the only ones writing objects here
warn!("Unexpected key encoding in deletion queue object");
continue;
}
};
let seq_part = if let Some(m) = list_name_pattern.captures(basename) {
m.get(1)
.expect("Non optional group should be present")
.as_str()
} else {
warn!("Unexpected key in deletion queue: {basename}");
continue;
};
let seq: u64 = match u64::from_str_radix(seq_part, 16) {
Ok(s) => s,
Err(e) => {
warn!("Malformed key '{basename}': {e}");
continue;
}
};
seqs.push(seq);
}
seqs.sort();
// Initialize the next sequence number in the frontend based on the maximum of the highest list we see,
// and the last list that was deleted according to the header. Combined with writing out the header
// prior to deletions, this guarnatees no re-use of sequence numbers.
if let Some(max_list_seq) = seqs.last() {
self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1);
}
for s in seqs {
let list_path = self.conf.remote_deletion_list_path(s);
let lists_body = backoff::retry(
|| self.remote_storage.download_all(&list_path),
|_| false,
FAILED_REMOTE_OP_WARN_THRESHOLD,
u32::MAX,
"Reading a deletion list",
backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown),
)
.await?;
let deletion_list = match serde_json::from_slice::<DeletionList>(lists_body.as_slice())
{
Ok(l) => l,
Err(e) => {
// Drop the list on the floor: any objects it referenced will be left behind
// for scrubbing to clean up. This should never happen unless we have a serialization bug.
warn!(sequence = s, "Failed to deserialize deletion list: {e}");
continue;
}
};
// We will drop out of recovery if this fails: it indicates that we are shutting down
// or the backend has panicked
DELETION_QUEUE_SUBMITTED.inc_by(deletion_list.objects.len() as u64);
self.tx
.send(BackendQueueMessage::Delete(deletion_list))
.await?;
}
info!(next_sequence = self.pending.sequence, "Replay complete");
Ok(())
}
/// This is the front-end ingest, where we bundle up deletion requests into DeletionList
/// and write them out, for later
pub async fn background(&mut self) {
info!("Started deletion frontend worker");
let mut recovered: bool = false;
loop {
let timeout = if self.pending_flushes.is_empty() {
FRONTEND_DEFAULT_TIMEOUT
} else {
FRONTEND_FLUSHING_TIMEOUT
};
let msg = match tokio::time::timeout(timeout, self.rx.recv()).await {
Ok(Some(msg)) => msg,
Ok(None) => {
// Queue sender destroyed, shutting down
break;
}
Err(_) => {
// Hit deadline, flush.
self.flush().await;
continue;
}
};
// On first message, do recovery. This avoids unnecessary recovery very
// early in startup, and simplifies testing by avoiding a 404 reading the
// header on every first pageserver startup.
if !recovered {
// Before accepting any input from this pageserver lifetime, recover all deletion lists that are in S3
if let Err(e) = self.recover().await {
// This should only happen in truly unrecoverable cases, like the recovery finding that the backend
// queue receiver has been dropped.
info!(
"Deletion queue recover aborted, deletion queue will not proceed ({e:#})"
);
return;
} else {
recovered = true;
}
}
match msg {
FrontendQueueMessage::Delete(op) => {
debug!(
"Deletion enqueue {0} layers, {1} other objects",
op.layers.len(),
op.objects.len()
);
let timeline_path = self.conf.timeline_path(&op.tenant_id, &op.timeline_id);
for layer in op.layers {
// TODO go directly to remote path without composing local path
let local_path = timeline_path.join(layer.file_name());
let path = match self.conf.remote_path(&local_path) {
Ok(p) => p,
Err(e) => {
panic!("Can't make a timeline path! {e}");
}
};
self.pending.objects.push(path);
}
self.pending.objects.extend(op.objects.into_iter())
}
FrontendQueueMessage::Flush(op) => {
if self.pending.objects.is_empty() {
// Execute immediately
debug!("No pending objects, flushing immediately");
op.fire()
} else {
// Execute next time we flush
self.pending_flushes.push(op);
}
}
FrontendQueueMessage::FlushExecute(op) => {
// We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
if let Err(e) = self.tx.send(BackendQueueMessage::Flush(op)).await {
info!("Can't flush, shutting down ({e})");
// Caller will get error when their oneshot sender was dropped.
}
}
}
if self.pending.objects.len() > DELETION_LIST_TARGET_SIZE
|| !self.pending_flushes.is_empty()
{
self.flush().await;
}
}
info!("Deletion queue shut down.");
}
}