Compare commits: release-pr...vlad/storc (11 commits)
| Author | SHA1 | Date |
|---|---|---|
| | b30e80512e | |
| | 964030f30e | |
| | e7ec5962da | |
| | 7e6b8abf46 | |
| | 82af640484 | |
| | 48615a4f21 | |
| | fd4b12c598 | |
| | 240197d22a | |
| | 78b6c6dedc | |
| | 9bcb8796a8 | |
| | 3a74253bcb | |
@@ -391,7 +391,7 @@ impl Scheduler {
             return Err(ScheduleError::NoPageservers);
         }
 
-        let mut scores: Vec<(NodeId, AffinityScore, usize)> = self
+        let mut scores: Vec<(NodeId, AffinityScore, usize, usize)> = self
             .nodes
             .iter()
             .filter_map(|(k, v)| {
@@ -402,6 +402,7 @@ impl Scheduler {
                     *k,
                     context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
                     v.shard_count,
+                    v.attached_shard_count,
                 ))
             }
         })
@@ -409,9 +410,12 @@ impl Scheduler {
 
         // Sort by, in order of precedence:
         // 1st: Affinity score. We should never pick a higher-score node if a lower-score node is available
-        // 2nd: Utilization. Within nodes with the same affinity, use the least loaded nodes.
-        // 3rd: Node ID. This is a convenience to make selection deterministic in tests and empty systems.
-        scores.sort_by_key(|i| (i.1, i.2, i.0));
+        // 2nd: Attached shard count. Within nodes with the same affinity, we always pick the node with
+        // the least number of attached shards.
+        // 3rd: Total shard count. Within nodes with the same affinity and attached shard count, use nodes
+        // with the lower total shard count.
+        // 4th: Node ID. This is a convenience to make selection deterministic in tests and empty systems.
+        scores.sort_by_key(|i| (i.1, i.3, i.2, i.0));
 
         if scores.is_empty() {
             // After applying constraints, no pageservers were left.
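The new sort key is an ordinary lexicographic tuple comparison. A standalone sketch with made-up node data (not from this PR) of how ties on affinity are broken first by attached shard count, then total shard count, then node id:

```rust
fn main() {
    // (node_id, affinity_score, total_shard_count, attached_shard_count) -- hypothetical values.
    let mut scores: Vec<(u64, u8, usize, usize)> = vec![
        (3, 0, 10, 5), // same affinity, most attached shards
        (1, 0, 12, 3), // same affinity and attached count, but more total shards than node 2
        (2, 0, 9, 3),  // same affinity, fewest attached, fewest total -> picked first
        (4, 1, 1, 0),  // worse affinity score, so considered last despite being empty
    ];
    // Same precedence as the hunk above: affinity, attached count, total count, node id.
    scores.sort_by_key(|i| (i.1, i.3, i.2, i.0));
    assert_eq!(scores.first().map(|i| i.0), Some(2));
    println!("{scores:?}");
}
```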
@@ -5281,7 +5281,6 @@ impl Service {
         let mut last_inspected_shard: Option<TenantShardId> = None;
         let mut inspected_all_shards = false;
         let mut waiters = Vec::new();
-        let mut schedule_context = ScheduleContext::default();
 
         while !inspected_all_shards {
             if cancel.is_cancelled() {
@@ -5322,28 +5321,26 @@ impl Service {
                 }
             };
 
-            if tenant_shard.intent.demote_attached(scheduler, node_id) {
-                match tenant_shard.schedule(scheduler, &mut schedule_context) {
-                    Err(e) => {
-                        tracing::warn!(
-                            tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
-                            "Scheduling error when draining pageserver {} : {e}", node_id
-                        );
-                    }
-                    Ok(()) => {
-                        let scheduled_to = tenant_shard.intent.get_attached();
-                        tracing::info!(
-                            tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
-                            "Rescheduled shard while draining node {}: {} -> {:?}",
-                            node_id,
-                            node_id,
-                            scheduled_to
-                        );
-
-                        let waiter = self.maybe_reconcile_shard(tenant_shard, nodes);
-                        if let Some(some) = waiter {
-                            waiters.push(some);
-                        }
-                    }
-                }
-            }
+            match tenant_shard.reschedule_to_secondary(Some(node_id), None, scheduler) {
+                Err(e) => {
+                    tracing::warn!(
+                        tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
+                        "Scheduling error when draining pageserver {} : {e}", node_id
+                    );
+                }
+                Ok(()) => {
+                    let scheduled_to = tenant_shard.intent.get_attached();
+                    tracing::info!(
+                        tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
+                        "Rescheduled shard while draining node {}: {} -> {:?}",
+                        node_id,
+                        node_id,
+                        scheduled_to
+                    );
+
+                    let waiter = self.maybe_reconcile_shard(tenant_shard, nodes);
+                    if let Some(some) = waiter {
+                        waiters.push(some);
+                    }
+                }
+            }
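Net effect of this hunk: the drain path no longer demotes the attachment and re-runs the scheduler itself (which is why the local `schedule_context` above became unused); it delegates to the new `TenantShard::reschedule_to_secondary` helper introduced further down in this compare.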
@@ -5394,6 +5391,9 @@ impl Service {
     /// throughout the cluster. We achieve this by picking tenant shards from each node,
     /// starting from the ones with the largest number of attached shards, until the node
     /// reaches the expected cluster average.
+    /// 3. Avoid promoting more shards of the same tenant than required. The upper bound
+    /// for the number of shards from the same tenant promoted to the node being filled is:
+    /// shard count for the tenant divided by the number of nodes in the cluster.
     fn fill_node_plan(&self, node_id: NodeId) -> Vec<TenantShardId> {
         let mut locked = self.inner.write().unwrap();
         let fill_requirement = locked.scheduler.compute_fill_requirement(node_id);
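A worked example of that upper bound, using hypothetical numbers and the same clamp the loop below applies:

```rust
fn main() {
    // A hypothetical tenant with 8 shards in a 3-node cluster.
    let tenant_shard_count = 8usize;
    let cluster_nodes = 3usize;
    // Integer division, clamped to at least 1 so small tenants can still be promoted.
    let max_promote_for_tenant = std::cmp::max(tenant_shard_count / cluster_nodes, 1);
    assert_eq!(max_promote_for_tenant, 2);
}
```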
@@ -5415,8 +5415,18 @@ impl Service {
         let expected_attached = locked.scheduler.expected_attached_shard_count();
         let nodes_by_load = locked.scheduler.nodes_by_attached_shard_count();
 
+        let mut promoted_per_tenant: HashMap<TenantId, usize> = HashMap::new();
         let mut plan = Vec::new();
 
         for (node_id, attached) in nodes_by_load {
+            let available = locked
+                .nodes
+                .get(&node_id)
+                .map_or(false, |n| n.is_available());
+            if !available {
+                continue;
+            }
+
             if plan.len() >= fill_requirement
                 || tids_by_node.is_empty()
                 || attached <= expected_attached
@@ -5424,13 +5434,22 @@ impl Service {
                 break;
             }
 
-            let can_take = attached - expected_attached;
+            let mut can_take = attached - expected_attached;
             let mut remove_node = false;
-            for _ in 0..can_take {
+            while can_take > 0 {
                 match tids_by_node.get_mut(&node_id) {
                     Some(tids) => match tids.pop() {
                         Some(tid) => {
-                            plan.push(tid);
+                            let max_promote_for_tenant = std::cmp::max(
+                                tid.shard_count.count() as usize / locked.nodes.len(),
+                                1,
+                            );
+                            let promoted = promoted_per_tenant.entry(tid.tenant_id).or_default();
+                            if *promoted < max_promote_for_tenant {
+                                plan.push(tid);
+                                *promoted += 1;
+                                can_take -= 1;
+                            }
                         }
                         None => {
                             remove_node = true;
@@ -5464,9 +5483,7 @@ impl Service {
         // secondaries are warm. This is not always true (e.g. we just migrated the
         // tenant). Take that into consideration by checking the secondary status.
         let mut tids_to_promote = self.fill_node_plan(node_id);
 
         let mut waiters = Vec::new();
-        let mut schedule_context = ScheduleContext::default();
-
         // Execute the plan we've composed above. Before applying each move from the plan,
         // we validate to ensure that it has not gone stale in the meantime.
@@ -5502,9 +5519,11 @@ impl Service {
                 }
 
-                tenant_shard.intent.promote_attached(scheduler, node_id);
-                match tenant_shard.schedule(scheduler, &mut schedule_context) {
+                let previously_attached_to = *tenant_shard.intent.get_attached();
+
+                match tenant_shard.reschedule_to_secondary(
+                    previously_attached_to,
+                    Some(node_id),
+                    scheduler,
+                ) {
                     Err(e) => {
                         tracing::warn!(
                             tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
@@ -646,6 +646,37 @@ impl TenantShard {
         Ok(())
     }
 
+    /// Reschedule this shard: demote the current attached location (if `attached_to` is given)
+    /// and promote `promote_to`, or the scheduler's preferred secondary, to be attached.
+    pub(crate) fn reschedule_to_secondary(
+        &mut self,
+        attached_to: Option<NodeId>,
+        promote_to: Option<NodeId>,
+        scheduler: &mut Scheduler,
+    ) -> Result<(), ScheduleError> {
+        let promote_to = match promote_to {
+            Some(node) => node,
+            None => match scheduler.node_preferred(self.intent.get_secondary()) {
+                Some(node) => node,
+                None => {
+                    return Err(ScheduleError::ImpossibleConstraint);
+                }
+            },
+        };
+
+        assert!(self.intent.get_secondary().contains(&promote_to));
+
+        if let Some(node) = attached_to {
+            let demoted = self.intent.demote_attached(scheduler, node);
+            if !demoted {
+                return Err(ScheduleError::ImpossibleConstraint);
+            }
+        }
+
+        self.intent.promote_attached(scheduler, promote_to);
+
+        Ok(())
+    }
+
     /// Optimize attachments: if a shard has a secondary location that is preferable to
     /// its primary location based on soft constraints, switch that secondary location
     /// to be attached.
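Both call sites in the service hunks above go through this helper: the drain path passes `(Some(node_id), None, scheduler)` and lets the scheduler choose the preferred secondary, while the fill path passes `(previously_attached_to, Some(node_id), scheduler)` to promote the node being filled explicitly.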
@@ -3,15 +3,121 @@ import random
 import time
 
 import pytest
+from collections import defaultdict
 from fixtures.common_types import TenantId, TenantShardId, TimelineId
 from fixtures.compute_reconfigure import ComputeReconfigure
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
-    NeonEnvBuilder,
+    NeonEnv,
+    NeonEnvBuilder,
+    StorageControllerApiException,
 )
 from fixtures.pageserver.http import PageserverHttpClient
 from fixtures.pg_version import PgVersion
+from typing import Dict
+
+
+def get_consistent_node_shard_counts(env: NeonEnv, total_shards):
+    tenants = env.storage_controller.tenant_list()
+
+    intent = dict()
+    observed = dict()
+
+    tenant_placement: defaultdict[str, Dict] = defaultdict(
+        lambda: {
+            "observed": {"attached": None, "secondary": []},
+            "intent": {"attached": None, "secondary": []},
+        }
+    )
+
+    for t in tenants:
+        for node_id, loc_state in t["observed"]["locations"].items():
+            if (
+                loc_state is not None
+                and "conf" in loc_state
+                and loc_state["conf"] is not None
+                and loc_state["conf"]["mode"]
+                in set(["AttachedSingle", "AttachedMulti", "AttachedStale"])
+            ):
+                observed[t["tenant_shard_id"]] = int(node_id)
+                tenant_placement[t["tenant_shard_id"]]["observed"]["attached"] = int(node_id)
+
+            if (
+                loc_state is not None
+                and "conf" in loc_state
+                and loc_state["conf"] is not None
+                and loc_state["conf"]["mode"] == "Secondary"
+            ):
+                tenant_placement[t["tenant_shard_id"]]["observed"]["secondary"].append(int(node_id))
+
+        if "attached" in t["intent"]:
+            intent[t["tenant_shard_id"]] = t["intent"]["attached"]
+            tenant_placement[t["tenant_shard_id"]]["intent"]["attached"] = t["intent"]["attached"]
+
+        if "secondary" in t["intent"]:
+            tenant_placement[t["tenant_shard_id"]]["intent"]["secondary"] += t["intent"]["secondary"]
+
+    log.info(f"{tenant_placement=}")
+
+    matching = {
+        tid: intent[tid] for tid in observed if tid in intent and intent[tid] == observed[tid]
+    }
+    assert len(matching) == total_shards
+
+    attached_per_node: defaultdict[str, int] = defaultdict(int)
+    for node_id in matching.values():
+        attached_per_node[node_id] += 1
+
+    return attached_per_node
+
+
+def assert_consistent_balanced_attachments(env: NeonEnv, total_shards):
+    attached_per_node = get_consistent_node_shard_counts(env, total_shards)
+
+    min_shard_count = min(attached_per_node.values())
+    max_shard_count = max(attached_per_node.values())
+
+    flake_factor = 5 / 100
+    assert max_shard_count - min_shard_count <= int(total_shards * flake_factor)
+
+
+def retryable_node_operation(op, ps_id, max_attempts, backoff):
+    while max_attempts > 0:
+        try:
+            op(ps_id)
+            return
+        except StorageControllerApiException as e:
+            max_attempts -= 1
+            log.info(f"Operation failed ({max_attempts} attempts left): {e}")
+
+            if max_attempts == 0:
+                raise e
+
+            time.sleep(backoff)
+
+
+def poll_node_status(env, node_id, desired_scheduling_policy, max_attempts, backoff):
+    log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy")
+    while max_attempts > 0:
+        try:
+            status = env.storage_controller.node_status(node_id)
+            policy = status["scheduling"]
+            if policy == desired_scheduling_policy:
+                return
+            else:
+                max_attempts -= 1
+                log.info(f"Status call returned {policy=} ({max_attempts} attempts left)")
+
+                if max_attempts == 0:
+                    raise AssertionError(
+                        f"Status for {node_id=} did not reach {desired_scheduling_policy=}"
+                    )
+
+                time.sleep(backoff)
+        except StorageControllerApiException as e:
+            max_attempts -= 1
+            log.info(f"Status call failed ({max_attempts} retries left): {e}")
+
+            if max_attempts == 0:
+                raise e
+
+            time.sleep(backoff)
+
+
+def assert_shard_counts_balanced(env: NeonEnv, shard_counts, total_shards):
+    # Assert that all nodes have some attached shards
+    assert len(shard_counts) == len(env.pageservers)
+
+    min_shard_count = min(shard_counts.values())
+    max_shard_count = max(shard_counts.values())
+
+    flake_factor = 5 / 100
+    assert max_shard_count - min_shard_count <= int(total_shards * flake_factor)
+
+
 @pytest.mark.timeout(3600)  # super long running test: should go down as we optimize
 def test_storage_controller_many_tenants(
@@ -50,6 +156,9 @@ def test_storage_controller_many_tenants(
     # of shards are hitting the delayed path.
     env.storage_controller.allowed_errors.append(".*Many shards are waiting to reconcile")
 
+    # Draining a node can transiently fail to reschedule a shard; the controller logs this
+    # as a warning, so tolerate it in this test.
+    env.storage_controller.allowed_errors.append(".*Scheduling error when draining pageserver.*")
+
     for ps in env.pageservers:
         # This can happen because when we do a loop over all pageservers and mark them offline/active,
         # reconcilers might get cancelled, and the next reconcile can follow a not-so-elegant path of
@@ -70,6 +179,8 @@ def test_storage_controller_many_tenants(
     shard_count = 2
     stripe_size = 1024
 
+    total_shards = tenant_count * shard_count + 1
+
     tenants = set(TenantId.generate() for _i in range(0, tenant_count))
 
     virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
@@ -104,6 +215,7 @@ def test_storage_controller_many_tenants(
         stripe_size,
         # Upload heatmaps fast, so that secondary downloads happen promptly, enabling
         # the controller's optimization migrations to proceed promptly.
+        # TODO: update other tests with this and use reconcile_until_idle
         tenant_config={"heatmap_period": "10s"},
         placement_policy={"Attached": 1},
     )
@@ -186,10 +298,40 @@ def test_storage_controller_many_tenants(
     env.storage_controller.consistency_check()
     check_memory()
 
+    shard_counts = get_consistent_node_shard_counts(env, total_shards)
+    log.info(f"Shard counts before rolling restart: {shard_counts}")
+
+    assert_consistent_balanced_attachments(env, total_shards)
+
     # Restart pageservers: this exercises the /re-attach API
-    for pageserver in env.pageservers:
-        pageserver.stop()
-        pageserver.start()
+    for ps in env.pageservers:
+        retryable_node_operation(
+            lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
+        )
+        poll_node_status(env, ps.id, "PauseForRestart", max_attempts=24, backoff=5)
+
+        shard_counts = get_consistent_node_shard_counts(env, total_shards)
+        log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
+        # Assert that we've drained the node
+        assert shard_counts[str(ps.id)] == 0
+        # Assert that those shards actually went somewhere
+        assert sum(shard_counts.values()) == total_shards
+
+        ps.restart()
+        poll_node_status(env, ps.id, "Active", max_attempts=24, backoff=1)
+
+        retryable_node_operation(
+            lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
+        )
+        poll_node_status(env, ps.id, "Active", max_attempts=24, backoff=5)
+
+        shard_counts = get_consistent_node_shard_counts(env, total_shards)
+        log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
+
+        assert_consistent_balanced_attachments(env, total_shards)
+
+        env.storage_controller.reconcile_until_idle()
+        env.storage_controller.consistency_check()
 
-    # Consistency check is safe here: restarting pageservers should not have caused any Reconcilers to spawn,
-    # as they were not offline long enough to trigger any scheduling changes.