From cedde559b8ea93555196bbbb11013d426d1967d0 Mon Sep 17 00:00:00 2001
From: Arthur Petukhovsky
Date: Thu, 27 Jan 2022 17:26:55 +0300
Subject: [PATCH] Add test for replacement of the failed safekeeper (#1179)

* Add test to replace failed safekeeper

* Restart safekeepers in test_replace_safekeeper

* Update vendor/postgres
---
 test_runner/batch_others/test_wal_acceptor.py | 91 ++++++++++++++++++-
 vendor/postgres                               |  2 +-
 2 files changed, 91 insertions(+), 2 deletions(-)

diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py
index 3822036521..a020bf6339 100644
--- a/test_runner/batch_others/test_wal_acceptor.py
+++ b/test_runner/batch_others/test_wal_acceptor.py
@@ -12,7 +12,7 @@ from contextlib import closing
 from dataclasses import dataclass, field
 from multiprocessing import Process, Value
 from pathlib import Path
-from fixtures.zenith_fixtures import PgBin, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
+from fixtures.zenith_fixtures import PgBin, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
 from fixtures.utils import lsn_to_hex, mkdir_if_needed
 from fixtures.log_helper import log
 from typing import List, Optional, Any
@@ -603,3 +603,92 @@ def test_safekeeper_without_pageserver(test_output_dir: str,
     env.postgres.safe_psql("insert into t select generate_series(1, 100)")
     res = env.postgres.safe_psql("select sum(i) from t")[0][0]
     assert res == 5050
+
+
+def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
+    def safekeepers_guc(env: ZenithEnv, sk_names: List[str]) -> str:
+        return ','.join(
+            [f'localhost:{sk.port.pg}' for sk in env.safekeepers if sk.name in sk_names])
+
+    def execute_payload(pg: Postgres):
+        with closing(pg.connect()) as conn:
+            with conn.cursor() as cur:
+                # We rely on autocommit after each statement,
+                # because waiting for acceptors happens there.
+                cur.execute('CREATE TABLE IF NOT EXISTS t(key int, value text)')
+                cur.execute("INSERT INTO t VALUES (0, 'something')")
+                cur.execute('SELECT SUM(key) FROM t')
+                sum_before = cur.fetchone()[0]
+
+                cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
+                cur.execute('SELECT SUM(key) FROM t')
+                sum_after = cur.fetchone()[0]
+                assert sum_after == sum_before + 5000050000  # == sum(1..100000)
+
+    def show_statuses(safekeepers: List[Safekeeper], tenant_id: str, timeline_id: str):
+        for sk in safekeepers:
+            http_cli = sk.http_client()
+            try:
+                status = http_cli.timeline_status(tenant_id, timeline_id)
+                log.info(f"Safekeeper {sk.name} status: {status}")
+            except Exception as e:  # log the failure and move on to the next safekeeper
+                log.info(f"Safekeeper {sk.name} status error: {e}")
+
+    zenith_env_builder.num_safekeepers = 4
+    env = zenith_env_builder.init()
+    env.zenith_cli(["branch", "test_replace_safekeeper", "main"])
+
+    log.info("Use only first 3 safekeepers")
+    env.safekeepers[3].stop()  # started again in the "Recreate postgres" step below
+    active_safekeepers = ['sk1', 'sk2', 'sk3']
+    pg = env.postgres.create('test_replace_safekeeper')
+    pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
+    pg.start()
+
+    # Learn the zenith tenant and timeline ids from the compute node.
+    tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
+    timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
+
+    execute_payload(pg)
+    show_statuses(env.safekeepers, tenant_id, timeline_id)
+
+    log.info("Restart all safekeepers to flush everything")
+    env.safekeepers[0].stop(immediate=True)
+    execute_payload(pg)
+    env.safekeepers[0].start()
+    env.safekeepers[1].stop(immediate=True)
+    execute_payload(pg)
+    env.safekeepers[1].start()
+    env.safekeepers[2].stop(immediate=True)
+    execute_payload(pg)
+    env.safekeepers[2].start()
+
+    env.safekeepers[0].stop(immediate=True)
+    env.safekeepers[1].stop(immediate=True)
+    env.safekeepers[2].stop(immediate=True)
+    env.safekeepers[0].start()
+    env.safekeepers[1].start()
+    env.safekeepers[2].start()
+
+    execute_payload(pg)
+    show_statuses(env.safekeepers, tenant_id, timeline_id)
+
+    log.info("Stop sk1 (simulate failure) and use only quorum of sk2 and sk3")
+    env.safekeepers[0].stop(immediate=True)
+    execute_payload(pg)
+    show_statuses(env.safekeepers, tenant_id, timeline_id)
+
+    log.info("Recreate postgres to replace failed sk1 with new sk4")
+    pg.stop_and_destroy().create('test_replace_safekeeper')
+    active_safekeepers = ['sk2', 'sk3', 'sk4']
+    env.safekeepers[3].start()
+    pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
+    pg.start()
+
+    execute_payload(pg)
+    show_statuses(env.safekeepers, tenant_id, timeline_id)
+
+    log.info("Stop sk2 to require quorum of sk3 and sk4 for normal work")
+    env.safekeepers[1].stop(immediate=True)
+    execute_payload(pg)
+    show_statuses(env.safekeepers, tenant_id, timeline_id)
diff --git a/vendor/postgres b/vendor/postgres
index ca5e7beaf8..99b91512bd 160000
--- a/vendor/postgres
+++ b/vendor/postgres
@@ -1 +1 @@
-Subproject commit ca5e7beaf8c6c58871d427bec81716432eeaacd8
+Subproject commit 99b91512bd8e9f9c971cccab4f713f3568a54f30