tests: controller stress

This commit is contained in:
John Spray
2024-03-28 17:56:46 +00:00
parent 6d2a752dac
commit 1cdfa198ef
3 changed files with 286 additions and 0 deletions

View File

@@ -364,3 +364,67 @@ def test_slots_and_branching(neon_simple_env: NeonEnv):
# Check that we can create slot with the same name
ws_cur = ws_branch.connect().cursor()
ws_cur.execute("select pg_create_logical_replication_slot('my_slot', 'pgoutput')")
def test_replication_shutdown(neon_simple_env: NeonEnv):
# Ensure Postgres can exit without stuck when a replication job is active + neon extension installed
env = neon_simple_env
env.neon_cli.create_branch("test_replication_shutdown_publisher", "empty")
pub = env.endpoints.create("test_replication_shutdown_publisher")
env.neon_cli.create_branch("test_replication_shutdown_subscriber")
sub = env.endpoints.create("test_replication_shutdown_subscriber")
pub.respec(skip_pg_catalog_updates=False)
pub.start()
sub.respec(skip_pg_catalog_updates=False)
sub.start()
pub.wait_for_migrations()
sub.wait_for_migrations()
with pub.cursor() as cur:
cur.execute(
"CREATE ROLE mr_whiskers WITH PASSWORD 'cat' LOGIN INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser"
)
cur.execute("CREATE DATABASE neondb WITH OWNER mr_whiskers")
cur.execute("GRANT ALL PRIVILEGES ON DATABASE neondb TO neon_superuser")
# If we don't do this, creating the subscription will fail later on PG16
pub.edit_hba(["host all mr_whiskers 0.0.0.0/0 md5"])
with sub.cursor() as cur:
cur.execute(
"CREATE ROLE mr_whiskers WITH PASSWORD 'cat' LOGIN INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser"
)
cur.execute("CREATE DATABASE neondb WITH OWNER mr_whiskers")
cur.execute("GRANT ALL PRIVILEGES ON DATABASE neondb TO neon_superuser")
with pub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as cur:
cur.execute("CREATE PUBLICATION pub FOR ALL TABLES")
cur.execute("CREATE TABLE t (a int)")
cur.execute("INSERT INTO t VALUES (10), (20)")
cur.execute("SELECT * from t")
res = cur.fetchall()
assert [r[0] for r in res] == [10, 20]
with sub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as cur:
cur.execute("CREATE TABLE t (a int)")
pub_conn = f"host=localhost port={pub.pg_port} dbname=neondb user=mr_whiskers password=cat"
query = f"CREATE SUBSCRIPTION sub CONNECTION '{pub_conn}' PUBLICATION pub"
log.info(f"Creating subscription: {query}")
cur.execute(query)
with pub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as pcur:
pcur.execute("INSERT INTO t VALUES (30), (40)")
def check_that_changes_propagated():
cur.execute("SELECT * FROM t")
res = cur.fetchall()
log.info(res)
assert len(res) == 4
assert [r[0] for r in res] == [10, 20, 30, 40]
wait_until(10, 0.5, check_that_changes_propagated)

View File

