tests: controller rolling failure test
@@ -1518,6 +1518,7 @@ class NeonCli(AbstractNeonCli):
         conf: Optional[Dict[str, Any]] = None,
         shard_count: Optional[int] = None,
         shard_stripe_size: Optional[int] = None,
+        placement_policy: Optional[str] = None,
         set_default: bool = False,
     ) -> Tuple[TenantId, TimelineId]:
         """
@@ -1551,6 +1552,9 @@ class NeonCli(AbstractNeonCli):
         if shard_stripe_size is not None:
             args.extend(["--shard-stripe-size", str(shard_stripe_size)])

+        if placement_policy is not None:
+            args.extend(["--placement-policy", str(placement_policy)])
+
         res = self.raw_cli(args)
         res.check_returncode()
         return tenant_id, timeline_id
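A note on usage: `--placement-policy` is forwarded verbatim to `neon_local tenant create`, so callers pass the policy as a JSON string. A minimal sketch of a caller, assuming a `NeonEnv` fixture named `env`; the `'{"Double":1}'` value (one attached location plus one secondary) is the same one the new test below uses:

    from fixtures.types import TenantId, TimelineId

    # Sketch: create a sharded tenant with one secondary location per shard.
    # `env` is assumed to be a NeonEnv from the standard test fixtures.
    tenant_id = TenantId.generate()
    timeline_id = TimelineId.generate()
    env.neon_cli.create_tenant(
        tenant_id,
        timeline_id,
        shard_count=4,
        placement_policy='{"Double":1}',
    )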
@@ -2168,6 +2172,20 @@ class NeonAttachmentService(MetricsGetter):
         )
         log.info("Attachment service passed consistency check")

+    def balance_all(self):
+        self.request(
+            "POST",
+            f"{self.env.attachment_service_api}/control/v1/balance/all",
+            headers=self.headers(TokenScope.ADMIN),
+        )
+
+    def balance_attached(self):
+        self.request(
+            "POST",
+            f"{self.env.attachment_service_api}/control/v1/balance/attached",
+            headers=self.headers(TokenScope.ADMIN),
+        )
+
     def __enter__(self) -> "NeonAttachmentService":
         return self
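Both helpers are thin wrappers over the storage controller's admin API: an authenticated POST with no body, with `request()` raising on a non-success status. A sketch of the equivalent raw call, assuming the `requests` library and stand-in values for the fixture's API base and admin token:

    import requests

    # Sketch: what balance_attached() does under the hood, using plain `requests`.
    # `api_base` and `admin_token` are hypothetical stand-ins for the fixture's
    # attachment service address and admin-scoped JWT.
    api_base = "http://127.0.0.1:1234"
    admin_token = "..."

    response = requests.post(
        f"{api_base}/control/v1/balance/attached",
        headers={"Authorization": f"Bearer {admin_token}"},
    )
    response.raise_for_status()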
@@ -1,3 +1,4 @@
+import threading
 from typing import Optional

 from fixtures.log_helper import log
@@ -11,6 +12,10 @@ from fixtures.neon_fixtures import (
 from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
 from fixtures.types import TenantId, TimelineId

+# neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
+# to ensure we don't do that: this enables running lots of Workloads in parallel safely.
+ENDPOINT_LOCK = threading.Lock()
+

 class Workload:
     """
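The mutex matters because `neon_local` cannot handle concurrent endpoint creation or reconfiguration, while everything else a `Workload` does can still run in parallel. A self-contained sketch of the pattern, with a hypothetical `create_endpoint` standing in for the real `neon_local` call:

    import threading
    from concurrent.futures import ThreadPoolExecutor

    # Sketch: serialize a non-thread-safe call behind a module-level mutex so
    # callers can still run in parallel threads.
    ENDPOINT_LOCK = threading.Lock()

    def create_endpoint(name: str) -> str:
        # Placeholder for the non-thread-safe neon_local invocation.
        return f"endpoint-{name}"

    def run_workload(name: str) -> str:
        with ENDPOINT_LOCK:  # only endpoint creation is serialized
            endpoint = create_endpoint(name)
        # ... the actual SQL traffic can proceed concurrently ...
        return endpoint

    with ThreadPoolExecutor() as pool:
        results = list(pool.map(run_workload, ["a", "b", "c"]))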
@@ -41,17 +46,30 @@ class Workload:
         self._endpoint: Optional[Endpoint] = None

+    def reconfigure(self):
+        """
+        Request the endpoint to reconfigure based on location reported by storage controller
+        """
+        if self._endpoint is not None:
+            with ENDPOINT_LOCK:
+                self._endpoint.reconfigure()
+
     def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
-        if self._endpoint is None:
-            self._endpoint = self.env.endpoints.create(
-                self.branch_name,
-                tenant_id=self.tenant_id,
-                pageserver_id=pageserver_id,
-                endpoint_id="ep-workload",
-            )
-            self._endpoint.start(pageserver_id=pageserver_id)
-        else:
-            self._endpoint.reconfigure(pageserver_id=pageserver_id)
+        # We may be running alongside other Workloads for different tenants. Full TTID is
+        # obnoxiously long for use here, but a cut-down version is still unique enough for tests.
+        endpoint_id = f"ep-workload-{str(self.tenant_id)[0:4]}-{str(self.timeline_id)[0:4]}"
+
+        with ENDPOINT_LOCK:
+            if self._endpoint is None:
+                self._endpoint = self.env.endpoints.create(
+                    self.branch_name,
+                    tenant_id=self.tenant_id,
+                    pageserver_id=pageserver_id,
+                    endpoint_id=endpoint_id,
+                )
+                self._endpoint.start(pageserver_id=pageserver_id)
+            else:
+                self._endpoint.reconfigure(pageserver_id=pageserver_id)

         connstring = self._endpoint.safe_psql(
             "SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'"
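Together with the rest of the class, this gives chaos tests a reusable availability probe. A hedged sketch of the intended lifecycle, using the `init`/`write_rows`/`churn_rows`/`validate` helpers that the rolling-failure test below calls (`env`, `tenant_id` and `timeline_id` come from the test environment):

    # Sketch: typical Workload lifecycle as used by chaos-style tests.
    workload = Workload(env, tenant_id, timeline_id)
    workload.init()           # create the table this workload writes to
    workload.write_rows(100)  # seed some data
    workload.churn_rows(100)  # overwrite/append during the failure window
    workload.validate()       # read back and check the data survived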
@@ -1,3 +1,5 @@
+import concurrent.futures
+import random
 import time
 from collections import defaultdict
 from datetime import datetime, timezone

@@ -23,8 +25,9 @@ from fixtures.pageserver.utils import (
 )
 from fixtures.pg_version import PgVersion
 from fixtures.remote_storage import RemoteStorageKind, s3_storage
-from fixtures.types import TenantId, TimelineId
+from fixtures.types import TenantId, TenantShardId, TimelineId
 from fixtures.utils import run_pg_bench_small, wait_until
+from fixtures.workload import Workload
 from mypy_boto3_s3.type_defs import (
     ObjectTypeDef,
 )
@@ -770,3 +773,186 @@ def test_sharding_service_tenant_conf(neon_env_builder: NeonEnvBuilder):
     assert "pitr_interval" not in readback_ps.tenant_specific_overrides

     env.attachment_service.consistency_check()
+
+
+def test_storcon_rolling_failures(
+    neon_env_builder: NeonEnvBuilder, httpserver: HTTPServer, httpserver_listen_address
+):
+    neon_env_builder.num_pageservers = 8
+
+    (host, port) = httpserver_listen_address
+    neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify-attach"
+
+    workloads: dict[TenantId, Workload] = {}
+
+    # Do neon_local endpoint reconfiguration in the background so that we can
+    # accept a healthy rate of calls into notify-attach.
+    reconfigure_threads = concurrent.futures.ThreadPoolExecutor(max_workers=1)
+
+    def handler(request: Request):
+        """
+        Although storage controller can use neon_local directly, this causes problems when
+        the test is also concurrently modifying endpoints. Instead, configure storage controller
+        to send notifications up to this test code, which will route all endpoint updates
+        through Workload, which has a mutex to make it all safe.
+        """
+        assert request.json is not None
+        body: dict[str, Any] = request.json
+        log.info(f"notify-attach request: {body}")
+
+        try:
+            workload = workloads[TenantId(body["tenant_id"])]
+        except KeyError:
+            pass
+        else:
+            # This causes the endpoint to query storage controller for its location, which
+            # is redundant since we already have it here, but this avoids extending the
+            # neon_local CLI to take full lists of locations
+            reconfigure_threads.submit(lambda workload=workload: workload.reconfigure())  # type: ignore[no-any-return]
+
+        return Response(status=200)
+
+    httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
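The `httpserver` fixture here is pytest-httpserver's: `expect_request(...).respond_with_handler(...)` routes each matching request through a plain function that receives a werkzeug `Request` and returns a `Response`. A standalone sketch of that mechanism, separate from the Neon fixtures:

    # Sketch: minimal pytest-httpserver handler, mirroring the notify-attach hook above.
    from pytest_httpserver import HTTPServer
    from werkzeug.wrappers import Request, Response

    def test_handler_sketch(httpserver: HTTPServer):
        seen = []

        def handler(request: Request) -> Response:
            assert request.json is not None
            seen.append(request.json)  # record the notification body
            return Response(status=200)

        httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
        # Anything PUT to httpserver.url_for("/notify-attach") now lands in `handler`.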
+    env = neon_env_builder.init_start()
+
+    for ps in env.pageservers:
+        # We will do unclean detaches
+        ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
+
+    n_tenants = 32
+    tenants = [(env.initial_tenant, env.initial_timeline)]
+    for i in range(0, n_tenants - 1):
+        tenant_id = TenantId.generate()
+        timeline_id = TimelineId.generate()
+        shard_count = [1, 2, 4][i % 3]
+        env.neon_cli.create_tenant(
+            tenant_id, timeline_id, shard_count=shard_count, placement_policy='{"Double":1}'
+        )
+        tenants.append((tenant_id, timeline_id))
+
+    # Background pain:
+    # - TODO: some fraction of pageserver API requests hang
+    #   (this requires implementing wrap of location_conf calls with proper timeout/cancel)
+    # - TODO: continuous tenant/timeline creation/destruction over a different ID range than
+    #   the ones we're using for availability checks.
+
+    rng = random.Random(0xDEADBEEF)
+
+    for tenant_id, timeline_id in tenants:
+        workload = Workload(env, tenant_id, timeline_id)
+        workloads[tenant_id] = workload
+
+    def node_evacuated(node_id: int):
+        counts = get_node_shard_counts(env, [t[0] for t in tenants])
+        assert counts[node_id] == 0
+
+    def attachments_active():
+        for tid, _tlid in tenants:
+            for shard in env.attachment_service.locate(tid):
+                psid = shard["node_id"]
+                tsid = TenantShardId.parse(shard["shard_id"])
+                status = env.get_pageserver(psid).http_client().tenant_status(tenant_id=tsid)
+                assert status["state"]["slug"] == "Active"
+                log.info(f"Shard {tsid} active on node {psid}")
+
+    failpoints = ("api-503", "5%1000*return(1)")
+    failpoints_str = f"{failpoints[0]}={failpoints[1]}"
+    for ps in env.pageservers:
+        ps.http_client().configure_failpoints(failpoints)
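The failpoint spec appears to follow the fail crate's `[p%][cnt*]action[(arg)]` grammar (an assumption from that crate's documented format, not stated in this diff): `5%1000*return(1)` makes the `api-503` failpoint fire on roughly 5% of evaluations, at most 1000 times, returning error value 1. For illustration, other specs in the same grammar (`ps` is assumed to be one of `env.pageservers`, as in the loop above):

    # Sketch: alternative failpoint configurations in the same grammar.
    ps.http_client().configure_failpoints(("api-503", "off"))        # disable the failpoint
    ps.http_client().configure_failpoints(("api-503", "return(1)"))  # fire on every request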
+    def for_all_workloads(callback, timeout=60):
+        futs = []
+        with concurrent.futures.ThreadPoolExecutor() as pool:
+            for _tenant_id, workload in workloads.items():
+                futs.append(pool.submit(callback, workload))
+
+            for f in futs:
+                f.result(timeout=timeout)
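One subtlety worth noting: collecting `f.result(timeout=...)` inside the `with` block is deliberate. `ThreadPoolExecutor.__exit__` calls `shutdown(wait=True)`, which blocks until every submitted task completes, so a timeout checked after leaving the block could never fire. A minimal demonstration:

    import concurrent.futures
    import time

    # Sketch: result(timeout=...) only enforces a deadline if checked before
    # the executor's __exit__ has already waited for all tasks to finish.
    with concurrent.futures.ThreadPoolExecutor() as pool:
        fut = pool.submit(time.sleep, 5)
        fut.result(timeout=1)  # raises concurrent.futures.TimeoutError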
+    def clean_fail_restore():
+        """
+        Clean shutdown of a node: mark it offline in storage controller, wait for new attachment
+        locations to activate, then SIGTERM it.
+        - Endpoints should not fail any queries
+        - New attach locations should activate within bounded time.
+        """
+        victim = rng.choice(env.pageservers)
+        env.attachment_service.node_configure(victim.id, {"availability": "Offline"})
+
+        wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id))  # type: ignore[misc]
+        wait_until(10, 1, attachments_active)
+
+        victim.stop(immediate=False)
+
+        traffic()
+
+        victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
+
+        # Revert shards to attach at their original locations
+        env.attachment_service.balance_attached()
+        wait_until(10, 1, attachments_active)
+
+    def hard_fail_restore():
+        """
+        Simulate an unexpected death of a pageserver node
+        """
+        victim = rng.choice(env.pageservers)
+        victim.stop(immediate=True)
+        # TODO: once we implement heartbeats detecting node failures, remove this
+        # explicit marking offline and rely on storage controller to detect it itself.
+        env.attachment_service.node_configure(victim.id, {"availability": "Offline"})
+        wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id))  # type: ignore[misc]
+        wait_until(10, 1, attachments_active)
+        traffic()
+        victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
+        env.attachment_service.balance_attached()
+        wait_until(10, 1, attachments_active)
+
+    def traffic():
+        """
+        Check that all tenants are working for postgres clients
+        """
+
+        def exercise_one(workload):
+            workload.churn_rows(100)
+            workload.validate()
+
+        for_all_workloads(exercise_one)
+
+    def init_one(workload):
+        workload.init()
+        workload.write_rows(100)
+
+    for_all_workloads(init_one, timeout=60)
+
+    for i in range(0, 20):
+        mode = rng.choice([0, 1, 2])
+        log.info(f"Iteration {i}, mode {mode}")
+        if mode == 0:
+            # Traffic interval: sometimes, instead of a failure, just let the clients
+            # write a load of data. This avoids chaos tests ending up with unrealistically
+            # small quantities of data in flight.
+            traffic()
+        elif mode == 1:
+            clean_fail_restore()
+        elif mode == 2:
+            hard_fail_restore()
+
+        # Fail and restart: hard-kill one node. Notify the storage controller that it is offline.
+        # Success criteria:
+        # - New attach locations should activate within bounded time
+        # - TODO: once we do heartbeating, we should not have to explicitly mark the node offline
+
+        # TODO: fail and remove: fail a node, and remove it from the cluster.
+        # Success criteria:
+        # - Endpoints should not fail any queries
+        # - New attach locations should activate within bounded time
+        # - New secondary locations should fill up with data within bounded time
+
+        # TODO: somehow need to wait for reconciles to complete before doing consistency check
+        # (or make the check wait).
+
+        # Do consistency check on every iteration, not just at the end: this makes it more obvious
+        # which change caused an issue.
+        env.attachment_service.consistency_check()