From b76f08e8634256ad8d2000b69e49e18f26d0fcbc Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 12 Sep 2023 15:03:09 +0100 Subject: [PATCH] deletion queue: wrap workers in an opaque struct --- pageserver/src/bin/pageserver.rs | 27 +---- pageserver/src/deletion_queue.rs | 133 ++++++++++++---------- pageserver/src/deletion_queue/backend.rs | 8 +- pageserver/src/deletion_queue/executor.rs | 8 +- pageserver/src/deletion_queue/frontend.rs | 4 +- 5 files changed, 85 insertions(+), 95 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 94fd339391..dc7575f642 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -364,31 +364,10 @@ fn start_pageserver( }; // Set up deletion queue - let (deletion_queue, deletion_frontend, deletion_backend, deletion_executor) = + let (deletion_queue, deletion_workers) = DeletionQueue::new(remote_storage.clone(), control_plane_client, conf); - if let Some(mut deletion_frontend) = deletion_frontend { - BACKGROUND_RUNTIME.spawn(async move { - deletion_frontend - .background() - .instrument(info_span!(parent:None, "deletion frontend")) - .await - }); - } - if let Some(mut deletion_backend) = deletion_backend { - BACKGROUND_RUNTIME.spawn(async move { - deletion_backend - .background() - .instrument(info_span!(parent: None, "deletion backend")) - .await - }); - } - if let Some(mut deletion_executor) = deletion_executor { - BACKGROUND_RUNTIME.spawn(async move { - deletion_executor - .background() - .instrument(info_span!(parent: None, "deletion executor")) - .await - }); + if let Some(deletion_workers) = deletion_workers { + deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle()); } // Up to this point no significant I/O has been done: this should have been fast. Record diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 1c6d27da2c..2526786821 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -20,15 +20,16 @@ use serde_with::serde_as; use thiserror::Error; use tokio; use tokio_util::sync::CancellationToken; +use tracing::Instrument; use tracing::{self, debug, error}; use utils::generation::Generation; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; -pub(crate) use self::backend::BackendQueueWorker; +use self::backend::BackendQueueWorker; use self::executor::ExecutorWorker; use self::frontend::DeletionOp; -pub(crate) use self::frontend::FrontendQueueWorker; +use self::frontend::FrontendQueueWorker; use self::frontend::RecoverOp; use backend::BackendQueueMessage; use executor::ExecutorMessage; @@ -77,6 +78,45 @@ pub struct DeletionQueue { cancel: CancellationToken, } +/// Opaque wrapper around individual worker tasks, to avoid making the +/// worker objects themselves public +pub struct DeletionQueueWorkers { + frontend: FrontendQueueWorker, + backend: BackendQueueWorker, + executor: ExecutorWorker, +} + +impl DeletionQueueWorkers { + pub fn spawn_with(mut self, runtime: &tokio::runtime::Handle) -> tokio::task::JoinHandle<()> { + let jh_frontend = runtime.spawn(async move { + self.frontend + .background() + .instrument(tracing::info_span!(parent:None, "deletion frontend")) + .await + }); + let jh_backend = runtime.spawn(async move { + self.backend + .background() + .instrument(tracing::info_span!(parent:None, "deletion backend")) + .await + }); + let jh_executor = runtime.spawn(async move { + self.executor + .background() + .instrument(tracing::info_span!(parent:None, "deletion executor")) + .await + }); + + runtime.spawn({ + async move { + jh_frontend.await.expect("error joining frontend worker"); + jh_backend.await.expect("error joining backend worker"); + drop(jh_executor.await.expect("error joining executor worker")); + } + }) + } +} + #[derive(Debug)] struct FlushOp { tx: tokio::sync::oneshot::Sender<()>, @@ -599,12 +639,7 @@ impl DeletionQueue { remote_storage: Option, control_plane_client: Option>, conf: &'static PageServerConf, - ) -> ( - Self, - Option, - Option, - Option, - ) { + ) -> (Self, Option) { // Deep channel: it consumes deletions from all timelines and we do not want to block them let (tx, rx) = tokio::sync::mpsc::channel(16384); @@ -634,8 +669,6 @@ impl DeletionQueue { cancel, }, None, - None, - None, ) } Some(r) => r, @@ -650,25 +683,18 @@ impl DeletionQueue { }, cancel: cancel.clone(), }, - Some(FrontendQueueWorker::new( - conf, - rx, - backend_tx, - cancel.clone(), - )), - Some(BackendQueueWorker::new( - conf, - backend_rx, - executor_tx, - control_plane_client, - lsn_table.clone(), - cancel.clone(), - )), - Some(ExecutorWorker::new( - remote_storage, - executor_rx, - cancel.clone(), - )), + Some(DeletionQueueWorkers { + frontend: FrontendQueueWorker::new(conf, rx, backend_tx, cancel.clone()), + backend: BackendQueueWorker::new( + conf, + backend_rx, + executor_tx, + control_plane_client, + lsn_table.clone(), + cancel.clone(), + ), + executor: ExecutorWorker::new(remote_storage, executor_rx, cancel.clone()), + }), ) } @@ -741,38 +767,31 @@ mod test { storage: GenericRemoteStorage, mock_control_plane: Arc, deletion_queue: DeletionQueue, - fe_worker: JoinHandle<()>, - be_worker: JoinHandle<()>, - ex_worker: JoinHandle<()>, + worker_join: JoinHandle<()>, } impl TestSetup { /// Simulate a pageserver restart by destroying and recreating the deletion queue async fn restart(&mut self) { - let (deletion_queue, fe_worker, be_worker, ex_worker) = DeletionQueue::new( + let (deletion_queue, workers) = DeletionQueue::new( Some(self.storage.clone()), Some(self.mock_control_plane.clone()), self.harness.conf, ); - self.deletion_queue = deletion_queue; + tracing::debug!("Spawning worker for new queue queue"); + let worker_join = workers + .unwrap() + .spawn_with(&tokio::runtime::Handle::current()); - let mut fe_worker = fe_worker.unwrap(); - let mut be_worker = be_worker.unwrap(); - let mut ex_worker = ex_worker.unwrap(); - let mut fe_worker = tokio::spawn(async move { fe_worker.background().await }); - let mut be_worker = tokio::spawn(async move { be_worker.background().await }); - let mut ex_worker = tokio::spawn(async move { - drop(ex_worker.background().await); - }); - std::mem::swap(&mut self.fe_worker, &mut fe_worker); - std::mem::swap(&mut self.be_worker, &mut be_worker); - std::mem::swap(&mut self.ex_worker, &mut ex_worker); + let old_worker_join = std::mem::replace(&mut self.worker_join, worker_join); + let old_deletion_queue = std::mem::replace(&mut self.deletion_queue, deletion_queue); - // Join the old workers - fe_worker.await.unwrap(); - be_worker.await.unwrap(); - ex_worker.await.unwrap(); + tracing::debug!("Joining worker from previous queue"); + old_deletion_queue.cancel.cancel(); + old_worker_join + .await + .expect("Failed to join workers for previous deletion queue"); } fn set_latest_generation(&self, gen: Generation) { @@ -871,20 +890,14 @@ mod test { let mock_control_plane = Arc::new(MockControlPlane::new()); - let (deletion_queue, fe_worker, be_worker, ex_worker) = DeletionQueue::new( + let (deletion_queue, worker) = DeletionQueue::new( Some(storage.clone()), Some(mock_control_plane.clone()), harness.conf, ); - let mut fe_worker = fe_worker.unwrap(); - let mut be_worker = be_worker.unwrap(); - let mut ex_worker = ex_worker.unwrap(); - let fe_worker_join = tokio::spawn(async move { fe_worker.background().await }); - let be_worker_join = tokio::spawn(async move { be_worker.background().await }); - let ex_worker_join = tokio::spawn(async move { - drop(ex_worker.background().await); - }); + let worker = worker.unwrap(); + let worker_join = worker.spawn_with(&tokio::runtime::Handle::current()); Ok(TestSetup { harness, @@ -892,9 +905,7 @@ mod test { storage, mock_control_plane, deletion_queue, - fe_worker: fe_worker_join, - be_worker: be_worker_join, - ex_worker: ex_worker_join, + worker_join, }) } diff --git a/pageserver/src/deletion_queue/backend.rs b/pageserver/src/deletion_queue/backend.rs index f8410eb52e..f214616be9 100644 --- a/pageserver/src/deletion_queue/backend.rs +++ b/pageserver/src/deletion_queue/backend.rs @@ -34,7 +34,7 @@ pub(super) enum BackendQueueMessage { Delete(DeletionList), Flush(FlushOp), } -pub struct BackendQueueWorker { +pub(super) struct BackendQueueWorker { conf: &'static PageServerConf, rx: tokio::sync::mpsc::Receiver, tx: tokio::sync::mpsc::Sender, @@ -103,7 +103,7 @@ impl BackendQueueWorker { /// /// Valid LSN updates propagate back to their result channel immediately, valid DeletionLists /// go into the queue of ready-to-execute lists. - pub async fn validate(&mut self) -> Result<(), DeletionQueueError> { + async fn validate(&mut self) -> Result<(), DeletionQueueError> { let mut tenant_generations = HashMap::new(); for list in &self.pending_lists { for (tenant_id, tenant_list) in &list.tenants { @@ -241,7 +241,7 @@ impl BackendQueueWorker { Ok(()) } - pub async fn flush(&mut self) -> Result<(), DeletionQueueError> { + async fn flush(&mut self) -> Result<(), DeletionQueueError> { tracing::debug!("Flushing with {} pending lists", self.pending_lists.len()); // Issue any required generation validation calls to the control plane @@ -296,7 +296,7 @@ impl BackendQueueWorker { rx.await.map_err(|_| DeletionQueueError::ShuttingDown) } - pub async fn background(&mut self) { + pub(super) async fn background(&mut self) { tracing::info!("Started deletion backend worker"); while !self.cancel.is_cancelled() { diff --git a/pageserver/src/deletion_queue/executor.rs b/pageserver/src/deletion_queue/executor.rs index 4d47695aa2..3e0b134d90 100644 --- a/pageserver/src/deletion_queue/executor.rs +++ b/pageserver/src/deletion_queue/executor.rs @@ -21,7 +21,7 @@ pub(super) enum ExecutorMessage { /// Non-persistent deletion queue, for coalescing multiple object deletes into /// larger DeleteObjects requests. -pub struct ExecutorWorker { +pub(super) struct ExecutorWorker { // Accumulate up to 1000 keys for the next deletion operation accumulator: Vec, @@ -46,7 +46,7 @@ impl ExecutorWorker { } /// Wrap the remote `delete_objects` with a failpoint - pub async fn remote_delete(&self) -> Result<(), anyhow::Error> { + async fn remote_delete(&self) -> Result<(), anyhow::Error> { fail::fail_point!("deletion-queue-before-execute", |_| { info!("Skipping execution, failpoint set"); DELETION_QUEUE_ERRORS @@ -59,7 +59,7 @@ impl ExecutorWorker { } /// Block until everything in accumulator has been executed - pub async fn flush(&mut self) -> Result<(), DeletionQueueError> { + async fn flush(&mut self) -> Result<(), DeletionQueueError> { while !self.accumulator.is_empty() && !self.cancel.is_cancelled() { match self.remote_delete().await { Ok(()) => { @@ -91,7 +91,7 @@ impl ExecutorWorker { } } - pub async fn background(&mut self) -> Result<(), DeletionQueueError> { + pub(super) async fn background(&mut self) -> Result<(), DeletionQueueError> { self.accumulator.reserve(MAX_KEYS_PER_DELETE); loop { diff --git a/pageserver/src/deletion_queue/frontend.rs b/pageserver/src/deletion_queue/frontend.rs index 54510202ba..b9abd1dc99 100644 --- a/pageserver/src/deletion_queue/frontend.rs +++ b/pageserver/src/deletion_queue/frontend.rs @@ -69,7 +69,7 @@ pub(super) enum FrontendQueueMessage { Recover(RecoverOp), } -pub struct FrontendQueueWorker { +pub(super) struct FrontendQueueWorker { conf: &'static PageServerConf, // Incoming frontend requests to delete some keys @@ -315,7 +315,7 @@ impl FrontendQueueWorker { /// This is the front-end ingest, where we bundle up deletion requests into DeletionList /// and write them out, for later validation by the backend and execution by the executor. - pub async fn background(&mut self) { + pub(super) async fn background(&mut self) { info!("Started deletion frontend worker"); // Synchronous, but we only do it once per process lifetime so it's tolerable