From 20a3a9be7024f7fb9505c64152764df82cb29b0b Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 12 Sep 2023 16:02:58 +0100 Subject: [PATCH] refactor Arc to generics for control plane client mocking --- pageserver/src/bin/pageserver.rs | 18 +++++--------- pageserver/src/control_plane_client.rs | 3 --- pageserver/src/deletion_queue.rs | 30 ++++++++++++++++-------- pageserver/src/deletion_queue/backend.rs | 14 +++++++---- 4 files changed, 36 insertions(+), 29 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index dc7575f642..795914107e 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -8,7 +8,7 @@ use anyhow::{anyhow, Context}; use clap::{Arg, ArgAction, Command}; use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp}; -use pageserver::control_plane_client::{ControlPlaneClient, ControlPlaneGenerationsApi}; +use pageserver::control_plane_client::ControlPlaneClient; use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task}; use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING}; use pageserver::task_mgr::WALRECEIVER_RUNTIME; @@ -354,18 +354,12 @@ fn start_pageserver( // Set up remote storage client let remote_storage = create_remote_storage_client(conf)?; - // Set up control plane client - let control_plane_client = match ControlPlaneClient::new(conf, &shutdown_pageserver) { - Some(c) => { - let inner: Arc = Arc::new(c); - Some(inner) - } - None => None, - }; - // Set up deletion queue - let (deletion_queue, deletion_workers) = - DeletionQueue::new(remote_storage.clone(), control_plane_client, conf); + let (deletion_queue, deletion_workers) = DeletionQueue::new( + remote_storage.clone(), + ControlPlaneClient::new(conf, &shutdown_pageserver), + conf, + ); if let Some(deletion_workers) = deletion_workers { deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle()); } diff --git a/pageserver/src/control_plane_client.rs b/pageserver/src/control_plane_client.rs index bd09e9c4d7..0953af0a47 100644 --- a/pageserver/src/control_plane_client.rs +++ b/pageserver/src/control_plane_client.rs @@ -32,9 +32,6 @@ pub trait ControlPlaneGenerationsApi { ) -> anyhow::Result>; } -unsafe impl Send for ControlPlaneClient {} -unsafe impl Sync for ControlPlaneClient {} - impl ControlPlaneClient { /// A None return value indicates that the input `conf` object does not have control /// plane API enabled. diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 7e321ca744..a65e39dcd0 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -81,13 +81,19 @@ pub struct DeletionQueue { /// Opaque wrapper around individual worker tasks, to avoid making the /// worker objects themselves public -pub struct DeletionQueueWorkers { +pub struct DeletionQueueWorkers +where + C: ControlPlaneGenerationsApi + Send + Sync, +{ frontend: FrontendQueueWorker, - backend: BackendQueueWorker, + backend: BackendQueueWorker, executor: ExecutorWorker, } -impl DeletionQueueWorkers { +impl DeletionQueueWorkers +where + C: ControlPlaneGenerationsApi + Send + Sync + 'static, +{ pub fn spawn_with(mut self, runtime: &tokio::runtime::Handle) -> tokio::task::JoinHandle<()> { let jh_frontend = runtime.spawn(async move { self.frontend @@ -629,11 +635,14 @@ impl DeletionQueue { /// we don't spawn those inside new() so that the caller can use their runtime/spans of choice. /// /// If remote_storage is None, then the returned workers will also be None. - pub fn new( + pub fn new( remote_storage: Option, - control_plane_client: Option>, + control_plane_client: Option, conf: &'static PageServerConf, - ) -> (Self, Option) { + ) -> (Self, Option>) + where + C: ControlPlaneGenerationsApi + Send + Sync, + { // Deep channel: it consumes deletions from all timelines and we do not want to block them let (tx, rx) = tokio::sync::mpsc::channel(16384); @@ -759,7 +768,7 @@ mod test { harness: TenantHarness, remote_fs_dir: PathBuf, storage: GenericRemoteStorage, - mock_control_plane: Arc, + mock_control_plane: MockControlPlane, deletion_queue: DeletionQueue, worker_join: JoinHandle<()>, } @@ -820,14 +829,15 @@ mod test { } } + #[derive(Debug, Clone)] struct MockControlPlane { - pub latest_generation: std::sync::Mutex>, + pub latest_generation: std::sync::Arc>>, } impl MockControlPlane { fn new() -> Self { Self { - latest_generation: std::sync::Mutex::new(HashMap::new()), + latest_generation: Arc::new(std::sync::Mutex::new(HashMap::new())), } } } @@ -882,7 +892,7 @@ mod test { }; let storage = GenericRemoteStorage::from_config(&storage_config).unwrap(); - let mock_control_plane = Arc::new(MockControlPlane::new()); + let mock_control_plane = MockControlPlane::new(); let (deletion_queue, worker) = DeletionQueue::new( Some(storage.clone()), diff --git a/pageserver/src/deletion_queue/backend.rs b/pageserver/src/deletion_queue/backend.rs index f4b67d3d20..7777ded893 100644 --- a/pageserver/src/deletion_queue/backend.rs +++ b/pageserver/src/deletion_queue/backend.rs @@ -34,13 +34,16 @@ pub(super) enum BackendQueueMessage { Delete(DeletionList), Flush(FlushOp), } -pub(super) struct BackendQueueWorker { +pub(super) struct BackendQueueWorker +where + C: ControlPlaneGenerationsApi, +{ conf: &'static PageServerConf, rx: tokio::sync::mpsc::Receiver, tx: tokio::sync::mpsc::Sender, // Client for calling into control plane API for validation of deletes - control_plane_client: Option>, + control_plane_client: Option, // DeletionLists which are waiting generation validation. Not safe to // execute until [`validate`] has processed them. @@ -60,12 +63,15 @@ pub(super) struct BackendQueueWorker { cancel: CancellationToken, } -impl BackendQueueWorker { +impl BackendQueueWorker +where + C: ControlPlaneGenerationsApi, +{ pub(super) fn new( conf: &'static PageServerConf, rx: tokio::sync::mpsc::Receiver, tx: tokio::sync::mpsc::Sender, - control_plane_client: Option>, + control_plane_client: Option, lsn_table: Arc>, cancel: CancellationToken, ) -> Self {