diff --git a/pageserver/src/control_plane_client.rs b/pageserver/src/control_plane_client.rs index 78849edfc1..bd09e9c4d7 100644 --- a/pageserver/src/control_plane_client.rs +++ b/pageserver/src/control_plane_client.rs @@ -83,15 +83,15 @@ impl ControlPlaneClient { .json(&request) .send() .await - .map_err(|e| RemoteAttemptError::Remote(e))?; + .map_err(RemoteAttemptError::Remote)?; response .error_for_status_ref() - .map_err(|e| RemoteAttemptError::Remote(e))?; + .map_err(RemoteAttemptError::Remote)?; response .json::() .await - .map_err(|e| RemoteAttemptError::Remote(e)) + .map_err(RemoteAttemptError::Remote) }, |_| false, 3, @@ -128,11 +128,11 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient { response.tenants.len() ); - return Ok(response + Ok(response .tenants .into_iter() .map(|t| (t.id, Generation::new(t.generation))) - .collect::>()); + .collect::>()) } async fn validate( diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs new file mode 100644 index 0000000000..51a2cf9f48 --- /dev/null +++ b/pageserver/src/deletion_queue.rs @@ -0,0 +1,1239 @@ +mod backend; +mod executor; +mod frontend; + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; + +use crate::control_plane_client::ControlPlaneGenerationsApi; +use crate::metrics::DELETION_QUEUE_SUBMITTED; +use crate::tenant::remote_timeline_client::remote_layer_path; +use crate::tenant::remote_timeline_client::remote_timeline_path; +use anyhow::Context; +use hex::FromHex; +use remote_storage::{GenericRemoteStorage, RemotePath}; +use serde::Deserialize; +use serde::Serialize; +use serde_with::serde_as; +use thiserror::Error; +use tokio; +use tokio_util::sync::CancellationToken; +use tracing::{self, debug, error}; +use utils::generation::Generation; +use utils::id::{TenantId, TimelineId}; +use utils::lsn::Lsn; + +pub(crate) use self::backend::BackendQueueWorker; +use self::executor::ExecutorWorker; +use self::frontend::DeletionOp; +pub(crate) use 
self::frontend::FrontendQueueWorker; +use self::frontend::RecoverOp; +use backend::BackendQueueMessage; +use executor::ExecutorMessage; +use frontend::FrontendQueueMessage; + +use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName}; + +// TODO: administrative "panic button" config property to disable all deletions +// TODO: configurable for how long to wait before executing deletions + +/// We aggregate object deletions from many tenants in one place, for several reasons: +/// - Coalesce deletions into fewer DeleteObjects calls +/// - Enable Tenant/Timeline lifetimes to be shorter than the time it takes +/// to flush any outstanding deletions. +/// - Globally control throughput of deletions, as these are a low priority task: do +/// not compete with the same S3 clients/connections used for higher priority uploads. +/// - Enable gating deletions on validation of a tenant's generation number, to make +/// it safe to multi-attach tenants (see docs/rfcs/025-generation-numbers.md) +/// +/// There are two kinds of deletion: deferred and immediate. A deferred deletion +/// may be intentionally delayed to protect passive readers of S3 data, and is +/// subject to a generation number validation step. An immediate deletion is +/// ready to execute immediately, and is only queued up so that it can be coalesced +/// with other deletions in flight. +/// +/// Deferred deletions pass through three steps: +/// - Frontend: accumulate deletion requests from Timelines, and batch them up into +/// DeletionLists, which are persisted to disk. +/// - Backend: accumulate deletion lists, and validate them en-masse prior to passing +/// the keys in the list onward for actual deletion. Also validate remote_consistent_lsn +/// updates for running timelines. +/// - Executor: accumulate object keys that the backend has validated, and execute them in +/// batches of 1000 keys via DeleteObjects.
+/// +/// Non-deferred deletions, such as during timeline deletion, bypass the first +/// two stages and are passed straight into the Executor. +/// +/// Internally, each stage is joined by a channel to the next. On disk, there is only +/// one queue (of DeletionLists), which is written by the frontend and consumed +/// by the backend. +#[derive(Clone)] +pub struct DeletionQueue { + client: DeletionQueueClient, +} + +#[derive(Debug)] +struct FlushOp { + tx: tokio::sync::oneshot::Sender<()>, +} + +impl FlushOp { + fn fire(self) { + if self.tx.send(()).is_err() { + // oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush. + debug!("deletion queue flush from dropped client"); + }; + } +} + +#[derive(Clone, Debug)] +pub struct DeletionQueueClient { + tx: tokio::sync::mpsc::Sender, + executor_tx: tokio::sync::mpsc::Sender, + + lsn_table: Arc>, +} + +#[derive(Debug, Serialize, Deserialize)] +struct TenantDeletionList { + /// For each Timeline, a list of key fragments to append to the timeline remote path + /// when reconstructing a full key + #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")] + timelines: HashMap>, + + /// The generation in which this deletion was emitted: note that this may not be the + /// same as the generation of any layers being deleted. 
The generation of the layer + /// has already been absorbed into the keys in `objects` + generation: Generation, +} + +impl TenantDeletionList { + pub(crate) fn len(&self) -> usize { + self.timelines.values().map(|v| v.len()).sum() + } +} + +/// For HashMaps using a `hex` compatible key, where we would like to encode the key as a string +fn to_hex_map(input: &HashMap, serializer: S) -> Result +where + S: serde::Serializer, + V: Serialize, + I: AsRef<[u8]>, +{ + let transformed = input.iter().map(|(k, v)| (hex::encode(k), v.clone())); + + transformed + .collect::>() + .serialize(serializer) +} + +/// For HashMaps using a FromHex key, where we would like to decode the key +fn from_hex_map<'de, D, V, I>(deserializer: D) -> Result, D::Error> +where + D: serde::de::Deserializer<'de>, + V: Deserialize<'de>, + I: FromHex + std::hash::Hash + Eq, +{ + let hex_map = HashMap::::deserialize(deserializer)?; + hex_map + .into_iter() + .map(|(k, v)| { + I::from_hex(k) + .map(|k| (k, v)) + .map_err(|_| serde::de::Error::custom("Invalid hex ID")) + }) + .collect() +} + +#[serde_as] +#[derive(Debug, Serialize, Deserialize)] +struct DeletionList { + /// Serialization version, for future use + version: u8, + + /// Used for constructing a unique key for each deletion list we write out. + sequence: u64, + + /// To avoid repeating tenant/timeline IDs in every key, we store keys in + /// nested HashMaps by TenantTimelineID. Each Tenant only appears once + /// with one unique generation ID: if someone tries to push a second generation + /// ID for the same tenant, we will start a new DeletionList. + #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")] + tenants: HashMap, + + /// Avoid having to walk `tenants` to calculate the number of keys in + /// the nested deletion lists + size: usize, + + /// Set to true when the list has undergone validation with the control + /// plane and the remaining contents of `tenants` are valid. 
A list may + /// also be implicitly marked valid by DeletionHeader.validated_sequence + /// advancing to >= DeletionList.sequence + #[serde(default)] + #[serde(skip_serializing_if = "std::ops::Not::not")] + validated: bool, +} + +#[serde_as] +#[derive(Debug, Serialize, Deserialize)] +struct DeletionHeader { + /// Serialization version, for future use + version: u8, + + /// The highest sequence number (inclusive) that has been validated. All deletion + /// lists on disk with a sequence <= this value are safe to execute. + validated_sequence: u64, +} + +impl DeletionHeader { + const VERSION_LATEST: u8 = 1; + + fn new(validated_sequence: u64) -> Self { + Self { + version: Self::VERSION_LATEST, + validated_sequence, + } + } + + async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> { + debug!("Saving deletion list header {:?}", self); + let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?; + let header_path = conf.deletion_header_path(); + + tokio::fs::write(&header_path, header_bytes) + .await + .map_err(|e| anyhow::anyhow!(e)) + } +} + +impl DeletionList { + const VERSION_LATEST: u8 = 1; + fn new(sequence: u64) -> Self { + Self { + version: Self::VERSION_LATEST, + sequence, + tenants: HashMap::new(), + size: 0, + validated: false, + } + } + + fn drain(&mut self) -> Self { + let mut tenants = HashMap::new(); + std::mem::swap(&mut self.tenants, &mut tenants); + let other = Self { + version: Self::VERSION_LATEST, + sequence: self.sequence, + tenants, + size: self.size, + validated: self.validated, + }; + self.size = 0; + other + } + + fn is_empty(&self) -> bool { + self.tenants.is_empty() + } + + fn len(&self) -> usize { + self.size + } + + /// Returns true if the push was accepted, false if the caller must start a new + /// deletion list. 
+ fn push( + &mut self, + tenant: &TenantId, + timeline: &TimelineId, + generation: Generation, + objects: &mut Vec, + ) -> bool { + if objects.is_empty() { + // Avoid inserting an empty TenantDeletionList: this preserves the property + // that if we have no keys, then self.tenants is empty (used in Self::is_empty) + return true; + } + + let tenant_entry = self + .tenants + .entry(*tenant) + .or_insert_with(|| TenantDeletionList { + timelines: HashMap::new(), + generation, + }); + + if tenant_entry.generation != generation { + // Only one generation per tenant per list: signal to + // caller to start a new list. + return false; + } + + let timeline_entry = tenant_entry + .timelines + .entry(*timeline) + .or_insert_with(Vec::new); + + let timeline_remote_path = remote_timeline_path(tenant, timeline); + + self.size += objects.len(); + timeline_entry.extend(objects.drain(..).map(|p| { + p.strip_prefix(&timeline_remote_path) + .expect("Timeline paths always start with the timeline prefix") + .to_string_lossy() + .to_string() + })); + true + } + + fn drain_paths(&mut self) -> Vec { + let mut tenants = HashMap::new(); + std::mem::swap(&mut tenants, &mut self.tenants); + + let mut result = Vec::new(); + for (tenant, tenant_deletions) in tenants.into_iter() { + for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() { + let timeline_remote_path = remote_timeline_path(&tenant, &timeline); + result.extend( + timeline_layers + .into_iter() + .map(|l| timeline_remote_path.join(&PathBuf::from(l))), + ); + } + } + + result + } + + async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> { + let path = conf.deletion_list_path(self.sequence); + + let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list"); + tokio::fs::write(&path, &bytes).await?; + tokio::fs::File::open(&path).await?.sync_all().await?; + Ok(()) + } +} + +impl std::fmt::Display for DeletionList { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result { + write!( + f, + "DeletionList", + self.sequence, + self.tenants.len(), + self.size + ) + } +} + +struct PendingLsn { + projected: Lsn, + result_tx: tokio::sync::mpsc::Sender, +} + +struct TenantLsnState { + timelines: HashMap, + + // In what generation was the most recent update proposed? + generation: Generation, + + // Any timelines' LSNs projected since this flag was last cleared? + // (optimization so that reader doesn't have to walk timelines) + dirty: bool, +} + +struct VisibleLsnUpdates { + tenants: HashMap, +} + +impl VisibleLsnUpdates { + fn new() -> Self { + Self { + tenants: HashMap::new(), + } + } +} + +impl std::fmt::Debug for VisibleLsnUpdates { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "VisibleLsnUpdates({} tenants)", self.tenants.len()) + } +} + +#[derive(Error, Debug)] +pub enum DeletionQueueError { + #[error("Deletion queue unavailable during shutdown")] + ShuttingDown, +} + +impl DeletionQueueClient { + pub(crate) fn broken() -> Self { + // Channels whose receivers are immediately dropped. + let (tx, _rx) = tokio::sync::mpsc::channel(1); + let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1); + Self { + tx, + executor_tx, + lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())), + } + } + + async fn do_push(&self, msg: FrontendQueueMessage) -> Result<(), DeletionQueueError> { + match self.tx.send(msg).await { + Ok(_) => Ok(()), + Err(e) => { + // This shouldn't happen, we should shut down all tenants before + // we shut down the global delete queue. If we encounter a bug like this, + // we may leak objects as deletions won't be processed. + error!("Deletion queue closed while pushing, shutting down? 
({e})"); + Err(DeletionQueueError::ShuttingDown) + } + } + } + + pub(crate) async fn recover( + &self, + attached_tenants: HashMap, + ) -> Result<(), DeletionQueueError> { + self.do_push(FrontendQueueMessage::Recover(RecoverOp { + attached_tenants, + })) + .await + } + + /// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside + /// world, it must validate its generation number before doing so. Rather than do this synchronously, + /// we allow the timeline to publish updates at will via this API, and then read back what LSN was most + /// recently validated separately. + /// + /// In this function we publish the LSN to the `projected` field of the timeline's entry in the VisibleLsnUpdates. The + /// backend will later wake up and notice that the tenant's generation requires validation. + pub(crate) async fn update_remote_consistent_lsn( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + current_generation: Generation, + lsn: Lsn, + result_tx: tokio::sync::mpsc::Sender, + ) { + let mut locked = self + .lsn_table + .write() + .expect("Lock should never be poisoned"); + + let tenant_entry = locked.tenants.entry(tenant_id).or_insert(TenantLsnState { + timelines: HashMap::new(), + generation: current_generation, + dirty: true, + }); + + if tenant_entry.generation != current_generation { + // Generation might have changed if we were detached and then re-attached: in this case, + // state from the previous generation cannot be trusted. + tenant_entry.timelines.clear(); + tenant_entry.generation = current_generation; + } + + tenant_entry.dirty = true; + + tenant_entry.timelines.insert( + timeline_id, + PendingLsn { + projected: lsn, + result_tx, + }, + ); + } + + /// Submit a list of layers for deletion: this function will return before the deletion is + /// persistent, but it may be executed at any time after this function enters: do not push + /// layers until you're sure they can be deleted safely (i.e. 
remote metadata no longer + /// references them). + /// + /// The `current_generation` is the generation of this pageserver's current attachment. The + /// generations in `layers` are the generations in which those layers were written. + pub(crate) async fn push_layers( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + current_generation: Generation, + layers: Vec<(LayerFileName, Generation)>, + ) -> Result<(), DeletionQueueError> { + if current_generation.is_none() { + debug!("Enqueuing deletions in legacy mode, skipping queue"); + let mut layer_paths = Vec::new(); + for (layer, generation) in layers { + layer_paths.push(remote_layer_path( + &tenant_id, + &timeline_id, + &layer, + generation, + )); + } + self.push_immediate(layer_paths).await?; + return self.flush_immediate().await; + } + + DELETION_QUEUE_SUBMITTED.inc_by(layers.len() as u64); + self.do_push(FrontendQueueMessage::Delete(DeletionOp { + tenant_id, + timeline_id, + layers, + generation: current_generation, + objects: Vec::new(), + })) + .await + } + + async fn do_flush( + &self, + msg: FrontendQueueMessage, + rx: tokio::sync::oneshot::Receiver<()>, + ) -> Result<(), DeletionQueueError> { + self.do_push(msg).await?; + if rx.await.is_err() { + // This shouldn't happen if tenants are shut down before deletion queue. If we + // encounter a bug like this, then a flusher will incorrectly believe it has flushed + // when it hasn't, possibly leading to leaking objects. 
+ error!("Deletion queue dropped flush op while client was still waiting"); + Err(DeletionQueueError::ShuttingDown) + } else { + Ok(()) + } + } + + /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList) + pub async fn flush(&self) -> Result<(), DeletionQueueError> { + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + self.do_flush(FrontendQueueMessage::Flush(FlushOp { tx }), rx) + .await + } + + // Wait until all previous deletions are executed + pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> { + debug!("flush_execute: flushing to deletion lists..."); + // Flush any buffered work to deletion lists + self.flush().await?; + + // Flush execution of deletion lists + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + debug!("flush_execute: flushing execution..."); + self.do_flush(FrontendQueueMessage::FlushExecute(FlushOp { tx }), rx) + .await?; + debug!("flush_execute: finished flushing execution..."); + Ok(()) + } + + /// This interface bypasses the persistent deletion queue, and any validation + /// that this pageserver is still eligible to execute the deletions. It is for + /// use in timeline deletions, where the control plane is telling us we may + /// delete everything in the timeline. + /// + /// DO NOT USE THIS FROM GC OR COMPACTION CODE. Use the regular `push_layers`. + pub(crate) async fn push_immediate( + &self, + objects: Vec, + ) -> Result<(), DeletionQueueError> { + DELETION_QUEUE_SUBMITTED.inc_by(objects.len() as u64); + self.executor_tx + .send(ExecutorMessage::Delete(objects)) + .await + .map_err(|_| DeletionQueueError::ShuttingDown) + } + + /// Companion to push_immediate. When this returns Ok, all prior objects sent + /// into push_immediate have been deleted from remote storage.
+ pub(crate) async fn flush_immediate(&self) -> Result<(), DeletionQueueError> { + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + self.executor_tx + .send(ExecutorMessage::Flush(FlushOp { tx })) + .await + .map_err(|_| DeletionQueueError::ShuttingDown)?; + + rx.await.map_err(|_| DeletionQueueError::ShuttingDown) + } +} + +impl DeletionQueue { + pub fn new_client(&self) -> DeletionQueueClient { + self.client.clone() + } + + /// Caller may use the returned object to construct clients with new_client. + /// Caller should tokio::spawn the background() members of the three worker objects returned: + /// we don't spawn those inside new() so that the caller can use their runtime/spans of choice. + /// + /// If remote_storage is None, then the returned workers will also be None. + pub fn new( + remote_storage: Option, + control_plane_client: Option>, + conf: &'static PageServerConf, + cancel: CancellationToken, + ) -> ( + Self, + Option, + Option, + Option, + ) { + // Deep channel: it consumes deletions from all timelines and we do not want to block them + let (tx, rx) = tokio::sync::mpsc::channel(16384); + + // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions + let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16); + + // Shallow channel: it carries lists of paths, and we expect the main queueing to + // happen in the backend (persistent), not in this queue.
+ let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16); + + let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())); + + let remote_storage = match remote_storage { + None => { + return ( + Self { + client: DeletionQueueClient { + tx, + executor_tx, + lsn_table: lsn_table.clone(), + }, + }, + None, + None, + None, + ) + } + Some(r) => r, + }; + + ( + Self { + client: DeletionQueueClient { + tx, + executor_tx: executor_tx.clone(), + lsn_table: lsn_table.clone(), + }, + }, + Some(FrontendQueueWorker::new( + conf, + rx, + backend_tx, + cancel.clone(), + )), + Some(BackendQueueWorker::new( + conf, + backend_rx, + executor_tx, + control_plane_client, + lsn_table.clone(), + cancel.clone(), + )), + Some(ExecutorWorker::new( + remote_storage, + executor_rx, + cancel.clone(), + )), + ) + } +} + +#[cfg(test)] +mod test { + use hex_literal::hex; + use std::{ + io::ErrorKind, + path::{Path, PathBuf}, + time::Duration, + }; + use tracing::info; + + use remote_storage::{RemoteStorageConfig, RemoteStorageKind}; + use tokio::task::JoinHandle; + + use crate::{ + repository::Key, + tenant::{ + harness::TenantHarness, remote_timeline_client::remote_timeline_path, + storage_layer::DeltaFileName, + }, + }; + + use super::*; + pub const TIMELINE_ID: TimelineId = + TimelineId::from_array(hex!("11223344556677881122334455667788")); + + pub const EXAMPLE_LAYER_NAME: LayerFileName = LayerFileName::Delta(DeltaFileName { + key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF), + lsn_range: Lsn(0x00000000016B59D8)..Lsn(0x00000000016B5A51), + }); + + // When you need a second layer in a test. 
+ pub const EXAMPLE_LAYER_NAME_ALT: LayerFileName = LayerFileName::Delta(DeltaFileName { + key_range: Key::from_i128(0x0)..Key::from_i128(0xFFFFFFFFFFFFFFFF), + lsn_range: Lsn(0x00000000016B5A51)..Lsn(0x00000000016B5A61), + }); + + struct TestSetup { + harness: TenantHarness, + remote_fs_dir: PathBuf, + storage: GenericRemoteStorage, + mock_control_plane: Arc, + deletion_queue: DeletionQueue, + fe_worker: JoinHandle<()>, + be_worker: JoinHandle<()>, + ex_worker: JoinHandle<()>, + } + + impl TestSetup { + /// Simulate a pageserver restart by destroying and recreating the deletion queue + async fn restart(&mut self) { + let (deletion_queue, fe_worker, be_worker, ex_worker) = DeletionQueue::new( + Some(self.storage.clone()), + Some(self.mock_control_plane.clone()), + self.harness.conf, + CancellationToken::new(), + ); + + self.deletion_queue = deletion_queue; + + let mut fe_worker = fe_worker.unwrap(); + let mut be_worker = be_worker.unwrap(); + let mut ex_worker = ex_worker.unwrap(); + let mut fe_worker = tokio::spawn(async move { fe_worker.background().await }); + let mut be_worker = tokio::spawn(async move { be_worker.background().await }); + let mut ex_worker = tokio::spawn(async move { + drop(ex_worker.background().await); + }); + std::mem::swap(&mut self.fe_worker, &mut fe_worker); + std::mem::swap(&mut self.be_worker, &mut be_worker); + std::mem::swap(&mut self.ex_worker, &mut ex_worker); + + // Join the old workers + fe_worker.await.unwrap(); + be_worker.await.unwrap(); + ex_worker.await.unwrap(); + } + + fn set_latest_generation(&self, gen: Generation) { + let tenant_id = self.harness.tenant_id; + self.mock_control_plane + .latest_generation + .lock() + .unwrap() + .insert(tenant_id, gen); + } + + /// Returns remote layer file name, suitable for use in assert_remote_files + fn write_remote_layer( + &self, + file_name: LayerFileName, + gen: Generation, + ) -> anyhow::Result { + let tenant_id = self.harness.tenant_id; + let relative_remote_path = 
remote_timeline_path(&tenant_id, &TIMELINE_ID); + let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path()); + std::fs::create_dir_all(&remote_timeline_path)?; + let remote_layer_file_name = format!("{}{}", file_name, gen.get_suffix()); + + let content: Vec = format!("placeholder contents of {file_name}").into(); + + std::fs::write( + remote_timeline_path.join(remote_layer_file_name.clone()), + content, + )?; + + Ok(remote_layer_file_name) + } + } + + struct MockControlPlane { + pub latest_generation: std::sync::Mutex>, + } + + impl MockControlPlane { + fn new() -> Self { + Self { + latest_generation: std::sync::Mutex::new(HashMap::new()), + } + } + } + + unsafe impl Send for MockControlPlane {} + unsafe impl Sync for MockControlPlane {} + + #[async_trait::async_trait] + impl ControlPlaneGenerationsApi for MockControlPlane { + async fn re_attach(&self) -> anyhow::Result> { + unimplemented!() + } + async fn validate( + &self, + tenants: Vec<(TenantId, Generation)>, + ) -> anyhow::Result> { + let mut result = HashMap::new(); + + let latest_generation = self.latest_generation.lock().unwrap(); + + for (tenant_id, generation) in tenants { + if let Some(latest) = latest_generation.get(&tenant_id) { + result.insert(tenant_id, *latest == generation); + } + } + + Ok(result) + } + } + + fn setup(test_name: &str) -> anyhow::Result { + let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}"))); + let harness = TenantHarness::create(test_name)?; + + // We do not load() the harness: we only need its config and remote_storage + + // Set up a GenericRemoteStorage targeting a directory + let remote_fs_dir = harness.conf.workdir.join("remote_fs"); + std::fs::create_dir_all(remote_fs_dir)?; + let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?; + let storage_config = RemoteStorageConfig { + max_concurrent_syncs: std::num::NonZeroUsize::new( + remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS, + ) +
.unwrap(), + max_sync_errors: std::num::NonZeroU32::new( + remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS, + ) + .unwrap(), + storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()), + }; + let storage = GenericRemoteStorage::from_config(&storage_config).unwrap(); + + let mock_control_plane = Arc::new(MockControlPlane::new()); + + let (deletion_queue, fe_worker, be_worker, ex_worker) = DeletionQueue::new( + Some(storage.clone()), + Some(mock_control_plane.clone()), + harness.conf, + CancellationToken::new(), + ); + + let mut fe_worker = fe_worker.unwrap(); + let mut be_worker = be_worker.unwrap(); + let mut ex_worker = ex_worker.unwrap(); + let fe_worker_join = tokio::spawn(async move { fe_worker.background().await }); + let be_worker_join = tokio::spawn(async move { be_worker.background().await }); + let ex_worker_join = tokio::spawn(async move { + drop(ex_worker.background().await); + }); + + Ok(TestSetup { + harness, + remote_fs_dir, + storage, + mock_control_plane, + deletion_queue, + fe_worker: fe_worker_join, + be_worker: be_worker_join, + ex_worker: ex_worker_join, + }) + } + + // TODO: put this in a common location so that we can share with remote_timeline_client's tests + fn assert_remote_files(expected: &[&str], remote_path: &Path) { + let mut expected: Vec = expected.iter().map(|x| String::from(*x)).collect(); + expected.sort(); + + let mut found: Vec = Vec::new(); + let dir = match std::fs::read_dir(remote_path) { + Ok(d) => d, + Err(e) => { + if e.kind() == ErrorKind::NotFound { + if expected.is_empty() { + // We are asserting prefix is empty: it is expected that the dir is missing + return; + } else { + assert_eq!(expected, Vec::::new()); + unreachable!(); + } + } else { + panic!( + "Unexpected error listing {}: {e}", + remote_path.to_string_lossy() + ); + } + } + }; + + for entry in dir.flatten() { + let entry_name = entry.file_name(); + let fname = entry_name.to_str().unwrap(); + found.push(String::from(fname)); + } + found.sort(); + + 
assert_eq!(expected, found); + } + + fn assert_local_files(expected: &[&str], directory: &Path) { + let dir = match std::fs::read_dir(directory) { + Ok(d) => d, + Err(_) => { + assert_eq!(expected, &Vec::::new()); + return; + } + }; + let mut found = Vec::new(); + for dentry in dir { + let dentry = dentry.unwrap(); + let file_name = dentry.file_name(); + let file_name_str = file_name.to_string_lossy(); + found.push(file_name_str.to_string()); + } + found.sort(); + assert_eq!(expected, found); + } + + #[tokio::test] + async fn deletion_queue_smoke() -> anyhow::Result<()> { + // Basic test that the deletion queue processes the deletions we pass into it + let ctx = setup("deletion_queue_smoke").expect("Failed test setup"); + let client = ctx.deletion_queue.new_client(); + + let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(); + let tenant_id = ctx.harness.tenant_id; + + let content: Vec = "victim1 contents".into(); + let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID); + let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path()); + let deletion_prefix = ctx.harness.conf.deletion_prefix(); + + // Exercise the distinction between the generation of the layers + // we delete, and the generation of the running Tenant. 
+ let layer_generation = Generation::new(0xdeadbeef); + let now_generation = Generation::new(0xfeedbeef); + + let remote_layer_file_name_1 = + format!("{}{}", layer_file_name_1, layer_generation.get_suffix()); + + // Set mock control plane state to valid for our generation + ctx.set_latest_generation(now_generation); + + // Inject a victim file to remote storage + info!("Writing"); + std::fs::create_dir_all(&remote_timeline_path)?; + std::fs::write( + remote_timeline_path.join(remote_layer_file_name_1.clone()), + content, + )?; + assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path); + + // File should still be there after we push it to the queue (we haven't pushed enough to flush anything) + info!("Pushing"); + client + .push_layers( + tenant_id, + TIMELINE_ID, + now_generation, + [(layer_file_name_1.clone(), layer_generation)].to_vec(), + ) + .await?; + assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path); + + assert_local_files(&[], &deletion_prefix); + + // File should still be there after we write a deletion list (we haven't pushed enough to execute anything) + info!("Flushing"); + client.flush().await?; + assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path); + assert_local_files(&["0000000000000001-01.list"], &deletion_prefix); + + // File should go away when we execute + info!("Flush-executing"); + client.flush_execute().await?; + assert_remote_files(&[], &remote_timeline_path); + assert_local_files(&["header-01"], &deletion_prefix); + + // Flushing on an empty queue should succeed immediately, and not write any lists + info!("Flush-executing on empty"); + client.flush_execute().await?; + assert_local_files(&["header-01"], &deletion_prefix); + + Ok(()) + } + + #[tokio::test] + async fn deletion_queue_validation() -> anyhow::Result<()> { + let ctx = setup("deletion_queue_validation").expect("Failed test setup"); + let client = ctx.deletion_queue.new_client(); + + // Generation that the control plane 
thinks is current + let latest_generation = Generation::new(0xdeadbeef); + // Generation that our DeletionQueue thinks the tenant is running with + let stale_generation = latest_generation.previous(); + // Generation that our example layer file was written with + let layer_generation = stale_generation.previous(); + + ctx.set_latest_generation(latest_generation); + + let tenant_id = ctx.harness.tenant_id; + let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID); + let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path()); + + // Initial state: a remote layer exists + let remote_layer_name = ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?; + assert_remote_files(&[&remote_layer_name], &remote_timeline_path); + + tracing::debug!("Pushing..."); + client + .push_layers( + tenant_id, + TIMELINE_ID, + stale_generation, + [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(), + ) + .await?; + + // We enqueued the operation in a stale generation: it should have failed validation + tracing::debug!("Flushing..."); + tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??; + assert_remote_files(&[&remote_layer_name], &remote_timeline_path); + + tracing::debug!("Pushing..."); + client + .push_layers( + tenant_id, + TIMELINE_ID, + latest_generation, + [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(), + ) + .await?; + + // We enqueued the operation in a fresh generation: it should have passed validation + tracing::debug!("Flushing..."); + tokio::time::timeout(Duration::from_secs(5), client.flush_execute()).await??; + assert_remote_files(&[], &remote_timeline_path); + + Ok(()) + } + + #[tokio::test] + async fn deletion_queue_recovery() -> anyhow::Result<()> { + // Test that deletion lists persisted before a restart are recovered and executed afterwards + let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup"); + let client = ctx.deletion_queue.new_client(); + + let tenant_id =
ctx.harness.tenant_id; + + let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID); + let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path()); + let deletion_prefix = ctx.harness.conf.deletion_prefix(); + + let layer_generation = Generation::new(0xdeadbeef); + let now_generation = Generation::new(0xfeedbeef); + + // Inject a deletion in the generation before now_generation: after restart, + // this deletion should _not_ get executed (only the immediately previous + // generation gets that treatment) + let remote_layer_file_name_historical = + ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?; + client + .push_layers( + tenant_id, + TIMELINE_ID, + now_generation.previous(), + [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(), + ) + .await?; + + // Inject a deletion in now_generation itself: after the restart advances the generation, + // this deletion should get executed, because we execute deletions in the + // immediately previous generation on the same node. + let remote_layer_file_name_previous = + ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?; + client + .push_layers( + tenant_id, + TIMELINE_ID, + now_generation, + [(EXAMPLE_LAYER_NAME_ALT.clone(), layer_generation)].to_vec(), + ) + .await?; + + client.flush().await?; + assert_remote_files( + &[ + &remote_layer_file_name_historical, + &remote_layer_file_name_previous, + ], + &remote_timeline_path, + ); + + // Different generations for the same tenant will cause two separate + // deletion lists to be emitted.
+ assert_local_files( + &["0000000000000001-01.list", "0000000000000002-01.list"], + &deletion_prefix, + ); + + // Simulate a node restart: the latest generation advances + let now_generation = now_generation.next(); + ctx.set_latest_generation(now_generation); + + // Restart the deletion queue + drop(client); + ctx.restart().await; + let client = ctx.deletion_queue.new_client(); + client + .recover(HashMap::from([(tenant_id, now_generation)])) + .await?; + + info!("Flush-executing"); + client.flush_execute().await?; + // The deletion from immediately prior generation was executed, the one from + // an older generation was not. + assert_remote_files(&[&remote_layer_file_name_historical], &remote_timeline_path); + Ok(()) + } +} + +/// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence +/// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it. +#[cfg(test)] +pub mod mock { + use tracing::info; + + use crate::tenant::remote_timeline_client::remote_layer_path; + + use super::*; + use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }; + + pub struct ConsumerState { + rx: tokio::sync::mpsc::Receiver, + executor_rx: tokio::sync::mpsc::Receiver, + } + + impl ConsumerState { + async fn consume(&mut self, remote_storage: &GenericRemoteStorage) -> usize { + let mut executed = 0; + + info!("Executing all pending deletions"); + + // Transform all executor messages to generic frontend messages + while let Ok(msg) = self.executor_rx.try_recv() { + match msg { + ExecutorMessage::Delete(objects) => { + for path in objects { + match remote_storage.delete(&path).await { + Ok(_) => { + debug!("Deleted {path}"); + } + Err(e) => { + error!("Failed to delete {path}, leaking object! 
({e})"); + } + } + executed += 1; + } + } + ExecutorMessage::Flush(flush_op) => { + flush_op.fire(); + } + } + } + + while let Ok(msg) = self.rx.try_recv() { + match msg { + FrontendQueueMessage::Delete(op) => { + let mut objects = op.objects; + for (layer, generation) in op.layers { + objects.push(remote_layer_path( + &op.tenant_id, + &op.timeline_id, + &layer, + generation, + )); + } + + for path in objects { + info!("Executing deletion {path}"); + match remote_storage.delete(&path).await { + Ok(_) => { + debug!("Deleted {path}"); + } + Err(e) => { + error!("Failed to delete {path}, leaking object! ({e})"); + } + } + executed += 1; + } + } + FrontendQueueMessage::Flush(op) => { + op.fire(); + } + FrontendQueueMessage::FlushExecute(op) => { + // We have already executed all prior deletions because mock does them inline + op.fire(); + } + FrontendQueueMessage::Recover(_) => { + // no-op in mock + } + } + info!("All pending deletions have been executed"); + } + + executed + } + } + + pub struct MockDeletionQueue { + tx: tokio::sync::mpsc::Sender, + executor_tx: tokio::sync::mpsc::Sender, + executed: Arc, + remote_storage: Option, + consumer: std::sync::Mutex, + lsn_table: Arc>, + } + + impl MockDeletionQueue { + pub fn new(remote_storage: Option) -> Self { + let (tx, rx) = tokio::sync::mpsc::channel(16384); + let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384); + + let executed = Arc::new(AtomicUsize::new(0)); + + Self { + tx, + executor_tx, + executed, + remote_storage, + consumer: std::sync::Mutex::new(ConsumerState { rx, executor_rx }), + lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())), + } + } + + pub fn get_executed(&self) -> usize { + self.executed.load(Ordering::Relaxed) + } + + #[allow(clippy::await_holding_lock)] + pub async fn pump(&self) { + if let Some(remote_storage) = &self.remote_storage { + // Permit holding mutex across await, because this is only ever + // called once at a time in tests. 
+ let mut locked = self.consumer.lock().unwrap(); + let count = locked.consume(remote_storage).await; + self.executed.fetch_add(count, Ordering::Relaxed); + } + } + + pub(crate) fn new_client(&self) -> DeletionQueueClient { + DeletionQueueClient { + tx: self.tx.clone(), + executor_tx: self.executor_tx.clone(), + lsn_table: self.lsn_table.clone(), + } + } + } +} diff --git a/pageserver/src/deletion_queue/backend.rs b/pageserver/src/deletion_queue/backend.rs new file mode 100644 index 0000000000..a24811e0d5 --- /dev/null +++ b/pageserver/src/deletion_queue/backend.rs @@ -0,0 +1,330 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use tokio_util::sync::CancellationToken; +use tracing::debug; +use tracing::info; +use tracing::warn; + +use crate::config::PageServerConf; +use crate::control_plane_client::ControlPlaneGenerationsApi; +use crate::metrics::DELETION_QUEUE_DROPPED; +use crate::metrics::DELETION_QUEUE_ERRORS; + +use super::executor::ExecutorMessage; +use super::DeletionHeader; +use super::DeletionList; +use super::DeletionQueueError; +use super::FlushOp; +use super::VisibleLsnUpdates; + +// After this length of time, do any validation work that is pending, +// even if we haven't accumulated many keys to delete. +// +// This also causes updates to remote_consistent_lsn to be validated, even +// if there were no deletions enqueued. +const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10); + +// If we have received this number of keys, proceed with attempting to execute +const AUTOFLUSH_KEY_COUNT: usize = 16384; + +#[derive(Debug)] +pub(super) enum BackendQueueMessage { + Delete(DeletionList), + Flush(FlushOp), +} +pub struct BackendQueueWorker { + conf: &'static PageServerConf, + rx: tokio::sync::mpsc::Receiver, + tx: tokio::sync::mpsc::Sender, + + // Client for calling into control plane API for validation of deletes + control_plane_client: Option>, + + // DeletionLists which are waiting generation validation. 
Not safe to + // execute until [`validate`] has processed them. + pending_lists: Vec, + + // DeletionLists which have passed validation and are ready to execute. + validated_lists: Vec, + + // Sum of all the lengths of lists in pending_lists + pending_key_count: usize, + + // Lsn validation state: we read projected LSNs and write back visible LSNs + // after validation. This is the LSN equivalent of `pending_lists`: + // it is drained in [`validate`] + lsn_table: Arc>, + + cancel: CancellationToken, +} + +impl BackendQueueWorker { + pub(super) fn new( + conf: &'static PageServerConf, + rx: tokio::sync::mpsc::Receiver, + tx: tokio::sync::mpsc::Sender, + control_plane_client: Option>, + lsn_table: Arc>, + cancel: CancellationToken, + ) -> Self { + Self { + conf, + rx, + tx, + control_plane_client, + lsn_table, + pending_lists: Vec::new(), + validated_lists: Vec::new(), + pending_key_count: 0, + cancel, + } + } + + async fn cleanup_lists(&mut self, lists: Vec) { + for list in lists { + let list_path = self.conf.deletion_list_path(list.sequence); + debug!("Removing deletion list {list} at {}", list_path.display()); + + if let Err(e) = tokio::fs::remove_file(&list_path).await { + // Unexpected: we should have permissions and nothing else should + // be touching these files. We will leave the file behind. Subsequent + // pageservers will try and load it again: hopefully whatever storage + // issue (probably permissions) has been fixed by then. + tracing::error!("Failed to delete {}: {e:#}", list_path.display()); + break; + } + } + } + + /// Process any outstanding validations of generations of pending LSN updates or pending + /// DeletionLists. + /// + /// Valid LSN updates propagate back to their result channel immediately, valid DeletionLists + /// go into the queue of ready-to-execute lists.
+ pub async fn validate(&mut self) -> Result<(), DeletionQueueError> { + let mut tenant_generations = HashMap::new(); + for list in &self.pending_lists { + for (tenant_id, tenant_list) in &list.tenants { + // Note: DeletionLists are in logical time order, so generation always + // goes up. By doing a simple insert() we will always end up with + // the latest generation seen for a tenant. + tenant_generations.insert(*tenant_id, tenant_list.generation); + } + } + + let pending_lsn_updates = { + let mut lsn_table = self.lsn_table.write().expect("Lock should not be poisoned"); + let mut pending_updates = VisibleLsnUpdates::new(); + std::mem::swap(&mut pending_updates, &mut lsn_table); + pending_updates + }; + for (tenant_id, update) in &pending_lsn_updates.tenants { + let entry = tenant_generations + .entry(*tenant_id) + .or_insert(update.generation); + if update.generation > *entry { + *entry = update.generation; + } + } + + if tenant_generations.is_empty() { + // No work to do + return Ok(()); + } + + let tenants_valid = if let Some(control_plane_client) = &self.control_plane_client { + control_plane_client + .validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect()) + .await + // The only way a validation call returns an error is when the cancellation token fires + .map_err(|_| DeletionQueueError::ShuttingDown)? + } else { + // Control plane API disabled. In legacy mode we consider everything valid. + tenant_generations.keys().map(|k| (*k, true)).collect() + }; + + let mut validated_sequence: Option = None; + + // Apply the validation results to the pending LSN updates + for (tenant_id, tenant_lsn_state) in pending_lsn_updates.tenants { + let validated_generation = tenant_generations + .get(&tenant_id) + .expect("Map was built from the same keys we're reading"); + + // If the tenant was missing from the validation response, it has been deleted. We may treat + // deletions as valid as the tenant's remote storage is all to be wiped anyway.
+ let valid = tenants_valid.get(&tenant_id).copied().unwrap_or(true); + + if valid && *validated_generation == tenant_lsn_state.generation { + for (_timeline_id, pending_lsn) in tenant_lsn_state.timelines { + // Drop result of send: it is legal for the Timeline to have been dropped along + // with its queue receiver while we were doing validation. + drop(pending_lsn.result_tx.send(pending_lsn.projected).await); + } + } else { + // If we failed validation, then do not apply any of the projected updates + warn!("Dropped remote consistent LSN updates for tenant {tenant_id} in stale generation {0:?}", tenant_lsn_state.generation); + } + } + + // Apply the validation results to the pending deletion lists + for list in &mut self.pending_lists { + // Filter the list based on whether the server responded valid: true. + // If a tenant is omitted in the response, it has been deleted, and we should + // proceed with deletion. + let mut mutated = false; + list.tenants.retain(|tenant_id, tenant| { + let validated_generation = tenant_generations + .get(tenant_id) + .expect("Map was built from the same keys we're reading"); + + // If the tenant was missing from the validation response, it has been deleted. We may treat + // deletions as valid as the tenant's remote storage is all to be wiped anyway. + let valid = tenants_valid.get(tenant_id).copied().unwrap_or(true); + + // A list is valid if it comes from the current _or previous_ generation. + // The previous generation case is due to how we store deletion lists locally: + // if we see the immediately previous generation in a locally stored deletion list, + // it proves that this node's disk was used for both current & previous generations, + // and therefore no other node was involved in between: the two generations may be + // logically treated as the same. 
+ let this_list_valid = valid + && (tenant.generation == *validated_generation); + + if !this_list_valid { + warn!("Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked", tenant.generation); + DELETION_QUEUE_DROPPED.inc_by(tenant.len() as u64); + mutated = true; + } + this_list_valid + }); + list.validated = true; + + if mutated { + // Save the deletion list if we had to make changes due to stale generations. The + // saved list is valid for execution. + if let Err(e) = list.save(self.conf).await { + // Highly unexpected. Could happen if e.g. disk full. + // If we didn't save the trimmed list, it is _not_ valid to execute. + warn!("Failed to save modified deletion list {list}: {e:#}"); + + // Rather than have a complex retry process, just drop it and leak the objects, + // scrubber will clean up eventually. + list.tenants.clear(); // Result is a valid-but-empty list, which is a no-op for execution. + } + } + + validated_sequence = Some(list.sequence); + } + + if let Some(validated_sequence) = validated_sequence { + // Write the queue header to record how far validation progressed. This avoids having + // to rewrite each DeletionList to set validated=true in it. + let header = DeletionHeader::new(validated_sequence); + + // Drop result because the validated_sequence is an optimization. If we fail to save it, + // then restart, we will drop some deletion lists, creating work for scrubber. + // The save() function logs a warning on error. 
+ if let Err(e) = header.save(self.conf).await { + warn!("Failed to write deletion queue header: {e:#}"); + DELETION_QUEUE_ERRORS + .with_label_values(&["put_header"]) + .inc(); + } + } + + // Transfer the validated lists to the validated queue, for eventual execution + self.validated_lists.append(&mut self.pending_lists); + + Ok(()) + } + + pub async fn flush(&mut self) { + // Issue any required generation validation calls to the control plane + if let Err(DeletionQueueError::ShuttingDown) = self.validate().await { + warn!("Shutting down"); + return; + } + + // After successful validation, nothing is pending: any lists that + // made it through validation will be in validated_lists. + assert!(self.pending_lists.is_empty()); + self.pending_key_count = 0; + + // Return quickly if we have no validated lists to execute. + if self.validated_lists.is_empty() { + return; + } + + // Drain `validated_lists` into the executor + let mut executing_lists = Vec::new(); + for mut list in self.validated_lists.drain(..) { + let objects = list.drain_paths(); + if let Err(_e) = self.tx.send(ExecutorMessage::Delete(objects)).await { + warn!("Shutting down"); + return; + }; + + executing_lists.push(list); + } + + // Flush the executor, so that all the keys referenced by these deletion lists + // are actually removed from remote storage. This is a precondition to deleting + // the deletion lists themselves. 
+ let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + let flush_op = FlushOp { tx }; + if let Err(_e) = self.tx.send(ExecutorMessage::Flush(flush_op)).await { + warn!("Shutting down"); + return; + }; + if rx.await.is_err() { + warn!("Shutting down"); + return; + } + + // Erase the deletion lists whose keys have all been deleted from remote storage + self.cleanup_lists(executing_lists).await; + } + + pub async fn background(&mut self) { + while !self.cancel.is_cancelled() { + let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await { + Ok(Some(m)) => m, + Ok(None) => { + // All queue senders closed + info!("Shutting down"); + break; + } + Err(_) => { + // Timeout, we hit deadline to execute whatever we have in hand. These functions will + // return immediately if no work is pending + self.flush().await; + + continue; + } + }; + + match msg { + BackendQueueMessage::Delete(list) => { + if list.validated { + self.validated_lists.push(list) + } else { + self.pending_key_count += list.len(); + self.pending_lists.push(list); + } + + if self.pending_key_count > AUTOFLUSH_KEY_COUNT { + self.flush().await; + } + } + BackendQueueMessage::Flush(op) => { + self.flush().await; + + op.fire(); + } + } + } + } +} diff --git a/pageserver/src/deletion_queue/executor.rs b/pageserver/src/deletion_queue/executor.rs new file mode 100644 index 0000000000..4d47695aa2 --- /dev/null +++ b/pageserver/src/deletion_queue/executor.rs @@ -0,0 +1,143 @@ +use remote_storage::GenericRemoteStorage; +use remote_storage::RemotePath; +use remote_storage::MAX_KEYS_PER_DELETE; +use std::time::Duration; +use tokio_util::sync::CancellationToken; +use tracing::info; +use tracing::warn; + +use crate::metrics::DELETION_QUEUE_ERRORS; +use crate::metrics::DELETION_QUEUE_EXECUTED; + +use super::DeletionQueueError; +use super::FlushOp; + +const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10); + +pub(super) enum ExecutorMessage { + Delete(Vec), + Flush(FlushOp), +} + +///
Non-persistent deletion queue, for coalescing multiple object deletes into +/// larger DeleteObjects requests. +pub struct ExecutorWorker { + // Accumulate up to 1000 keys for the next deletion operation + accumulator: Vec, + + rx: tokio::sync::mpsc::Receiver, + + cancel: CancellationToken, + remote_storage: GenericRemoteStorage, +} + +impl ExecutorWorker { + pub(super) fn new( + remote_storage: GenericRemoteStorage, + rx: tokio::sync::mpsc::Receiver, + cancel: CancellationToken, + ) -> Self { + Self { + remote_storage, + rx, + cancel, + accumulator: Vec::new(), + } + } + + /// Wrap the remote `delete_objects` with a failpoint + pub async fn remote_delete(&self) -> Result<(), anyhow::Error> { + fail::fail_point!("deletion-queue-before-execute", |_| { + info!("Skipping execution, failpoint set"); + DELETION_QUEUE_ERRORS + .with_label_values(&["failpoint"]) + .inc(); + Err(anyhow::anyhow!("failpoint hit")) + }); + + self.remote_storage.delete_objects(&self.accumulator).await + } + + /// Block until everything in accumulator has been executed + pub async fn flush(&mut self) -> Result<(), DeletionQueueError> { + while !self.accumulator.is_empty() && !self.cancel.is_cancelled() { + match self.remote_delete().await { + Ok(()) => { + // Note: we assume that the remote storage layer returns Ok(()) if some + // or all of the deleted objects were already gone. 
+ DELETION_QUEUE_EXECUTED.inc_by(self.accumulator.len() as u64); + info!( + "Executed deletion batch {}..{}", + self.accumulator + .first() + .expect("accumulator should be non-empty"), + self.accumulator + .last() + .expect("accumulator should be non-empty"), + ); + self.accumulator.clear(); + } + Err(e) => { + warn!("DeleteObjects request failed: {e:#}, will retry"); + DELETION_QUEUE_ERRORS.with_label_values(&["execute"]).inc(); + } + }; + } + if self.cancel.is_cancelled() { + // Expose an error because we may not have actually flushed everything + Err(DeletionQueueError::ShuttingDown) + } else { + Ok(()) + } + } + + pub async fn background(&mut self) -> Result<(), DeletionQueueError> { + self.accumulator.reserve(MAX_KEYS_PER_DELETE); + + loop { + if self.cancel.is_cancelled() { + return Err(DeletionQueueError::ShuttingDown); + } + + let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await { + Ok(Some(m)) => m, + Ok(None) => { + // All queue senders closed + info!("Shutting down"); + return Err(DeletionQueueError::ShuttingDown); + } + Err(_) => { + // Timeout, we hit deadline to execute whatever we have in hand. These functions will + // return immediately if no work is pending + self.flush().await?; + + continue; + } + }; + + match msg { + ExecutorMessage::Delete(mut list) => { + while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE { + if self.accumulator.len() == MAX_KEYS_PER_DELETE { + self.flush().await?; + // If we have received this number of keys, proceed with attempting to execute + assert_eq!(self.accumulator.len(), 0); + } + + let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len(); + let take_count = std::cmp::min(available_slots, list.len()); + for path in list.drain(list.len() - take_count..) { + self.accumulator.push(path); + } + } + } + ExecutorMessage::Flush(flush_op) => { + // If flush() errors, we drop the flush_op and the caller will get + // an error recv()'ing their oneshot channel. 
+ self.flush().await?; + flush_op.fire(); + } + } + } + } +} diff --git a/pageserver/src/deletion_queue/frontend.rs b/pageserver/src/deletion_queue/frontend.rs new file mode 100644 index 0000000000..54510202ba --- /dev/null +++ b/pageserver/src/deletion_queue/frontend.rs @@ -0,0 +1,430 @@ +use super::BackendQueueMessage; +use super::DeletionHeader; +use super::DeletionList; +use super::FlushOp; + +use std::collections::HashMap; +use std::fs::create_dir_all; +use std::time::Duration; + +use regex::Regex; +use remote_storage::RemotePath; +use tokio_util::sync::CancellationToken; +use tracing::debug; +use tracing::info; +use tracing::warn; +use utils::generation::Generation; +use utils::id::TenantId; +use utils::id::TimelineId; + +use crate::config::PageServerConf; +use crate::metrics::DELETION_QUEUE_ERRORS; +use crate::metrics::DELETION_QUEUE_SUBMITTED; +use crate::tenant::remote_timeline_client::remote_layer_path; +use crate::tenant::storage_layer::LayerFileName; + +// The number of keys in a DeletionList before we will proactively persist it +// (without reaching a flush deadline). This aims to deliver objects of the order +// of magnitude 1MB when we are under heavy delete load. +const DELETION_LIST_TARGET_SIZE: usize = 16384; + +// Ordinarily, we only flush to DeletionList periodically, to bound the window during +// which we might leak objects from not flushing a DeletionList after +// the objects are already unlinked from timeline metadata. +const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000); + +// If someone is waiting for a flush to DeletionList, only delay a little to accumulate +// more objects before doing the flush. +const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100); + +#[derive(Debug)] +pub(super) struct DeletionOp { + pub(super) tenant_id: TenantId, + pub(super) timeline_id: TimelineId, + // `layers` and `objects` are both just lists of objects. 
`layers` is used if you do not + // have a config object handy to project it to a remote key, and need the consuming worker + // to do it for you. + pub(super) layers: Vec<(LayerFileName, Generation)>, + pub(super) objects: Vec, + + /// The _current_ generation of the Tenant attachment in which we are enqueuing + /// this deletion. + pub(super) generation: Generation, +} + +#[derive(Debug)] +pub(super) struct RecoverOp { + pub(super) attached_tenants: HashMap, +} + +#[derive(Debug)] +pub(super) enum FrontendQueueMessage { + Delete(DeletionOp), + // Wait until all prior deletions make it into a persistent DeletionList + Flush(FlushOp), + // Wait until all prior deletions have been executed (i.e. objects are actually deleted) + FlushExecute(FlushOp), + // Call once after re-attaching to control plane, to notify the deletion queue about + // latest attached generations & load any saved deletion lists from disk. + Recover(RecoverOp), +} + +pub struct FrontendQueueWorker { + conf: &'static PageServerConf, + + // Incoming frontend requests to delete some keys + rx: tokio::sync::mpsc::Receiver, + + // Outbound requests to the backend to execute deletion lists we have composed. + tx: tokio::sync::mpsc::Sender, + + // The list we are currently building, contains a buffer of keys to delete + // and our next sequence number + pending: DeletionList, + + // These FlushOps should fire the next time we flush + pending_flushes: Vec, + + // Worker loop is torn down when this fires. + cancel: CancellationToken, +} + +impl FrontendQueueWorker { + // Initially DeletionHeader.validated_sequence is zero. The place we start our + // sequence numbers must be higher than that. 
+ const BASE_SEQUENCE: u64 = 1; + + pub(super) fn new( + conf: &'static PageServerConf, + rx: tokio::sync::mpsc::Receiver, + tx: tokio::sync::mpsc::Sender, + cancel: CancellationToken, + ) -> Self { + Self { + pending: DeletionList::new(Self::BASE_SEQUENCE), + conf, + rx, + tx, + pending_flushes: Vec::new(), + cancel, + } + } + + /// Try to flush `list` to persistent storage + /// + /// This does not return errors, because on failure to flush we do not lose + /// any state: flushing will be retried implicitly on the next deadline + async fn flush(&mut self) { + if self.pending.is_empty() { + for f in self.pending_flushes.drain(..) { + f.fire(); + } + return; + } + + match self.pending.save(self.conf).await { + Ok(_) => { + info!(sequence = self.pending.sequence, "Stored deletion list"); + + for f in self.pending_flushes.drain(..) { + f.fire(); + } + + let onward_list = self.pending.drain(); + + // We have consumed out of pending: reset it for the next incoming deletions to accumulate there + self.pending = DeletionList::new(self.pending.sequence + 1); + + if let Err(e) = self.tx.send(BackendQueueMessage::Delete(onward_list)).await { + // This is allowed to fail: it will only happen if the backend worker is shut down, + // so we can just drop this on the floor. + info!("Deletion list dropped, this is normal during shutdown ({e:#})"); + } + } + Err(e) => { + DELETION_QUEUE_ERRORS.with_label_values(&["put_list"]).inc(); + warn!( + sequence = self.pending.sequence, + "Failed to write deletion list, will retry later ({e:#})" + ); + } + } + } + + /// Load the header, to learn the sequence number up to which deletions + /// have been validated. We will apply validated=true to DeletionLists + /// <= this sequence when loading them. 
+ /// + /// It is not an error for the header to not exist: we return None, and + /// the caller should act as if validated_sequence is 0 + async fn load_validated_sequence(&self) -> Result, anyhow::Error> { + let header_path = self.conf.deletion_header_path(); + match tokio::fs::read(&header_path).await { + Ok(header_bytes) => { + match serde_json::from_slice::(&header_bytes) { + Ok(h) => Ok(Some(h.validated_sequence)), + Err(e) => { + warn!( + "Failed to deserialize deletion header, ignoring {}: {e:#}", + header_path.display() + ); + // This should never happen unless we make a mistake with our serialization. + // Ignoring a deletion header is not consequential for correctness because all deletions + // are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up. + Ok(None) + } + } + } + Err(e) => { + if e.kind() == std::io::ErrorKind::NotFound { + debug!( + "Deletion header {} not found, first start?", + header_path.display() + ); + Ok(None) + } else { + Err(anyhow::anyhow!(e)) + } + } + } + } + + async fn recover( + &mut self, + attached_tenants: HashMap, + ) -> Result<(), anyhow::Error> { + debug!( + "recovering with {} attached tenants", + attached_tenants.len() + ); + + // Load the header + let validated_sequence = self.load_validated_sequence().await?.unwrap_or(0); + + // Start our next deletion list from after the last location validated by + // previous process lifetime, or after the last location found (it is updated + // below after enumerating the deletion lists) + self.pending.sequence = std::cmp::max(self.pending.sequence, validated_sequence + 1); + + let deletion_directory = self.conf.deletion_prefix(); + let mut dir = match tokio::fs::read_dir(&deletion_directory).await { + Ok(d) => d, + Err(e) => { + warn!( + "Failed to open deletion list directory {}: {e:#}", + deletion_directory.display(), + ); + + // Give up: if we can't read the deletion list directory, we probably can't + // write lists into it later, so the
queue won't work. + return Err(e.into()); + } + }; + + let list_name_pattern = Regex::new("([a-zA-Z0-9]{16})-([a-zA-Z0-9]{2}).list").unwrap(); + + let header_path = self.conf.deletion_header_path(); + let mut seqs: Vec = Vec::new(); + while let Some(dentry) = dir.next_entry().await? { + if Some(dentry.file_name().as_os_str()) == header_path.file_name() { + // Don't try and parse the header's name like a list + continue; + } + + let file_name = dentry.file_name().to_owned(); + let basename = file_name.to_string_lossy(); + let seq_part = if let Some(m) = list_name_pattern.captures(&basename) { + m.get(1) + .expect("Non optional group should be present") + .as_str() + } else { + warn!("Unexpected key in deletion queue: {basename}"); + continue; + }; + + let seq: u64 = match u64::from_str_radix(seq_part, 16) { + Ok(s) => s, + Err(e) => { + warn!("Malformed key '{basename}': {e}"); + continue; + } + }; + seqs.push(seq); + } + seqs.sort(); + + // Initialize the next sequence number in the frontend based on the maximum of the highest list we see, + // and the last list that was deleted according to the header. Combined with writing out the header + // prior to deletions, this guarantees no re-use of sequence numbers. + if let Some(max_list_seq) = seqs.last() { + self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1); + } + + for s in seqs { + let list_path = self.conf.deletion_list_path(s); + + let list_bytes = tokio::fs::read(&list_path).await?; + + let mut deletion_list = match serde_json::from_slice::(&list_bytes) { + Ok(l) => l, + Err(e) => { + // Drop the list on the floor: any objects it referenced will be left behind + // for scrubbing to clean up. This should never happen unless we have a serialization bug.
+ warn!(sequence = s, "Failed to deserialize deletion list: {e}"); + continue; + } + }; + + if deletion_list.sequence <= validated_sequence { + // If the deletion list falls below valid_seq, we may assume that it was + // already validated the last time this pageserver ran. Otherwise, we still + // load it, as it may still contain content valid in this generation. + deletion_list.validated = true; + } else { + // Special case optimization: if a tenant is still attached, and no other + // generation was issued to another node in the interval while we restarted, + // then we may treat deletion lists from the previous generation as if they + // belong to our currently attached generation, and proceed to validate & execute. + for (tenant_id, tenant_list) in &mut deletion_list.tenants { + if let Some(attached_gen) = attached_tenants.get(tenant_id) { + if attached_gen.previous() == tenant_list.generation { + tenant_list.generation = *attached_gen; + } + } + } + } + + info!( + validated = deletion_list.validated, + sequence = deletion_list.sequence, + "Recovered deletion list" + ); + + // We will drop out of recovery if this fails: it indicates that we are shutting down + // or the backend has panicked + DELETION_QUEUE_SUBMITTED.inc_by(deletion_list.len() as u64); + self.tx + .send(BackendQueueMessage::Delete(deletion_list)) + .await?; + } + + info!(next_sequence = self.pending.sequence, "Replay complete"); + + Ok(()) + } + + /// This is the front-end ingest, where we bundle up deletion requests into DeletionList + /// and write them out, for later validation by the backend and execution by the executor. 
+ pub async fn background(&mut self) { + info!("Started deletion frontend worker"); + + // Synchronous, but we only do it once per process lifetime so it's tolerable + if let Err(e) = create_dir_all(&self.conf.deletion_prefix()) { + tracing::error!( + "Failed to create deletion list directory {}, deletions will not be executed ({e})", + self.conf.deletion_prefix().display() + ); + return; + } + + while !self.cancel.is_cancelled() { + let timeout = if self.pending_flushes.is_empty() { + FRONTEND_DEFAULT_TIMEOUT + } else { + FRONTEND_FLUSHING_TIMEOUT + }; + + let msg = match tokio::time::timeout(timeout, self.rx.recv()).await { + Ok(Some(msg)) => msg, + Ok(None) => { + // Queue sender destroyed, shutting down + break; + } + Err(_) => { + // Hit deadline, flush. + self.flush().await; + continue; + } + }; + + match msg { + FrontendQueueMessage::Delete(op) => { + debug!( + "Delete: ingesting {} layers, {} other objects", + op.layers.len(), + op.objects.len() + ); + + let mut layer_paths = Vec::new(); + for (layer, generation) in op.layers { + layer_paths.push(remote_layer_path( + &op.tenant_id, + &op.timeline_id, + &layer, + generation, + )); + } + layer_paths.extend(op.objects); + + if !self.pending.push( + &op.tenant_id, + &op.timeline_id, + op.generation, + &mut layer_paths, + ) { + self.flush().await; + let retry_succeeded = self.pending.push( + &op.tenant_id, + &op.timeline_id, + op.generation, + &mut layer_paths, + ); + if !retry_succeeded { + // Unexpected: after we flush, we should have + // drained self.pending, so a conflict on + // generation numbers should be impossible. + tracing::error!( + "Failed to enqueue deletions, leaking objects. This is a bug." 
+ ); + } + } + } + FrontendQueueMessage::Flush(op) => { + if self.pending.is_empty() { + // Execute immediately + debug!("Flush: No pending objects, flushing immediately"); + op.fire() + } else { + // Execute next time we flush + debug!("Flush: adding to pending flush list for next deadline flush"); + self.pending_flushes.push(op); + } + } + FrontendQueueMessage::FlushExecute(op) => { + debug!("FlushExecute: passing through to backend"); + // We do not flush to a deletion list here: the client sends a Flush before the FlushExecute + if let Err(e) = self.tx.send(BackendQueueMessage::Flush(op)).await { + info!("Can't flush, shutting down ({e})"); + // Caller will get error when their oneshot sender was dropped. + } + } + FrontendQueueMessage::Recover(op) => { + if let Err(e) = self.recover(op.attached_tenants).await { + // This should only happen in truly unrecoverable cases, like the recovery finding that the backend + // queue receiver has been dropped, or something is critically broken with + // the local filesystem holding deletion lists. + info!( + "Deletion queue recover aborted, deletion queue will not proceed ({e})" + ); + return; + } + } + } + + if self.pending.len() > DELETION_LIST_TARGET_SIZE || !self.pending_flushes.is_empty() { + self.flush().await; + } + } + info!("Deletion queue shut down."); + } +} diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 5fdff3a27c..488e02963c 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -117,7 +117,7 @@ mod span; pub mod metadata; mod par_fsync; -mod remote_timeline_client; +pub mod remote_timeline_client; pub mod storage_layer; pub mod config;