From 72de70a8cc5872111d0765f0cf0c61711201b065 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Wed, 25 Aug 2021 00:42:08 +0300 Subject: [PATCH] Change test_restart_compute to expose safekeeper problems --- pageserver/src/page_service.rs | 2 +- pageserver/src/restore_local_repo.rs | 4 +- .../batch_others/test_restart_compute.py | 51 ++++-- test_runner/fixtures/zenith_fixtures.py | 151 +++++++++--------- vendor/postgres | 2 +- walkeeper/src/receive_wal.rs | 2 +- walkeeper/src/replication.rs | 4 +- zenith_utils/src/postgres_backend.rs | 2 +- 8 files changed, 124 insertions(+), 94 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 93edbe9f0e..831c7a5301 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -362,7 +362,7 @@ impl PageServerHandler { timeline.wait_lsn(lsn)?; lsn } - None => timeline.get_last_valid_lsn() + None => timeline.get_last_valid_lsn(), }; { let mut writer = CopyDataSink { pgb }; diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 4cc7c7feb5..f87012403f 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -427,7 +427,9 @@ pub fn save_decoded_record( // Remove twophase file. see RemoveTwoPhaseFile() in postgres code trace!( "unlink twophaseFile for xid {} parsed_xact.xid {} here at {}", - decoded.xl_xid, parsed_xact.xid, lsn + decoded.xl_xid, + parsed_xact.xid, + lsn ); timeline.put_unlink( RelishTag::TwoPhase { diff --git a/test_runner/batch_others/test_restart_compute.py b/test_runner/batch_others/test_restart_compute.py index 10d9929873..0b2aee876e 100644 --- a/test_runner/batch_others/test_restart_compute.py +++ b/test_runner/batch_others/test_restart_compute.py @@ -9,7 +9,7 @@ pytest_plugins = ("fixtures.zenith_fixtures") # # Test restarting and recreating a postgres instance # -@pytest.mark.parametrize('with_wal_acceptors', [False, True]) +@pytest.mark.parametrize('with_wal_acceptors', [True, False]) def test_restart_compute( zenith_cli, pageserver: ZenithPageserver, @@ -31,31 +31,56 @@ def test_restart_compute( with closing(pg.connect()) as conn: with conn.cursor() as cur: - # Create table, and insert a row - cur.execute('CREATE TABLE foo (t text)') - cur.execute("INSERT INTO foo VALUES ('bar')") + cur.execute('CREATE TABLE t(key int primary key, value text)') + cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'") + cur.execute('SELECT sum(key) FROM t') + r = cur.fetchone() + assert r == (5000050000, ) + print("res = ", r) - # Stop and restart the Postgres instance + # Remove data directory and restart pg.stop_and_destroy().create_start('test_restart_compute', wal_acceptors=wal_acceptor_connstrs) + with closing(pg.connect()) as conn: with conn.cursor() as cur: # We can still see the row - cur.execute('SELECT count(*) FROM foo') - assert cur.fetchone() == (1, ) + cur.execute('SELECT sum(key) FROM t') + r = cur.fetchone() + assert r == (5000050000, ) + print("res = ", r) # Insert another row - cur.execute("INSERT INTO foo VALUES ('bar2')") - cur.execute('SELECT count(*) FROM foo') - assert cur.fetchone() == (2, ) + cur.execute("INSERT INTO t VALUES (100001, 'payload2')") + cur.execute('SELECT count(*) FROM t') - # Stop, and destroy the Postgres instance. Then recreate and restart it. + r = cur.fetchone() + assert r == (100001, ) + print("res = ", r) + + # Again remove data directory and restart pg.stop_and_destroy().create_start('test_restart_compute', wal_acceptors=wal_acceptor_connstrs) with closing(pg.connect()) as conn: with conn.cursor() as cur: # We can still see the rows - cur.execute('SELECT count(*) FROM foo') - assert cur.fetchone() == (2, ) + cur.execute('SELECT count(*) FROM t') + + r = cur.fetchone() + assert r == (100001, ) + print("res = ", r) + + # And again remove data directory and restart + pg.stop_and_destroy().create_start('test_restart_compute', + wal_acceptors=wal_acceptor_connstrs) + + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + # We can still see the rows + cur.execute('SELECT count(*) FROM t') + + r = cur.fetchone() + assert r == (100001, ) + print("res = ", r) diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index afb350bfea..d05abe581d 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -249,10 +249,68 @@ def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]: print('Starting pageserver cleanup') ps.stop() +class PgBin: + """ A helper class for executing postgres binaries """ + def __init__(self, log_dir: str, pg_distrib_dir: str): + self.log_dir = log_dir + self.pg_install_path = pg_distrib_dir + self.pg_bin_path = os.path.join(self.pg_install_path, 'bin') + self.env = os.environ.copy() + self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib') + + def _fixpath(self, command: List[str]) -> None: + if '/' not in command[0]: + command[0] = os.path.join(self.pg_bin_path, command[0]) + + def _build_env(self, env_add: Optional[Env]) -> Env: + if env_add is None: + return self.env + env = self.env.copy() + env.update(env_add) + return env + + def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None: + """ + Run one of the postgres binaries. + + The command should be in list form, e.g. ['pgbench', '-p', '55432'] + + All the necessary environment variables will be set. + + If the first argument (the command name) doesn't include a path (no '/' + characters present), then it will be edited to include the correct path. + + If you want stdout/stderr captured to files, use `run_capture` instead. + """ + + self._fixpath(command) + print('Running command "{}"'.format(' '.join(command))) + env = self._build_env(env) + subprocess.run(command, env=env, cwd=cwd, check=True) + + def run_capture(self, + command: List[str], + env: Optional[Env] = None, + cwd: Optional[str] = None) -> None: + """ + Run one of the postgres binaries, with stderr and stdout redirected to a file. + + This is just like `run`, but for chatty programs. + """ + + self._fixpath(command) + print('Running command "{}"'.format(' '.join(command))) + env = self._build_env(env) + subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True) + + +@zenfixture +def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin: + return PgBin(test_output_dir, pg_distrib_dir) class Postgres(PgProtocol): """ An object representing a running postgres daemon. """ - def __init__(self, zenith_cli: ZenithCli, repo_dir: str, tenant_id: str, port: int): + def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, tenant_id: str, port: int): super().__init__(host='localhost', port=port) self.zenith_cli = zenith_cli @@ -260,6 +318,7 @@ class Postgres(PgProtocol): self.repo_dir = repo_dir self.branch: Optional[str] = None # dubious, see asserts below self.tenant_id = tenant_id + self.pg_bin = pg_bin # path to conf is /pgdatadirs/tenants///postgresql.conf def create( @@ -299,27 +358,32 @@ class Postgres(PgProtocol): """ assert self.branch is not None + + print(f"Starting postgres on brach {self.branch}") + self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}']) self.running = True + self.pg_bin.run(['pg_controldata', self.pg_data_dir_path()]) + return self + def pg_data_dir_path(self) -> str: + """ Path to data directory """ + path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch + return os.path.join(self.repo_dir, path) + def pg_xact_dir_path(self) -> str: """ Path to pg_xact dir """ - path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'pg_xact' - return os.path.join(self.repo_dir, path) + return os.path.join(self.pg_data_dir_path(), 'pg_xact') def pg_twophase_dir_path(self) -> str: """ Path to pg_twophase dir """ - print(self.tenant_id) - print(self.branch) - path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'pg_twophase' - return os.path.join(self.repo_dir, path) + return os.path.join(self.pg_data_dir_path(), 'pg_twophase') def config_file_path(self) -> str: """ Path to postgresql.conf """ - filename = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch / 'postgresql.conf' - return os.path.join(self.repo_dir, filename) + return os.path.join(self.pg_data_dir_path(), 'postgresql.conf') def adjust_for_wal_acceptors(self, wal_acceptors: str) -> 'Postgres': """ @@ -411,13 +475,14 @@ class Postgres(PgProtocol): class PostgresFactory: """ An object representing multiple running postgres daemons. """ - def __init__(self, zenith_cli: ZenithCli, repo_dir: str, initial_tenant: str, base_port: int = 55431): + def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, initial_tenant: str, base_port: int = 55431): self.zenith_cli = zenith_cli self.repo_dir = repo_dir self.num_instances = 0 self.instances: List[Postgres] = [] self.initial_tenant: str = initial_tenant self.base_port = base_port + self.pg_bin = pg_bin def create_start( self, @@ -430,6 +495,7 @@ class PostgresFactory: pg = Postgres( zenith_cli=self.zenith_cli, repo_dir=self.repo_dir, + pg_bin=self.pg_bin, tenant_id=tenant_id or self.initial_tenant, port=self.base_port + self.num_instances + 1, ) @@ -503,8 +569,8 @@ def initial_tenant(pageserver: ZenithPageserver): @zenfixture -def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Iterator[PostgresFactory]: - pgfactory = PostgresFactory(zenith_cli, repo_dir, initial_tenant=initial_tenant) +def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin: PgBin) -> Iterator[PostgresFactory]: + pgfactory = PostgresFactory(zenith_cli, repo_dir, pg_bin, initial_tenant=initial_tenant) yield pgfactory @@ -512,67 +578,6 @@ def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str) -> Itera print('Starting postgres cleanup') pgfactory.stop_all() - -class PgBin: - """ A helper class for executing postgres binaries """ - def __init__(self, log_dir: str, pg_distrib_dir: str): - self.log_dir = log_dir - self.pg_install_path = pg_distrib_dir - self.pg_bin_path = os.path.join(self.pg_install_path, 'bin') - self.env = os.environ.copy() - self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib') - - def _fixpath(self, command: List[str]) -> None: - if '/' not in command[0]: - command[0] = os.path.join(self.pg_bin_path, command[0]) - - def _build_env(self, env_add: Optional[Env]) -> Env: - if env_add is None: - return self.env - env = self.env.copy() - env.update(env_add) - return env - - def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None) -> None: - """ - Run one of the postgres binaries. - - The command should be in list form, e.g. ['pgbench', '-p', '55432'] - - All the necessary environment variables will be set. - - If the first argument (the command name) doesn't include a path (no '/' - characters present), then it will be edited to include the correct path. - - If you want stdout/stderr captured to files, use `run_capture` instead. - """ - - self._fixpath(command) - print('Running command "{}"'.format(' '.join(command))) - env = self._build_env(env) - subprocess.run(command, env=env, cwd=cwd, check=True) - - def run_capture(self, - command: List[str], - env: Optional[Env] = None, - cwd: Optional[str] = None) -> None: - """ - Run one of the postgres binaries, with stderr and stdout redirected to a file. - - This is just like `run`, but for chatty programs. - """ - - self._fixpath(command) - print('Running command "{}"'.format(' '.join(command))) - env = self._build_env(env) - subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True) - - -@zenfixture -def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin: - return PgBin(test_output_dir, pg_distrib_dir) - - def read_pid(path: Path): """ Read content of file into number """ return int(path.read_text()) diff --git a/vendor/postgres b/vendor/postgres index c6fcfd604f..022285aea5 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit c6fcfd604fb31810ffe44fe94f6539c60768d394 +Subproject commit 022285aea5767d3c33e0a95e8a2cffec6421369d diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 0ca2d2252c..ac9b620b65 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -322,7 +322,7 @@ impl<'pg> ReceiveWalConn<'pg> { } info!( - "Start streaming from timeline {} tenant {} address {:?} flush_lsn={}", + "Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={}", server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn ); diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index cabd706e0d..8d38adc4cf 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -77,9 +77,7 @@ impl ReplicationConn { subscriber.add_hs_feedback(feedback); } FeMessage::Sync => {} - FeMessage::CopyFailed => { - return Err(anyhow!("Copy failed")) - } + FeMessage::CopyFailed => return Err(anyhow!("Copy failed")), _ => { // We only handle `CopyData`, 'Sync', 'CopyFailed' messages. Anything else is ignored. info!("unexpected message {:?}", msg); diff --git a/zenith_utils/src/postgres_backend.rs b/zenith_utils/src/postgres_backend.rs index df155a1926..9f211e905b 100644 --- a/zenith_utils/src/postgres_backend.rs +++ b/zenith_utils/src/postgres_backend.rs @@ -341,7 +341,7 @@ impl PostgresBackend { // We prefer explicit pattern matching to wildcards, because // this helps us spot the places where new variants are missing - FeMessage::CopyData(_) | FeMessage::CopyDone | FeMessage::CopyFailed => { + FeMessage::CopyData(_) | FeMessage::CopyDone | FeMessage::CopyFailed => { bail!("unexpected message type: {:?}", msg); } }