From eedd179f0c9f8bf5eb32dd6f3d4245b4595bbec3 Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Fri, 7 Mar 2025 15:38:01 +0100
Subject: [PATCH] storcon: initial autosplit tweaks (#11134)

## Problem

This patch makes some initial tweaks as preparation for
https://github.com/neondatabase/cloud/issues/22532, where we will be introducing
additional autosplit logic. The plan is outlined in
https://github.com/neondatabase/cloud/issues/22532#issuecomment-2706215907.

## Summary of changes

Minor code cleanups and behavioral changes:

* Decide that we'll split based on `max_logical_size` (later possibly `total_logical_size`).
* Fix a bug that would split the smallest candidate rather than the largest.
* Pick the largest candidate by `max_logical_size` rather than `resident_size`, for consistency (debatable).
* Split out `get_top_tenant_shards()` to fetch split candidates.
* Fetch candidates concurrently from all nodes.
* Make `TenantShard.get_scheduling_policy()` return a copy instead of a reference.
---
 storage_controller/src/service.rs      | 184 ++++++++++++++-----------
 storage_controller/src/tenant_shard.rs |   4 +-
 2 files changed, 102 insertions(+), 86 deletions(-)

diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index e12bd299ce..caa2040ce2 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -36,9 +36,9 @@ use pageserver_api::models::{
     self, LocationConfig, LocationConfigListResponse, LocationConfigMode, PageserverUtilization,
     SecondaryProgress, ShardParameters, TenantConfig, TenantConfigPatchRequest,
     TenantConfigRequest, TenantLocationConfigRequest, TenantLocationConfigResponse,
-    TenantShardLocation, TenantShardSplitRequest, TenantShardSplitResponse,
+    TenantShardLocation, TenantShardSplitRequest, TenantShardSplitResponse, TenantSorting,
     TenantTimeTravelRequest, TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineInfo,
-    TopTenantShardsRequest,
+    TopTenantShardItem, TopTenantShardsRequest,
 };
 use pageserver_api::shard::{
     ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
@@ -53,7 +53,7 @@ use safekeeper_api::models::SafekeeperUtilization;
 use tokio::sync::TryAcquireError;
 use tokio::sync::mpsc::error::TrySendError;
 use tokio_util::sync::CancellationToken;
-use tracing::{Instrument, instrument};
+use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
 use utils::completion::Barrier;
 use utils::generation::Generation;
 use utils::id::{NodeId, TenantId, TimelineId};
@@ -370,8 +370,12 @@ pub struct Config {
     /// tenant-scoped API endpoints. Further API requests queue until ready.
     pub tenant_rate_limit: NonZeroU32,
 
-    /// How large must a shard grow in bytes before we split it?
-    /// None disables auto-splitting.
+    /// The size at which an unsharded tenant should be split (into 8 shards). This uses the logical
+    /// size of the largest timeline in the shard (i.e. max_logical_size).
+    ///
+    /// None or 0 disables auto-splitting.
+    ///
+    /// TODO: consider using total logical size of all timelines instead.
     pub split_threshold: Option<u64>,
 
     // TODO: make this cfg(feature = "testing")
@@ -4364,7 +4368,7 @@ impl Service {
             is_reconciling: shard.reconciler.is_some(),
             is_pending_compute_notification: shard.pending_compute_notification,
             is_splitting: matches!(shard.splitting, SplitState::Splitting),
-            scheduling_policy: *shard.get_scheduling_policy(),
+            scheduling_policy: shard.get_scheduling_policy(),
             preferred_az_id: shard.preferred_az().map(ToString::to_string),
         })
     }
@@ -7232,86 +7236,57 @@ impl Service {
         }
     }
 
-    /// Look for shards which are oversized and in need of splitting
+    /// Asynchronously split a tenant that's eligible for automatic splits:
+    ///
+    /// * The tenant is unsharded.
+    /// * The logical size of its largest timeline exceeds split_threshold.
+    /// * The tenant's scheduling policy is active.
+    ///
+    /// At most one tenant will be split per call: the one with the largest max logical size. It
+    /// will split 1 → 8 shards.
+    ///
+    /// TODO: consider splitting based on total logical size rather than max logical size.
+    ///
+    /// TODO: consider spawning multiple splits in parallel: this is only called once every 20
+    /// seconds, so a large backlog can take a long time, and if a tenant fails to split it will
+    /// block all other splits.
     async fn autosplit_tenants(self: &Arc<Self>) {
         let Some(split_threshold) = self.config.split_threshold else {
-            // Auto-splitting is disabled
+            return; // auto-splits are disabled
+        };
+        if split_threshold == 0 {
             return;
-        };
-
-        let nodes = self.inner.read().unwrap().nodes.clone();
-
-        const SPLIT_TO_MAX: ShardCount = ShardCount::new(8);
-
-        let mut top_n = Vec::new();
-
-        // Call into each node to look for big tenants
-        let top_n_request = TopTenantShardsRequest {
-            // We currently split based on logical size, for simplicity: logical size is a signal of
-            // the user's intent to run a large database, whereas physical/resident size can be symptoms
-            // of compaction issues. Eventually we should switch to using resident size to bound the
-            // disk space impact of one shard.
-            order_by: models::TenantSorting::MaxLogicalSize,
-            limit: 10,
-            where_shards_lt: Some(SPLIT_TO_MAX),
-            where_gt: Some(split_threshold),
-        };
-        for node in nodes.values() {
-            let request_ref = &top_n_request;
-            match node
-                .with_client_retries(
-                    |client| async move {
-                        let request = request_ref.clone();
-                        client.top_tenant_shards(request.clone()).await
-                    },
-                    &self.config.pageserver_jwt_token,
-                    3,
-                    3,
-                    Duration::from_secs(5),
-                    &self.cancel,
-                )
-                .await
-            {
-                Some(Ok(node_top_n)) => {
-                    top_n.extend(node_top_n.shards.into_iter());
-                }
-                Some(Err(mgmt_api::Error::Cancelled)) => {
-                    continue;
-                }
-                Some(Err(e)) => {
-                    tracing::warn!("Failed to fetch top N tenants from {node}: {e}");
-                    continue;
-                }
-                None => {
-                    // Node is shutting down
-                    continue;
-                }
-            };
         }
 
-        // Pick the biggest tenant to split first
-        top_n.sort_by_key(|i| i.resident_size);
+        // Fetch the largest eligible shards by logical size.
+        const MAX_SHARDS: ShardCount = ShardCount::new(8);
 
-        // Filter out tenants in a prohibiting scheduling mode
+        let mut top_n = self
+            .get_top_tenant_shards(&TopTenantShardsRequest {
+                order_by: TenantSorting::MaxLogicalSize,
+                limit: 10,
+                where_shards_lt: Some(MAX_SHARDS),
+                where_gt: Some(split_threshold),
+            })
+            .await;
+
+        // Filter out tenants in a prohibiting scheduling mode.
         {
-            let locked = self.inner.read().unwrap();
+            let state = self.inner.read().unwrap();
             top_n.retain(|i| {
-                if let Some(shard) = locked.tenants.get(&i.id) {
-                    matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active)
-                } else {
-                    false
-                }
+                let policy = state.tenants.get(&i.id).map(|s| s.get_scheduling_policy());
+                policy == Some(ShardSchedulingPolicy::Active)
             });
         }
 
         let Some(split_candidate) = top_n.into_iter().next() else {
-            tracing::debug!("No split-elegible shards found");
+            debug!("No split-eligible shards found");
             return;
         };
 
-        // We spawn a task to run this, so it's exactly like some external API client requesting it. We don't
-        // want to block the background reconcile loop on this.
-        tracing::info!(
+        // We spawn a task to run this, so it's exactly like some external API client requesting it.
+        // We don't want to block the background reconcile loop on this.
+        info!(
             "Auto-splitting tenant for size threshold {split_threshold}: current size {split_candidate:?}"
         );
 
@@ -7322,29 +7297,70 @@ impl Service {
                     .tenant_shard_split(
                         split_candidate.id.tenant_id,
                         TenantShardSplitRequest {
-                            // Always split to the max number of shards: this avoids stepping through
-                            // intervening shard counts and encountering the overrhead of a split+cleanup
-                            // each time as a tenant grows, and is not too expensive because our max shard
-                            // count is relatively low anyway.
-                            // This policy will be adjusted in future once we support higher shard count.
-                            new_shard_count: SPLIT_TO_MAX.literal(),
+                            // Always split to the max number of shards: this avoids stepping
+                            // through intervening shard counts and encountering the overhead of a
+                            // split+cleanup each time as a tenant grows, and is not too expensive
+                            // because our max shard count is relatively low anyway. This policy
+                            // will be adjusted in future once we support higher shard count.
+                            new_shard_count: MAX_SHARDS.literal(),
                             new_stripe_size: Some(ShardParameters::DEFAULT_STRIPE_SIZE),
                         },
                     )
                     .await
                 {
-                    Ok(_) => {
-                        tracing::info!("Successful auto-split");
-                    }
-                    Err(e) => {
-                        tracing::error!("Auto-split failed: {e}");
-                    }
+                    Ok(_) => info!("Successful auto-split"),
+                    Err(err) => error!("Auto-split failed: {err}"),
                 }
             }
-            .instrument(tracing::info_span!("auto_split", tenant_id=%split_candidate.id.tenant_id)),
+            .instrument(info_span!("auto_split", tenant_id=%split_candidate.id.tenant_id)),
         );
     }
 
+    /// Fetches the top tenant shards from every node, in descending order of
+    /// max logical size. Any node errors will be logged and ignored.
+    async fn get_top_tenant_shards(
+        &self,
+        request: &TopTenantShardsRequest,
+    ) -> Vec<TopTenantShardItem> {
+        let nodes = self
+            .inner
+            .read()
+            .unwrap()
+            .nodes
+            .values()
+            .cloned()
+            .collect_vec();
+
+        let mut futures = FuturesUnordered::new();
+        for node in nodes {
+            futures.push(async move {
+                node.with_client_retries(
+                    |client| async move { client.top_tenant_shards(request.clone()).await },
+                    &self.config.pageserver_jwt_token,
+                    3,
+                    3,
+                    Duration::from_secs(5),
+                    &self.cancel,
+                )
+                .await
+            });
+        }
+
+        let mut top = Vec::new();
+        while let Some(output) = futures.next().await {
+            match output {
+                Some(Ok(response)) => top.extend(response.shards),
+                Some(Err(mgmt_api::Error::Cancelled)) => {}
+                Some(Err(err)) => warn!("failed to fetch top tenants: {err}"),
+                None => {} // node is shutting down
+            }
+        }
+
+        top.sort_by_key(|i| i.max_logical_size);
+        top.reverse();
+        top
+    }
+
     /// Useful for tests: run whatever work a background [`Self::reconcile_all`] would have done, but
     /// also wait for any generated Reconcilers to complete. Calling this until it returns zero should
     /// put the system into a quiescent state where future background reconciliations won't do anything.
diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs
index 34fd244023..27e478043e 100644
--- a/storage_controller/src/tenant_shard.rs
+++ b/storage_controller/src/tenant_shard.rs
@@ -1710,8 +1710,8 @@ impl TenantShard {
         self.scheduling_policy = p;
     }
 
-    pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
-        &self.scheduling_policy
+    pub(crate) fn get_scheduling_policy(&self) -> ShardSchedulingPolicy {
+        self.scheduling_policy
     }
 
     pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
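
## Appendix: candidate selection sketch (illustrative, not part of the patch)

A minimal, self-contained sketch of the selection rule the patched code applies once `get_top_tenant_shards()` has returned: keep candidates over the threshold, order them by `max_logical_size` descending, and split only the largest one. The `Candidate` struct and the sizes below are made up for the example; the real code operates on `TopTenantShardItem` values fetched from the pageservers.

```rust
/// Hypothetical stand-in for the TopTenantShardItem fields that matter here.
#[derive(Debug)]
struct Candidate {
    tenant: &'static str,
    max_logical_size: u64, // bytes; logical size of the shard's largest timeline
}

fn main() {
    let split_threshold: u64 = 64 * 1024 * 1024 * 1024; // example threshold: 64 GiB

    // Example candidates, as if merged from the per-node "top tenant shards" responses.
    let mut top_n = vec![
        Candidate { tenant: "a", max_logical_size: 80 * 1024 * 1024 * 1024 },
        Candidate { tenant: "b", max_logical_size: 200 * 1024 * 1024 * 1024 },
        Candidate { tenant: "c", max_logical_size: 70 * 1024 * 1024 * 1024 },
    ];

    // Keep only candidates above the threshold (the real request already filters via where_gt).
    top_n.retain(|c| c.max_logical_size > split_threshold);

    // Sort descending by max_logical_size and take the first element. This is the behavioral
    // fix: the old code sorted ascending by resident_size and so picked the smallest candidate.
    top_n.sort_by_key(|c| std::cmp::Reverse(c.max_logical_size));
    if let Some(split_candidate) = top_n.first() {
        println!("would split {split_candidate:?}");
    }
}
```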