Implement flush_execute() in deletion queue

John Spray
2023-08-10 13:40:56 +01:00
parent de4882886e
commit 537eca489e
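
In short: DeletionQueueClient gains flush_execute(), which waits until previously
pushed deletions have actually been executed against remote storage, whereas
flush() only waits until they are persisted to a DeletionList. A hypothetical
call site (how the client handle, tenant/timeline IDs and layer name are obtained
is outside this diff; they are placeholders, not part of this commit):

    client.push(tenant_id, timeline_id, vec![layer_file_name]).await;

    // Wait for the deletion to reach a persistent DeletionList...
    client.flush().await;
    // ...then wait for the objects to actually be deleted from remote storage.
    // (Per the backend comment below, FlushExecute assumes a preceding Flush.)
    client.flush_execute().await;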


@@ -52,13 +52,16 @@ const FLUSH_EXPLICIT_DEADLINE: Duration = Duration::from_millis(100);
/// of 1024 for execution via a DeleteObjects call.
#[derive(Clone)]
pub struct DeletionQueue {
tx: tokio::sync::mpsc::Sender<QueueMessage>,
tx: tokio::sync::mpsc::Sender<FrontendQueueMessage>,
}
#[derive(Debug)]
enum QueueMessage {
enum FrontendQueueMessage {
Delete(DeletionOp),
// Wait until all prior deletions make it into a persistent DeletionList
Flush(FlushOp),
// Wait until all prior deletions have been executed (i.e. objects are actually deleted)
FlushExecute(FlushOp),
}
#[derive(Debug)]
@@ -84,7 +87,7 @@ impl FlushOp {
#[derive(Clone)]
pub struct DeletionQueueClient {
tx: tokio::sync::mpsc::Sender<QueueMessage>,
tx: tokio::sync::mpsc::Sender<FrontendQueueMessage>,
}
#[serde_as]
@@ -108,7 +111,7 @@ impl DeletionList {
}
impl DeletionQueueClient {
async fn do_push(&self, msg: QueueMessage) {
async fn do_push(&self, msg: FrontendQueueMessage) {
match self.tx.send(msg).await {
Ok(_) => {}
Err(e) => {
@@ -120,13 +123,17 @@ impl DeletionQueueClient {
}
}
/// Submit a list of layers for deletion: this function returns before the deletion is
/// persistent, but the deletion may be executed at any time after this call: do not push
/// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
/// references them).
pub async fn push(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
layers: Vec<LayerFileName>,
) {
self.do_push(QueueMessage::Delete(DeletionOp {
self.do_push(FrontendQueueMessage::Delete(DeletionOp {
tenant_id,
timeline_id,
layers,
@@ -134,9 +141,8 @@ impl DeletionQueueClient {
.await;
}
pub async fn flush(&self) {
let (tx, rx) = tokio::sync::oneshot::channel();
self.do_push(QueueMessage::Flush(FlushOp { tx })).await;
async fn do_flush(&self, msg: FrontendQueueMessage, rx: tokio::sync::oneshot::Receiver<()>) {
self.do_push(msg).await;
if let Err(_) = rx.await {
// This shouldn't happen if tenants are shut down before the deletion queue. If we
// encounter a bug like this, then a flusher will incorrectly believe it has flushed
@@ -144,16 +150,42 @@ impl DeletionQueueClient {
error!("Deletion queue dropped flush op while client was still waiting");
}
}
/// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
pub async fn flush(&self) {
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
self.do_flush(FrontendQueueMessage::Flush(FlushOp { tx }), rx)
.await
}
/// Wait until all previous deletions are executed (i.e. objects are actually deleted from remote storage)
pub async fn flush_execute(&self) {
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
self.do_flush(FrontendQueueMessage::FlushExecute(FlushOp { tx }), rx)
.await
}
}
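
For reference, both flush() and flush_execute() are built on FlushOp, whose
definition is outside these hunks. Inferred from its use here (and from the mock
code below, which previously sent on op.tx inline), fire() is roughly:

    // Sketch of FlushOp as assumed by this diff; the real definition lives
    // elsewhere in the file.
    #[derive(Debug)]
    struct FlushOp {
        tx: tokio::sync::oneshot::Sender<()>,
    }

    impl FlushOp {
        fn fire(self) {
            if self.tx.send(()).is_err() {
                // A client may legitimately be dropped while waiting for a flush.
                tracing::debug!("deletion queue flush from dropped client");
            }
        }
    }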
pub struct BackendQueueWorker {
remote_storage: Option<GenericRemoteStorage>,
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
// Accumulate up to 1024 keys for the next deletion operation
accumulator: Vec<RemotePath>,
// DeletionLists that we have fully ingested but that might still have
// some keys in the accumulator.
pending_lists: Vec<DeletionList>,
// DeletionLists we have fully executed, which may be deleted
// from remote storage.
executed_lists: Vec<DeletionList>,
}
impl BackendQueueWorker {
pub async fn background(&mut self) {
async fn maybe_execute(&mut self) {
// TODO: refactor so that worker is just not constructed if there is no remote
let remote_storage = match &self.remote_storage {
Some(rs) => rs,
None => {
@@ -162,6 +194,19 @@ impl BackendQueueWorker {
}
};
match remote_storage.delete_objects(&self.accumulator).await {
Ok(()) => {
self.accumulator.clear();
self.executed_lists.append(&mut self.pending_lists);
}
Err(e) => {
warn!("Batch deletion failed: {e}, will retry");
// TODO: increment error counter
}
}
}
pub async fn background(&mut self) {
let _span = tracing::info_span!("deletion_backend");
// TODO: if we would like to be able to defer deletions while a Layer still has
@@ -173,16 +218,7 @@ impl BackendQueueWorker {
// From the S3 spec
const MAX_KEYS_PER_DELETE: usize = 1024;
let mut accumulator = Vec::new();
accumulator.reserve(MAX_KEYS_PER_DELETE);
// DeletionLists we have fully ingested but might still have
// some keys in accumulator.
let mut pending_lists = Vec::new();
// DeletionLists we have fully executed, which may be deleted
// from remote storage.
let mut executed_lists: Vec<DeletionList> = Vec::new();
self.accumulator.reserve(MAX_KEYS_PER_DELETE);
while let Some(msg) = self.rx.recv().await {
match msg {
@@ -192,49 +228,42 @@ impl BackendQueueWorker {
// tests will fail if we have such a bug, but proceed with
// processing subsequent messages.
warn!("Empty DeletionList passed to deletion backend");
executed_lists.push(list);
self.executed_lists.push(list);
continue;
}
// This loop handles deletion lists that require multiple DeleteObjects requests,
// and also handles retries if a deletion fails: we keep going around until we
// have either deleted everything or are left with a remainder in the accumulator.
while !list.objects.is_empty() || accumulator.len() == MAX_KEYS_PER_DELETE {
let take_count = if accumulator.len() == MAX_KEYS_PER_DELETE {
while !list.objects.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE
{
let take_count = if self.accumulator.len() == MAX_KEYS_PER_DELETE {
0
} else {
let available_slots = MAX_KEYS_PER_DELETE - accumulator.len();
let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
std::cmp::min(available_slots, list.objects.len())
};
for object in list.objects.drain(list.objects.len() - take_count..) {
accumulator.push(object);
self.accumulator.push(object);
}
if accumulator.len() == MAX_KEYS_PER_DELETE {
if self.accumulator.len() == MAX_KEYS_PER_DELETE {
// Great, we got a full request: issue it.
match remote_storage.delete_objects(&accumulator).await {
Ok(()) => {
accumulator.clear();
executed_lists.append(&mut pending_lists);
}
Err(e) => {
warn!("Batch deletion failed: {e}, will retry");
// TODO: increment error counter
}
}
self.maybe_execute().await;
}
}
if !accumulator.is_empty() {
if !self.accumulator.is_empty() {
// We have a remainder, deletion list is not fully processed yet
pending_lists.push(list);
self.pending_lists.push(list);
} else {
// We fully processed this list, it is ready for purge
executed_lists.push(list);
self.executed_lists.push(list);
}
let executed_keys: Vec<RemotePath> = executed_lists
let executed_keys: Vec<RemotePath> = self
.executed_lists
.iter()
.take(MAX_KEYS_PER_DELETE)
.map(|l| {
@@ -242,12 +271,20 @@ impl BackendQueueWorker {
.expect("Failed to compose deletion list path")
})
.collect();
// TODO: refactor so that worker is just not constructed if there is no remote
let remote_storage = match &mut self.remote_storage {
Some(rs) => rs,
None => {
info!("No remote storage configured, deletion queue will not run");
return;
}
};
match remote_storage.delete_objects(&executed_keys).await {
Ok(()) => {
executed_lists = executed_lists
.into_iter()
.skip(MAX_KEYS_PER_DELETE)
.collect();
// Retain any lists that couldn't be deleted in that request.
// (split_off() panics if the index exceeds the length, so split at
// the number of keys actually deleted, not MAX_KEYS_PER_DELETE.)
self.executed_lists =
self.executed_lists.split_off(executed_keys.len());
}
Err(e) => {
warn!("Failed to purge deletion lists: {e}");
@@ -257,8 +294,10 @@ impl BackendQueueWorker {
}
}
BackendQueueMessage::Flush(op) => {
// TODO: add an extra frontend flush type that passes through to this flush
// We have implicitly already processed preceding deletions
while !self.accumulator.is_empty() {
self.maybe_execute().await;
}
op.fire();
}
}
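
The batching above fills self.accumulator to at most MAX_KEYS_PER_DELETE (1024,
the DeleteObjects limit) before issuing a request. A standalone sketch of the
take_count arithmetic, extracted here purely for illustration:

    // Illustration only; mirrors the logic in background() above.
    const MAX_KEYS_PER_DELETE: usize = 1024;

    fn take_count(accumulated: usize, remaining_in_list: usize) -> usize {
        if accumulated == MAX_KEYS_PER_DELETE {
            0 // accumulator is full: execute before ingesting more keys
        } else {
            let available_slots = MAX_KEYS_PER_DELETE - accumulated;
            std::cmp::min(available_slots, remaining_in_list)
        }
    }

    // With 1000 keys accumulated and 600 left in a list: take 24 now, execute
    // the full batch of 1024, then take the remaining 576 on the next pass.
    assert_eq!(take_count(1000, 600), 24);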
@@ -277,7 +316,7 @@ pub struct FrontendQueueWorker {
conf: &'static PageServerConf,
// Incoming frontend requests to delete some keys
rx: tokio::sync::mpsc::Receiver<QueueMessage>,
rx: tokio::sync::mpsc::Receiver<FrontendQueueMessage>,
// Outbound requests to the backend to execute deletion lists we have composed.
tx: tokio::sync::mpsc::Sender<BackendQueueMessage>,
@@ -364,7 +403,7 @@ impl FrontendQueueWorker {
};
match msg {
QueueMessage::Delete(op) => {
FrontendQueueMessage::Delete(op) => {
let timeline_path = self.conf.timeline_path(&op.tenant_id, &op.timeline_id);
let _span = tracing::info_span!(
@@ -385,7 +424,7 @@ impl FrontendQueueWorker {
self.pending.objects.push(path);
}
}
QueueMessage::Flush(op) => {
FrontendQueueMessage::Flush(op) => {
if self.pending.objects.is_empty() {
// Execute immediately
op.fire()
@@ -400,6 +439,13 @@ impl FrontendQueueWorker {
}
}
}
FrontendQueueMessage::FlushExecute(op) => {
// We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
if let Err(e) = self.tx.send(BackendQueueMessage::Flush(op)).await {
info!("Can't flush, shutting down ({e})");
// The caller will see an error when their oneshot sender is dropped.
}
}
}
if self.pending.objects.len() > DELETION_LIST_TARGET_SIZE {
@@ -442,6 +488,9 @@ impl DeletionQueue {
remote_storage,
conf,
rx: backend_rx,
accumulator: Vec::new(),
pending_lists: Vec::new(),
executed_lists: Vec::new(),
},
)
}
@@ -458,7 +507,7 @@ pub mod mock {
};
pub struct MockDeletionQueue {
tx: tokio::sync::mpsc::Sender<QueueMessage>,
tx: tokio::sync::mpsc::Sender<FrontendQueueMessage>,
tx_pump: tokio::sync::mpsc::Sender<FlushOp>,
executed: Arc<AtomicUsize>,
}
@@ -489,7 +538,7 @@ pub mod mock {
info!("Executing all pending deletions");
while let Ok(msg) = rx.try_recv() {
match msg {
QueueMessage::Delete(op) => {
FrontendQueueMessage::Delete(op) => {
let timeline_path =
conf.timeline_path(&op.tenant_id, &op.timeline_id);
@@ -521,11 +570,12 @@ pub mod mock {
executed_bg.fetch_add(1, Ordering::Relaxed);
}
}
QueueMessage::Flush(op) => {
if let Err(_) = op.tx.send(()) {
// oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.
debug!("deletion queue flush from dropped client");
};
FrontendQueueMessage::Flush(op) => {
op.fire();
}
FrontendQueueMessage::FlushExecute(op) => {
// We have already executed all prior deletions because the mock does them inline
op.fire();
}
}
info!("All pending deletions have been executed");