From 0fdc492aa47cbedd441787c41ff40b5c9f778055 Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 9 Aug 2023 16:05:20 +0100 Subject: [PATCH] Add MockDeletionQueue for unit tests --- pageserver/src/deletion_queue.rs | 126 +++++++++++++++++++++++++++++-- 1 file changed, 120 insertions(+), 6 deletions(-) diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index bb697e9346..e12c58eb59 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -18,17 +18,20 @@ pub struct DeletionQueue { tx: tokio::sync::mpsc::Sender, } +#[derive(Debug)] enum QueueMessage { Delete(DeletionOp), Flush(FlushOp), } +#[derive(Debug)] struct DeletionOp { tenant_id: TenantId, timeline_id: TimelineId, layers: Vec, } +#[derive(Debug)] struct FlushOp { tx: tokio::sync::oneshot::Sender<()>, } @@ -136,7 +139,6 @@ impl DeletionQueueWorker { info!("Deletion queue shut down."); } } - impl DeletionQueue { pub fn new_client(&self) -> DeletionQueueClient { DeletionQueueClient { @@ -159,11 +161,123 @@ impl DeletionQueue { }, ) } +} - /// A queue to nowhere: attempts to delete will do nothing - #[cfg(test)] - pub fn new_mock() -> Self { - let (tx, _) = tokio::sync::mpsc::channel(16384); - Self { tx } +/// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence +/// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it. +#[cfg(test)] +pub mod mock { + use super::*; + use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }; + + pub struct MockDeletionQueue { + tx: tokio::sync::mpsc::Sender, + tx_pump: tokio::sync::mpsc::Sender, + executed: Arc, + } + + impl MockDeletionQueue { + pub fn new( + remote_storage: Option, + conf: &'static PageServerConf, + ) -> Self { + let (tx, mut rx) = tokio::sync::mpsc::channel(16384); + let (tx_pump, mut rx_pump) = tokio::sync::mpsc::channel::(1); + + let executed = Arc::new(AtomicUsize::new(0)); + let executed_bg = executed.clone(); + + tokio::spawn(async move { + let _span = tracing::info_span!("mock_deletion_queue"); + let remote_storage = match &remote_storage { + Some(rs) => rs, + None => { + info!("No remote storage configured, deletion queue will not run"); + return; + } + }; + info!("Running mock deletion queue"); + // Each time we are asked to pump, drain the queue of deletions + while let Some(flush_op) = rx_pump.recv().await { + info!("Executing all pending deletions"); + while let Ok(msg) = rx.try_recv() { + match msg { + QueueMessage::Delete(op) => { + let timeline_path = + conf.timeline_path(&op.tenant_id, &op.timeline_id); + + let _span = tracing::info_span!( + "execute_deletion", + tenant_id = %op.tenant_id, + timeline_id = %op.timeline_id, + ); + + for layer in op.layers { + let local_path = timeline_path.join(layer.file_name()); + let path = match conf.remote_path(&local_path) { + Ok(p) => p, + Err(e) => { + panic!("Can't make a timeline path! {e}"); + } + }; + info!("Executing deletion {path}"); + match remote_storage.delete(&path).await { + Ok(_) => { + debug!("Deleted {path}"); + } + Err(e) => { + error!( + "Failed to delete {path}, leaking object! ({e})" + ); + } + } + executed_bg.fetch_add(1, Ordering::Relaxed); + } + } + QueueMessage::Flush(op) => { + if let Err(_) = op.tx.send(()) { + // oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush. + debug!("deletion queue flush from dropped client"); + }; + } + } + info!("All pending deletions have been executed"); + } + flush_op + .tx + .send(()) + .expect("Test called flush but dropped before finishing"); + } + }); + + Self { + tx: tx, + tx_pump, + executed, + } + } + + pub fn get_executed(&self) -> usize { + self.executed.load(Ordering::Relaxed) + } + + pub async fn pump(&self) { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.tx_pump + .send(FlushOp { tx }) + .await + .expect("pump called after deletion queue loop stopped"); + rx.await + .expect("Mock delete queue shutdown while waiting to pump"); + } + + pub fn new_client(&self) -> DeletionQueueClient { + DeletionQueueClient { + tx: self.tx.clone(), + } + } } }