Compare commits

...

8 Commits

Author SHA1 Message Date
John Spray
fed137a5dc wip stress tests 2024-06-17 09:10:41 +01:00
John Spray
c2d7aadd70 pageserver: add api-hang failpoint 2024-06-17 09:09:56 +01:00
John Spray
6b6ec97881 tests: storage controller stress test 2024-06-16 14:49:47 +01:00
John Spray
e1e6caceb8 Merge remote-tracking branch 'upstream/vlad/storcon-heartbeat-and-reattach-race-fix' into jcsp/simplified-online-transition 2024-06-16 13:55:57 +01:00
Vlad Lazar
a0fa1b928d storcon: refine heartbeat handling of new addded nodes 2024-06-14 15:21:34 +01:00
Vlad Lazar
d3887504a0 tests: test heartbeats when the entire cluster goes for lunch 2024-06-14 12:06:04 +01:00
Vlad Lazar
677c1662a4 storcon: do not detach tenants when all nodes are unvailable
Previously, when all nodes in the cluster became unavailable at the same
time, we would detach all tenant shards. This is due to a bug in
`Service::node_configure`. If all nodes are unavailable, there's no
chance of reschedulling anything, so we should leave the intent states
untouced.

This commit adds a special case which detects this situation and skips
any reschedullings.
2024-06-14 11:32:04 +01:00
Vlad Lazar
8270b58f39 storcon: handle reattach and heartbeat race
Consider the case when the storage controller handles the re-attach of a node
before the heartbeats detect that the node is back online. We still need
to reconfigure the node (by calling `Service::node_configure`) to migrate
attachments back onto the node.

In order to determine if node reconfiguration is required, we call into
`Node::get_availability_transition`. This commit updates the function
to consider the transition from "node just re-attached" (with no
utilisation score) to "node responded to the first heartbeat after a
period of unavailablity" (with some utilisation score).
2024-06-14 11:28:11 +01:00
7 changed files with 366 additions and 58 deletions

View File

