diff --git a/test_runner/batch_others/test_branch_behind.py b/test_runner/batch_others/test_branch_behind.py index 57076b2c33..bcb66cb1f3 100644 --- a/test_runner/batch_others/test_branch_behind.py +++ b/test_runner/batch_others/test_branch_behind.py @@ -1,5 +1,3 @@ -import psycopg2 - pytest_plugins = ("fixtures.zenith_fixtures") @@ -13,8 +11,7 @@ def test_branch_behind(zenith_cli, pageserver, postgres, pg_bin): pgmain = postgres.create_start('test_branch_behind') print("postgres is running on 'test_branch_behind' branch") - main_pg_conn = psycopg2.connect(pgmain.connstr()) - main_pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + main_pg_conn = pgmain.connect() main_cur = main_pg_conn.cursor() # Create table, and insert the first 100 rows @@ -60,15 +57,13 @@ def test_branch_behind(zenith_cli, pageserver, postgres, pg_bin): pg_more = postgres.create_start("test_branch_behind_more") # On the 'hundred' branch, we should see only 100 rows - hundred_pg_conn = psycopg2.connect(pg_hundred.connstr()) - hundred_pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + hundred_pg_conn = pg_hundred.connect() hundred_cur = hundred_pg_conn.cursor() hundred_cur.execute('SELECT count(*) FROM foo') assert hundred_cur.fetchone() == (100, ) # On the 'more' branch, we should see 100200 rows - more_pg_conn = psycopg2.connect(pg_more.connstr()) - more_pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + more_pg_conn = pg_more.connect() more_cur = more_pg_conn.cursor() more_cur.execute('SELECT count(*) FROM foo') assert more_cur.fetchone() == (100100, ) diff --git a/test_runner/batch_others/test_config.py b/test_runner/batch_others/test_config.py index 2b0b04238e..00c27b1ac1 100644 --- a/test_runner/batch_others/test_config.py +++ b/test_runner/batch_others/test_config.py @@ -1,4 +1,4 @@ -import psycopg2 +from contextlib import closing pytest_plugins = ("fixtures.zenith_fixtures") @@ -14,9 +14,7 @@ def test_config(zenith_cli, pageserver, postgres, pg_bin): pg = postgres.create_start('test_config', config_lines=['log_min_messages=debug1']) print('postgres is running on test_config branch') - with psycopg2.connect(pg.connstr()) as conn: - conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - + with closing(pg.connect()) as conn: with conn.cursor() as cur: cur.execute(''' SELECT setting diff --git a/test_runner/batch_others/test_createdb.py b/test_runner/batch_others/test_createdb.py index ab592822c7..bfc2224e05 100644 --- a/test_runner/batch_others/test_createdb.py +++ b/test_runner/batch_others/test_createdb.py @@ -1,4 +1,4 @@ -import psycopg2 +from contextlib import closing pytest_plugins = ("fixtures.zenith_fixtures") @@ -12,9 +12,7 @@ def test_createdb(zenith_cli, pageserver, postgres, pg_bin): pg = postgres.create_start('test_createdb') print("postgres is running on 'test_createdb' branch") - with psycopg2.connect(pg.connstr()) as conn: - conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - + with closing(pg.connect()) as conn: with conn.cursor() as cur: # Cause a 'relmapper' change in the original branch cur.execute('VACUUM FULL pg_class') @@ -31,4 +29,4 @@ def test_createdb(zenith_cli, pageserver, postgres, pg_bin): # Test that you can connect to the new database on both branches for db in (pg, pg2): - psycopg2.connect(db.connstr('foodb')).close() + db.connect(dbname='foodb').close() diff --git a/test_runner/batch_others/test_createuser.py b/test_runner/batch_others/test_createuser.py index 649559e162..b821a233d1 100644 --- a/test_runner/batch_others/test_createuser.py +++ b/test_runner/batch_others/test_createuser.py @@ -1,4 +1,4 @@ -import psycopg2 +from contextlib import closing pytest_plugins = ("fixtures.zenith_fixtures") @@ -12,9 +12,7 @@ def test_createuser(zenith_cli, pageserver, postgres, pg_bin): pg = postgres.create_start('test_createuser') print("postgres is running on 'test_createuser' branch") - with psycopg2.connect(pg.connstr()) as conn: - conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - + with closing(pg.connect()) as conn: with conn.cursor() as cur: # Cause a 'relmapper' change in the original branch cur.execute('CREATE USER testuser with password %s', ('testpwd', )) @@ -30,7 +28,4 @@ def test_createuser(zenith_cli, pageserver, postgres, pg_bin): pg2 = postgres.create_start('test_createuser2') # Test that you can connect to new branch as a new user - conn2 = psycopg2.connect(pg2.connstr(username='testuser')) - with conn2.cursor() as cur: - cur.execute('select current_user;') - assert cur.fetchone() == ('testuser', ) + assert pg2.safe_psql('select current_user', username='testuser') == [('testuser', )] diff --git a/test_runner/batch_others/test_multixact.py b/test_runner/batch_others/test_multixact.py index 522efdf709..f64be25bcf 100644 --- a/test_runner/batch_others/test_multixact.py +++ b/test_runner/batch_others/test_multixact.py @@ -1,5 +1,3 @@ -import psycopg2 - pytest_plugins = ("fixtures.zenith_fixtures") @@ -15,8 +13,7 @@ def test_multixact(pageserver, postgres, pg_bin, zenith_cli, base_dir): pg = postgres.create_start('test_multixact') print("postgres is running on 'test_multixact' branch") - pg_conn = psycopg2.connect(pg.connstr()) - pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + pg_conn = pg.connect() cur = pg_conn.cursor() cur.execute(''' @@ -31,10 +28,10 @@ def test_multixact(pageserver, postgres, pg_bin, zenith_cli, base_dir): nclients = 3 connections = [] for i in range(nclients): - con = psycopg2.connect(pg.connstr()) # Do not turn on autocommit. We want to hold the key-share locks. - con.cursor().execute('select * from t1 for key share') - connections.append(con) + conn = pg.connect(autocommit=False) + conn.cursor().execute('select * from t1 for key share') + connections.append(conn) # We should have a multixact now. We can close the connections. for c in connections: @@ -56,8 +53,7 @@ def test_multixact(pageserver, postgres, pg_bin, zenith_cli, base_dir): pg_new = postgres.create_start('test_multixact_new') print("postgres is running on 'test_multixact_new' branch") - pg_new_conn = psycopg2.connect(pg_new.connstr()) - pg_new_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + pg_new_conn = pg_new.connect() cur_new = pg_new_conn.cursor() cur_new.execute('SELECT next_multixact_id FROM pg_control_checkpoint()') diff --git a/test_runner/batch_others/test_pageserver_api.py b/test_runner/batch_others/test_pageserver_api.py index 2cb1a74a0e..e48c3b105d 100644 --- a/test_runner/batch_others/test_pageserver_api.py +++ b/test_runner/batch_others/test_pageserver_api.py @@ -1,28 +1,23 @@ -import psycopg2 import json pytest_plugins = ("fixtures.zenith_fixtures") def test_status(pageserver): - pg_conn = psycopg2.connect(pageserver.connstr()) - pg_conn.autocommit = True - cur = pg_conn.cursor() - cur.execute('status') - assert cur.fetchone() == ('hello world', ) - pg_conn.close() + assert pageserver.safe_psql('status') == [ + ('hello world', ), + ] def test_branch_list(pageserver, zenith_cli): # Create a branch for us zenith_cli.run(["branch", "test_branch_list_main", "empty"]) - page_server_conn = psycopg2.connect(pageserver.connstr()) - page_server_conn.autocommit = True - page_server_cur = page_server_conn.cursor() + conn = pageserver.connect() + cur = conn.cursor() - page_server_cur.execute('branch_list') - branches = json.loads(page_server_cur.fetchone()[0]) + cur.execute('branch_list') + branches = json.loads(cur.fetchone()[0]) # Filter out branches created by other tests branches = [x for x in branches if x['name'].startswith('test_branch_list')] @@ -37,8 +32,8 @@ def test_branch_list(pageserver, zenith_cli): zenith_cli.run(['branch', 'test_branch_list_experimental', 'test_branch_list_main']) zenith_cli.run(['pg', 'create', 'test_branch_list_experimental']) - page_server_cur.execute('branch_list') - new_branches = json.loads(page_server_cur.fetchone()[0]) + cur.execute('branch_list') + new_branches = json.loads(cur.fetchone()[0]) # Filter out branches created by other tests new_branches = [x for x in new_branches if x['name'].startswith('test_branch_list')] assert len(new_branches) == 2 @@ -50,4 +45,4 @@ def test_branch_list(pageserver, zenith_cli): # TODO: do the LSNs have to match here? assert new_branches[1] == branches[0] - page_server_conn.close() + conn.close() diff --git a/test_runner/batch_others/test_restart_compute.py b/test_runner/batch_others/test_restart_compute.py index a332753380..d11842e35f 100644 --- a/test_runner/batch_others/test_restart_compute.py +++ b/test_runner/batch_others/test_restart_compute.py @@ -1,4 +1,4 @@ -import psycopg2 +from contextlib import closing pytest_plugins = ("fixtures.zenith_fixtures") @@ -12,39 +12,31 @@ def test_restart_compute(zenith_cli, pageserver, postgres, pg_bin): pg = postgres.create_start('test_restart_compute') print("postgres is running on 'test_restart_compute' branch") - pg_conn = psycopg2.connect(pg.connstr()) - pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - cur = pg_conn.cursor() - - # Create table, and insert a row - cur.execute('CREATE TABLE foo (t text)') - cur.execute("INSERT INTO foo VALUES ('bar')") + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + # Create table, and insert a row + cur.execute('CREATE TABLE foo (t text)') + cur.execute("INSERT INTO foo VALUES ('bar')") # Stop and restart the Postgres instance - pg_conn.close() - pg.stop() - pg.start() - pg_conn = psycopg2.connect(pg.connstr()) - pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - cur = pg_conn.cursor() + pg.stop().start() - # We can still see the row - cur.execute('SELECT count(*) FROM foo') - assert cur.fetchone() == (1, ) + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + # We can still see the row + cur.execute('SELECT count(*) FROM foo') + assert cur.fetchone() == (1, ) - # Insert another row - cur.execute("INSERT INTO foo VALUES ('bar2')") - cur.execute('SELECT count(*) FROM foo') - assert cur.fetchone() == (2, ) + # Insert another row + cur.execute("INSERT INTO foo VALUES ('bar2')") + cur.execute('SELECT count(*) FROM foo') + assert cur.fetchone() == (2, ) # Stop, and destroy the Postgres instance. Then recreate and restart it. - pg_conn.close() - pg.stop_and_destroy() - pg.create_start('test_restart_compute') - pg_conn = psycopg2.connect(pg.connstr()) - pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - cur = pg_conn.cursor() + pg.stop_and_destroy().create_start('test_restart_compute') - # We can still see the rows - cur.execute('SELECT count(*) FROM foo') - assert cur.fetchone() == (2, ) + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + # We can still see the rows + cur.execute('SELECT count(*) FROM foo') + assert cur.fetchone() == (2, ) diff --git a/test_runner/batch_others/test_twophase.py b/test_runner/batch_others/test_twophase.py index 4f95103fba..d4ffb9dc6c 100644 --- a/test_runner/batch_others/test_twophase.py +++ b/test_runner/batch_others/test_twophase.py @@ -1,5 +1,3 @@ -import psycopg2 - pytest_plugins = ("fixtures.zenith_fixtures") @@ -12,8 +10,7 @@ def test_twophase(zenith_cli, pageserver, postgres, pg_bin): pg = postgres.create_start('test_twophase', config_lines=['max_prepared_transactions=5']) print("postgres is running on 'test_twophase' branch") - conn = psycopg2.connect(pg.connstr()) - conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + conn = pg.connect() cur = conn.cursor() cur.execute('CREATE TABLE foo (t text)') @@ -33,8 +30,7 @@ def test_twophase(zenith_cli, pageserver, postgres, pg_bin): pg2 = postgres.create_start('test_twophase_prepared', config_lines=['max_prepared_transactions=5']) - conn2 = psycopg2.connect(pg2.connstr()) - conn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + conn2 = pg2.connect() cur2 = conn2.cursor() # On the new branch, commit one of the prepared transactions, abort the other one. diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index f709be1b6d..bf5a7f19e7 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -1,8 +1,8 @@ -import psycopg2 import pytest import random import time +from contextlib import closing from multiprocessing import Process, Value pytest_plugins = ("fixtures.zenith_fixtures") @@ -16,15 +16,14 @@ def test_normal_work(zenith_cli, pageserver, postgres, wa_factory): pg = postgres.create_start('test_wal_acceptors_normal_work', wal_acceptors=wa_factory.get_connstrs()) - pg_conn = psycopg2.connect(pg.connstr()) - # do commit after each statement as waiting for acceptors happens there - pg_conn.autocommit = True - - cur = pg_conn.cursor() - cur.execute('CREATE TABLE t(key int primary key, value text)') - cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'") - cur.execute('SELECT sum(key) FROM t') - assert cur.fetchone() == (5000050000, ) + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + # we rely upon autocommit after each statement + # as waiting for acceptors happens there + cur.execute('CREATE TABLE t(key int primary key, value text)') + cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'") + cur.execute('SELECT sum(key) FROM t') + assert cur.fetchone() == (5000050000, ) # Run page server and multiple acceptors, and multiple compute nodes running @@ -72,10 +71,9 @@ def test_restarts(zenith_cli, pageserver, postgres, wa_factory): pg = postgres.create_start('test_wal_acceptors_restarts', wal_acceptors=wa_factory.get_connstrs()) - pg_conn = psycopg2.connect(pg.connstr()) - # do commit after each statement as waiting for acceptors happens there - pg_conn.autocommit = True - + # we rely upon autocommit after each statement + # as waiting for acceptors happens there + pg_conn = pg.connect() cur = pg_conn.cursor() failed_node = None @@ -110,9 +108,9 @@ def test_unavailability(zenith_cli, pageserver, postgres, wa_factory): pg = postgres.create_start('test_wal_acceptors_unavailability', wal_acceptors=wa_factory.get_connstrs()) - pg_conn = psycopg2.connect(pg.connstr()) - # do commit after each statement as waiting for acceptors happens there - pg_conn.autocommit = True + # we rely upon autocommit after each statement + # as waiting for acceptors happens there + pg_conn = pg.connect() cur = pg_conn.cursor() # check basic work with table @@ -181,9 +179,9 @@ def test_race_conditions(zenith_cli, pageserver, postgres, wa_factory, stop_valu pg = postgres.create_start('test_wal_acceptors_race_conditions', wal_acceptors=wa_factory.get_connstrs()) - pg_conn = psycopg2.connect(pg.connstr()) - # do commit after each statement as waiting for acceptors happens there - pg_conn.autocommit = True + # we rely upon autocommit after each statement + # as waiting for acceptors happens there + pg_conn = pg.connect() cur = pg_conn.cursor() cur.execute('CREATE TABLE t(key int primary key, value text)') diff --git a/test_runner/batch_others/test_zenith_cli.py b/test_runner/batch_others/test_zenith_cli.py index 265831b5a2..987a38753a 100644 --- a/test_runner/batch_others/test_zenith_cli.py +++ b/test_runner/batch_others/test_zenith_cli.py @@ -1,4 +1,3 @@ -import psycopg2 import json pytest_plugins = ("fixtures.zenith_fixtures") @@ -24,8 +23,7 @@ def helper_compare_branch_list(page_server_cur, zenith_cli): def test_cli_branch_list(pageserver, zenith_cli): - page_server_conn = psycopg2.connect(pageserver.connstr()) - page_server_conn.autocommit = True + page_server_conn = pageserver.connect() page_server_cur = page_server_conn.cursor() # Initial sanity check diff --git a/test_runner/batch_pg_regress/test_isolation.py b/test_runner/batch_pg_regress/test_isolation.py index c19f538540..1bd5aeb0b9 100644 --- a/test_runner/batch_pg_regress/test_isolation.py +++ b/test_runner/batch_pg_regress/test_isolation.py @@ -1,5 +1,4 @@ import os -import psycopg2 from fixtures.utils import mkdir_if_needed @@ -15,11 +14,7 @@ def test_isolation(pageserver, postgres, pg_bin, zenith_cli, test_output_dir, pg # Connect to postgres and create a database called "regression". # isolation tests use prepared transactions, so enable them pg = postgres.create_start('test_isolation', config_lines=['max_prepared_transactions=100']) - pg_conn = psycopg2.connect(pg.connstr()) - pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - cur = pg_conn.cursor() - cur.execute('CREATE DATABASE isolation_regression') - pg_conn.close() + pg.safe_psql('CREATE DATABASE isolation_regression') # Create some local directories for pg_isolation_regress to run in. runpath = os.path.join(test_output_dir, 'regress') diff --git a/test_runner/batch_pg_regress/test_pg_regress.py b/test_runner/batch_pg_regress/test_pg_regress.py index 08760bed39..d119fdaff9 100644 --- a/test_runner/batch_pg_regress/test_pg_regress.py +++ b/test_runner/batch_pg_regress/test_pg_regress.py @@ -1,5 +1,4 @@ import os -import psycopg2 from fixtures.utils import mkdir_if_needed @@ -14,11 +13,7 @@ def test_pg_regress(pageserver, postgres, pg_bin, zenith_cli, test_output_dir, p # Connect to postgres and create a database called "regression". pg = postgres.create_start('test_pg_regress') - pg_conn = psycopg2.connect(pg.connstr()) - pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - cur = pg_conn.cursor() - cur.execute('CREATE DATABASE regression') - pg_conn.close() + pg.safe_psql('CREATE DATABASE regression') # Create some local directories for pg_regress to run in. runpath = os.path.join(test_output_dir, 'regress') diff --git a/test_runner/batch_pg_regress/test_zenith_regress.py b/test_runner/batch_pg_regress/test_zenith_regress.py index 6c13643544..cf77cc41e5 100644 --- a/test_runner/batch_pg_regress/test_zenith_regress.py +++ b/test_runner/batch_pg_regress/test_zenith_regress.py @@ -1,5 +1,4 @@ import os -import psycopg2 from fixtures.utils import mkdir_if_needed @@ -14,11 +13,7 @@ def test_zenith_regress(pageserver, postgres, pg_bin, zenith_cli, test_output_di # Connect to postgres and create a database called "regression". pg = postgres.create_start('test_zenith_regress') - pg_conn = psycopg2.connect(pg.connstr()) - pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - cur = pg_conn.cursor() - cur.execute('CREATE DATABASE regression') - pg_conn.close() + pg.safe_psql('CREATE DATABASE regression') # Create some local directories for pg_regress to run in. runpath = os.path.join(test_output_dir, 'regress') diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index e118838096..aa1557752c 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -1,14 +1,17 @@ import getpass import os -import signal +import psycopg2 import pytest import shutil +import signal import subprocess -import psycopg2 +from contextlib import closing from pathlib import Path -from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar +# Type-related stuff +from psycopg2.extensions import connection as PgConnection +from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast from typing_extensions import Literal from .utils import (get_self_dir, mkdir_if_needed, subprocess_capture) @@ -75,6 +78,48 @@ def safety_check() -> None: raise Exception('found interfering processes running') +class PgProtocol: + """ Reusable connection logic """ + def __init__(self, host: str, port: int, username: Optional[str] = None): + self.host = host + self.port = port + self.username = username or getpass.getuser() + + def connstr(self, *, dbname: str = 'postgres', username: Optional[str] = None) -> str: + """ + Build a libpq connection string for the Postgres instance. + """ + + username = username or self.username + return f'host={self.host} port={self.port} user={username} dbname={dbname}' + + # autocommit=True here by default because that's what we need most of the time + def connect(self, *, autocommit=True, **kwargs: Any) -> PgConnection: + """ + Connect to the node. + Returns psycopg2's connection object. + This method passes all extra params to connstr. + """ + + conn = psycopg2.connect(self.connstr(**kwargs)) + # WARNING: this setting affects *all* tests! + conn.autocommit = autocommit + return conn + + def safe_psql(self, query: str, **kwargs: Any) -> List[Any]: + """ + Execute query against the node and return all rows. + This method passes all extra params to connstr. + """ + + with closing(self.connect(**kwargs)) as conn: + with conn.cursor() as cur: + cur.execute(query) + if cur.description is None: + return [] # query didn't return data + return cast(List[Any], cur.fetchall()) + + class ZenithCli: """ An object representing the CLI binary named "zenith". @@ -120,9 +165,11 @@ def zenith_cli(zenith_binpath: str, repo_dir: str, pg_distrib_dir: str) -> Zenit return ZenithCli(zenith_binpath, repo_dir, pg_distrib_dir) -class ZenithPageserver: +class ZenithPageserver(PgProtocol): """ An object representing a running pageserver. """ def __init__(self, zenith_cli: ZenithCli): + super().__init__(host='localhost', port=DEFAULT_PAGESERVER_PORT) + self.zenith_cli = zenith_cli self.running = False @@ -157,16 +204,6 @@ class ZenithPageserver: return self - # The page server speaks the Postgres FE/BE protocol, so you can connect - # to it with any Postgres client, and run special commands. This function - # returns a libpq connection string for connecting to it. - def connstr(self) -> str: - username = getpass.getuser() - conn_str = 'host={} port={} dbname=postgres user={}'.format('localhost', - DEFAULT_PAGESERVER_PORT, - username) - return conn_str - @zenfixture def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]: @@ -193,15 +230,14 @@ def pageserver(zenith_cli: ZenithCli) -> Iterator[ZenithPageserver]: ps.stop() -class Postgres: +class Postgres(PgProtocol): """ An object representing a running postgres daemon. """ def __init__(self, zenith_cli: ZenithCli, repo_dir: str, instance_num: int): + super().__init__(host='localhost', port=55431 + instance_num) + self.zenith_cli = zenith_cli self.instance_num = instance_num self.running = False - self.username = getpass.getuser() - self.host = 'localhost' - self.port = 55431 + instance_num # TODO: find a better way self.repo_dir = repo_dir self.branch: Optional[str] = None # dubious, see asserts below # path to conf is /pgdatadirs//postgresql.conf @@ -242,18 +278,18 @@ class Postgres: return self - """ Path to postgresql.conf """ - def config_file_path(self) -> str: - filename = 'pgdatadirs/{}/postgresql.conf'.format(self.branch) + """ Path to postgresql.conf """ + filename = f'pgdatadirs/{self.branch}/postgresql.conf' return os.path.join(self.repo_dir, filename) - """ - Adjust instance config for working with wal acceptors instead of - pageserver (pre-configured by CLI) directly. - """ + def adjust_for_wal_acceptors(self, wal_acceptors: str) -> 'Postgres': + """ + Adjust instance config for working with wal acceptors instead of + pageserver (pre-configured by CLI) directly. + """ - def adjust_for_wal_acceptors(self, wal_acceptors) -> 'Postgres': + # TODO: reuse config() with open(self.config_file_path(), "r") as f: cfg_lines = f.readlines() with open(self.config_file_path(), "w") as f: @@ -319,33 +355,11 @@ class Postgres: return self - def connstr(self, dbname: str = 'postgres', username: Optional[str] = None) -> str: - """ - Build a libpq connection string for the Postgres instance. - """ - - conn_str = 'host={} port={} dbname={} user={}'.format(self.host, self.port, dbname, - (username or self.username)) - - return conn_str - - def safe_psql(self, query, dbname='postgres', username=None): - """ - Execute query against the node and return all (fetchall) results - """ - with psycopg2.connect(self.connstr(dbname, username)) as conn: - with conn.cursor() as curs: - curs.execute(query) - if curs.description is None: - return [] # query didn't return data - return curs.fetchall() - class PostgresFactory: """ An object representing multiple running postgres daemons. """ def __init__(self, zenith_cli: ZenithCli, repo_dir: str): self.zenith_cli = zenith_cli - self.host = 'localhost' self.repo_dir = repo_dir self.num_instances = 0 self.instances: List[Postgres] = [] @@ -439,17 +453,13 @@ def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin: return PgBin(test_output_dir, pg_distrib_dir) -""" Read content of file into number """ - - def read_pid(path): + """ Read content of file into number """ return int(Path(path).read_text()) -""" An object representing a running wal acceptor daemon. """ - - class WalAcceptor: + """ An object representing a running wal acceptor daemon. """ def __init__(self, wa_binpath, data_dir, port, num): self.wa_binpath = wa_binpath self.data_dir = data_dir @@ -504,11 +514,12 @@ class WalAcceptorFactory: self.data_dir = data_dir self.instances = [] self.initial_port = 54321 + + def start_new(self) -> WalAcceptor: """ Start new wal acceptor. """ - def start_new(self) -> WalAcceptor: wa_num = len(self.instances) wa = WalAcceptor(self.wa_binpath, os.path.join(self.data_dir, "wal_acceptor_{}".format(wa_num)), @@ -517,11 +528,11 @@ class WalAcceptorFactory: self.instances.append(wa) return wa - """ - Start n new wal acceptors - """ + def start_n_new(self, n: int) -> None: + """ + Start n new wal acceptors. + """ - def start_n_new(self, n): for _ in range(n): self.start_new() @@ -530,14 +541,13 @@ class WalAcceptorFactory: wa.stop() return self - """ Get list of wal acceptor endpoints suitable for wal_acceptors GUC """ - def get_connstrs(self) -> str: + """ Get list of wal acceptor endpoints suitable for wal_acceptors GUC """ return ','.join(["127.0.0.1:{}".format(wa.port) for wa in self.instances]) @zenfixture -def wa_factory(zenith_binpath, repo_dir) -> Iterator[WalAcceptorFactory]: +def wa_factory(zenith_binpath: str, repo_dir: str) -> Iterator[WalAcceptorFactory]: """ Gives WalAcceptorFactory providing wal acceptors. """ wafactory = WalAcceptorFactory(zenith_binpath, os.path.join(repo_dir, "wal_acceptors")) yield wafactory