mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
pageserver: testing for deletion queue
This commit is contained in:
@@ -258,6 +258,7 @@ impl BackendQueueWorker {
|
||||
let executed_keys: Vec<RemotePath> = self
|
||||
.executed_lists
|
||||
.iter()
|
||||
.rev()
|
||||
.take(MAX_KEYS_PER_DELETE)
|
||||
.map(|l| {
|
||||
RemotePath::new(&self.conf.remote_deletion_list_path(l.sequence))
|
||||
@@ -268,8 +269,8 @@ impl BackendQueueWorker {
|
||||
match self.remote_storage.delete_objects(&executed_keys).await {
|
||||
Ok(()) => {
|
||||
// Retain any lists that couldn't be deleted in that request
|
||||
self.executed_lists =
|
||||
self.executed_lists.split_off(MAX_KEYS_PER_DELETE);
|
||||
self.executed_lists
|
||||
.truncate(self.executed_lists.len() - executed_keys.len());
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to purge deletion lists: {e}");
|
||||
@@ -493,6 +494,143 @@ impl DeletionQueue {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use hex_literal::hex;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
|
||||
use tokio::{runtime::EnterGuard, task::JoinHandle};
|
||||
|
||||
use crate::tenant::harness::TenantHarness;
|
||||
|
||||
use super::*;
|
||||
pub const TIMELINE_ID: TimelineId =
|
||||
TimelineId::from_array(hex!("11223344556677881122334455667788"));
|
||||
|
||||
struct TestSetup {
|
||||
runtime: &'static tokio::runtime::Runtime,
|
||||
entered_runtime: EnterGuard<'static>,
|
||||
harness: TenantHarness,
|
||||
remote_fs_dir: PathBuf,
|
||||
deletion_queue: DeletionQueue,
|
||||
fe_worker: JoinHandle<()>,
|
||||
be_worker: JoinHandle<()>,
|
||||
}
|
||||
|
||||
fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
|
||||
let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
|
||||
let harness = TenantHarness::create(test_name)?;
|
||||
|
||||
// We do not load() the harness: we only need its config and remote_storage
|
||||
|
||||
// Set up a GenericRemoteStorage targetting a directory
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
|
||||
let storage_config = RemoteStorageConfig {
|
||||
max_concurrent_syncs: std::num::NonZeroUsize::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
|
||||
)
|
||||
.unwrap(),
|
||||
max_sync_errors: std::num::NonZeroU32::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
|
||||
)
|
||||
.unwrap(),
|
||||
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
|
||||
};
|
||||
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
|
||||
|
||||
let runtime = Box::leak(Box::new(
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?,
|
||||
));
|
||||
let entered_runtime = runtime.enter();
|
||||
|
||||
let (deletion_queue, fe_worker, be_worker) =
|
||||
DeletionQueue::new(Some(storage), harness.conf);
|
||||
|
||||
let mut fe_worker = fe_worker.unwrap();
|
||||
let mut be_worker = be_worker.unwrap();
|
||||
let fe_worker_join = runtime.spawn(async move { fe_worker.background().await });
|
||||
let be_worker_join = runtime.spawn(async move { be_worker.background().await });
|
||||
|
||||
Ok(TestSetup {
|
||||
runtime,
|
||||
entered_runtime,
|
||||
harness,
|
||||
remote_fs_dir,
|
||||
deletion_queue,
|
||||
fe_worker: fe_worker_join,
|
||||
be_worker: be_worker_join,
|
||||
})
|
||||
}
|
||||
|
||||
// TODO: put this in a common location so that we can share with remote_timeline_client's tests
|
||||
fn assert_remote_files(expected: &[&str], remote_path: &Path) {
|
||||
let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
|
||||
expected.sort();
|
||||
|
||||
let mut found: Vec<String> = Vec::new();
|
||||
for entry in std::fs::read_dir(remote_path).unwrap().flatten() {
|
||||
let entry_name = entry.file_name();
|
||||
let fname = entry_name.to_str().unwrap();
|
||||
found.push(String::from(fname));
|
||||
}
|
||||
found.sort();
|
||||
|
||||
assert_eq!(found, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deletion_queue_smoke() -> anyhow::Result<()> {
|
||||
// Basic test that the deletion queue processes the deletions we pass into it
|
||||
let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
|
||||
let client = ctx.deletion_queue.new_client();
|
||||
|
||||
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
|
||||
let tenant_id = ctx.harness.tenant_id;
|
||||
|
||||
let content: Vec<u8> = "victim1 contents".into();
|
||||
let relative_remote_path = ctx
|
||||
.harness
|
||||
.conf
|
||||
.remote_path(&ctx.harness.timeline_path(&TIMELINE_ID))
|
||||
.expect("Failed to construct remote path");
|
||||
let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
|
||||
|
||||
// Inject a victim file to remote storage
|
||||
info!("Writing");
|
||||
std::fs::create_dir_all(&remote_timeline_path)?;
|
||||
std::fs::write(
|
||||
remote_timeline_path.join(layer_file_name_1.to_string()),
|
||||
&content,
|
||||
)?;
|
||||
assert_remote_files(&[&layer_file_name_1.file_name()], &remote_timeline_path);
|
||||
|
||||
// File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
|
||||
info!("Pushing");
|
||||
ctx.runtime.block_on(client.push(
|
||||
tenant_id,
|
||||
TIMELINE_ID,
|
||||
[layer_file_name_1.clone()].to_vec(),
|
||||
));
|
||||
assert_remote_files(&[&layer_file_name_1.file_name()], &remote_timeline_path);
|
||||
|
||||
// File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
|
||||
info!("Flushing");
|
||||
ctx.runtime.block_on(client.flush());
|
||||
assert_remote_files(&[&layer_file_name_1.file_name()], &remote_timeline_path);
|
||||
|
||||
// File should go away when we execute
|
||||
info!("Flush-executing");
|
||||
ctx.runtime.block_on(client.flush_execute());
|
||||
assert_remote_files(&[], &remote_timeline_path);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence
|
||||
/// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it.
|
||||
#[cfg(test)]
|
||||
|
||||
Reference in New Issue
Block a user