From a0ed43cc12d3d9d933229e35188fdaa36866e9a6 Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 16 Aug 2023 17:08:18 +0100 Subject: [PATCH] deletion queue: add DeletionHeader for sequence numbers --- pageserver/src/config.rs | 5 + pageserver/src/deletion_queue.rs | 329 ++++++++++++++++++++++++++----- 2 files changed, 287 insertions(+), 47 deletions(-) diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 5808a325bb..cf4c043f5b 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -581,6 +581,11 @@ impl PageServerConf { .expect("This should always be convertible, it is relative") } + pub fn remote_deletion_header_path(&self) -> RemotePath { + RemotePath::new(&self.remote_deletion_node_prefix().join("header")) + .expect("This should always be convertible, it is relative") + } + pub fn tenant_path(&self, tenant_id: &TenantId) -> PathBuf { self.tenants_path().join(tenant_id.to_string()) } diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 969b519d22..5f49571ced 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -1,4 +1,6 @@ use crate::metrics::{DELETION_QUEUE_ERRORS, DELETION_QUEUE_EXECUTED, DELETION_QUEUE_SUBMITTED}; +use regex::Regex; +use remote_storage::DownloadError; use remote_storage::{GenericRemoteStorage, RemotePath}; use serde::Deserialize; use serde::Serialize; @@ -33,6 +35,9 @@ const EXECUTE_IDLE_DEADLINE: Duration = Duration::from_secs(60); // trying again. const EXECUTE_RETRY_DEADLINE: Duration = Duration::from_secs(1); +// From the S3 spec +const MAX_KEYS_PER_DELETE: usize = 1000; + // TODO: adminstrative "panic button" config property to disable all deletions // TODO: configurable for how long to wait before executing deletions @@ -55,6 +60,9 @@ const EXECUTE_RETRY_DEADLINE: Duration = Duration::from_secs(1); /// are pending execution. /// - Deletions read back frorm the persistent deletion blocks, which are batched up into groups /// of 1000 for execution via a DeleteObjects call. +/// +/// In S3, there is just one queue, made up of a series of DeletionList objects and +/// a DeletionHeader #[derive(Clone)] pub struct DeletionQueue { tx: tokio::sync::mpsc::Sender, @@ -110,6 +118,18 @@ struct DeletionList { objects: Vec, } +#[serde_as] +#[derive(Debug, Serialize, Deserialize)] +struct DeletionHeader { + /// Enable determining the next sequence number even if there are no deletion lists present. + /// If there _are_ deletion lists present, then their sequence numbers take precedence over + /// this. + last_deleted_list_seq: u64, + // TODO: this is where we will track a 'clean' sequence number that indicates all deletion + // lists <= that sequence have had their generations validated with the control plane + // and are OK to execute. +} + impl DeletionList { fn new(sequence: u64) -> Self { Self { @@ -258,6 +278,71 @@ impl BackendQueueWorker { } } + async fn cleanup_lists(&mut self) { + debug!( + "cleanup_lists: {0} executed lists, {1} pending lists", + self.executed_lists.len(), + self.pending_lists.len() + ); + + // Lists are always pushed into the queues + executed list in sequence order, so + // no sort is required: can find the highest sequence number by peeking at last element + let max_executed_seq = match self.executed_lists.last() { + Some(v) => v.sequence, + None => { + // No executed lists, nothing to clean up. + return; + } + }; + + // In case this is the last list, write a header out first so that + // we don't risk losing our knowledge of the sequence number (on replay, our + // next sequence number is the highest list seen + 1, or read from the header + // if there are no lists) + let header = DeletionHeader::new(max_executed_seq); + debug!("Writing header {:?}", header); + let bytes = serde_json::to_vec(&header).expect("Failed to serialize deletion header"); + let size = bytes.len(); + let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes)); + let header_key = self.conf.remote_deletion_header_path(); + + if let Err(e) = self + .remote_storage + .upload(source, size, &header_key, None) + .await + { + warn!("Failed to upload deletion queue header: {e:#}"); + DELETION_QUEUE_ERRORS + .with_label_values(&["put_headerr"]) + .inc(); + return; + } + + let executed_keys: Vec = self + .executed_lists + .iter() + .rev() + .take(MAX_KEYS_PER_DELETE) + .map(|l| self.conf.remote_deletion_list_path(l.sequence)) + .collect(); + + match self.remote_storage.delete_objects(&executed_keys).await { + Ok(()) => { + // Retain any lists that couldn't be deleted in that request + self.executed_lists + .truncate(self.executed_lists.len() - executed_keys.len()); + } + Err(e) => { + warn!("Failed to delete deletion list(s): {e:#}"); + // Do nothing: the elements remain in executed_lists, and purge will be retried + // next time we process some deletions and go around the loop. + DELETION_QUEUE_ERRORS + .with_label_values(&["delete_list"]) + .inc(); + } + } + } + pub async fn background(&mut self) { // TODO: if we would like to be able to defer deletions while a Layer still has // refs (but it will be elegible for deletion after process ends), then we may @@ -265,9 +350,6 @@ impl BackendQueueWorker { // in the deletion list may not be deleted yet, with guards to block on while // we wait to proceed. - // From the S3 spec - const MAX_KEYS_PER_DELETE: usize = 1000; - self.accumulator.reserve(MAX_KEYS_PER_DELETE); loop { @@ -279,8 +361,10 @@ impl BackendQueueWorker { break; } Err(_) => { - // Timeout, we hit deadline to execute whatever we have in hand + // Timeout, we hit deadline to execute whatever we have in hand. These functions will + // return immediately if no work is pending self.maybe_execute().await; + self.cleanup_lists().await; continue; } @@ -315,7 +399,10 @@ impl BackendQueueWorker { if self.accumulator.len() == MAX_KEYS_PER_DELETE { // Great, we got a full request: issue it. - self.maybe_execute().await; + if self.maybe_execute().await == false { + // Failed to execute: retry delay + tokio::time::sleep(EXECUTE_RETRY_DEADLINE).await; + }; } } @@ -327,33 +414,13 @@ impl BackendQueueWorker { self.executed_lists.push(list); } - let executed_keys: Vec = self - .executed_lists - .iter() - .rev() - .take(MAX_KEYS_PER_DELETE) - .map(|l| self.conf.remote_deletion_list_path(l.sequence)) - .collect(); - - match self.remote_storage.delete_objects(&executed_keys).await { - Ok(()) => { - // Retain any lists that couldn't be deleted in that request - self.executed_lists - .truncate(self.executed_lists.len() - executed_keys.len()); - } - Err(e) => { - warn!("Failed to delete deletion list(s): {e:#}"); - // Do nothing: the elements remain in executed_lists, and purge will be retried - // next time we process some deletions and go around the loop. - DELETION_QUEUE_ERRORS - .with_label_values(&["delete_list"]) - .inc(); - } - } + self.cleanup_lists().await; } BackendQueueMessage::Flush(op) => { self.maybe_execute().await; + self.cleanup_lists().await; + op.fire(); } } @@ -456,6 +523,50 @@ impl FrontendQueueWorker { } async fn recover(&mut self) -> Result<(), anyhow::Error> { + // Load header: this is not required to be present, e.g. when a pageserver first runs + let header_path = self.conf.remote_deletion_header_path(); + let header_bytes = match backoff::retry( + || self.remote_storage.download_all(&header_path), + |e| { + if let DownloadError::NotFound = e { + true + } else { + false + } + }, + 3, + u32::MAX, + "Reading deletion queue header", + ) + .await + { + Ok(h) => Ok(Some(h)), + Err(e) => { + if let DownloadError::NotFound = e { + debug!("Deletion header {header_path} not found, first start?"); + Ok(None) + } else { + Err(e) + } + } + }?; + + if let Some(header_bytes) = header_bytes { + if let Some(header) = match serde_json::from_slice::(&header_bytes) { + Ok(h) => Some(h), + Err(e) => { + warn!("Failed to deserialize deletion header, ignoring {header_path}: {e:#}"); + // This should never happen unless we make a mistake with our serialization. + // Ignoring a deletion header is not consequential for correctnes because all deletions + // are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up. + None + } + } { + self.pending.sequence = + std::cmp::max(self.pending.sequence, header.last_deleted_list_seq + 1); + }; + }; + // TODO: this needs a CancellationToken or equivalent: usual worker teardown happens via the channel let prefix = RemotePath::new(&self.conf.remote_deletion_node_prefix()) .expect("Failed to compose path"); @@ -468,10 +579,18 @@ impl FrontendQueueWorker { ) .await?; - const LIST_EXTENSION: &str = "list"; + debug!("Loaded {} keys in deletion prefix {}", lists.len(), prefix); + + let list_name_pattern = + Regex::new("([a-zA-Z0-9]{16})-([a-zA-Z0-9]{8})-([a-zA-Z0-9]{2}).list").unwrap(); let mut seqs: Vec = Vec::new(); for l in &lists { + if l == &header_path { + // Don't try and parse the header key as a list key + continue; + } + let basename = l .strip_prefix(&prefix) .expect("Stripping prefix frrom a prefix listobjects should always work"); @@ -484,19 +603,18 @@ impl FrontendQueueWorker { } }; - let (seq_part, extension) = match basename.split_once(".") { - Some(parts) => parts, - None => { - warn!("Unexpected key in deletion queue: {basename}"); - continue; - } + let seq_part = if let Some(m) = list_name_pattern.captures(basename) { + m.get(1) + .expect("Non optional group should be present") + .as_str() + } else { + warn!("Unexpected key in deletion queue: {basename}"); + continue; }; - if extension != LIST_EXTENSION { - continue; - } + info!("seq_part {seq_part}"); - let seq: u64 = match seq_part.parse() { + let seq: u64 = match u64::from_str_radix(seq_part, 16) { Ok(s) => s, Err(e) => { warn!("Malformed key '{basename}': {e}"); @@ -508,6 +626,13 @@ impl FrontendQueueWorker { seqs.sort(); + // Initialize the next sequence number in the frontend based on the maximum of the highest list we see, + // and the last list that was deleted according to the header. Combined with writing out the header + // prior to deletions, this guarnatees no re-use of sequence numbers. + if let Some(max_list_seq) = seqs.last() { + self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1); + } + for s in seqs { let list_path = self.conf.remote_deletion_list_path(s); let lists_body = backoff::retry( @@ -532,13 +657,14 @@ impl FrontendQueueWorker { // We will drop out of recovery if this fails: it indicates that we are shutting down // or the backend has panicked + DELETION_QUEUE_SUBMITTED.inc_by(deletion_list.objects.len() as u64); self.tx .send(BackendQueueMessage::Delete(deletion_list)) .await?; - - self.pending.sequence = s + 1; } + info!(next_sequence = self.pending.sequence, "Replay complete"); + Ok(()) } @@ -678,7 +804,10 @@ impl DeletionQueue { #[cfg(test)] mod test { use hex_literal::hex; - use std::path::{Path, PathBuf}; + use std::{ + io::ErrorKind, + path::{Path, PathBuf}, + }; use remote_storage::{RemoteStorageConfig, RemoteStorageKind}; use tokio::{runtime::EnterGuard, task::JoinHandle}; @@ -691,14 +820,40 @@ mod test { struct TestSetup { runtime: &'static tokio::runtime::Runtime, - entered_runtime: EnterGuard<'static>, + _entered_runtime: EnterGuard<'static>, harness: TenantHarness, remote_fs_dir: PathBuf, + storage: GenericRemoteStorage, deletion_queue: DeletionQueue, fe_worker: JoinHandle<()>, be_worker: JoinHandle<()>, } + impl TestSetup { + /// Simulate a pageserver restart by destroying and recreating the deletion queue + fn restart(&mut self) { + let (deletion_queue, fe_worker, be_worker) = + DeletionQueue::new(Some(self.storage.clone()), self.harness.conf); + + self.deletion_queue = deletion_queue; + + let mut fe_worker = fe_worker.unwrap(); + let mut be_worker = be_worker.unwrap(); + let mut fe_worker = self + .runtime + .spawn(async move { fe_worker.background().await }); + let mut be_worker = self + .runtime + .spawn(async move { be_worker.background().await }); + std::mem::swap(&mut self.fe_worker, &mut fe_worker); + std::mem::swap(&mut self.be_worker, &mut be_worker); + + // Join the old workers + self.runtime.block_on(fe_worker).unwrap(); + self.runtime.block_on(be_worker).unwrap(); + } + } + fn setup(test_name: &str) -> anyhow::Result { let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}"))); let harness = TenantHarness::create(test_name)?; @@ -730,7 +885,7 @@ mod test { let entered_runtime = runtime.enter(); let (deletion_queue, fe_worker, be_worker) = - DeletionQueue::new(Some(storage), harness.conf); + DeletionQueue::new(Some(storage.clone()), harness.conf); let mut fe_worker = fe_worker.unwrap(); let mut be_worker = be_worker.unwrap(); @@ -739,9 +894,10 @@ mod test { Ok(TestSetup { runtime, - entered_runtime, + _entered_runtime: entered_runtime, harness, remote_fs_dir, + storage, deletion_queue, fe_worker: fe_worker_join, be_worker: be_worker_join, @@ -754,14 +910,34 @@ mod test { expected.sort(); let mut found: Vec = Vec::new(); - for entry in std::fs::read_dir(remote_path).unwrap().flatten() { + let dir = match std::fs::read_dir(remote_path) { + Ok(d) => d, + Err(e) => { + if e.kind() == ErrorKind::NotFound { + if expected.is_empty() { + // We are asserting prefix is empty: it is expected that the dir is missing + return; + } else { + assert_eq!(expected, Vec::::new()); + unreachable!(); + } + } else { + panic!( + "Unexpected error listing {0}: {e}", + remote_path.to_string_lossy() + ); + } + } + }; + + for entry in dir.flatten() { let entry_name = entry.file_name(); let fname = entry_name.to_str().unwrap(); found.push(String::from(fname)); } found.sort(); - assert_eq!(found, expected); + assert_eq!(expected, found); } #[test] @@ -780,6 +956,9 @@ mod test { .remote_path(&ctx.harness.timeline_path(&TIMELINE_ID)) .expect("Failed to construct remote path"); let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path()); + let remote_deletion_prefix = ctx + .remote_fs_dir + .join(ctx.harness.conf.remote_deletion_node_prefix()); // Inject a victim file to remote storage info!("Writing"); @@ -798,16 +977,72 @@ mod test { [layer_file_name_1.clone()].to_vec(), )); assert_remote_files(&[&layer_file_name_1.file_name()], &remote_timeline_path); + assert_remote_files(&[], &remote_deletion_prefix); // File should still be there after we write a deletion list (we haven't pushed enough to execute anything) info!("Flushing"); ctx.runtime.block_on(client.flush()); assert_remote_files(&[&layer_file_name_1.file_name()], &remote_timeline_path); + assert_remote_files( + &["0000000000000001-00000000-01.list"], + &remote_deletion_prefix, + ); // File should go away when we execute info!("Flush-executing"); ctx.runtime.block_on(client.flush_execute()); assert_remote_files(&[], &remote_timeline_path); + assert_remote_files(&["header-00000000-01"], &remote_deletion_prefix); + Ok(()) + } + + #[test] + fn deletion_queue_recovery() -> anyhow::Result<()> { + // Basic test that the deletion queue processes the deletions we pass into it + let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup"); + let client = ctx.deletion_queue.new_client(); + + let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(); + let tenant_id = ctx.harness.tenant_id; + + let content: Vec = "victim1 contents".into(); + let relative_remote_path = ctx + .harness + .conf + .remote_path(&ctx.harness.timeline_path(&TIMELINE_ID)) + .expect("Failed to construct remote path"); + let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path()); + let remote_deletion_prefix = ctx + .remote_fs_dir + .join(ctx.harness.conf.remote_deletion_node_prefix()); + + // Inject a file, delete it, and flush to a deletion list + std::fs::create_dir_all(&remote_timeline_path)?; + std::fs::write( + remote_timeline_path.join(layer_file_name_1.to_string()), + &content, + )?; + ctx.runtime.block_on(client.push_layers( + tenant_id, + TIMELINE_ID, + [layer_file_name_1.clone()].to_vec(), + )); + ctx.runtime.block_on(client.flush()); + assert_remote_files( + &["0000000000000001-00000000-01.list"], + &remote_deletion_prefix, + ); + + // Restart the deletion queue + drop(client); + ctx.restart(); + let client = ctx.deletion_queue.new_client(); + + // If we have recovered the deletion list properly, then executing after restart should purge it + info!("Flush-executing"); + ctx.runtime.block_on(client.flush_execute()); + assert_remote_files(&[], &remote_timeline_path); + assert_remote_files(&["header-00000000-01"], &remote_deletion_prefix); Ok(()) } }