@@ -1,3 +1,4 @@
import time
from contextlib import closing
from fixtures.log_helper import log
@@ -43,6 +44,12 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
with closing(endpoint_main.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SELECT extversion from pg_extension where extname='neon'")
# IMPORTANT:
# If the version has changed, the test should be updated.
# Ensure that the default version is also updated in the neon.control file
assert cur.fetchone() == ("1.3",)
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
all_versions = ["1.3", "1.2", "1.1", "1.0"]
current_version = "1.3"
for idx, begin_version in enumerate(all_versions):
@@ -60,3 +67,30 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
cur.execute(
f"ALTER EXTENSION neon UPDATE TO '{begin_version}'; -- {target_version}->{begin_version}"
)
# Verify that the neon extension can be auto-upgraded to the latest version.
def test_neon_extension_auto_upgrade(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_neon_extension_auto_upgrade")
endpoint_main = env.endpoints.create("test_neon_extension_auto_upgrade")
# don't skip pg_catalog updates - it runs CREATE EXTENSION neon
endpoint_main.respec(skip_pg_catalog_updates=False)
endpoint_main.start()
with closing(endpoint_main.connect()) as conn:
with conn.cursor() as cur:
cur.execute("ALTER EXTENSION neon UPDATE TO '1.0';")
cur.execute("SELECT extversion from pg_extension where extname='neon'")
assert cur.fetchone() == ("1.0",) # Ensure the extension gets downgraded
endpoint_main.stop()
time.sleep(1)
endpoint_main.start()
time.sleep(1)
with closing(endpoint_main.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SELECT extversion from pg_extension where extname='neon'")
assert cur.fetchone() != ("1.0",) # Ensure the extension gets upgraded

View File

@@ -0,0 +1,188 @@
import concurrent.futures
import random
from collections import defaultdict
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
from fixtures.types import TenantId, TenantShardId, TimelineId
from fixtures.utils import wait_until
from fixtures.workload import Workload
def get_node_shard_counts(env: NeonEnv, tenant_ids):
total: defaultdict[int, int] = defaultdict(int)
attached: defaultdict[int, int] = defaultdict(int)
for tid in tenant_ids:
for shard in env.storage_controller.tenant_describe(tid)["shards"]:
log.info(
f"{shard['tenant_shard_id']}: attached={shard['node_attached']}, secondary={shard['node_secondary']} "
)
for node in shard["node_secondary"]:
total[int(node)] += 1
attached[int(shard["node_attached"])] += 1
total[int(shard["node_attached"])] += 1
return total, attached
def test_storcon_rolling_failures(
neon_env_builder: NeonEnvBuilder,
compute_reconfigure_listener: ComputeReconfigure,
):
neon_env_builder.num_pageservers = 8
neon_env_builder.control_plane_compute_hook_api = (
compute_reconfigure_listener.control_plane_compute_hook_api
)
workloads: dict[TenantId, Workload] = {}
env = neon_env_builder.init_start()
for ps in env.pageservers:
# We will do unclean detaches
ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
n_tenants = 32
tenants = [(env.initial_tenant, env.initial_timeline)]
for i in range(0, n_tenants - 1):
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
shard_count = [1, 2, 4][i % 3]
env.neon_cli.create_tenant(
tenant_id, timeline_id, shard_count=shard_count, placement_policy='{"Double":1}'
)
tenants.append((tenant_id, timeline_id))
# Background pain:
# - TODO: some fraction of pageserver API requests hang
# (this requires implementing wrap of location_conf calls with proper timeline/cancel)
# - TODO: continuous tenant/timeline creation/destruction over a different ID range than
# the ones we're using for availability checks.
rng = random.Random(0xDEADBEEF)
for tenant_id, timeline_id in tenants:
workload = Workload(env, tenant_id, timeline_id)
compute_reconfigure_listener.register_workload(workload)
workloads[tenant_id] = workload
def node_evacuated(node_id: int):
total, attached = get_node_shard_counts(env, [t[0] for t in tenants])
assert attached[node_id] == 0
def attachments_active():
for tid, _tlid in tenants:
for shard in env.storage_controller.locate(tid):
psid = shard["node_id"]
tsid = TenantShardId.parse(shard["shard_id"])
status = env.get_pageserver(psid).http_client().tenant_status(tenant_id=tsid)
assert status["state"]["slug"] == "Active"
log.info(f"Shard {tsid} active on node {psid}")
failpoints = ("api-503", "5%1000*return(1)")
failpoints_str = f"{failpoints[0]}={failpoints[1]}"
for ps in env.pageservers:
ps.http_client().configure_failpoints(failpoints)
def for_all_workloads(callback, timeout=60):
futs = []
with concurrent.futures.ThreadPoolExecutor() as pool:
for _tenant_id, workload in workloads.items():
futs.append(pool.submit(callback, workload))
for f in futs:
f.result(timeout=timeout)
def clean_fail_restore():
"""
Clean shutdown of a node: mark it offline in storage controller, wait for new attachment
locations to activate, then SIGTERM it.
- Endpoints should not fail any queries
- New attach locations should activate within bounded time.
"""
victim = rng.choice(env.pageservers)
env.storage_controller.node_configure(victim.id, {"availability": "Offline"})
wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc]
wait_until(10, 1, attachments_active)
victim.stop(immediate=False)
traffic()
victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
# Revert shards to attach at their original locations
# TODO
# env.storage_controller.balance_attached()
wait_until(10, 1, attachments_active)
def hard_fail_restore():
"""
Simulate an unexpected death of a pageserver node
"""
victim = rng.choice(env.pageservers)
victim.stop(immediate=True)
# TODO: once we implement heartbeats detecting node failures, remove this
# explicit marking offline and rely on storage controller to detect it itself.
env.storage_controller.node_configure(victim.id, {"availability": "Offline"})
wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc]
wait_until(10, 1, attachments_active)
traffic()
victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
# TODO
# env.storage_controller.balance_attached()
wait_until(10, 1, attachments_active)
def traffic():
"""
Check that all tenants are working for postgres clients
"""
def exercise_one(workload):
workload.churn_rows(100)
workload.validate()
for_all_workloads(exercise_one)
def init_one(workload):
workload.init()
workload.write_rows(100)
for_all_workloads(init_one, timeout=60)
for i in range(0, 20):
mode = rng.choice([0, 1, 2])
log.info(f"Iteration {i}, mode {mode}")
if mode == 0:
# Traffic interval: sometimes, instead of a failure, just let the clients
# write a load of data. This avoids chaos tests ending up with unrealistically
# small quantities of data in flight.
traffic()
elif mode == 1:
clean_fail_restore()
elif mode == 2:
hard_fail_restore()
# Fail and restart: hard-kill one node. Notify the storage controller that it is offline.
# Success criteria:
# - New attach locations should activate within bounded time
# - TODO: once we do heartbeating, we should not have to explicitly mark the node offline
# TODO: fail and remove: fail a node, and remove it from the cluster.
# Success criteria:
# - Endpoints should not fail any queries
# - New attach locations should activate within bounded time
# - New secondary locations should fill up with data within bounded time
# TODO: somehow need to wait for reconciles to complete before doing consistency check
# (or make the check wait).
# Do consistency check on every iteration, not just at the end: this makes it more obvious
# which change caused an issue.
env.storage_controller.consistency_check()