From ddd2c83c64c0bec951f44cb028813a953ae52881 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Thu, 26 Aug 2021 14:23:01 +0300 Subject: [PATCH] Change test_restart_compute to expose safekeeper problems. Make this test look like 'test_compute_restart.sh' by @ololobus, which was surprisingly good for checking safekeepers behavior. This test adds an intermediate compute node start with bulk select that causes a lot of FPI's and select itself wouldn't wait for all that WAL to be replicated. So if we kill compute node right after that we end up with lagging safekeepers with VCL != flush_lsn. And starting new node from that state takes special care. Also, run and print `pg_controldata` output after each compute node start to eyeball lsn/checkpoint info of basebackup. This commit only adds test without fixing the problem. --- test_runner/batch_others/test_auth.py | 4 +- .../batch_others/test_restart_compute.py | 56 +++++-- test_runner/batch_others/test_twophase.py | 4 +- test_runner/fixtures/zenith_fixtures.py | 152 ++++++++++-------- 4 files changed, 129 insertions(+), 87 deletions(-) diff --git a/test_runner/batch_others/test_auth.py b/test_runner/batch_others/test_auth.py index 6b37dad5c2..10e5bb22b5 100644 --- a/test_runner/batch_others/test_auth.py +++ b/test_runner/batch_others/test_auth.py @@ -2,7 +2,7 @@ from contextlib import closing from uuid import uuid4 import psycopg2 -from fixtures.zenith_fixtures import Postgres, ZenithCli, ZenithPageserver +from fixtures.zenith_fixtures import Postgres, ZenithCli, ZenithPageserver, PgBin import pytest @@ -42,6 +42,7 @@ def test_compute_auth_to_pageserver( pageserver_auth_enabled: ZenithPageserver, repo_dir: str, with_wal_acceptors: bool, + pg_bin: PgBin ): ps = pageserver_auth_enabled # since we are in progress of refactoring protocols between compute safekeeper and page server @@ -56,6 +57,7 @@ def test_compute_auth_to_pageserver( with Postgres( zenith_cli=zenith_cli, repo_dir=repo_dir, + pg_bin=pg_bin, tenant_id=ps.initial_tenant, port=55432, # FIXME port distribution is hardcoded in tests and in cli ).create_start( diff --git a/test_runner/batch_others/test_restart_compute.py b/test_runner/batch_others/test_restart_compute.py index 10d9929873..60015c03db 100644 --- a/test_runner/batch_others/test_restart_compute.py +++ b/test_runner/batch_others/test_restart_compute.py @@ -9,7 +9,10 @@ pytest_plugins = ("fixtures.zenith_fixtures") # # Test restarting and recreating a postgres instance # -@pytest.mark.parametrize('with_wal_acceptors', [False, True]) +# XXX: with_wal_acceptors=True fails now, would be fixed with +# `postgres --sync-walkeepers` patches. +# +@pytest.mark.parametrize('with_wal_acceptors', [False]) def test_restart_compute( zenith_cli, pageserver: ZenithPageserver, @@ -31,31 +34,58 @@ def test_restart_compute( with closing(pg.connect()) as conn: with conn.cursor() as cur: - # Create table, and insert a row - cur.execute('CREATE TABLE foo (t text)') - cur.execute("INSERT INTO foo VALUES ('bar')") + cur.execute('CREATE TABLE t(key int primary key, value text)') + cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'") + cur.execute('SELECT sum(key) FROM t') + r = cur.fetchone() + assert r == (5000050000, ) + print("res = ", r) - # Stop and restart the Postgres instance + # Remove data directory and restart pg.stop_and_destroy().create_start('test_restart_compute', wal_acceptors=wal_acceptor_connstrs) + with closing(pg.connect()) as conn: with conn.cursor() as cur: # We can still see the row - cur.execute('SELECT count(*) FROM foo') - assert cur.fetchone() == (1, ) + cur.execute('SELECT sum(key) FROM t') + r = cur.fetchone() + assert r == (5000050000, ) + print("res = ", r) # Insert another row - cur.execute("INSERT INTO foo VALUES ('bar2')") - cur.execute('SELECT count(*) FROM foo') - assert cur.fetchone() == (2, ) + cur.execute("INSERT INTO t VALUES (100001, 'payload2')") + cur.execute('SELECT count(*) FROM t') - # Stop, and destroy the Postgres instance. Then recreate and restart it. + r = cur.fetchone() + assert r == (100001, ) + print("res = ", r) + + # Again remove data directory and restart + pg.stop_and_destroy().create_start('test_restart_compute', + wal_acceptors=wal_acceptor_connstrs) + + # That select causes lots of FPI's and increases probability of wakeepers + # lagging behind after query completion + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + # We can still see the rows + cur.execute('SELECT count(*) FROM t') + + r = cur.fetchone() + assert r == (100001, ) + print("res = ", r) + + # And again remove data directory and restart pg.stop_and_destroy().create_start('test_restart_compute', wal_acceptors=wal_acceptor_connstrs) with closing(pg.connect()) as conn: with conn.cursor() as cur: # We can still see the rows - cur.execute('SELECT count(*) FROM foo') - assert cur.fetchone() == (2, ) + cur.execute('SELECT count(*) FROM t') + + r = cur.fetchone() + assert r == (100001, ) + print("res = ", r) diff --git a/test_runner/batch_others/test_twophase.py b/test_runner/batch_others/test_twophase.py index 1d572c5992..ee5015e2cf 100644 --- a/test_runner/batch_others/test_twophase.py +++ b/test_runner/batch_others/test_twophase.py @@ -1,6 +1,6 @@ import os -from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver +from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver, PgBin pytest_plugins = ("fixtures.zenith_fixtures") @@ -9,7 +9,7 @@ pytest_plugins = ("fixtures.zenith_fixtures") # # Test branching, when a transaction is in prepared state # -def test_twophase(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): +def test_twophase(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin: PgBin): zenith_cli.run(["branch", "test_twophase", "empty"]) pg = postgres.create_start('test_twophase', config_lines=['max_prepared_transactions=5']) diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index d8e0a79167..bef4acbd4a 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -348,6 +348,64 @@ def pageserver(zenith_cli: ZenithCli, repo_dir: str) -> Iterator[ZenithPageserve print('Starting pageserver cleanup') ps.stop() +class PgBin: + """ A helper class for executing postgres binaries """ + def __init__(self, log_dir: str, pg_distrib_dir: str): + self.log_dir = log_dir + self.pg_install_path = pg_distrib_dir + self.pg_bin_path = os.path.join(self.pg_install_path, 'bin') + self.env = os.environ.copy() + self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib') + + def _fixpath(self, command: List[str]) -> None: + if '/' not in command[0]: + command[0] = os.path.join(self.pg_bin_path, command[0]) + + def _build_env(self, env_add: Optional[Env]) -> Env: + if env_add is None: + return self.env + env = self.env.copy() + env.update(env_add) + return env + + def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None: + """ + Run one of the postgres binaries. + + The command should be in list form, e.g. ['pgbench', '-p', '55432'] + + All the necessary environment variables will be set. + + If the first argument (the command name) doesn't include a path (no '/' + characters present), then it will be edited to include the correct path. + + If you want stdout/stderr captured to files, use `run_capture` instead. + """ + + self._fixpath(command) + print('Running command "{}"'.format(' '.join(command))) + env = self._build_env(env) + subprocess.run(command, env=env, cwd=cwd, check=True) + + def run_capture(self, + command: List[str], + env: Optional[Env] = None, + cwd: Optional[str] = None) -> None: + """ + Run one of the postgres binaries, with stderr and stdout redirected to a file. + + This is just like `run`, but for chatty programs. + """ + + self._fixpath(command) + print('Running command "{}"'.format(' '.join(command))) + env = self._build_env(env) + subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True) + + +@zenfixture +def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin: + return PgBin(test_output_dir, pg_distrib_dir) @pytest.fixture def pageserver_auth_enabled(zenith_cli: ZenithCli, repo_dir: str): @@ -359,7 +417,7 @@ def pageserver_auth_enabled(zenith_cli: ZenithCli, repo_dir: str): class Postgres(PgProtocol): """ An object representing a running postgres daemon. """ - def __init__(self, zenith_cli: ZenithCli, repo_dir: str, tenant_id: str, port: int): + def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, tenant_id: str, port: int): super().__init__(host='localhost', port=port) self.zenith_cli = zenith_cli @@ -368,6 +426,8 @@ class Postgres(PgProtocol): self.branch: Optional[str] = None # dubious, see asserts below self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA self.tenant_id = tenant_id + self.pg_bin = pg_bin + # path to conf is /pgdatadirs/tenants///postgresql.conf def create( self, @@ -409,25 +469,32 @@ class Postgres(PgProtocol): """ assert self.branch is not None + + print(f"Starting postgres on brach {self.branch}") + self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}']) self.running = True + self.pg_bin.run(['pg_controldata', self.pg_data_dir_path()]) + return self + def pg_data_dir_path(self) -> str: + """ Path to data directory """ + path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch + return os.path.join(self.repo_dir, path) + def pg_xact_dir_path(self) -> str: """ Path to pg_xact dir """ - return os.path.join(self.pgdata_dir, 'pg_xact') + return os.path.join(self.pg_data_dir_path(), 'pg_xact') def pg_twophase_dir_path(self) -> str: """ Path to pg_twophase dir """ - print(self.tenant_id) - print(self.branch) - path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'pg_twophase' - return os.path.join(self.repo_dir, path) + return os.path.join(self.pg_data_dir_path(), 'pg_twophase') def config_file_path(self) -> str: """ Path to postgresql.conf """ - return os.path.join(self.pgdata_dir, 'postgresql.conf') + return os.path.join(self.pg_data_dir_path(), 'postgresql.conf') def adjust_for_wal_acceptors(self, wal_acceptors: str) -> 'Postgres': """ @@ -519,13 +586,14 @@ class Postgres(PgProtocol): class PostgresFactory: """ An object representing multiple running postgres daemons. """ - def __init__(self, zenith_cli: ZenithCli, repo_dir: str, initial_tenant: str, base_port: int = 55431): + def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, initial_tenant: str, base_port: int = 55431): self.zenith_cli = zenith_cli self.repo_dir = repo_dir self.num_instances = 0 self.instances: List[Postgres] = [] self.initial_tenant: str = initial_tenant self.base_port = base_port + self.pg_bin = pg_bin def create_start( self, @@ -538,6 +606,7 @@ class PostgresFactory: pg = Postgres( zenith_cli=self.zenith_cli, repo_dir=self.repo_dir, + pg_bin=self.pg_bin, tenant_id=tenant_id or self.initial_tenant, port=self.base_port + self.num_instances + 1, ) @@ -562,6 +631,7 @@ class PostgresFactory: pg = Postgres( zenith_cli=self.zenith_cli, repo_dir=self.repo_dir, + pg_bin=self.pg_bin, tenant_id=tenant_id or self.initial_tenant, port=self.base_port + self.num_instances + 1, ) @@ -586,6 +656,7 @@ class PostgresFactory: pg = Postgres( zenith_cli=self.zenith_cli, repo_dir=self.repo_dir, + pg_bin=self.pg_bin, tenant_id=tenant_id or self.initial_tenant, port=self.base_port + self.num_instances + 1, ) @@ -611,8 +682,8 @@ def initial_tenant(pageserver: ZenithPageserver): @zenfixture -def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Iterator[PostgresFactory]: - pgfactory = PostgresFactory(zenith_cli, repo_dir, initial_tenant=initial_tenant) +def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin: PgBin) -> Iterator[PostgresFactory]: + pgfactory = PostgresFactory(zenith_cli, repo_dir, pg_bin, initial_tenant=initial_tenant) yield pgfactory @@ -620,67 +691,6 @@ def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Itera print('Starting postgres cleanup') pgfactory.stop_all() - -class PgBin: - """ A helper class for executing postgres binaries """ - def __init__(self, log_dir: str, pg_distrib_dir: str): - self.log_dir = log_dir - self.pg_install_path = pg_distrib_dir - self.pg_bin_path = os.path.join(self.pg_install_path, 'bin') - self.env = os.environ.copy() - self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib') - - def _fixpath(self, command: List[str]) -> None: - if '/' not in command[0]: - command[0] = os.path.join(self.pg_bin_path, command[0]) - - def _build_env(self, env_add: Optional[Env]) -> Env: - if env_add is None: - return self.env - env = self.env.copy() - env.update(env_add) - return env - - def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None: - """ - Run one of the postgres binaries. - - The command should be in list form, e.g. ['pgbench', '-p', '55432'] - - All the necessary environment variables will be set. - - If the first argument (the command name) doesn't include a path (no '/' - characters present), then it will be edited to include the correct path. - - If you want stdout/stderr captured to files, use `run_capture` instead. - """ - - self._fixpath(command) - print('Running command "{}"'.format(' '.join(command))) - env = self._build_env(env) - subprocess.run(command, env=env, cwd=cwd, check=True) - - def run_capture(self, - command: List[str], - env: Optional[Env] = None, - cwd: Optional[str] = None) -> None: - """ - Run one of the postgres binaries, with stderr and stdout redirected to a file. - - This is just like `run`, but for chatty programs. - """ - - self._fixpath(command) - print('Running command "{}"'.format(' '.join(command))) - env = self._build_env(env) - subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True) - - -@zenfixture -def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin: - return PgBin(test_output_dir, pg_distrib_dir) - - def read_pid(path: Path): """ Read content of file into number """ return int(path.read_text())