diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 98476c2ccc..8a23f88049 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -566,40 +566,25 @@ impl PageServerConf { self.workdir.join("tenants") } - pub fn remote_deletion_node_prefix(&self) -> PathBuf { - PathBuf::from_str("deletion") - .unwrap() - .join(self.id.to_string()) + pub fn deletion_prefix(&self) -> PathBuf { + self.workdir.join("deletion") } - pub fn remote_deletion_list_path(&self, sequence: u64) -> RemotePath { - // Encode a version in the key, so that if we ever switch away from JSON we can + pub fn deletion_list_path(&self, sequence: u64) -> PathBuf { + // Encode a version in the filename, so that if we ever switch away from JSON we can // increment this. const VERSION: u8 = 1; - // Placeholder, pending implementation of generation numbers - const GENERATION: u32 = 0; - - RemotePath::new(&self.remote_deletion_node_prefix().join(format!( - "{sequence:016x}-{GENERATION:08x}-{VERSION:02x}.list" - ))) - .expect("This should always be convertible, it is relative") + self.deletion_prefix() + .join(format!("{sequence:016x}-{VERSION:02x}.list")) } - pub fn remote_deletion_header_path(&self) -> RemotePath { - // Encode a version in the key, so that if we ever switch away from JSON we can + pub fn deletion_header_path(&self) -> PathBuf { + // Encode a version in the filename, so that if we ever switch away from JSON we can // increment this. const VERSION: u8 = 1; - // Placeholder, pending implementation of generation numbers - const GENERATION: u32 = 0; - - RemotePath::new( - &self - .remote_deletion_node_prefix() - .join(format!("header-{GENERATION:08x}-{VERSION:02x}")), - ) - .expect("This should always be convertible, it is relative") + self.deletion_prefix().join(format!("header-{VERSION:02x}")) } pub fn tenant_path(&self, tenant_id: &TenantId) -> PathBuf { diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 341fc7d771..034ee242e6 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -25,14 +25,6 @@ use frontend::FrontendQueueMessage; use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName}; -// Arbitrary thresholds for retries: we do not depend on success -// within OP_RETRIES, as workers will just go around their consume loop: -// the purpose of the backoff::retries with these constants are to -// retry _sooner_ than we would if going around the whole loop. -const FAILED_REMOTE_OP_WARN_THRESHOLD: u32 = 3; - -const FAILED_REMOTE_OP_RETRIES: u32 = 10; - // TODO: adminstrative "panic button" config property to disable all deletions // TODO: configurable for how long to wait before executing deletions @@ -344,18 +336,12 @@ impl DeletionQueue { }, }, Some(FrontendQueueWorker::new( - remote_storage.clone(), conf, rx, backend_tx, cancel.clone(), )), - Some(BackendQueueWorker::new( - remote_storage.clone(), - conf, - backend_rx, - executor_tx, - )), + Some(BackendQueueWorker::new(conf, backend_rx, executor_tx)), Some(ExecutorWorker::new( remote_storage, executor_rx, @@ -523,6 +509,25 @@ mod test { assert_eq!(expected, found); } + fn assert_local_files(expected: &[&str], directory: &Path) { + let mut dir = match std::fs::read_dir(directory) { + Ok(d) => d, + Err(_) => { + assert_eq!(expected, &Vec::::new()); + return; + } + }; + let mut found = Vec::new(); + while let Some(dentry) = dir.next() { + let dentry = dentry.unwrap(); + let file_name = dentry.file_name(); + let file_name_str = file_name.to_string_lossy(); + found.push(file_name_str.to_string()); + } + found.sort(); + assert_eq!(expected, found); + } + #[test] fn deletion_queue_smoke() -> anyhow::Result<()> { // Basic test that the deletion queue processes the deletions we pass into it @@ -539,9 +544,7 @@ mod test { .remote_path(&ctx.harness.timeline_path(&TIMELINE_ID)) .expect("Failed to construct remote path"); let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path()); - let remote_deletion_prefix = ctx - .remote_fs_dir - .join(ctx.harness.conf.remote_deletion_node_prefix()); + let deletion_prefix = ctx.harness.conf.deletion_prefix(); // Inject a victim file to remote storage info!("Writing"); @@ -560,27 +563,25 @@ mod test { [layer_file_name_1.clone()].to_vec(), ))?; assert_remote_files(&[&layer_file_name_1.file_name()], &remote_timeline_path); - assert_remote_files(&[], &remote_deletion_prefix); + + assert_local_files(&[], &deletion_prefix); // File should still be there after we write a deletion list (we haven't pushed enough to execute anything) info!("Flushing"); ctx.runtime.block_on(client.flush())?; assert_remote_files(&[&layer_file_name_1.file_name()], &remote_timeline_path); - assert_remote_files( - &["0000000000000001-00000000-01.list"], - &remote_deletion_prefix, - ); + assert_local_files(&["0000000000000001-01.list"], &deletion_prefix); // File should go away when we execute info!("Flush-executing"); ctx.runtime.block_on(client.flush_execute())?; assert_remote_files(&[], &remote_timeline_path); - assert_remote_files(&["header-00000000-01"], &remote_deletion_prefix); + assert_local_files(&["header-01"], &deletion_prefix); // Flushing on an empty queue should succeed immediately, and not write any lists info!("Flush-executing on empty"); ctx.runtime.block_on(client.flush_execute())?; - assert_remote_files(&["header-00000000-01"], &remote_deletion_prefix); + assert_local_files(&["header-01"], &deletion_prefix); Ok(()) } @@ -601,9 +602,7 @@ mod test { .remote_path(&ctx.harness.timeline_path(&TIMELINE_ID)) .expect("Failed to construct remote path"); let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path()); - let remote_deletion_prefix = ctx - .remote_fs_dir - .join(ctx.harness.conf.remote_deletion_node_prefix()); + let deletion_prefix = ctx.harness.conf.deletion_prefix(); // Inject a file, delete it, and flush to a deletion list std::fs::create_dir_all(&remote_timeline_path)?; @@ -617,10 +616,7 @@ mod test { [layer_file_name_1.clone()].to_vec(), ))?; ctx.runtime.block_on(client.flush())?; - assert_remote_files( - &["0000000000000001-00000000-01.list"], - &remote_deletion_prefix, - ); + assert_local_files(&["0000000000000001-01.list"], &deletion_prefix); // Restart the deletion queue drop(client); @@ -631,7 +627,7 @@ mod test { info!("Flush-executing"); ctx.runtime.block_on(client.flush_execute())?; assert_remote_files(&[], &remote_timeline_path); - assert_remote_files(&["header-00000000-01"], &remote_deletion_prefix); + assert_local_files(&["header-01"], &deletion_prefix); Ok(()) } } diff --git a/pageserver/src/deletion_queue/backend.rs b/pageserver/src/deletion_queue/backend.rs index b803fc2c79..8d32d23668 100644 --- a/pageserver/src/deletion_queue/backend.rs +++ b/pageserver/src/deletion_queue/backend.rs @@ -1,8 +1,5 @@ use std::time::Duration; -use remote_storage::GenericRemoteStorage; -use remote_storage::RemotePath; -use remote_storage::MAX_KEYS_PER_DELETE; use tracing::debug; use tracing::info; use tracing::warn; @@ -28,7 +25,6 @@ pub(super) enum BackendQueueMessage { Flush(FlushOp), } pub struct BackendQueueWorker { - remote_storage: GenericRemoteStorage, conf: &'static PageServerConf, rx: tokio::sync::mpsc::Receiver, tx: tokio::sync::mpsc::Sender, @@ -49,13 +45,11 @@ pub struct BackendQueueWorker { impl BackendQueueWorker { pub(super) fn new( - remote_storage: GenericRemoteStorage, conf: &'static PageServerConf, rx: tokio::sync::mpsc::Receiver, tx: tokio::sync::mpsc::Sender, ) -> Self { Self { - remote_storage, conf, rx, tx, @@ -88,16 +82,11 @@ impl BackendQueueWorker { // if there are no lists) let header = DeletionHeader::new(max_executed_seq); debug!("Writing header {:?}", header); - let bytes = serde_json::to_vec(&header).expect("Failed to serialize deletion header"); - let size = bytes.len(); - let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes)); - let header_key = self.conf.remote_deletion_header_path(); + let header_bytes = + serde_json::to_vec(&header).expect("Failed to serialize deletion header"); + let header_path = self.conf.deletion_header_path(); - if let Err(e) = self - .remote_storage - .upload(source, size, &header_key, None) - .await - { + if let Err(e) = tokio::fs::write(&header_path, header_bytes).await { warn!("Failed to upload deletion queue header: {e:#}"); DELETION_QUEUE_ERRORS .with_label_values(&["put_header"]) @@ -105,27 +94,14 @@ impl BackendQueueWorker { return; } - let executed_keys: Vec = self - .executed_lists - .iter() - .rev() - .take(MAX_KEYS_PER_DELETE) - .map(|l| self.conf.remote_deletion_list_path(l.sequence)) - .collect(); - - match self.remote_storage.delete_objects(&executed_keys).await { - Ok(()) => { - // Retain any lists that couldn't be deleted in that request - self.executed_lists - .truncate(self.executed_lists.len() - executed_keys.len()); - } - Err(e) => { - warn!("Failed to delete deletion list(s): {e:#}"); - // Do nothing: the elements remain in executed_lists, and purge will be retried - // next time we process some deletions and go around the loop. - DELETION_QUEUE_ERRORS - .with_label_values(&["delete_list"]) - .inc(); + while let Some(list) = self.executed_lists.pop() { + let list_path = self.conf.deletion_list_path(list.sequence); + if let Err(e) = tokio::fs::remove_file(&list_path).await { + // Unexpected: we should have permissions and nothing else should + // be touching these files + tracing::error!("Failed to delete {0}: {e:#}", list_path.display()); + self.executed_lists.push(list); + break; } } } diff --git a/pageserver/src/deletion_queue/frontend.rs b/pageserver/src/deletion_queue/frontend.rs index 5f366e2f78..edccfc0cc8 100644 --- a/pageserver/src/deletion_queue/frontend.rs +++ b/pageserver/src/deletion_queue/frontend.rs @@ -2,20 +2,16 @@ use super::BackendQueueMessage; use super::DeletionHeader; use super::DeletionList; use super::FlushOp; -use super::FAILED_REMOTE_OP_RETRIES; -use super::FAILED_REMOTE_OP_WARN_THRESHOLD; +use std::fs::create_dir_all; use std::time::Duration; use regex::Regex; -use remote_storage::DownloadError; -use remote_storage::GenericRemoteStorage; use remote_storage::RemotePath; use tokio_util::sync::CancellationToken; use tracing::debug; use tracing::info; use tracing::warn; -use utils::backoff; use utils::id::TenantId; use utils::id::TimelineId; @@ -59,7 +55,6 @@ pub(super) enum FrontendQueueMessage { } pub struct FrontendQueueWorker { - remote_storage: GenericRemoteStorage, conf: &'static PageServerConf, // Incoming frontend requests to delete some keys @@ -81,7 +76,6 @@ pub struct FrontendQueueWorker { impl FrontendQueueWorker { pub(super) fn new( - remote_storage: GenericRemoteStorage, conf: &'static PageServerConf, rx: tokio::sync::mpsc::Receiver, tx: tokio::sync::mpsc::Sender, @@ -89,7 +83,6 @@ impl FrontendQueueWorker { ) -> Self { Self { pending: DeletionList::new(1), - remote_storage, conf, rx, tx, @@ -98,23 +91,12 @@ impl FrontendQueueWorker { } } async fn upload_pending_list(&mut self) -> anyhow::Result<()> { - let key = &self.conf.remote_deletion_list_path(self.pending.sequence); + let path = self.conf.deletion_list_path(self.pending.sequence); - backoff::retry( - || { - let bytes = - serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list"); - let size = bytes.len(); - let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes)); - self.remote_storage.upload(source, size, key, None) - }, - |_| false, - FAILED_REMOTE_OP_WARN_THRESHOLD, - FAILED_REMOTE_OP_RETRIES, - "upload deletion list", - backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")), - ) - .await + let bytes = serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list"); + tokio::fs::write(&path, &bytes).await?; + tokio::fs::File::open(&path).await?.sync_all().await?; + Ok(()) } /// Try to flush `list` to persistent storage @@ -161,21 +143,19 @@ impl FrontendQueueWorker { async fn recover(&mut self) -> Result<(), anyhow::Error> { // Load header: this is not required to be present, e.g. when a pageserver first runs - let header_path = self.conf.remote_deletion_header_path(); - let header_bytes = match backoff::retry( - || self.remote_storage.download_all(&header_path), - |e| matches!(e, DownloadError::NotFound), - FAILED_REMOTE_OP_WARN_THRESHOLD, - u32::MAX, - "Reading deletion queue header", - backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown), - ) - .await - { + let header_path = self.conf.deletion_header_path(); + + // Synchronous, but we only do it once per process lifetime so it's tolerable + create_dir_all(&self.conf.deletion_prefix())?; + + let header_bytes = match tokio::fs::read(&header_path).await { Ok(h) => Ok(Some(h)), Err(e) => { - if let DownloadError::NotFound = e { - debug!("Deletion header {header_path} not found, first start?"); + if e.kind() == std::io::ErrorKind::NotFound { + debug!( + "Deletion header {0} not found, first start?", + header_path.display() + ); Ok(None) } else { Err(e) @@ -187,7 +167,10 @@ impl FrontendQueueWorker { if let Some(header) = match serde_json::from_slice::(&header_bytes) { Ok(h) => Some(h), Err(e) => { - warn!("Failed to deserialize deletion header, ignoring {header_path}: {e:#}"); + warn!( + "Failed to deserialize deletion header, ignoring {0}: {e:#}", + header_path.display() + ); // This should never happen unless we make a mistake with our serialization. // Ignoring a deletion header is not consequential for correctnes because all deletions // are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up. @@ -199,42 +182,27 @@ impl FrontendQueueWorker { }; }; - let prefix = RemotePath::new(&self.conf.remote_deletion_node_prefix()) - .expect("Failed to compose path"); - let lists = backoff::retry( - || async { self.remote_storage.list_prefixes(Some(&prefix)).await }, - |_| false, - FAILED_REMOTE_OP_WARN_THRESHOLD, - u32::MAX, // There's no point giving up, since once we do that the deletion queue is stuck - "Recovering deletion lists", - backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown), - ) - .await?; + let mut dir = match tokio::fs::read_dir(&self.conf.deletion_prefix()).await { + Ok(d) => d, + Err(e) => { + warn!( + "Failed to open deletion list directory {0}: {e:#}", + header_path.display() + ); - debug!("Loaded {} keys in deletion prefix {}", lists.len(), prefix); - let list_name_pattern = - Regex::new("([a-zA-Z0-9]{16})-([a-zA-Z0-9]{8})-([a-zA-Z0-9]{2}).list").unwrap(); + // Give up: if we can't read the deletion list directory, we probably can't + // write lists into it later, so the queue won't work. + return Err(e.into()); + } + }; + + let list_name_pattern = Regex::new("([a-zA-Z0-9]{16})-([a-zA-Z0-9]{2}).list").unwrap(); let mut seqs: Vec = Vec::new(); - for l in &lists { - if l == &header_path { - // Don't try and parse the header key as a list key - continue; - } - - let basename = l - .strip_prefix(&prefix) - .expect("Stripping prefix frrom a prefix listobjects should always work"); - let basename = match basename.to_str() { - Some(s) => s, - None => { - // Should never happen, we are the only ones writing objects here - warn!("Unexpected key encoding in deletion queue object"); - continue; - } - }; - - let seq_part = if let Some(m) = list_name_pattern.captures(basename) { + while let Some(dentry) = dir.next_entry().await? { + let file_name = dentry.file_name().to_owned(); + let basename = file_name.to_string_lossy(); + let seq_part = if let Some(m) = list_name_pattern.captures(&basename) { m.get(1) .expect("Non optional group should be present") .as_str() @@ -252,7 +220,6 @@ impl FrontendQueueWorker { }; seqs.push(seq); } - seqs.sort(); // Initialize the next sequence number in the frontend based on the maximum of the highest list we see, @@ -263,19 +230,10 @@ impl FrontendQueueWorker { } for s in seqs { - let list_path = self.conf.remote_deletion_list_path(s); - let lists_body = backoff::retry( - || self.remote_storage.download_all(&list_path), - |_| false, - FAILED_REMOTE_OP_WARN_THRESHOLD, - u32::MAX, - "Reading a deletion list", - backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown), - ) - .await?; + let list_path = self.conf.deletion_list_path(s); + let list_bytes = tokio::fs::read(&list_path).await?; - let deletion_list = match serde_json::from_slice::(lists_body.as_slice()) - { + let deletion_list = match serde_json::from_slice::(&list_bytes) { Ok(l) => l, Err(e) => { // Drop the list on the floor: any objects it referenced will be left behind @@ -305,7 +263,7 @@ impl FrontendQueueWorker { let mut recovered: bool = false; - loop { + while !self.cancel.is_cancelled() { let timeout = if self.pending_flushes.is_empty() { FRONTEND_DEFAULT_TIMEOUT } else { @@ -333,9 +291,7 @@ impl FrontendQueueWorker { if let Err(e) = self.recover().await { // This should only happen in truly unrecoverable cases, like the recovery finding that the backend // queue receiver has been dropped. - info!( - "Deletion queue recover aborted, deletion queue will not proceed ({e:#})" - ); + info!("Deletion queue recover aborted, deletion queue will not proceed ({e})"); return; } else { recovered = true;