mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 12:30:38 +00:00
## Problem Follow up of https://github.com/neondatabase/neon/pull/9682, that patch didn't fully address the problem: what if shutdown fails due to whatever reason and then we reattach the tenant? Then we will still remove the future layer. The underlying problem is that the fix for #5878 gets voided because of the generation optimizations. Of course, we also need to ensure that delete happens after uploads, but note that we only schedule deletes when there are no ongoing upload tasks, so that's fine. ## Summary of changes * Add a test case to reproduce the behavior (by changing the original test case to attach the same generation). * If layer upload happens after the deletion, drain the deletion queue before uploading. * If blocked_deletion is enabled, directly remove it from the blocked_deletion queue. * Local fs backend fix to avoid race between deletion and preload. * test_emergency_mode does not need to wait for uploads (and it's generally not possible to wait for uploads). * ~~Optimize deletion executor to skip validation if there are no files to delete.~~ this doesn't work --------- Signed-off-by: Alex Chi Z <chi@neon.tech>
183 lines
6.7 KiB
Rust
183 lines
6.7 KiB
Rust
//! The deleter is the final stage in the deletion queue. It accumulates remote
|
|
//! paths to delete, and periodically executes them in batches of up to 1000
|
|
//! using the DeleteObjects request.
|
|
//!
|
|
//! Its purpose is to increase efficiency of remote storage I/O by issuing a smaller
|
|
//! number of full-sized DeleteObjects requests, rather than a larger number of
|
|
//! smaller requests.
|
|
|
|
use remote_storage::GenericRemoteStorage;
|
|
use remote_storage::RemotePath;
|
|
use remote_storage::TimeoutOrCancel;
|
|
use remote_storage::MAX_KEYS_PER_DELETE;
|
|
use std::time::Duration;
|
|
use tokio_util::sync::CancellationToken;
|
|
use tracing::info;
|
|
use tracing::warn;
|
|
use utils::backoff;
|
|
use utils::pausable_failpoint;
|
|
|
|
use crate::metrics;
|
|
|
|
use super::DeletionQueueError;
|
|
use super::FlushOp;
|
|
|
|
/// How long the background loop waits for the next message before it executes
/// whatever keys have already been accumulated (10 seconds).
const AUTOFLUSH_INTERVAL: Duration = Duration::from_millis(10_000);
|
|
|
|
pub(super) enum DeleterMessage {
|
|
Delete(Vec<RemotePath>),
|
|
Flush(FlushOp),
|
|
}
|
|
|
|
/// Non-persistent deletion queue, for coalescing multiple object deletes into
|
|
/// larger DeleteObjects requests.
|
|
pub(super) struct Deleter {
|
|
// Accumulate up to 1000 keys for the next deletion operation
|
|
accumulator: Vec<RemotePath>,
|
|
|
|
rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
|
|
|
|
cancel: CancellationToken,
|
|
remote_storage: GenericRemoteStorage,
|
|
}
|
|
|
|
impl Deleter {
|
|
pub(super) fn new(
|
|
remote_storage: GenericRemoteStorage,
|
|
rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
|
|
cancel: CancellationToken,
|
|
) -> Self {
|
|
Self {
|
|
remote_storage,
|
|
rx,
|
|
cancel,
|
|
accumulator: Vec::new(),
|
|
}
|
|
}
|
|
|
|
/// Wrap the remote `delete_objects` with a failpoint
|
|
async fn remote_delete(&self) -> Result<(), anyhow::Error> {
|
|
// A backoff::retry is used here for two reasons:
|
|
// - To provide a backoff rather than busy-polling the API on errors
|
|
// - To absorb transient 429/503 conditions without hitting our error
|
|
// logging path for issues deleting objects.
|
|
backoff::retry(
|
|
|| async {
|
|
fail::fail_point!("deletion-queue-before-execute", |_| {
|
|
info!("Skipping execution, failpoint set");
|
|
|
|
metrics::DELETION_QUEUE
|
|
.remote_errors
|
|
.with_label_values(&["failpoint"])
|
|
.inc();
|
|
Err(anyhow::anyhow!("failpoint: deletion-queue-before-execute"))
|
|
});
|
|
|
|
self.remote_storage
|
|
.delete_objects(&self.accumulator, &self.cancel)
|
|
.await
|
|
},
|
|
TimeoutOrCancel::caused_by_cancel,
|
|
3,
|
|
10,
|
|
"executing deletion batch",
|
|
&self.cancel,
|
|
)
|
|
.await
|
|
.ok_or_else(|| anyhow::anyhow!("Shutting down"))
|
|
.and_then(|x| x)
|
|
}
|
|
|
|
/// Block until everything in accumulator has been executed
|
|
async fn flush(&mut self) -> Result<(), DeletionQueueError> {
|
|
while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
|
|
pausable_failpoint!("deletion-queue-before-execute-pause");
|
|
match self.remote_delete().await {
|
|
Ok(()) => {
|
|
// Note: we assume that the remote storage layer returns Ok(()) if some
|
|
// or all of the deleted objects were already gone.
|
|
metrics::DELETION_QUEUE
|
|
.keys_executed
|
|
.inc_by(self.accumulator.len() as u64);
|
|
info!(
|
|
"Executed deletion batch {}..{}",
|
|
self.accumulator
|
|
.first()
|
|
.expect("accumulator should be non-empty"),
|
|
self.accumulator
|
|
.last()
|
|
.expect("accumulator should be non-empty"),
|
|
);
|
|
self.accumulator.clear();
|
|
}
|
|
Err(e) => {
|
|
if self.cancel.is_cancelled() {
|
|
return Err(DeletionQueueError::ShuttingDown);
|
|
}
|
|
warn!("DeleteObjects request failed: {e:#}, will continue trying");
|
|
metrics::DELETION_QUEUE
|
|
.remote_errors
|
|
.with_label_values(&["execute"])
|
|
.inc();
|
|
}
|
|
};
|
|
}
|
|
if self.cancel.is_cancelled() {
|
|
// Expose an error because we may not have actually flushed everything
|
|
Err(DeletionQueueError::ShuttingDown)
|
|
} else {
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
|
|
self.accumulator.reserve(MAX_KEYS_PER_DELETE);
|
|
|
|
loop {
|
|
if self.cancel.is_cancelled() {
|
|
return Err(DeletionQueueError::ShuttingDown);
|
|
}
|
|
|
|
let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
|
|
Ok(Some(m)) => m,
|
|
Ok(None) => {
|
|
// All queue senders closed
|
|
info!("Shutting down");
|
|
return Err(DeletionQueueError::ShuttingDown);
|
|
}
|
|
Err(_) => {
|
|
// Timeout, we hit deadline to execute whatever we have in hand. These functions will
|
|
// return immediately if no work is pending
|
|
self.flush().await?;
|
|
|
|
continue;
|
|
}
|
|
};
|
|
|
|
match msg {
|
|
DeleterMessage::Delete(mut list) => {
|
|
while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
|
|
if self.accumulator.len() == MAX_KEYS_PER_DELETE {
|
|
self.flush().await?;
|
|
// If we have received this number of keys, proceed with attempting to execute
|
|
assert_eq!(self.accumulator.len(), 0);
|
|
}
|
|
|
|
let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
|
|
let take_count = std::cmp::min(available_slots, list.len());
|
|
for path in list.drain(list.len() - take_count..) {
|
|
self.accumulator.push(path);
|
|
}
|
|
}
|
|
}
|
|
DeleterMessage::Flush(flush_op) => {
|
|
// If flush() errors, we drop the flush_op and the caller will get
|
|
// an error recv()'ing their oneshot channel.
|
|
self.flush().await?;
|
|
flush_op.notify();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|