mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 01:20:38 +00:00
pageserver: fix executor flush
This commit is contained in:
@@ -384,8 +384,12 @@ impl DeletionQueueClient {
|
||||
}
|
||||
}
|
||||
|
||||
async fn do_push(&self, msg: FrontendQueueMessage) -> Result<(), DeletionQueueError> {
|
||||
match self.tx.send(msg).await {
|
||||
async fn do_push<T>(
|
||||
&self,
|
||||
queue: &tokio::sync::mpsc::Sender<T>,
|
||||
msg: T,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
match queue.send(msg).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
// This shouldn't happen, we should shut down all tenants before
|
||||
@@ -401,9 +405,10 @@ impl DeletionQueueClient {
|
||||
&self,
|
||||
attached_tenants: HashMap<TenantId, Generation>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
self.do_push(FrontendQueueMessage::Recover(RecoverOp {
|
||||
attached_tenants,
|
||||
}))
|
||||
self.do_push(
|
||||
&self.tx,
|
||||
FrontendQueueMessage::Recover(RecoverOp { attached_tenants }),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -481,22 +486,26 @@ impl DeletionQueueClient {
|
||||
}
|
||||
|
||||
DELETION_QUEUE_SUBMITTED.inc_by(layers.len() as u64);
|
||||
self.do_push(FrontendQueueMessage::Delete(DeletionOp {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
layers,
|
||||
generation: current_generation,
|
||||
objects: Vec::new(),
|
||||
}))
|
||||
self.do_push(
|
||||
&self.tx,
|
||||
FrontendQueueMessage::Delete(DeletionOp {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
layers,
|
||||
generation: current_generation,
|
||||
objects: Vec::new(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn do_flush(
|
||||
async fn do_flush<T>(
|
||||
&self,
|
||||
msg: FrontendQueueMessage,
|
||||
queue: &tokio::sync::mpsc::Sender<T>,
|
||||
msg: T,
|
||||
rx: tokio::sync::oneshot::Receiver<()>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
self.do_push(msg).await?;
|
||||
self.do_push(queue, msg).await?;
|
||||
if rx.await.is_err() {
|
||||
// This shouldn't happen if tenants are shut down before deletion queue. If we
|
||||
// encounter a bug like this, then a flusher will incorrectly believe it has flushed
|
||||
@@ -511,7 +520,7 @@ impl DeletionQueueClient {
|
||||
/// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
|
||||
pub async fn flush(&self) -> Result<(), DeletionQueueError> {
|
||||
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
|
||||
self.do_flush(FrontendQueueMessage::Flush(FlushOp { tx }), rx)
|
||||
self.do_flush(&self.tx, FrontendQueueMessage::Flush(FlushOp { tx }), rx)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -521,11 +530,27 @@ impl DeletionQueueClient {
|
||||
// Flush any buffered work to deletion lists
|
||||
self.flush().await?;
|
||||
|
||||
// Flush execution of deletion lists
|
||||
// Flush the backend into the executor of deletion lists
|
||||
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
|
||||
debug!("flush_execute: flushing backend...");
|
||||
self.do_flush(
|
||||
&self.tx,
|
||||
FrontendQueueMessage::FlushExecute(FlushOp { tx }),
|
||||
rx,
|
||||
)
|
||||
.await?;
|
||||
debug!("flush_execute: finished flushing backend...");
|
||||
|
||||
// Flush any immediate-mode deletions (the above backend flush will only flush
|
||||
// the executor if deletions had flowed through the backend)
|
||||
debug!("flush_execute: flushing execution...");
|
||||
self.do_flush(FrontendQueueMessage::FlushExecute(FlushOp { tx }), rx)
|
||||
.await?;
|
||||
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
|
||||
self.do_flush(
|
||||
&self.executor_tx,
|
||||
ExecutorMessage::Flush(FlushOp { tx }),
|
||||
rx,
|
||||
)
|
||||
.await?;
|
||||
debug!("flush_execute: finished flushing execution...");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -241,54 +241,64 @@ impl BackendQueueWorker {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn flush(&mut self) {
|
||||
pub async fn flush(&mut self) -> Result<(), DeletionQueueError> {
|
||||
tracing::debug!("Flushing with {} pending lists", self.pending_lists.len());
|
||||
|
||||
// Issue any required generation validation calls to the control plane
|
||||
if let Err(DeletionQueueError::ShuttingDown) = self.validate().await {
|
||||
warn!("Shutting down");
|
||||
return;
|
||||
}
|
||||
self.validate().await?;
|
||||
|
||||
// After successful validation, nothing is pending: any lists that
|
||||
// made it through validation will be in validated_lists.
|
||||
assert!(self.pending_lists.is_empty());
|
||||
self.pending_key_count = 0;
|
||||
|
||||
// Return quickly if we have no validated lists to execute.
|
||||
tracing::debug!(
|
||||
"Validation complete, have {} validated lists",
|
||||
self.validated_lists.len()
|
||||
);
|
||||
|
||||
// Return quickly if we have no validated lists to execute. This avoids flushing the
|
||||
// executor when an idle backend hits its autoflush interval
|
||||
if self.validated_lists.is_empty() {
|
||||
return;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Drain `validated_lists` into the executor
|
||||
let mut executing_lists = Vec::new();
|
||||
for mut list in self.validated_lists.drain(..) {
|
||||
let objects = list.drain_paths();
|
||||
if let Err(_e) = self.tx.send(ExecutorMessage::Delete(objects)).await {
|
||||
warn!("Shutting down");
|
||||
return;
|
||||
};
|
||||
|
||||
self.tx
|
||||
.send(ExecutorMessage::Delete(objects))
|
||||
.await
|
||||
.map_err(|_| DeletionQueueError::ShuttingDown)?;
|
||||
executing_lists.push(list);
|
||||
}
|
||||
|
||||
self.flush_executor().await?;
|
||||
|
||||
// Erase the deletion lists whose keys have all be deleted from remote storage
|
||||
self.cleanup_lists(executing_lists).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn flush_executor(&mut self) -> Result<(), DeletionQueueError> {
|
||||
// Flush the executor, so that all the keys referenced by these deletion lists
|
||||
// are actually removed from remote storage. This is a precondition to deleting
|
||||
// the deletion lists themselves.
|
||||
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
|
||||
let flush_op = FlushOp { tx };
|
||||
if let Err(_e) = self.tx.send(ExecutorMessage::Flush(flush_op)).await {
|
||||
warn!("Shutting down");
|
||||
return;
|
||||
};
|
||||
if rx.await.is_err() {
|
||||
warn!("Shutting down");
|
||||
return;
|
||||
}
|
||||
self.tx
|
||||
.send(ExecutorMessage::Flush(flush_op))
|
||||
.await
|
||||
.map_err(|_| DeletionQueueError::ShuttingDown)?;
|
||||
|
||||
// Erase the deletion lists whose keys have all be deleted from remote storage
|
||||
self.cleanup_lists(executing_lists).await;
|
||||
rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
|
||||
}
|
||||
|
||||
pub async fn background(&mut self) {
|
||||
tracing::info!("Started deletion backend worker");
|
||||
|
||||
while !self.cancel.is_cancelled() {
|
||||
let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
|
||||
Ok(Some(m)) => m,
|
||||
@@ -299,8 +309,9 @@ impl BackendQueueWorker {
|
||||
}
|
||||
Err(_) => {
|
||||
// Timeout, we hit deadline to execute whatever we have in hand. These functions will
|
||||
// return immediately if no work is pending
|
||||
self.flush().await;
|
||||
// return immediately if no work is pending.
|
||||
// Drop result, because it' a background flush and we don't care whether it really worked.
|
||||
drop(self.flush().await);
|
||||
|
||||
continue;
|
||||
}
|
||||
@@ -316,13 +327,15 @@ impl BackendQueueWorker {
|
||||
}
|
||||
|
||||
if self.pending_key_count > AUTOFLUSH_KEY_COUNT {
|
||||
self.flush().await;
|
||||
// Drop possible shutdown error, because we will just fall out of loop if that happens
|
||||
drop(self.flush().await);
|
||||
}
|
||||
}
|
||||
BackendQueueMessage::Flush(op) => {
|
||||
self.flush().await;
|
||||
|
||||
op.fire();
|
||||
if let Ok(()) = self.flush().await {
|
||||
// If we fail due to shutting down, we will just drop `op` to propagate that status.
|
||||
op.fire();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user