diff --git a/test_runner/regress/test_logical_replication.py b/test_runner/regress/test_logical_replication.py index 3f4ca8070d..1bac528397 100644 --- a/test_runner/regress/test_logical_replication.py +++ b/test_runner/regress/test_logical_replication.py @@ -364,3 +364,67 @@ def test_slots_and_branching(neon_simple_env: NeonEnv): # Check that we can create slot with the same name ws_cur = ws_branch.connect().cursor() ws_cur.execute("select pg_create_logical_replication_slot('my_slot', 'pgoutput')") + + +def test_replication_shutdown(neon_simple_env: NeonEnv): + # Ensure Postgres can exit without stuck when a replication job is active + neon extension installed + env = neon_simple_env + env.neon_cli.create_branch("test_replication_shutdown_publisher", "empty") + pub = env.endpoints.create("test_replication_shutdown_publisher") + + env.neon_cli.create_branch("test_replication_shutdown_subscriber") + sub = env.endpoints.create("test_replication_shutdown_subscriber") + + pub.respec(skip_pg_catalog_updates=False) + pub.start() + + sub.respec(skip_pg_catalog_updates=False) + sub.start() + + pub.wait_for_migrations() + sub.wait_for_migrations() + + with pub.cursor() as cur: + cur.execute( + "CREATE ROLE mr_whiskers WITH PASSWORD 'cat' LOGIN INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser" + ) + cur.execute("CREATE DATABASE neondb WITH OWNER mr_whiskers") + cur.execute("GRANT ALL PRIVILEGES ON DATABASE neondb TO neon_superuser") + + # If we don't do this, creating the subscription will fail later on PG16 + pub.edit_hba(["host all mr_whiskers 0.0.0.0/0 md5"]) + + with sub.cursor() as cur: + cur.execute( + "CREATE ROLE mr_whiskers WITH PASSWORD 'cat' LOGIN INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser" + ) + cur.execute("CREATE DATABASE neondb WITH OWNER mr_whiskers") + cur.execute("GRANT ALL PRIVILEGES ON DATABASE neondb TO neon_superuser") + + with pub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as cur: + cur.execute("CREATE PUBLICATION pub FOR ALL TABLES") + cur.execute("CREATE TABLE t (a int)") + cur.execute("INSERT INTO t VALUES (10), (20)") + cur.execute("SELECT * from t") + res = cur.fetchall() + assert [r[0] for r in res] == [10, 20] + + with sub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as cur: + cur.execute("CREATE TABLE t (a int)") + + pub_conn = f"host=localhost port={pub.pg_port} dbname=neondb user=mr_whiskers password=cat" + query = f"CREATE SUBSCRIPTION sub CONNECTION '{pub_conn}' PUBLICATION pub" + log.info(f"Creating subscription: {query}") + cur.execute(query) + + with pub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as pcur: + pcur.execute("INSERT INTO t VALUES (30), (40)") + + def check_that_changes_propagated(): + cur.execute("SELECT * FROM t") + res = cur.fetchall() + log.info(res) + assert len(res) == 4 + assert [r[0] for r in res] == [10, 20, 30, 40] + + wait_until(10, 0.5, check_that_changes_propagated) diff --git a/test_runner/regress/test_neon_extension.py b/test_runner/regress/test_neon_extension.py index e31e1cab51..39b4865026 100644 --- a/test_runner/regress/test_neon_extension.py +++ b/test_runner/regress/test_neon_extension.py @@ -1,3 +1,4 @@ +import time from contextlib import closing from fixtures.log_helper import log @@ -43,6 +44,12 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder): with closing(endpoint_main.connect()) as conn: with conn.cursor() as cur: + cur.execute("SELECT extversion from pg_extension where extname='neon'") + # IMPORTANT: + # If the version has changed, the test should be updated. + # Ensure that the default version is also updated in the neon.control file + assert cur.fetchone() == ("1.3",) + cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE") all_versions = ["1.3", "1.2", "1.1", "1.0"] current_version = "1.3" for idx, begin_version in enumerate(all_versions): @@ -60,3 +67,30 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder): cur.execute( f"ALTER EXTENSION neon UPDATE TO '{begin_version}'; -- {target_version}->{begin_version}" ) + + +# Verify that the neon extension can be auto-upgraded to the latest version. +def test_neon_extension_auto_upgrade(neon_env_builder: NeonEnvBuilder): + env = neon_env_builder.init_start() + env.neon_cli.create_branch("test_neon_extension_auto_upgrade") + + endpoint_main = env.endpoints.create("test_neon_extension_auto_upgrade") + # don't skip pg_catalog updates - it runs CREATE EXTENSION neon + endpoint_main.respec(skip_pg_catalog_updates=False) + endpoint_main.start() + + with closing(endpoint_main.connect()) as conn: + with conn.cursor() as cur: + cur.execute("ALTER EXTENSION neon UPDATE TO '1.0';") + cur.execute("SELECT extversion from pg_extension where extname='neon'") + assert cur.fetchone() == ("1.0",) # Ensure the extension gets downgraded + + endpoint_main.stop() + time.sleep(1) + endpoint_main.start() + time.sleep(1) + + with closing(endpoint_main.connect()) as conn: + with conn.cursor() as cur: + cur.execute("SELECT extversion from pg_extension where extname='neon'") + assert cur.fetchone() != ("1.0",) # Ensure the extension gets upgraded diff --git a/test_runner/regress/test_storage_controller_stress.py b/test_runner/regress/test_storage_controller_stress.py new file mode 100644 index 0000000000..0e9224378d --- /dev/null +++ b/test_runner/regress/test_storage_controller_stress.py @@ -0,0 +1,188 @@ +import concurrent.futures +import random +from collections import defaultdict + +from fixtures.compute_reconfigure import ComputeReconfigure +from fixtures.log_helper import log +from fixtures.neon_fixtures import ( + NeonEnv, + NeonEnvBuilder, +) +from fixtures.types import TenantId, TenantShardId, TimelineId +from fixtures.utils import wait_until +from fixtures.workload import Workload + + +def get_node_shard_counts(env: NeonEnv, tenant_ids): + total: defaultdict[int, int] = defaultdict(int) + attached: defaultdict[int, int] = defaultdict(int) + for tid in tenant_ids: + for shard in env.storage_controller.tenant_describe(tid)["shards"]: + log.info( + f"{shard['tenant_shard_id']}: attached={shard['node_attached']}, secondary={shard['node_secondary']} " + ) + for node in shard["node_secondary"]: + total[int(node)] += 1 + attached[int(shard["node_attached"])] += 1 + total[int(shard["node_attached"])] += 1 + + return total, attached + + +def test_storcon_rolling_failures( + neon_env_builder: NeonEnvBuilder, + compute_reconfigure_listener: ComputeReconfigure, +): + neon_env_builder.num_pageservers = 8 + + neon_env_builder.control_plane_compute_hook_api = ( + compute_reconfigure_listener.control_plane_compute_hook_api + ) + + workloads: dict[TenantId, Workload] = {} + + env = neon_env_builder.init_start() + + for ps in env.pageservers: + # We will do unclean detaches + ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*") + + n_tenants = 32 + tenants = [(env.initial_tenant, env.initial_timeline)] + for i in range(0, n_tenants - 1): + tenant_id = TenantId.generate() + timeline_id = TimelineId.generate() + shard_count = [1, 2, 4][i % 3] + env.neon_cli.create_tenant( + tenant_id, timeline_id, shard_count=shard_count, placement_policy='{"Double":1}' + ) + tenants.append((tenant_id, timeline_id)) + + # Background pain: + # - TODO: some fraction of pageserver API requests hang + # (this requires implementing wrap of location_conf calls with proper timeline/cancel) + # - TODO: continuous tenant/timeline creation/destruction over a different ID range than + # the ones we're using for availability checks. + + rng = random.Random(0xDEADBEEF) + + for tenant_id, timeline_id in tenants: + workload = Workload(env, tenant_id, timeline_id) + compute_reconfigure_listener.register_workload(workload) + workloads[tenant_id] = workload + + def node_evacuated(node_id: int): + total, attached = get_node_shard_counts(env, [t[0] for t in tenants]) + assert attached[node_id] == 0 + + def attachments_active(): + for tid, _tlid in tenants: + for shard in env.storage_controller.locate(tid): + psid = shard["node_id"] + tsid = TenantShardId.parse(shard["shard_id"]) + status = env.get_pageserver(psid).http_client().tenant_status(tenant_id=tsid) + assert status["state"]["slug"] == "Active" + log.info(f"Shard {tsid} active on node {psid}") + + failpoints = ("api-503", "5%1000*return(1)") + failpoints_str = f"{failpoints[0]}={failpoints[1]}" + for ps in env.pageservers: + ps.http_client().configure_failpoints(failpoints) + + def for_all_workloads(callback, timeout=60): + futs = [] + with concurrent.futures.ThreadPoolExecutor() as pool: + for _tenant_id, workload in workloads.items(): + futs.append(pool.submit(callback, workload)) + + for f in futs: + f.result(timeout=timeout) + + def clean_fail_restore(): + """ + Clean shutdown of a node: mark it offline in storage controller, wait for new attachment + locations to activate, then SIGTERM it. + - Endpoints should not fail any queries + - New attach locations should activate within bounded time. + """ + victim = rng.choice(env.pageservers) + env.storage_controller.node_configure(victim.id, {"availability": "Offline"}) + + wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc] + wait_until(10, 1, attachments_active) + + victim.stop(immediate=False) + + traffic() + + victim.start(extra_env_vars={"FAILPOINTS": failpoints_str}) + + # Revert shards to attach at their original locations + # TODO + # env.storage_controller.balance_attached() + wait_until(10, 1, attachments_active) + + def hard_fail_restore(): + """ + Simulate an unexpected death of a pageserver node + """ + victim = rng.choice(env.pageservers) + victim.stop(immediate=True) + # TODO: once we implement heartbeats detecting node failures, remove this + # explicit marking offline and rely on storage controller to detect it itself. + env.storage_controller.node_configure(victim.id, {"availability": "Offline"}) + wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc] + wait_until(10, 1, attachments_active) + traffic() + victim.start(extra_env_vars={"FAILPOINTS": failpoints_str}) + # TODO + # env.storage_controller.balance_attached() + wait_until(10, 1, attachments_active) + + def traffic(): + """ + Check that all tenants are working for postgres clients + """ + + def exercise_one(workload): + workload.churn_rows(100) + workload.validate() + + for_all_workloads(exercise_one) + + def init_one(workload): + workload.init() + workload.write_rows(100) + + for_all_workloads(init_one, timeout=60) + + for i in range(0, 20): + mode = rng.choice([0, 1, 2]) + log.info(f"Iteration {i}, mode {mode}") + if mode == 0: + # Traffic interval: sometimes, instead of a failure, just let the clients + # write a load of data. This avoids chaos tests ending up with unrealistically + # small quantities of data in flight. + traffic() + elif mode == 1: + clean_fail_restore() + elif mode == 2: + hard_fail_restore() + + # Fail and restart: hard-kill one node. Notify the storage controller that it is offline. + # Success criteria: + # - New attach locations should activate within bounded time + # - TODO: once we do heartbeating, we should not have to explicitly mark the node offline + + # TODO: fail and remove: fail a node, and remove it from the cluster. + # Success criteria: + # - Endpoints should not fail any queries + # - New attach locations should activate within bounded time + # - New secondary locations should fill up with data within bounded time + + # TODO: somehow need to wait for reconciles to complete before doing consistency check + # (or make the check wait). + + # Do consistency check on every iteration, not just at the end: this makes it more obvious + # which change caused an issue. + env.storage_controller.consistency_check()