Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-17 10:22:56 +00:00)
deletion queue: cancellable retries
@@ -190,6 +190,8 @@ pub enum DownloadError {
     BadInput(anyhow::Error),
     /// The file was not found in the remote storage.
     NotFound,
+    /// The client was shut down
+    Shutdown,
     /// The file was found in the remote storage, but the download failed.
     Other(anyhow::Error),
 }
@@ -201,6 +203,7 @@ impl std::fmt::Display for DownloadError {
                 write!(f, "Failed to download a remote file due to user input: {e}")
             }
             DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
+            DownloadError::Shutdown => write!(f, "Client shutting down"),
             DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
         }
     }
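
The new Shutdown variant lets callers tell an intentional teardown apart from a genuine download failure. A minimal sketch of how a caller might branch on it; the helper name is hypothetical and the crate path is assumed, neither is part of this commit:

use remote_storage::DownloadError; // assumed crate path, not shown in this diff

// Hypothetical helper: shutdown is an expected outcome, so it is logged at
// info level rather than treated as a failure, while other errors stay noisy.
fn log_download_error(err: &DownloadError) {
    match err {
        DownloadError::Shutdown => tracing::info!("download aborted: {err}"),
        DownloadError::NotFound => tracing::warn!("object not found: {err}"),
        other => tracing::error!("download failed: {other}"),
    }
}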
@@ -352,8 +352,9 @@ fn start_pageserver(
     let remote_storage = create_remote_storage_client(conf)?;
 
     // Set up deletion queue
+    let deletion_queue_cancel = tokio_util::sync::CancellationToken::new();
     let (deletion_queue, deletion_frontend, deletion_backend) =
-        DeletionQueue::new(remote_storage.clone(), conf);
+        DeletionQueue::new(remote_storage.clone(), conf, deletion_queue_cancel.clone());
     if let Some(mut deletion_frontend) = deletion_frontend {
         BACKGROUND_RUNTIME.spawn(async move {
             deletion_frontend
@@ -655,6 +656,9 @@ fn start_pageserver(
             }
         });
 
+        // Clean shutdown of deletion queue workers
+        deletion_queue_cancel.cancel();
+
         unreachable!()
     }
 })
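
The wiring above follows the standard tokio_util::sync::CancellationToken pattern: one token is created at startup, clones travel into the background workers, and a single cancel() call during shutdown tears them all down. A self-contained sketch of that pattern; the names below are illustrative, not the pageserver's:

use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    // One token owned by the process; workers receive clones.
    let shutdown = CancellationToken::new();

    let worker_cancel = shutdown.clone();
    let worker = tokio::spawn(async move {
        loop {
            tokio::select! {
                // Exit promptly once the token fires.
                _ = worker_cancel.cancelled() => break,
                _ = tokio::time::sleep(Duration::from_secs(10)) => {
                    // periodic work would go here
                }
            }
        }
    });

    // ... the server would run here ...

    // Clean shutdown: every clone observes the same cancellation.
    shutdown.cancel();
    worker.await.unwrap();
}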
@@ -8,6 +8,7 @@ use serde_with::serde_as;
 use thiserror::Error;
 use tokio;
 use tokio::time::Duration;
+use tokio_util::sync::CancellationToken;
 use tracing::{self, debug, error, info, warn};
 use utils::backoff;
 use utils::id::{TenantId, TimelineId};
@@ -517,6 +518,9 @@ pub struct FrontendQueueWorker {
     // this is lazy usually, but after a failed flush it is set to a smaller time
     // period to drive retries
     timeout: Duration,
+
+    // Worker loop is torn down when this fires.
+    cancel: CancellationToken,
 }
 
 impl FrontendQueueWorker {
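
FrontendQueueWorker now owns a cancel token alongside its channel. The commit does not show the worker's main loop in this hunk, but the usual shape for such a loop is to select between the token and the incoming channel, so either a dropped sender or a fired token stops it. A sketch with hypothetical field and method names, not the commit's:

use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

struct Worker {
    rx: mpsc::Receiver<String>,
    cancel: CancellationToken,
}

impl Worker {
    async fn run(mut self) {
        loop {
            tokio::select! {
                // Worker loop is torn down when this fires.
                _ = self.cancel.cancelled() => break,
                msg = self.rx.recv() => match msg {
                    Some(item) => {
                        // handle the queued item here
                        let _ = item;
                    }
                    None => break, // channel closed: all senders dropped
                },
            }
        }
    }
}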
@@ -592,6 +596,7 @@ impl FrontendQueueWorker {
             3,
             u32::MAX,
             "Reading deletion queue header",
+            backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown),
         )
         .await
         {
@@ -622,15 +627,15 @@ impl FrontendQueueWorker {
             };
         };
 
-        // TODO: this needs a CancellationToken or equivalent: usual worker teardown happens via the channel
         let prefix = RemotePath::new(&self.conf.remote_deletion_node_prefix())
             .expect("Failed to compose path");
         let lists = backoff::retry(
             || async { self.remote_storage.list_prefixes(Some(&prefix)).await },
-            |_| false, // TODO impl is_permanent
+            |_| false,
             3,
             u32::MAX, // There's no point giving up, since once we do that the deletion queue is stuck
             "Recovering deletion lists",
+            backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown),
         )
         .await?;
 
@@ -693,6 +698,7 @@ impl FrontendQueueWorker {
             3,
             u32::MAX,
             "Reading a deletion list",
+            backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown),
         )
         .await?;
 
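
Each backoff::retry call site now passes backoff::Cancel::new(self.cancel.clone(), || DownloadError::Shutdown), so a retry loop configured with u32::MAX attempts can still be aborted: on cancellation it stops retrying and returns the error produced by the closure instead of spinning forever. The exact utils::backoff signature is not shown in this diff; the following is only a sketch of the idea under that assumption:

use std::future::Future;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

// Sketch only, not the real utils::backoff helper: retry an async operation
// with exponential backoff, but bail out with a caller-provided error as soon
// as the cancellation token fires.
async fn retry_cancellable<T, E, F, Fut>(
    mut op: F,
    cancel: &CancellationToken,
    on_cancel: impl Fn() -> E,
) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut delay = Duration::from_millis(100);
    loop {
        if cancel.is_cancelled() {
            return Err(on_cancel());
        }
        match op().await {
            Ok(v) => return Ok(v),
            Err(_) => {
                // Back off before the next attempt, but wake up immediately
                // if we are cancelled while sleeping.
                tokio::select! {
                    _ = cancel.cancelled() => return Err(on_cancel()),
                    _ = tokio::time::sleep(delay) => {}
                }
                delay = (delay * 2).min(Duration::from_secs(10));
            }
        }
    }
}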
@@ -826,6 +832,7 @@ impl DeletionQueue {
     pub fn new(
         remote_storage: Option<GenericRemoteStorage>,
         conf: &'static PageServerConf,
+        cancel: CancellationToken,
     ) -> (
         Self,
         Option<FrontendQueueWorker>,
@@ -850,6 +857,7 @@ impl DeletionQueue {
                 tx: backend_tx,
                 timeout: FLUSH_DEFAULT_DEADLINE,
                 pending_flushes: Vec::new(),
+                cancel,
             }),
             Some(BackendQueueWorker {
                 remote_storage,
@@ -896,8 +904,11 @@ mod test {
     impl TestSetup {
         /// Simulate a pageserver restart by destroying and recreating the deletion queue
        fn restart(&mut self) {
-            let (deletion_queue, fe_worker, be_worker) =
-                DeletionQueue::new(Some(self.storage.clone()), self.harness.conf);
+            let (deletion_queue, fe_worker, be_worker) = DeletionQueue::new(
+                Some(self.storage.clone()),
+                self.harness.conf,
+                CancellationToken::new(),
+            );
 
             self.deletion_queue = deletion_queue;
 
@@ -948,8 +959,11 @@ mod test {
         ));
         let entered_runtime = runtime.enter();
 
-        let (deletion_queue, fe_worker, be_worker) =
-            DeletionQueue::new(Some(storage.clone()), harness.conf);
+        let (deletion_queue, fe_worker, be_worker) = DeletionQueue::new(
+            Some(storage.clone()),
+            harness.conf,
+            CancellationToken::new(),
+        );
 
         let mut fe_worker = fe_worker.unwrap();
         let mut be_worker = be_worker.unwrap();