@@ -47,6 +47,7 @@ use tenant_size_model::{SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::JwtAuth;
use utils::failpoint_support;
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::prometheus_metrics_handler;
use utils::http::endpoint::request_span;
@@ -2587,7 +2588,9 @@ where
R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
{
if request.uri() != &"/v1/failpoints".parse::<Uri>().unwrap() {
if request.uri() != &"/v1/failpoints".parse::<Uri>().unwrap()
&& request.uri() != &"/v1/status".parse::<Uri>().unwrap()
{
fail::fail_point!("api-503", |_| Err(ApiError::ResourceUnavailable(
"failpoint".into()
)));
@@ -2595,6 +2598,11 @@ where
fail::fail_point!("api-500", |_| Err(ApiError::InternalServerError(
anyhow::anyhow!("failpoint")
)));
// This failpoint is meant to check that callers handle hanging requests properly, but not
// to block pageserver clean shutdown: hence it respects the tenant manager cancellation token.
let state = get_state(&request);
failpoint_support::sleep_millis_async!("api-hang", &state.tenant_manager.cancel);
}
// Spawn a new task to handle the request, to protect the handler from unexpected

View File

@@ -298,7 +298,7 @@ pub struct TenantManager {
// This is for edge cases like tenant deletion. In normal cases (within a Tenant lifetime),
// tenants have their own cancellation tokens, which we fire individually in [`Self::shutdown`], or
// when the tenant detaches.
cancel: CancellationToken,
pub(crate) cancel: CancellationToken,
}
fn emergency_generations(

View File

@@ -31,6 +31,7 @@ pub(crate) enum PageserverState {
Available {
last_seen_at: Instant,
utilization: PageserverUtilization,
new: bool,
},
Offline,
}
@@ -127,6 +128,7 @@ impl HeartbeaterTask {
heartbeat_futs.push({
let jwt_token = self.jwt_token.clone();
let cancel = self.cancel.clone();
let new_node = !self.state.contains_key(node_id);
// Clone the node and mark it as available such that the request
// goes through to the pageserver even when the node is marked offline.
@@ -159,6 +161,7 @@ impl HeartbeaterTask {
PageserverState::Available {
last_seen_at: Instant::now(),
utilization,
new: new_node,
}
} else {
PageserverState::Offline
@@ -220,6 +223,7 @@ impl HeartbeaterTask {
}
},
Vacant(_) => {
// This is a new node. Don't generate a delta for it.
deltas.push((node_id, ps_state.clone()));
}
}

View File

@@ -3,7 +3,7 @@ use std::{str::FromStr, time::Duration};
use pageserver_api::{
controller_api::{
NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
TenantLocateResponseShard,
TenantLocateResponseShard, UtilizationScore,
},
shard::TenantShardId,
};
@@ -116,6 +116,15 @@ impl Node {
match (self.availability, availability) {
(Offline, Active(_)) => ToActive,
(Active(_), Offline) => ToOffline,
// Consider the case when the storage controller handles the re-attach of a node
// before the heartbeats detect that the node is back online. We still need
// [`Service::node_configure`] to migrate attachments back onto the node.
// The unsavoury match arm below handles this situation.
(Active(lhs), Active(rhs))
if lhs == UtilizationScore::worst() && rhs < UtilizationScore::worst() =>
{
ToActive
}
_ => Unchanged,
}
}

View File

@@ -747,29 +747,61 @@ impl Service {
let res = self.heartbeater.heartbeat(nodes).await;
if let Ok(deltas) = res {
for (node_id, state) in deltas.0 {
let new_availability = match state {
PageserverState::Available { utilization, .. } => NodeAvailability::Active(
UtilizationScore(utilization.utilization_score),
let (new_node, new_availability) = match state {
PageserverState::Available {
utilization, new, ..
} => (
new,
NodeAvailability::Active(UtilizationScore(
utilization.utilization_score,
)),
),
PageserverState::Offline => NodeAvailability::Offline,
PageserverState::Offline => (false, NodeAvailability::Offline),
};
let res = self
.node_configure(node_id, Some(new_availability), None)
.await;
match res {
Ok(()) => {}
Err(ApiError::NotFound(_)) => {
// This should be rare, but legitimate since the heartbeats are done
// on a snapshot of the nodes.
tracing::info!("Node {} was not found after heartbeat round", node_id);
if new_node {
// When the heartbeats detect a newly added node, we don't wish
// to attempt to reconcile the shards assigned to it. The node
// is likely handling it's re-attach response, so reconciling now
// would be counterproductive.
//
// Instead, update the in-memory state with the details learned about the
// node.
let mut locked = self.inner.write().unwrap();
let (nodes, _tenants, scheduler) = locked.parts_mut();
let mut new_nodes = (**nodes).clone();
if let Some(node) = new_nodes.get_mut(&node_id) {
node.set_availability(new_availability);
scheduler.node_upsert(node);
}
Err(err) => {
tracing::error!(
"Failed to update node {} after heartbeat round: {}",
node_id,
err
);
locked.nodes = Arc::new(new_nodes);
} else {
// This is the code path for geniune availability transitions (i.e node
// goes unavailable and/or comes back online).
let res = self
.node_configure(node_id, Some(new_availability), None)
.await;
match res {
Ok(()) => {}
Err(ApiError::NotFound(_)) => {
// This should be rare, but legitimate since the heartbeats are done
// on a snapshot of the nodes.
tracing::info!(
"Node {} was not found after heartbeat round",
node_id
);
}
Err(err) => {
tracing::error!(
"Failed to update node {} after heartbeat round: {}",
node_id,
err
);
}
}
}
}
@@ -4316,6 +4348,13 @@ impl Service {
continue;
}
if !new_nodes.values().any(Node::is_available) {
// Special case for when all nodes are unavailable: there is no point
// trying to reschedule since there's nowhere else to go. Without this
// branch we incorrectly detach tenants in response to node unavailability.
continue;
}
if tenant_shard.intent.demote_attached(scheduler, node_id) {
tenant_shard.sequence = tenant_shard.sequence.next();
@@ -4353,6 +4392,12 @@ impl Service {
// When a node comes back online, we must reconcile any tenant that has a None observed
// location on the node.
for tenant_shard in locked.tenants.values_mut() {
// If a reconciliation is already in progress, rely on the previous scheduling
// decision and skip triggering a new reconciliation.
if tenant_shard.reconciler.is_some() {
continue;
}
if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
if observed_loc.conf.is_none() {
self.maybe_reconcile_shard(tenant_shard, &new_nodes);

View File

@@ -934,19 +934,27 @@ class Failure:
def clear(self, env: NeonEnv):
raise NotImplementedError()
def nodes(self):
raise NotImplementedError()
class NodeStop(Failure):
def __init__(self, pageserver_id, immediate):
self.pageserver_id = pageserver_id
def __init__(self, pageserver_ids, immediate):
self.pageserver_ids = pageserver_ids
self.immediate = immediate
def apply(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.stop(immediate=self.immediate)
for ps_id in self.pageserver_ids:
pageserver = env.get_pageserver(ps_id)
pageserver.stop(immediate=self.immediate)
def clear(self, env: NeonEnv):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.start()
for ps_id in self.pageserver_ids:
pageserver = env.get_pageserver(ps_id)
pageserver.start()
def nodes(self):
return self.pageserver_ids
class PageserverFailpoint(Failure):
@@ -962,6 +970,9 @@ class PageserverFailpoint(Failure):
pageserver = env.get_pageserver(self.pageserver_id)
pageserver.http_client().configure_failpoints((self.failpoint, "off"))
def nodes(self):
return [self.pageserver_id]
def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
tenants = env.storage_controller.tenant_list()
@@ -985,8 +996,9 @@ def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
@pytest.mark.parametrize(
"failure",
[
NodeStop(pageserver_id=1, immediate=False),
NodeStop(pageserver_id=1, immediate=True),
NodeStop(pageserver_ids=[1], immediate=False),
NodeStop(pageserver_ids=[1], immediate=True),
NodeStop(pageserver_ids=[1, 2], immediate=True),
PageserverFailpoint(pageserver_id=1, failpoint="get-utilization-http-handler"),
],
)
@@ -1039,33 +1051,50 @@ def test_storage_controller_heartbeats(
wait_until(10, 1, tenants_placed)
# ... then we apply the failure
offline_node_id = failure.pageserver_id
online_node_id = (set(range(1, len(env.pageservers) + 1)) - {offline_node_id}).pop()
env.get_pageserver(offline_node_id).allowed_errors.append(
# In the case of the failpoint failure, the impacted pageserver
# still believes it has the tenant attached since location
# config calls into it will fail due to being marked offline.
".*Dropped remote consistent LSN updates.*",
)
offline_node_ids = set(failure.nodes())
online_node_ids = set(range(1, len(env.pageservers) + 1)) - offline_node_ids
for node_id in offline_node_ids:
env.get_pageserver(node_id).allowed_errors.append(
# In the case of the failpoint failure, the impacted pageserver
# still believes it has the tenant attached since location
# config calls into it will fail due to being marked offline.
".*Dropped remote consistent LSN updates.*",
)
if len(offline_node_ids) > 1:
env.get_pageserver(node_id).allowed_errors.append(
".*Scheduling error when marking pageserver.*offline.*",
)
failure.apply(env)
# ... expecting the heartbeats to mark it offline
def node_offline():
def nodes_offline():
nodes = env.storage_controller.node_list()
log.info(f"{nodes=}")
target = next(n for n in nodes if n["id"] == offline_node_id)
assert target["availability"] == "Offline"
for node in nodes:
if node["id"] in offline_node_ids:
assert node["availability"] == "Offline"
# A node is considered offline if the last successful heartbeat
# was more than 10 seconds ago (hardcoded in the storage controller).
wait_until(20, 1, node_offline)
wait_until(20, 1, nodes_offline)
# .. expecting the tenant on the offline node to be migrated
def tenant_migrated():
if len(online_node_ids) == 0:
time.sleep(5)
return
node_to_tenants = build_node_to_tenants_map(env)
log.info(f"{node_to_tenants=}")
assert set(node_to_tenants[online_node_id]) == set(tenant_ids)
observed_tenants = set()
for node_id in online_node_ids:
observed_tenants |= set(node_to_tenants[node_id])
assert observed_tenants == set(tenant_ids)
wait_until(10, 1, tenant_migrated)
@@ -1073,31 +1102,24 @@ def test_storage_controller_heartbeats(
failure.clear(env)
# ... expecting the offline node to become active again
def node_online():
def nodes_online():
nodes = env.storage_controller.node_list()
target = next(n for n in nodes if n["id"] == offline_node_id)
assert target["availability"] == "Active"
for node in nodes:
if node["id"] in online_node_ids:
assert node["availability"] == "Active"
wait_until(10, 1, node_online)
wait_until(10, 1, nodes_online)
time.sleep(5)
# ... then we create a new tenant
tid = TenantId.generate()
env.storage_controller.tenant_create(tid)
# ... expecting it to be placed on the node that just came back online
tenants = env.storage_controller.tenant_list()
newest_tenant = next(t for t in tenants if t["tenant_shard_id"] == str(tid))
locations = list(newest_tenant["observed"]["locations"].keys())
locations = [int(node_id) for node_id in locations]
assert locations == [offline_node_id]
node_to_tenants = build_node_to_tenants_map(env)
log.info(f"Back online: {node_to_tenants=}")
# ... expecting the storage controller to reach a consistent state
def storage_controller_consistent():
env.storage_controller.consistency_check()
wait_until(10, 1, storage_controller_consistent)
wait_until(30, 1, storage_controller_consistent)
def test_storage_controller_re_attach(neon_env_builder: NeonEnvBuilder):

View File

@@ -0,0 +1,220 @@
import concurrent.futures
import random
from collections import defaultdict
import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
from fixtures.utils import wait_until
from fixtures.workload import Workload
def get_node_shard_counts(env: NeonEnv, tenant_ids):
total: defaultdict[int, int] = defaultdict(int)
attached: defaultdict[int, int] = defaultdict(int)
for tid in tenant_ids:
for shard in env.storage_controller.tenant_describe(tid)["shards"]:
log.info(
f"{shard['tenant_shard_id']}: attached={shard['node_attached']}, secondary={shard['node_secondary']} "
)
for node in shard["node_secondary"]:
total[int(node)] += 1
attached[int(shard["node_attached"])] += 1
total[int(shard["node_attached"])] += 1
return total, attached
FAILPOINT_API_503 = ("api-503", "5%1000*return(1)")
FAILPOINT_API_500 = ("api-500", "5%1000*return(1)")
FAILPOINT_API_HANG = ("api-hang", "5%1000*return(60000)")
@pytest.mark.parametrize(
"failpoints",
[
# [],
# [FAILPOINT_API_503, FAILPOINT_API_500],
# [FAILPOINT_API_HANG],
[FAILPOINT_API_503, FAILPOINT_API_500, FAILPOINT_API_HANG],
],
)
def test_storcon_rolling_failures(
neon_env_builder: NeonEnvBuilder, compute_reconfigure_listener: ComputeReconfigure, failpoints
):
neon_env_builder.num_pageservers = 8
neon_env_builder.control_plane_compute_hook_api = (
compute_reconfigure_listener.control_plane_compute_hook_api
)
workloads: dict[TenantId, Workload] = {}
env = neon_env_builder.init_start()
env.storage_controller.allowed_errors.extend(
[
# This log is emitted when a node comes online but then fails to respond to a request: this is
# expected, because we do API-level failure injection.
".*Failed to query node location configs, cannot activate.*"
]
)
for ps in env.pageservers:
# We will do unclean detaches
ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
n_tenants = 8
tenants = [(env.initial_tenant, env.initial_timeline)]
for i in range(0, n_tenants - 1):
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
shard_count = [1, 2, 4][i % 3]
env.neon_cli.create_tenant(
tenant_id, timeline_id, shard_count=shard_count, placement_policy='{"Attached":1}'
)
tenants.append((tenant_id, timeline_id))
# Background pain:
# - TODO: continuous tenant/timeline creation/destruction over a different ID range than
# the ones we're using for availability checks.
rng = random.Random(0xDEADBEEF)
for tenant_id, timeline_id in tenants:
workload = Workload(env, tenant_id, timeline_id)
compute_reconfigure_listener.register_workload(workload)
workloads[tenant_id] = workload
def node_evacuated(node_id: int):
total, attached = get_node_shard_counts(env, [t[0] for t in tenants])
assert attached[node_id] == 0
def attachments_active():
for tid, _tlid in tenants:
for shard in env.storage_controller.locate(tid):
psid = shard["node_id"]
tsid = TenantShardId.parse(shard["shard_id"])
status = env.get_pageserver(psid).http_client().tenant_status(tenant_id=tsid)
log.info(f"Shard {tsid} status on node {psid}: {status}")
assert status["state"]["slug"] == "Active"
log.info(f"Shard {tsid} active on node {psid}")
failpoints = [("api-503", "5%1000*return(1)")]
# failpoints_str = f"{failpoints[0]}={failpoints[1]}"
failpoints_str = ",".join(f"{f[0]}={f[1]}" for f in failpoints)
for ps in env.pageservers:
ps.http_client().configure_failpoints(failpoints)
def for_all_workloads(callback, timeout=60):
futs = []
with concurrent.futures.ThreadPoolExecutor() as pool:
for _tenant_id, workload in workloads.items():
futs.append(pool.submit(callback, workload))
for f in futs:
f.result(timeout=timeout)
def clean_fail_restore():
"""
Clean shutdown of a node: mark it offline in storage controller, wait for new attachment
locations to activate, then SIGTERM it.
- Endpoints should not fail any queries
- New attach locations should activate within bounded time.
"""
victim = rng.choice(env.pageservers)
env.storage_controller.node_configure(victim.id, {"availability": "Offline"})
wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc]
wait_until(10, 1, attachments_active)
victim.stop(immediate=False)
traffic()
victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
# Revert shards to attach at their original locations
# TODO
# env.storage_controller.balance_attached()
wait_until(10, 1, attachments_active)
def hard_fail_restore():
"""
Simulate an unexpected death of a pageserver node
"""
victim = rng.choice(env.pageservers)
victim.stop(immediate=True)
# TODO: once we implement heartbeats detecting node failures, remove this
# explicit marking offline and rely on storage controller to detect it itself.
env.storage_controller.node_configure(victim.id, {"availability": "Offline"})
wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc]
wait_until(10, 1, attachments_active)
traffic()
victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
# TODO
# env.storage_controller.balance_attached()
wait_until(10, 1, attachments_active)
def traffic():
"""
Check that all tenants are working for postgres clients
"""
def exercise_one(workload):
workload.churn_rows(100)
workload.validate()
for_all_workloads(exercise_one)
def init_one(workload):
workload.init()
workload.write_rows(100)
for_all_workloads(init_one, timeout=60)
for i in range(0, 4):
mode = rng.choice([0, 1, 2, 3])
log.info(f"Iteration {i}, mode {mode}")
if mode == 0:
# Traffic interval: sometimes, instead of a failure, just let the clients
# write a load of data. This avoids chaos tests ending up with unrealistically
# small quantities of data in flight.
traffic()
elif mode == 1:
# Consistency check: quiesce the controller and check that runtime state matches
# database. We intentionally do _not_ do this on every iteration, so that we sometimes leave
# some background reconciliations running across iterations, rather than entering each iteration
# in a pristine state.
env.storage_controller.reconcile_until_idle()
env.storage_controller.consistency_check()
elif mode == 2:
clean_fail_restore()
elif mode == 3:
hard_fail_restore()
# For convenience when developing, we surface unexpected log errors at every iteration rather than waiting
# for the end of the test.
env.storage_controller.assert_no_errors()
for ps in env.pageservers:
ps.assert_no_errors()
# Fail and restart: hard-kill one node. Notify the storage controller that it is offline.
# Success criteria:
# - New attach locations should activate within bounded time
# - TODO: once we do heartbeating, we should not have to explicitly mark the node offline
# TODO: fail and remove: fail a node, and remove it from the cluster.
# Success criteria:
# - Endpoints should not fail any queries
# - New attach locations should activate within bounded time
# - New secondary locations should fill up with data within bounded time
# Final check that we can reconcile to a clean state
env.storage_controller.reconcile_until_idle()
env.storage_controller.consistency_check()