deletion queue: wrap workers in an opaque struct

This commit is contained in:
John Spray
2023-09-12 15:03:09 +01:00
parent fad9c45f11
commit b76f08e863
5 changed files with 85 additions and 95 deletions

View File

@@ -364,31 +364,10 @@ fn start_pageserver(
};
// Set up deletion queue
let (deletion_queue, deletion_frontend, deletion_backend, deletion_executor) =
let (deletion_queue, deletion_workers) =
DeletionQueue::new(remote_storage.clone(), control_plane_client, conf);
if let Some(mut deletion_frontend) = deletion_frontend {
BACKGROUND_RUNTIME.spawn(async move {
deletion_frontend
.background()
.instrument(info_span!(parent:None, "deletion frontend"))
.await
});
}
if let Some(mut deletion_backend) = deletion_backend {
BACKGROUND_RUNTIME.spawn(async move {
deletion_backend
.background()
.instrument(info_span!(parent: None, "deletion backend"))
.await
});
}
if let Some(mut deletion_executor) = deletion_executor {
BACKGROUND_RUNTIME.spawn(async move {
deletion_executor
.background()
.instrument(info_span!(parent: None, "deletion executor"))
.await
});
if let Some(deletion_workers) = deletion_workers {
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
}
// Up to this point no significant I/O has been done: this should have been fast. Record

View File

@@ -20,15 +20,16 @@ use serde_with::serde_as;
use thiserror::Error;
use tokio;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use tracing::{self, debug, error};
use utils::generation::Generation;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
pub(crate) use self::backend::BackendQueueWorker;
use self::backend::BackendQueueWorker;
use self::executor::ExecutorWorker;
use self::frontend::DeletionOp;
pub(crate) use self::frontend::FrontendQueueWorker;
use self::frontend::FrontendQueueWorker;
use self::frontend::RecoverOp;
use backend::BackendQueueMessage;
use executor::ExecutorMessage;
@@ -77,6 +78,45 @@ pub struct DeletionQueue {
cancel: CancellationToken,
}
/// Opaque wrapper around individual worker tasks, to avoid making the
/// worker objects themselves public
pub struct DeletionQueueWorkers {
frontend: FrontendQueueWorker,
backend: BackendQueueWorker,
executor: ExecutorWorker,
}
impl DeletionQueueWorkers {
pub fn spawn_with(mut self, runtime: &tokio::runtime::Handle) -> tokio::task::JoinHandle<()> {
let jh_frontend = runtime.spawn(async move {
self.frontend
.background()
.instrument(tracing::info_span!(parent:None, "deletion frontend"))
.await
});
let jh_backend = runtime.spawn(async move {
self.backend
.background()
.instrument(tracing::info_span!(parent:None, "deletion backend"))
.await
});
let jh_executor = runtime.spawn(async move {
self.executor
.background()
.instrument(tracing::info_span!(parent:None, "deletion executor"))
.await
});
runtime.spawn({
async move {
jh_frontend.await.expect("error joining frontend worker");
jh_backend.await.expect("error joining backend worker");
drop(jh_executor.await.expect("error joining executor worker"));
}
})
}
}
#[derive(Debug)]
struct FlushOp {
tx: tokio::sync::oneshot::Sender<()>,
@@ -599,12 +639,7 @@ impl DeletionQueue {
remote_storage: Option<GenericRemoteStorage>,
control_plane_client: Option<Arc<dyn ControlPlaneGenerationsApi + Send + Sync>>,
conf: &'static PageServerConf,
) -> (
Self,
Option<FrontendQueueWorker>,
Option<BackendQueueWorker>,
Option<ExecutorWorker>,
) {
) -> (Self, Option<DeletionQueueWorkers>) {
// Deep channel: it consumes deletions from all timelines and we do not want to block them
let (tx, rx) = tokio::sync::mpsc::channel(16384);
@@ -634,8 +669,6 @@ impl DeletionQueue {
cancel,
},
None,
None,
None,
)
}
Some(r) => r,
@@ -650,25 +683,18 @@ impl DeletionQueue {
},
cancel: cancel.clone(),
},
Some(FrontendQueueWorker::new(
conf,
rx,
backend_tx,
cancel.clone(),
)),
Some(BackendQueueWorker::new(
conf,
backend_rx,
executor_tx,
control_plane_client,
lsn_table.clone(),
cancel.clone(),
)),
Some(ExecutorWorker::new(
remote_storage,
executor_rx,
cancel.clone(),
)),
Some(DeletionQueueWorkers {
frontend: FrontendQueueWorker::new(conf, rx, backend_tx, cancel.clone()),
backend: BackendQueueWorker::new(
conf,
backend_rx,
executor_tx,
control_plane_client,
lsn_table.clone(),
cancel.clone(),
),
executor: ExecutorWorker::new(remote_storage, executor_rx, cancel.clone()),
}),
)
}
@@ -741,38 +767,31 @@ mod test {
storage: GenericRemoteStorage,
mock_control_plane: Arc<MockControlPlane>,
deletion_queue: DeletionQueue,
fe_worker: JoinHandle<()>,
be_worker: JoinHandle<()>,
ex_worker: JoinHandle<()>,
worker_join: JoinHandle<()>,
}
impl TestSetup {
/// Simulate a pageserver restart by destroying and recreating the deletion queue
async fn restart(&mut self) {
let (deletion_queue, fe_worker, be_worker, ex_worker) = DeletionQueue::new(
let (deletion_queue, workers) = DeletionQueue::new(
Some(self.storage.clone()),
Some(self.mock_control_plane.clone()),
self.harness.conf,
);
self.deletion_queue = deletion_queue;
tracing::debug!("Spawning worker for new queue queue");
let worker_join = workers
.unwrap()
.spawn_with(&tokio::runtime::Handle::current());
let mut fe_worker = fe_worker.unwrap();
let mut be_worker = be_worker.unwrap();
let mut ex_worker = ex_worker.unwrap();
let mut fe_worker = tokio::spawn(async move { fe_worker.background().await });
let mut be_worker = tokio::spawn(async move { be_worker.background().await });
let mut ex_worker = tokio::spawn(async move {
drop(ex_worker.background().await);
});
std::mem::swap(&mut self.fe_worker, &mut fe_worker);
std::mem::swap(&mut self.be_worker, &mut be_worker);
std::mem::swap(&mut self.ex_worker, &mut ex_worker);
let old_worker_join = std::mem::replace(&mut self.worker_join, worker_join);
let old_deletion_queue = std::mem::replace(&mut self.deletion_queue, deletion_queue);
// Join the old workers
fe_worker.await.unwrap();
be_worker.await.unwrap();
ex_worker.await.unwrap();
tracing::debug!("Joining worker from previous queue");
old_deletion_queue.cancel.cancel();
old_worker_join
.await
.expect("Failed to join workers for previous deletion queue");
}
fn set_latest_generation(&self, gen: Generation) {
@@ -871,20 +890,14 @@ mod test {
let mock_control_plane = Arc::new(MockControlPlane::new());
let (deletion_queue, fe_worker, be_worker, ex_worker) = DeletionQueue::new(
let (deletion_queue, worker) = DeletionQueue::new(
Some(storage.clone()),
Some(mock_control_plane.clone()),
harness.conf,
);
let mut fe_worker = fe_worker.unwrap();
let mut be_worker = be_worker.unwrap();
let mut ex_worker = ex_worker.unwrap();
let fe_worker_join = tokio::spawn(async move { fe_worker.background().await });
let be_worker_join = tokio::spawn(async move { be_worker.background().await });
let ex_worker_join = tokio::spawn(async move {
drop(ex_worker.background().await);
});
let worker = worker.unwrap();
let worker_join = worker.spawn_with(&tokio::runtime::Handle::current());
Ok(TestSetup {
harness,
@@ -892,9 +905,7 @@ mod test {
storage,
mock_control_plane,
deletion_queue,
fe_worker: fe_worker_join,
be_worker: be_worker_join,
ex_worker: ex_worker_join,
worker_join,
})
}

View File

@@ -34,7 +34,7 @@ pub(super) enum BackendQueueMessage {
Delete(DeletionList),
Flush(FlushOp),
}
pub struct BackendQueueWorker {
pub(super) struct BackendQueueWorker {
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
@@ -103,7 +103,7 @@ impl BackendQueueWorker {
///
/// Valid LSN updates propagate back to their result channel immediately, valid DeletionLists
/// go into the queue of ready-to-execute lists.
pub async fn validate(&mut self) -> Result<(), DeletionQueueError> {
async fn validate(&mut self) -> Result<(), DeletionQueueError> {
let mut tenant_generations = HashMap::new();
for list in &self.pending_lists {
for (tenant_id, tenant_list) in &list.tenants {
@@ -241,7 +241,7 @@ impl BackendQueueWorker {
Ok(())
}
pub async fn flush(&mut self) -> Result<(), DeletionQueueError> {
async fn flush(&mut self) -> Result<(), DeletionQueueError> {
tracing::debug!("Flushing with {} pending lists", self.pending_lists.len());
// Issue any required generation validation calls to the control plane
@@ -296,7 +296,7 @@ impl BackendQueueWorker {
rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
}
pub async fn background(&mut self) {
pub(super) async fn background(&mut self) {
tracing::info!("Started deletion backend worker");
while !self.cancel.is_cancelled() {

View File

@@ -21,7 +21,7 @@ pub(super) enum ExecutorMessage {
/// Non-persistent deletion queue, for coalescing multiple object deletes into
/// larger DeleteObjects requests.
pub struct ExecutorWorker {
pub(super) struct ExecutorWorker {
// Accumulate up to 1000 keys for the next deletion operation
accumulator: Vec<RemotePath>,
@@ -46,7 +46,7 @@ impl ExecutorWorker {
}
/// Wrap the remote `delete_objects` with a failpoint
pub async fn remote_delete(&self) -> Result<(), anyhow::Error> {
async fn remote_delete(&self) -> Result<(), anyhow::Error> {
fail::fail_point!("deletion-queue-before-execute", |_| {
info!("Skipping execution, failpoint set");
DELETION_QUEUE_ERRORS
@@ -59,7 +59,7 @@ impl ExecutorWorker {
}
/// Block until everything in accumulator has been executed
pub async fn flush(&mut self) -> Result<(), DeletionQueueError> {
async fn flush(&mut self) -> Result<(), DeletionQueueError> {
while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
match self.remote_delete().await {
Ok(()) => {
@@ -91,7 +91,7 @@ impl ExecutorWorker {
}
}
pub async fn background(&mut self) -> Result<(), DeletionQueueError> {
pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> {
self.accumulator.reserve(MAX_KEYS_PER_DELETE);
loop {

View File

@@ -69,7 +69,7 @@ pub(super) enum FrontendQueueMessage {
Recover(RecoverOp),
}
pub struct FrontendQueueWorker {
pub(super) struct FrontendQueueWorker {
conf: &'static PageServerConf,
// Incoming frontend requests to delete some keys
@@ -315,7 +315,7 @@ impl FrontendQueueWorker {
/// This is the front-end ingest, where we bundle up deletion requests into DeletionList
/// and write them out, for later validation by the backend and execution by the executor.
pub async fn background(&mut self) {
pub(super) async fn background(&mut self) {
info!("Started deletion frontend worker");
// Synchronous, but we only do it once per process lifetime so it's tolerable