mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 21:50:37 +00:00
storage controller: scheduling optimization for sharded tenants (#7181)
## Problem - When we scheduled locations, we were doing it without any context about other shards in the same tenant - After a shard split, there wasn't an automatic mechanism to migrate the attachments away from the split location - After a shard split and the migration away from the split location, there wasn't an automatic mechanism to pick new secondary locations so that the end state has no concentration of locations on the nodes where the split happened. Partially completes: https://github.com/neondatabase/neon/issues/7139 ## Summary of changes - Scheduler now takes a `ScheduleContext` object that can be populated with information about other shards - During tenant creation and shard split, we incrementally build up the ScheduleContext, updating it for each shard as we proceed. - When scheduling new locations, the ScheduleContext is used to apply a soft anti-affinity to nodes where a tenant already has shards. - The background reconciler task now has an extra phase `optimize_all`, which runs only if the primary `reconcile_all` phase didn't generate any work. The separation is that `reconcile_all` is needed for availability, but optimize_all is purely "nice to have" work to balance work across the nodes better. - optimize_all calls into two new TenantState methods called optimize_attachment and optimize_secondary, which seek out opportunities to improve placment: - optimize_attachment: if the node where we're currently attached has an excess of attached shard locations for this tenant compared with the node where we have a secondary location, then cut over to the secondary location. - optimize_secondary: if the node holding our secondary location has an excessive number of locations for this tenant compared with some other node where we don't currently have a location, then create a new secondary location on that other node. - a new debug API endpoint is provided to run background tasks on-demand. This returns a number of reconciliations in progress, so callers can keep calling until they get a `0` to advance the system to its final state without waiting for many iterations of the background task. Optimization is run at an implicitly low priority by: - Omitting the phase entirely if reconcile_all has work to do - Skipping optimization of any tenant that has reconciles in flight - Limiting the total number of optimizations that will be run from one call to optimize_all to a constant (currently 2). The idea of that low priority execution is to minimize the operational risk that optimization work overloads any part of the system. It happens to also make the system easier to observe and debug, as we avoid running large numbers of concurrent changes. Eventually we may relax these limitations: there is no correctness problem with optimizing lots of tenants concurrently, and optimizing multiple shards in one tenant just requires housekeeping changes to update ShardContext with the result of one optimization before proceeding to the next shard.
This commit is contained in:
@@ -146,7 +146,7 @@ def test_sharding_split_smoke(
|
||||
# 8 shards onto separate pageservers
|
||||
shard_count = 4
|
||||
split_shard_count = 8
|
||||
neon_env_builder.num_pageservers = split_shard_count
|
||||
neon_env_builder.num_pageservers = split_shard_count * 2
|
||||
|
||||
# 1MiB stripes: enable getting some meaningful data distribution without
|
||||
# writing large quantities of data in this test. The stripe size is given
|
||||
@@ -174,6 +174,7 @@ def test_sharding_split_smoke(
|
||||
placement_policy='{"Attached": 1}',
|
||||
conf=non_default_tenant_config,
|
||||
)
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id, branch_name="main")
|
||||
workload.init()
|
||||
|
||||
@@ -252,6 +253,10 @@ def test_sharding_split_smoke(
|
||||
# The old parent shards should no longer exist on disk
|
||||
assert not shards_on_disk(old_shard_ids)
|
||||
|
||||
# Enough background reconciliations should result in the shards being properly distributed.
|
||||
# Run this before the workload, because its LSN-waiting code presumes stable locations.
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
workload.validate()
|
||||
|
||||
workload.churn_rows(256)
|
||||
@@ -265,27 +270,6 @@ def test_sharding_split_smoke(
|
||||
pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None)
|
||||
workload.validate()
|
||||
|
||||
migrate_to_pageserver_ids = list(
|
||||
set(p.id for p in env.pageservers) - set(pre_split_pageserver_ids)
|
||||
)
|
||||
assert len(migrate_to_pageserver_ids) == split_shard_count - shard_count
|
||||
|
||||
# Migrate shards away from the node where the split happened
|
||||
for ps_id in pre_split_pageserver_ids:
|
||||
shards_here = [
|
||||
tenant_shard_id
|
||||
for (tenant_shard_id, pageserver) in all_shards
|
||||
if pageserver.id == ps_id
|
||||
]
|
||||
assert len(shards_here) == 2
|
||||
migrate_shard = shards_here[0]
|
||||
destination = migrate_to_pageserver_ids.pop()
|
||||
|
||||
log.info(f"Migrating shard {migrate_shard} from {ps_id} to {destination}")
|
||||
env.storage_controller.tenant_shard_migrate(migrate_shard, destination)
|
||||
|
||||
workload.validate()
|
||||
|
||||
# Assert on how many reconciles happened during the process. This is something of an
|
||||
# implementation detail, but it is useful to detect any bugs that might generate spurious
|
||||
# extra reconcile iterations.
|
||||
@@ -294,8 +278,9 @@ def test_sharding_split_smoke(
|
||||
# - shard_count reconciles for the original setup of the tenant
|
||||
# - shard_count reconciles for detaching the original secondary locations during split
|
||||
# - split_shard_count reconciles during shard splitting, for setting up secondaries.
|
||||
# - shard_count reconciles for the migrations we did to move child shards away from their split location
|
||||
expect_reconciles = shard_count * 2 + split_shard_count + shard_count
|
||||
# - shard_count of the child shards will need to fail over to their secondaries
|
||||
# - shard_count of the child shard secondary locations will get moved to emptier nodes
|
||||
expect_reconciles = shard_count * 2 + split_shard_count + shard_count * 2
|
||||
reconcile_ok = env.storage_controller.get_metric_value(
|
||||
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
|
||||
)
|
||||
@@ -343,6 +328,31 @@ def test_sharding_split_smoke(
|
||||
assert sum(total.values()) == split_shard_count * 2
|
||||
check_effective_tenant_config()
|
||||
|
||||
# More specific check: that we are fully balanced. This is deterministic because
|
||||
# the order in which we consider shards for optimization is deterministic, and the
|
||||
# order of preference of nodes is also deterministic (lower node IDs win).
|
||||
log.info(f"total: {total}")
|
||||
assert total == {
|
||||
1: 1,
|
||||
2: 1,
|
||||
3: 1,
|
||||
4: 1,
|
||||
5: 1,
|
||||
6: 1,
|
||||
7: 1,
|
||||
8: 1,
|
||||
9: 1,
|
||||
10: 1,
|
||||
11: 1,
|
||||
12: 1,
|
||||
13: 1,
|
||||
14: 1,
|
||||
15: 1,
|
||||
16: 1,
|
||||
}
|
||||
log.info(f"attached: {attached}")
|
||||
assert attached == {1: 1, 2: 1, 3: 1, 5: 1, 6: 1, 7: 1, 9: 1, 11: 1}
|
||||
|
||||
# Ensure post-split pageserver locations survive a restart (i.e. the child shards
|
||||
# correctly wrote config to disk, and the storage controller responds correctly
|
||||
# to /re-attach)
|
||||
@@ -401,6 +411,7 @@ def test_sharding_split_stripe_size(
|
||||
env.storage_controller.tenant_shard_split(
|
||||
tenant_id, shard_count=2, shard_stripe_size=new_stripe_size
|
||||
)
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
# Check that we ended up with the stripe size that we expected, both on the pageserver
|
||||
# and in the notifications to compute
|
||||
@@ -869,6 +880,7 @@ def test_sharding_split_failures(
|
||||
# Having failed+rolled back, we should be able to split again
|
||||
# No failures this time; it will succeed
|
||||
env.storage_controller.tenant_shard_split(tenant_id, shard_count=split_shard_count)
|
||||
env.storage_controller.reconcile_until_idle(timeout_secs=30)
|
||||
|
||||
workload.churn_rows(10)
|
||||
workload.validate()
|
||||
@@ -922,6 +934,10 @@ def test_sharding_split_failures(
|
||||
finish_split()
|
||||
assert_split_done()
|
||||
|
||||
# Having completed the split, pump the background reconciles to ensure that
|
||||
# the scheduler reaches an idle state
|
||||
env.storage_controller.reconcile_until_idle(timeout_secs=30)
|
||||
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user