Compare commits

...

11 Commits

Author      SHA1        Message                                                              Date
Vlad Lazar  b30e80512e  storcon: don't call schedule                                         2024-06-20 14:03:08 +01:00
Vlad Lazar  964030f30e  test: assert before restarts                                         2024-06-20 12:19:42 +01:00
Vlad Lazar  e7ec5962da  tests: dump shard intent and observed states nicely                  2024-06-20 11:42:07 +01:00
Vlad Lazar  7e6b8abf46  tests: check consistency after fill                                  2024-06-20 10:59:25 +01:00
Vlad Lazar  82af640484  test: bring back consistency checks                                  2024-06-20 10:33:44 +01:00
Vlad Lazar  48615a4f21  tests: make storage controller scale test do rolling restart        2024-06-20 10:04:17 +01:00
Vlad Lazar  fd4b12c598  storcon: use attached shard counts for initial shard placement      2024-06-20 10:03:41 +01:00
Vlad Lazar  240197d22a  storcon: separate scheduling context for each tenant in fill/drain  2024-06-20 10:02:52 +01:00
Vlad Lazar  78b6c6dedc  storcon: add transactional-ish demotion to secondary                 2024-06-20 10:01:05 +01:00
Vlad Lazar  9bcb8796a8  review: use ShardCount::count                                        2024-06-20 10:00:40 +01:00
Vlad Lazar  3a74253bcb  storcon: avoid promoting too many shards of the same tenant         2024-06-20 10:00:40 +01:00
4 changed files with 234 additions and 38 deletions

View File

@@ -391,7 +391,7 @@ impl Scheduler {
             return Err(ScheduleError::NoPageservers);
         }

-        let mut scores: Vec<(NodeId, AffinityScore, usize)> = self
+        let mut scores: Vec<(NodeId, AffinityScore, usize, usize)> = self
             .nodes
             .iter()
             .filter_map(|(k, v)| {
@@ -402,6 +402,7 @@ impl Scheduler {
                         *k,
                         context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
                         v.shard_count,
+                        v.attached_shard_count,
                     ))
                 }
             })
@@ -409,9 +410,12 @@ impl Scheduler {
         // Sort by, in order of precedence:
         //  1st: Affinity score. We should never pick a higher-score node if a lower-score node is available
-        //  2nd: Utilization. Within nodes with the same affinity, use the least loaded nodes.
-        //  3rd: Node ID. This is a convenience to make selection deterministic in tests and empty systems.
-        scores.sort_by_key(|i| (i.1, i.2, i.0));
+        //  2nd: Attached shard count. Within nodes with the same affinity, we always pick the node with
+        //       the least number of attached shards.
+        //  3rd: Total shard count. Within nodes with the same affinity and attached shard count, use nodes
+        //       with the lower total shard count.
+        //  4th: Node ID. This is a convenience to make selection deterministic in tests and empty systems.
+        scores.sort_by_key(|i| (i.1, i.3, i.2, i.0));
         if scores.is_empty() {
             // After applying constraints, no pageservers were left.
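The new sort key is worth unpacking. The sketch below is a minimal standalone illustration (plain integer tuples stand in for the controller's NodeId and AffinityScore types): ties on affinity are now broken by attached shard count before total shard count, so a node holding many secondaries but few attachments wins over an evenly loaded one.

fn main() {
    // (node_id, affinity_score, total_shard_count, attached_shard_count)
    let mut scores: Vec<(u64, u64, usize, usize)> = vec![
        (1, 0, 10, 5), // evenly loaded: 5 of its 10 shards are attached
        (2, 0, 12, 2), // mostly secondaries: only 2 attachments
        (3, 1, 1, 0),  // higher affinity score: never preferred over the others
    ];

    // Mirrors the new precedence: affinity, then attached count, then total count, then node id.
    scores.sort_by_key(|i| (i.1, i.3, i.2, i.0));

    // Node 2 is picked first despite having the most shards in total.
    assert_eq!(scores.first().map(|i| i.0), Some(2));
    println!("{scores:?}");
}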

View File

@@ -5281,7 +5281,6 @@ impl Service {
         let mut last_inspected_shard: Option<TenantShardId> = None;
         let mut inspected_all_shards = false;
         let mut waiters = Vec::new();
-        let mut schedule_context = ScheduleContext::default();

         while !inspected_all_shards {
             if cancel.is_cancelled() {
@@ -5322,28 +5321,26 @@
                     }
                 };

-                if tenant_shard.intent.demote_attached(scheduler, node_id) {
-                    match tenant_shard.schedule(scheduler, &mut schedule_context) {
-                        Err(e) => {
-                            tracing::warn!(
-                                tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
-                                "Scheduling error when draining pageserver {} : {e}", node_id
-                            );
-                        }
-                        Ok(()) => {
-                            let scheduled_to = tenant_shard.intent.get_attached();
-                            tracing::info!(
-                                tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
-                                "Rescheduled shard while draining node {}: {} -> {:?}",
-                                node_id,
-                                node_id,
-                                scheduled_to
-                            );
+                match tenant_shard.reschedule_to_secondary(Some(node_id), None, scheduler) {
+                    Err(e) => {
+                        tracing::warn!(
+                            tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
+                            "Scheduling error when draining pageserver {} : {e}", node_id
+                        );
+                    }
+                    Ok(()) => {
+                        let scheduled_to = tenant_shard.intent.get_attached();
+                        tracing::info!(
+                            tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
+                            "Rescheduled shard while draining node {}: {} -> {:?}",
+                            node_id,
+                            node_id,
+                            scheduled_to
+                        );

-                            let waiter = self.maybe_reconcile_shard(tenant_shard, nodes);
-                            if let Some(some) = waiter {
-                                waiters.push(some);
-                            }
-                        }
+                        let waiter = self.maybe_reconcile_shard(tenant_shard, nodes);
+                        if let Some(some) = waiter {
+                            waiters.push(some);
+                        }
                     }
                 }
@@ -5394,6 +5391,9 @@
     /// throughout the cluster. We achieve this by picking tenant shards from each node,
     /// starting from the ones with the largest number of attached shards, until the node
     /// reaches the expected cluster average.
+    /// 3. Avoid promoting more shards of the same tenant than required. The upper bound
+    ///    for the number of shards from the same tenant promoted to the node being filled is:
+    ///    the tenant's shard count divided by the number of nodes in the cluster.
     fn fill_node_plan(&self, node_id: NodeId) -> Vec<TenantShardId> {
         let mut locked = self.inner.write().unwrap();
         let fill_requirement = locked.scheduler.compute_fill_requirement(node_id);
@@ -5415,8 +5415,18 @@
         let expected_attached = locked.scheduler.expected_attached_shard_count();
         let nodes_by_load = locked.scheduler.nodes_by_attached_shard_count();

+        let mut promoted_per_tenant: HashMap<TenantId, usize> = HashMap::new();
         let mut plan = Vec::new();
+
         for (node_id, attached) in nodes_by_load {
+            let available = locked
+                .nodes
+                .get(&node_id)
+                .map_or(false, |n| n.is_available());
+            if !available {
+                continue;
+            }
+
             if plan.len() >= fill_requirement
                 || tids_by_node.is_empty()
                 || attached <= expected_attached
@@ -5424,13 +5434,22 @@
                 break;
             }

-            let can_take = attached - expected_attached;
+            let mut can_take = attached - expected_attached;
             let mut remove_node = false;
-            for _ in 0..can_take {
+            while can_take > 0 {
                 match tids_by_node.get_mut(&node_id) {
                     Some(tids) => match tids.pop() {
                         Some(tid) => {
-                            plan.push(tid);
+                            let max_promote_for_tenant = std::cmp::max(
+                                tid.shard_count.count() as usize / locked.nodes.len(),
+                                1,
+                            );
+                            let promoted = promoted_per_tenant.entry(tid.tenant_id).or_default();
+                            if *promoted < max_promote_for_tenant {
+                                plan.push(tid);
+                                *promoted += 1;
+                                can_take -= 1;
+                            }
                         }
                         None => {
                             remove_node = true;
@@ -5464,9 +5483,7 @@
         // secondaries are warm. This is not always true (e.g. we just migrated the
         // tenant). Take that into consideration by checking the secondary status.
         let mut tids_to_promote = self.fill_node_plan(node_id);
         let mut waiters = Vec::new();
-        let mut schedule_context = ScheduleContext::default();

         // Execute the plan we've composed above. Before applying each move from the plan,
         // we validate to ensure that it has not gone stale in the meantime.
@@ -5502,9 +5519,11 @@
                 }

                 let previously_attached_to = *tenant_shard.intent.get_attached();
-                tenant_shard.intent.promote_attached(scheduler, node_id);
-                match tenant_shard.schedule(scheduler, &mut schedule_context) {
+                match tenant_shard.reschedule_to_secondary(
+                    previously_attached_to,
+                    Some(node_id),
+                    scheduler,
+                ) {
                     Err(e) => {
                         tracing::warn!(
                             tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
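The per-tenant promotion cap used by the fill planner above is plain integer arithmetic. A standalone sketch of the bound (a hypothetical free function, not the controller's API) shows how it behaves on a few cluster shapes:

fn max_promote_for_tenant(shard_count: usize, node_count: usize) -> usize {
    // At most shard_count / node_count shards of one tenant go to the node
    // being filled, but always allow at least one so small tenants can move.
    std::cmp::max(shard_count / node_count, 1)
}

fn main() {
    // An 8-shard tenant on a 4-node cluster: at most 2 of its shards are
    // promoted to the filled node, keeping the tenant spread across nodes.
    assert_eq!(max_promote_for_tenant(8, 4), 2);
    // A 1-shard tenant never rounds down to zero.
    assert_eq!(max_promote_for_tenant(1, 4), 1);
    // 4 shards on 4 nodes: one shard per node.
    assert_eq!(max_promote_for_tenant(4, 4), 1);
}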

View File

@@ -646,6 +646,37 @@ impl TenantShard {
         Ok(())
     }

+    /// Reschedule this shard to a secondary location: demote the current attachment
+    /// (when `attached_to` is given) and promote `promote_to`, falling back to the
+    /// scheduler's preferred secondary when no explicit promotion target is provided.
+    pub(crate) fn reschedule_to_secondary(
+        &mut self,
+        attached_to: Option<NodeId>,
+        promote_to: Option<NodeId>,
+        scheduler: &mut Scheduler,
+    ) -> Result<(), ScheduleError> {
+        let promote_to = match promote_to {
+            Some(node) => node,
+            None => match scheduler.node_preferred(self.intent.get_secondary()) {
+                Some(node) => node,
+                None => {
+                    return Err(ScheduleError::ImpossibleConstraint);
+                }
+            },
+        };
+
+        assert!(self.intent.get_secondary().contains(&promote_to));
+
+        if let Some(node) = attached_to {
+            let demoted = self.intent.demote_attached(scheduler, node);
+            if !demoted {
+                return Err(ScheduleError::ImpossibleConstraint);
+            }
+        }
+
+        self.intent.promote_attached(scheduler, promote_to);
+
+        Ok(())
+    }
+
     /// Optimize attachments: if a shard has a secondary location that is preferable to
     /// its primary location based on soft constraints, switch that secondary location
     /// to be attached.
View File

@@ -3,15 +3,121 @@ import random
 import time

 import pytest
 from collections import defaultdict
 from fixtures.common_types import TenantId, TenantShardId, TimelineId
 from fixtures.compute_reconfigure import ComputeReconfigure
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
-    NeonEnvBuilder,
+    NeonEnv,
+    NeonEnvBuilder,
+    StorageControllerApiException,
 )
 from fixtures.pageserver.http import PageserverHttpClient
 from fixtures.pg_version import PgVersion
+from typing import Dict
+
+
+def get_consistent_node_shard_counts(env: NeonEnv, total_shards):
+    tenants = env.storage_controller.tenant_list()
+
+    intent = dict()
+    observed = dict()
+    tenant_placement: defaultdict[str, Dict] = defaultdict(
+        lambda: {
+            "observed": {"attached": None, "secondary": []},
+            "intent": {"attached": None, "secondary": []},
+        }
+    )
+
+    for t in tenants:
+        for node_id, loc_state in t["observed"]["locations"].items():
+            if (
+                loc_state is not None
+                and "conf" in loc_state
+                and loc_state["conf"] is not None
+                and loc_state["conf"]["mode"]
+                in set(["AttachedSingle", "AttachedMulti", "AttachedStale"])
+            ):
+                observed[t["tenant_shard_id"]] = int(node_id)
+                tenant_placement[t["tenant_shard_id"]]["observed"]["attached"] = int(node_id)
+
+            if (
+                loc_state is not None
+                and "conf" in loc_state
+                and loc_state["conf"] is not None
+                and loc_state["conf"]["mode"] == "Secondary"
+            ):
+                tenant_placement[t["tenant_shard_id"]]["observed"]["secondary"].append(
+                    int(node_id)
+                )
+
+        if "attached" in t["intent"]:
+            intent[t["tenant_shard_id"]] = t["intent"]["attached"]
+            tenant_placement[t["tenant_shard_id"]]["intent"]["attached"] = t["intent"]["attached"]
+
+        if "secondary" in t["intent"]:
+            tenant_placement[t["tenant_shard_id"]]["intent"]["secondary"] += t["intent"]["secondary"]
+
+    log.info(f"{tenant_placement=}")
+
+    matching = {
+        tid: intent[tid] for tid in observed if tid in intent and intent[tid] == observed[tid]
+    }
+    assert len(matching) == total_shards
+
+    attached_per_node: defaultdict[int, int] = defaultdict(int)
+    for node_id in matching.values():
+        attached_per_node[node_id] += 1
+    return attached_per_node
+
+
+def assert_consistent_balanced_attachments(env: NeonEnv, total_shards):
+    attached_per_node = get_consistent_node_shard_counts(env, total_shards)
+
+    min_shard_count = min(attached_per_node.values())
+    max_shard_count = max(attached_per_node.values())
+
+    flake_factor = 5 / 100
+    assert max_shard_count - min_shard_count <= int(total_shards * flake_factor)
+
+
+def retryable_node_operation(op, ps_id, max_attempts, backoff):
+    while max_attempts > 0:
+        try:
+            op(ps_id)
+            return
+        except StorageControllerApiException as e:
+            max_attempts -= 1
+            log.info(f"Operation failed ({max_attempts} attempts left): {e}")
+
+            if max_attempts == 0:
+                raise e
+
+            time.sleep(backoff)
+
+
+def poll_node_status(env, node_id, desired_scheduling_policy, max_attempts, backoff):
+    log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy")
+    while max_attempts > 0:
+        try:
+            status = env.storage_controller.node_status(node_id)
+            policy = status["scheduling"]
+            if policy == desired_scheduling_policy:
+                return
+            else:
+                max_attempts -= 1
+                log.info(f"Status call returned {policy=} ({max_attempts} attempts left)")
+
+                if max_attempts == 0:
+                    raise AssertionError(
+                        f"Status for {node_id=} did not reach {desired_scheduling_policy=}"
+                    )
+
+                time.sleep(backoff)
+        except StorageControllerApiException as e:
+            max_attempts -= 1
+            log.info(f"Status call failed ({max_attempts} retries left): {e}")
+
+            if max_attempts == 0:
+                raise e
+
+            time.sleep(backoff)
+
+
+def assert_shard_counts_balanced(env: NeonEnv, shard_counts, total_shards):
+    # Assert that all nodes have some attached shards
+    assert len(shard_counts) == len(env.pageservers)
+
+    min_shard_count = min(shard_counts.values())
+    max_shard_count = max(shard_counts.values())
+
+    flake_factor = 5 / 100
+    assert max_shard_count - min_shard_count <= int(total_shards * flake_factor)
+
+
 @pytest.mark.timeout(3600)  # super long running test: should go down as we optimize
 def test_storage_controller_many_tenants(
@@ -50,6 +156,9 @@ def test_storage_controller_many_tenants(
     # of shards are hitting the delayed path.
     env.storage_controller.allowed_errors.append(".*Many shards are waiting to reconcile")

+    # Draining a node can transiently fail to reschedule a shard while the cluster is in flux;
+    # the storage controller logs a warning and moves on to the next shard.
+    env.storage_controller.allowed_errors.append(".*Scheduling error when draining pageserver.*")
+
     for ps in env.pageservers:
         # This can happen because when we do a loop over all pageservers and mark them offline/active,
         # reconcilers might get cancelled, and the next reconcile can follow a not-so-elegant path of
@@ -70,6 +179,8 @@ def test_storage_controller_many_tenants(
     shard_count = 2
     stripe_size = 1024

+    total_shards = tenant_count * shard_count + 1
+
     tenants = set(TenantId.generate() for _i in range(0, tenant_count))

     virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
@@ -104,6 +215,7 @@ def test_storage_controller_many_tenants(
         stripe_size,
         # Upload heatmaps fast, so that secondary downloads happen promptly, enabling
         # the controller's optimization migrations to proceed promptly.
+        # TODO: update the other scale test with this and use reconcile_until_idle
         tenant_config={"heatmap_period": "10s"},
         placement_policy={"Attached": 1},
     )
@@ -186,10 +298,40 @@ def test_storage_controller_many_tenants(
     env.storage_controller.consistency_check()

     check_memory()

-    # Restart pageservers: this exercises the /re-attach API
-    for pageserver in env.pageservers:
-        pageserver.stop()
-        pageserver.start()
-
-    # Consistency check is safe here: restarting pageservers should not have caused any Reconcilers to spawn,
-    # as they were not offline long enough to trigger any scheduling changes.
+    shard_counts = get_consistent_node_shard_counts(env, total_shards)
+    log.info(f"Shard counts before rolling restart: {shard_counts}")
+
+    assert_consistent_balanced_attachments(env, total_shards)
+
+    for ps in env.pageservers:
+        retryable_node_operation(
+            lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
+        )
+        poll_node_status(env, ps.id, "PauseForRestart", max_attempts=24, backoff=5)
+
+        shard_counts = get_consistent_node_shard_counts(env, total_shards)
+        log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
+        # Assert that we've drained the node
+        assert shard_counts[ps.id] == 0
+        # Assert that those shards actually went somewhere
+        assert sum(shard_counts.values()) == total_shards
+
+        ps.restart()
+        poll_node_status(env, ps.id, "Active", max_attempts=24, backoff=1)
+
+        retryable_node_operation(
+            lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
+        )
+        poll_node_status(env, ps.id, "Active", max_attempts=24, backoff=5)
+
+        shard_counts = get_consistent_node_shard_counts(env, total_shards)
+        log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
+        assert_consistent_balanced_attachments(env, total_shards)
+
+        env.storage_controller.reconcile_until_idle()
+        env.storage_controller.consistency_check()