mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
deletion queue: refactor coalescing into Executor
This commit is contained in:
@@ -45,6 +45,9 @@ pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
|
||||
|
||||
const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
|
||||
|
||||
// From the S3 spec
|
||||
pub const MAX_KEYS_PER_DELETE: usize = 1000;
|
||||
|
||||
/// Path on the remote storage, relative to some inner prefix.
|
||||
/// The prefix is an implementation detail, that allows representing local paths
|
||||
/// as the remote ones, stripping the local storage prefix away.
|
||||
|
||||
@@ -353,7 +353,7 @@ fn start_pageserver(
|
||||
|
||||
// Set up deletion queue
|
||||
let deletion_queue_cancel = tokio_util::sync::CancellationToken::new();
|
||||
let (deletion_queue, deletion_frontend, deletion_backend) =
|
||||
let (deletion_queue, deletion_frontend, deletion_backend, deletion_executor) =
|
||||
DeletionQueue::new(remote_storage.clone(), conf, deletion_queue_cancel.clone());
|
||||
if let Some(mut deletion_frontend) = deletion_frontend {
|
||||
BACKGROUND_RUNTIME.spawn(async move {
|
||||
@@ -371,6 +371,14 @@ fn start_pageserver(
|
||||
.await
|
||||
});
|
||||
}
|
||||
if let Some(mut deletion_executor) = deletion_executor {
|
||||
BACKGROUND_RUNTIME.spawn(async move {
|
||||
deletion_executor
|
||||
.background()
|
||||
.instrument(info_span!(parent: None, "deletion executor"))
|
||||
.await
|
||||
});
|
||||
}
|
||||
|
||||
// Up to this point no significant I/O has been done: this should have been fast. Record
|
||||
// duration prior to starting I/O intensive phase of startup.
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
mod backend;
|
||||
mod executor;
|
||||
mod frontend;
|
||||
|
||||
use crate::metrics::DELETION_QUEUE_SUBMITTED;
|
||||
@@ -13,9 +14,11 @@ use tracing::{self, debug, error};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
pub(crate) use self::backend::BackendQueueWorker;
|
||||
use self::executor::ExecutorWorker;
|
||||
use self::frontend::DeletionOp;
|
||||
pub(crate) use self::frontend::FrontendQueueWorker;
|
||||
use backend::BackendQueueMessage;
|
||||
use executor::ExecutorMessage;
|
||||
use frontend::FrontendQueueMessage;
|
||||
|
||||
use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName};
|
||||
@@ -55,7 +58,7 @@ const FAILED_REMOTE_OP_RETRIES: u32 = 10;
|
||||
/// a DeletionHeader
|
||||
#[derive(Clone)]
|
||||
pub struct DeletionQueue {
|
||||
tx: tokio::sync::mpsc::Sender<FrontendQueueMessage>,
|
||||
client: DeletionQueueClient,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -75,6 +78,7 @@ impl FlushOp {
|
||||
#[derive(Clone)]
|
||||
pub struct DeletionQueueClient {
|
||||
tx: tokio::sync::mpsc::Sender<FrontendQueueMessage>,
|
||||
executor_tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
@@ -168,23 +172,6 @@ impl DeletionQueueClient {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Just like push_layers, but using some already-known remote paths, instead of abstract layer names
|
||||
pub(crate) async fn push_objects(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
objects: Vec<RemotePath>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
DELETION_QUEUE_SUBMITTED.inc_by(objects.len() as u64);
|
||||
self.do_push(FrontendQueueMessage::Delete(DeletionOp {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
layers: Vec::new(),
|
||||
objects,
|
||||
}))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn do_flush(
|
||||
&self,
|
||||
msg: FrontendQueueMessage,
|
||||
@@ -223,13 +210,39 @@ impl DeletionQueueClient {
|
||||
debug!("flush_execute: finished flushing execution...");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This interface bypasses the persistent deletion queue, and any validation
|
||||
/// that this pageserver is still elegible to execute the deletions. It is for
|
||||
/// use in timeline deletions, where the control plane is telling us we may
|
||||
/// delete everything in the timeline.
|
||||
///
|
||||
/// DO NOT USE THIS FROM GC OR COMPACTION CODE. Use the regular `push_layers`.
|
||||
pub(crate) async fn push_immediate(
|
||||
&self,
|
||||
objects: Vec<RemotePath>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
self.executor_tx
|
||||
.send(ExecutorMessage::Delete(objects))
|
||||
.await
|
||||
.map_err(|_| DeletionQueueError::ShuttingDown)
|
||||
}
|
||||
|
||||
/// Companion to push_immediate. When this returns Ok, all prior objects sent
|
||||
/// into push_immediate have been deleted from remote storage.
|
||||
pub(crate) async fn flush_immediate(&self) -> Result<(), DeletionQueueError> {
|
||||
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
|
||||
self.executor_tx
|
||||
.send(ExecutorMessage::Flush(FlushOp { tx }))
|
||||
.await
|
||||
.map_err(|_| DeletionQueueError::ShuttingDown)?;
|
||||
|
||||
rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
|
||||
}
|
||||
}
|
||||
|
||||
impl DeletionQueue {
|
||||
pub fn new_client(&self) -> DeletionQueueClient {
|
||||
DeletionQueueClient {
|
||||
tx: self.tx.clone(),
|
||||
}
|
||||
self.client.clone()
|
||||
}
|
||||
|
||||
/// Caller may use the returned object to construct clients with new_client.
|
||||
@@ -245,26 +258,57 @@ impl DeletionQueue {
|
||||
Self,
|
||||
Option<FrontendQueueWorker>,
|
||||
Option<BackendQueueWorker>,
|
||||
Option<ExecutorWorker>,
|
||||
) {
|
||||
// Deep channel: it consumes deletions from all timelines and we do not want to block them
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(16384);
|
||||
|
||||
// Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
|
||||
let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
|
||||
|
||||
// Shallow channel: it carries lists of paths, and we expect the main queueing to
|
||||
// happen in the backend (persistent), not in this queue.
|
||||
let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
|
||||
|
||||
let remote_storage = match remote_storage {
|
||||
None => return (Self { tx }, None, None),
|
||||
None => {
|
||||
return (
|
||||
Self {
|
||||
client: DeletionQueueClient { tx, executor_tx },
|
||||
},
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
}
|
||||
Some(r) => r,
|
||||
};
|
||||
|
||||
let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16384);
|
||||
|
||||
(
|
||||
Self { tx },
|
||||
Self {
|
||||
client: DeletionQueueClient {
|
||||
tx,
|
||||
executor_tx: executor_tx.clone(),
|
||||
},
|
||||
},
|
||||
Some(FrontendQueueWorker::new(
|
||||
remote_storage.clone(),
|
||||
conf,
|
||||
rx,
|
||||
backend_tx,
|
||||
cancel,
|
||||
cancel.clone(),
|
||||
)),
|
||||
Some(BackendQueueWorker::new(
|
||||
remote_storage.clone(),
|
||||
conf,
|
||||
backend_rx,
|
||||
executor_tx,
|
||||
)),
|
||||
Some(ExecutorWorker::new(
|
||||
remote_storage,
|
||||
executor_rx,
|
||||
cancel.clone(),
|
||||
)),
|
||||
Some(BackendQueueWorker::new(remote_storage, conf, backend_rx)),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -296,12 +340,13 @@ mod test {
|
||||
deletion_queue: DeletionQueue,
|
||||
fe_worker: JoinHandle<()>,
|
||||
be_worker: JoinHandle<()>,
|
||||
ex_worker: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl TestSetup {
|
||||
/// Simulate a pageserver restart by destroying and recreating the deletion queue
|
||||
fn restart(&mut self) {
|
||||
let (deletion_queue, fe_worker, be_worker) = DeletionQueue::new(
|
||||
let (deletion_queue, fe_worker, be_worker, ex_worker) = DeletionQueue::new(
|
||||
Some(self.storage.clone()),
|
||||
self.harness.conf,
|
||||
CancellationToken::new(),
|
||||
@@ -311,18 +356,24 @@ mod test {
|
||||
|
||||
let mut fe_worker = fe_worker.unwrap();
|
||||
let mut be_worker = be_worker.unwrap();
|
||||
let mut ex_worker = ex_worker.unwrap();
|
||||
let mut fe_worker = self
|
||||
.runtime
|
||||
.spawn(async move { fe_worker.background().await });
|
||||
let mut be_worker = self
|
||||
.runtime
|
||||
.spawn(async move { be_worker.background().await });
|
||||
let mut ex_worker = self.runtime.spawn(async move {
|
||||
drop(ex_worker.background().await);
|
||||
});
|
||||
std::mem::swap(&mut self.fe_worker, &mut fe_worker);
|
||||
std::mem::swap(&mut self.be_worker, &mut be_worker);
|
||||
std::mem::swap(&mut self.ex_worker, &mut ex_worker);
|
||||
|
||||
// Join the old workers
|
||||
self.runtime.block_on(fe_worker).unwrap();
|
||||
self.runtime.block_on(be_worker).unwrap();
|
||||
self.runtime.block_on(ex_worker).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -356,7 +407,7 @@ mod test {
|
||||
));
|
||||
let entered_runtime = runtime.enter();
|
||||
|
||||
let (deletion_queue, fe_worker, be_worker) = DeletionQueue::new(
|
||||
let (deletion_queue, fe_worker, be_worker, ex_worker) = DeletionQueue::new(
|
||||
Some(storage.clone()),
|
||||
harness.conf,
|
||||
CancellationToken::new(),
|
||||
@@ -364,8 +415,12 @@ mod test {
|
||||
|
||||
let mut fe_worker = fe_worker.unwrap();
|
||||
let mut be_worker = be_worker.unwrap();
|
||||
let mut ex_worker = ex_worker.unwrap();
|
||||
let fe_worker_join = runtime.spawn(async move { fe_worker.background().await });
|
||||
let be_worker_join = runtime.spawn(async move { be_worker.background().await });
|
||||
let ex_worker_join = runtime.spawn(async move {
|
||||
drop(ex_worker.background().await);
|
||||
});
|
||||
|
||||
Ok(TestSetup {
|
||||
runtime,
|
||||
@@ -376,6 +431,7 @@ mod test {
|
||||
deletion_queue,
|
||||
fe_worker: fe_worker_join,
|
||||
be_worker: be_worker_join,
|
||||
ex_worker: ex_worker_join,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -542,6 +598,7 @@ pub mod mock {
|
||||
|
||||
pub struct MockDeletionQueue {
|
||||
tx: tokio::sync::mpsc::Sender<FrontendQueueMessage>,
|
||||
executor_tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
|
||||
tx_pump: tokio::sync::mpsc::Sender<FlushOp>,
|
||||
executed: Arc<AtomicUsize>,
|
||||
}
|
||||
@@ -553,6 +610,7 @@ pub mod mock {
|
||||
) -> Self {
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel(16384);
|
||||
let (tx_pump, mut rx_pump) = tokio::sync::mpsc::channel::<FlushOp>(1);
|
||||
let (executor_tx, mut executor_rx) = tokio::sync::mpsc::channel(16384);
|
||||
|
||||
let executed = Arc::new(AtomicUsize::new(0));
|
||||
let executed_bg = executed.clone();
|
||||
@@ -569,6 +627,31 @@ pub mod mock {
|
||||
// Each time we are asked to pump, drain the queue of deletions
|
||||
while let Some(flush_op) = rx_pump.recv().await {
|
||||
info!("Executing all pending deletions");
|
||||
|
||||
// Transform all executor messages to generic frontend messages
|
||||
while let Ok(msg) = executor_rx.try_recv() {
|
||||
match msg {
|
||||
ExecutorMessage::Delete(objects) => {
|
||||
for path in objects {
|
||||
match remote_storage.delete(&path).await {
|
||||
Ok(_) => {
|
||||
debug!("Deleted {path}");
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to delete {path}, leaking object! ({e})"
|
||||
);
|
||||
}
|
||||
}
|
||||
executed_bg.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
ExecutorMessage::Flush(flush_op) => {
|
||||
flush_op.fire();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while let Ok(msg) = rx.try_recv() {
|
||||
match msg {
|
||||
FrontendQueueMessage::Delete(op) => {
|
||||
@@ -622,6 +705,7 @@ pub mod mock {
|
||||
Self {
|
||||
tx,
|
||||
tx_pump,
|
||||
executor_tx,
|
||||
executed,
|
||||
}
|
||||
}
|
||||
@@ -643,6 +727,7 @@ pub mod mock {
|
||||
pub(crate) fn new_client(&self) -> DeletionQueueClient {
|
||||
DeletionQueueClient {
|
||||
tx: self.tx.clone(),
|
||||
executor_tx: self.executor_tx.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,14 +2,15 @@ use std::time::Duration;
|
||||
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use remote_storage::RemotePath;
|
||||
use remote_storage::MAX_KEYS_PER_DELETE;
|
||||
use tracing::debug;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::metrics::DELETION_QUEUE_ERRORS;
|
||||
use crate::metrics::DELETION_QUEUE_EXECUTED;
|
||||
|
||||
use super::executor::ExecutorMessage;
|
||||
use super::DeletionHeader;
|
||||
use super::DeletionList;
|
||||
use super::FlushOp;
|
||||
@@ -18,12 +19,8 @@ use super::FlushOp;
|
||||
// even if we haven't accumulated enough for a full-sized DeleteObjects
|
||||
const EXECUTE_IDLE_DEADLINE: Duration = Duration::from_secs(60);
|
||||
|
||||
// If the last attempt to execute failed, wait only this long before
|
||||
// trying again.
|
||||
const EXECUTE_RETRY_DEADLINE: Duration = Duration::from_millis(100);
|
||||
|
||||
// From the S3 spec
|
||||
const MAX_KEYS_PER_DELETE: usize = 1000;
|
||||
// If we have received this number of keys, proceed with attempting to execute
|
||||
const AUTOFLUSH_KEY_COUNT: usize = 16384;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(super) enum BackendQueueMessage {
|
||||
@@ -34,23 +31,20 @@ pub struct BackendQueueWorker {
|
||||
remote_storage: GenericRemoteStorage,
|
||||
conf: &'static PageServerConf,
|
||||
rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
|
||||
tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
|
||||
|
||||
// Accumulate up to 1000 keys for the next deletion operation
|
||||
accumulator: Vec<RemotePath>,
|
||||
|
||||
// DeletionLists we have fully ingested but might still have
|
||||
// some keys in accumulator.
|
||||
// Accumulate some lists to execute in a batch.
|
||||
// The purpose of this accumulation is to implement batched validation of
|
||||
// attachment generations, when split-brain protection is implemented.
|
||||
// (see https://github.com/neondatabase/neon/pull/4919)
|
||||
pending_lists: Vec<DeletionList>,
|
||||
|
||||
// Sum of all the lengths of lists in pending_lists
|
||||
pending_key_count: usize,
|
||||
|
||||
// DeletionLists we have fully executed, which may be deleted
|
||||
// from remote storage.
|
||||
executed_lists: Vec<DeletionList>,
|
||||
|
||||
// These FlushOps should fire the next time we flush
|
||||
pending_flushes: Vec<FlushOp>,
|
||||
|
||||
// How long to wait for a message before executing anyway
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl BackendQueueWorker {
|
||||
@@ -58,67 +52,16 @@ impl BackendQueueWorker {
|
||||
remote_storage: GenericRemoteStorage,
|
||||
conf: &'static PageServerConf,
|
||||
rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
|
||||
tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
|
||||
) -> Self {
|
||||
Self {
|
||||
remote_storage,
|
||||
conf,
|
||||
rx,
|
||||
accumulator: Vec::new(),
|
||||
tx,
|
||||
pending_lists: Vec::new(),
|
||||
pending_key_count: 0,
|
||||
executed_lists: Vec::new(),
|
||||
timeout: EXECUTE_IDLE_DEADLINE,
|
||||
pending_flushes: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn maybe_execute(&mut self) -> bool {
|
||||
fail::fail_point!("deletion-queue-before-execute", |_| {
|
||||
info!("Skipping execution, failpoint set");
|
||||
DELETION_QUEUE_ERRORS
|
||||
.with_label_values(&["failpoint"])
|
||||
.inc();
|
||||
|
||||
// Retry fast when failpoint is active, so that when it is disabled we resume promptly
|
||||
self.timeout = EXECUTE_RETRY_DEADLINE;
|
||||
false
|
||||
});
|
||||
|
||||
if self.accumulator.is_empty() {
|
||||
for f in self.pending_flushes.drain(..) {
|
||||
f.fire();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
match self.remote_storage.delete_objects(&self.accumulator).await {
|
||||
Ok(()) => {
|
||||
// Note: we assume that the remote storage layer returns Ok(()) if some
|
||||
// or all of the deleted objects were already gone.
|
||||
DELETION_QUEUE_EXECUTED.inc_by(self.accumulator.len() as u64);
|
||||
info!(
|
||||
"Executed deletion batch {}..{}",
|
||||
self.accumulator
|
||||
.first()
|
||||
.expect("accumulator should be non-empty"),
|
||||
self.accumulator
|
||||
.last()
|
||||
.expect("accumulator should be non-empty"),
|
||||
);
|
||||
self.accumulator.clear();
|
||||
self.executed_lists.append(&mut self.pending_lists);
|
||||
|
||||
for f in self.pending_flushes.drain(..) {
|
||||
f.fire();
|
||||
}
|
||||
self.timeout = EXECUTE_IDLE_DEADLINE;
|
||||
true
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("DeleteObjects request failed: {e:#}, will retry");
|
||||
DELETION_QUEUE_ERRORS.with_label_values(&["execute"]).inc();
|
||||
self.timeout = EXECUTE_RETRY_DEADLINE;
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -187,6 +130,34 @@ impl BackendQueueWorker {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn flush(&mut self) {
|
||||
let mut onward_lists: Vec<DeletionList> = Vec::new();
|
||||
std::mem::swap(&mut onward_lists, &mut self.pending_lists);
|
||||
for list in onward_lists {
|
||||
let objects = list.objects.clone();
|
||||
// TODO: a take_objects method
|
||||
self.executed_lists.push(list);
|
||||
if let Err(_e) = self.tx.send(ExecutorMessage::Delete(objects)).await {
|
||||
warn!("Shutting down");
|
||||
return;
|
||||
};
|
||||
}
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
|
||||
let flush_op = FlushOp { tx };
|
||||
if let Err(_e) = self.tx.send(ExecutorMessage::Flush(flush_op)).await {
|
||||
warn!("Shutting down");
|
||||
return;
|
||||
};
|
||||
|
||||
if rx.await.is_err() {
|
||||
warn!("Shutting down");
|
||||
return;
|
||||
}
|
||||
|
||||
self.cleanup_lists().await;
|
||||
}
|
||||
|
||||
pub async fn background(&mut self) {
|
||||
// TODO: if we would like to be able to defer deletions while a Layer still has
|
||||
// refs (but it will be eligible for deletion after process ends), then we may
|
||||
@@ -194,10 +165,8 @@ impl BackendQueueWorker {
|
||||
// in the deletion list may not be deleted yet, with guards to block on while
|
||||
// we wait to proceed.
|
||||
|
||||
self.accumulator.reserve(MAX_KEYS_PER_DELETE);
|
||||
|
||||
loop {
|
||||
let msg = match tokio::time::timeout(self.timeout, self.rx.recv()).await {
|
||||
let msg = match tokio::time::timeout(EXECUTE_IDLE_DEADLINE, self.rx.recv()).await {
|
||||
Ok(Some(m)) => m,
|
||||
Ok(None) => {
|
||||
// All queue senders closed
|
||||
@@ -207,15 +176,14 @@ impl BackendQueueWorker {
|
||||
Err(_) => {
|
||||
// Timeout, we hit deadline to execute whatever we have in hand. These functions will
|
||||
// return immediately if no work is pending
|
||||
self.maybe_execute().await;
|
||||
self.cleanup_lists().await;
|
||||
self.flush().await;
|
||||
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match msg {
|
||||
BackendQueueMessage::Delete(mut list) => {
|
||||
BackendQueueMessage::Delete(list) => {
|
||||
if list.objects.is_empty() {
|
||||
// This shouldn't happen, but is harmless. warn so that
|
||||
// tests will fail if we have such a bug, but proceed with
|
||||
@@ -225,58 +193,16 @@ impl BackendQueueWorker {
|
||||
continue;
|
||||
}
|
||||
|
||||
// This loop handles deletion lists that require multiple DeleteObjects requests,
|
||||
// and also handles retries if a deletion fails: we will keep going around until
|
||||
// we have either deleted everything, or we have a remainder in accumulator.
|
||||
while !list.objects.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE
|
||||
{
|
||||
let take_count = if self.accumulator.len() == MAX_KEYS_PER_DELETE {
|
||||
0
|
||||
} else {
|
||||
let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
|
||||
std::cmp::min(available_slots, list.objects.len())
|
||||
};
|
||||
self.pending_key_count += list.objects.len();
|
||||
self.pending_lists.push(list);
|
||||
|
||||
for object in list.objects.drain(list.objects.len() - take_count..) {
|
||||
self.accumulator.push(object);
|
||||
}
|
||||
|
||||
if self.accumulator.len() == MAX_KEYS_PER_DELETE {
|
||||
// Great, we got a full request: issue it.
|
||||
if !self.maybe_execute().await {
|
||||
// Failed to execute: retry delay
|
||||
tokio::time::sleep(EXECUTE_RETRY_DEADLINE).await;
|
||||
};
|
||||
}
|
||||
if self.pending_key_count > AUTOFLUSH_KEY_COUNT {
|
||||
self.flush().await;
|
||||
}
|
||||
|
||||
if !self.accumulator.is_empty() {
|
||||
// We have a remainder, `list` not fully executed yet
|
||||
self.pending_lists.push(list);
|
||||
} else {
|
||||
// We fully processed this list, it is ready for purge
|
||||
self.executed_lists.push(list);
|
||||
}
|
||||
|
||||
self.cleanup_lists().await;
|
||||
}
|
||||
BackendQueueMessage::Flush(op) => {
|
||||
if self.accumulator.is_empty() {
|
||||
op.fire();
|
||||
continue;
|
||||
}
|
||||
|
||||
self.maybe_execute().await;
|
||||
|
||||
if self.accumulator.is_empty() {
|
||||
// Successful flush. Clean up lists before firing, for the benefit of tests that would
|
||||
// like to have a deterministic state post-flush.
|
||||
self.cleanup_lists().await;
|
||||
op.fire();
|
||||
} else {
|
||||
// We didn't flush inline: defer until next time we successfully drain accumulatorr
|
||||
self.pending_flushes.push(op);
|
||||
}
|
||||
self.flush().await;
|
||||
op.fire();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
143
pageserver/src/deletion_queue/executor.rs
Normal file
143
pageserver/src/deletion_queue/executor.rs
Normal file
@@ -0,0 +1,143 @@
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use remote_storage::RemotePath;
|
||||
use remote_storage::MAX_KEYS_PER_DELETE;
|
||||
use std::time::Duration;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::metrics::DELETION_QUEUE_ERRORS;
|
||||
use crate::metrics::DELETION_QUEUE_EXECUTED;
|
||||
|
||||
use super::DeletionQueueError;
|
||||
use super::FlushOp;
|
||||
|
||||
const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);
|
||||
|
||||
pub(super) enum ExecutorMessage {
|
||||
Delete(Vec<RemotePath>),
|
||||
Flush(FlushOp),
|
||||
}
|
||||
|
||||
/// Non-persistent deletion queue, for coalescing multiple object deletes into
|
||||
/// larger DeleteObjects requests.
|
||||
pub struct ExecutorWorker {
|
||||
// Accumulate up to 1000 keys for the next deletion operation
|
||||
accumulator: Vec<RemotePath>,
|
||||
|
||||
rx: tokio::sync::mpsc::Receiver<ExecutorMessage>,
|
||||
|
||||
cancel: CancellationToken,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
}
|
||||
|
||||
impl ExecutorWorker {
|
||||
pub(super) fn new(
|
||||
remote_storage: GenericRemoteStorage,
|
||||
rx: tokio::sync::mpsc::Receiver<ExecutorMessage>,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
Self {
|
||||
remote_storage,
|
||||
rx,
|
||||
cancel,
|
||||
accumulator: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap the remote `delete_objects` with a failpoint
|
||||
pub async fn remote_delete(&self) -> Result<(), anyhow::Error> {
|
||||
fail::fail_point!("deletion-queue-before-execute", |_| {
|
||||
info!("Skipping execution, failpoint set");
|
||||
DELETION_QUEUE_ERRORS
|
||||
.with_label_values(&["failpoint"])
|
||||
.inc();
|
||||
return Err(anyhow::anyhow!("failpoint hit"));
|
||||
});
|
||||
|
||||
self.remote_storage.delete_objects(&self.accumulator).await
|
||||
}
|
||||
|
||||
/// Block until everything in accumulator has been executed
|
||||
pub async fn flush(&mut self) -> Result<(), DeletionQueueError> {
|
||||
while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
|
||||
match self.remote_delete().await {
|
||||
Ok(()) => {
|
||||
// Note: we assume that the remote storage layer returns Ok(()) if some
|
||||
// or all of the deleted objects were already gone.
|
||||
DELETION_QUEUE_EXECUTED.inc_by(self.accumulator.len() as u64);
|
||||
info!(
|
||||
"Executed deletion batch {}..{}",
|
||||
self.accumulator
|
||||
.first()
|
||||
.expect("accumulator should be non-empty"),
|
||||
self.accumulator
|
||||
.last()
|
||||
.expect("accumulator should be non-empty"),
|
||||
);
|
||||
self.accumulator.clear();
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("DeleteObjects request failed: {e:#}, will retry");
|
||||
DELETION_QUEUE_ERRORS.with_label_values(&["execute"]).inc();
|
||||
}
|
||||
};
|
||||
}
|
||||
if self.cancel.is_cancelled() {
|
||||
// Expose an error because we may not have actually flushed everything
|
||||
Err(DeletionQueueError::ShuttingDown)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn background(&mut self) -> Result<(), DeletionQueueError> {
|
||||
self.accumulator.reserve(MAX_KEYS_PER_DELETE);
|
||||
|
||||
loop {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(DeletionQueueError::ShuttingDown);
|
||||
}
|
||||
|
||||
let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
|
||||
Ok(Some(m)) => m,
|
||||
Ok(None) => {
|
||||
// All queue senders closed
|
||||
info!("Shutting down");
|
||||
return Err(DeletionQueueError::ShuttingDown);
|
||||
}
|
||||
Err(_) => {
|
||||
// Timeout, we hit deadline to execute whatever we have in hand. These functions will
|
||||
// return immediately if no work is pending
|
||||
self.flush().await?;
|
||||
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match msg {
|
||||
ExecutorMessage::Delete(mut list) => {
|
||||
while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
|
||||
if self.accumulator.len() == MAX_KEYS_PER_DELETE {
|
||||
self.flush().await?;
|
||||
// If we have received this number of keys, proceed with attempting to execute
|
||||
assert_eq!(self.accumulator.len(), 0);
|
||||
}
|
||||
|
||||
let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
|
||||
let take_count = std::cmp::min(available_slots, list.len());
|
||||
for path in list.drain(list.len() - take_count..) {
|
||||
self.accumulator.push(path);
|
||||
}
|
||||
}
|
||||
}
|
||||
ExecutorMessage::Flush(flush_op) => {
|
||||
// If flush() errors, we drop the flush_op and the caller will get
|
||||
// an error recv()'ing their oneshot channel.
|
||||
self.flush().await?;
|
||||
flush_op.fire();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -821,9 +821,19 @@ impl RemoteTimelineClient {
|
||||
|
||||
let layer_deletion_count = layers.len();
|
||||
|
||||
deletion_queue
|
||||
.push_layers(self.tenant_id, self.timeline_id, layers)
|
||||
.await?;
|
||||
let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
|
||||
let layer_paths = layers
|
||||
.into_iter()
|
||||
.map(|l| {
|
||||
let local_path = timeline_path.join(l.file_name());
|
||||
let remote_path = self
|
||||
.conf
|
||||
.remote_path(&local_path)
|
||||
.expect("Timeline path should always convert to remote");
|
||||
remote_path
|
||||
})
|
||||
.collect();
|
||||
deletion_queue.push_immediate(layer_paths).await?;
|
||||
|
||||
// Do not delete index part yet, it is needed for possible retry. If we remove it first
|
||||
// and the retry arrives at a different pageserver, there won't be any traces of it on remote storage
|
||||
@@ -832,7 +842,7 @@ impl RemoteTimelineClient {
|
||||
|
||||
// Execute all pending deletions, so that when we proceed to do a list_prefixes below, we aren't
|
||||
// taking the burden of listing all the layers that we already know we should delete.
|
||||
deletion_queue.flush_execute().await?;
|
||||
deletion_queue.flush_immediate().await?;
|
||||
|
||||
let remaining = backoff::retry(
|
||||
|| async {
|
||||
@@ -863,9 +873,7 @@ impl RemoteTimelineClient {
|
||||
|
||||
let not_referenced_count = remaining.len();
|
||||
if !remaining.is_empty() {
|
||||
deletion_queue
|
||||
.push_objects(self.tenant_id, self.timeline_id, remaining)
|
||||
.await?;
|
||||
deletion_queue.push_immediate(remaining).await?;
|
||||
}
|
||||
|
||||
fail::fail_point!("timeline-delete-before-index-delete", |_| {
|
||||
@@ -878,12 +886,12 @@ impl RemoteTimelineClient {
|
||||
|
||||
debug!("enqueuing index part deletion");
|
||||
deletion_queue
|
||||
.push_objects(self.tenant_id, self.timeline_id, [index_file_path].to_vec())
|
||||
.push_immediate([index_file_path].to_vec())
|
||||
.await?;
|
||||
|
||||
// Timeline deletion is rare and we have probably emitted a reasonable number of objects: wait
|
||||
// for a flush to a persistent deletion list so that we may be sure deletion will occur.
|
||||
deletion_queue.flush_execute().await?;
|
||||
deletion_queue.flush_immediate().await?;
|
||||
|
||||
fail::fail_point!("timeline-delete-after-index-delete", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
|
||||
@@ -52,7 +52,7 @@ def test_tenant_delete_smoke(
|
||||
# The deletion queue will complain when it encounters simulated S3 errors
|
||||
".*deletion frontend: Failed to write deletion list.*",
|
||||
".*deletion backend: Failed to delete deletion list.*",
|
||||
".*deletion backend: DeleteObjects request failed.*",
|
||||
".*deletion executor: DeleteObjects request failed.*",
|
||||
".*deletion backend: Failed to upload deletion queue header.*",
|
||||
]
|
||||
)
|
||||
@@ -218,7 +218,7 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
|
||||
# The deletion queue will complain when it encounters simulated S3 errors
|
||||
".*deletion frontend: Failed to write deletion list.*",
|
||||
".*deletion backend: Failed to delete deletion list.*",
|
||||
".*deletion backend: DeleteObjects request failed.*",
|
||||
".*deletion executor: DeleteObjects request failed.*",
|
||||
".*deletion backend: Failed to upload deletion queue header.*",
|
||||
]
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user