Compare commits

...

1 Commits

Author SHA1 Message Date
Erik Grinaker
377af2c673 pageserver: add DeletionNotify to wait for deletion queue execution 2025-02-12 18:20:39 +01:00
4 changed files with 75 additions and 10 deletions

View File

@@ -150,10 +150,30 @@ impl FlushOp {
} }
} }
/// A `DeletionNotify` can be used to wait for a deletion to have been executed.
///
/// It pairs a one-shot receiver, which yields the sequence number of the `DeletionList` the
/// deletion was scheduled in, with a watch receiver that tracks the last executed sequence
/// number. Consuming it via `notify()` waits until the latter has reached the former.
#[derive(Debug)]
pub struct DeletionNotify {
/// Receives the `DeletionListSeq` from `ListWriter` when scheduled in a `DeletionList`.
seq_rx: tokio::sync::oneshot::Receiver<DeletionListSeq>,
/// Watches the last executed `DeletionListSeq`.
executed_rx: tokio::sync::watch::Receiver<DeletionListSeq>,
}
impl DeletionNotify {
    /// Waits until the deletion this handle was created for has been executed.
    ///
    /// First awaits the sequence number of the deletion list the deletion was scheduled in,
    /// then waits for the last executed sequence number to reach (or pass) it.
    ///
    /// If the sequence number is never delivered, this returns without waiting. Any error
    /// from waiting on the watch channel is likewise discarded, so callers cannot currently
    /// distinguish "executed" from "gave up".
    pub async fn notify(mut self) {
        // TODO return error when the sequence number could not be received.
        if let Ok(target_seq) = self.seq_rx.await {
            self.executed_rx
                .wait_for(|executed| *executed >= target_seq)
                .await
                .ok();
        }
    }
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct DeletionQueueClient { pub struct DeletionQueueClient {
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>, tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>, executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
executed_rx: tokio::sync::watch::Receiver<DeletionListSeq>,
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>, lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
} }
@@ -176,6 +196,9 @@ impl TenantDeletionList {
} }
} }
/// Deletion list sequence number. Monotonically increasing, even across restarts.
///
/// Used for `DeletionList::sequence` and `DeletionHeader::validated_sequence`, and as the
/// value published on the watch channel that reports the last executed deletion list.
type DeletionListSeq = u64;
/// Files ending with this suffix will be ignored and erased /// Files ending with this suffix will be ignored and erased
/// during recovery at startup. /// during recovery at startup.
const TEMP_SUFFIX: &str = "tmp"; const TEMP_SUFFIX: &str = "tmp";
@@ -185,8 +208,9 @@ struct DeletionList {
/// Serialization version, for future use /// Serialization version, for future use
version: u8, version: u8,
/// Used for constructing a unique key for each deletion list we write out. /// Used for constructing a unique key for each deletion list we write out, and to notify
sequence: u64, /// callers when a deletion has been executed (and will not be retried later).
sequence: DeletionListSeq,
/// To avoid repeating tenant/timeline IDs in every key, we store keys in /// To avoid repeating tenant/timeline IDs in every key, we store keys in
/// nested HashMaps by TenantTimelineID. Each Tenant only appears once /// nested HashMaps by TenantTimelineID. Each Tenant only appears once
@@ -214,13 +238,13 @@ struct DeletionHeader {
/// The highest sequence number (inclusive) that has been validated. All deletion /// The highest sequence number (inclusive) that has been validated. All deletion
/// lists on disk with a sequence <= this value are safe to execute. /// lists on disk with a sequence <= this value are safe to execute.
validated_sequence: u64, validated_sequence: DeletionListSeq,
} }
impl DeletionHeader { impl DeletionHeader {
const VERSION_LATEST: u8 = 1; const VERSION_LATEST: u8 = 1;
fn new(validated_sequence: u64) -> Self { fn new(validated_sequence: DeletionListSeq) -> Self {
Self { Self {
version: Self::VERSION_LATEST, version: Self::VERSION_LATEST,
validated_sequence, validated_sequence,
@@ -242,7 +266,7 @@ impl DeletionHeader {
impl DeletionList { impl DeletionList {
const VERSION_LATEST: u8 = 1; const VERSION_LATEST: u8 = 1;
fn new(sequence: u64) -> Self { fn new(sequence: DeletionListSeq) -> Self {
Self { Self {
version: Self::VERSION_LATEST, version: Self::VERSION_LATEST,
sequence, sequence,
@@ -460,6 +484,8 @@ impl DeletionQueueClient {
/// layers until you're sure they can be deleted safely (i.e. remote metadata no longer /// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
/// references them). /// references them).
/// ///
/// The returned `DeletionNotify` can be used to wait for the deletion to execute.
///
/// The `current_generation` is the generation of this pageserver's current attachment. The /// The `current_generation` is the generation of this pageserver's current attachment. The
/// generations in `layers` are the generations in which those layers were written. /// generations in `layers` are the generations in which those layers were written.
pub(crate) fn push_layers( pub(crate) fn push_layers(
@@ -468,12 +494,14 @@ impl DeletionQueueClient {
timeline_id: TimelineId, timeline_id: TimelineId,
current_generation: Generation, current_generation: Generation,
layers: Vec<(LayerName, LayerFileMetadata)>, layers: Vec<(LayerName, LayerFileMetadata)>,
) -> Result<(), DeletionQueueError> { ) -> Result<DeletionNotify, DeletionQueueError> {
// None generations are not valid for attached tenants: they must always be attached in // None generations are not valid for attached tenants: they must always be attached in
// a known generation. None generations are still permitted for layers in the index because // a known generation. None generations are still permitted for layers in the index because
// they may be historical. // they may be historical.
assert!(!current_generation.is_none()); assert!(!current_generation.is_none());
let (seq_tx, seq_rx) = tokio::sync::oneshot::channel();
metrics::DELETION_QUEUE metrics::DELETION_QUEUE
.keys_submitted .keys_submitted
.inc_by(layers.len() as u64); .inc_by(layers.len() as u64);
@@ -485,8 +513,14 @@ impl DeletionQueueClient {
layers, layers,
generation: current_generation, generation: current_generation,
objects: Vec::new(), objects: Vec::new(),
seq_tx,
}), }),
) )?;
Ok(DeletionNotify {
seq_rx,
executed_rx: self.executed_rx.clone(),
})
} }
/// This is cancel-safe. If you drop the future the flush may still happen in the background. /// This is cancel-safe. If you drop the future the flush may still happen in the background.
@@ -610,6 +644,10 @@ impl DeletionQueue {
// happen in the backend (persistent), not in this queue. // happen in the backend (persistent), not in this queue.
let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16); let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
// Notifies clients about executed deletions.
// TODO: recover the last sequence number on startup.
let (executed_tx, executed_rx) = tokio::sync::watch::channel(0);
let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())); let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new()));
// The deletion queue has an independent cancellation token to // The deletion queue has an independent cancellation token to
@@ -622,6 +660,7 @@ impl DeletionQueue {
client: DeletionQueueClient { client: DeletionQueueClient {
tx, tx,
executor_tx: executor_tx.clone(), executor_tx: executor_tx.clone(),
executed_rx,
lsn_table: lsn_table.clone(), lsn_table: lsn_table.clone(),
}, },
cancel: cancel.clone(), cancel: cancel.clone(),
@@ -632,6 +671,7 @@ impl DeletionQueue {
conf, conf,
backend_rx, backend_rx,
executor_tx, executor_tx,
executed_tx,
controller_upcall_client, controller_upcall_client,
lsn_table.clone(), lsn_table.clone(),
cancel.clone(), cancel.clone(),
@@ -1228,6 +1268,7 @@ pub(crate) mod mock {
DeletionQueueClient { DeletionQueueClient {
tx: self.tx.clone(), tx: self.tx.clone(),
executor_tx: self.executor_tx.clone(), executor_tx: self.executor_tx.clone(),
executed_rx: todo!(),
lsn_table: self.lsn_table.clone(), lsn_table: self.lsn_table.clone(),
} }
} }

View File

@@ -12,6 +12,7 @@
use super::DeletionHeader; use super::DeletionHeader;
use super::DeletionList; use super::DeletionList;
use super::DeletionListSeq;
use super::FlushOp; use super::FlushOp;
use super::ValidatorQueueMessage; use super::ValidatorQueueMessage;
@@ -65,6 +66,9 @@ pub(super) struct DeletionOp {
/// The _current_ generation of the Tenant shard attachment in which we are enqueuing /// The _current_ generation of the Tenant shard attachment in which we are enqueuing
/// this deletion. /// this deletion.
pub(super) generation: Generation, pub(super) generation: Generation,
/// Return channel for the `DeletionList::sequence` this deletion is included in.
pub(super) seq_tx: tokio::sync::oneshot::Sender<DeletionListSeq>,
} }
#[derive(Debug)] #[derive(Debug)]
@@ -175,7 +179,7 @@ impl ListWriter {
/// ///
/// It is not an error for the header to not exist: we return None, and /// It is not an error for the header to not exist: we return None, and
/// the caller should act as if validated_sequence is 0 /// the caller should act as if validated_sequence is 0
async fn load_validated_sequence(&self) -> Result<Option<u64>, anyhow::Error> { async fn load_validated_sequence(&self) -> Result<Option<DeletionListSeq>, anyhow::Error> {
let header_path = self.conf.deletion_header_path(); let header_path = self.conf.deletion_header_path();
match tokio::fs::read(&header_path).await { match tokio::fs::read(&header_path).await {
Ok(header_bytes) => { Ok(header_bytes) => {
@@ -228,7 +232,7 @@ impl ListWriter {
let temp_extension = format!(".{TEMP_SUFFIX}"); let temp_extension = format!(".{TEMP_SUFFIX}");
let header_path = self.conf.deletion_header_path(); let header_path = self.conf.deletion_header_path();
let mut seqs: Vec<u64> = Vec::new(); let mut seqs: Vec<DeletionListSeq> = Vec::new();
while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") { while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") {
let file_name = dentry.file_name(); let file_name = dentry.file_name();
let dentry_str = file_name.to_string_lossy(); let dentry_str = file_name.to_string_lossy();
@@ -433,6 +437,9 @@ impl ListWriter {
metrics::DELETION_QUEUE.unexpected_errors.inc(); metrics::DELETION_QUEUE.unexpected_errors.inc();
} }
} }
// Notify the client about the sequence number of this deletion.
op.seq_tx.send(self.pending.sequence).ok();
} }
ListWriterQueueMessage::Flush(op) => { ListWriterQueueMessage::Flush(op) => {
if self.pending.is_empty() { if self.pending.is_empty() {

View File

@@ -33,6 +33,7 @@ use crate::virtual_file::MaybeFatalIo;
use super::deleter::DeleterMessage; use super::deleter::DeleterMessage;
use super::DeletionHeader; use super::DeletionHeader;
use super::DeletionList; use super::DeletionList;
use super::DeletionListSeq;
use super::DeletionQueueError; use super::DeletionQueueError;
use super::FlushOp; use super::FlushOp;
use super::VisibleLsnUpdates; use super::VisibleLsnUpdates;
@@ -60,6 +61,9 @@ where
rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>, rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
tx: tokio::sync::mpsc::Sender<DeleterMessage>, tx: tokio::sync::mpsc::Sender<DeleterMessage>,
/// Notifies clients about the last executed DeletionList sequence number.
executed_tx: tokio::sync::watch::Sender<DeletionListSeq>,
// Client for calling into control plane API for validation of deletes // Client for calling into control plane API for validation of deletes
controller_upcall_client: Option<C>, controller_upcall_client: Option<C>,
@@ -94,6 +98,7 @@ where
conf: &'static PageServerConf, conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>, rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
tx: tokio::sync::mpsc::Sender<DeleterMessage>, tx: tokio::sync::mpsc::Sender<DeleterMessage>,
executed_tx: tokio::sync::watch::Sender<DeletionListSeq>,
controller_upcall_client: Option<C>, controller_upcall_client: Option<C>,
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>, lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
cancel: CancellationToken, cancel: CancellationToken,
@@ -102,6 +107,7 @@ where
conf, conf,
rx, rx,
tx, tx,
executed_tx,
controller_upcall_client, controller_upcall_client,
lsn_table, lsn_table,
pending_lists: Vec::new(), pending_lists: Vec::new(),
@@ -161,7 +167,7 @@ where
tenant_generations.keys().map(|k| (*k, true)).collect() tenant_generations.keys().map(|k| (*k, true)).collect()
}; };
let mut validated_sequence: Option<u64> = None; let mut validated_sequence: Option<DeletionListSeq> = None;
// Apply the validation results to the pending LSN updates // Apply the validation results to the pending LSN updates
for (tenant_id, tenant_lsn_state) in pending_lsn_updates.tenants { for (tenant_id, tenant_lsn_state) in pending_lsn_updates.tenants {
@@ -295,6 +301,7 @@ where
async fn cleanup_lists(&mut self, list_paths: Vec<Utf8PathBuf>) { async fn cleanup_lists(&mut self, list_paths: Vec<Utf8PathBuf>) {
for list_path in list_paths { for list_path in list_paths {
debug!("Removing deletion list {list_path}"); debug!("Removing deletion list {list_path}");
// TODO: this needs to fsync the removal.
tokio::fs::remove_file(&list_path) tokio::fs::remove_file(&list_path)
.await .await
.fatal_err("remove deletion list"); .fatal_err("remove deletion list");
@@ -324,6 +331,7 @@ where
} }
// Drain `validated_lists` into the executor // Drain `validated_lists` into the executor
let executed_seq = self.validated_lists.iter().map(|l| l.sequence).max();
let mut executing_lists = Vec::new(); let mut executing_lists = Vec::new();
for list in self.validated_lists.drain(..) { for list in self.validated_lists.drain(..) {
let list_path = self.conf.deletion_list_path(list.sequence); let list_path = self.conf.deletion_list_path(list.sequence);
@@ -340,6 +348,14 @@ where
// Erase the deletion lists whose keys have all been deleted from remote storage // Erase the deletion lists whose keys have all been deleted from remote storage
self.cleanup_lists(executing_lists).await; self.cleanup_lists(executing_lists).await;
// Notify any waiters that the deletions have been executed.
//
// TODO: this will wait for all pending lists to be deleted. Consider making it more
// responsive by processing lists one by one.
if let Some(executed_seq) = executed_seq {
self.executed_tx.send_replace(executed_seq);
}
Ok(()) Ok(())
} }

View File

@@ -2151,6 +2151,7 @@ impl RemoteTimelineClient {
self.generation, self.generation,
delete.layers.clone(), delete.layers.clone(),
) )
.map(|_| ())
.map_err(|e| anyhow::anyhow!(e)) .map_err(|e| anyhow::anyhow!(e))
} }
} }