mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 23:50:39 +00:00
Compare commits
1 Commits
quantumish
...
erik/delet
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
377af2c673 |
@@ -150,10 +150,30 @@ impl FlushOp {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A DeletionNotify can be used to wait for a deletion to have been executed.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct DeletionNotify {
|
||||||
|
/// Receives the `DeletionListSeq` from `ListWriter` when scheduled in a `DeletionList`.
|
||||||
|
seq_rx: tokio::sync::oneshot::Receiver<DeletionListSeq>,
|
||||||
|
/// Watches the last executed `DeletionListSeq`.
|
||||||
|
executed_rx: tokio::sync::watch::Receiver<DeletionListSeq>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DeletionNotify {
|
||||||
|
/// Waits for the deletion to have been executed.
|
||||||
|
pub async fn notify(mut self) {
|
||||||
|
let Ok(wait_seq) = self.seq_rx.await else {
|
||||||
|
return; // TODO return error
|
||||||
|
};
|
||||||
|
self.executed_rx.wait_for(|&seq| seq >= wait_seq).await.ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct DeletionQueueClient {
|
pub struct DeletionQueueClient {
|
||||||
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
|
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
|
||||||
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
||||||
|
executed_rx: tokio::sync::watch::Receiver<DeletionListSeq>,
|
||||||
|
|
||||||
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
|
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
|
||||||
}
|
}
|
||||||
@@ -176,6 +196,9 @@ impl TenantDeletionList {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Deletion list sequence number. Monotonically increasing, even across restarts.
|
||||||
|
type DeletionListSeq = u64;
|
||||||
|
|
||||||
/// Files ending with this suffix will be ignored and erased
|
/// Files ending with this suffix will be ignored and erased
|
||||||
/// during recovery as startup.
|
/// during recovery as startup.
|
||||||
const TEMP_SUFFIX: &str = "tmp";
|
const TEMP_SUFFIX: &str = "tmp";
|
||||||
@@ -185,8 +208,9 @@ struct DeletionList {
|
|||||||
/// Serialization version, for future use
|
/// Serialization version, for future use
|
||||||
version: u8,
|
version: u8,
|
||||||
|
|
||||||
/// Used for constructing a unique key for each deletion list we write out.
|
/// Used for constructing a unique key for each deletion list we write out, and to notify
|
||||||
sequence: u64,
|
/// callers when a deletion has been executed (and will not be retried later).
|
||||||
|
sequence: DeletionListSeq,
|
||||||
|
|
||||||
/// To avoid repeating tenant/timeline IDs in every key, we store keys in
|
/// To avoid repeating tenant/timeline IDs in every key, we store keys in
|
||||||
/// nested HashMaps by TenantTimelineID. Each Tenant only appears once
|
/// nested HashMaps by TenantTimelineID. Each Tenant only appears once
|
||||||
@@ -214,13 +238,13 @@ struct DeletionHeader {
|
|||||||
|
|
||||||
/// The highest sequence number (inclusive) that has been validated. All deletion
|
/// The highest sequence number (inclusive) that has been validated. All deletion
|
||||||
/// lists on disk with a sequence <= this value are safe to execute.
|
/// lists on disk with a sequence <= this value are safe to execute.
|
||||||
validated_sequence: u64,
|
validated_sequence: DeletionListSeq,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DeletionHeader {
|
impl DeletionHeader {
|
||||||
const VERSION_LATEST: u8 = 1;
|
const VERSION_LATEST: u8 = 1;
|
||||||
|
|
||||||
fn new(validated_sequence: u64) -> Self {
|
fn new(validated_sequence: DeletionListSeq) -> Self {
|
||||||
Self {
|
Self {
|
||||||
version: Self::VERSION_LATEST,
|
version: Self::VERSION_LATEST,
|
||||||
validated_sequence,
|
validated_sequence,
|
||||||
@@ -242,7 +266,7 @@ impl DeletionHeader {
|
|||||||
|
|
||||||
impl DeletionList {
|
impl DeletionList {
|
||||||
const VERSION_LATEST: u8 = 1;
|
const VERSION_LATEST: u8 = 1;
|
||||||
fn new(sequence: u64) -> Self {
|
fn new(sequence: DeletionListSeq) -> Self {
|
||||||
Self {
|
Self {
|
||||||
version: Self::VERSION_LATEST,
|
version: Self::VERSION_LATEST,
|
||||||
sequence,
|
sequence,
|
||||||
@@ -460,6 +484,8 @@ impl DeletionQueueClient {
|
|||||||
/// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
|
/// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
|
||||||
/// references them).
|
/// references them).
|
||||||
///
|
///
|
||||||
|
/// The returned `DeletionNotify` can be used to wait for the deletion to execute.
|
||||||
|
///
|
||||||
/// The `current_generation` is the generation of this pageserver's current attachment. The
|
/// The `current_generation` is the generation of this pageserver's current attachment. The
|
||||||
/// generations in `layers` are the generations in which those layers were written.
|
/// generations in `layers` are the generations in which those layers were written.
|
||||||
pub(crate) fn push_layers(
|
pub(crate) fn push_layers(
|
||||||
@@ -468,12 +494,14 @@ impl DeletionQueueClient {
|
|||||||
timeline_id: TimelineId,
|
timeline_id: TimelineId,
|
||||||
current_generation: Generation,
|
current_generation: Generation,
|
||||||
layers: Vec<(LayerName, LayerFileMetadata)>,
|
layers: Vec<(LayerName, LayerFileMetadata)>,
|
||||||
) -> Result<(), DeletionQueueError> {
|
) -> Result<DeletionNotify, DeletionQueueError> {
|
||||||
// None generations are not valid for attached tenants: they must always be attached in
|
// None generations are not valid for attached tenants: they must always be attached in
|
||||||
// a known generation. None generations are still permitted for layers in the index because
|
// a known generation. None generations are still permitted for layers in the index because
|
||||||
// they may be historical.
|
// they may be historical.
|
||||||
assert!(!current_generation.is_none());
|
assert!(!current_generation.is_none());
|
||||||
|
|
||||||
|
let (seq_tx, seq_rx) = tokio::sync::oneshot::channel();
|
||||||
|
|
||||||
metrics::DELETION_QUEUE
|
metrics::DELETION_QUEUE
|
||||||
.keys_submitted
|
.keys_submitted
|
||||||
.inc_by(layers.len() as u64);
|
.inc_by(layers.len() as u64);
|
||||||
@@ -485,8 +513,14 @@ impl DeletionQueueClient {
|
|||||||
layers,
|
layers,
|
||||||
generation: current_generation,
|
generation: current_generation,
|
||||||
objects: Vec::new(),
|
objects: Vec::new(),
|
||||||
|
seq_tx,
|
||||||
}),
|
}),
|
||||||
)
|
)?;
|
||||||
|
|
||||||
|
Ok(DeletionNotify {
|
||||||
|
seq_rx,
|
||||||
|
executed_rx: self.executed_rx.clone(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This is cancel-safe. If you drop the future the flush may still happen in the background.
|
/// This is cancel-safe. If you drop the future the flush may still happen in the background.
|
||||||
@@ -610,6 +644,10 @@ impl DeletionQueue {
|
|||||||
// happen in the backend (persistent), not in this queue.
|
// happen in the backend (persistent), not in this queue.
|
||||||
let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
|
let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
|
||||||
|
|
||||||
|
// Notifies clients about executed deletions.
|
||||||
|
// TODO: recover the last sequence number on startup.
|
||||||
|
let (executed_tx, executed_rx) = tokio::sync::watch::channel(0);
|
||||||
|
|
||||||
let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new()));
|
let lsn_table = Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new()));
|
||||||
|
|
||||||
// The deletion queue has an independent cancellation token to
|
// The deletion queue has an independent cancellation token to
|
||||||
@@ -622,6 +660,7 @@ impl DeletionQueue {
|
|||||||
client: DeletionQueueClient {
|
client: DeletionQueueClient {
|
||||||
tx,
|
tx,
|
||||||
executor_tx: executor_tx.clone(),
|
executor_tx: executor_tx.clone(),
|
||||||
|
executed_rx,
|
||||||
lsn_table: lsn_table.clone(),
|
lsn_table: lsn_table.clone(),
|
||||||
},
|
},
|
||||||
cancel: cancel.clone(),
|
cancel: cancel.clone(),
|
||||||
@@ -632,6 +671,7 @@ impl DeletionQueue {
|
|||||||
conf,
|
conf,
|
||||||
backend_rx,
|
backend_rx,
|
||||||
executor_tx,
|
executor_tx,
|
||||||
|
executed_tx,
|
||||||
controller_upcall_client,
|
controller_upcall_client,
|
||||||
lsn_table.clone(),
|
lsn_table.clone(),
|
||||||
cancel.clone(),
|
cancel.clone(),
|
||||||
@@ -1228,6 +1268,7 @@ pub(crate) mod mock {
|
|||||||
DeletionQueueClient {
|
DeletionQueueClient {
|
||||||
tx: self.tx.clone(),
|
tx: self.tx.clone(),
|
||||||
executor_tx: self.executor_tx.clone(),
|
executor_tx: self.executor_tx.clone(),
|
||||||
|
executed_rx: todo!(),
|
||||||
lsn_table: self.lsn_table.clone(),
|
lsn_table: self.lsn_table.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,6 +12,7 @@
|
|||||||
|
|
||||||
use super::DeletionHeader;
|
use super::DeletionHeader;
|
||||||
use super::DeletionList;
|
use super::DeletionList;
|
||||||
|
use super::DeletionListSeq;
|
||||||
use super::FlushOp;
|
use super::FlushOp;
|
||||||
use super::ValidatorQueueMessage;
|
use super::ValidatorQueueMessage;
|
||||||
|
|
||||||
@@ -65,6 +66,9 @@ pub(super) struct DeletionOp {
|
|||||||
/// The _current_ generation of the Tenant shard attachment in which we are enqueuing
|
/// The _current_ generation of the Tenant shard attachment in which we are enqueuing
|
||||||
/// this deletion.
|
/// this deletion.
|
||||||
pub(super) generation: Generation,
|
pub(super) generation: Generation,
|
||||||
|
|
||||||
|
/// Return channel for the `DeletionList::sequence` this deletion is included in.
|
||||||
|
pub(super) seq_tx: tokio::sync::oneshot::Sender<DeletionListSeq>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -175,7 +179,7 @@ impl ListWriter {
|
|||||||
///
|
///
|
||||||
/// It is not an error for the header to not exist: we return None, and
|
/// It is not an error for the header to not exist: we return None, and
|
||||||
/// the caller should act as if validated_sequence is 0
|
/// the caller should act as if validated_sequence is 0
|
||||||
async fn load_validated_sequence(&self) -> Result<Option<u64>, anyhow::Error> {
|
async fn load_validated_sequence(&self) -> Result<Option<DeletionListSeq>, anyhow::Error> {
|
||||||
let header_path = self.conf.deletion_header_path();
|
let header_path = self.conf.deletion_header_path();
|
||||||
match tokio::fs::read(&header_path).await {
|
match tokio::fs::read(&header_path).await {
|
||||||
Ok(header_bytes) => {
|
Ok(header_bytes) => {
|
||||||
@@ -228,7 +232,7 @@ impl ListWriter {
|
|||||||
|
|
||||||
let temp_extension = format!(".{TEMP_SUFFIX}");
|
let temp_extension = format!(".{TEMP_SUFFIX}");
|
||||||
let header_path = self.conf.deletion_header_path();
|
let header_path = self.conf.deletion_header_path();
|
||||||
let mut seqs: Vec<u64> = Vec::new();
|
let mut seqs: Vec<DeletionListSeq> = Vec::new();
|
||||||
while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") {
|
while let Some(dentry) = dir.next_entry().await.fatal_err("read deletion dentry") {
|
||||||
let file_name = dentry.file_name();
|
let file_name = dentry.file_name();
|
||||||
let dentry_str = file_name.to_string_lossy();
|
let dentry_str = file_name.to_string_lossy();
|
||||||
@@ -433,6 +437,9 @@ impl ListWriter {
|
|||||||
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Notify the client about the sequence number of this deletion.
|
||||||
|
op.seq_tx.send(self.pending.sequence).ok();
|
||||||
}
|
}
|
||||||
ListWriterQueueMessage::Flush(op) => {
|
ListWriterQueueMessage::Flush(op) => {
|
||||||
if self.pending.is_empty() {
|
if self.pending.is_empty() {
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ use crate::virtual_file::MaybeFatalIo;
|
|||||||
use super::deleter::DeleterMessage;
|
use super::deleter::DeleterMessage;
|
||||||
use super::DeletionHeader;
|
use super::DeletionHeader;
|
||||||
use super::DeletionList;
|
use super::DeletionList;
|
||||||
|
use super::DeletionListSeq;
|
||||||
use super::DeletionQueueError;
|
use super::DeletionQueueError;
|
||||||
use super::FlushOp;
|
use super::FlushOp;
|
||||||
use super::VisibleLsnUpdates;
|
use super::VisibleLsnUpdates;
|
||||||
@@ -60,6 +61,9 @@ where
|
|||||||
rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
|
rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
|
||||||
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
||||||
|
|
||||||
|
/// Notifies clients about the last executed DeletionList sequence number.
|
||||||
|
executed_tx: tokio::sync::watch::Sender<DeletionListSeq>,
|
||||||
|
|
||||||
// Client for calling into control plane API for validation of deletes
|
// Client for calling into control plane API for validation of deletes
|
||||||
controller_upcall_client: Option<C>,
|
controller_upcall_client: Option<C>,
|
||||||
|
|
||||||
@@ -94,6 +98,7 @@ where
|
|||||||
conf: &'static PageServerConf,
|
conf: &'static PageServerConf,
|
||||||
rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
|
rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
|
||||||
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
||||||
|
executed_tx: tokio::sync::watch::Sender<DeletionListSeq>,
|
||||||
controller_upcall_client: Option<C>,
|
controller_upcall_client: Option<C>,
|
||||||
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
|
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
@@ -102,6 +107,7 @@ where
|
|||||||
conf,
|
conf,
|
||||||
rx,
|
rx,
|
||||||
tx,
|
tx,
|
||||||
|
executed_tx,
|
||||||
controller_upcall_client,
|
controller_upcall_client,
|
||||||
lsn_table,
|
lsn_table,
|
||||||
pending_lists: Vec::new(),
|
pending_lists: Vec::new(),
|
||||||
@@ -161,7 +167,7 @@ where
|
|||||||
tenant_generations.keys().map(|k| (*k, true)).collect()
|
tenant_generations.keys().map(|k| (*k, true)).collect()
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut validated_sequence: Option<u64> = None;
|
let mut validated_sequence: Option<DeletionListSeq> = None;
|
||||||
|
|
||||||
// Apply the validation results to the pending LSN updates
|
// Apply the validation results to the pending LSN updates
|
||||||
for (tenant_id, tenant_lsn_state) in pending_lsn_updates.tenants {
|
for (tenant_id, tenant_lsn_state) in pending_lsn_updates.tenants {
|
||||||
@@ -295,6 +301,7 @@ where
|
|||||||
async fn cleanup_lists(&mut self, list_paths: Vec<Utf8PathBuf>) {
|
async fn cleanup_lists(&mut self, list_paths: Vec<Utf8PathBuf>) {
|
||||||
for list_path in list_paths {
|
for list_path in list_paths {
|
||||||
debug!("Removing deletion list {list_path}");
|
debug!("Removing deletion list {list_path}");
|
||||||
|
// TODO: this needs to fsync the removal.
|
||||||
tokio::fs::remove_file(&list_path)
|
tokio::fs::remove_file(&list_path)
|
||||||
.await
|
.await
|
||||||
.fatal_err("remove deletion list");
|
.fatal_err("remove deletion list");
|
||||||
@@ -324,6 +331,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Drain `validated_lists` into the executor
|
// Drain `validated_lists` into the executor
|
||||||
|
let executed_seq = self.validated_lists.iter().map(|l| l.sequence).max();
|
||||||
let mut executing_lists = Vec::new();
|
let mut executing_lists = Vec::new();
|
||||||
for list in self.validated_lists.drain(..) {
|
for list in self.validated_lists.drain(..) {
|
||||||
let list_path = self.conf.deletion_list_path(list.sequence);
|
let list_path = self.conf.deletion_list_path(list.sequence);
|
||||||
@@ -340,6 +348,14 @@ where
|
|||||||
// Erase the deletion lists whose keys have all be deleted from remote storage
|
// Erase the deletion lists whose keys have all be deleted from remote storage
|
||||||
self.cleanup_lists(executing_lists).await;
|
self.cleanup_lists(executing_lists).await;
|
||||||
|
|
||||||
|
// Notify any waiters that the deletions have been executed.
|
||||||
|
//
|
||||||
|
// TODO: this will wait for all pending lists to be deleted. Consider making it more
|
||||||
|
// responsive by processing lists one by one.
|
||||||
|
if let Some(executed_seq) = executed_seq {
|
||||||
|
self.executed_tx.send_replace(executed_seq);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2151,6 +2151,7 @@ impl RemoteTimelineClient {
|
|||||||
self.generation,
|
self.generation,
|
||||||
delete.layers.clone(),
|
delete.layers.clone(),
|
||||||
)
|
)
|
||||||
|
.map(|_| ())
|
||||||
.map_err(|e| anyhow::anyhow!(e))
|
.map_err(|e| anyhow::anyhow!(e))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user