From 0b3592921179fee85ab28725a6601213d195ba53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Tue, 29 Apr 2025 14:46:15 +0200 Subject: [PATCH] Make SafekeeperReconciler parallel via semaphore (#11757) Right now we only support running one reconciliation per safekeeper. This is of course usually way below of what a safekeeper can do. Therefore, introduce a semaphore and spawn the tasks asynchronously as they come in. Part of #11670 --- storage_controller/src/main.rs | 10 +++- storage_controller/src/service.rs | 4 ++ .../src/service/safekeeper_reconciler.rs | 59 +++++++++++++------ 3 files changed, 55 insertions(+), 18 deletions(-) diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 71dde9e126..2eea2f9d10 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -19,7 +19,8 @@ use storage_controller::service::chaos_injector::ChaosInjector; use storage_controller::service::{ Config, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT, MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT, - PRIORITY_RECONCILER_CONCURRENCY_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT, Service, + PRIORITY_RECONCILER_CONCURRENCY_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT, + SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT, Service, }; use tokio::signal::unix::SignalKind; use tokio_util::sync::CancellationToken; @@ -132,6 +133,10 @@ struct Cli { #[arg(long)] priority_reconciler_concurrency: Option, + /// Maximum number of safekeeper reconciliations that may run in parallel (per safekeeper) + #[arg(long)] + safekeeper_reconciler_concurrency: Option, + /// Tenant API rate limit, as requests per second per tenant. #[arg(long, default_value = "10")] tenant_rate_limit: NonZeroU32, @@ -403,6 +408,9 @@ async fn async_main() -> anyhow::Result<()> { priority_reconciler_concurrency: args .priority_reconciler_concurrency .unwrap_or(PRIORITY_RECONCILER_CONCURRENCY_DEFAULT), + safekeeper_reconciler_concurrency: args + .safekeeper_reconciler_concurrency + .unwrap_or(SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT), tenant_rate_limit: args.tenant_rate_limit, split_threshold: args.split_threshold, max_split_shards: args.max_split_shards, diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 0f71a87f13..50ce559cc0 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -194,6 +194,7 @@ pub(crate) enum LeadershipStatus { pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128; pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256; +pub const SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT: usize = 32; // Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately. // This channel is finite-size to avoid using excessive memory if we get into a state where reconciles are finishing more slowly @@ -382,6 +383,9 @@ pub struct Config { /// How many high-priority Reconcilers may be spawned concurrently pub priority_reconciler_concurrency: usize, + /// How many safekeeper reconciles may happen concurrently (per safekeeper) + pub safekeeper_reconciler_concurrency: usize, + /// How many API requests per second to allow per tenant, across all /// tenant-scoped API endpoints. Further API requests queue until ready. pub tenant_rate_limit: NonZeroU32, diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs index b15772a36c..74308cabff 100644 --- a/storage_controller/src/service/safekeeper_reconciler.rs +++ b/storage_controller/src/service/safekeeper_reconciler.rs @@ -3,7 +3,10 @@ use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration}; use clashmap::{ClashMap, Entry}; use safekeeper_api::models::PullTimelineRequest; use safekeeper_client::mgmt_api; -use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use tokio::sync::{ + Semaphore, + mpsc::{self, UnboundedReceiver, UnboundedSender}, +}; use tokio_util::sync::CancellationToken; use tracing::Instrument; use utils::{ @@ -206,18 +209,27 @@ impl ReconcilerHandle { } pub(crate) struct SafekeeperReconciler { - service: Arc, + inner: SafekeeperReconcilerInner, + concurrency_limiter: Arc, rx: UnboundedReceiver<(ScheduleRequest, CancellationToken)>, cancel: CancellationToken, } +/// Thin wrapper over `Service` to not clutter its inherent functions +#[derive(Clone)] +struct SafekeeperReconcilerInner { + service: Arc, +} + impl SafekeeperReconciler { fn spawn(cancel: CancellationToken, service: Arc) -> ReconcilerHandle { // We hold the ServiceInner lock so we don't want to make sending to the reconciler channel to be blocking. let (tx, rx) = mpsc::unbounded_channel(); + let concurrency = service.config.safekeeper_reconciler_concurrency; let mut reconciler = SafekeeperReconciler { - service, + inner: SafekeeperReconcilerInner { service }, rx, + concurrency_limiter: Arc::new(Semaphore::new(concurrency)), cancel: cancel.clone(), }; let handle = ReconcilerHandle { @@ -230,31 +242,44 @@ impl SafekeeperReconciler { } async fn run(&mut self) { loop { - // TODO add parallelism with semaphore here let req = tokio::select! { req = self.rx.recv() => req, _ = self.cancel.cancelled() => break, }; let Some((req, req_cancel)) = req else { break }; + + let permit_res = tokio::select! { + req = self.concurrency_limiter.clone().acquire_owned() => req, + _ = self.cancel.cancelled() => break, + }; + let Ok(_permit) = permit_res else { return }; + + let inner = self.inner.clone(); if req_cancel.is_cancelled() { continue; } - let kind = req.kind; - let tenant_id = req.tenant_id; - let timeline_id = req.timeline_id; - let node_id = req.safekeeper.skp.id; - self.reconcile_one(req, req_cancel) - .instrument(tracing::info_span!( - "reconcile_one", - ?kind, - %tenant_id, - ?timeline_id, - %node_id, - )) - .await; + tokio::task::spawn(async move { + let kind = req.kind; + let tenant_id = req.tenant_id; + let timeline_id = req.timeline_id; + let node_id = req.safekeeper.skp.id; + inner + .reconcile_one(req, req_cancel) + .instrument(tracing::info_span!( + "reconcile_one", + ?kind, + %tenant_id, + ?timeline_id, + %node_id, + )) + .await; + }); } } +} + +impl SafekeeperReconcilerInner { async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: CancellationToken) { let req_host = req.safekeeper.skp.host.clone(); match req.kind {