mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 18:10:37 +00:00
Compare commits
1 Commits
vlad/storc
...
vlad/reset
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
83fdc07b20 |
@@ -183,7 +183,8 @@ runs:
|
||||
|
||||
# Run the tests.
|
||||
#
|
||||
# --alluredir saves test results in Allure format (in a specified directory)
|
||||
# The junit.xml file allows CI tools to display more fine-grained test information
|
||||
# in its "Tests" tab in the results page.
|
||||
# --verbose prints name of each test (helpful when there are
|
||||
# multiple tests in one file)
|
||||
# -rA prints summary in the end
|
||||
@@ -192,6 +193,7 @@ runs:
|
||||
#
|
||||
mkdir -p $TEST_OUTPUT/allure/results
|
||||
"${cov_prefix[@]}" ./scripts/pytest \
|
||||
--junitxml=$TEST_OUTPUT/junit.xml \
|
||||
--alluredir=$TEST_OUTPUT/allure/results \
|
||||
--tb=short \
|
||||
--verbose \
|
||||
|
||||
9
.github/workflows/actionlint.yml
vendored
9
.github/workflows/actionlint.yml
vendored
@@ -36,16 +36,15 @@ jobs:
|
||||
fail_on_error: true
|
||||
filter_mode: nofilter
|
||||
level: error
|
||||
|
||||
- name: Disallow 'ubuntu-latest' runners
|
||||
run: |
|
||||
- run: |
|
||||
PAT='^\s*runs-on:.*-latest'
|
||||
if grep -ERq $PAT .github/workflows; then
|
||||
if grep -ERq $PAT .github/workflows
|
||||
then
|
||||
grep -ERl $PAT .github/workflows |\
|
||||
while read -r f
|
||||
do
|
||||
l=$(grep -nE $PAT .github/workflows/release.yml | awk -F: '{print $1}' | head -1)
|
||||
echo "::error file=$f,line=$l::Please use 'ubuntu-22.04' instead of 'ubuntu-latest'"
|
||||
echo "::error file=$f,line=$l::Please, do not use ubuntu-latest images to run on, use LTS instead."
|
||||
done
|
||||
exit 1
|
||||
fi
|
||||
|
||||
17
.github/workflows/build_and_test.yml
vendored
17
.github/workflows/build_and_test.yml
vendored
@@ -1023,18 +1023,6 @@ jobs:
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
|
||||
# The default value is ~/.docker
|
||||
- name: Set custom docker config directory
|
||||
run: |
|
||||
mkdir -p .docker-custom
|
||||
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
|
||||
|
||||
- uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
|
||||
# `neondatabase/neon` contains multiple binaries, all of them use the same input for the version into the same version formatting library.
|
||||
# Pick pageserver as currently the only binary with extra "version" features printed in the string to verify.
|
||||
# Regular pageserver version string looks like
|
||||
@@ -1069,11 +1057,6 @@ jobs:
|
||||
docker compose -f ./docker-compose/docker-compose.yml logs || 0
|
||||
docker compose -f ./docker-compose/docker-compose.yml down
|
||||
|
||||
- name: Remove custom docker config directory
|
||||
if: always()
|
||||
run: |
|
||||
rm -rf .docker-custom
|
||||
|
||||
promote-images:
|
||||
needs: [ check-permissions, tag, test-images, vm-compute-node-image ]
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
12
.github/workflows/release.yml
vendored
12
.github/workflows/release.yml
vendored
@@ -52,15 +52,13 @@ jobs:
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
run: |
|
||||
TITLE="Storage & Compute release ${RELEASE_DATE}"
|
||||
|
||||
cat << EOF > body.md
|
||||
## ${TITLE}
|
||||
## Storage & Compute release ${RELEASE_DATE}
|
||||
|
||||
**Please merge this Pull Request using 'Create a merge commit' button**
|
||||
EOF
|
||||
|
||||
gh pr create --title "${TITLE}" \
|
||||
gh pr create --title "Release ${RELEASE_DATE}" \
|
||||
--body-file "body.md" \
|
||||
--head "${RELEASE_BRANCH}" \
|
||||
--base "release"
|
||||
@@ -93,15 +91,13 @@ jobs:
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
run: |
|
||||
TITLE="Proxy release ${RELEASE_DATE}"
|
||||
|
||||
cat << EOF > body.md
|
||||
## ${TITLE}
|
||||
## Proxy release ${RELEASE_DATE}
|
||||
|
||||
**Please merge this Pull Request using 'Create a merge commit' button**
|
||||
EOF
|
||||
|
||||
gh pr create --title "${TITLE}" \
|
||||
gh pr create --title "Proxy release ${RELEASE_DATE}" \
|
||||
--body-file "body.md" \
|
||||
--head "${RELEASE_BRANCH}" \
|
||||
--base "release-proxy"
|
||||
|
||||
@@ -391,7 +391,7 @@ impl Scheduler {
|
||||
return Err(ScheduleError::NoPageservers);
|
||||
}
|
||||
|
||||
let mut scores: Vec<(NodeId, AffinityScore, usize, usize)> = self
|
||||
let mut scores: Vec<(NodeId, AffinityScore, usize)> = self
|
||||
.nodes
|
||||
.iter()
|
||||
.filter_map(|(k, v)| {
|
||||
@@ -402,7 +402,6 @@ impl Scheduler {
|
||||
*k,
|
||||
context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
|
||||
v.shard_count,
|
||||
v.attached_shard_count,
|
||||
))
|
||||
}
|
||||
})
|
||||
@@ -410,12 +409,9 @@ impl Scheduler {
|
||||
|
||||
// Sort by, in order of precedence:
|
||||
// 1st: Affinity score. We should never pick a higher-score node if a lower-score node is available
|
||||
// 2nd: Attached shard count. Within nodes with the same affinity, we always pick the node with
|
||||
// the least number of attached shards.
|
||||
// 3rd: Total shard count. Within nodes with the same affinity and attached shard count, use nodes
|
||||
// with the lower total shard count.
|
||||
// 4th: Node ID. This is a convenience to make selection deterministic in tests and empty systems.
|
||||
scores.sort_by_key(|i| (i.1, i.3, i.2, i.0));
|
||||
// 2nd: Utilization. Within nodes with the same affinity, use the least loaded nodes.
|
||||
// 3rd: Node ID. This is a convenience to make selection deterministic in tests and empty systems.
|
||||
scores.sort_by_key(|i| (i.1, i.2, i.0));
|
||||
|
||||
if scores.is_empty() {
|
||||
// After applying constraints, no pageservers were left.
|
||||
|
||||
@@ -5281,6 +5281,7 @@ impl Service {
|
||||
let mut last_inspected_shard: Option<TenantShardId> = None;
|
||||
let mut inspected_all_shards = false;
|
||||
let mut waiters = Vec::new();
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
|
||||
while !inspected_all_shards {
|
||||
if cancel.is_cancelled() {
|
||||
@@ -5321,26 +5322,36 @@ impl Service {
|
||||
}
|
||||
};
|
||||
|
||||
match tenant_shard.reschedule_to_secondary(Some(node_id), None, scheduler) {
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
|
||||
"Scheduling error when draining pageserver {} : {e}", node_id
|
||||
);
|
||||
}
|
||||
Ok(()) => {
|
||||
let scheduled_to = tenant_shard.intent.get_attached();
|
||||
tracing::info!(
|
||||
tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
|
||||
"Rescheduled shard while draining node {}: {} -> {:?}",
|
||||
node_id,
|
||||
node_id,
|
||||
scheduled_to
|
||||
);
|
||||
// Reset the scheduling context if we have moved over to a new tenant.
|
||||
// This is required since the affinity scores stored in the scheduling
|
||||
// context should be tenant specific. Note that we are relying on
|
||||
// [`ServiceState::tenants`] being ordered by tenant id.
|
||||
if last_inspected_shard.map(|tid| tid.tenant_id) != Some(tid.tenant_id) {
|
||||
schedule_context = ScheduleContext::default();
|
||||
}
|
||||
|
||||
let waiter = self.maybe_reconcile_shard(tenant_shard, nodes);
|
||||
if let Some(some) = waiter {
|
||||
waiters.push(some);
|
||||
if tenant_shard.intent.demote_attached(scheduler, node_id) {
|
||||
match tenant_shard.schedule(scheduler, &mut schedule_context) {
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
|
||||
"Scheduling error when draining pageserver {} : {e}", node_id
|
||||
);
|
||||
}
|
||||
Ok(()) => {
|
||||
let scheduled_to = tenant_shard.intent.get_attached();
|
||||
tracing::info!(
|
||||
tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
|
||||
"Rescheduled shard while draining node {}: {} -> {:?}",
|
||||
node_id,
|
||||
node_id,
|
||||
scheduled_to
|
||||
);
|
||||
|
||||
let waiter = self.maybe_reconcile_shard(tenant_shard, nodes);
|
||||
if let Some(some) = waiter {
|
||||
waiters.push(some);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5391,9 +5402,6 @@ impl Service {
|
||||
/// throughout the cluster. We achieve this by picking tenant shards from each node,
|
||||
/// starting from the ones with the largest number of attached shards, until the node
|
||||
/// reaches the expected cluster average.
|
||||
/// 3. Avoid promoting more shards of the same tenant than required. The upper bound
|
||||
/// for the number of tenants from the same shard promoted to the node being filled is:
|
||||
/// shard count for the tenant divided by the number of nodes in the cluster.
|
||||
fn fill_node_plan(&self, node_id: NodeId) -> Vec<TenantShardId> {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let fill_requirement = locked.scheduler.compute_fill_requirement(node_id);
|
||||
@@ -5415,18 +5423,8 @@ impl Service {
|
||||
let expected_attached = locked.scheduler.expected_attached_shard_count();
|
||||
let nodes_by_load = locked.scheduler.nodes_by_attached_shard_count();
|
||||
|
||||
let mut promoted_per_tenant: HashMap<TenantId, usize> = HashMap::new();
|
||||
let mut plan = Vec::new();
|
||||
|
||||
for (node_id, attached) in nodes_by_load {
|
||||
let available = locked
|
||||
.nodes
|
||||
.get(&node_id)
|
||||
.map_or(false, |n| n.is_available());
|
||||
if !available {
|
||||
continue;
|
||||
}
|
||||
|
||||
if plan.len() >= fill_requirement
|
||||
|| tids_by_node.is_empty()
|
||||
|| attached <= expected_attached
|
||||
@@ -5434,22 +5432,13 @@ impl Service {
|
||||
break;
|
||||
}
|
||||
|
||||
let mut can_take = attached - expected_attached;
|
||||
let can_take = attached - expected_attached;
|
||||
let mut remove_node = false;
|
||||
while can_take > 0 {
|
||||
for _ in 0..can_take {
|
||||
match tids_by_node.get_mut(&node_id) {
|
||||
Some(tids) => match tids.pop() {
|
||||
Some(tid) => {
|
||||
let max_promote_for_tenant = std::cmp::max(
|
||||
tid.shard_count.count() as usize / locked.nodes.len(),
|
||||
1,
|
||||
);
|
||||
let promoted = promoted_per_tenant.entry(tid.tenant_id).or_default();
|
||||
if *promoted < max_promote_for_tenant {
|
||||
plan.push(tid);
|
||||
*promoted += 1;
|
||||
can_take -= 1;
|
||||
}
|
||||
plan.push(tid);
|
||||
}
|
||||
None => {
|
||||
remove_node = true;
|
||||
@@ -5483,7 +5472,9 @@ impl Service {
|
||||
// secondaries are warm. This is not always true (e.g. we just migrated the
|
||||
// tenant). Take that into consideration by checking the secondary status.
|
||||
let mut tids_to_promote = self.fill_node_plan(node_id);
|
||||
|
||||
let mut waiters = Vec::new();
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
|
||||
// Execute the plan we've composed above. Before aplying each move from the plan,
|
||||
// we validate to ensure that it has not gone stale in the meantime.
|
||||
@@ -5509,8 +5500,17 @@ impl Service {
|
||||
));
|
||||
}
|
||||
|
||||
let mut last_inspected_tenant = None;
|
||||
while waiters.len() < MAX_RECONCILES_PER_OPERATION {
|
||||
if let Some(tid) = tids_to_promote.pop() {
|
||||
// Reset the scheduling context if we have moved over to a new tenant.
|
||||
// This is required since the affinity scores stored in the scheduling
|
||||
// context should be tenant specific. Note that we are relying on the
|
||||
// result [`Service::fill_node_plan`] being ordered by tenant id.
|
||||
if last_inspected_tenant != Some(tid.tenant_id) {
|
||||
schedule_context = ScheduleContext::default();
|
||||
}
|
||||
|
||||
if let Some(tenant_shard) = tenants.get_mut(&tid) {
|
||||
// If the node being filled is not a secondary anymore,
|
||||
// skip the promotion.
|
||||
@@ -5519,11 +5519,9 @@ impl Service {
|
||||
}
|
||||
|
||||
let previously_attached_to = *tenant_shard.intent.get_attached();
|
||||
match tenant_shard.reschedule_to_secondary(
|
||||
previously_attached_to,
|
||||
Some(node_id),
|
||||
scheduler,
|
||||
) {
|
||||
|
||||
tenant_shard.intent.promote_attached(scheduler, node_id);
|
||||
match tenant_shard.schedule(scheduler, &mut schedule_context) {
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
|
||||
@@ -5547,6 +5545,8 @@ impl Service {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
last_inspected_tenant = Some(tid.tenant_id);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -646,37 +646,6 @@ impl TenantShard {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO: comment
|
||||
pub(crate) fn reschedule_to_secondary(
|
||||
&mut self,
|
||||
attached_to: Option<NodeId>,
|
||||
promote_to: Option<NodeId>,
|
||||
scheduler: &mut Scheduler,
|
||||
) -> Result<(), ScheduleError> {
|
||||
let promote_to = match promote_to {
|
||||
Some(node) => node,
|
||||
None => match scheduler.node_preferred(self.intent.get_secondary()) {
|
||||
Some(node) => node,
|
||||
None => {
|
||||
return Err(ScheduleError::ImpossibleConstraint);
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
assert!(self.intent.get_secondary().contains(&promote_to));
|
||||
|
||||
if let Some(node) = attached_to {
|
||||
let demoted = self.intent.demote_attached(scheduler, node);
|
||||
if !demoted {
|
||||
return Err(ScheduleError::ImpossibleConstraint);
|
||||
}
|
||||
}
|
||||
|
||||
self.intent.promote_attached(scheduler, promote_to);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Optimize attachments: if a shard has a secondary location that is preferable to
|
||||
/// its primary location based on soft constraints, switch that secondary location
|
||||
/// to be attached.
|
||||
|
||||
@@ -3446,12 +3446,11 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
self.active_safekeepers: List[int] = list(map(lambda sk: sk.id, env.safekeepers))
|
||||
# path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
|
||||
|
||||
# Semaphore is set to 1 when we start, and acquire'd back to zero when we stop
|
||||
#
|
||||
# We use a semaphore rather than a bool so that racing calls to stop() don't
|
||||
# try and stop the same process twice, as stop() is called by test teardown and
|
||||
# potentially by some __del__ chains in other threads.
|
||||
self._running = threading.Semaphore(0)
|
||||
# This lock prevents concurrent start & stop operations, keeping `self.running` consistent
|
||||
# with whether we're really running. Tests generally wouldn't try and do these concurrently,
|
||||
# but endpoints are also stopped during test teardown, which might happen concurrently with
|
||||
# destruction of objects in tests.
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def http_client(
|
||||
self, auth_token: Optional[str] = None, retries: Optional[Retry] = None
|
||||
@@ -3523,14 +3522,15 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
|
||||
log.info(f"Starting postgres endpoint {self.endpoint_id}")
|
||||
|
||||
self.env.neon_cli.endpoint_start(
|
||||
self.endpoint_id,
|
||||
safekeepers=self.active_safekeepers,
|
||||
remote_ext_config=remote_ext_config,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
)
|
||||
self._running.release(1)
|
||||
with self.lock:
|
||||
self.env.neon_cli.endpoint_start(
|
||||
self.endpoint_id,
|
||||
safekeepers=self.active_safekeepers,
|
||||
remote_ext_config=remote_ext_config,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
)
|
||||
self.running = True
|
||||
|
||||
return self
|
||||
|
||||
@@ -3578,12 +3578,9 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
conf_file.write("\n".join(hba) + "\n")
|
||||
conf_file.write(data)
|
||||
|
||||
if self.is_running():
|
||||
if self.running:
|
||||
self.safe_psql("SELECT pg_reload_conf()")
|
||||
|
||||
def is_running(self):
|
||||
return self._running._value > 0
|
||||
|
||||
def reconfigure(self, pageserver_id: Optional[int] = None):
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_reconfigure(self.endpoint_id, self.tenant_id, pageserver_id)
|
||||
@@ -3632,12 +3629,13 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
Returns self.
|
||||
"""
|
||||
|
||||
running = self._running.acquire(blocking=False)
|
||||
if running:
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_stop(
|
||||
self.endpoint_id, check_return_code=self.check_stop_result, mode=mode
|
||||
)
|
||||
with self.lock:
|
||||
if self.running:
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_stop(
|
||||
self.endpoint_id, check_return_code=self.check_stop_result, mode=mode
|
||||
)
|
||||
self.running = False
|
||||
|
||||
return self
|
||||
|
||||
@@ -3647,13 +3645,13 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
Returns self.
|
||||
"""
|
||||
|
||||
running = self._running.acquire(blocking=False)
|
||||
if running:
|
||||
with self.lock:
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_stop(
|
||||
self.endpoint_id, True, check_return_code=self.check_stop_result, mode=mode
|
||||
)
|
||||
self.endpoint_id = None
|
||||
self.running = False
|
||||
|
||||
return self
|
||||
|
||||
|
||||
@@ -3,121 +3,15 @@ import random
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from collections import defaultdict
|
||||
from fixtures.common_types import TenantId, TenantShardId, TimelineId
|
||||
from fixtures.compute_reconfigure import ComputeReconfigure
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder, StorageControllerApiException, NeonEnv
|
||||
NeonEnvBuilder,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pg_version import PgVersion
|
||||
from typing import Dict
|
||||
|
||||
def get_consistent_node_shard_counts(env: NeonEnv, total_shards):
|
||||
tenants = env.storage_controller.tenant_list()
|
||||
|
||||
intent = dict()
|
||||
observed = dict()
|
||||
|
||||
tenant_placement: defaultdict[str, Dict] = defaultdict(lambda: {"observed": {"attached": None, "secondary": []}, "intent": {"attached": None, "secondary": []}})
|
||||
|
||||
for t in tenants:
|
||||
for node_id, loc_state in t["observed"]["locations"].items():
|
||||
if (
|
||||
loc_state is not None
|
||||
and "conf" in loc_state
|
||||
and loc_state["conf"] is not None
|
||||
and loc_state["conf"]["mode"] in set(["AttachedSingle", "AttachedMulti", "AttachedStale"])
|
||||
):
|
||||
observed[t["tenant_shard_id"]] = int(node_id)
|
||||
tenant_placement[t["tenant_shard_id"]]["observed"]["attached"] = int(node_id)
|
||||
|
||||
if (
|
||||
loc_state is not None
|
||||
and "conf" in loc_state
|
||||
and loc_state["conf"] is not None
|
||||
and loc_state["conf"]["mode"] == "Secondary"
|
||||
):
|
||||
tenant_placement[t["tenant_shard_id"]]["observed"]["secondary"].append(int(node_id))
|
||||
|
||||
if "attached" in t["intent"]:
|
||||
intent[t["tenant_shard_id"]] = t["intent"]["attached"]
|
||||
tenant_placement[t["tenant_shard_id"]]["intent"]["attached"] = t["intent"]["attached"]
|
||||
|
||||
if "secondary" in t["intent"]:
|
||||
tenant_placement[t["tenant_shard_id"]]["intent"]["secondary"] += t["intent"]["secondary"]
|
||||
|
||||
log.info(f"{tenant_placement=}")
|
||||
|
||||
matching = {tid: intent[tid] for tid in observed if tid in intent and intent[tid] == observed[tid]}
|
||||
assert len(matching) == total_shards
|
||||
|
||||
attached_per_node: defaultdict[str, int] = defaultdict(int)
|
||||
for node_id in matching.values():
|
||||
attached_per_node[node_id] += 1
|
||||
|
||||
return attached_per_node
|
||||
|
||||
def assert_consistent_balanced_attachments(env: NeonEnv, total_shards):
|
||||
attached_per_node = get_consistent_node_shard_counts(env, total_shards)
|
||||
|
||||
min_shard_count = min(attached_per_node.values())
|
||||
max_shard_count = max(attached_per_node.values())
|
||||
|
||||
flake_factor = 5 / 100
|
||||
assert max_shard_count - min_shard_count <= int(total_shards * flake_factor)
|
||||
|
||||
def retryable_node_operation(op, ps_id, max_attempts, backoff):
|
||||
while max_attempts > 0:
|
||||
try:
|
||||
op(ps_id)
|
||||
return
|
||||
except StorageControllerApiException as e:
|
||||
max_attempts -= 1
|
||||
log.info(f"Operation failed ({max_attempts} attempts left): {e}")
|
||||
|
||||
if max_attempts == 0:
|
||||
raise e
|
||||
|
||||
time.sleep(backoff)
|
||||
|
||||
def poll_node_status(env, node_id, desired_scheduling_policy, max_attempts, backoff):
|
||||
log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy")
|
||||
while max_attempts > 0:
|
||||
try:
|
||||
status = env.storage_controller.node_status(node_id)
|
||||
policy = status["scheduling"]
|
||||
if policy == desired_scheduling_policy:
|
||||
return
|
||||
else:
|
||||
max_attempts -= 1
|
||||
log.info(f"Status call returned {policy=} ({max_attempts} attempts left)")
|
||||
|
||||
if max_attempts == 0:
|
||||
raise AssertionError(
|
||||
f"Status for {node_id=} did not reach {desired_scheduling_policy=}"
|
||||
)
|
||||
|
||||
time.sleep(backoff)
|
||||
except StorageControllerApiException as e:
|
||||
max_attempts -= 1
|
||||
log.info(f"Status call failed ({max_attempts} retries left): {e}")
|
||||
|
||||
if max_attempts == 0:
|
||||
raise e
|
||||
|
||||
time.sleep(backoff)
|
||||
|
||||
def assert_shard_counts_balanced(env: NeonEnv, shard_counts, total_shards):
|
||||
# Assert that all nodes have some attached shards
|
||||
assert len(shard_counts) == len(env.pageservers)
|
||||
|
||||
min_shard_count = min(shard_counts.values())
|
||||
max_shard_count = max(shard_counts.values())
|
||||
|
||||
flake_factor = 5 / 100
|
||||
assert max_shard_count - min_shard_count <= int(total_shards * flake_factor)
|
||||
|
||||
@pytest.mark.timeout(3600) # super long running test: should go down as we optimize
|
||||
def test_storage_controller_many_tenants(
|
||||
@@ -156,9 +50,6 @@ def test_storage_controller_many_tenants(
|
||||
# of shards are hitting the delayed path.
|
||||
env.storage_controller.allowed_errors.append(".*Many shards are waiting to reconcile")
|
||||
|
||||
# TODO: explain
|
||||
env.storage_controller.allowed_errors.append(".*Scheduling error when draining pageserver.*")
|
||||
|
||||
for ps in env.pageservers:
|
||||
# This can happen because when we do a loop over all pageservers and mark them offline/active,
|
||||
# reconcilers might get cancelled, and the next reconcile can follow a not-so-elegant path of
|
||||
@@ -179,8 +70,6 @@ def test_storage_controller_many_tenants(
|
||||
shard_count = 2
|
||||
stripe_size = 1024
|
||||
|
||||
total_shards = tenant_count * shard_count + 1
|
||||
|
||||
tenants = set(TenantId.generate() for _i in range(0, tenant_count))
|
||||
|
||||
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
|
||||
@@ -215,7 +104,6 @@ def test_storage_controller_many_tenants(
|
||||
stripe_size,
|
||||
# Upload heatmaps fast, so that secondary downloads happen promptly, enabling
|
||||
# the controller's optimization migrations to proceed promptly.
|
||||
# TODO: update other test with this and use reconcile_till_idle
|
||||
tenant_config={"heatmap_period": "10s"},
|
||||
placement_policy={"Attached": 1},
|
||||
)
|
||||
@@ -298,40 +186,10 @@ def test_storage_controller_many_tenants(
|
||||
env.storage_controller.consistency_check()
|
||||
check_memory()
|
||||
|
||||
shard_counts = get_consistent_node_shard_counts(env, total_shards)
|
||||
log.info(f"Shard counts before rolling restart: {shard_counts}")
|
||||
|
||||
assert_consistent_balanced_attachments(env, total_shards)
|
||||
|
||||
# Restart pageservers: this exercises the /re-attach API
|
||||
for ps in env.pageservers:
|
||||
retryable_node_operation(
|
||||
lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
|
||||
)
|
||||
poll_node_status(env, ps.id, "PauseForRestart", max_attempts=24, backoff=5)
|
||||
|
||||
shard_counts = get_consistent_node_shard_counts(env, total_shards)
|
||||
log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
|
||||
# Assert that we've drained the node
|
||||
assert shard_counts[str(ps.id)] == 0
|
||||
# Assert that those shards actually went somewhere
|
||||
assert sum(shard_counts.values()) == total_shards
|
||||
|
||||
ps.restart()
|
||||
poll_node_status(env, ps.id, "Active", max_attempts=24, backoff=1)
|
||||
|
||||
retryable_node_operation(
|
||||
lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
|
||||
)
|
||||
poll_node_status(env, ps.id, "Active", max_attempts=24, backoff=5)
|
||||
|
||||
shard_counts = get_consistent_node_shard_counts(env, total_shards)
|
||||
log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
|
||||
|
||||
assert_consistent_balanced_attachments(env, total_shards)
|
||||
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
env.storage_controller.consistency_check()
|
||||
for pageserver in env.pageservers:
|
||||
pageserver.stop()
|
||||
pageserver.start()
|
||||
|
||||
# Consistency check is safe here: restarting pageservers should not have caused any Reconcilers to spawn,
|
||||
# as they were not offline long enough to trigger any scheduling changes.
|
||||
|
||||
Reference in New Issue
Block a user