mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 05:52:55 +00:00
storcon: add repeated auto-splits and initial splits (#11122)
## Problem Currently, we only split tenants into 8 shards once, at the 64 GB split threshold. For very large tenants, we need to keep splitting to avoid huge shards. And we also want to eagerly split at a lower threshold to improve throughput during initial ingestion. See https://github.com/neondatabase/cloud/issues/22532#issuecomment-2706215907 for details. Touches https://github.com/neondatabase/cloud/issues/22532. Requires #11157. ## Summary of changes This adds parameters and logic to enable repeated splits when a tenant's largest timeline divided by shard count exceeds `split_threshold`, as well as eager initial splits at a lower threshold to speed up initial ingestion. The default parameters are all set such that they retain the current behavior in production (only split into 8 shards once, at 64 GB). * `split_threshold` now specifies a maximum shard size. When a shard exceeds it, all tenant shards are split by powers of 2 such that all tenant shards fall below `split_threshold`. Disabled by default, like today. * Add `max_split_shards` to specify a max shard count for autosplits. Defaults to 8 to retain current behavior. * Add `initial_split_threshold` and `initial_split_shards` to specify a threshold and target count for eager splits of unsharded tenants. Defaults to 64 GB and 8 shards to retain current production behavior. Because this PR sets `initial_split_threshold` to 64 GB by default, it has the effect of enabling autosplits by default. This was not the case previously, since `split_threshold` defaults to None, but it is already enabled across production and staging. This is temporary until we complete the production rollout. For more details, see code comments. This must wait until #11157 has been deployed to Pageservers. Once this has been deployed to production, we plan to change the parameters to: * `split-threshold`: 256 GB * `initial-split-threshold`: 16 GB * `initial-split-shards`: 4 * `max-split-shards`: 16 The final split points will thus be: * Start: 1 shard * 16 GB: 4 shards * 1 TB: 8 shards * 2 TB: 16 shards We will then change the default settings to be disabled by default. --------- Co-authored-by: John Spray <john@neon.tech>
This commit is contained in:
@@ -113,6 +113,21 @@ struct Cli {
|
||||
#[arg(long)]
|
||||
split_threshold: Option<u64>,
|
||||
|
||||
/// Maximum number of shards during autosplits. 0 disables autosplits.
|
||||
// TODO: defaults to 8 for backwards compatibility, should default to 255.
|
||||
#[arg(long, default_value = "8")]
|
||||
max_split_shards: u8,
|
||||
|
||||
/// Size threshold for initial shard splits of unsharded tenants. 0 disables initial splits.
|
||||
// TODO: defaults to 64 GB for backwards compatibility. Should default to None.
|
||||
#[arg(long, default_value = "68719476736")]
|
||||
initial_split_threshold: u64,
|
||||
|
||||
/// Number of target shards for initial splits. 0 or 1 disables initial splits.
|
||||
// TODO: defaults to 8 for backwards compatibility. Should default to 2.
|
||||
#[arg(long, default_value = "8")]
|
||||
initial_split_shards: u8,
|
||||
|
||||
/// Maximum number of normal-priority reconcilers that may run in parallel
|
||||
#[arg(long)]
|
||||
reconciler_concurrency: Option<usize>,
|
||||
@@ -388,6 +403,9 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
.unwrap_or(PRIORITY_RECONCILER_CONCURRENCY_DEFAULT),
|
||||
tenant_rate_limit: args.tenant_rate_limit,
|
||||
split_threshold: args.split_threshold,
|
||||
max_split_shards: args.max_split_shards,
|
||||
initial_split_threshold: Some(args.initial_split_threshold),
|
||||
initial_split_shards: args.initial_split_shards,
|
||||
neon_local_repo_dir: args.neon_local_repo_dir,
|
||||
max_secondary_lag_bytes: args.max_secondary_lag_bytes,
|
||||
heartbeat_interval: args
|
||||
|
||||
@@ -389,14 +389,41 @@ pub struct Config {
|
||||
/// tenant-scoped API endpoints. Further API requests queue until ready.
|
||||
pub tenant_rate_limit: NonZeroU32,
|
||||
|
||||
/// The size at which an unsharded tenant should be split (into 8 shards). This uses the logical
|
||||
/// size of the largest timeline in the shard (i.e. max_logical_size).
|
||||
/// If a tenant shard's largest timeline (max_logical_size) exceeds this value, all tenant
|
||||
/// shards will be split in 2 until they fall below split_threshold (up to max_split_shards).
|
||||
///
|
||||
/// This will greedily split into as many shards as necessary to fall below split_threshold, as
|
||||
/// powers of 2: if a tenant shard is 7 times larger than split_threshold, it will split into 8
|
||||
/// immediately, rather than first 2 then 4 then 8.
|
||||
///
|
||||
/// None or 0 disables auto-splitting.
|
||||
///
|
||||
/// TODO: consider using total logical size of all timelines instead.
|
||||
pub split_threshold: Option<u64>,
|
||||
|
||||
/// The maximum number of shards a tenant can be split into during autosplits. Does not affect
|
||||
/// manual split requests. 0 or 1 disables autosplits, as we already have 1 shard.
|
||||
pub max_split_shards: u8,
|
||||
|
||||
/// The size at which an unsharded tenant should initially split. Ingestion is significantly
|
||||
/// faster with multiple shards, so eagerly splitting below split_threshold will typically speed
|
||||
/// up initial ingestion of large tenants.
|
||||
///
|
||||
/// This should be below split_threshold, but it is not required. If both split_threshold and
|
||||
/// initial_split_threshold qualify, the largest number of target shards will be used.
|
||||
///
|
||||
/// Does not apply to already sharded tenants: changing initial_split_threshold or
|
||||
/// initial_split_shards is not retroactive for already-sharded tenants.
|
||||
///
|
||||
/// None or 0 disables initial splits.
|
||||
pub initial_split_threshold: Option<u64>,
|
||||
|
||||
/// The number of shards to split into when reaching initial_split_threshold. Will
|
||||
/// be clamped to max_split_shards.
|
||||
///
|
||||
/// 0 or 1 disables initial splits. Has no effect if initial_split_threshold is disabled.
|
||||
pub initial_split_shards: u8,
|
||||
|
||||
// TODO: make this cfg(feature = "testing")
|
||||
pub neon_local_repo_dir: Option<PathBuf>,
|
||||
|
||||
@@ -572,6 +599,22 @@ enum TenantShardSplitAbortError {
|
||||
Unavailable,
|
||||
}
|
||||
|
||||
/// Inputs for computing a target shard count for a tenant.
|
||||
struct ShardSplitInputs {
|
||||
/// Current shard count.
|
||||
shard_count: ShardCount,
|
||||
/// Total size of largest timeline summed across all shards.
|
||||
max_logical_size: u64,
|
||||
/// Size-based split threshold. Zero if size-based splits are disabled.
|
||||
split_threshold: u64,
|
||||
/// Upper bound on target shards. 0 or 1 disables splits.
|
||||
max_split_shards: u8,
|
||||
/// Initial split threshold. Zero if initial splits are disabled.
|
||||
initial_split_threshold: u64,
|
||||
/// Number of shards for initial splits. 0 or 1 disables initial splits.
|
||||
initial_split_shards: u8,
|
||||
}
|
||||
|
||||
struct ShardUpdate {
|
||||
tenant_shard_id: TenantShardId,
|
||||
placement_policy: PlacementPolicy,
|
||||
@@ -7565,99 +7608,232 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
/// Asynchronously split a tenant that's eligible for automatic splits:
|
||||
/// Asynchronously split a tenant that's eligible for automatic splits. At most one tenant will
|
||||
/// be split per call.
|
||||
///
|
||||
/// * The tenant is unsharded.
|
||||
/// * The logical size of its largest timeline exceeds split_threshold.
|
||||
/// * The tenant's scheduling policy is active.
|
||||
/// Two sets of criteria are used: initial splits and size-based splits (in that order).
|
||||
/// Initial splits are used to eagerly split unsharded tenants that may be performing initial
|
||||
/// ingestion, since sharded tenants have significantly better ingestion throughput. Size-based
|
||||
/// splits are used to bound the maximum shard size and balance out load.
|
||||
///
|
||||
/// At most one tenant will be split per call: the one with the largest max logical size. It
|
||||
/// will split 1 → 8 shards.
|
||||
/// Splits are based on max_logical_size, i.e. the logical size of the largest timeline in a
|
||||
/// tenant. We use this instead of the total logical size because branches will duplicate
|
||||
/// logical size without actually using more storage. We could also use visible physical size,
|
||||
/// but this might overestimate tenants that frequently churn branches.
|
||||
///
|
||||
/// Initial splits (initial_split_threshold):
|
||||
/// * Applies to tenants with 1 shard.
|
||||
/// * The largest timeline (max_logical_size) exceeds initial_split_threshold.
|
||||
/// * Splits into initial_split_shards.
|
||||
///
|
||||
/// Size-based splits (split_threshold):
|
||||
/// * Applies to all tenants.
|
||||
/// * The largest timeline (max_logical_size) divided by shard count exceeds split_threshold.
|
||||
/// * Splits such that max_logical_size / shard_count <= split_threshold, in powers of 2.
|
||||
///
|
||||
/// Tenant shards are ordered by descending max_logical_size, first initial split candidates
|
||||
/// then size-based split candidates. The first matching candidate is split.
|
||||
///
|
||||
/// The shard count is clamped to max_split_shards. If a candidate is eligible for both initial
|
||||
/// and size-based splits, the largest shard count will be used.
|
||||
///
|
||||
/// An unsharded tenant will get DEFAULT_STRIPE_SIZE, regardless of what its ShardIdentity says.
|
||||
/// A sharded tenant will retain its stripe size, as splits do not allow changing it.
|
||||
///
|
||||
/// TODO: consider splitting based on total logical size rather than max logical size.
|
||||
///
|
||||
/// TODO: consider spawning multiple splits in parallel: this is only called once every 20
|
||||
/// seconds, so a large backlog can take a long time, and if a tenant fails to split it will
|
||||
/// block all other splits.
|
||||
async fn autosplit_tenants(self: &Arc<Self>) {
|
||||
let Some(split_threshold) = self.config.split_threshold else {
|
||||
return; // auto-splits are disabled
|
||||
};
|
||||
if split_threshold == 0 {
|
||||
// If max_split_shards is set to 0 or 1, we can't split.
|
||||
let max_split_shards = self.config.max_split_shards;
|
||||
if max_split_shards <= 1 {
|
||||
return;
|
||||
}
|
||||
|
||||
// Fetch the largest eligible shards by logical size.
|
||||
const MAX_SHARDS: ShardCount = ShardCount::new(8);
|
||||
// If initial_split_shards is set to 0 or 1, disable initial splits.
|
||||
let mut initial_split_threshold = self.config.initial_split_threshold.unwrap_or(0);
|
||||
let initial_split_shards = self.config.initial_split_shards;
|
||||
if initial_split_shards <= 1 {
|
||||
initial_split_threshold = 0;
|
||||
}
|
||||
|
||||
let mut top_n = self
|
||||
.get_top_tenant_shards(&TopTenantShardsRequest {
|
||||
order_by: TenantSorting::MaxLogicalSize,
|
||||
limit: 10,
|
||||
where_shards_lt: Some(MAX_SHARDS),
|
||||
where_gt: Some(split_threshold),
|
||||
})
|
||||
.await;
|
||||
// If no split_threshold nor initial_split_threshold, disable autosplits.
|
||||
let split_threshold = self.config.split_threshold.unwrap_or(0);
|
||||
if split_threshold == 0 && initial_split_threshold == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
// Fetch split candidates in prioritized order.
|
||||
//
|
||||
// If initial splits are enabled, fetch eligible tenants first. We prioritize initial splits
|
||||
// over size-based splits, since these are often performing initial ingestion and rely on
|
||||
// splits to improve ingest throughput.
|
||||
let mut candidates = Vec::new();
|
||||
|
||||
if initial_split_threshold > 0 {
|
||||
// Initial splits: fetch tenants with 1 shard where the logical size of the largest
|
||||
// timeline exceeds the initial split threshold.
|
||||
let initial_candidates = self
|
||||
.get_top_tenant_shards(&TopTenantShardsRequest {
|
||||
order_by: TenantSorting::MaxLogicalSize,
|
||||
limit: 10,
|
||||
where_shards_lt: Some(ShardCount(2)),
|
||||
where_gt: Some(initial_split_threshold),
|
||||
})
|
||||
.await;
|
||||
candidates.extend(initial_candidates);
|
||||
}
|
||||
|
||||
if split_threshold > 0 {
|
||||
// Size-based splits: fetch tenants where the logical size of the largest timeline
|
||||
// divided by shard count exceeds the split threshold.
|
||||
//
|
||||
// max_logical_size is only tracked on shard 0, and contains the total logical size
|
||||
// across all shards. We have to order and filter by MaxLogicalSizePerShard, i.e.
|
||||
// max_logical_size / shard_count, such that we only receive tenants that are actually
|
||||
// eligible for splits. But we still use max_logical_size for later split calculations.
|
||||
let size_candidates = self
|
||||
.get_top_tenant_shards(&TopTenantShardsRequest {
|
||||
order_by: TenantSorting::MaxLogicalSizePerShard,
|
||||
limit: 10,
|
||||
where_shards_lt: Some(ShardCount(max_split_shards)),
|
||||
where_gt: Some(split_threshold),
|
||||
})
|
||||
.await;
|
||||
#[cfg(feature = "testing")]
|
||||
assert!(
|
||||
size_candidates.iter().all(|c| c.id.is_shard_zero()),
|
||||
"MaxLogicalSizePerShard returned non-zero shard: {size_candidates:?}",
|
||||
);
|
||||
candidates.extend(size_candidates);
|
||||
}
|
||||
|
||||
// Filter out tenants in a prohibiting scheduling mode.
|
||||
{
|
||||
let state = self.inner.read().unwrap();
|
||||
top_n.retain(|i| {
|
||||
candidates.retain(|i| {
|
||||
let policy = state.tenants.get(&i.id).map(|s| s.get_scheduling_policy());
|
||||
policy == Some(ShardSchedulingPolicy::Active)
|
||||
});
|
||||
}
|
||||
|
||||
let Some(split_candidate) = top_n.into_iter().next() else {
|
||||
debug!("No split-elegible shards found");
|
||||
// Pick the first candidate to split. This will generally always be the first one in
|
||||
// candidates, but we defensively skip candidates that end up not actually splitting.
|
||||
let Some((candidate, new_shard_count)) = candidates
|
||||
.into_iter()
|
||||
.filter_map(|candidate| {
|
||||
let new_shard_count = Self::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: candidate.id.shard_count,
|
||||
max_logical_size: candidate.max_logical_size,
|
||||
split_threshold,
|
||||
max_split_shards,
|
||||
initial_split_threshold,
|
||||
initial_split_shards,
|
||||
});
|
||||
new_shard_count.map(|shards| (candidate, shards.count()))
|
||||
})
|
||||
.next()
|
||||
else {
|
||||
debug!("no split-eligible tenants found");
|
||||
return;
|
||||
};
|
||||
|
||||
// We spawn a task to run this, so it's exactly like some external API client requesting it.
|
||||
// We don't want to block the background reconcile loop on this.
|
||||
info!(
|
||||
"Auto-splitting tenant for size threshold {split_threshold}: current size {split_candidate:?}"
|
||||
);
|
||||
|
||||
// Retain the stripe size of sharded tenants, as splits don't allow changing it. Otherwise,
|
||||
// use DEFAULT_STRIPE_SIZE for unsharded tenants -- their stripe size doesn't really matter,
|
||||
// and if we change the default stripe size we want to use the new default rather than an
|
||||
// old, persisted stripe size.
|
||||
let new_stripe_size = match split_candidate.id.shard_count.count() {
|
||||
let new_stripe_size = match candidate.id.shard_count.count() {
|
||||
0 => panic!("invalid shard count 0"),
|
||||
1 => Some(ShardParameters::DEFAULT_STRIPE_SIZE),
|
||||
2.. => None,
|
||||
};
|
||||
|
||||
// We spawn a task to run this, so it's exactly like some external API client requesting
|
||||
// it. We don't want to block the background reconcile loop on this.
|
||||
let old_shard_count = candidate.id.shard_count.count();
|
||||
info!(
|
||||
"auto-splitting tenant {old_shard_count} → {new_shard_count} shards, \
|
||||
current size {candidate:?} (split_threshold={split_threshold} \
|
||||
initial_split_threshold={initial_split_threshold})"
|
||||
);
|
||||
|
||||
let this = self.clone();
|
||||
tokio::spawn(
|
||||
async move {
|
||||
match this
|
||||
.tenant_shard_split(
|
||||
split_candidate.id.tenant_id,
|
||||
candidate.id.tenant_id,
|
||||
TenantShardSplitRequest {
|
||||
// Always split to the max number of shards: this avoids stepping
|
||||
// through intervening shard counts and encountering the overhead of a
|
||||
// split+cleanup each time as a tenant grows, and is not too expensive
|
||||
// because our max shard count is relatively low anyway. This policy
|
||||
// will be adjusted in future once we support higher shard count.
|
||||
new_shard_count: MAX_SHARDS.literal(),
|
||||
new_shard_count,
|
||||
new_stripe_size,
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => info!("Successful auto-split"),
|
||||
Err(err) => error!("Auto-split failed: {err}"),
|
||||
Ok(_) => {
|
||||
info!("successful auto-split {old_shard_count} → {new_shard_count} shards")
|
||||
}
|
||||
Err(err) => error!("auto-split failed: {err}"),
|
||||
}
|
||||
}
|
||||
.instrument(info_span!("auto_split", tenant_id=%split_candidate.id.tenant_id)),
|
||||
.instrument(info_span!("auto_split", tenant_id=%candidate.id.tenant_id)),
|
||||
);
|
||||
}
|
||||
|
||||
/// Returns the number of shards to split a tenant into, or None if the tenant shouldn't split,
|
||||
/// based on the total logical size of the largest timeline summed across all shards. Uses the
|
||||
/// larger of size-based and initial splits, clamped to max_split_shards.
|
||||
///
|
||||
/// NB: the thresholds are exclusive, since TopTenantShardsRequest uses where_gt.
|
||||
fn compute_split_shards(inputs: ShardSplitInputs) -> Option<ShardCount> {
|
||||
let ShardSplitInputs {
|
||||
shard_count,
|
||||
max_logical_size,
|
||||
split_threshold,
|
||||
max_split_shards,
|
||||
initial_split_threshold,
|
||||
initial_split_shards,
|
||||
} = inputs;
|
||||
|
||||
let mut new_shard_count: u8 = shard_count.count();
|
||||
|
||||
// Size-based splits. Ensures max_logical_size / new_shard_count <= split_threshold, using
|
||||
// power-of-two shard counts.
|
||||
//
|
||||
// If the current shard count is not a power of two, and does not exceed split_threshold,
|
||||
// then we leave it alone rather than forcing a power-of-two split.
|
||||
if split_threshold > 0
|
||||
&& max_logical_size.div_ceil(split_threshold) > shard_count.count() as u64
|
||||
{
|
||||
new_shard_count = max_logical_size
|
||||
.div_ceil(split_threshold)
|
||||
.checked_next_power_of_two()
|
||||
.unwrap_or(u8::MAX as u64)
|
||||
.try_into()
|
||||
.unwrap_or(u8::MAX);
|
||||
}
|
||||
|
||||
// Initial splits. Use the larger of size-based and initial split shard counts. This only
|
||||
// applies to unsharded tenants, i.e. changes to initial_split_threshold or
|
||||
// initial_split_shards are not retroactive for sharded tenants.
|
||||
if initial_split_threshold > 0
|
||||
&& shard_count.count() <= 1
|
||||
&& max_logical_size > initial_split_threshold
|
||||
{
|
||||
new_shard_count = new_shard_count.max(initial_split_shards);
|
||||
}
|
||||
|
||||
// Clamp to max shards.
|
||||
new_shard_count = new_shard_count.min(max_split_shards);
|
||||
|
||||
// Don't split if we're not increasing the shard count.
|
||||
if new_shard_count <= shard_count.count() {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(ShardCount(new_shard_count))
|
||||
}
|
||||
|
||||
/// Fetches the top tenant shards from every node, in descending order of
|
||||
/// max logical size. Any node errors will be logged and ignored.
|
||||
async fn get_top_tenant_shards(
|
||||
@@ -8420,3 +8596,329 @@ impl Service {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
/// Tests Service::compute_split_shards. For readability, this specifies sizes in GBs rather
|
||||
/// than bytes. Note that max_logical_size is the total logical size of the largest timeline
|
||||
/// summed across all shards.
|
||||
#[test]
|
||||
fn compute_split_shards() {
|
||||
// Size-based split: two shards have a 500 GB timeline, which need to split into 8 shards
|
||||
// that are <= 64 GB,
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(2),
|
||||
max_logical_size: 500,
|
||||
split_threshold: 64,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 0,
|
||||
initial_split_shards: 0,
|
||||
}),
|
||||
Some(ShardCount(8))
|
||||
);
|
||||
|
||||
// Size-based split: noop at or below threshold, fires above.
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(2),
|
||||
max_logical_size: 127,
|
||||
split_threshold: 64,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 0,
|
||||
initial_split_shards: 0,
|
||||
}),
|
||||
None,
|
||||
);
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(2),
|
||||
max_logical_size: 128,
|
||||
split_threshold: 64,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 0,
|
||||
initial_split_shards: 0,
|
||||
}),
|
||||
None,
|
||||
);
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(2),
|
||||
max_logical_size: 129,
|
||||
split_threshold: 64,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 0,
|
||||
initial_split_shards: 0,
|
||||
}),
|
||||
Some(ShardCount(4)),
|
||||
);
|
||||
|
||||
// Size-based split: clamped to max_split_shards.
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(2),
|
||||
max_logical_size: 10000,
|
||||
split_threshold: 64,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 0,
|
||||
initial_split_shards: 0,
|
||||
}),
|
||||
Some(ShardCount(16))
|
||||
);
|
||||
|
||||
// Size-based split: tenant already at or beyond max_split_shards is not split.
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(16),
|
||||
max_logical_size: 10000,
|
||||
split_threshold: 64,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 0,
|
||||
initial_split_shards: 0,
|
||||
}),
|
||||
None
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(32),
|
||||
max_logical_size: 10000,
|
||||
split_threshold: 64,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 0,
|
||||
initial_split_shards: 0,
|
||||
}),
|
||||
None
|
||||
);
|
||||
|
||||
// Size-based split: a non-power-of-2 shard count is normalized to power-of-2 if it
|
||||
// exceeds split_threshold (i.e. a 3-shard tenant splits into 8, not 6).
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(3),
|
||||
max_logical_size: 320,
|
||||
split_threshold: 64,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 0,
|
||||
initial_split_shards: 0,
|
||||
}),
|
||||
Some(ShardCount(8))
|
||||
);
|
||||
|
||||
// Size-based split: a non-power-of-2 shard count is not normalized to power-of-2 if the
|
||||
// existing shards are below or at split_threshold, but splits into 4 if it exceeds it.
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(3),
|
||||
max_logical_size: 191,
|
||||
split_threshold: 64,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 0,
|
||||
initial_split_shards: 0,
|
||||
}),
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(3),
|
||||
max_logical_size: 192,
|
||||
split_threshold: 64,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 0,
|
||||
initial_split_shards: 0,
|
||||
}),
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(3),
|
||||
max_logical_size: 193,
|
||||
split_threshold: 64,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 0,
|
||||
initial_split_shards: 0,
|
||||
}),
|
||||
Some(ShardCount(4))
|
||||
);
|
||||
|
||||
// Initial split: tenant has a 10 GB timeline, split into 4 shards.
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(1),
|
||||
max_logical_size: 10,
|
||||
split_threshold: 0,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 8,
|
||||
initial_split_shards: 4,
|
||||
}),
|
||||
Some(ShardCount(4))
|
||||
);
|
||||
|
||||
// Initial split: 0 ShardCount is equivalent to 1.
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(0),
|
||||
max_logical_size: 10,
|
||||
split_threshold: 0,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 8,
|
||||
initial_split_shards: 4,
|
||||
}),
|
||||
Some(ShardCount(4))
|
||||
);
|
||||
|
||||
// Initial split: at or below threshold is noop.
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(1),
|
||||
max_logical_size: 7,
|
||||
split_threshold: 0,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 8,
|
||||
initial_split_shards: 4,
|
||||
}),
|
||||
None,
|
||||
);
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(1),
|
||||
max_logical_size: 8,
|
||||
split_threshold: 0,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 8,
|
||||
initial_split_shards: 4,
|
||||
}),
|
||||
None,
|
||||
);
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(1),
|
||||
max_logical_size: 9,
|
||||
split_threshold: 0,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 8,
|
||||
initial_split_shards: 4,
|
||||
}),
|
||||
Some(ShardCount(4))
|
||||
);
|
||||
|
||||
// Initial split: already sharded tenant is not affected, even if above threshold and below
|
||||
// shard count.
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(2),
|
||||
max_logical_size: 20,
|
||||
split_threshold: 0,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 8,
|
||||
initial_split_shards: 4,
|
||||
}),
|
||||
None,
|
||||
);
|
||||
|
||||
// Initial split: clamped to max_shards.
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(1),
|
||||
max_logical_size: 10,
|
||||
split_threshold: 0,
|
||||
max_split_shards: 3,
|
||||
initial_split_threshold: 8,
|
||||
initial_split_shards: 4,
|
||||
}),
|
||||
Some(ShardCount(3)),
|
||||
);
|
||||
|
||||
// Initial+size split: tenant eligible for both will use the larger shard count.
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(1),
|
||||
max_logical_size: 10,
|
||||
split_threshold: 64,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 8,
|
||||
initial_split_shards: 4,
|
||||
}),
|
||||
Some(ShardCount(4)),
|
||||
);
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(1),
|
||||
max_logical_size: 500,
|
||||
split_threshold: 64,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 8,
|
||||
initial_split_shards: 4,
|
||||
}),
|
||||
Some(ShardCount(8)),
|
||||
);
|
||||
|
||||
// Initial+size split: sharded tenant is only eligible for size-based split.
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(2),
|
||||
max_logical_size: 200,
|
||||
split_threshold: 64,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 8,
|
||||
initial_split_shards: 8,
|
||||
}),
|
||||
Some(ShardCount(4)),
|
||||
);
|
||||
|
||||
// Initial+size split: uses the larger shard count even with initial_split_threshold above
|
||||
// split_threshold.
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(1),
|
||||
max_logical_size: 10,
|
||||
split_threshold: 4,
|
||||
max_split_shards: 16,
|
||||
initial_split_threshold: 8,
|
||||
initial_split_shards: 8,
|
||||
}),
|
||||
Some(ShardCount(8)),
|
||||
);
|
||||
|
||||
// Test backwards compatibility with production settings when initial/size-based splits were
|
||||
// rolled out: a single split into 8 shards at 64 GB. Any already sharded tenants with <8
|
||||
// shards will split according to split_threshold.
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(1),
|
||||
max_logical_size: 65,
|
||||
split_threshold: 64,
|
||||
max_split_shards: 8,
|
||||
initial_split_threshold: 64,
|
||||
initial_split_shards: 8,
|
||||
}),
|
||||
Some(ShardCount(8)),
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(1),
|
||||
max_logical_size: 64,
|
||||
split_threshold: 64,
|
||||
max_split_shards: 8,
|
||||
initial_split_threshold: 64,
|
||||
initial_split_shards: 8,
|
||||
}),
|
||||
None,
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
Service::compute_split_shards(ShardSplitInputs {
|
||||
shard_count: ShardCount(2),
|
||||
max_logical_size: 129,
|
||||
split_threshold: 64,
|
||||
max_split_shards: 8,
|
||||
initial_split_threshold: 64,
|
||||
initial_split_shards: 8,
|
||||
}),
|
||||
Some(ShardCount(4)),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user