diff --git a/libs/utils/src/generation.rs b/libs/utils/src/generation.rs index a339e2ac10..ff4772b403 100644 --- a/libs/utils/src/generation.rs +++ b/libs/utils/src/generation.rs @@ -67,6 +67,14 @@ impl Generation { Self::none() } } + + pub fn into(self) -> Option { + if let Self::Valid(v) = self { + Some(v) + } else { + None + } + } } impl Serialize for Generation { diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index eefc436463..729899c841 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -3,8 +3,10 @@ mod executor; mod frontend; use std::collections::HashMap; +use std::path::PathBuf; use crate::metrics::DELETION_QUEUE_SUBMITTED; +use crate::tenant::remote_timeline_client::remote_timeline_path; use remote_storage::{GenericRemoteStorage, RemotePath}; use serde::Deserialize; use serde::Serialize; @@ -14,7 +16,7 @@ use tokio; use tokio_util::sync::CancellationToken; use tracing::{self, debug, error}; use utils::generation::Generation; -use utils::id::{TenantId, TenantTimelineId, TimelineId}; +use utils::id::{TenantId, TimelineId}; pub(crate) use self::backend::BackendQueueWorker; use self::executor::ExecutorWorker; @@ -84,11 +86,15 @@ pub struct DeletionQueueClient { } #[derive(Debug, Serialize, Deserialize)] -struct TimelineDeletionList { - objects: Vec, - // TODO: Tenant attachment generation will go here - // (see https://github.com/neondatabase/neon/pull/4919) - // attach_gen: u32, +struct TenantDeletionList { + /// For each Timeline, a list of key fragments to append to the timeline remote path + /// when reconstructing a full key + timelines: HashMap>, + + /// The generation in which this deletion was emitted: note that this may not be the + /// same as the generation of any layers being deleted. 
The generation of the layer + /// has already been absorbed into the keys in `timelines` + generation: Generation, } #[serde_as] @@ -101,11 +107,13 @@ struct DeletionList { sequence: u64, /// To avoid repeating tenant/timeline IDs in every key, we store keys in - /// nested HashMaps by TenantTimelineID - objects: HashMap, - // TODO: Node generation will go here - // (see https://github.com/neondatabase/neon/pull/4919) - // node_gen: u32, + /// nested HashMaps by TenantId and then TimelineId. Each Tenant only appears once + /// with one unique generation ID: if someone tries to push a second generation + /// ID for the same tenant, we will start a new DeletionList. + tenants: HashMap, + + /// Avoid having to walk `tenants` to calculate size + size: usize, } #[serde_as] @@ -140,40 +148,92 @@ impl DeletionList { Self { version: Self::VERSION_LATEST, sequence, - objects: HashMap::new(), + tenants: HashMap::new(), + size: 0, } } + fn drain(&mut self) -> Self { + let mut tenants = HashMap::new(); + std::mem::swap(&mut self.tenants, &mut tenants); + let other = Self { + version: Self::VERSION_LATEST, + sequence: self.sequence, + tenants, + size: self.size, + }; + self.size = 0; + other + } + fn is_empty(&self) -> bool { - self.objects.is_empty() + self.tenants.is_empty() } fn len(&self) -> usize { - self.objects.values().map(|v| v.objects.len()).sum() + self.size } - fn push(&mut self, tenant: &TenantId, timeline: &TimelineId, mut objects: Vec) { + /// Returns true if the push was accepted, false if the caller must start a new + /// deletion list. 
+ fn push( + &mut self, + tenant: &TenantId, + timeline: &TimelineId, + generation: Generation, + objects: &mut Vec, + ) -> bool { if objects.is_empty() { // Avoid inserting an empty TimelineDeletionList: this preserves the property // that if we have no keys, then self.objects is empty (used in Self::is_empty) - return; + return true; } - let key = TenantTimelineId::new(*tenant, *timeline); - let entry = self - .objects - .entry(key) - .or_insert_with(|| TimelineDeletionList { - objects: Vec::new(), + let tenant_entry = self + .tenants + .entry(*tenant) + .or_insert_with(|| TenantDeletionList { + timelines: HashMap::new(), + generation: generation, }); - entry.objects.append(&mut objects) + + if tenant_entry.generation != generation { + // Only one generation per tenant per list: signal to + // caller to start a new list. + return false; + } + + let timeline_entry = tenant_entry + .timelines + .entry(*timeline) + .or_insert_with(|| Vec::new()); + + let timeline_remote_path = remote_timeline_path(tenant, timeline); + + self.size += objects.len(); + timeline_entry.extend(objects.drain(..).map(|p| { + p.strip_prefix(&timeline_remote_path) + .expect("Timeline paths always start with the timeline prefix") + .to_string_lossy() + .to_string() + })); + true } - fn take_paths(&mut self) -> Vec { - self.objects - .drain() - .flat_map(|(_k, v)| v.objects.into_iter()) - .collect() + fn take_paths(self) -> Vec { + let mut result = Vec::new(); + for (tenant, tenant_deletions) in self.tenants.into_iter() { + for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() { + let timeline_remote_path = remote_timeline_path(&tenant, &timeline); + result.extend( + timeline_layers + .into_iter() + .map(|l| timeline_remote_path.join(&PathBuf::from(l))), + ); + } + } + + result } } @@ -205,6 +265,7 @@ impl DeletionQueueClient { &self, tenant_id: TenantId, timeline_id: TimelineId, + generation: Generation, layers: Vec<(LayerFileName, Generation)>, ) -> Result<(), 
DeletionQueueError> { DELETION_QUEUE_SUBMITTED.inc_by(layers.len() as u64); @@ -212,6 +273,7 @@ impl DeletionQueueClient { tenant_id, timeline_id, layers, + generation, objects: Vec::new(), })) .await @@ -342,7 +404,12 @@ impl DeletionQueue { backend_tx, cancel.clone(), )), - Some(BackendQueueWorker::new(conf, backend_rx, executor_tx)), + Some(BackendQueueWorker::new( + conf, + backend_rx, + executor_tx, + cancel.clone(), + )), Some(ExecutorWorker::new( remote_storage, executor_rx, @@ -543,8 +610,13 @@ mod test { let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path()); let deletion_prefix = ctx.harness.conf.deletion_prefix(); - let generation = Generation::new(0xdeadbeef); - let remote_layer_file_name_1 = format!("{}{}", layer_file_name_1, generation.get_suffix()); + // Exercise the distinction between the generation of the layers + // we delete, and the generation of the running Tenant. + let layer_generation = Generation::new(0xdeadbeef); + let now_generation = Generation::new(0xfeedbeef); + + let remote_layer_file_name_1 = + format!("{}{}", layer_file_name_1, layer_generation.get_suffix()); // Inject a victim file to remote storage info!("Writing"); @@ -560,7 +632,8 @@ mod test { ctx.runtime.block_on(client.push_layers( tenant_id, TIMELINE_ID, - [(layer_file_name_1.clone(), generation)].to_vec(), + now_generation, + [(layer_file_name_1.clone(), layer_generation)].to_vec(), ))?; assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path); @@ -599,8 +672,10 @@ mod test { let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID); let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path()); let deletion_prefix = ctx.harness.conf.deletion_prefix(); - let generation = Generation::new(0xdeadbeef); - let remote_layer_file_name_1 = format!("{}{}", layer_file_name_1, generation.get_suffix()); + let layer_generation = Generation::new(0xdeadbeef); + let now_generation = 
Generation::new(0xfeedbeef); + let remote_layer_file_name_1 = + format!("{}{}", layer_file_name_1, layer_generation.get_suffix()); // Inject a file, delete it, and flush to a deletion list std::fs::create_dir_all(&remote_timeline_path)?; @@ -611,7 +686,8 @@ mod test { ctx.runtime.block_on(client.push_layers( tenant_id, TIMELINE_ID, - [(layer_file_name_1.clone(), generation)].to_vec(), + now_generation, + [(layer_file_name_1.clone(), layer_generation)].to_vec(), ))?; ctx.runtime.block_on(client.flush())?; assert_local_files(&["0000000000000001-01.list"], &deletion_prefix); diff --git a/pageserver/src/deletion_queue/backend.rs b/pageserver/src/deletion_queue/backend.rs index 8d32d23668..2bcf58e50e 100644 --- a/pageserver/src/deletion_queue/backend.rs +++ b/pageserver/src/deletion_queue/backend.rs @@ -1,8 +1,15 @@ +use std::collections::HashMap; use std::time::Duration; +use futures::future::TryFutureExt; +use pageserver_api::control_api::HexTenantId; +use pageserver_api::control_api::{ValidateRequest, ValidateRequestTenant, ValidateResponse}; +use serde::de::DeserializeOwned; +use tokio_util::sync::CancellationToken; use tracing::debug; use tracing::info; use tracing::warn; +use utils::backoff; use crate::config::PageServerConf; use crate::metrics::DELETION_QUEUE_ERRORS; @@ -10,6 +17,7 @@ use crate::metrics::DELETION_QUEUE_ERRORS; use super::executor::ExecutorMessage; use super::DeletionHeader; use super::DeletionList; +use super::DeletionQueueError; use super::FlushOp; // After this length of time, execute deletions which are elegible to run, @@ -41,6 +49,58 @@ pub struct BackendQueueWorker { // DeletionLists we have fully executed, which may be deleted // from remote storage. 
executed_lists: Vec, + + cancel: CancellationToken, +} + +#[derive(thiserror::Error, Debug)] +enum ValidateCallError { + #[error("shutdown")] + Shutdown, + #[error("remote: {0}")] + Remote(reqwest::Error), +} + +async fn retry_http_forever( + url: &url::Url, + request: ValidateRequest, + cancel: CancellationToken, +) -> Result +where + T: DeserializeOwned, +{ + let client = reqwest::ClientBuilder::new() + .build() + .expect("Failed to construct http client"); + + let response = match backoff::retry( + || { + client + .post(url.clone()) + .json(&request) + .send() + .map_err(|e| ValidateCallError::Remote(e)) + }, + |_| false, + 3, + u32::MAX, + "calling control plane generation validation API", + backoff::Cancel::new(cancel.clone(), || ValidateCallError::Shutdown), + ) + .await + { + Err(ValidateCallError::Shutdown) => { + return Err(DeletionQueueError::ShuttingDown); + } + Err(ValidateCallError::Remote(_)) => { + panic!("We retry forever"); + } + Ok(r) => r, + }; + + // TODO: handle non-200 response + // TODO: handle decode error + Ok(response.json::().await.unwrap()) } impl BackendQueueWorker { @@ -48,6 +108,7 @@ impl BackendQueueWorker { conf: &'static PageServerConf, rx: tokio::sync::mpsc::Receiver, tx: tokio::sync::mpsc::Sender, + cancel: CancellationToken, ) -> Self { Self { conf, @@ -56,6 +117,7 @@ impl BackendQueueWorker { pending_lists: Vec::new(), pending_key_count: 0, executed_lists: Vec::new(), + cancel, } } @@ -106,11 +168,67 @@ impl BackendQueueWorker { } } + pub async fn validate_lists(&mut self) -> Result<(), DeletionQueueError> { + let control_plane_api = match &self.conf.control_plane_api { + None => { + // Generations are not switched on yet. 
+ return Ok(()); + } + Some(api) => api, + }; + + let validate_path = control_plane_api + .join("validate") + .expect("Failed to build validate path"); + + for list in &mut self.pending_lists { + let request = ValidateRequest { + tenants: list + .tenants + .iter() + .map(|(tid, tdl)| ValidateRequestTenant { + id: HexTenantId::new(*tid), + gen: tdl.generation.into().expect( + "Generation should always be valid for a Tenant doing deletions", + ), + }) + .collect(), + }; + + // Retry forever, we cannot make progress until we get a response + let response: ValidateResponse = + retry_http_forever(&validate_path, request, self.cancel.clone()).await?; + + let tenants_valid: HashMap<_, _> = response + .tenants + .into_iter() + .map(|t| (t.id.take(), t.valid)) + .collect(); + + // Filter the list based on whether the server responded valid: true. + // If a tenant is omitted in the response, it has been deleted, and we should + // proceed with deletion. NOTE(review): `list.size` is not adjusted for tenants dropped here, so `len()` may overcount afterwards — confirm this is acceptable. + list.tenants.retain(|tenant_id, _tenant| { + let r = tenants_valid.get(tenant_id).map(|v| *v).unwrap_or(true); + if !r { + warn!("Dropping stale deletions for tenant {tenant_id}, objects may be leaked"); + } + r + }); + } + + Ok(()) + } + pub async fn flush(&mut self) { - self.pending_key_count = 0; + // Issue any required generation validation calls to the control plane + if let Err(DeletionQueueError::ShuttingDown) = self.validate_lists().await { + warn!("Shutting down"); + return; + } // Submit all keys from pending DeletionLists into the executor - for list in &mut self.pending_lists { + for list in self.pending_lists.drain(..) 
{ let objects = list.take_paths(); if let Err(_e) = self.tx.send(ExecutorMessage::Delete(objects)).await { warn!("Shutting down"); @@ -132,6 +250,7 @@ impl BackendQueueWorker { // After flush, we are assured that all contents of the pending lists // are executed + self.pending_key_count = 0; // NOTE(review): `pending_lists` was already emptied by `drain(..)` above, so the `append` below appears to move nothing into `executed_lists` — confirm intended. self.executed_lists.append(&mut self.pending_lists); // Erase the lists we executed @@ -164,7 +283,7 @@ impl BackendQueueWorker { match msg { BackendQueueMessage::Delete(list) => { - self.pending_key_count += list.objects.len(); + self.pending_key_count += list.len(); self.pending_lists.push(list); if self.pending_key_count > AUTOFLUSH_KEY_COUNT { diff --git a/pageserver/src/deletion_queue/frontend.rs b/pageserver/src/deletion_queue/frontend.rs index 0ce47da53e..badd5a333c 100644 --- a/pageserver/src/deletion_queue/frontend.rs +++ b/pageserver/src/deletion_queue/frontend.rs @@ -45,6 +45,10 @@ pub(super) struct DeletionOp { // to do it for you. pub(super) layers: Vec<(LayerFileName, Generation)>, pub(super) objects: Vec, + + /// The _current_ generation of the Tenant attachment in which we are enqueuing + /// this deletion. 
+ pub(super) generation: Generation, } #[derive(Debug)] @@ -121,8 +125,7 @@ impl FrontendQueueWorker { f.fire(); } - let mut onward_list = DeletionList::new(self.pending.sequence); - std::mem::swap(&mut onward_list.objects, &mut self.pending.objects); + let onward_list = self.pending.drain(); // We have consumed out of pending: reset it for the next incoming deletions to accumulate there self.pending = DeletionList::new(self.pending.sequence + 1); @@ -317,14 +320,34 @@ impl FrontendQueueWorker { generation, )); } + layer_paths.extend(op.objects); - self.pending - .push(&op.tenant_id, &op.timeline_id, layer_paths); - self.pending - .push(&op.tenant_id, &op.timeline_id, op.objects); + if self.pending.push( + &op.tenant_id, + &op.timeline_id, + op.generation, + &mut layer_paths, + ) == false + { + self.flush().await; + let retry = self.pending.push( + &op.tenant_id, + &op.timeline_id, + op.generation, + &mut layer_paths, + ); + if retry != true { + // Unexpected: after we flush, we should have + // drained self.pending, so a conflict on + // generation numbers should be impossible. + tracing::error!( + "Failed to enqueue deletions, leaking objects. This is a bug." 
+ ); + } + } } FrontendQueueMessage::Flush(op) => { - if self.pending.objects.is_empty() { + if self.pending.is_empty() { // Execute immediately debug!("Flush: No pending objects, flushing immediately"); op.fire() @@ -344,9 +367,7 @@ impl FrontendQueueWorker { } } - if self.pending.objects.len() > DELETION_LIST_TARGET_SIZE - || !self.pending_flushes.is_empty() - { + if self.pending.len() > DELETION_LIST_TARGET_SIZE || !self.pending_flushes.is_empty() { self.flush().await; } } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 70c25f9e1b..11397ef18a 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -693,7 +693,12 @@ impl RemoteTimelineClient { // Enqueue deletions deletion_queue_client - .push_layers(self.tenant_id, self.timeline_id, with_generations) + .push_layers( + self.tenant_id, + self.timeline_id, + self.generation, + with_generations, + ) .await?; Ok(()) } @@ -1509,7 +1514,7 @@ mod tests { )), }); - let deletion_queue = MockDeletionQueue::new(Some(storage), harness.conf); + let deletion_queue = MockDeletionQueue::new(Some(storage)); Ok(Self { harness,