diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index a2395b0dca..ab8cb84120 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -150,10 +150,30 @@ impl FlushOp { } } +/// A `DeletionNotify` can be used to wait for a deletion to have been executed. +#[derive(Debug)] +pub struct DeletionNotify { + /// Receives the `DeletionListSeq` from `ListWriter` when scheduled in a `DeletionList`. + seq_rx: tokio::sync::oneshot::Receiver<DeletionListSeq>, + /// Watches the last executed `DeletionListSeq`. + executed_rx: tokio::sync::watch::Receiver<DeletionListSeq>, +} + +impl DeletionNotify { + /// Waits for the deletion to have been executed. Also returns, without an error, if either channel closes first. + pub async fn notify(mut self) { + let Ok(wait_seq) = self.seq_rx.await else { + return; // TODO return error + }; + self.executed_rx.wait_for(|&seq| seq >= wait_seq).await.ok(); + } +} + #[derive(Clone, Debug)] pub struct DeletionQueueClient { tx: tokio::sync::mpsc::UnboundedSender, executor_tx: tokio::sync::mpsc::Sender, + executed_rx: tokio::sync::watch::Receiver<DeletionListSeq>, lsn_table: Arc>, } @@ -176,6 +196,9 @@ impl TenantDeletionList { } } +/// Deletion list sequence number. Monotonically increasing, even across restarts. +type DeletionListSeq = u64; + /// Files ending with this suffix will be ignored and erased /// during recovery as startup. const TEMP_SUFFIX: &str = "tmp"; @@ -185,8 +208,9 @@ struct DeletionList { /// Serialization version, for future use version: u8, - /// Used for constructing a unique key for each deletion list we write out. - sequence: u64, + /// Used for constructing a unique key for each deletion list we write out, and to notify + /// callers when a deletion has been executed (and will not be retried later). + sequence: DeletionListSeq, /// To avoid repeating tenant/timeline IDs in every key, we store keys in /// nested HashMaps by TenantTimelineID. 
Each Tenant only appears once @@ -214,13 +238,13 @@ struct DeletionHeader { /// The highest sequence number (inclusive) that has been validated. All deletion /// lists on disk with a sequence <= this value are safe to execute. - validated_sequence: u64, + validated_sequence: DeletionListSeq, } impl DeletionHeader { const VERSION_LATEST: u8 = 1; - fn new(validated_sequence: u64) -> Self { + fn new(validated_sequence: DeletionListSeq) -> Self { Self { version: Self::VERSION_LATEST, validated_sequence, @@ -242,7 +266,7 @@ impl DeletionHeader { impl DeletionList { const VERSION_LATEST: u8 = 1; - fn new(sequence: u64) -> Self { + fn new(sequence: DeletionListSeq) -> Self { Self { version: Self::VERSION_LATEST, sequence, @@ -460,6 +484,8 @@ impl DeletionQueueClient { /// layers until you're sure they can be deleted safely (i.e. remote metadata no longer /// references them). /// + /// The returned `DeletionNotify` can be used to wait for the deletion to execute; callers that do not need to wait can simply drop it. + /// /// The `current_generation` is the generation of this pageserver's current attachment. The /// generations in `layers` are the generations in which those layers were written. pub(crate) fn push_layers( @@ -468,12 +494,14 @@ impl DeletionQueueClient { timeline_id: TimelineId, current_generation: Generation, layers: Vec<(LayerName, LayerFileMetadata)>, - ) -> Result<(), DeletionQueueError> { + ) -> Result<DeletionNotify, DeletionQueueError> { // None generations are not valid for attached tenants: they must always be attached in // a known generation. None generations are still permitted for layers in the index because // they may be historical. 
assert!(!current_generation.is_none()); + let (seq_tx, seq_rx) = tokio::sync::oneshot::channel(); + metrics::DELETION_QUEUE .keys_submitted .inc_by(layers.len() as u64); @@ -485,8 +513,14 @@ impl DeletionQueueClient { layers, generation: current_generation, objects: Vec::new(), + seq_tx, }), - ) + )?; + + Ok(DeletionNotify { + seq_rx, + executed_rx: self.executed_rx.clone(), + }) } /// This is cancel-safe. If you drop the future the flush may still happen in the background. @@ -610,6 +644,10 @@ impl DeletionQueue { // happen in the backend (persistent), not in this queue. let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16); + // Notifies clients about executed deletions. + // TODO: recover the last sequence number on startup. + let (executed_tx, executed_rx) = tokio::sync::watch::channel(0); + let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())); // The deletion queue has an independent cancellation token to @@ -622,6 +660,7 @@ impl DeletionQueue { client: DeletionQueueClient { tx, executor_tx: executor_tx.clone(), + executed_rx, lsn_table: lsn_table.clone(), }, cancel: cancel.clone(), @@ -632,6 +671,7 @@ impl DeletionQueue { conf, backend_rx, executor_tx, + executed_tx, controller_upcall_client, lsn_table.clone(), cancel.clone(), @@ -1228,6 +1268,7 @@ pub(crate) mod mock { DeletionQueueClient { tx: self.tx.clone(), executor_tx: self.executor_tx.clone(), + executed_rx: tokio::sync::watch::channel(0).1, lsn_table: self.lsn_table.clone(), } } diff --git a/pageserver/src/deletion_queue/list_writer.rs b/pageserver/src/deletion_queue/list_writer.rs index ae3b2c9180..1ae41f935a 100644 --- a/pageserver/src/deletion_queue/list_writer.rs +++ b/pageserver/src/deletion_queue/list_writer.rs @@ -12,6 +12,7 @@ use super::DeletionHeader; use super::DeletionList; +use super::DeletionListSeq; use super::FlushOp; use super::ValidatorQueueMessage; @@ -65,6 +66,9 @@ pub(super) struct DeletionOp { /// The _current_ generation of the Tenant shard attachment in which we are 
enqueuing /// this deletion. pub(super) generation: Generation, + + /// Return channel for the `DeletionList::sequence` this deletion is included in. + pub(super) seq_tx: tokio::sync::oneshot::Sender<DeletionListSeq>, } #[derive(Debug)] @@ -175,7 +179,7 @@ impl ListWriter { /// /// It is not an error for the header to not exist: we return None, and /// the caller should act as if validated_sequence is 0 - async fn load_validated_sequence(&self) -> Result<Option<u64>, anyhow::Error> { + async fn load_validated_sequence(&self) -> Result<Option<DeletionListSeq>, anyhow::Error> { let header_path = self.conf.deletion_header_path(); match tokio::fs::read(&header_path).await { Ok(header_bytes) => { @@ -228,7 +232,7 @@ impl ListWriter { let temp_extension = format!(".{TEMP_SUFFIX}"); let header_path = self.conf.deletion_header_path(); - let mut seqs: Vec<u64> = Vec::new(); + let mut seqs: Vec<DeletionListSeq> = Vec::new(); while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") { let file_name = dentry.file_name(); let dentry_str = file_name.to_string_lossy(); @@ -433,6 +437,9 @@ impl ListWriter { metrics::DELETION_QUEUE.unexpected_errors.inc(); } } + + // Notify the client about the sequence number of this deletion. A send error only means the client dropped its `DeletionNotify`, so it is ignored. + op.seq_tx.send(self.pending.sequence).ok(); } ListWriterQueueMessage::Flush(op) => { if self.pending.is_empty() { diff --git a/pageserver/src/deletion_queue/validator.rs b/pageserver/src/deletion_queue/validator.rs index 1d55581ebd..801de2e66a 100644 --- a/pageserver/src/deletion_queue/validator.rs +++ b/pageserver/src/deletion_queue/validator.rs @@ -33,6 +33,7 @@ use crate::virtual_file::MaybeFatalIo; use super::deleter::DeleterMessage; use super::DeletionHeader; use super::DeletionList; +use super::DeletionListSeq; use super::DeletionQueueError; use super::FlushOp; use super::VisibleLsnUpdates; @@ -60,6 +61,9 @@ where rx: tokio::sync::mpsc::Receiver, tx: tokio::sync::mpsc::Sender, + /// Notifies clients about the last executed DeletionList sequence number. 
+ executed_tx: tokio::sync::watch::Sender<DeletionListSeq>, + // Client for calling into control plane API for validation of deletes controller_upcall_client: Option, @@ -94,6 +98,7 @@ where conf: &'static PageServerConf, rx: tokio::sync::mpsc::Receiver, tx: tokio::sync::mpsc::Sender, + executed_tx: tokio::sync::watch::Sender<DeletionListSeq>, controller_upcall_client: Option, lsn_table: Arc>, cancel: CancellationToken, @@ -102,6 +107,7 @@ where conf, rx, tx, + executed_tx, controller_upcall_client, lsn_table, pending_lists: Vec::new(), @@ -161,7 +167,7 @@ where tenant_generations.keys().map(|k| (*k, true)).collect() }; - let mut validated_sequence: Option<u64> = None; + let mut validated_sequence: Option<DeletionListSeq> = None; // Apply the validation results to the pending LSN updates for (tenant_id, tenant_lsn_state) in pending_lsn_updates.tenants { @@ -295,6 +301,7 @@ where async fn cleanup_lists(&mut self, list_paths: Vec) { for list_path in list_paths { debug!("Removing deletion list {list_path}"); + // TODO: this needs to fsync the removal. tokio::fs::remove_file(&list_path) .await .fatal_err("remove deletion list"); @@ -324,6 +331,7 @@ where } // Drain `validated_lists` into the executor + let executed_seq = self.validated_lists.iter().map(|l| l.sequence).max(); let mut executing_lists = Vec::new(); for list in self.validated_lists.drain(..) { let list_path = self.conf.deletion_list_path(list.sequence); @@ -340,6 +348,14 @@ where // Erase the deletion lists whose keys have all be deleted from remote storage self.cleanup_lists(executing_lists).await; + // Notify any waiters that the deletions have been executed, by publishing the highest drained sequence number. + // + // TODO: this will wait for all pending lists to be deleted. Consider making it more + // responsive by processing lists one by one. 
+ if let Some(executed_seq) = executed_seq { + self.executed_tx.send_replace(executed_seq); + } + Ok(()) } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 713efbb9a4..30489939e0 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -2151,6 +2151,7 @@ impl RemoteTimelineClient { self.generation, delete.layers.clone(), ) + .map(|_| ()) .map_err(|e| anyhow::anyhow!(e)) } }