mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-24 22:00:37 +00:00
Compare commits
1 Commits
release-pr
...
erik/delet
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
377af2c673 |
@@ -150,10 +150,30 @@ impl FlushOp {
|
||||
}
|
||||
}
|
||||
|
||||
/// A DeletionNotify can be used to wait for a deletion to have been executed.
|
||||
#[derive(Debug)]
|
||||
pub struct DeletionNotify {
|
||||
/// Receives the `DeletionListSeq` from `ListWriter` when scheduled in a `DeletionList`.
|
||||
seq_rx: tokio::sync::oneshot::Receiver<DeletionListSeq>,
|
||||
/// Watches the last executed `DeletionListSeq`.
|
||||
executed_rx: tokio::sync::watch::Receiver<DeletionListSeq>,
|
||||
}
|
||||
|
||||
impl DeletionNotify {
|
||||
/// Waits for the deletion to have been executed.
|
||||
pub async fn notify(mut self) {
|
||||
let Ok(wait_seq) = self.seq_rx.await else {
|
||||
return; // TODO return error
|
||||
};
|
||||
self.executed_rx.wait_for(|&seq| seq >= wait_seq).await.ok();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DeletionQueueClient {
|
||||
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
|
||||
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
||||
executed_rx: tokio::sync::watch::Receiver<DeletionListSeq>,
|
||||
|
||||
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
|
||||
}
|
||||
@@ -176,6 +196,9 @@ impl TenantDeletionList {
|
||||
}
|
||||
}
|
||||
|
||||
/// Deletion list sequence number. Monotonically increasing, even across restarts.
|
||||
type DeletionListSeq = u64;
|
||||
|
||||
/// Files ending with this suffix will be ignored and erased
|
||||
/// during recovery as startup.
|
||||
const TEMP_SUFFIX: &str = "tmp";
|
||||
@@ -185,8 +208,9 @@ struct DeletionList {
|
||||
/// Serialization version, for future use
|
||||
version: u8,
|
||||
|
||||
/// Used for constructing a unique key for each deletion list we write out.
|
||||
sequence: u64,
|
||||
/// Used for constructing a unique key for each deletion list we write out, and to notify
|
||||
/// callers when a deletion has been executed (and will not be retried later).
|
||||
sequence: DeletionListSeq,
|
||||
|
||||
/// To avoid repeating tenant/timeline IDs in every key, we store keys in
|
||||
/// nested HashMaps by TenantTimelineID. Each Tenant only appears once
|
||||
@@ -214,13 +238,13 @@ struct DeletionHeader {
|
||||
|
||||
/// The highest sequence number (inclusive) that has been validated. All deletion
|
||||
/// lists on disk with a sequence <= this value are safe to execute.
|
||||
validated_sequence: u64,
|
||||
validated_sequence: DeletionListSeq,
|
||||
}
|
||||
|
||||
impl DeletionHeader {
|
||||
const VERSION_LATEST: u8 = 1;
|
||||
|
||||
fn new(validated_sequence: u64) -> Self {
|
||||
fn new(validated_sequence: DeletionListSeq) -> Self {
|
||||
Self {
|
||||
version: Self::VERSION_LATEST,
|
||||
validated_sequence,
|
||||
@@ -242,7 +266,7 @@ impl DeletionHeader {
|
||||
|
||||
impl DeletionList {
|
||||
const VERSION_LATEST: u8 = 1;
|
||||
fn new(sequence: u64) -> Self {
|
||||
fn new(sequence: DeletionListSeq) -> Self {
|
||||
Self {
|
||||
version: Self::VERSION_LATEST,
|
||||
sequence,
|
||||
@@ -460,6 +484,8 @@ impl DeletionQueueClient {
|
||||
/// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
|
||||
/// references them).
|
||||
///
|
||||
/// The returned `DeletionNotify` can be used to wait for the deletion to execute.
|
||||
///
|
||||
/// The `current_generation` is the generation of this pageserver's current attachment. The
|
||||
/// generations in `layers` are the generations in which those layers were written.
|
||||
pub(crate) fn push_layers(
|
||||
@@ -468,12 +494,14 @@ impl DeletionQueueClient {
|
||||
timeline_id: TimelineId,
|
||||
current_generation: Generation,
|
||||
layers: Vec<(LayerName, LayerFileMetadata)>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
) -> Result<DeletionNotify, DeletionQueueError> {
|
||||
// None generations are not valid for attached tenants: they must always be attached in
|
||||
// a known generation. None generations are still permitted for layers in the index because
|
||||
// they may be historical.
|
||||
assert!(!current_generation.is_none());
|
||||
|
||||
let (seq_tx, seq_rx) = tokio::sync::oneshot::channel();
|
||||
|
||||
metrics::DELETION_QUEUE
|
||||
.keys_submitted
|
||||
.inc_by(layers.len() as u64);
|
||||
@@ -485,8 +513,14 @@ impl DeletionQueueClient {
|
||||
layers,
|
||||
generation: current_generation,
|
||||
objects: Vec::new(),
|
||||
seq_tx,
|
||||
}),
|
||||
)
|
||||
)?;
|
||||
|
||||
Ok(DeletionNotify {
|
||||
seq_rx,
|
||||
executed_rx: self.executed_rx.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// This is cancel-safe. If you drop the future the flush may still happen in the background.
|
||||
@@ -610,6 +644,10 @@ impl DeletionQueue {
|
||||
// happen in the backend (persistent), not in this queue.
|
||||
let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
|
||||
|
||||
// Notifies clients about executed deletions.
|
||||
// TODO: recover the last sequence number on startup.
|
||||
let (executed_tx, executed_rx) = tokio::sync::watch::channel(0);
|
||||
|
||||
let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new()));
|
||||
|
||||
// The deletion queue has an independent cancellation token to
|
||||
@@ -622,6 +660,7 @@ impl DeletionQueue {
|
||||
client: DeletionQueueClient {
|
||||
tx,
|
||||
executor_tx: executor_tx.clone(),
|
||||
executed_rx,
|
||||
lsn_table: lsn_table.clone(),
|
||||
},
|
||||
cancel: cancel.clone(),
|
||||
@@ -632,6 +671,7 @@ impl DeletionQueue {
|
||||
conf,
|
||||
backend_rx,
|
||||
executor_tx,
|
||||
executed_tx,
|
||||
controller_upcall_client,
|
||||
lsn_table.clone(),
|
||||
cancel.clone(),
|
||||
@@ -1228,6 +1268,7 @@ pub(crate) mod mock {
|
||||
DeletionQueueClient {
|
||||
tx: self.tx.clone(),
|
||||
executor_tx: self.executor_tx.clone(),
|
||||
executed_rx: todo!(),
|
||||
lsn_table: self.lsn_table.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
|
||||
use super::DeletionHeader;
|
||||
use super::DeletionList;
|
||||
use super::DeletionListSeq;
|
||||
use super::FlushOp;
|
||||
use super::ValidatorQueueMessage;
|
||||
|
||||
@@ -65,6 +66,9 @@ pub(super) struct DeletionOp {
|
||||
/// The _current_ generation of the Tenant shard attachment in which we are enqueuing
|
||||
/// this deletion.
|
||||
pub(super) generation: Generation,
|
||||
|
||||
/// Return channel for the `DeletionList::sequence` this deletion is included in.
|
||||
pub(super) seq_tx: tokio::sync::oneshot::Sender<DeletionListSeq>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -175,7 +179,7 @@ impl ListWriter {
|
||||
///
|
||||
/// It is not an error for the header to not exist: we return None, and
|
||||
/// the caller should act as if validated_sequence is 0
|
||||
async fn load_validated_sequence(&self) -> Result<Option<u64>, anyhow::Error> {
|
||||
async fn load_validated_sequence(&self) -> Result<Option<DeletionListSeq>, anyhow::Error> {
|
||||
let header_path = self.conf.deletion_header_path();
|
||||
match tokio::fs::read(&header_path).await {
|
||||
Ok(header_bytes) => {
|
||||
@@ -228,7 +232,7 @@ impl ListWriter {
|
||||
|
||||
let temp_extension = format!(".{TEMP_SUFFIX}");
|
||||
let header_path = self.conf.deletion_header_path();
|
||||
let mut seqs: Vec<u64> = Vec::new();
|
||||
let mut seqs: Vec<DeletionListSeq> = Vec::new();
|
||||
while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") {
|
||||
let file_name = dentry.file_name();
|
||||
let dentry_str = file_name.to_string_lossy();
|
||||
@@ -433,6 +437,9 @@ impl ListWriter {
|
||||
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
||||
}
|
||||
}
|
||||
|
||||
// Notify the client about the sequence number of this deletion.
|
||||
op.seq_tx.send(self.pending.sequence).ok();
|
||||
}
|
||||
ListWriterQueueMessage::Flush(op) => {
|
||||
if self.pending.is_empty() {
|
||||
|
||||
@@ -33,6 +33,7 @@ use crate::virtual_file::MaybeFatalIo;
|
||||
use super::deleter::DeleterMessage;
|
||||
use super::DeletionHeader;
|
||||
use super::DeletionList;
|
||||
use super::DeletionListSeq;
|
||||
use super::DeletionQueueError;
|
||||
use super::FlushOp;
|
||||
use super::VisibleLsnUpdates;
|
||||
@@ -60,6 +61,9 @@ where
|
||||
rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
|
||||
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
||||
|
||||
/// Notifies clients about the last executed DeletionList sequence number.
|
||||
executed_tx: tokio::sync::watch::Sender<DeletionListSeq>,
|
||||
|
||||
// Client for calling into control plane API for validation of deletes
|
||||
controller_upcall_client: Option<C>,
|
||||
|
||||
@@ -94,6 +98,7 @@ where
|
||||
conf: &'static PageServerConf,
|
||||
rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
|
||||
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
||||
executed_tx: tokio::sync::watch::Sender<DeletionListSeq>,
|
||||
controller_upcall_client: Option<C>,
|
||||
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
|
||||
cancel: CancellationToken,
|
||||
@@ -102,6 +107,7 @@ where
|
||||
conf,
|
||||
rx,
|
||||
tx,
|
||||
executed_tx,
|
||||
controller_upcall_client,
|
||||
lsn_table,
|
||||
pending_lists: Vec::new(),
|
||||
@@ -161,7 +167,7 @@ where
|
||||
tenant_generations.keys().map(|k| (*k, true)).collect()
|
||||
};
|
||||
|
||||
let mut validated_sequence: Option<u64> = None;
|
||||
let mut validated_sequence: Option<DeletionListSeq> = None;
|
||||
|
||||
// Apply the validation results to the pending LSN updates
|
||||
for (tenant_id, tenant_lsn_state) in pending_lsn_updates.tenants {
|
||||
@@ -295,6 +301,7 @@ where
|
||||
async fn cleanup_lists(&mut self, list_paths: Vec<Utf8PathBuf>) {
|
||||
for list_path in list_paths {
|
||||
debug!("Removing deletion list {list_path}");
|
||||
// TODO: this needs to fsync the removal.
|
||||
tokio::fs::remove_file(&list_path)
|
||||
.await
|
||||
.fatal_err("remove deletion list");
|
||||
@@ -324,6 +331,7 @@ where
|
||||
}
|
||||
|
||||
// Drain `validated_lists` into the executor
|
||||
let executed_seq = self.validated_lists.iter().map(|l| l.sequence).max();
|
||||
let mut executing_lists = Vec::new();
|
||||
for list in self.validated_lists.drain(..) {
|
||||
let list_path = self.conf.deletion_list_path(list.sequence);
|
||||
@@ -340,6 +348,14 @@ where
|
||||
// Erase the deletion lists whose keys have all be deleted from remote storage
|
||||
self.cleanup_lists(executing_lists).await;
|
||||
|
||||
// Notify any waiters that the deletions have been executed.
|
||||
//
|
||||
// TODO: this will wait for all pending lists to be deleted. Consider making it more
|
||||
// responsive by processing lists one by one.
|
||||
if let Some(executed_seq) = executed_seq {
|
||||
self.executed_tx.send_replace(executed_seq);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -2151,6 +2151,7 @@ impl RemoteTimelineClient {
|
||||
self.generation,
|
||||
delete.layers.clone(),
|
||||
)
|
||||
.map(|_| ())
|
||||
.map_err(|e| anyhow::anyhow!(e))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user