diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index d902a325cf..7d9bf0a35c 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -2,6 +2,8 @@ mod backend; mod executor; mod frontend; +use std::collections::HashMap; + use crate::metrics::DELETION_QUEUE_SUBMITTED; use remote_storage::{GenericRemoteStorage, RemotePath}; use serde::Deserialize; @@ -11,7 +13,7 @@ use thiserror::Error; use tokio; use tokio_util::sync::CancellationToken; use tracing::{self, debug, error}; -use utils::id::{TenantId, TimelineId}; +use utils::id::{TenantId, TenantTimelineId, TimelineId}; pub(crate) use self::backend::BackendQueueWorker; use self::executor::ExecutorWorker; @@ -88,6 +90,14 @@ pub struct DeletionQueueClient { executor_tx: tokio::sync::mpsc::Sender<ExecutorMessage>, } +#[derive(Debug, Serialize, Deserialize)] +struct TimelineDeletionList { + objects: Vec<RemotePath>, + // TODO: Tenant attachment generation will go here + // (see https://github.com/neondatabase/neon/pull/4919) + // attach_gen: u32, +} + #[serde_as] #[derive(Debug, Serialize, Deserialize)] struct DeletionList { @@ -97,9 +107,12 @@ struct DeletionList { /// Used for constructing a unique key for each deletion list we write out. sequence: u64, - /// These objects are elegible for deletion: they are unlinked from timeline metadata, and - /// we are free to delete them at any time from their presence in this data structure onwards. 
- objects: Vec<RemotePath>, + /// To avoid repeating tenant/timeline IDs in every key, we group keys + /// per timeline in a HashMap keyed by TenantTimelineId + objects: HashMap<TenantTimelineId, TimelineDeletionList>, + // TODO: Node generation will go here + // (see https://github.com/neondatabase/neon/pull/4919) + // node_gen: u32, } #[serde_as] @@ -134,9 +147,45 @@ impl DeletionList { Self { version: Self::VERSION_LATEST, sequence, - objects: Vec::new(), + objects: HashMap::new(), } } + + /// True if no objects are queued; relies on `push` never storing an empty per-timeline list. + fn is_empty(&self) -> bool { + self.objects.is_empty() + } + + /// Total number of object keys across all timelines (not the number of timelines). + fn len(&self) -> usize { + self.objects.values().map(|v| v.objects.len()).sum() + } + + /// Queue `objects` for deletion under the given tenant/timeline; a no-op if `objects` is empty. + fn push(&mut self, tenant: &TenantId, timeline: &TimelineId, mut objects: Vec<RemotePath>) { + if objects.is_empty() { + // Avoid inserting an empty TimelineDeletionList: this preserves the property + // that if we have no keys, then self.objects is empty (used in Self::is_empty) + return; + } + + let key = TenantTimelineId::new(tenant.clone(), timeline.clone()); + let entry = self + .objects + .entry(key) + .or_insert_with(|| TimelineDeletionList { + objects: Vec::new(), + }); + entry.objects.append(&mut objects) + } + + /// Drain every timeline's keys into one flat Vec, leaving this list empty. 
+ fn take_paths(&mut self) -> Vec<RemotePath> { + self.objects + .drain() + .flat_map(|(_k, v)| v.objects) + .collect() + } } #[derive(Error, Debug)] diff --git a/pageserver/src/deletion_queue/backend.rs b/pageserver/src/deletion_queue/backend.rs index 933c4da1af..b803fc2c79 100644 --- a/pageserver/src/deletion_queue/backend.rs +++ b/pageserver/src/deletion_queue/backend.rs @@ -131,30 +131,34 @@ impl BackendQueueWorker { } pub async fn flush(&mut self) { - let mut onward_lists: Vec<DeletionList> = Vec::new(); - std::mem::swap(&mut onward_lists, &mut self.pending_lists); - for list in onward_lists { - let objects = list.objects.clone(); - // TODO: a take_objects method - self.executed_lists.push(list); + self.pending_key_count = 0; + + // Submit all keys from pending DeletionLists into the executor + for list in &mut self.pending_lists { + let objects = list.take_paths(); if let Err(_e) = 
self.tx.send(ExecutorMessage::Delete(objects)).await { warn!("Shutting down"); return; }; } + // Flush the executor to ensure all the operations we just submitted have been executed let (tx, rx) = tokio::sync::oneshot::channel::<()>(); let flush_op = FlushOp { tx }; if let Err(_e) = self.tx.send(ExecutorMessage::Flush(flush_op)).await { warn!("Shutting down"); return; }; - if rx.await.is_err() { warn!("Shutting down"); return; } + // After flush, we are assured that all contents of the pending lists + // are executed + self.executed_lists.append(&mut self.pending_lists); + + // Erase the lists we executed self.cleanup_lists().await; } @@ -184,15 +188,6 @@ impl BackendQueueWorker { match msg { BackendQueueMessage::Delete(list) => { - if list.objects.is_empty() { - // This shouldn't happen, but is harmless. warn so that - // tests will fail if we have such a bug, but proceed with - // processing subsequent messages. - warn!("Empty DeletionList passed to deletion backend"); - self.executed_lists.push(list); - continue; - } - - self.pending_key_count += list.objects.len(); + self.pending_key_count += list.len(); self.pending_lists.push(list); diff --git a/pageserver/src/deletion_queue/frontend.rs b/pageserver/src/deletion_queue/frontend.rs index 3cc603880d..5f366e2f78 100644 --- a/pageserver/src/deletion_queue/frontend.rs +++ b/pageserver/src/deletion_queue/frontend.rs @@ -122,9 +122,7 @@ impl FrontendQueueWorker { /// This does not return errors, because on failure to flush we do not lose /// any state: flushing will be retried implicitly on the next deadline async fn flush(&mut self) { - if self.pending.objects.is_empty() { - // We do not expect to be called in this state, but handle it so that later - // logging code can be assured that therre is always a first+last key to print + if self.pending.is_empty() { for f in self.pending_flushes.drain(..) 
{ f.fire(); } @@ -133,18 +131,7 @@ impl FrontendQueueWorker { match self.upload_pending_list().await { Ok(_) => { - info!( - sequence = self.pending.sequence, - "Stored deletion list ({0}..{1})", - self.pending - .objects - .first() - .expect("list should be non-empty"), - self.pending - .objects - .last() - .expect("list should be non-empty"), - ); + info!(sequence = self.pending.sequence, "Stored deletion list"); for f in self.pending_flushes.drain(..) { f.fire(); @@ -300,7 +287,7 @@ impl FrontendQueueWorker { // We will drop out of recovery if this fails: it indicates that we are shutting down // or the backend has panicked - DELETION_QUEUE_SUBMITTED.inc_by(deletion_list.objects.len() as u64); + DELETION_QUEUE_SUBMITTED.inc_by(deletion_list.len() as u64); self.tx .send(BackendQueueMessage::Delete(deletion_list)) .await?; @@ -358,12 +345,13 @@ impl FrontendQueueWorker { match msg { FrontendQueueMessage::Delete(op) => { debug!( - "Deletion enqueue {0} layers, {1} other objects", + "Delete: ingesting {0} layers, {1} other objects", op.layers.len(), op.objects.len() ); let timeline_path = self.conf.timeline_path(&op.tenant_id, &op.timeline_id); + let mut layer_paths = Vec::new(); for layer in op.layers { // TODO go directly to remote path without composing local path let local_path = timeline_path.join(layer.file_name()); @@ -373,22 +361,27 @@ impl FrontendQueueWorker { panic!("Can't make a timeline path! 
{e}"); } }; - self.pending.objects.push(path); + layer_paths.push(path); } - self.pending.objects.extend(op.objects.into_iter()) + self.pending + .push(&op.tenant_id, &op.timeline_id, layer_paths); + self.pending + .push(&op.tenant_id, &op.timeline_id, op.objects); } FrontendQueueMessage::Flush(op) => { - if self.pending.objects.is_empty() { + if self.pending.is_empty() { // Execute immediately - debug!("No pending objects, flushing immediately"); + debug!("Flush: No pending objects, flushing immediately"); op.fire() } else { // Execute next time we flush + debug!("Flush: adding to pending flush list for next deadline flush"); self.pending_flushes.push(op); } } FrontendQueueMessage::FlushExecute(op) => { + debug!("FlushExecute: passing through to backend"); // We do not flush to a deletion list here: the client sends a Flush before the FlushExecute if let Err(e) = self.tx.send(BackendQueueMessage::Flush(op)).await { info!("Can't flush, shutting down ({e})");