refactor Arc<dyn> to generics for control plane client mocking

This commit is contained in:
John Spray
2023-09-12 16:02:58 +01:00
parent db9a49ed91
commit 20a3a9be70
4 changed files with 36 additions and 29 deletions

View File

@@ -8,7 +8,7 @@ use anyhow::{anyhow, Context};
use clap::{Arg, ArgAction, Command};
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use pageserver::control_plane_client::{ControlPlaneClient, ControlPlaneGenerationsApi};
use pageserver::control_plane_client::ControlPlaneClient;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
@@ -354,18 +354,12 @@ fn start_pageserver(
// Set up remote storage client
let remote_storage = create_remote_storage_client(conf)?;
// Set up control plane client
let control_plane_client = match ControlPlaneClient::new(conf, &shutdown_pageserver) {
Some(c) => {
let inner: Arc<dyn ControlPlaneGenerationsApi + Send + Sync> = Arc::new(c);
Some(inner)
}
None => None,
};
// Set up deletion queue
let (deletion_queue, deletion_workers) =
DeletionQueue::new(remote_storage.clone(), control_plane_client, conf);
let (deletion_queue, deletion_workers) = DeletionQueue::new(
remote_storage.clone(),
ControlPlaneClient::new(conf, &shutdown_pageserver),
conf,
);
if let Some(deletion_workers) = deletion_workers {
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
}

View File

@@ -32,9 +32,6 @@ pub trait ControlPlaneGenerationsApi {
) -> anyhow::Result<HashMap<TenantId, bool>>;
}
unsafe impl Send for ControlPlaneClient {}
unsafe impl Sync for ControlPlaneClient {}
impl ControlPlaneClient {
/// A None return value indicates that the input `conf` object does not have control
/// plane API enabled.

View File

@@ -81,13 +81,19 @@ pub struct DeletionQueue {
/// Opaque wrapper around individual worker tasks, to avoid making the
/// worker objects themselves public
pub struct DeletionQueueWorkers {
pub struct DeletionQueueWorkers<C>
where
C: ControlPlaneGenerationsApi + Send + Sync,
{
frontend: FrontendQueueWorker,
backend: BackendQueueWorker,
backend: BackendQueueWorker<C>,
executor: ExecutorWorker,
}
impl DeletionQueueWorkers {
impl<C> DeletionQueueWorkers<C>
where
C: ControlPlaneGenerationsApi + Send + Sync + 'static,
{
pub fn spawn_with(mut self, runtime: &tokio::runtime::Handle) -> tokio::task::JoinHandle<()> {
let jh_frontend = runtime.spawn(async move {
self.frontend
@@ -629,11 +635,14 @@ impl DeletionQueue {
/// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
///
/// If remote_storage is None, then the returned workers will also be None.
pub fn new(
pub fn new<C>(
remote_storage: Option<GenericRemoteStorage>,
control_plane_client: Option<Arc<dyn ControlPlaneGenerationsApi + Send + Sync>>,
control_plane_client: Option<C>,
conf: &'static PageServerConf,
) -> (Self, Option<DeletionQueueWorkers>) {
) -> (Self, Option<DeletionQueueWorkers<C>>)
where
C: ControlPlaneGenerationsApi + Send + Sync,
{
// Deep channel: it consumes deletions from all timelines and we do not want to block them
let (tx, rx) = tokio::sync::mpsc::channel(16384);
@@ -759,7 +768,7 @@ mod test {
harness: TenantHarness,
remote_fs_dir: PathBuf,
storage: GenericRemoteStorage,
mock_control_plane: Arc<MockControlPlane>,
mock_control_plane: MockControlPlane,
deletion_queue: DeletionQueue,
worker_join: JoinHandle<()>,
}
@@ -820,14 +829,15 @@ mod test {
}
}
#[derive(Debug, Clone)]
struct MockControlPlane {
pub latest_generation: std::sync::Mutex<HashMap<TenantId, Generation>>,
pub latest_generation: std::sync::Arc<std::sync::Mutex<HashMap<TenantId, Generation>>>,
}
impl MockControlPlane {
fn new() -> Self {
Self {
latest_generation: std::sync::Mutex::new(HashMap::new()),
latest_generation: Arc::new(std::sync::Mutex::new(HashMap::new())),
}
}
}
@@ -882,7 +892,7 @@ mod test {
};
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
let mock_control_plane = Arc::new(MockControlPlane::new());
let mock_control_plane = MockControlPlane::new();
let (deletion_queue, worker) = DeletionQueue::new(
Some(storage.clone()),

View File

@@ -34,13 +34,16 @@ pub(super) enum BackendQueueMessage {
Delete(DeletionList),
Flush(FlushOp),
}
pub(super) struct BackendQueueWorker {
pub(super) struct BackendQueueWorker<C>
where
C: ControlPlaneGenerationsApi,
{
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
// Client for calling into control plane API for validation of deletes
control_plane_client: Option<Arc<dyn ControlPlaneGenerationsApi + Send + Sync>>,
control_plane_client: Option<C>,
// DeletionLists which are waiting generation validation. Not safe to
// execute until [`validate`] has processed them.
@@ -60,12 +63,15 @@ pub(super) struct BackendQueueWorker {
cancel: CancellationToken,
}
impl BackendQueueWorker {
impl<C> BackendQueueWorker<C>
where
C: ControlPlaneGenerationsApi,
{
pub(super) fn new(
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
control_plane_client: Option<Arc<dyn ControlPlaneGenerationsApi + Send + Sync>>,
control_plane_client: Option<C>,
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
cancel: CancellationToken,
) -> Self {