diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs
index f1454af533..ce8f8d0cdd 100644
--- a/storage_controller/src/main.rs
+++ b/storage_controller/src/main.rs
@@ -66,6 +66,10 @@ struct Cli {
     #[arg(long)]
     max_unavailable_interval: Option<humantime::Duration>,
 
+    /// Size threshold for automatically splitting shards (disabled by default)
+    #[arg(long)]
+    split_threshold: Option<u64>,
+
     /// Maximum number of reconcilers that may run in parallel
     #[arg(long)]
     reconciler_concurrency: Option<usize>,
@@ -255,6 +259,7 @@ async fn async_main() -> anyhow::Result<()> {
         reconciler_concurrency: args
             .reconciler_concurrency
             .unwrap_or(RECONCILER_CONCURRENCY_DEFAULT),
+        split_threshold: args.split_threshold,
     };
 
     // After loading secrets & config, but before starting anything else, apply database migrations
diff --git a/storage_controller/src/pageserver_client.rs b/storage_controller/src/pageserver_client.rs
index 25b6b67e12..16f6861115 100644
--- a/storage_controller/src/pageserver_client.rs
+++ b/storage_controller/src/pageserver_client.rs
@@ -2,7 +2,7 @@ use pageserver_api::{
     models::{
         LocationConfig, LocationConfigListResponse, PageserverUtilization, SecondaryProgress,
         TenantScanRemoteStorageResponse, TenantShardSplitRequest, TenantShardSplitResponse,
-        TimelineCreateRequest, TimelineInfo,
+        TimelineCreateRequest, TimelineInfo, TopNTenantShardsRequest, TopNTenantShardsResponse,
     },
     shard::TenantShardId,
 };
@@ -234,4 +234,16 @@ impl PageserverClient {
             self.inner.get_utilization().await
         )
     }
+
+    pub(crate) async fn top_n_tenant_shards(
+        &self,
+        request: TopNTenantShardsRequest,
+    ) -> Result<TopNTenantShardsResponse> {
+        measured_request!(
+            "top_n_tenants",
+            crate::metrics::Method::Post,
+            &self.node_id_label,
+            self.inner.top_n_tenant_shards(request).await
+        )
+    }
 }
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index ae7e8d3d7d..865cd1f887 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -32,10 +32,10 @@ use pageserver_api::{
         TenantPolicyRequest, TenantShardMigrateRequest, TenantShardMigrateResponse,
         UtilizationScore,
     },
-    models::{SecondaryProgress, TenantConfigRequest},
+    models::{SecondaryProgress, TenantConfigRequest, TopNTenantShardsRequest},
 };
 use reqwest::StatusCode;
-use tracing::instrument;
+use tracing::{instrument, Instrument};
 
 use crate::pageserver_client::PageserverClient;
 use pageserver_api::{
@@ -222,6 +222,10 @@ pub struct Config {
 
     /// How many Reconcilers may be spawned concurrently
     pub reconciler_concurrency: usize,
+
+    /// How large must a shard grow in bytes before we split it?
+    /// None disables auto-splitting.
+    pub split_threshold: Option<u64>,
 }
 
 impl From for ApiError {
@@ -699,7 +703,7 @@ impl Service {
     /// e.g. a tenant create/attach/migrate must eventually be retried: this task is responsible
     /// for those retries.
     #[instrument(skip_all)]
-    async fn background_reconcile(&self) {
+    async fn background_reconcile(self: &Arc<Self>) {
         self.startup_complete.clone().wait().await;
 
         const BACKGROUND_RECONCILE_PERIOD: Duration = Duration::from_secs(20);
@@ -711,7 +715,11 @@ impl Service {
                 let reconciles_spawned = self.reconcile_all();
                 if reconciles_spawned == 0 {
                     // Run optimizer only when we didn't find any other work to do
-                    self.optimize_all().await;
+                    let optimizations = self.optimize_all().await;
+                    if optimizations == 0 {
+                        // Run new splits only when no optimizations are pending
+                        self.autosplit_tenants().await;
+                    }
                 }
             }
             _ = self.cancel.cancelled() => return
@@ -4766,6 +4774,104 @@ impl Service {
         validated_work
     }
 
+    /// Look for shards which are oversized and in need of splitting
+    async fn autosplit_tenants(self: &Arc<Self>) {
+        let Some(split_threshold) = self.config.split_threshold else {
+            // Auto-splitting is disabled
+            return;
+        };
+
+        let nodes = self.inner.read().unwrap().nodes.clone();
+
+        const SPLIT_TO_MAX: ShardCount = ShardCount::new(8);
+
+        let mut top_n = Vec::new();
+
+        // Call into each node to look for big tenants
+        let top_n_request = TopNTenantShardsRequest {
+            // We currently split based on logical size, for simplicity: logical size is a signal of
+            // the user's intent to run a large database, whereas physical/resident size can be a symptom
+            // of compaction issues. Eventually we should switch to using resident size to bound the
+            // disk space impact of one shard.
+            order_by: models::TopNSorting::MaxLogicalSize,
+            limit: 10,
+            where_shards_lt: Some(SPLIT_TO_MAX),
+            where_gt: Some(split_threshold),
+        };
+        for node in nodes.values() {
+            let request_ref = &top_n_request;
+            match node
+                .with_client_retries(
+                    |client| async move {
+                        let request = request_ref.clone();
+                        client.top_n_tenant_shards(request.clone()).await
+                    },
+                    &self.config.jwt_token,
+                    3,
+                    3,
+                    Duration::from_secs(5),
+                    &self.cancel,
+                )
+                .await
+            {
+                Some(Ok(node_top_n)) => {
+                    top_n.extend(node_top_n.shards.into_iter());
+                }
+                Some(Err(mgmt_api::Error::Cancelled)) => {
+                    continue;
+                }
+                Some(Err(e)) => {
+                    tracing::warn!("Failed to fetch top N tenants from {node}: {e}");
+                    continue;
+                }
+                None => {
+                    // Node is shutting down
+                    continue;
+                }
+            };
+        }
+
+        // Pick the biggest tenant to split first: sort descending by resident size
+        top_n.sort_by_key(|i| std::cmp::Reverse(i.resident_size));
+        let Some(split_candidate) = top_n.into_iter().next() else {
+            tracing::debug!("No split-eligible shards found");
+            return;
+        };
+
+        // We spawn a task to run this, so it's exactly like some external API client requesting it. We don't
+        // want to block the background reconcile loop on this.
+        tracing::info!("Auto-splitting tenant for size threshold {split_threshold}: current size {split_candidate:?}");
+
+        let this = self.clone();
+        tokio::spawn(
+            async move {
+                match this
+                    .tenant_shard_split(
+                        split_candidate.id.tenant_id,
+                        TenantShardSplitRequest {
+                            // Always split to the max number of shards: this avoids stepping through
+                            // intervening shard counts and encountering the overhead of a split+cleanup
+                            // each time as a tenant grows, and is not too expensive because our max shard
+                            // count is relatively low anyway.
+                            // This policy will be adjusted in future once we support higher shard counts.
+                            new_shard_count: SPLIT_TO_MAX.literal(),
+                            new_stripe_size: Some(ShardParameters::DEFAULT_STRIPE_SIZE),
+                        },
+                    )
+                    .await
+                {
+                    Ok(_) => {
+                        tracing::info!("Successful auto-split");
+                    }
+                    Err(e) => {
+                        tracing::error!("Auto-split failed: {e}");
+                    }
+                }
+            }
+            .instrument(tracing::info_span!("auto_split", tenant_id=%split_candidate.id.tenant_id)),
+        );
+    }
+
     /// Useful for tests: run whatever work a background [`Self::reconcile_all`] would have done, but
     /// also wait for any generated Reconcilers to complete. Calling this until it returns zero should
     /// put the system into a quiescent state where future background reconciliations won't do anything.
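
A note on the candidate ordering in autosplit_tenants above: the shards gathered from all nodes are sorted in descending order of resident_size so that the largest eligible shard is split first, and smaller candidates are left for later passes of the background loop (only one split is spawned per pass). The sketch below is a minimal, self-contained illustration of that selection step, using a hypothetical Candidate struct rather than the controller's real top-N response type:

    use std::cmp::Reverse;

    #[derive(Debug)]
    struct Candidate {
        tenant: &'static str,
        resident_size: u64,
    }

    fn main() {
        let mut top_n = vec![
            Candidate { tenant: "a", resident_size: 10_000_000_000 },
            Candidate { tenant: "b", resident_size: 90_000_000_000 },
            Candidate { tenant: "c", resident_size: 40_000_000_000 },
        ];

        // Sort descending by resident size: after this, the first element is the
        // largest shard, i.e. the one to auto-split first.
        top_n.sort_by_key(|c| Reverse(c.resident_size));

        let split_candidate = top_n.into_iter().next().expect("list is non-empty");
        assert_eq!(split_candidate.tenant, "b");
        println!("would split tenant {split_candidate:?}");
    }

Splitting the largest candidate first bounds the worst-case shard size soonest; since the loop runs every background reconcile period, the remaining oversized shards are split on subsequent iterations.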