# storcon: initial autosplit tweaks (#11134)
## Problem

This patch makes some initial tweaks as preparation for https://github.com/neondatabase/cloud/issues/22532, where we will be introducing additional autosplit logic. The plan is outlined in https://github.com/neondatabase/cloud/issues/22532#issuecomment-2706215907.

## Summary of changes

Minor code cleanups and behavioral changes:

* Decide that we'll split based on `max_logical_size` (later possibly `total_logical_size`).
* Fix a bug that would split the smallest candidate rather than the largest (see the sketch below).
* Pick the largest candidate by `max_logical_size` rather than `resident_size`, for consistency (debatable).
* Split out `get_top_tenant_shards()` to fetch split candidates.
* Fetch candidates concurrently from all nodes.
* Make `TenantShard.get_scheduling_policy()` return a copy instead of a reference.
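The smallest-rather-than-largest bug comes from sorting the candidates ascending and then taking the first element. A minimal standalone sketch of the before/after behavior — the `Candidate` struct here is illustrative, not the real `TopTenantShardItem`:

```rust
#[derive(Debug)]
struct Candidate {
    resident_size: u64,
    max_logical_size: u64,
}

fn pick_split_candidate(mut top_n: Vec<Candidate>) -> Option<Candidate> {
    // Before: top_n.sort_by_key(|i| i.resident_size) sorted ascending, so
    // taking the first element returned the *smallest* shard.
    top_n.sort_by_key(|i| i.max_logical_size);
    top_n.reverse(); // descending: largest candidate first
    top_n.into_iter().next()
}

fn main() {
    let picked = pick_split_candidate(vec![
        Candidate { resident_size: 10, max_logical_size: 100 },
        Candidate { resident_size: 30, max_logical_size: 300 },
        Candidate { resident_size: 20, max_logical_size: 200 },
    ]);
    assert_eq!(picked.unwrap().max_logical_size, 300);
}
```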
```diff
@@ -36,9 +36,9 @@ use pageserver_api::models::{
     self, LocationConfig, LocationConfigListResponse, LocationConfigMode, PageserverUtilization,
     SecondaryProgress, ShardParameters, TenantConfig, TenantConfigPatchRequest,
     TenantConfigRequest, TenantLocationConfigRequest, TenantLocationConfigResponse,
-    TenantShardLocation, TenantShardSplitRequest, TenantShardSplitResponse,
+    TenantShardLocation, TenantShardSplitRequest, TenantShardSplitResponse, TenantSorting,
     TenantTimeTravelRequest, TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineInfo,
-    TopTenantShardsRequest,
+    TopTenantShardItem, TopTenantShardsRequest,
 };
 use pageserver_api::shard::{
     ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
@@ -53,7 +53,7 @@ use safekeeper_api::models::SafekeeperUtilization;
 use tokio::sync::TryAcquireError;
 use tokio::sync::mpsc::error::TrySendError;
 use tokio_util::sync::CancellationToken;
-use tracing::{Instrument, instrument};
+use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
 use utils::completion::Barrier;
 use utils::generation::Generation;
 use utils::id::{NodeId, TenantId, TimelineId};
```
```diff
@@ -370,8 +370,12 @@ pub struct Config {
     /// tenant-scoped API endpoints. Further API requests queue until ready.
     pub tenant_rate_limit: NonZeroU32,
 
-    /// How large must a shard grow in bytes before we split it?
-    /// None disables auto-splitting.
+    /// The size at which an unsharded tenant should be split (into 8 shards). This uses the logical
+    /// size of the largest timeline in the shard (i.e. max_logical_size).
+    ///
+    /// None or 0 disables auto-splitting.
+    ///
+    /// TODO: consider using total logical size of all timelines instead.
     pub split_threshold: Option<u64>,
 
     // TODO: make this cfg(feature = "testing")
```
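To make the new semantics concrete, here is a tiny standalone sketch (not storcon code) of the gate that `autosplit_tenants` applies below: both `None` and `Some(0)` disable splitting.

```rust
// Sketch of the disable gate: `None` and `Some(0)` both mean "no auto-splits".
fn effective_split_threshold(split_threshold: Option<u64>) -> Option<u64> {
    match split_threshold {
        None | Some(0) => None, // auto-splitting disabled
        Some(t) => Some(t),     // split shards larger than t bytes
    }
}

fn main() {
    assert_eq!(effective_split_threshold(None), None);
    assert_eq!(effective_split_threshold(Some(0)), None);
    assert_eq!(effective_split_threshold(Some(64 << 30)), Some(64 << 30));
}
```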
```diff
@@ -4364,7 +4368,7 @@ impl Service {
             is_reconciling: shard.reconciler.is_some(),
             is_pending_compute_notification: shard.pending_compute_notification,
             is_splitting: matches!(shard.splitting, SplitState::Splitting),
-            scheduling_policy: *shard.get_scheduling_policy(),
+            scheduling_policy: shard.get_scheduling_policy(),
             preferred_az_id: shard.preferred_az().map(ToString::to_string),
         })
     }
```
```diff
@@ -7232,86 +7236,57 @@ impl Service {
         }
     }
 
-    /// Look for shards which are oversized and in need of splitting
+    /// Asynchronously split a tenant that's eligible for automatic splits:
+    ///
+    /// * The tenant is unsharded.
+    /// * The logical size of its largest timeline exceeds split_threshold.
+    /// * The tenant's scheduling policy is active.
+    ///
+    /// At most one tenant will be split per call: the one with the largest max logical size. It
+    /// will split 1 → 8 shards.
+    ///
+    /// TODO: consider splitting based on total logical size rather than max logical size.
+    ///
+    /// TODO: consider spawning multiple splits in parallel: this is only called once every 20
+    /// seconds, so a large backlog can take a long time, and if a tenant fails to split it will
+    /// block all other splits.
     async fn autosplit_tenants(self: &Arc<Self>) {
         let Some(split_threshold) = self.config.split_threshold else {
-            // Auto-splitting is disabled
-            return;
+            return; // auto-splits are disabled
         };
+        if split_threshold == 0 {
+            return;
+        };
 
-        let nodes = self.inner.read().unwrap().nodes.clone();
-
-        const SPLIT_TO_MAX: ShardCount = ShardCount::new(8);
-
-        let mut top_n = Vec::new();
-
-        // Call into each node to look for big tenants
-        let top_n_request = TopTenantShardsRequest {
-            // We currently split based on logical size, for simplicity: logical size is a signal of
-            // the user's intent to run a large database, whereas physical/resident size can be symptoms
-            // of compaction issues. Eventually we should switch to using resident size to bound the
-            // disk space impact of one shard.
-            order_by: models::TenantSorting::MaxLogicalSize,
-            limit: 10,
-            where_shards_lt: Some(SPLIT_TO_MAX),
-            where_gt: Some(split_threshold),
-        };
-        for node in nodes.values() {
-            let request_ref = &top_n_request;
-            match node
-                .with_client_retries(
-                    |client| async move {
-                        let request = request_ref.clone();
-                        client.top_tenant_shards(request.clone()).await
-                    },
-                    &self.config.pageserver_jwt_token,
-                    3,
-                    3,
-                    Duration::from_secs(5),
-                    &self.cancel,
-                )
-                .await
-            {
-                Some(Ok(node_top_n)) => {
-                    top_n.extend(node_top_n.shards.into_iter());
-                }
-                Some(Err(mgmt_api::Error::Cancelled)) => {
-                    continue;
-                }
-                Some(Err(e)) => {
-                    tracing::warn!("Failed to fetch top N tenants from {node}: {e}");
-                    continue;
-                }
-                None => {
-                    // Node is shutting down
-                    continue;
-                }
-            };
-        }
-
-        // Pick the biggest tenant to split first
-        top_n.sort_by_key(|i| i.resident_size);
+        // Fetch the largest eligible shards by logical size.
+        const MAX_SHARDS: ShardCount = ShardCount::new(8);
 
-        // Filter out tenants in a prohibiting scheduling mode
+        let mut top_n = self
+            .get_top_tenant_shards(&TopTenantShardsRequest {
+                order_by: TenantSorting::MaxLogicalSize,
+                limit: 10,
+                where_shards_lt: Some(MAX_SHARDS),
+                where_gt: Some(split_threshold),
+            })
+            .await;
+
+        // Filter out tenants in a prohibiting scheduling mode.
         {
-            let locked = self.inner.read().unwrap();
+            let state = self.inner.read().unwrap();
             top_n.retain(|i| {
-                if let Some(shard) = locked.tenants.get(&i.id) {
-                    matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active)
-                } else {
-                    false
-                }
+                let policy = state.tenants.get(&i.id).map(|s| s.get_scheduling_policy());
+                policy == Some(ShardSchedulingPolicy::Active)
             });
         }
 
         let Some(split_candidate) = top_n.into_iter().next() else {
-            tracing::debug!("No split-elegible shards found");
+            debug!("No split-elegible shards found");
             return;
         };
 
-        // We spawn a task to run this, so it's exactly like some external API client requesting it. We don't
-        // want to block the background reconcile loop on this.
-        tracing::info!(
+        // We spawn a task to run this, so it's exactly like some external API client requesting it.
+        // We don't want to block the background reconcile loop on this.
+        info!(
            "Auto-splitting tenant for size threshold {split_threshold}: current size {split_candidate:?}"
        );
```
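The hunk above ends just before the elided context that actually spawns the split task. A minimal sketch of the spawn-and-instrument pattern the comment refers to — `do_split` and its argument are hypothetical stand-ins; only `tokio` and `tracing` are assumed:

```rust
use tracing::{Instrument, info, info_span};

// Hypothetical stand-in for the actual split call in the elided context.
async fn do_split(tenant_id: u64) {
    info!("splitting tenant {tenant_id}");
}

#[tokio::main]
async fn main() {
    let tenant_id = 42;
    // Spawn so the caller (here, the background reconcile loop) isn't blocked
    // on the split, and attach a span so the task's logs carry the tenant id.
    let handle = tokio::spawn(
        do_split(tenant_id).instrument(info_span!("auto_split", %tenant_id)),
    );
    handle.await.unwrap();
}
```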
```diff
@@ -7322,29 +7297,70 @@ impl Service {
                     .tenant_shard_split(
                         split_candidate.id.tenant_id,
                         TenantShardSplitRequest {
-                            // Always split to the max number of shards: this avoids stepping through
-                            // intervening shard counts and encountering the overrhead of a split+cleanup
-                            // each time as a tenant grows, and is not too expensive because our max shard
-                            // count is relatively low anyway.
-                            // This policy will be adjusted in future once we support higher shard count.
-                            new_shard_count: SPLIT_TO_MAX.literal(),
+                            // Always split to the max number of shards: this avoids stepping
+                            // through intervening shard counts and encountering the overhead of a
+                            // split+cleanup each time as a tenant grows, and is not too expensive
+                            // because our max shard count is relatively low anyway. This policy
+                            // will be adjusted in future once we support higher shard count.
+                            new_shard_count: MAX_SHARDS.literal(),
                             new_stripe_size: Some(ShardParameters::DEFAULT_STRIPE_SIZE),
                         },
                     )
                     .await
                 {
-                    Ok(_) => {
-                        tracing::info!("Successful auto-split");
-                    }
-                    Err(e) => {
-                        tracing::error!("Auto-split failed: {e}");
-                    }
+                    Ok(_) => info!("Successful auto-split"),
+                    Err(err) => error!("Auto-split failed: {err}"),
                 }
             }
-            .instrument(tracing::info_span!("auto_split", tenant_id=%split_candidate.id.tenant_id)),
+            .instrument(info_span!("auto_split", tenant_id=%split_candidate.id.tenant_id)),
         );
     }
 
+    /// Fetches the top tenant shards from every node, in descending order of
+    /// max logical size. Any node errors will be logged and ignored.
+    async fn get_top_tenant_shards(
+        &self,
+        request: &TopTenantShardsRequest,
+    ) -> Vec<TopTenantShardItem> {
+        let nodes = self
+            .inner
+            .read()
+            .unwrap()
+            .nodes
+            .values()
+            .cloned()
+            .collect_vec();
+
+        let mut futures = FuturesUnordered::new();
+        for node in nodes {
+            futures.push(async move {
+                node.with_client_retries(
+                    |client| async move { client.top_tenant_shards(request.clone()).await },
+                    &self.config.pageserver_jwt_token,
+                    3,
+                    3,
+                    Duration::from_secs(5),
+                    &self.cancel,
+                )
+                .await
+            });
+        }
+
+        let mut top = Vec::new();
+        while let Some(output) = futures.next().await {
+            match output {
+                Some(Ok(response)) => top.extend(response.shards),
+                Some(Err(mgmt_api::Error::Cancelled)) => {}
+                Some(Err(err)) => warn!("failed to fetch top tenants: {err}"),
+                None => {} // node is shutting down
+            }
+        }
+
+        top.sort_by_key(|i| i.max_logical_size);
+        top.reverse();
+        top
+    }
+
     /// Useful for tests: run whatever work a background [`Self::reconcile_all`] would have done, but
     /// also wait for any generated Reconcilers to complete. Calling this until it returns zero should
     /// put the system into a quiescent state where future background reconciliations won't do anything.
```
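The new `get_top_tenant_shards` fans the request out to all nodes concurrently via `FuturesUnordered`, replacing the old sequential per-node loop. A self-contained sketch of that pattern — the per-node query is simulated, and only the `futures` and `tokio` crates are assumed:

```rust
use futures::stream::{FuturesUnordered, StreamExt};

// Simulated per-node query: each "node" returns some candidate sizes.
async fn query_node(node_id: u32) -> Vec<u64> {
    vec![node_id as u64 * 100, node_id as u64 * 100 + 50]
}

#[tokio::main]
async fn main() {
    let mut futures = FuturesUnordered::new();
    for node_id in 1..=3u32 {
        // All queries are in flight at once; results arrive as they complete.
        futures.push(query_node(node_id));
    }

    let mut top = Vec::new();
    while let Some(shards) = futures.next().await {
        top.extend(shards);
    }

    // Descending order, mirroring the sort_by_key + reverse in the patch.
    top.sort_by_key(|&size| std::cmp::Reverse(size));
    assert_eq!(top.first(), Some(&350));
    println!("{top:?}");
}
```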
```diff
@@ -1710,8 +1710,8 @@ impl TenantShard {
         self.scheduling_policy = p;
     }
 
-    pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
-        &self.scheduling_policy
+    pub(crate) fn get_scheduling_policy(&self) -> ShardSchedulingPolicy {
+        self.scheduling_policy
     }
 
     pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
```
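Returning the policy by value works because `ShardSchedulingPolicy` is evidently a small `Copy` type (otherwise `self.scheduling_policy` wouldn't compile here); callers no longer borrow the shard or dereference, which is what lets the `policy == Some(ShardSchedulingPolicy::Active)` comparison in `autosplit_tenants` read cleanly. A sketch with a stand-in enum (an assumption, not the real type):

```rust
// Stand-in for ShardSchedulingPolicy: a small fieldless enum, cheap to copy.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum SchedulingPolicy {
    Active,
    Paused,
}

struct Shard {
    scheduling_policy: SchedulingPolicy,
}

impl Shard {
    // Returning a copy instead of `&SchedulingPolicy` frees callers from
    // borrowing `self` and from `*`-dereferencing at use sites.
    fn get_scheduling_policy(&self) -> SchedulingPolicy {
        self.scheduling_policy
    }
}

fn main() {
    let shard = Shard { scheduling_policy: SchedulingPolicy::Active };
    let policy = Some(shard.get_scheduling_policy());
    assert_eq!(policy, Some(SchedulingPolicy::Active));
}
```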