mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 05:50:38 +00:00
Add MockDeletionQueue for unit tests
This commit is contained in:
@@ -18,17 +18,20 @@ pub struct DeletionQueue {
|
||||
tx: tokio::sync::mpsc::Sender<QueueMessage>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum QueueMessage {
|
||||
Delete(DeletionOp),
|
||||
Flush(FlushOp),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct DeletionOp {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
layers: Vec<LayerFileName>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct FlushOp {
|
||||
tx: tokio::sync::oneshot::Sender<()>,
|
||||
}
|
||||
@@ -136,7 +139,6 @@ impl DeletionQueueWorker {
|
||||
info!("Deletion queue shut down.");
|
||||
}
|
||||
}
|
||||
|
||||
impl DeletionQueue {
|
||||
pub fn new_client(&self) -> DeletionQueueClient {
|
||||
DeletionQueueClient {
|
||||
@@ -159,11 +161,123 @@ impl DeletionQueue {
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// A queue to nowhere: attempts to delete will do nothing
|
||||
#[cfg(test)]
|
||||
pub fn new_mock() -> Self {
|
||||
let (tx, _) = tokio::sync::mpsc::channel(16384);
|
||||
Self { tx }
|
||||
/// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence
|
||||
/// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it.
|
||||
#[cfg(test)]
|
||||
pub mod mock {
|
||||
use super::*;
|
||||
use std::sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
};
|
||||
|
||||
pub struct MockDeletionQueue {
|
||||
tx: tokio::sync::mpsc::Sender<QueueMessage>,
|
||||
tx_pump: tokio::sync::mpsc::Sender<FlushOp>,
|
||||
executed: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl MockDeletionQueue {
|
||||
pub fn new(
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
conf: &'static PageServerConf,
|
||||
) -> Self {
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel(16384);
|
||||
let (tx_pump, mut rx_pump) = tokio::sync::mpsc::channel::<FlushOp>(1);
|
||||
|
||||
let executed = Arc::new(AtomicUsize::new(0));
|
||||
let executed_bg = executed.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let _span = tracing::info_span!("mock_deletion_queue");
|
||||
let remote_storage = match &remote_storage {
|
||||
Some(rs) => rs,
|
||||
None => {
|
||||
info!("No remote storage configured, deletion queue will not run");
|
||||
return;
|
||||
}
|
||||
};
|
||||
info!("Running mock deletion queue");
|
||||
// Each time we are asked to pump, drain the queue of deletions
|
||||
while let Some(flush_op) = rx_pump.recv().await {
|
||||
info!("Executing all pending deletions");
|
||||
while let Ok(msg) = rx.try_recv() {
|
||||
match msg {
|
||||
QueueMessage::Delete(op) => {
|
||||
let timeline_path =
|
||||
conf.timeline_path(&op.tenant_id, &op.timeline_id);
|
||||
|
||||
let _span = tracing::info_span!(
|
||||
"execute_deletion",
|
||||
tenant_id = %op.tenant_id,
|
||||
timeline_id = %op.timeline_id,
|
||||
);
|
||||
|
||||
for layer in op.layers {
|
||||
let local_path = timeline_path.join(layer.file_name());
|
||||
let path = match conf.remote_path(&local_path) {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
panic!("Can't make a timeline path! {e}");
|
||||
}
|
||||
};
|
||||
info!("Executing deletion {path}");
|
||||
match remote_storage.delete(&path).await {
|
||||
Ok(_) => {
|
||||
debug!("Deleted {path}");
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to delete {path}, leaking object! ({e})"
|
||||
);
|
||||
}
|
||||
}
|
||||
executed_bg.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
QueueMessage::Flush(op) => {
|
||||
if let Err(_) = op.tx.send(()) {
|
||||
// oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.
|
||||
debug!("deletion queue flush from dropped client");
|
||||
};
|
||||
}
|
||||
}
|
||||
info!("All pending deletions have been executed");
|
||||
}
|
||||
flush_op
|
||||
.tx
|
||||
.send(())
|
||||
.expect("Test called flush but dropped before finishing");
|
||||
}
|
||||
});
|
||||
|
||||
Self {
|
||||
tx: tx,
|
||||
tx_pump,
|
||||
executed,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_executed(&self) -> usize {
|
||||
self.executed.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub async fn pump(&self) {
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
self.tx_pump
|
||||
.send(FlushOp { tx })
|
||||
.await
|
||||
.expect("pump called after deletion queue loop stopped");
|
||||
rx.await
|
||||
.expect("Mock delete queue shutdown while waiting to pump");
|
||||
}
|
||||
|
||||
pub fn new_client(&self) -> DeletionQueueClient {
|
||||
DeletionQueueClient {
|
||||
tx: self.tx.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user