diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs
index 4ffc321856..ef59c9c4a3 100644
--- a/pageserver/src/deletion_queue.rs
+++ b/pageserver/src/deletion_queue.rs
@@ -258,6 +258,7 @@ impl BackendQueueWorker {
                     let executed_keys: Vec<RemotePath> = self
                         .executed_lists
                         .iter()
+                        .rev()
                         .take(MAX_KEYS_PER_DELETE)
                         .map(|l| {
                             RemotePath::new(&self.conf.remote_deletion_list_path(l.sequence))
@@ -268,8 +269,9 @@ impl BackendQueueWorker {
                     match self.remote_storage.delete_objects(&executed_keys).await {
                         Ok(()) => {
                             // Retain any lists that couldn't be deleted in that request
-                            self.executed_lists =
-                                self.executed_lists.split_off(MAX_KEYS_PER_DELETE);
+                            // (keys came from the tail via rev(), so drop that many from the end)
+                            self.executed_lists
+                                .truncate(self.executed_lists.len() - executed_keys.len());
                         }
                         Err(e) => {
                             warn!("Failed to purge deletion lists: {e}");
@@ -493,6 +495,149 @@ impl DeletionQueue {
     }
 }
 
+#[cfg(test)]
+mod test {
+    use hex_literal::hex;
+    use std::path::{Path, PathBuf};
+
+    use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
+    use tokio::{runtime::EnterGuard, task::JoinHandle};
+
+    use crate::tenant::harness::TenantHarness;
+
+    use super::*;
+    pub const TIMELINE_ID: TimelineId =
+        TimelineId::from_array(hex!("11223344556677881122334455667788"));
+
+    /// Everything a test needs: the queue under test, the handles of its two background
+    /// workers, and the local directory that stands in for remote storage.
+    struct TestSetup {
+        runtime: &'static tokio::runtime::Runtime,
+        entered_runtime: EnterGuard<'static>,
+        harness: TenantHarness,
+        remote_fs_dir: PathBuf,
+        deletion_queue: DeletionQueue,
+        fe_worker: JoinHandle<()>,
+        be_worker: JoinHandle<()>,
+    }
+
+    /// Creates a `DeletionQueue` on `LocalFs` storage rooted at `<workdir>/remote_fs` and
+    /// spawns both of its workers on a leaked, entered current-thread runtime.
+    fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
+        let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
+        let harness = TenantHarness::create(test_name)?;
+
+        // We do not load() the harness: we only need its config and remote_storage
+
+        // Set up a GenericRemoteStorage targeting a directory
+        let remote_fs_dir = harness.conf.workdir.join("remote_fs");
+        std::fs::create_dir_all(&remote_fs_dir)?;
+        let remote_fs_dir = std::fs::canonicalize(&remote_fs_dir)?;
+        let storage_config = RemoteStorageConfig {
+            max_concurrent_syncs: std::num::NonZeroUsize::new(
+                remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
+            )
+            .unwrap(),
+            max_sync_errors: std::num::NonZeroU32::new(
+                remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
+            )
+            .unwrap(),
+            storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
+        };
+        let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
+
+        let runtime = Box::leak(Box::new(
+            tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()?,
+        ));
+        let entered_runtime = runtime.enter();
+
+        let (deletion_queue, fe_worker, be_worker) =
+            DeletionQueue::new(Some(storage), harness.conf);
+
+        let mut fe_worker = fe_worker.unwrap();
+        let mut be_worker = be_worker.unwrap();
+        let fe_worker_join = runtime.spawn(async move { fe_worker.background().await });
+        let be_worker_join = runtime.spawn(async move { be_worker.background().await });
+
+        Ok(TestSetup {
+            runtime,
+            entered_runtime,
+            harness,
+            remote_fs_dir,
+            deletion_queue,
+            fe_worker: fe_worker_join,
+            be_worker: be_worker_join,
+        })
+    }
+
+    // TODO: put this in a common location so that we can share with remote_timeline_client's tests
+    /// Asserts that the entry names found directly in `remote_path` equal `expected`,
+    /// ignoring order.
+    fn assert_remote_files(expected: &[&str], remote_path: &Path) {
+        let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
+        expected.sort();
+
+        let mut found: Vec<String> = Vec::new();
+        for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
+            let entry_name = entry.file_name();
+            let fname = entry_name.to_str().unwrap();
+            found.push(String::from(fname));
+        }
+        found.sort();
+
+        assert_eq!(found, expected);
+    }
+
+    #[test]
+    fn deletion_queue_smoke() -> anyhow::Result<()> {
+        // Basic test that the deletion queue processes the deletions we pass into it
+        let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
+        let client = ctx.deletion_queue.new_client();
+
+        let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
+        let tenant_id = ctx.harness.tenant_id;
+
+        let content: Vec<u8> = "victim1 contents".into();
+        let relative_remote_path = ctx
+            .harness
+            .conf
+            .remote_path(&ctx.harness.timeline_path(&TIMELINE_ID))
+            .expect("Failed to construct remote path");
+        let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
+
+        // Inject a victim file to remote storage
+        info!("Writing");
+        std::fs::create_dir_all(&remote_timeline_path)?;
+        std::fs::write(
+            remote_timeline_path.join(layer_file_name_1.to_string()),
+            &content,
+        )?;
+        assert_remote_files(&[&layer_file_name_1.file_name()], &remote_timeline_path);
+
+        // File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
+        info!("Pushing");
+        ctx.runtime.block_on(client.push(
+            tenant_id,
+            TIMELINE_ID,
+            vec![layer_file_name_1.clone()],
+        ));
+        assert_remote_files(&[&layer_file_name_1.file_name()], &remote_timeline_path);
+
+        // File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
+        info!("Flushing");
+        ctx.runtime.block_on(client.flush());
+        assert_remote_files(&[&layer_file_name_1.file_name()], &remote_timeline_path);
+
+        // File should go away when we execute
+        info!("Flush-executing");
+        ctx.runtime.block_on(client.flush_execute());
+        assert_remote_files(&[], &remote_timeline_path);
+        Ok(())
+    }
+}
+
 /// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence
 /// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it.
 #[cfg(test)]