deletion queue: expose errors from push/flush
@@ -9,7 +9,7 @@ use anyhow::{anyhow, Context};
 use clap::{Arg, ArgAction, Command};
 use fail::FailScenario;
 use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
-use pageserver::deletion_queue::DeletionQueue;
+use pageserver::deletion_queue::{DeletionQueue, DeletionQueueError};
 use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
 use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
 use pageserver::task_mgr::WALRECEIVER_RUNTIME;
@@ -633,8 +633,21 @@ fn start_pageserver(
     let dq = deletion_queue.clone();
     BACKGROUND_RUNTIME.block_on(async move {
         match tokio::time::timeout(Duration::from_secs(5), dq.new_client().flush()).await {
-            Ok(()) => {
-                info!("Deletion queue flushed successfully on shutdown");
-            }
+            Ok(flush_r) => {
+                match flush_r {
+                    Ok(()) => {
+                        info!("Deletion queue flushed successfully on shutdown")
+                    }
+                    Err(e) => {
+                        match e {
+                            DeletionQueueError::ShuttingDown => {
+                                // This is not harmful for correctness, but is unexpected: the deletion
+                                // queue's workers should stay alive as long as there are any client handles instantiated.
+                                warn!("Deletion queue stopped prematurely");
+                            }
+                        }
+                    }
+                }
+            }
             Err(e) => {
                 warn!("Timed out flushing deletion queue on shutdown ({e})")
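The nested match above exists because `tokio::time::timeout` wraps the inner future's output: now that `flush()` is fallible, the shutdown path sees a `Result<Result<(), DeletionQueueError>, Elapsed>`. A minimal sketch of the same pattern, using illustrative names rather than the pageserver's real types:

```rust
use std::time::Duration;

#[derive(Debug)]
enum QueueError {
    ShuttingDown,
}

// Stand-in for DeletionQueueClient::flush from the hunk above.
async fn flush() -> Result<(), QueueError> {
    Ok(())
}

#[tokio::main]
async fn main() {
    // timeout() layers its own Result on top of flush()'s Result.
    match tokio::time::timeout(Duration::from_secs(5), flush()).await {
        Ok(Ok(())) => println!("flushed"),
        Ok(Err(QueueError::ShuttingDown)) => println!("queue stopped prematurely"),
        Err(_elapsed) => println!("timed out waiting for flush"),
    }
}
```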
@@ -5,6 +5,7 @@ use remote_storage::{GenericRemoteStorage, RemotePath};
 use serde::Deserialize;
 use serde::Serialize;
 use serde_with::serde_as;
+use thiserror::Error;
 use tokio;
 use tokio::time::Duration;
 use tracing::{self, debug, error, info, warn};
@@ -158,15 +159,22 @@ impl DeletionList {
     }
 }
 
+#[derive(Error, Debug)]
+pub enum DeletionQueueError {
+    #[error("Deletion queue unavailable during shutdown")]
+    ShuttingDown,
+}
+
 impl DeletionQueueClient {
-    async fn do_push(&self, msg: FrontendQueueMessage) {
+    async fn do_push(&self, msg: FrontendQueueMessage) -> Result<(), DeletionQueueError> {
         match self.tx.send(msg).await {
-            Ok(_) => {}
+            Ok(_) => Ok(()),
             Err(e) => {
                 // This shouldn't happen, we should shut down all tenants before
                 // we shut down the global delete queue. If we encounter a bug like this,
                 // we may leak objects as deletions won't be processed.
                 error!("Deletion queue closed while pushing, shutting down? ({e})");
+                Err(DeletionQueueError::ShuttingDown)
             }
         }
     }
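For context on the `do_push` change: `tokio::sync::mpsc::Sender::send` fails only when the receiving half has been dropped, which is why a send error is translated into `DeletionQueueError::ShuttingDown`. A sketch of that mapping under simplified, assumed names:

```rust
use tokio::sync::mpsc;

#[derive(Debug)]
enum QueueError {
    ShuttingDown,
}

// Simplified stand-in for do_push: a failed send means the worker task
// (the receiver) is gone, i.e. the queue is shutting down.
async fn do_push<T>(tx: &mpsc::Sender<T>, msg: T) -> Result<(), QueueError> {
    tx.send(msg).await.map_err(|_| QueueError::ShuttingDown)
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<u32>(8);
    drop(rx); // simulate the worker having shut down
    assert!(do_push(&tx, 42).await.is_err());
}
```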
@@ -180,7 +188,7 @@ impl DeletionQueueClient {
         tenant_id: TenantId,
         timeline_id: TimelineId,
         layers: Vec<LayerFileName>,
-    ) {
+    ) -> Result<(), DeletionQueueError> {
         DELETION_QUEUE_SUBMITTED.inc_by(layers.len() as u64);
         self.do_push(FrontendQueueMessage::Delete(DeletionOp {
             tenant_id,
@@ -188,7 +196,7 @@ impl DeletionQueueClient {
             layers,
             objects: Vec::new(),
         }))
-        .await;
+        .await
     }
 
     /// Just like push_layers, but using some already-known remote paths, instead of abstract layer names
@@ -197,7 +205,7 @@ impl DeletionQueueClient {
         tenant_id: TenantId,
         timeline_id: TimelineId,
         objects: Vec<RemotePath>,
-    ) {
+    ) -> Result<(), DeletionQueueError> {
         DELETION_QUEUE_SUBMITTED.inc_by(objects.len() as u64);
         self.do_push(FrontendQueueMessage::Delete(DeletionOp {
             tenant_id,
@@ -205,38 +213,46 @@ impl DeletionQueueClient {
             layers: Vec::new(),
             objects,
         }))
-        .await;
+        .await
     }
 
-    async fn do_flush(&self, msg: FrontendQueueMessage, rx: tokio::sync::oneshot::Receiver<()>) {
-        self.do_push(msg).await;
+    async fn do_flush(
+        &self,
+        msg: FrontendQueueMessage,
+        rx: tokio::sync::oneshot::Receiver<()>,
+    ) -> Result<(), DeletionQueueError> {
+        self.do_push(msg).await?;
         if rx.await.is_err() {
             // This shouldn't happen if tenants are shut down before deletion queue. If we
             // encounter a bug like this, then a flusher will incorrectly believe it has flushed
             // when it hasn't, possibly leading to leaking objects.
             error!("Deletion queue dropped flush op while client was still waiting");
+            Err(DeletionQueueError::ShuttingDown)
+        } else {
+            Ok(())
         }
     }
 
     /// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
-    pub async fn flush(&self) {
+    pub async fn flush(&self) -> Result<(), DeletionQueueError> {
         let (tx, rx) = tokio::sync::oneshot::channel::<()>();
         self.do_flush(FrontendQueueMessage::Flush(FlushOp { tx }), rx)
             .await
     }
 
     // Wait until all previous deletions are executed
-    pub async fn flush_execute(&self) {
+    pub async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
         debug!("flush_execute: flushing to deletion lists...");
         // Flush any buffered work to deletion lists
-        self.flush().await;
+        self.flush().await?;
 
         // Flush execution of deletion lists
         let (tx, rx) = tokio::sync::oneshot::channel::<()>();
         debug!("flush_execute: flushing execution...");
         self.do_flush(FrontendQueueMessage::FlushExecute(FlushOp { tx }), rx)
-            .await;
+            .await?;
+        debug!("flush_execute: finished flushing execution...");
+        Ok(())
     }
 }
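The flush methods above rely on a oneshot-completion pattern: the client enqueues a flush marker carrying the sender half, and the worker fires it once every earlier message has been handled, so a dropped sender signals shutdown. A self-contained sketch of that pattern, with names invented for illustration:

```rust
use tokio::sync::{mpsc, oneshot};

enum Msg {
    Work(u64),
    Flush(oneshot::Sender<()>),
}

// Client side: push a flush marker and wait for the worker to ack it.
async fn flush(tx: &mpsc::Sender<Msg>) -> Result<(), &'static str> {
    let (done_tx, done_rx) = oneshot::channel();
    tx.send(Msg::Flush(done_tx))
        .await
        .map_err(|_| "shutting down")?;
    // If the worker dropped done_tx without sending, treat it as shutdown.
    done_rx.await.map_err(|_| "shutting down")
}

#[tokio::main]
async fn main() -> Result<(), &'static str> {
    let (tx, mut rx) = mpsc::channel::<Msg>(8);
    // Worker side: messages arrive in order, so acking the Flush proves
    // all earlier Work messages were already processed.
    tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            match msg {
                Msg::Work(n) => println!("processed {n}"),
                Msg::Flush(done) => {
                    let _ = done.send(());
                }
            }
        }
    });
    tx.send(Msg::Work(1)).await.map_err(|_| "shutting down")?;
    flush(&tx).await
}
```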
@@ -1029,13 +1045,13 @@ mod test {
             tenant_id,
             TIMELINE_ID,
             [layer_file_name_1.clone()].to_vec(),
-        ));
+        ))?;
         assert_remote_files(&[&layer_file_name_1.file_name()], &remote_timeline_path);
         assert_remote_files(&[], &remote_deletion_prefix);
 
         // File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
         info!("Flushing");
-        ctx.runtime.block_on(client.flush());
+        ctx.runtime.block_on(client.flush())?;
         assert_remote_files(&[&layer_file_name_1.file_name()], &remote_timeline_path);
         assert_remote_files(
             &["0000000000000001-00000000-01.list"],
@@ -1044,13 +1060,13 @@ mod test {
 
         // File should go away when we execute
         info!("Flush-executing");
-        ctx.runtime.block_on(client.flush_execute());
+        ctx.runtime.block_on(client.flush_execute())?;
         assert_remote_files(&[], &remote_timeline_path);
         assert_remote_files(&["header-00000000-01"], &remote_deletion_prefix);
 
         // Flushing on an empty queue should succeed immediately, and not write any lists
         info!("Flush-executing on empty");
-        ctx.runtime.block_on(client.flush_execute());
+        ctx.runtime.block_on(client.flush_execute())?;
         assert_remote_files(&["header-00000000-01"], &remote_deletion_prefix);
 
         Ok(())
@@ -1086,8 +1102,8 @@ mod test {
             tenant_id,
             TIMELINE_ID,
             [layer_file_name_1.clone()].to_vec(),
-        ));
-        ctx.runtime.block_on(client.flush());
+        ))?;
+        ctx.runtime.block_on(client.flush())?;
         assert_remote_files(
             &["0000000000000001-00000000-01.list"],
             &remote_deletion_prefix,
@@ -1100,7 +1116,7 @@ mod test {
 
         // If we have recovered the deletion list properly, then executing after restart should purge it
         info!("Flush-executing");
-        ctx.runtime.block_on(client.flush_execute());
+        ctx.runtime.block_on(client.flush_execute())?;
         assert_remote_files(&[], &remote_timeline_path);
         assert_remote_files(&["header-00000000-01"], &remote_deletion_prefix);
         Ok(())
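The test updates work because Rust test functions may return `Result`, so each fallible queue call can use `?` instead of unwrapping. A hedged sketch of the shape, assuming `tokio` and `anyhow` as dev-dependencies:

```rust
#[cfg(test)]
mod tests {
    // Returning anyhow::Result lets `?` propagate queue errors, failing
    // the test with the error's message instead of a panic.
    #[test]
    fn flush_on_runtime() -> anyhow::Result<()> {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        runtime.block_on(async { Ok::<(), anyhow::Error>(()) })?;
        Ok(())
    }
}
```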
@@ -23,7 +23,7 @@ use super::models::{
     TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
 };
 use crate::context::{DownloadBehavior, RequestContext};
-use crate::deletion_queue::DeletionQueue;
+use crate::deletion_queue::{DeletionQueue, DeletionQueueError};
 use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
 use crate::pgdatadir_mapping::LsnForTimestamp;
 use crate::task_mgr::TaskKind;
@@ -1148,14 +1148,25 @@ async fn deletion_queue_flush(
     let queue_client = state.deletion_queue.new_client();
 
     tokio::select! {
-        _ = async {
+        flush_result = async {
             if execute {
-                queue_client.flush_execute().await;
+                queue_client.flush_execute().await
             } else {
-                queue_client.flush().await;
+                queue_client.flush().await
             }
         } => {
-            json_response(StatusCode::OK, ())
+            match flush_result {
+                Ok(()) => {
+                    json_response(StatusCode::OK, ())
+                },
+                Err(e) => {
+                    match e {
+                        DeletionQueueError::ShuttingDown => {
+                            Err(ApiError::ShuttingDown)
+                        }
+                    }
+                }
+            }
         },
        _ = cancel.cancelled() => {
            Err(ApiError::ShuttingDown)
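Note the handler change from `_ = async { ... }` to `flush_result = async { ... }`: a `tokio::select!` branch binds the completed future's output, so the flush outcome can be inspected while the branch still races against cancellation. A reduced sketch with assumed names, using `tokio_util`'s `CancellationToken` as a stand-in for the handler's cancel source:

```rust
use tokio_util::sync::CancellationToken;

async fn do_flush() -> Result<(), &'static str> {
    Ok(())
}

// Either the flush finishes and its Result is inspected, or
// cancellation wins the race and we report shutdown.
async fn flush_or_cancel(cancel: CancellationToken) -> Result<(), &'static str> {
    tokio::select! {
        flush_result = do_flush() => flush_result,
        _ = cancel.cancelled() => Err("shutting down"),
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    assert_eq!(flush_or_cancel(cancel).await, Ok(()));
}
```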
@@ -664,7 +664,7 @@ impl RemoteTimelineClient {
         // Enqueue deletions
         deletion_queue_client
             .push_layers(self.tenant_id, self.timeline_id, names.to_vec())
-            .await;
+            .await?;
         Ok(())
     }
 
@@ -817,7 +817,7 @@ impl RemoteTimelineClient {
 
         deletion_queue
             .push_layers(self.tenant_id, self.timeline_id, layers)
-            .await;
+            .await?;
 
         // Do not delete index part yet, it is needed for possible retry. If we remove it first
         // and retry will arrive to different pageserver there wont be any traces of it on remote storage
@@ -826,7 +826,7 @@ impl RemoteTimelineClient {
 
         // Execute all pending deletions, so that when we prroceed to do a list_prefixes below, we aren't
         // taking the burden of listing all the layers that we already know we should delete.
-        deletion_queue.flush_execute().await;
+        deletion_queue.flush_execute().await?;
 
         let remaining = backoff::retry(
             || async {
@@ -858,7 +858,7 @@ impl RemoteTimelineClient {
         if !remaining.is_empty() {
             deletion_queue
                 .push_objects(self.tenant_id, self.timeline_id, remaining)
-                .await;
+                .await?;
         }
 
         fail::fail_point!("timeline-delete-before-index-delete", |_| {
@@ -872,11 +872,11 @@ impl RemoteTimelineClient {
         debug!("enqueuing index part deletion");
         deletion_queue
             .push_objects(self.tenant_id, self.timeline_id, [index_file_path].to_vec())
-            .await;
+            .await?;
 
         // Timeline deletion is rare and we have probably emitted a reasonably number of objects: wait
         // for a flush to a persistent deletion list so that we may be sure deletion will occur.
-        deletion_queue.flush_execute().await;
+        deletion_queue.flush_execute().await?;
 
         fail::fail_point!("timeline-delete-after-index-delete", |_| {
             Err(anyhow::anyhow!(
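A final note on why bare `?` works in `RemoteTimelineClient`, whose methods return `anyhow::Result`: the `thiserror::Error` derive gives `DeletionQueueError` a `std::error::Error` impl, and anyhow converts any such error automatically. A small sketch of that interop, with illustrative names:

```rust
use thiserror::Error;

#[derive(Error, Debug)]
enum QueueError {
    #[error("queue unavailable during shutdown")]
    ShuttingDown,
}

fn push() -> Result<(), QueueError> {
    Err(QueueError::ShuttingDown)
}

fn delete_all() -> anyhow::Result<()> {
    // `?` converts QueueError into anyhow::Error via std::error::Error.
    push()?;
    Ok(())
}

fn main() {
    assert!(delete_all().is_err());
}
```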