Make SafekeeperReconciler parallel via semaphore (#11757)

Right now we only support running one reconciliation per safekeeper.
This is of course usually way below of what a safekeeper can do.
Therefore, introduce a semaphore and spawn the tasks asynchronously as
they come in.

Part of #11670
This commit is contained in:
Arpad Müller
2025-04-29 14:46:15 +02:00
committed by GitHub
parent b3db7f66ac
commit 0b35929211
3 changed files with 55 additions and 18 deletions

View File

@@ -19,7 +19,8 @@ use storage_controller::service::chaos_injector::ChaosInjector;
use storage_controller::service::{
Config, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT,
MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT,
PRIORITY_RECONCILER_CONCURRENCY_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT, Service,
PRIORITY_RECONCILER_CONCURRENCY_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT, Service,
};
use tokio::signal::unix::SignalKind;
use tokio_util::sync::CancellationToken;
@@ -132,6 +133,10 @@ struct Cli {
#[arg(long)]
priority_reconciler_concurrency: Option<usize>,
/// Maximum number of safekeeper reconciliations that may run in parallel (per safekeeper)
#[arg(long)]
safekeeper_reconciler_concurrency: Option<usize>,
/// Tenant API rate limit, as requests per second per tenant.
#[arg(long, default_value = "10")]
tenant_rate_limit: NonZeroU32,
@@ -403,6 +408,9 @@ async fn async_main() -> anyhow::Result<()> {
priority_reconciler_concurrency: args
.priority_reconciler_concurrency
.unwrap_or(PRIORITY_RECONCILER_CONCURRENCY_DEFAULT),
safekeeper_reconciler_concurrency: args
.safekeeper_reconciler_concurrency
.unwrap_or(SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT),
tenant_rate_limit: args.tenant_rate_limit,
split_threshold: args.split_threshold,
max_split_shards: args.max_split_shards,

View File

@@ -194,6 +194,7 @@ pub(crate) enum LeadershipStatus {
pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256;
pub const SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT: usize = 32;
// Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately.
// This channel is finite-size to avoid using excessive memory if we get into a state where reconciles are finishing more slowly
@@ -382,6 +383,9 @@ pub struct Config {
/// How many high-priority Reconcilers may be spawned concurrently
pub priority_reconciler_concurrency: usize,
/// How many safekeeper reconciles may happen concurrently (per safekeeper)
pub safekeeper_reconciler_concurrency: usize,
/// How many API requests per second to allow per tenant, across all
/// tenant-scoped API endpoints. Further API requests queue until ready.
pub tenant_rate_limit: NonZeroU32,

View File

@@ -3,7 +3,10 @@ use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
use clashmap::{ClashMap, Entry};
use safekeeper_api::models::PullTimelineRequest;
use safekeeper_client::mgmt_api;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::{
Semaphore,
mpsc::{self, UnboundedReceiver, UnboundedSender},
};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{
@@ -206,18 +209,27 @@ impl ReconcilerHandle {
}
pub(crate) struct SafekeeperReconciler {
service: Arc<Service>,
inner: SafekeeperReconcilerInner,
concurrency_limiter: Arc<Semaphore>,
rx: UnboundedReceiver<(ScheduleRequest, CancellationToken)>,
cancel: CancellationToken,
}
/// Thin wrapper over `Service` to not clutter its inherent functions
#[derive(Clone)]
struct SafekeeperReconcilerInner {
service: Arc<Service>,
}
impl SafekeeperReconciler {
fn spawn(cancel: CancellationToken, service: Arc<Service>) -> ReconcilerHandle {
// We hold the ServiceInner lock so we don't want to make sending to the reconciler channel to be blocking.
let (tx, rx) = mpsc::unbounded_channel();
let concurrency = service.config.safekeeper_reconciler_concurrency;
let mut reconciler = SafekeeperReconciler {
service,
inner: SafekeeperReconcilerInner { service },
rx,
concurrency_limiter: Arc::new(Semaphore::new(concurrency)),
cancel: cancel.clone(),
};
let handle = ReconcilerHandle {
@@ -230,31 +242,44 @@ impl SafekeeperReconciler {
}
async fn run(&mut self) {
loop {
// TODO add parallelism with semaphore here
let req = tokio::select! {
req = self.rx.recv() => req,
_ = self.cancel.cancelled() => break,
};
let Some((req, req_cancel)) = req else { break };
let permit_res = tokio::select! {
req = self.concurrency_limiter.clone().acquire_owned() => req,
_ = self.cancel.cancelled() => break,
};
let Ok(_permit) = permit_res else { return };
let inner = self.inner.clone();
if req_cancel.is_cancelled() {
continue;
}
let kind = req.kind;
let tenant_id = req.tenant_id;
let timeline_id = req.timeline_id;
let node_id = req.safekeeper.skp.id;
self.reconcile_one(req, req_cancel)
.instrument(tracing::info_span!(
"reconcile_one",
?kind,
%tenant_id,
?timeline_id,
%node_id,
))
.await;
tokio::task::spawn(async move {
let kind = req.kind;
let tenant_id = req.tenant_id;
let timeline_id = req.timeline_id;
let node_id = req.safekeeper.skp.id;
inner
.reconcile_one(req, req_cancel)
.instrument(tracing::info_span!(
"reconcile_one",
?kind,
%tenant_id,
?timeline_id,
%node_id,
))
.await;
});
}
}
}
impl SafekeeperReconcilerInner {
async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: CancellationToken) {
let req_host = req.safekeeper.skp.host.clone();
match req.kind {