diff --git a/test_runner/README.md b/test_runner/README.md
index 4d42bbd914..daf3695192 100644
--- a/test_runner/README.md
+++ b/test_runner/README.md
@@ -65,36 +65,44 @@ Exit after the first test failure: `pytest -x ...`
 (there are many more pytest options; run `pytest -h` to see them.)
+### Writing a test
-### Building new tests
+Every test needs a Zenith Environment, or `ZenithEnv`, to operate in. A Zenith Environment
+is like a little cloud-in-a-box, and consists of a Pageserver, 0-N Safekeepers, and
+compute Postgres nodes. The connections between them can be configured to use JWT
+authentication tokens, and some other configuration options can be tweaked too.
-The tests make heavy use of pytest fixtures. You can read about how they work here: https://docs.pytest.org/en/stable/fixture.html
+The easiest way to get access to a Zenith Environment is to use the `zenith_simple_env`
+fixture. The 'simple' env may be shared across multiple tests, so don't shut down the nodes
+or make other destructive changes in that environment. Also don't assume that
+there are no tenants, branches, or data in the cluster. For convenience, there is a
+branch called `empty`; the convention is to create a test-specific branch off it and
+load any test data there, instead of using the 'main' branch.
-Essentially, this means that each time you see a fixture named as an input parameter, the function with that name will be run and passed as a parameter to the function.
-
-So this code:
+For more complicated cases, you can build a custom Zenith Environment with the
+`zenith_env_builder` fixture:
 ```python
-def test_something(zenith_cli, pg_bin):
-    pass
+def test_foobar(zenith_env_builder: ZenithEnvBuilder):
+    # Prescribe the environment.
+    # We want to have 3 safekeeper nodes, and use JWT authentication in the
+    # connections to the page server.
+    zenith_env_builder.num_safekeepers = 3
+    zenith_env_builder.set_pageserver_auth(True)
+
+    # Now create the environment. This initializes the repository, and starts
+    # up the page server and the safekeepers.
+    env = zenith_env_builder.init()
+
+    # Run the test
+    ...
 ```
-... will run the fixtures called `zenith_cli` and `pg_bin` and deliver those results to the test function.
-
-Fixtures can't be imported using the normal python syntax. Instead, use this:
-
-```python
-pytest_plugins = ("fixtures.something")
-```
-
-That will make all the fixtures in the `fixtures/something.py` file available.
-
-Anything that's likely to be used in multiple tests should be built into a fixture.
-
-Note that fixtures can clean up after themselves if they use the `yield` syntax.
-Cleanup will happen even if the test fails (raises an unhandled exception).
-Python destructors, e.g. `__del__()` aren't recommended for cleanup.
+For more information about pytest fixtures, see https://docs.pytest.org/en/stable/fixture.html
+
+At the end of a test, all the nodes in the environment are automatically stopped, so you
+don't need to worry about cleaning up.
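Putting it together, a minimal test using the shared environment might look something like
the sketch below (the test name and branch name are placeholders, not part of this patch):

```python
from fixtures.zenith_fixtures import ZenithEnv

pytest_plugins = ("fixtures.zenith_fixtures")


def test_something(zenith_simple_env: ZenithEnv):
    env = zenith_simple_env

    # Create a test-specific branch off 'empty', so we don't disturb other tests
    env.zenith_cli(["branch", "test_something", "empty"])

    # Start a compute node on the new branch and run a query against it
    pg = env.postgres.create_start('test_something')
    assert pg.safe_psql('SELECT 1') == [(1, )]
```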
+Logs and test data are preserved for analysis, in a directory under `../test_output/`.
 ### Before submitting a patch
 #### Obligatory checks
diff --git a/test_runner/batch_others/test_auth.py b/test_runner/batch_others/test_auth.py
index 0369cf1b13..025f21346f 100644
--- a/test_runner/batch_others/test_auth.py
+++ b/test_runner/batch_others/test_auth.py
@@ -2,18 +2,21 @@ from contextlib import closing
 from typing import Iterator
 from uuid import uuid4
 import psycopg2
-from fixtures.zenith_fixtures import PortDistributor, Postgres, ZenithCli, ZenithPageserver, PgBin
+from fixtures.zenith_fixtures import ZenithEnvBuilder
 import pytest
 pytest_plugins = ("fixtures.zenith_fixtures")
-def test_pageserver_auth(pageserver_auth_enabled: ZenithPageserver):
-    ps = pageserver_auth_enabled
+def test_pageserver_auth(zenith_env_builder: ZenithEnvBuilder):
+    zenith_env_builder.pageserver_auth_enabled = True
+    env = zenith_env_builder.init()
-    tenant_token = ps.auth_keys.generate_tenant_token(ps.initial_tenant)
-    invalid_tenant_token = ps.auth_keys.generate_tenant_token(uuid4().hex)
-    management_token = ps.auth_keys.generate_management_token()
+    ps = env.pageserver
+
+    tenant_token = env.auth_keys.generate_tenant_token(env.initial_tenant)
+    invalid_tenant_token = env.auth_keys.generate_tenant_token(uuid4().hex)
+    management_token = env.auth_keys.generate_management_token()
     # this does not invoke auth check and only decodes jwt and checks it for validity
     # check both tokens
@@ -21,13 +24,13 @@ def test_pageserver_auth(pageserver_auth_enabled: ZenithPageserver):
     ps.safe_psql("status", password=management_token)
     # tenant can create branches
-    ps.safe_psql(f"branch_create {ps.initial_tenant} new1 main", password=tenant_token)
+    ps.safe_psql(f"branch_create {env.initial_tenant} new1 main", password=tenant_token)
     # console can create branches for tenant
-    ps.safe_psql(f"branch_create {ps.initial_tenant} new2 main", password=management_token)
+    ps.safe_psql(f"branch_create {env.initial_tenant} new2 main", password=management_token)
     # fail to create branch using token with different tenantid
     with pytest.raises(psycopg2.DatabaseError, match='Tenant id mismatch. 
Permission denied'): - ps.safe_psql(f"branch_create {ps.initial_tenant} new2 main", password=invalid_tenant_token) + ps.safe_psql(f"branch_create {env.initial_tenant} new2 main", password=invalid_tenant_token) # create tenant using management token ps.safe_psql(f"tenant_create {uuid4().hex}", password=management_token) @@ -40,38 +43,22 @@ def test_pageserver_auth(pageserver_auth_enabled: ZenithPageserver): @pytest.mark.parametrize('with_wal_acceptors', [False, True]) -def test_compute_auth_to_pageserver( - zenith_cli: ZenithCli, - wa_factory, - pageserver_auth_enabled: ZenithPageserver, - repo_dir: str, - with_wal_acceptors: bool, - port_distributor: PortDistributor, -): - ps = pageserver_auth_enabled - # since we are in progress of refactoring protocols between compute safekeeper and page server - # use hardcoded management token in safekeeper - management_token = ps.auth_keys.generate_management_token() +def test_compute_auth_to_pageserver(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptors: bool): + zenith_env_builder.pageserver_auth_enabled = True + if with_wal_acceptors: + zenith_env_builder.num_safekeepers = 3 + env = zenith_env_builder.init() branch = f"test_compute_auth_to_pageserver{with_wal_acceptors}" - zenith_cli.run(["branch", branch, "empty"]) - if with_wal_acceptors: - wa_factory.start_n_new(3, management_token) + env.zenith_cli(["branch", branch, "main"]) - with Postgres( - zenith_cli=zenith_cli, - repo_dir=repo_dir, - tenant_id=ps.initial_tenant, - port=port_distributor.get_port(), - ).create_start( - branch, - wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None, - ) as pg: - with closing(pg.connect()) as conn: - with conn.cursor() as cur: - # we rely upon autocommit after each statement - # as waiting for acceptors happens there - cur.execute('CREATE TABLE t(key int primary key, value text)') - cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'") - cur.execute('SELECT sum(key) FROM t') - assert cur.fetchone() == (5000050000, ) + pg = env.postgres.create_start(branch) + + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + # we rely upon autocommit after each statement + # as waiting for acceptors happens there + cur.execute('CREATE TABLE t(key int primary key, value text)') + cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'") + cur.execute('SELECT sum(key) FROM t') + assert cur.fetchone() == (5000050000, ) diff --git a/test_runner/batch_others/test_branch_behind.py b/test_runner/batch_others/test_branch_behind.py index 887671bf99..f13079503d 100644 --- a/test_runner/batch_others/test_branch_behind.py +++ b/test_runner/batch_others/test_branch_behind.py @@ -1,5 +1,5 @@ import subprocess -from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver +from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures") @@ -8,11 +8,12 @@ pytest_plugins = ("fixtures.zenith_fixtures") # # Create a couple of branches off the main branch, at a historical point in time. 
# -def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): +def test_branch_behind(zenith_simple_env: ZenithEnv): + env = zenith_simple_env # Branch at the point where only 100 rows were inserted - zenith_cli.run(["branch", "test_branch_behind", "empty"]) + env.zenith_cli(["branch", "test_branch_behind", "empty"]) - pgmain = postgres.create_start('test_branch_behind') + pgmain = env.postgres.create_start('test_branch_behind') log.info("postgres is running on 'test_branch_behind' branch") main_pg_conn = pgmain.connect() @@ -40,7 +41,7 @@ def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: Postg log.info(f'LSN after 200100 rows: {lsn_b}') # Branch at the point where only 100 rows were inserted - zenith_cli.run(["branch", "test_branch_behind_hundred", "test_branch_behind@" + lsn_a]) + env.zenith_cli(["branch", "test_branch_behind_hundred", "test_branch_behind@" + lsn_a]) # Insert many more rows. This generates enough WAL to fill a few segments. main_cur.execute(''' @@ -55,10 +56,10 @@ def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: Postg log.info(f'LSN after 400100 rows: {lsn_c}') # Branch at the point where only 200100 rows were inserted - zenith_cli.run(["branch", "test_branch_behind_more", "test_branch_behind@" + lsn_b]) + env.zenith_cli(["branch", "test_branch_behind_more", "test_branch_behind@" + lsn_b]) - pg_hundred = postgres.create_start("test_branch_behind_hundred") - pg_more = postgres.create_start("test_branch_behind_more") + pg_hundred = env.postgres.create_start("test_branch_behind_hundred") + pg_more = env.postgres.create_start("test_branch_behind_more") # On the 'hundred' branch, we should see only 100 rows hundred_pg_conn = pg_hundred.connect() @@ -79,8 +80,8 @@ def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: Postg # Check bad lsn's for branching # branch at segment boundary - zenith_cli.run(["branch", "test_branch_segment_boundary", "test_branch_behind@0/3000000"]) - pg = postgres.create_start("test_branch_segment_boundary") + env.zenith_cli(["branch", "test_branch_segment_boundary", "test_branch_behind@0/3000000"]) + pg = env.postgres.create_start("test_branch_segment_boundary") cur = pg.connect().cursor() cur.execute('SELECT 1') assert cur.fetchone() == (1, ) @@ -89,7 +90,7 @@ def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: Postg # # FIXME: This works currently, but probably shouldn't be allowed try: - zenith_cli.run(["branch", "test_branch_preinitdb", "test_branch_behind@0/42"]) + env.zenith_cli(["branch", "test_branch_preinitdb", "test_branch_behind@0/42"]) # FIXME: assert false, "branch with invalid LSN should have failed" except subprocess.CalledProcessError: log.info("Branch creation with pre-initdb LSN failed (as expected)") diff --git a/test_runner/batch_others/test_clog_truncate.py b/test_runner/batch_others/test_clog_truncate.py index a70e14d9a9..c52c78adee 100644 --- a/test_runner/batch_others/test_clog_truncate.py +++ b/test_runner/batch_others/test_clog_truncate.py @@ -3,7 +3,7 @@ import os from contextlib import closing -from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver +from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures") @@ -12,9 +12,10 @@ pytest_plugins = ("fixtures.zenith_fixtures") # # Test compute node start after clog truncation # -def test_clog_truncate(zenith_cli, pageserver: ZenithPageserver, postgres: 
PostgresFactory, pg_bin): +def test_clog_truncate(zenith_simple_env: ZenithEnv): + env = zenith_simple_env # Create a branch for us - zenith_cli.run(["branch", "test_clog_truncate", "empty"]) + env.zenith_cli(["branch", "test_clog_truncate", "empty"]) # set agressive autovacuum to make sure that truncation will happen config = [ @@ -27,7 +28,7 @@ def test_clog_truncate(zenith_cli, pageserver: ZenithPageserver, postgres: Postg 'autovacuum_freeze_max_age=100000' ] - pg = postgres.create_start('test_clog_truncate', config_lines=config) + pg = env.postgres.create_start('test_clog_truncate', config_lines=config) log.info('postgres is running on test_clog_truncate branch') # Install extension containing function needed for test @@ -64,10 +65,10 @@ def test_clog_truncate(zenith_cli, pageserver: ZenithPageserver, postgres: Postg # create new branch after clog truncation and start a compute node on it log.info(f'create branch at lsn_after_truncation {lsn_after_truncation}') - zenith_cli.run( + env.zenith_cli( ["branch", "test_clog_truncate_new", "test_clog_truncate@" + lsn_after_truncation]) - pg2 = postgres.create_start('test_clog_truncate_new') + pg2 = env.postgres.create_start('test_clog_truncate_new') log.info('postgres is running on test_clog_truncate_new branch') # check that new node doesn't contain truncated segment diff --git a/test_runner/batch_others/test_config.py b/test_runner/batch_others/test_config.py index 0ec27b81e2..b39b2f2dcf 100644 --- a/test_runner/batch_others/test_config.py +++ b/test_runner/batch_others/test_config.py @@ -1,6 +1,6 @@ from contextlib import closing -from fixtures.zenith_fixtures import PostgresFactory +from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures") @@ -9,12 +9,13 @@ pytest_plugins = ("fixtures.zenith_fixtures") # # Test starting Postgres with custom options # -def test_config(zenith_cli, postgres: PostgresFactory): +def test_config(zenith_simple_env: ZenithEnv): + env = zenith_simple_env # Create a branch for us - zenith_cli.run(["branch", "test_config", "empty"]) + env.zenith_cli(["branch", "test_config", "empty"]) # change config - pg = postgres.create_start('test_config', config_lines=['log_min_messages=debug1']) + pg = env.postgres.create_start('test_config', config_lines=['log_min_messages=debug1']) log.info('postgres is running on test_config branch') with closing(pg.connect()) as conn: diff --git a/test_runner/batch_others/test_createdropdb.py b/test_runner/batch_others/test_createdropdb.py index 83b6a799ae..45b344774a 100644 --- a/test_runner/batch_others/test_createdropdb.py +++ b/test_runner/batch_others/test_createdropdb.py @@ -2,7 +2,7 @@ import os import pathlib from contextlib import closing -from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory, ZenithCli, check_restored_datadir_content +from fixtures.zenith_fixtures import ZenithEnv, check_restored_datadir_content from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures") @@ -11,15 +11,11 @@ pytest_plugins = ("fixtures.zenith_fixtures") # # Test CREATE DATABASE when there have been relmapper changes # -def test_createdb( - zenith_cli: ZenithCli, - pageserver: ZenithPageserver, - postgres: PostgresFactory, - pg_bin, -): - zenith_cli.run(["branch", "test_createdb", "empty"]) +def test_createdb(zenith_simple_env: ZenithEnv): + env = zenith_simple_env + env.zenith_cli(["branch", "test_createdb", "empty"]) - pg = postgres.create_start('test_createdb') + pg = 
env.postgres.create_start('test_createdb') log.info("postgres is running on 'test_createdb' branch") with closing(pg.connect()) as conn: @@ -33,9 +29,9 @@ def test_createdb( lsn = cur.fetchone()[0] # Create a branch - zenith_cli.run(["branch", "test_createdb2", "test_createdb@" + lsn]) + env.zenith_cli(["branch", "test_createdb2", "test_createdb@" + lsn]) - pg2 = postgres.create_start('test_createdb2') + pg2 = env.postgres.create_start('test_createdb2') # Test that you can connect to the new database on both branches for db in (pg, pg2): @@ -45,16 +41,11 @@ def test_createdb( # # Test DROP DATABASE # -def test_dropdb( - zenith_cli: ZenithCli, - pageserver: ZenithPageserver, - postgres: PostgresFactory, - pg_bin, - test_output_dir, -): - zenith_cli.run(["branch", "test_dropdb", "empty"]) +def test_dropdb(zenith_simple_env: ZenithEnv, test_output_dir): + env = zenith_simple_env + env.zenith_cli(["branch", "test_dropdb", "empty"]) - pg = postgres.create_start('test_dropdb') + pg = env.postgres.create_start('test_dropdb') log.info("postgres is running on 'test_dropdb' branch") with closing(pg.connect()) as conn: @@ -77,11 +68,11 @@ def test_dropdb( lsn_after_drop = cur.fetchone()[0] # Create two branches before and after database drop. - zenith_cli.run(["branch", "test_before_dropdb", "test_dropdb@" + lsn_before_drop]) - pg_before = postgres.create_start('test_before_dropdb') + env.zenith_cli(["branch", "test_before_dropdb", "test_dropdb@" + lsn_before_drop]) + pg_before = env.postgres.create_start('test_before_dropdb') - zenith_cli.run(["branch", "test_after_dropdb", "test_dropdb@" + lsn_after_drop]) - pg_after = postgres.create_start('test_after_dropdb') + env.zenith_cli(["branch", "test_after_dropdb", "test_dropdb@" + lsn_after_drop]) + pg_after = env.postgres.create_start('test_after_dropdb') # Test that database exists on the branch before drop pg_before.connect(dbname='foodb').close() @@ -101,4 +92,4 @@ def test_dropdb( assert os.path.isdir(dbpath) == False # Check that we restore the content of the datadir correctly - check_restored_datadir_content(test_output_dir, pg, pageserver.service_port.pg) + check_restored_datadir_content(test_output_dir, env, pg) diff --git a/test_runner/batch_others/test_createuser.py b/test_runner/batch_others/test_createuser.py index 57cc610f55..e13c39eafe 100644 --- a/test_runner/batch_others/test_createuser.py +++ b/test_runner/batch_others/test_createuser.py @@ -1,6 +1,6 @@ from contextlib import closing -from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver +from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures") @@ -9,10 +9,11 @@ pytest_plugins = ("fixtures.zenith_fixtures") # # Test CREATE USER to check shared catalog restore # -def test_createuser(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): - zenith_cli.run(["branch", "test_createuser", "empty"]) +def test_createuser(zenith_simple_env: ZenithEnv): + env = zenith_simple_env + env.zenith_cli(["branch", "test_createuser", "empty"]) - pg = postgres.create_start('test_createuser') + pg = env.postgres.create_start('test_createuser') log.info("postgres is running on 'test_createuser' branch") with closing(pg.connect()) as conn: @@ -26,9 +27,9 @@ def test_createuser(zenith_cli, pageserver: ZenithPageserver, postgres: Postgres lsn = cur.fetchone()[0] # Create a branch - zenith_cli.run(["branch", "test_createuser2", "test_createuser@" + lsn]) + env.zenith_cli(["branch", "test_createuser2", 
"test_createuser@" + lsn]) - pg2 = postgres.create_start('test_createuser2') + pg2 = env.postgres.create_start('test_createuser2') # Test that you can connect to new branch as a new user assert pg2.safe_psql('select current_user', username='testuser') == [('testuser', )] diff --git a/test_runner/batch_others/test_multixact.py b/test_runner/batch_others/test_multixact.py index 7f032db8c3..96fcb6871d 100644 --- a/test_runner/batch_others/test_multixact.py +++ b/test_runner/batch_others/test_multixact.py @@ -1,4 +1,4 @@ -from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver, check_restored_datadir_content +from fixtures.zenith_fixtures import ZenithEnv, check_restored_datadir_content from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures") @@ -10,15 +10,11 @@ pytest_plugins = ("fixtures.zenith_fixtures") # it only checks next_multixact_id field in restored pg_control, # since we don't have functions to check multixact internals. # -def test_multixact(pageserver: ZenithPageserver, - postgres: PostgresFactory, - pg_bin, - zenith_cli, - base_dir, - test_output_dir): +def test_multixact(zenith_simple_env: ZenithEnv, test_output_dir): + env = zenith_simple_env # Create a branch for us - zenith_cli.run(["branch", "test_multixact", "empty"]) - pg = postgres.create_start('test_multixact') + env.zenith_cli(["branch", "test_multixact", "empty"]) + pg = env.postgres.create_start('test_multixact') log.info("postgres is running on 'test_multixact' branch") pg_conn = pg.connect() @@ -57,8 +53,8 @@ def test_multixact(pageserver: ZenithPageserver, assert int(next_multixact_id) > int(next_multixact_id_old) # Branch at this point - zenith_cli.run(["branch", "test_multixact_new", "test_multixact@" + lsn]) - pg_new = postgres.create_start('test_multixact_new') + env.zenith_cli(["branch", "test_multixact_new", "test_multixact@" + lsn]) + pg_new = env.postgres.create_start('test_multixact_new') log.info("postgres is running on 'test_multixact_new' branch") pg_new_conn = pg_new.connect() @@ -71,4 +67,4 @@ def test_multixact(pageserver: ZenithPageserver, assert next_multixact_id_new == next_multixact_id # Check that we restore the content of the datadir correctly - check_restored_datadir_content(test_output_dir, pg_new, pageserver.service_port.pg) + check_restored_datadir_content(test_output_dir, env, pg_new) diff --git a/test_runner/batch_others/test_old_request_lsn.py b/test_runner/batch_others/test_old_request_lsn.py index 6cc5c01b83..e01fa06e12 100644 --- a/test_runner/batch_others/test_old_request_lsn.py +++ b/test_runner/batch_others/test_old_request_lsn.py @@ -1,6 +1,6 @@ from contextlib import closing -from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver +from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures") @@ -16,13 +16,11 @@ pytest_plugins = ("fixtures.zenith_fixtures") # just a hint that the page hasn't been modified since that LSN, and the page # server should return the latest page version regardless of the LSN. 
# -def test_old_request_lsn(zenith_cli, - pageserver: ZenithPageserver, - postgres: PostgresFactory, - pg_bin): +def test_old_request_lsn(zenith_simple_env: ZenithEnv): + env = zenith_simple_env # Create a branch for us - zenith_cli.run(["branch", "test_old_request_lsn", "empty"]) - pg = postgres.create_start('test_old_request_lsn') + env.zenith_cli(["branch", "test_old_request_lsn", "empty"]) + pg = env.postgres.create_start('test_old_request_lsn') log.info('postgres is running on test_old_request_lsn branch') pg_conn = pg.connect() @@ -32,7 +30,7 @@ def test_old_request_lsn(zenith_cli, cur.execute("SHOW zenith.zenith_timeline") timeline = cur.fetchone()[0] - psconn = pageserver.connect() + psconn = env.pageserver.connect() pscur = psconn.cursor() # Create table, and insert some rows. Make it big enough that it doesn't fit in @@ -59,7 +57,7 @@ def test_old_request_lsn(zenith_cli, # Make a lot of updates on a single row, generating a lot of WAL. Trigger # garbage collections so that the page server will remove old page versions. for i in range(10): - pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") + pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") for j in range(100): cur.execute('UPDATE foo SET val = val + 1 WHERE id = 1;') diff --git a/test_runner/batch_others/test_pageserver_api.py b/test_runner/batch_others/test_pageserver_api.py index e7674491c2..1939837d56 100644 --- a/test_runner/batch_others/test_pageserver_api.py +++ b/test_runner/batch_others/test_pageserver_api.py @@ -3,26 +3,28 @@ from uuid import uuid4 import pytest import psycopg2 import requests -from fixtures.zenith_fixtures import ZenithCli, ZenithPageserver, ZenithPageserverHttpClient +from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient from typing import cast pytest_plugins = ("fixtures.zenith_fixtures") -def test_status_psql(pageserver): - assert pageserver.safe_psql('status') == [ +def test_status_psql(zenith_simple_env: ZenithEnv): + env = zenith_simple_env + assert env.pageserver.safe_psql('status') == [ ('hello world', ), ] -def test_branch_list_psql(pageserver: ZenithPageserver, zenith_cli): +def test_branch_list_psql(zenith_simple_env: ZenithEnv): + env = zenith_simple_env # Create a branch for us - zenith_cli.run(["branch", "test_branch_list_main", "empty"]) + env.zenith_cli(["branch", "test_branch_list_main", "empty"]) - conn = pageserver.connect() + conn = env.pageserver.connect() cur = conn.cursor() - cur.execute(f'branch_list {pageserver.initial_tenant}') + cur.execute(f'branch_list {env.initial_tenant}') branches = json.loads(cur.fetchone()[0]) # Filter out branches created by other tests branches = [x for x in branches if x['name'].startswith('test_branch_list')] @@ -35,10 +37,10 @@ def test_branch_list_psql(pageserver: ZenithPageserver, zenith_cli): assert 'ancestor_lsn' in branches[0] # Create another branch, and start Postgres on it - zenith_cli.run(['branch', 'test_branch_list_experimental', 'test_branch_list_main']) - zenith_cli.run(['pg', 'create', 'test_branch_list_experimental']) + env.zenith_cli(['branch', 'test_branch_list_experimental', 'test_branch_list_main']) + env.zenith_cli(['pg', 'create', 'test_branch_list_experimental']) - cur.execute(f'branch_list {pageserver.initial_tenant}') + cur.execute(f'branch_list {env.initial_tenant}') new_branches = json.loads(cur.fetchone()[0]) # Filter out branches created by other tests new_branches = [x for x in new_branches if x['name'].startswith('test_branch_list')] @@ -54,19 +56,22 @@ def 
test_branch_list_psql(pageserver: ZenithPageserver, zenith_cli): conn.close() -def test_tenant_list_psql(pageserver: ZenithPageserver, zenith_cli: ZenithCli): - res = zenith_cli.run(["tenant", "list"]) +def test_tenant_list_psql(zenith_env_builder: ZenithEnvBuilder): + # don't use zenith_simple_env, because there might be other tenants there, + # left over from other tests. + env = zenith_env_builder.init() + + res = env.zenith_cli(["tenant", "list"]) res.check_returncode() tenants = sorted(map(lambda t: t.split()[0], res.stdout.splitlines())) - assert tenants == [pageserver.initial_tenant] + assert tenants == [env.initial_tenant] - conn = pageserver.connect() + conn = env.pageserver.connect() cur = conn.cursor() # check same tenant cannot be created twice - with pytest.raises(psycopg2.DatabaseError, - match=f'tenant {pageserver.initial_tenant} already exists'): - cur.execute(f'tenant_create {pageserver.initial_tenant}') + with pytest.raises(psycopg2.DatabaseError, match=f'tenant {env.initial_tenant} already exists'): + cur.execute(f'tenant_create {env.initial_tenant}') # create one more tenant tenant1 = uuid4().hex @@ -76,7 +81,7 @@ def test_tenant_list_psql(pageserver: ZenithPageserver, zenith_cli: ZenithCli): # compare tenants list new_tenants = sorted(map(lambda t: cast(str, t['id']), json.loads(cur.fetchone()[0]))) - assert sorted([pageserver.initial_tenant, tenant1]) == new_tenants + assert sorted([env.initial_tenant, tenant1]) == new_tenants def check_client(client: ZenithPageserverHttpClient, initial_tenant: str): @@ -98,12 +103,17 @@ def check_client(client: ZenithPageserverHttpClient, initial_tenant: str): assert branch_name in {b['name'] for b in client.branch_list(tenant_id)} -def test_pageserver_http_api_client(pageserver: ZenithPageserver): - client = pageserver.http_client() - check_client(client, pageserver.initial_tenant) +def test_pageserver_http_api_client(zenith_simple_env: ZenithEnv): + env = zenith_simple_env + client = env.pageserver.http_client() + check_client(client, env.initial_tenant) -def test_pageserver_http_api_client_auth_enabled(pageserver_auth_enabled: ZenithPageserver): - client = pageserver_auth_enabled.http_client( - auth_token=pageserver_auth_enabled.auth_keys.generate_management_token()) - check_client(client, pageserver_auth_enabled.initial_tenant) +def test_pageserver_http_api_client_auth_enabled(zenith_env_builder: ZenithEnvBuilder): + zenith_env_builder.pageserver_auth_enabled = True + env = zenith_env_builder.init() + + management_token = env.auth_keys.generate_management_token() + + client = env.pageserver.http_client(auth_token=management_token) + check_client(client, env.initial_tenant) diff --git a/test_runner/batch_others/test_pageserver_restart.py b/test_runner/batch_others/test_pageserver_restart.py index 5b4943aa27..a483450c54 100644 --- a/test_runner/batch_others/test_pageserver_restart.py +++ b/test_runner/batch_others/test_pageserver_restart.py @@ -4,7 +4,7 @@ import time from contextlib import closing from multiprocessing import Process, Value -from fixtures.zenith_fixtures import WalAcceptorFactory, ZenithPageserver, PostgresFactory +from fixtures.zenith_fixtures import ZenithEnvBuilder from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures") @@ -13,16 +13,13 @@ pytest_plugins = ("fixtures.zenith_fixtures") # Check that dead minority doesn't prevent the commits: execute insert n_inserts # times, with fault_probability chance of getting a wal acceptor down or up # along the way. 
2 of 3 are always alive, so the work keeps going. -def test_pageserver_restart(zenith_cli, - pageserver: ZenithPageserver, - postgres: PostgresFactory, - wa_factory: WalAcceptorFactory): - +def test_pageserver_restart(zenith_env_builder: ZenithEnvBuilder): # One safekeeper is enough for this test. - wa_factory.start_n_new(1) + zenith_env_builder.num_safekeepers = 1 + env = zenith_env_builder.init() - zenith_cli.run(["branch", "test_pageserver_restart", "empty"]) - pg = postgres.create_start('test_pageserver_restart', wal_acceptors=wa_factory.get_connstrs()) + env.zenith_cli(["branch", "test_pageserver_restart", "main"]) + pg = env.postgres.create_start('test_pageserver_restart') pg_conn = pg.connect() cur = pg_conn.cursor() @@ -50,8 +47,8 @@ def test_pageserver_restart(zenith_cli, # Stop and restart pageserver. This is a more or less graceful shutdown, although # the page server doesn't currently have a shutdown routine so there's no difference # between stopping and crashing. - pageserver.stop() - pageserver.start() + env.pageserver.stop() + env.pageserver.start() # Stopping the pageserver breaks the connection from the postgres backend to # the page server, and causes the next query on the connection to fail. Start a new @@ -65,5 +62,5 @@ def test_pageserver_restart(zenith_cli, assert cur.fetchone() == (100000, ) # Stop the page server by force, and restart it - pageserver.stop() - pageserver.start() + env.pageserver.stop() + env.pageserver.start() diff --git a/test_runner/batch_others/test_pgbench.py b/test_runner/batch_others/test_pgbench.py index 46633daa34..3b12502ae0 100644 --- a/test_runner/batch_others/test_pgbench.py +++ b/test_runner/batch_others/test_pgbench.py @@ -1,14 +1,15 @@ -from fixtures.zenith_fixtures import PostgresFactory +from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures") -def test_pgbench(postgres: PostgresFactory, pg_bin, zenith_cli): +def test_pgbench(zenith_simple_env: ZenithEnv, pg_bin): + env = zenith_simple_env # Create a branch for us - zenith_cli.run(["branch", "test_pgbench", "empty"]) + env.zenith_cli(["branch", "test_pgbench", "empty"]) - pg = postgres.create_start('test_pgbench') + pg = env.postgres.create_start('test_pgbench') log.info("postgres is running on 'test_pgbench' branch") connstr = pg.connstr() diff --git a/test_runner/batch_others/test_readonly_node.py b/test_runner/batch_others/test_readonly_node.py index 7f964470f3..904b5da5c9 100644 --- a/test_runner/batch_others/test_readonly_node.py +++ b/test_runner/batch_others/test_readonly_node.py @@ -1,5 +1,5 @@ import subprocess -from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver +from fixtures.zenith_fixtures import ZenithEnv pytest_plugins = ("fixtures.zenith_fixtures") @@ -10,10 +10,11 @@ pytest_plugins = ("fixtures.zenith_fixtures") # This is very similar to the 'test_branch_behind' test, but instead of # creating branches, creates read-only nodes. 
# -def test_readonly_node(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): - zenith_cli.run(["branch", "test_readonly_node", "empty"]) +def test_readonly_node(zenith_simple_env: ZenithEnv): + env = zenith_simple_env + env.zenith_cli(["branch", "test_readonly_node", "empty"]) - pgmain = postgres.create_start('test_readonly_node') + pgmain = env.postgres.create_start('test_readonly_node') print("postgres is running on 'test_readonly_node' branch") main_pg_conn = pgmain.connect() @@ -52,11 +53,12 @@ def test_readonly_node(zenith_cli, pageserver: ZenithPageserver, postgres: Postg print('LSN after 400100 rows: ' + lsn_c) # Create first read-only node at the point where only 100 rows were inserted - pg_hundred = postgres.create_start("test_readonly_node_hundred", - branch=f'test_readonly_node@{lsn_a}') + pg_hundred = env.postgres.create_start("test_readonly_node_hundred", + branch=f'test_readonly_node@{lsn_a}') # And another at the point where 200100 rows were inserted - pg_more = postgres.create_start("test_readonly_node_more", branch=f'test_readonly_node@{lsn_b}') + pg_more = env.postgres.create_start("test_readonly_node_more", + branch=f'test_readonly_node@{lsn_b}') # On the 'hundred' node, we should see only 100 rows hundred_pg_conn = pg_hundred.connect() @@ -75,15 +77,15 @@ def test_readonly_node(zenith_cli, pageserver: ZenithPageserver, postgres: Postg assert main_cur.fetchone() == (400100, ) # Check creating a node at segment boundary - pg = postgres.create_start("test_branch_segment_boundary", - branch="test_readonly_node@0/3000000") + pg = env.postgres.create_start("test_branch_segment_boundary", + branch="test_readonly_node@0/3000000") cur = pg.connect().cursor() cur.execute('SELECT 1') assert cur.fetchone() == (1, ) # Create node at pre-initdb lsn try: - zenith_cli.run(["pg", "start", "test_branch_preinitdb", "test_readonly_node@0/42"]) + env.zenith_cli(["pg", "start", "test_branch_preinitdb", "test_readonly_node@0/42"]) assert False, "compute node startup with invalid LSN should have failed" except Exception: print("Node creation with pre-initdb LSN failed (as expected)") diff --git a/test_runner/batch_others/test_restart_compute.py b/test_runner/batch_others/test_restart_compute.py index 5d47d32aac..2b871f91fd 100644 --- a/test_runner/batch_others/test_restart_compute.py +++ b/test_runner/batch_others/test_restart_compute.py @@ -1,7 +1,7 @@ import pytest from contextlib import closing -from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory +from fixtures.zenith_fixtures import ZenithEnvBuilder from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures") @@ -11,22 +11,15 @@ pytest_plugins = ("fixtures.zenith_fixtures") # Test restarting and recreating a postgres instance # @pytest.mark.parametrize('with_wal_acceptors', [False, True]) -def test_restart_compute( - zenith_cli, - pageserver: ZenithPageserver, - postgres: PostgresFactory, - pg_bin, - wa_factory, - with_wal_acceptors: bool, -): - wal_acceptor_connstrs = None - zenith_cli.run(["branch", "test_restart_compute", "empty"]) - +def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptors: bool): + zenith_env_builder.pageserver_auth_enabled = True if with_wal_acceptors: - wa_factory.start_n_new(3) - wal_acceptor_connstrs = wa_factory.get_connstrs() + zenith_env_builder.num_safekeepers = 3 + env = zenith_env_builder.init() - pg = postgres.create_start('test_restart_compute', wal_acceptors=wal_acceptor_connstrs) + env.zenith_cli(["branch", 
"test_restart_compute", "main"]) + + pg = env.postgres.create_start('test_restart_compute') log.info("postgres is running on 'test_restart_compute' branch") with closing(pg.connect()) as conn: @@ -39,7 +32,7 @@ def test_restart_compute( log.info(f"res = {r}") # Remove data directory and restart - pg.stop_and_destroy().create_start('test_restart_compute', wal_acceptors=wal_acceptor_connstrs) + pg.stop_and_destroy().create_start('test_restart_compute') with closing(pg.connect()) as conn: with conn.cursor() as cur: @@ -58,7 +51,7 @@ def test_restart_compute( log.info(f"res = {r}") # Again remove data directory and restart - pg.stop_and_destroy().create_start('test_restart_compute', wal_acceptors=wal_acceptor_connstrs) + pg.stop_and_destroy().create_start('test_restart_compute') # That select causes lots of FPI's and increases probability of wakeepers # lagging behind after query completion @@ -72,7 +65,7 @@ def test_restart_compute( log.info(f"res = {r}") # And again remove data directory and restart - pg.stop_and_destroy().create_start('test_restart_compute', wal_acceptors=wal_acceptor_connstrs) + pg.stop_and_destroy().create_start('test_restart_compute') with closing(pg.connect()) as conn: with conn.cursor() as cur: diff --git a/test_runner/batch_others/test_snapfiles_gc.py b/test_runner/batch_others/test_snapfiles_gc.py index a799b34aa6..9b57d5acbe 100644 --- a/test_runner/batch_others/test_snapfiles_gc.py +++ b/test_runner/batch_others/test_snapfiles_gc.py @@ -1,6 +1,7 @@ from contextlib import closing import psycopg2.extras import time +from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures") @@ -22,13 +23,14 @@ def print_gc_result(row): # This test is pretty tightly coupled with the current implementation of layered # storage, in layered_repository.rs. # -def test_layerfiles_gc(zenith_cli, pageserver, postgres, pg_bin): - zenith_cli.run(["branch", "test_layerfiles_gc", "empty"]) - pg = postgres.create_start('test_layerfiles_gc') +def test_layerfiles_gc(zenith_simple_env: ZenithEnv): + env = zenith_simple_env + env.zenith_cli(["branch", "test_layerfiles_gc", "empty"]) + pg = env.postgres.create_start('test_layerfiles_gc') with closing(pg.connect()) as conn: with conn.cursor() as cur: - with closing(pageserver.connect()) as psconn: + with closing(env.pageserver.connect()) as psconn: with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur: # Get the timeline ID of our branch. We need it for the 'do_gc' command @@ -57,7 +59,7 @@ def test_layerfiles_gc(zenith_cli, pageserver, postgres, pg_bin): cur.execute("DELETE FROM foo") log.info("Running GC before test") - pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") + pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") row = pscur.fetchone() print_gc_result(row) # remember the number of files @@ -70,7 +72,7 @@ def test_layerfiles_gc(zenith_cli, pageserver, postgres, pg_bin): # removing the old image and delta layer. 
log.info("Inserting one row and running GC") cur.execute("INSERT INTO foo VALUES (1)") - pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") + pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") row = pscur.fetchone() print_gc_result(row) assert row['layer_relfiles_total'] == layer_relfiles_remain + 2 @@ -84,7 +86,7 @@ def test_layerfiles_gc(zenith_cli, pageserver, postgres, pg_bin): cur.execute("INSERT INTO foo VALUES (2)") cur.execute("INSERT INTO foo VALUES (3)") - pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") + pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") row = pscur.fetchone() print_gc_result(row) assert row['layer_relfiles_total'] == layer_relfiles_remain + 2 @@ -96,7 +98,7 @@ def test_layerfiles_gc(zenith_cli, pageserver, postgres, pg_bin): cur.execute("INSERT INTO foo VALUES (2)") cur.execute("INSERT INTO foo VALUES (3)") - pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") + pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") row = pscur.fetchone() print_gc_result(row) assert row['layer_relfiles_total'] == layer_relfiles_remain + 2 @@ -105,7 +107,7 @@ def test_layerfiles_gc(zenith_cli, pageserver, postgres, pg_bin): # Run GC again, with no changes in the database. Should not remove anything. log.info("Run GC again, with nothing to do") - pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") + pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") row = pscur.fetchone() print_gc_result(row) assert row['layer_relfiles_total'] == layer_relfiles_remain @@ -118,7 +120,7 @@ def test_layerfiles_gc(zenith_cli, pageserver, postgres, pg_bin): log.info("Drop table and run GC again") cur.execute("DROP TABLE foo") - pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") + pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") row = pscur.fetchone() print_gc_result(row) diff --git a/test_runner/batch_others/test_tenants.py b/test_runner/batch_others/test_tenants.py index d646f10666..d87621c172 100644 --- a/test_runner/batch_others/test_tenants.py +++ b/test_runner/batch_others/test_tenants.py @@ -2,51 +2,41 @@ from contextlib import closing import pytest -from fixtures.zenith_fixtures import ( - TenantFactory, - ZenithCli, - PostgresFactory, -) +from fixtures.zenith_fixtures import ZenithEnvBuilder @pytest.mark.parametrize('with_wal_acceptors', [False, True]) -def test_tenants_normal_work( - zenith_cli: ZenithCli, - tenant_factory: TenantFactory, - postgres: PostgresFactory, - wa_factory, - with_wal_acceptors: bool, -): - """Tests tenants with and without wal acceptors""" - tenant_1 = tenant_factory.create() - tenant_2 = tenant_factory.create() +def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptors: bool): + if with_wal_acceptors: + zenith_env_builder.num_safekeepers = 3 - zenith_cli.run([ + env = zenith_env_builder.init() + """Tests tenants with and without wal acceptors""" + tenant_1 = env.create_tenant() + tenant_2 = env.create_tenant() + + env.zenith_cli([ "branch", f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}", "main", f"--tenantid={tenant_1}" ]) - zenith_cli.run([ + env.zenith_cli([ "branch", f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}", "main", f"--tenantid={tenant_2}" ]) - if with_wal_acceptors: - wa_factory.start_n_new(3) - pg_tenant1 = postgres.create_start( + pg_tenant1 = env.postgres.create_start( f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}", None, # branch name, None means same as node name 
tenant_1, - wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None, ) - pg_tenant2 = postgres.create_start( + pg_tenant2 = env.postgres.create_start( f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}", None, # branch name, None means same as node name tenant_2, - wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None, ) for pg in [pg_tenant1, pg_tenant2]: diff --git a/test_runner/batch_others/test_timeline_size.py b/test_runner/batch_others/test_timeline_size.py index 819edc26b4..3c58b0b30e 100644 --- a/test_runner/batch_others/test_timeline_size.py +++ b/test_runner/batch_others/test_timeline_size.py @@ -1,19 +1,20 @@ from contextlib import closing from uuid import UUID import psycopg2.extras -from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver +from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log -def test_timeline_size(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): +def test_timeline_size(zenith_simple_env: ZenithEnv): + env = zenith_simple_env # Branch at the point where only 100 rows were inserted - zenith_cli.run(["branch", "test_timeline_size", "empty"]) + env.zenith_cli(["branch", "test_timeline_size", "empty"]) - client = pageserver.http_client() - res = client.branch_detail(UUID(pageserver.initial_tenant), "test_timeline_size") + client = env.pageserver.http_client() + res = client.branch_detail(UUID(env.initial_tenant), "test_timeline_size") assert res["current_logical_size"] == res["current_logical_size_non_incremental"] - pgmain = postgres.create_start("test_timeline_size") + pgmain = env.postgres.create_start("test_timeline_size") log.info("postgres is running on 'test_timeline_size' branch") with closing(pgmain.connect()) as conn: @@ -28,9 +29,9 @@ def test_timeline_size(zenith_cli, pageserver: ZenithPageserver, postgres: Postg FROM generate_series(1, 10) g """) - res = client.branch_detail(UUID(pageserver.initial_tenant), "test_timeline_size") + res = client.branch_detail(UUID(env.initial_tenant), "test_timeline_size") assert res["current_logical_size"] == res["current_logical_size_non_incremental"] cur.execute("TRUNCATE foo") - res = client.branch_detail(UUID(pageserver.initial_tenant), "test_timeline_size") + res = client.branch_detail(UUID(env.initial_tenant), "test_timeline_size") assert res["current_logical_size"] == res["current_logical_size_non_incremental"] diff --git a/test_runner/batch_others/test_twophase.py b/test_runner/batch_others/test_twophase.py index bc6ee076c1..6f6596c6dd 100644 --- a/test_runner/batch_others/test_twophase.py +++ b/test_runner/batch_others/test_twophase.py @@ -1,6 +1,6 @@ import os -from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver, PgBin +from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures") @@ -9,13 +9,11 @@ pytest_plugins = ("fixtures.zenith_fixtures") # # Test branching, when a transaction is in prepared state # -def test_twophase(zenith_cli, - pageserver: ZenithPageserver, - postgres: PostgresFactory, - pg_bin: PgBin): - zenith_cli.run(["branch", "test_twophase", "empty"]) +def test_twophase(zenith_simple_env: ZenithEnv): + env = zenith_simple_env + env.zenith_cli(["branch", "test_twophase", "empty"]) - pg = postgres.create_start('test_twophase', config_lines=['max_prepared_transactions=5']) + pg = env.postgres.create_start('test_twophase', config_lines=['max_prepared_transactions=5']) log.info("postgres is 
running on 'test_twophase' branch") conn = pg.connect() @@ -60,10 +58,10 @@ def test_twophase(zenith_cli, assert len(twophase_files) == 2 # Create a branch with the transaction in prepared state - zenith_cli.run(["branch", "test_twophase_prepared", "test_twophase"]) + env.zenith_cli(["branch", "test_twophase_prepared", "test_twophase"]) # Start compute on the new branch - pg2 = postgres.create_start( + pg2 = env.postgres.create_start( 'test_twophase_prepared', config_lines=['max_prepared_transactions=5'], ) diff --git a/test_runner/batch_others/test_vm_bits.py b/test_runner/batch_others/test_vm_bits.py index 6f19940f2f..5e5fbfa646 100644 --- a/test_runner/batch_others/test_vm_bits.py +++ b/test_runner/batch_others/test_vm_bits.py @@ -1,4 +1,4 @@ -from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver +from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures") @@ -8,14 +8,12 @@ pytest_plugins = ("fixtures.zenith_fixtures") # Test that the VM bit is cleared correctly at a HEAP_DELETE and # HEAP_UPDATE record. # -def test_vm_bit_clear(pageserver: ZenithPageserver, - postgres: PostgresFactory, - pg_bin, - zenith_cli, - base_dir): +def test_vm_bit_clear(zenith_simple_env: ZenithEnv): + env = zenith_simple_env + # Create a branch for us - zenith_cli.run(["branch", "test_vm_bit_clear", "empty"]) - pg = postgres.create_start('test_vm_bit_clear') + env.zenith_cli(["branch", "test_vm_bit_clear", "empty"]) + pg = env.postgres.create_start('test_vm_bit_clear') log.info("postgres is running on 'test_vm_bit_clear' branch") pg_conn = pg.connect() @@ -38,7 +36,7 @@ def test_vm_bit_clear(pageserver: ZenithPageserver, cur.execute('UPDATE vmtest_update SET id = 5000 WHERE id = 1') # Branch at this point, to test that later - zenith_cli.run(["branch", "test_vm_bit_clear_new", "test_vm_bit_clear"]) + env.zenith_cli(["branch", "test_vm_bit_clear_new", "test_vm_bit_clear"]) # Clear the buffer cache, to force the VM page to be re-fetched from # the page server @@ -66,7 +64,7 @@ def test_vm_bit_clear(pageserver: ZenithPageserver, # a dirty VM page is evicted. If the VM bit was not correctly cleared by the # earlier WAL record, the full-page image hides the problem. Starting a new # server at the right point-in-time avoids that full-page image. 
- pg_new = postgres.create_start('test_vm_bit_clear_new') + pg_new = env.postgres.create_start('test_vm_bit_clear_new') log.info("postgres is running on 'test_vm_bit_clear_new' branch") pg_new_conn = pg_new.connect() diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 3eaadc78a6..8e33264a20 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -7,7 +7,7 @@ import uuid from contextlib import closing from multiprocessing import Process, Value -from fixtures.zenith_fixtures import WalAcceptorFactory, ZenithPageserver, PostgresFactory, PgBin +from fixtures.zenith_fixtures import PgBin, ZenithEnv, ZenithEnvBuilder from fixtures.utils import lsn_to_hex, mkdir_if_needed from fixtures.log_helper import log @@ -16,14 +16,13 @@ pytest_plugins = ("fixtures.zenith_fixtures") # basic test, write something in setup with wal acceptors, ensure that commits # succeed and data is written -def test_normal_work(zenith_cli, - pageserver: ZenithPageserver, - postgres: PostgresFactory, - wa_factory): - zenith_cli.run(["branch", "test_wal_acceptors_normal_work", "empty"]) - wa_factory.start_n_new(3) - pg = postgres.create_start('test_wal_acceptors_normal_work', - wal_acceptors=wa_factory.get_connstrs()) +def test_normal_work(zenith_env_builder: ZenithEnvBuilder): + zenith_env_builder.num_safekeepers = 3 + env = zenith_env_builder.init() + + env.zenith_cli(["branch", "test_wal_acceptors_normal_work", "main"]) + + pg = env.postgres.create_start('test_wal_acceptors_normal_work') with closing(pg.connect()) as conn: with conn.cursor() as cur: @@ -37,21 +36,19 @@ def test_normal_work(zenith_cli, # Run page server and multiple acceptors, and multiple compute nodes running # against different timelines. -def test_many_timelines(zenith_cli, - pageserver: ZenithPageserver, - postgres: PostgresFactory, - wa_factory): - n_timelines = 2 +def test_many_timelines(zenith_env_builder: ZenithEnvBuilder): + zenith_env_builder.num_safekeepers = 3 + env = zenith_env_builder.init() - wa_factory.start_n_new(3) + n_timelines = 2 branches = ["test_wal_acceptors_many_timelines_{}".format(tlin) for tlin in range(n_timelines)] # start postgres on each timeline pgs = [] for branch in branches: - zenith_cli.run(["branch", branch, "empty"]) - pgs.append(postgres.create_start(branch, wal_acceptors=wa_factory.get_connstrs())) + env.zenith_cli(["branch", branch, "main"]) + pgs.append(env.postgres.create_start(branch)) # Do everything in different loops to have actions on different timelines # interleaved. @@ -72,19 +69,16 @@ def test_many_timelines(zenith_cli, # Check that dead minority doesn't prevent the commits: execute insert n_inserts # times, with fault_probability chance of getting a wal acceptor down or up # along the way. 2 of 3 are always alive, so the work keeps going. 
-def test_restarts(zenith_cli, - pageserver: ZenithPageserver, - postgres: PostgresFactory, - wa_factory: WalAcceptorFactory): +def test_restarts(zenith_env_builder: ZenithEnvBuilder): fault_probability = 0.01 n_inserts = 1000 n_acceptors = 3 - wa_factory.start_n_new(n_acceptors) + zenith_env_builder.num_safekeepers = n_acceptors + env = zenith_env_builder.init() - zenith_cli.run(["branch", "test_wal_acceptors_restarts", "empty"]) - pg = postgres.create_start('test_wal_acceptors_restarts', - wal_acceptors=wa_factory.get_connstrs()) + env.zenith_cli(["branch", "test_wal_acceptors_restarts", "main"]) + pg = env.postgres.create_start('test_wal_acceptors_restarts') # we rely upon autocommit after each statement # as waiting for acceptors happens there @@ -98,7 +92,7 @@ def test_restarts(zenith_cli, if random.random() <= fault_probability: if failed_node is None: - failed_node = wa_factory.instances[random.randrange(0, n_acceptors)] + failed_node = env.safekeepers[random.randrange(0, n_acceptors)] failed_node.stop() else: failed_node.start() @@ -116,12 +110,12 @@ def delayed_wal_acceptor_start(wa): # When majority of acceptors is offline, commits are expected to be frozen -def test_unavailability(zenith_cli, postgres: PostgresFactory, wa_factory): - wa_factory.start_n_new(2) +def test_unavailability(zenith_env_builder: ZenithEnvBuilder): + zenith_env_builder.num_safekeepers = 2 + env = zenith_env_builder.init() - zenith_cli.run(["branch", "test_wal_acceptors_unavailability", "empty"]) - pg = postgres.create_start('test_wal_acceptors_unavailability', - wal_acceptors=wa_factory.get_connstrs()) + env.zenith_cli(["branch", "test_wal_acceptors_unavailability", "main"]) + pg = env.postgres.create_start('test_wal_acceptors_unavailability') # we rely upon autocommit after each statement # as waiting for acceptors happens there @@ -133,9 +127,9 @@ def test_unavailability(zenith_cli, postgres: PostgresFactory, wa_factory): cur.execute("INSERT INTO t values (1, 'payload')") # shutdown one of two acceptors, that is, majority - wa_factory.instances[0].stop() + env.safekeepers[0].stop() - proc = Process(target=delayed_wal_acceptor_start, args=(wa_factory.instances[0], )) + proc = Process(target=delayed_wal_acceptor_start, args=(env.safekeepers[0], )) proc.start() start = time.time() @@ -145,9 +139,9 @@ def test_unavailability(zenith_cli, postgres: PostgresFactory, wa_factory): proc.join() # for the world's balance, do the same with second acceptor - wa_factory.instances[1].stop() + env.safekeepers[1].stop() - proc = Process(target=delayed_wal_acceptor_start, args=(wa_factory.instances[1], )) + proc = Process(target=delayed_wal_acceptor_start, args=(env.safekeepers[1], )) proc.start() start = time.time() @@ -186,17 +180,13 @@ def stop_value(): # do inserts while concurrently getting up/down subsets of acceptors -def test_race_conditions(zenith_cli, - pageserver: ZenithPageserver, - postgres: PostgresFactory, - wa_factory, - stop_value): +def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value): - wa_factory.start_n_new(3) + zenith_env_builder.num_safekeepers = 3 + env = zenith_env_builder.init() - zenith_cli.run(["branch", "test_wal_acceptors_race_conditions", "empty"]) - pg = postgres.create_start('test_wal_acceptors_race_conditions', - wal_acceptors=wa_factory.get_connstrs()) + env.zenith_cli(["branch", "test_wal_acceptors_race_conditions", "main"]) + pg = env.postgres.create_start('test_wal_acceptors_race_conditions') # we rely upon autocommit after each statement # as waiting for 
acceptors happens there @@ -205,7 +195,7 @@ def test_race_conditions(zenith_cli, cur.execute('CREATE TABLE t(key int primary key, value text)') - proc = Process(target=xmas_garland, args=(wa_factory.instances, stop_value)) + proc = Process(target=xmas_garland, args=(env.safekeepers, stop_value)) proc.start() for i in range(1000): @@ -220,7 +210,8 @@ def test_race_conditions(zenith_cli, class ProposerPostgres: """Object for running safekeepers sync with walproposer""" - def __init__(self, pgdata_dir: str, pg_bin: PgBin, timeline_id: str, tenant_id: str): + def __init__(self, env: ZenithEnv, pgdata_dir: str, pg_bin, timeline_id: str, tenant_id: str): + self.env = env self.pgdata_dir: str = pgdata_dir self.pg_bin: PgBin = pg_bin self.timeline_id: str = timeline_id @@ -266,16 +257,20 @@ class ProposerPostgres: # insert wal in all safekeepers and run sync on proposer -def test_sync_safekeepers(repo_dir: str, pg_bin: PgBin, wa_factory: WalAcceptorFactory): - wa_factory.start_n_new(3) +def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder, pg_bin: PgBin): + + # We don't really need the full environment for this test, just the + # safekeepers would be enough. + zenith_env_builder.num_safekeepers = 3 + env = zenith_env_builder.init() timeline_id = uuid.uuid4().hex tenant_id = uuid.uuid4().hex # write config for proposer - pgdata_dir = os.path.join(repo_dir, "proposer_pgdata") - pg = ProposerPostgres(pgdata_dir, pg_bin, timeline_id, tenant_id) - pg.create_dir_config(wa_factory.get_connstrs()) + pgdata_dir = os.path.join(env.repo_dir, "proposer_pgdata") + pg = ProposerPostgres(env, pgdata_dir, pg_bin, timeline_id, tenant_id) + pg.create_dir_config(env.get_safekeeper_connstrs()) # valid lsn, which is not in the segment start, nor in zero segment epoch_start_lsn = 0x16B9188 # 0/16B9188 @@ -284,7 +279,7 @@ def test_sync_safekeepers(repo_dir: str, pg_bin: PgBin, wa_factory: WalAcceptorF # append and commit WAL lsn_after_append = [] for i in range(3): - res = wa_factory.instances[i].append_logical_message( + res = env.safekeepers[i].append_logical_message( tenant_id, timeline_id, { @@ -308,13 +303,15 @@ def test_sync_safekeepers(repo_dir: str, pg_bin: PgBin, wa_factory: WalAcceptorF assert all(lsn_after_sync == lsn for lsn in lsn_after_append) -def test_timeline_status(zenith_cli, pageserver, postgres, wa_factory: WalAcceptorFactory): - wa_factory.start_n_new(1) +def test_timeline_status(zenith_env_builder: ZenithEnvBuilder): - zenith_cli.run(["branch", "test_timeline_status", "empty"]) - pg = postgres.create_start('test_timeline_status', wal_acceptors=wa_factory.get_connstrs()) + zenith_env_builder.num_safekeepers = 1 + env = zenith_env_builder.init() - wa = wa_factory.instances[0] + env.zenith_cli(["branch", "test_timeline_status", "main"]) + pg = env.postgres.create_start('test_timeline_status') + + wa = env.safekeepers[0] wa_http_cli = wa.http_client() wa_http_cli.check_status() diff --git a/test_runner/batch_others/test_wal_acceptor_async.py b/test_runner/batch_others/test_wal_acceptor_async.py index a86b17c946..6fdf2ee462 100644 --- a/test_runner/batch_others/test_wal_acceptor_async.py +++ b/test_runner/batch_others/test_wal_acceptor_async.py @@ -3,7 +3,7 @@ import asyncpg import random import time -from fixtures.zenith_fixtures import WalAcceptor, WalAcceptorFactory, ZenithPageserver, PostgresFactory, Postgres +from fixtures.zenith_fixtures import ZenithEnvBuilder, Postgres, Safekeeper from fixtures.log_helper import getLogger from fixtures.utils import lsn_from_hex, lsn_to_hex from typing 
import List @@ -104,7 +104,7 @@ async def run_random_worker(stats: WorkerStats, pg: Postgres, worker_id, n_accou await pg_conn.close() -async def wait_for_lsn(safekeeper: WalAcceptor, +async def wait_for_lsn(safekeeper: Safekeeper, tenant_id: str, timeline_id: str, wait_lsn: str, @@ -140,7 +140,7 @@ async def wait_for_lsn(safekeeper: WalAcceptor, # On each iteration 1 acceptor is stopped, and 2 others should allow # background workers execute transactions. In the end, state should remain # consistent. -async def run_restarts_under_load(pg: Postgres, acceptors: List[WalAcceptor], n_workers=10): +async def run_restarts_under_load(pg: Postgres, acceptors: List[Safekeeper], n_workers=10): n_accounts = 100 init_amount = 100000 max_transfer = 100 @@ -192,18 +192,14 @@ async def run_restarts_under_load(pg: Postgres, acceptors: List[WalAcceptor], n_ # restart acceptors one by one, while executing and validating bank transactions -def test_restarts_under_load(zenith_cli, - pageserver: ZenithPageserver, - postgres: PostgresFactory, - wa_factory: WalAcceptorFactory): +def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder): + zenith_env_builder.num_safekeepers = 3 + env = zenith_env_builder.init() - wa_factory.start_n_new(3) + env.zenith_cli(["branch", "test_wal_acceptors_restarts_under_load", "main"]) + pg = env.postgres.create_start('test_wal_acceptors_restarts_under_load') - zenith_cli.run(["branch", "test_wal_acceptors_restarts_under_load", "empty"]) - pg = postgres.create_start('test_wal_acceptors_restarts_under_load', - wal_acceptors=wa_factory.get_connstrs()) - - asyncio.run(run_restarts_under_load(pg, wa_factory.instances)) + asyncio.run(run_restarts_under_load(pg, env.safekeepers)) # TODO: Remove when https://github.com/zenithdb/zenith/issues/644 is fixed pg.stop() diff --git a/test_runner/batch_others/test_zenith_cli.py b/test_runner/batch_others/test_zenith_cli.py index 91e9d5085a..0c571201d0 100644 --- a/test_runner/batch_others/test_zenith_cli.py +++ b/test_runner/batch_others/test_zenith_cli.py @@ -2,15 +2,13 @@ import json import uuid from psycopg2.extensions import cursor as PgCursor -from fixtures.zenith_fixtures import ZenithCli, ZenithPageserver +from fixtures.zenith_fixtures import ZenithEnv from typing import cast pytest_plugins = ("fixtures.zenith_fixtures") -def helper_compare_branch_list(page_server_cur: PgCursor, - zenith_cli: ZenithCli, - initial_tenant: str): +def helper_compare_branch_list(page_server_cur: PgCursor, env: ZenithEnv, initial_tenant: str): """ Compare branches list returned by CLI and directly via API. Filters out branches created by other tests. 
@@ -21,12 +19,12 @@ def helper_compare_branch_list(page_server_cur: PgCursor, map(lambda b: cast(str, b['name']), json.loads(page_server_cur.fetchone()[0]))) branches_api = [b for b in branches_api if b.startswith('test_cli_') or b in ('empty', 'main')] - res = zenith_cli.run(["branch"]) + res = env.zenith_cli(["branch"]) res.check_returncode() branches_cli = sorted(map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n"))) branches_cli = [b for b in branches_cli if b.startswith('test_cli_') or b in ('empty', 'main')] - res = zenith_cli.run(["branch", f"--tenantid={initial_tenant}"]) + res = env.zenith_cli(["branch", f"--tenantid={initial_tenant}"]) res.check_returncode() branches_cli_with_tenant_arg = sorted( map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n"))) @@ -37,25 +35,26 @@ def helper_compare_branch_list(page_server_cur: PgCursor, assert branches_api == branches_cli == branches_cli_with_tenant_arg -def test_cli_branch_list(pageserver: ZenithPageserver, zenith_cli: ZenithCli): - page_server_conn = pageserver.connect() +def test_cli_branch_list(zenith_simple_env: ZenithEnv): + env = zenith_simple_env + page_server_conn = env.pageserver.connect() page_server_cur = page_server_conn.cursor() # Initial sanity check - helper_compare_branch_list(page_server_cur, zenith_cli, pageserver.initial_tenant) + helper_compare_branch_list(page_server_cur, env, env.initial_tenant) # Create a branch for us - res = zenith_cli.run(["branch", "test_cli_branch_list_main", "main"]) + res = env.zenith_cli(["branch", "test_cli_branch_list_main", "empty"]) assert res.stderr == '' - helper_compare_branch_list(page_server_cur, zenith_cli, pageserver.initial_tenant) + helper_compare_branch_list(page_server_cur, env, env.initial_tenant) # Create a nested branch - res = zenith_cli.run(["branch", "test_cli_branch_list_nested", "test_cli_branch_list_main"]) + res = env.zenith_cli(["branch", "test_cli_branch_list_nested", "test_cli_branch_list_main"]) assert res.stderr == '' - helper_compare_branch_list(page_server_cur, zenith_cli, pageserver.initial_tenant) + helper_compare_branch_list(page_server_cur, env, env.initial_tenant) # Check that all new branches are visible via CLI - res = zenith_cli.run(["branch"]) + res = env.zenith_cli(["branch"]) assert res.stderr == '' branches_cli = sorted(map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n"))) @@ -63,45 +62,46 @@ def test_cli_branch_list(pageserver: ZenithPageserver, zenith_cli: ZenithCli): assert 'test_cli_branch_list_nested' in branches_cli -def helper_compare_tenant_list(page_server_cur: PgCursor, zenith_cli: ZenithCli): +def helper_compare_tenant_list(page_server_cur: PgCursor, env: ZenithEnv): page_server_cur.execute(f'tenant_list') tenants_api = sorted( map(lambda t: cast(str, t['id']), json.loads(page_server_cur.fetchone()[0]))) - res = zenith_cli.run(["tenant", "list"]) + res = env.zenith_cli(["tenant", "list"]) assert res.stderr == '' tenants_cli = sorted(map(lambda t: t.split()[0], res.stdout.splitlines())) assert tenants_api == tenants_cli -def test_cli_tenant_list(pageserver: ZenithPageserver, zenith_cli: ZenithCli): - page_server_conn = pageserver.connect() +def test_cli_tenant_list(zenith_simple_env: ZenithEnv): + env = zenith_simple_env + page_server_conn = env.pageserver.connect() page_server_cur = page_server_conn.cursor() # Initial sanity check - helper_compare_tenant_list(page_server_cur, zenith_cli) + helper_compare_tenant_list(page_server_cur, env) # Create new tenant tenant1 = uuid.uuid4().hex 
- res = zenith_cli.run(["tenant", "create", tenant1]) + res = env.zenith_cli(["tenant", "create", tenant1]) res.check_returncode() # check tenant1 appeared - helper_compare_tenant_list(page_server_cur, zenith_cli) + helper_compare_tenant_list(page_server_cur, env) # Create new tenant tenant2 = uuid.uuid4().hex - res = zenith_cli.run(["tenant", "create", tenant2]) + res = env.zenith_cli(["tenant", "create", tenant2]) res.check_returncode() # check tenant2 appeared - helper_compare_tenant_list(page_server_cur, zenith_cli) + helper_compare_tenant_list(page_server_cur, env) - res = zenith_cli.run(["tenant", "list"]) + res = env.zenith_cli(["tenant", "list"]) res.check_returncode() tenants = sorted(map(lambda t: t.split()[0], res.stdout.splitlines())) - assert pageserver.initial_tenant in tenants + assert env.initial_tenant in tenants assert tenant1 in tenants assert tenant2 in tenants diff --git a/test_runner/batch_pg_regress/test_isolation.py b/test_runner/batch_pg_regress/test_isolation.py index 155eee1022..5817d76dd9 100644 --- a/test_runner/batch_pg_regress/test_isolation.py +++ b/test_runner/batch_pg_regress/test_isolation.py @@ -1,24 +1,20 @@ import os from fixtures.utils import mkdir_if_needed -from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory, base_dir, pg_distrib_dir +from fixtures.zenith_fixtures import ZenithEnv, base_dir, pg_distrib_dir pytest_plugins = ("fixtures.zenith_fixtures") -def test_isolation(pageserver: ZenithPageserver, - postgres: PostgresFactory, - pg_bin, - zenith_cli, - test_output_dir, - capsys): +def test_isolation(zenith_simple_env: ZenithEnv, test_output_dir, pg_bin, capsys): + env = zenith_simple_env # Create a branch for us - zenith_cli.run(["branch", "test_isolation", "empty"]) + env.zenith_cli(["branch", "test_isolation", "empty"]) # Connect to postgres and create a database called "regression". # isolation tests use prepared transactions, so enable them - pg = postgres.create_start('test_isolation', config_lines=['max_prepared_transactions=100']) + pg = env.postgres.create_start('test_isolation', config_lines=['max_prepared_transactions=100']) pg.safe_psql('CREATE DATABASE isolation_regression') # Create some local directories for pg_isolation_regress to run in. @@ -42,7 +38,7 @@ def test_isolation(pageserver: ZenithPageserver, '--schedule={}'.format(schedule), ] - env = { + env_vars = { 'PGPORT': str(pg.port), 'PGUSER': pg.username, 'PGHOST': pg.host, @@ -52,4 +48,4 @@ def test_isolation(pageserver: ZenithPageserver, # We don't capture the output. It's not too chatty, and it always # logs the exact same data to `regression.out` anyway. 
with capsys.disabled(): - pg_bin.run(pg_isolation_regress_command, env=env, cwd=runpath) + pg_bin.run(pg_isolation_regress_command, env=env_vars, cwd=runpath) diff --git a/test_runner/batch_pg_regress/test_pg_regress.py b/test_runner/batch_pg_regress/test_pg_regress.py index eab65ff512..b85a727cfa 100644 --- a/test_runner/batch_pg_regress/test_pg_regress.py +++ b/test_runner/batch_pg_regress/test_pg_regress.py @@ -1,23 +1,19 @@ import os from fixtures.utils import mkdir_if_needed -from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver, check_restored_datadir_content, base_dir, pg_distrib_dir +from fixtures.zenith_fixtures import ZenithEnv, check_restored_datadir_content, base_dir, pg_distrib_dir pytest_plugins = ("fixtures.zenith_fixtures") -def test_pg_regress(pageserver: ZenithPageserver, - postgres: PostgresFactory, - pg_bin, - zenith_cli, - test_output_dir, - capsys): +def test_pg_regress(zenith_simple_env: ZenithEnv, test_output_dir: str, pg_bin, capsys): + env = zenith_simple_env # Create a branch for us - zenith_cli.run(["branch", "test_pg_regress", "empty"]) + env.zenith_cli(["branch", "test_pg_regress", "empty"]) # Connect to postgres and create a database called "regression". - pg = postgres.create_start('test_pg_regress') + pg = env.postgres.create_start('test_pg_regress') pg.safe_psql('CREATE DATABASE regression') # Create some local directories for pg_regress to run in. @@ -42,7 +38,7 @@ def test_pg_regress(pageserver: ZenithPageserver, '--inputdir={}'.format(src_path), ] - env = { + env_vars = { 'PGPORT': str(pg.port), 'PGUSER': pg.username, 'PGHOST': pg.host, @@ -52,11 +48,11 @@ def test_pg_regress(pageserver: ZenithPageserver, # We don't capture the output. It's not too chatty, and it always # logs the exact same data to `regression.out` anyway. with capsys.disabled(): - pg_bin.run(pg_regress_command, env=env, cwd=runpath) + pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath) # checkpoint one more time to ensure that the lsn we get is the latest one pg.safe_psql('CHECKPOINT') lsn = pg.safe_psql('select pg_current_wal_insert_lsn()')[0][0] # Check that we restore the content of the datadir correctly - check_restored_datadir_content(test_output_dir, pg, pageserver.service_port.pg) + check_restored_datadir_content(test_output_dir, env, pg) diff --git a/test_runner/batch_pg_regress/test_zenith_regress.py b/test_runner/batch_pg_regress/test_zenith_regress.py index 29de95a570..6b0dcbd130 100644 --- a/test_runner/batch_pg_regress/test_zenith_regress.py +++ b/test_runner/batch_pg_regress/test_zenith_regress.py @@ -1,8 +1,7 @@ import os from fixtures.utils import mkdir_if_needed -from fixtures.zenith_fixtures import (PageserverPort, - PostgresFactory, +from fixtures.zenith_fixtures import (ZenithEnv, check_restored_datadir_content, base_dir, pg_distrib_dir) @@ -11,18 +10,14 @@ from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures") -def test_zenith_regress(postgres: PostgresFactory, - pg_bin, - zenith_cli, - test_output_dir, - capsys, - pageserver_port: PageserverPort): +def test_zenith_regress(zenith_simple_env: ZenithEnv, test_output_dir, pg_bin, capsys): + env = zenith_simple_env # Create a branch for us - zenith_cli.run(["branch", "test_zenith_regress", "empty"]) + env.zenith_cli(["branch", "test_zenith_regress", "empty"]) # Connect to postgres and create a database called "regression". 
- pg = postgres.create_start('test_zenith_regress') + pg = env.postgres.create_start('test_zenith_regress') pg.safe_psql('CREATE DATABASE regression') # Create some local directories for pg_regress to run in. @@ -48,7 +43,7 @@ def test_zenith_regress(postgres: PostgresFactory, ] log.info(pg_regress_command) - env = { + env_vars = { 'PGPORT': str(pg.port), 'PGUSER': pg.username, 'PGHOST': pg.host, @@ -58,11 +53,11 @@ def test_zenith_regress(postgres: PostgresFactory, # We don't capture the output. It's not too chatty, and it always # logs the exact same data to `regression.out` anyway. with capsys.disabled(): - pg_bin.run(pg_regress_command, env=env, cwd=runpath) + pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath) # checkpoint one more time to ensure that the lsn we get is the latest one pg.safe_psql('CHECKPOINT') lsn = pg.safe_psql('select pg_current_wal_insert_lsn()')[0][0] # Check that we restore the content of the datadir correctly - check_restored_datadir_content(test_output_dir, pg, pageserver_port.pg) + check_restored_datadir_content(test_output_dir, env, pg) diff --git a/test_runner/fixtures/benchmark_fixture.py b/test_runner/fixtures/benchmark_fixture.py index 7bfb698984..8f5deef690 100644 --- a/test_runner/fixtures/benchmark_fixture.py +++ b/test_runner/fixtures/benchmark_fixture.py @@ -31,11 +31,11 @@ To use, declare the 'zenbenchmark' fixture in the test function. Run the bencmark, and then record the result by calling zenbenchmark.record. For example: import timeit -from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver +from fixtures.zenith_fixtures import ZenithEnv pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture") -def test_mybench(postgres: PostgresFactory, pageserver: ZenithPageserver, zenbenchmark): +def test_mybench(zenith_simple_env: ZenithEnv, zenbenchmark): # Initialize the test ...
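For illustration only (not part of this patch): a minimal sketch of how a benchmark test reads against the new `ZenithEnv` API, combining the `zenith_simple_env` fixture with the `zenbenchmark` calls used by the performance tests later in this diff. The test name, branch name, and table are made up.

```python
from fixtures.zenith_fixtures import ZenithEnv

pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")


def test_mybench_sketch(zenith_simple_env: ZenithEnv, zenbenchmark):
    env = zenith_simple_env

    # Create a branch for this test and start a compute node on it
    env.zenith_cli(["branch", "test_mybench_sketch", "empty"])
    pg = env.postgres.create_start('test_mybench_sketch')

    # Record pageserver I/O and wall-clock time for the workload
    with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'):
        with zenbenchmark.record_duration('insert'):
            pg.safe_psql("create table t as select generate_series(1, 100000) as i")

    # Record an explicit scalar metric
    zenbenchmark.record("peak_mem", zenbenchmark.get_peak_mem(env.pageserver) / 1024, 'MB')
```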
diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index f9879ec1f7..d5c528a5da 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dataclasses import dataclass from cached_property import cached_property import asyncpg @@ -137,6 +139,41 @@ def zenfixture(func: Fn) -> Fn: return pytest.fixture(func, scope=scope) +@zenfixture +def worker_seq_no(worker_id: str): + # worker_id is a pytest-xdist fixture + # it can be master or gw + # parse it to always get a number + if worker_id == 'master': + return 0 + assert worker_id.startswith('gw') + return int(worker_id[2:]) + + +@zenfixture +def worker_base_port(worker_seq_no: int): + # so we divide ports in ranges of 100 ports + # so workers have disjoint set of ports for services + return BASE_PORT + worker_seq_no * WORKER_PORT_NUM + + +class PortDistributor: + def __init__(self, base_port: int, port_number: int) -> None: + self.iterator = iter(range(base_port, base_port + port_number)) + + def get_port(self) -> int: + try: + return next(self.iterator) + except StopIteration: + raise RuntimeError( + 'port range configured for test is exhausted, consider enlarging the range') + + +@zenfixture +def port_distributor(worker_base_port): + return PortDistributor(base_port=worker_base_port, port_number=WORKER_PORT_NUM) + + class PgProtocol: """ Reusable connection logic """ def __init__(self, host: str, port: int, username: Optional[str] = None): @@ -214,21 +251,158 @@ class PgProtocol: return cast(List[Any], cur.fetchall()) -class ZenithCli: - """ - An object representing the CLI binary named "zenith". +@dataclass +class AuthKeys: + pub: bytes + priv: bytes - We also store an environment that will tell the CLI to operate - on a particular ZENITH_REPO_DIR. + def generate_management_token(self): + token = jwt.encode({"scope": "pageserverapi"}, self.priv, algorithm="RS256") + + # jwt.encode can return 'bytes' or 'str', depending on Python version or type + # hinting or something (not sure what). If it returned 'bytes', convert it to 'str' + # explicitly. + if isinstance(token, bytes): + token = token.decode() + + return token + + def generate_tenant_token(self, tenant_id): + token = jwt.encode({ + "scope": "tenant", "tenant_id": tenant_id + }, + self.priv, + algorithm="RS256") + + if isinstance(token, bytes): + token = token.decode() + + return token + + +class ZenithEnvBuilder: """ - def __init__(self, repo_dir: str): - self.bin_zenith = os.path.join(zenith_binpath, 'zenith') + Builder object to create a Zenith runtime environment + + You should use the `zenith_env_builder` or `zenith_simple_env` pytest + fixture to create the ZenithEnv object. That way, the repository is + created in the right directory, based on the test name, and it's properly + cleaned up after the test has finished. 
+ """ + def __init__(self, + repo_dir: Path, + port_distributor: PortDistributor, + num_safekeepers: int = 0, + pageserver_auth_enabled: bool = False): self.repo_dir = repo_dir - self.env = os.environ.copy() - self.env['ZENITH_REPO_DIR'] = repo_dir - self.env['POSTGRES_DISTRIB_DIR'] = pg_distrib_dir + self.port_distributor = port_distributor + self.num_safekeepers = num_safekeepers + self.pageserver_auth_enabled = pageserver_auth_enabled + self.env: Optional[ZenithEnv] = None - def run(self, arguments: List[str]) -> 'subprocess.CompletedProcess[str]': + def init(self) -> ZenithEnv: + # Cannot create more than one environment from one builder + assert self.env is None, "environment already initialized" + self.env = ZenithEnv(self) + return self.env + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + + # After the yield comes any cleanup code we need. Stop all the nodes. + if self.env: + log.info('Cleaning up all storage and compute nodes') + self.env.postgres.stop_all() + for sk in self.env.safekeepers: + sk.stop() + self.env.pageserver.stop(immediate=True) + + +class ZenithEnv: + """ + An object representing the Zenith runtime environment. It consists of + the page server, 0-N safekeepers, and the compute nodes. + + ZenithEnv contains functions for stopping/starting nodes in the + environment, checking their status, creating tenants, connecting to the + nodes, creating and destroying compute nodes, etc. The page server and + the safekeepers are considered fixed in the environment, you cannot + create or destroy them after the environment is initialized. (That will + likely change in the future, as we start supporting multiple page + servers and adding/removing safekeepers on the fly). + + Some notable functions and fields in ZenithEnv: + + postgres - A factory object for creating postgres compute nodes. 
+ + pageserver - An object that contains functions for manipulating and + connecting to the pageserver + + safekeepers - An array containing objects representing the safekeepers + + pg_bin - pg_bin.run() can be used to execute Postgres client binaries, + like psql or pg_dump + + initial_tenant - tenant ID of the initial tenant created in the repository + + zenith_cli() - zenith_cli() can be used to run the 'zenith' CLI tool + + create_tenant() - initializes a new tenant in the page server, returns + the tenant id + """ + def __init__(self, config: ZenithEnvBuilder): + self.repo_dir = config.repo_dir + self.port_distributor = config.port_distributor + + self.postgres = PostgresFactory(self) + + self.safekeepers: List[Safekeeper] = [] + + # Create and start up the pageserver, and safekeepers if any + pageserver_port = PageserverPort( + pg=self.port_distributor.get_port(), + http=self.port_distributor.get_port(), + ) + self.pageserver = ZenithPageserver(self, + port=pageserver_port, + enable_auth=config.pageserver_auth_enabled) + self.pageserver.start() + + # since we are in progress of refactoring protocols between compute safekeeper + # and page server use hardcoded management token in safekeeper + management_token = self.auth_keys.generate_management_token() \ + if config.pageserver_auth_enabled else None + + # get newly created tenant id + self.initial_tenant = self.zenith_cli(['tenant', 'list']).stdout.split()[0] + + # Start up safekeepers + for wa_num in range(config.num_safekeepers): + wa = Safekeeper(env=self, + data_dir=Path(os.path.join(self.repo_dir, f"safekeeper_{wa_num}")), + port=SafekeeperPort( + pg=self.port_distributor.get_port(), + http=self.port_distributor.get_port(), + ), + num=wa_num, + auth_token=management_token) + wa.start() + self.safekeepers.append(wa) + + def get_safekeeper_connstrs(self) -> str: + """ Get list of safekeeper endpoints suitable for wal_acceptors GUC """ + return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers]) + + def create_tenant(self, tenant_id: Optional[str] = None): + if tenant_id is None: + tenant_id = uuid.uuid4().hex + res = self.zenith_cli(['tenant', 'create', tenant_id]) + res.check_returncode() + return tenant_id + + def zenith_cli(self, arguments: List[str]) -> 'subprocess.CompletedProcess[str]': """ Run "zenith" with the specified arguments. @@ -236,24 +410,32 @@ class ZenithCli: Return both stdout and stderr, which can be accessed as - >>> result = zenith_cli.run(...) + >>> result = env.zenith_cli(...) 
>>> assert result.stderr == "" >>> log.info(result.stdout) """ assert type(arguments) == list - args = [self.bin_zenith] + arguments - log.info('Running command "{}"'.format(' '.join(args))) + bin_zenith = os.path.join(str(zenith_binpath), 'zenith') - # Interceipt CalledProcessError and print more info + args = [bin_zenith] + arguments + log.info('Running command "{}"'.format(' '.join(args))) + log.info(f'Running in "{self.repo_dir}"') + + env_vars = os.environ.copy() + env_vars['ZENITH_REPO_DIR'] = str(self.repo_dir) + env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir) + + # Intercept CalledProcessError and print more info try: res = subprocess.run(args, - env=self.env, + env=env_vars, check=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + log.info(f"Run success: {res.stdout}") except subprocess.CalledProcessError as exc: # this way command output will be in recorded and shown in CI in failure message msg = f"""\ @@ -267,10 +449,75 @@ class ZenithCli: return res + @cached_property + def auth_keys(self) -> AuthKeys: + pub = (Path(self.repo_dir) / 'auth_public_key.pem').read_bytes() + priv = (Path(self.repo_dir) / 'auth_private_key.pem').read_bytes() + return AuthKeys(pub=pub, priv=priv) + @zenfixture -def zenith_cli(repo_dir: str) -> ZenithCli: - return ZenithCli(repo_dir) +def _shared_simple_env(request: Any, port_distributor) -> Iterator[ZenithEnv]: + """ + Internal fixture backing the `zenith_simple_env` fixture. If TEST_SHARED_FIXTURES + is set, this is shared by all tests using `zenith_simple_env`. + """ + + if os.environ.get('TEST_SHARED_FIXTURES') is None: + # Create the environment in the per-test output directory + repo_dir = os.path.join(get_test_output_dir(request), "repo") + else: + # We're running shared fixtures. Share a single directory. + repo_dir = os.path.join(str(top_output_dir), "shared_repo") + shutil.rmtree(repo_dir, ignore_errors=True) + + with ZenithEnvBuilder(Path(repo_dir), port_distributor) as builder: + + env = builder.init() + + # For convenience in tests, create a branch from the freshly-initialized cluster. + env.zenith_cli(["branch", "empty", "main"]) + + # Return the environment to the caller + yield env + + +@pytest.fixture(scope='function') +def zenith_simple_env(_shared_simple_env: ZenithEnv) -> Iterator[ZenithEnv]: + """ + Simple Zenith environment, with no authentication and no safekeepers. + + If TEST_SHARED_FIXTURES environment variable is set, we reuse the same + environment for all tests that use 'zenith_simple_env', keeping the + page server and safekeepers running. Any compute nodes are stopped after + each test, however. + """ + yield _shared_simple_env + + _shared_simple_env.postgres.stop_all() + + +@pytest.fixture(scope='function') +def zenith_env_builder(test_output_dir, port_distributor) -> Iterator[ZenithEnvBuilder]: + """ + Fixture to create a Zenith environment for a test. + + To use, declare the 'zenith_env_builder' fixture in your test to get access to the + builder object. Set properties on it to describe the environment. + Finally, initialize and start up the environment by calling + zenith_env_builder.init(). + + After the initialization, you can launch compute nodes by calling + the functions in the 'env.postgres' factory object, stop/start the + nodes, etc.
+ """ + + # Create the environment in the test-specific output dir + repo_dir = os.path.join(test_output_dir, "repo") + + # Return the builder to the caller + with ZenithEnvBuilder(Path(repo_dir), port_distributor) as builder: + yield builder class ZenithPageserverHttpClient(requests.Session): @@ -334,70 +581,6 @@ class ZenithPageserverHttpClient(requests.Session): return res.text -@dataclass -class AuthKeys: - pub: bytes - priv: bytes - - def generate_management_token(self): - token = jwt.encode({"scope": "pageserverapi"}, self.priv, algorithm="RS256") - - # jwt.encode can return 'bytes' or 'str', depending on Python version or type - # hinting or something (not sure what). If it returned 'bytes', convert it to 'str' - # explicitly. - if isinstance(token, bytes): - token = token.decode() - - return token - - def generate_tenant_token(self, tenant_id): - token = jwt.encode({ - "scope": "tenant", "tenant_id": tenant_id - }, - self.priv, - algorithm="RS256") - - if isinstance(token, bytes): - token = token.decode() - - return token - - -@zenfixture -def worker_seq_no(worker_id: str): - # worker_id is a pytest-xdist fixture - # it can be master or gw - # parse it to always get a number - if worker_id == 'master': - return 0 - assert worker_id.startswith('gw') - return int(worker_id[2:]) - - -@zenfixture -def worker_base_port(worker_seq_no: int): - # so we divide ports in ranges of 100 ports - # so workers have disjoint set of ports for services - return BASE_PORT + worker_seq_no * WORKER_PORT_NUM - - -class PortDistributor: - def __init__(self, base_port: int, port_number: int) -> None: - self.iterator = iter(range(base_port, base_port + port_number)) - - def get_port(self) -> int: - try: - return next(self.iterator) - except StopIteration: - raise RuntimeError( - 'port range configured for test is exhausted, consider enlarging the range') - - -@zenfixture -def port_distributor(worker_base_port): - return PortDistributor(base_port=worker_base_port, port_number=WORKER_PORT_NUM) - - @dataclass class PageserverPort: pg: int @@ -408,18 +591,12 @@ class ZenithPageserver(PgProtocol): """ An object representing a running pageserver. - Initializes the repository via `zenith init` and starts page server. + Initializes the repository via `zenith init`. 
""" - def __init__(self, - zenith_cli: ZenithCli, - repo_dir: str, - port: PageserverPort, - enable_auth=False): + def __init__(self, env: ZenithEnv, port: PageserverPort, enable_auth=False): super().__init__(host='localhost', port=port.pg) - self.zenith_cli = zenith_cli + self.env = env self.running = False - self.initial_tenant: str = cast(str, None) # Will be fixed by self.start() below - self.repo_dir = repo_dir self.service_port = port # do not shadow PgProtocol.port which is just int cmd = [ @@ -429,9 +606,7 @@ class ZenithPageserver(PgProtocol): ] if enable_auth: cmd.append('--enable-auth') - self.zenith_cli.run(cmd) - - self.start() # Required, otherwise self.initial_tenant is of wrong type + self.env.zenith_cli(cmd) def start(self) -> 'ZenithPageserver': """ @@ -440,14 +615,8 @@ class ZenithPageserver(PgProtocol): """ assert self.running == False - self.zenith_cli.run(['start']) + self.env.zenith_cli(['start']) self.running = True - # get newly created tenant id - current_tenant = self.zenith_cli.run(['tenant', 'list']).stdout.split()[0] - if self.initial_tenant is None: - self.initial_tenant = current_tenant - else: - assert self.initial_tenant == current_tenant return self def stop(self, immediate=False) -> 'ZenithPageserver': @@ -461,7 +630,7 @@ class ZenithPageserver(PgProtocol): log.info(f"Stopping pageserver with {cmd}") if self.running: - self.zenith_cli.run(cmd) + self.env.zenith_cli(cmd) self.running = False return self @@ -472,12 +641,6 @@ class ZenithPageserver(PgProtocol): def __exit__(self, exc_type, exc, tb): self.stop(True) - @cached_property - def auth_keys(self) -> AuthKeys: - pub = (Path(self.repo_dir) / 'auth_public_key.pem').read_bytes() - priv = (Path(self.repo_dir) / 'auth_private_key.pem').read_bytes() - return AuthKeys(pub=pub, priv=priv) - def http_client(self, auth_token: Optional[str] = None): return ZenithPageserverHttpClient( port=self.service_port.http, @@ -485,47 +648,13 @@ class ZenithPageserver(PgProtocol): ) -@zenfixture -def pageserver_port(port_distributor: PortDistributor) -> PageserverPort: - pg = port_distributor.get_port() - http = port_distributor.get_port() - log.info(f"pageserver_port: pg={pg} http={http}") - return PageserverPort(pg=pg, http=http) - - -@zenfixture -def pageserver(zenith_cli: ZenithCli, repo_dir: str, - pageserver_port: PageserverPort) -> Iterator[ZenithPageserver]: - """ - The 'pageserver' fixture provides a Page Server that's up and running. - - If TEST_SHARED_FIXTURES is set, the Page Server instance is shared by all - the tests. To avoid clashing with other tests, don't use the 'main' branch in - the tests directly. Instead, create a branch off the 'empty' branch and use - that. - - By convention, the test branches are named after the tests. For example, - test called 'test_foo' would create and use branches with the 'test_foo' prefix. - """ - ps = ZenithPageserver(zenith_cli=zenith_cli, repo_dir=repo_dir, port=pageserver_port) - # For convenience in tests, create a branch from the freshly-initialized cluster. - zenith_cli.run(["branch", "empty", "main"]) - - yield ps - - # After the yield comes any cleanup code we need. 
- log.info('Starting pageserver cleanup') - ps.stop(True) - - class PgBin: """ A helper class for executing postgres binaries """ def __init__(self, log_dir: str): self.log_dir = log_dir - self.pg_install_path = pg_distrib_dir - self.pg_bin_path = os.path.join(self.pg_install_path, 'bin') + self.pg_bin_path = os.path.join(str(pg_distrib_dir), 'bin') self.env = os.environ.copy() - self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib') + self.env['LD_LIBRARY_PATH'] = os.path.join(str(pg_distrib_dir), 'lib') def _fixpath(self, command: List[str]) -> None: if '/' not in command[0]: @@ -580,25 +709,13 @@ def pg_bin(test_output_dir: str) -> PgBin: return PgBin(test_output_dir) -@pytest.fixture -def pageserver_auth_enabled(zenith_cli: ZenithCli, repo_dir: str, pageserver_port: PageserverPort): - with ZenithPageserver(zenith_cli=zenith_cli, - repo_dir=repo_dir, - port=pageserver_port, - enable_auth=True) as ps: - # For convenience in tests, create a branch from the freshly-initialized cluster. - zenith_cli.run(["branch", "empty", "main"]) - yield ps - - class Postgres(PgProtocol): """ An object representing a running postgres daemon. """ - def __init__(self, zenith_cli: ZenithCli, repo_dir: str, tenant_id: str, port: int): + def __init__(self, env: ZenithEnv, tenant_id: str, port: int): super().__init__(host='localhost', port=port) - self.zenith_cli = zenith_cli + self.env = env self.running = False - self.repo_dir = repo_dir self.node_name: Optional[str] = None # dubious, see asserts below self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA self.tenant_id = tenant_id @@ -608,13 +725,10 @@ class Postgres(PgProtocol): self, node_name: str, branch: Optional[str] = None, - wal_acceptors: Optional[str] = None, config_lines: Optional[List[str]] = None, ) -> 'Postgres': """ Create the pg data directory. - If wal_acceptors is not None, node will use wal acceptors; config is - adjusted accordingly. Returns self. 
""" @@ -624,7 +738,7 @@ class Postgres(PgProtocol): if branch is None: branch = node_name - self.zenith_cli.run([ + self.env.zenith_cli([ 'pg', 'create', f'--tenantid={self.tenant_id}', @@ -634,10 +748,10 @@ class Postgres(PgProtocol): ]) self.node_name = node_name path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.node_name - self.pgdata_dir = os.path.join(self.repo_dir, path) + self.pgdata_dir = os.path.join(self.env.repo_dir, path) - if wal_acceptors is not None: - self.adjust_for_wal_acceptors(wal_acceptors) + if self.env.safekeepers: + self.adjust_for_wal_acceptors(self.env.get_safekeeper_connstrs()) if config_lines is None: config_lines = [] self.config(config_lines) @@ -654,7 +768,7 @@ class Postgres(PgProtocol): log.info(f"Starting postgres node {self.node_name}") - run_result = self.zenith_cli.run( + run_result = self.env.zenith_cli( ['pg', 'start', f'--tenantid={self.tenant_id}', f'--port={self.port}', self.node_name]) self.running = True @@ -666,7 +780,7 @@ class Postgres(PgProtocol): """ Path to data directory """ assert self.node_name path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.node_name - return os.path.join(self.repo_dir, path) + return os.path.join(self.env.repo_dir, path) def pg_xact_dir_path(self) -> str: """ Path to pg_xact dir """ @@ -723,7 +837,7 @@ class Postgres(PgProtocol): if self.running: assert self.node_name is not None - self.zenith_cli.run(['pg', 'stop', self.node_name, f'--tenantid={self.tenant_id}']) + self.env.zenith_cli(['pg', 'stop', self.node_name, f'--tenantid={self.tenant_id}']) self.running = False return self @@ -735,7 +849,7 @@ class Postgres(PgProtocol): """ assert self.node_name is not None - self.zenith_cli.run( + self.env.zenith_cli( ['pg', 'stop', '--destroy', self.node_name, f'--tenantid={self.tenant_id}']) self.node_name = None @@ -745,7 +859,6 @@ class Postgres(PgProtocol): self, node_name: str, branch: Optional[str] = None, - wal_acceptors: Optional[str] = None, config_lines: Optional[List[str]] = None, ) -> 'Postgres': """ @@ -757,7 +870,6 @@ class Postgres(PgProtocol): self.create( node_name=node_name, branch=branch, - wal_acceptors=wal_acceptors, config_lines=config_lines, ).start() @@ -772,30 +884,21 @@ class Postgres(PgProtocol): class PostgresFactory: """ An object representing multiple running postgres daemons. 
""" - def __init__(self, - zenith_cli: ZenithCli, - repo_dir: str, - initial_tenant: str, - port_distributor: PortDistributor): - self.zenith_cli = zenith_cli - self.repo_dir = repo_dir + def __init__(self, env: ZenithEnv): + self.env = env self.num_instances = 0 self.instances: List[Postgres] = [] - self.initial_tenant: str = initial_tenant - self.port_distributor = port_distributor def create_start(self, node_name: str = "main", branch: Optional[str] = None, tenant_id: Optional[str] = None, - wal_acceptors: Optional[str] = None, config_lines: Optional[List[str]] = None) -> Postgres: pg = Postgres( - zenith_cli=self.zenith_cli, - repo_dir=self.repo_dir, - tenant_id=tenant_id or self.initial_tenant, - port=self.port_distributor.get_port(), + self.env, + tenant_id=tenant_id or self.env.initial_tenant, + port=self.env.port_distributor.get_port(), ) self.num_instances += 1 self.instances.append(pg) @@ -803,7 +906,6 @@ class PostgresFactory: return pg.create_start( node_name=node_name, branch=branch, - wal_acceptors=wal_acceptors, config_lines=config_lines, ) @@ -811,14 +913,12 @@ class PostgresFactory: node_name: str = "main", branch: Optional[str] = None, tenant_id: Optional[str] = None, - wal_acceptors: Optional[str] = None, config_lines: Optional[List[str]] = None) -> Postgres: pg = Postgres( - zenith_cli=self.zenith_cli, - repo_dir=self.repo_dir, - tenant_id=tenant_id or self.initial_tenant, - port=self.port_distributor.get_port(), + self.env, + tenant_id=tenant_id or self.env.initial_tenant, + port=self.env.port_distributor.get_port(), ) self.num_instances += 1 @@ -827,7 +927,6 @@ class PostgresFactory: return pg.create( node_name=node_name, branch=branch, - wal_acceptors=wal_acceptors, config_lines=config_lines, ) @@ -838,65 +937,40 @@ class PostgresFactory: return self -@zenfixture -def initial_tenant(pageserver: ZenithPageserver): - return pageserver.initial_tenant - - -@pytest.fixture(scope='function') -def postgres(zenith_cli: ZenithCli, - initial_tenant: str, - repo_dir: str, - port_distributor: PortDistributor) -> Iterator[PostgresFactory]: - pgfactory = PostgresFactory( - zenith_cli=zenith_cli, - repo_dir=repo_dir, - initial_tenant=initial_tenant, - port_distributor=port_distributor, - ) - - yield pgfactory - - # After the yield comes any cleanup code we need. - log.info('Starting postgres cleanup') - pgfactory.stop_all() - - def read_pid(path: Path) -> int: """ Read content of file into number """ return int(path.read_text()) @dataclass -class WalAcceptorPort: +class SafekeeperPort: pg: int http: int @dataclass -class WalAcceptor: - """ An object representing a running wal acceptor daemon. """ - wa_bin_path: Path +class Safekeeper: + """ An object representing a running safekeeper daemon. 
""" + env: ZenithEnv data_dir: Path - port: WalAcceptorPort + port: SafekeeperPort num: int # identifier for logging - pageserver_port: int auth_token: Optional[str] = None - def start(self) -> 'WalAcceptor': + def start(self) -> 'Safekeeper': # create data directory if not exists self.data_dir.mkdir(parents=True, exist_ok=True) with suppress(FileNotFoundError): self.pidfile.unlink() - cmd = [str(self.wa_bin_path)] + cmd = [os.path.join(str(zenith_binpath), "safekeeper")] cmd.extend(["-D", str(self.data_dir)]) cmd.extend(["--listen-pg", f"localhost:{self.port.pg}"]) cmd.extend(["--listen-http", f"localhost:{self.port.http}"]) cmd.append("--daemonize") cmd.append("--no-sync") # Tell page server it can receive WAL from this WAL safekeeper - cmd.extend(["--pageserver", f"localhost:{self.pageserver_port}"]) + cmd.extend(["--pageserver", f"localhost:{self.env.pageserver.service_port.pg}"]) cmd.extend(["--recall", "1 second"]) log.info('Running command "{}"'.format(' '.join(cmd))) env = {'PAGESERVER_AUTH_TOKEN': self.auth_token} if self.auth_token else None @@ -933,7 +1007,7 @@ class WalAcceptor: return pid - def stop(self) -> 'WalAcceptor': + def stop(self) -> 'Safekeeper': log.info('Stopping wal acceptor {}'.format(self.num)) pid = self.get_pid() if pid is None: @@ -950,7 +1024,7 @@ class WalAcceptor: def append_logical_message(self, tenant_id: str, timeline_id: str, request: Dict[str, Any]) -> Dict[str, Any]: """ - Send JSON_CTRL query to append LogicalMessage to WAL and modify + Send JSON_CTRL query to append LogicalMessage to WAL and modify safekeeper state. It will construct LogicalMessage from provided prefix and message, and then will write it to WAL. """ @@ -973,69 +1047,7 @@ class WalAcceptor: return res def http_client(self): - return WalAcceptorHttpClient(port=self.port.http) - - -class WalAcceptorFactory: - """ An object representing multiple running wal acceptors. """ - def __init__(self, data_dir: Path, pageserver_port: int, port_distributor: PortDistributor): - self.wa_bin_path = Path(os.path.join(zenith_binpath, 'safekeeper')) - self.data_dir = data_dir - self.instances: List[WalAcceptor] = [] - self.port_distributor = port_distributor - self.pageserver_port = pageserver_port - - def start_new(self, auth_token: Optional[str] = None) -> WalAcceptor: - """ - Start new wal acceptor. - """ - wa_num = len(self.instances) - wa = WalAcceptor( - wa_bin_path=self.wa_bin_path, - data_dir=self.data_dir / "wal_acceptor_{}".format(wa_num), - port=WalAcceptorPort( - pg=self.port_distributor.get_port(), - http=self.port_distributor.get_port(), - ), - num=wa_num, - pageserver_port=self.pageserver_port, - auth_token=auth_token, - ) - wa.start() - self.instances.append(wa) - return wa - - def start_n_new(self, n: int, auth_token: Optional[str] = None) -> None: - """ - Start n new wal acceptors. - """ - - for _ in range(n): - self.start_new(auth_token) - - def stop_all(self) -> 'WalAcceptorFactory': - for wa in self.instances: - wa.stop() - return self - - def get_connstrs(self) -> str: - """ Get list of wal acceptor endpoints suitable for wal_acceptors GUC """ - return ','.join(["localhost:{}".format(wa.port.pg) for wa in self.instances]) - - -@zenfixture -def wa_factory(repo_dir: str, pageserver_port: PageserverPort, - port_distributor: PortDistributor) -> Iterator[WalAcceptorFactory]: - """ Gives WalAcceptorFactory providing wal acceptors. 
""" - wafactory = WalAcceptorFactory( - data_dir=Path(repo_dir) / "wal_acceptors", - pageserver_port=pageserver_port.pg, - port_distributor=port_distributor, - ) - yield wafactory - # After the yield comes any cleanup code we need. - log.info('Starting wal acceptors cleanup') - wafactory.stop_all() + return SafekeeperHttpClient(port=self.port.http) @dataclass @@ -1044,7 +1056,7 @@ class SafekeeperTimelineStatus: flush_lsn: str -class WalAcceptorHttpClient(requests.Session): +class SafekeeperHttpClient(requests.Session): def __init__(self, port: int) -> None: super().__init__() self.port = port @@ -1063,14 +1075,14 @@ class WalAcceptorHttpClient(requests.Session): def get_test_output_dir(request: Any) -> str: """ Compute the working directory for an individual test. """ test_name = request.node.name - test_dir = os.path.join(top_output_dir, test_name) + test_dir = os.path.join(str(top_output_dir), test_name) log.info(f'get_test_output_dir is {test_dir}') return test_dir # This is autouse, so the test output directory always gets created, even # if a test doesn't put anything there. It also solves a problem with the -# repo_dir() fixture: if TEST_SHARED_FIXTURES is not set, repo_dir() +# zenith_simple_env fixture: if TEST_SHARED_FIXTURES is not set, it # creates the repo in the test output directory. But it cannot depend on # 'test_output_dir' fixture, because when TEST_SHARED_FIXTURES is not set, # it has 'session' scope and cannot access fixtures with 'function' @@ -1089,45 +1101,6 @@ def test_output_dir(request: Any) -> str: return test_dir -@zenfixture -def repo_dir(request: Any) -> Path: - """ - Compute the test repo_dir. - - "repo_dir" is the place where all of the pageserver files will go. - It doesn't have anything to do with the git repo. - """ - - if os.environ.get('TEST_SHARED_FIXTURES') is None: - # Create the environment in the per-test output directory - # The 'test_output_dir' fixture should have created it already - repo_dir = os.path.join(get_test_output_dir(request), "repo") - assert os.path.exists(repo_dir) - else: - # We're running shared fixtures. Share a single directory. - repo_dir = os.path.join(top_output_dir, "shared") - shutil.rmtree(repo_dir, ignore_errors=True) - - return Path(repo_dir) - - -class TenantFactory: - def __init__(self, cli: ZenithCli): - self.cli = cli - - def create(self, tenant_id: Optional[str] = None): - if tenant_id is None: - tenant_id = uuid.uuid4().hex - res = self.cli.run(['tenant', 'create', tenant_id]) - res.check_returncode() - return tenant_id - - -@zenfixture -def tenant_factory(zenith_cli: ZenithCli): - return TenantFactory(zenith_cli) - - # # Test helpers # @@ -1157,7 +1130,7 @@ def list_files_to_compare(pgdata_dir: str): # pg is the existing and running compute node, that we want to compare with a basebackup -def check_restored_datadir_content(test_output_dir: str, pg: Postgres, pageserver_pg_port: int): +def check_restored_datadir_content(test_output_dir: str, env: ZenithEnv, pg: Postgres): # Get the timeline ID of our branch. 
We need it for the 'basebackup' command with closing(pg.connect()) as conn: @@ -1169,7 +1142,7 @@ def check_restored_datadir_content(test_output_dir: str, pg: Postgres, pageserve pg.stop() # Take a basebackup from pageserver - restored_dir_path = os.path.join(test_output_dir, f"{pg.node_name}_restored_datadir") + restored_dir_path = os.path.join(env.repo_dir, f"{pg.node_name}_restored_datadir") mkdir_if_needed(restored_dir_path) pg_bin = PgBin(test_output_dir) @@ -1178,7 +1151,7 @@ def check_restored_datadir_content(test_output_dir: str, pg: Postgres, pageserve cmd = rf""" {psql_path} \ --no-psqlrc \ - postgres://localhost:{pageserver_pg_port} \ + postgres://localhost:{env.pageserver.service_port.pg} \ -c 'basebackup {pg.tenant_id} {timeline}' \ | tar -x -C {restored_dir_path} """ diff --git a/test_runner/performance/test_bulk_insert.py b/test_runner/performance/test_bulk_insert.py index ee7c062a47..46dcb01c71 100644 --- a/test_runner/performance/test_bulk_insert.py +++ b/test_runner/performance/test_bulk_insert.py @@ -1,6 +1,6 @@ import os from contextlib import closing -from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver +from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture") @@ -16,20 +16,17 @@ pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture") # 3. Disk space used # 4. Peak memory usage # -def test_bulk_insert(postgres: PostgresFactory, - pageserver: ZenithPageserver, - zenith_cli, - zenbenchmark, - repo_dir: str): +def test_bulk_insert(zenith_simple_env: ZenithEnv, zenbenchmark): + env = zenith_simple_env # Create a branch for us - zenith_cli.run(["branch", "test_bulk_insert", "empty"]) + env.zenith_cli(["branch", "test_bulk_insert", "empty"]) - pg = postgres.create_start('test_bulk_insert') + pg = env.postgres.create_start('test_bulk_insert') log.info("postgres is running on 'test_bulk_insert' branch") # Open a connection directly to the page server that we'll use to force # flushing the layers to disk - psconn = pageserver.connect() + psconn = env.pageserver.connect() pscur = psconn.cursor() # Get the timeline ID of our branch. We need it for the 'do_gc' command @@ -41,19 +38,19 @@ def test_bulk_insert(postgres: PostgresFactory, cur.execute("create table huge (i int, j int);") # Run INSERT, recording the time and I/O it takes - with zenbenchmark.record_pageserver_writes(pageserver, 'pageserver_writes'): + with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'): with zenbenchmark.record_duration('insert'): cur.execute("insert into huge values (generate_series(1, 5000000), 0);") # Flush the layers from memory to disk. 
This is included in the reported # time and I/O - pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") + pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") # Record peak memory usage - zenbenchmark.record("peak_mem", zenbenchmark.get_peak_mem(pageserver) / 1024, 'MB') + zenbenchmark.record("peak_mem", zenbenchmark.get_peak_mem(env.pageserver) / 1024, 'MB') # Report disk space used by the repository - timeline_size = zenbenchmark.get_timeline_size(repo_dir, - pageserver.initial_tenant, + timeline_size = zenbenchmark.get_timeline_size(env.repo_dir, + env.initial_tenant, timeline) zenbenchmark.record('size', timeline_size / (1024 * 1024), 'MB') diff --git a/test_runner/performance/test_bulk_tenant_create.py b/test_runner/performance/test_bulk_tenant_create.py index 1e2a17c2c9..e913afc27c 100644 --- a/test_runner/performance/test_bulk_tenant_create.py +++ b/test_runner/performance/test_bulk_tenant_create.py @@ -1,11 +1,7 @@ import timeit import pytest -from fixtures.zenith_fixtures import ( - TenantFactory, - ZenithCli, - PostgresFactory, -) +from fixtures.zenith_fixtures import ZenithEnvBuilder pytest_plugins = ("fixtures.benchmark_fixture") @@ -20,37 +16,37 @@ pytest_plugins = ("fixtures.benchmark_fixture") @pytest.mark.parametrize('tenants_count', [1, 5, 10]) @pytest.mark.parametrize('use_wal_acceptors', ['with_wa', 'without_wa']) def test_bulk_tenant_create( - zenith_cli: ZenithCli, - tenant_factory: TenantFactory, - postgres: PostgresFactory, - wa_factory, + zenith_env_builder: ZenithEnvBuilder, use_wal_acceptors: str, tenants_count: int, zenbenchmark, ): """Measure tenant creation time (with and without wal acceptors)""" + if use_wal_acceptors == 'with_wa': + zenith_env_builder.num_safekeepers = 3 + env = zenith_env_builder.init() time_slices = [] for i in range(tenants_count): start = timeit.default_timer() - tenant = tenant_factory.create() - zenith_cli.run([ + tenant = env.create_tenant() + env.zenith_cli([ "branch", f"test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}", "main", f"--tenantid={tenant}" ]) - if use_wal_acceptors == 'with_wa': - wa_factory.start_n_new(3) + # FIXME: We used to start new safekeepers here. Did that make sense? Should we do it now? + #if use_wal_acceptors == 'with_wa': + # wa_factory.start_n_new(3) - pg_tenant = postgres.create_start( + pg_tenant = env.postgres.create_start( f"test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}", None, # branch name, None means same as node name tenant, - wal_acceptors=wa_factory.get_connstrs() if use_wal_acceptors == 'with_wa' else None, ) end = timeit.default_timer() diff --git a/test_runner/performance/test_gist_build.py b/test_runner/performance/test_gist_build.py index c4617e1252..b078c820b0 100644 --- a/test_runner/performance/test_gist_build.py +++ b/test_runner/performance/test_gist_build.py @@ -1,6 +1,6 @@ import os from contextlib import closing -from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver +from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture") @@ -11,20 +11,17 @@ pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture") # As of this writing, we're duplicate those giant WAL records for each page, # which makes the delta layer about 32x larger than it needs to be. 
# -def test_gist_buffering_build(postgres: PostgresFactory, - pageserver: ZenithPageserver, - zenith_cli, - zenbenchmark, - repo_dir: str): +def test_gist_buffering_build(zenith_simple_env: ZenithEnv, zenbenchmark): + env = zenith_simple_env # Create a branch for us - zenith_cli.run(["branch", "test_gist_buffering_build", "empty"]) + env.zenith_cli(["branch", "test_gist_buffering_build", "empty"]) - pg = postgres.create_start('test_gist_buffering_build') + pg = env.postgres.create_start('test_gist_buffering_build') log.info("postgres is running on 'test_gist_buffering_build' branch") # Open a connection directly to the page server that we'll use to force # flushing the layers to disk - psconn = pageserver.connect() + psconn = env.pageserver.connect() pscur = psconn.cursor() # Get the timeline ID of our branch. We need it for the 'do_gc' command @@ -40,7 +37,7 @@ def test_gist_buffering_build(postgres: PostgresFactory, ) # Build the index. - with zenbenchmark.record_pageserver_writes(pageserver, 'pageserver_writes'): + with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'): with zenbenchmark.record_duration('build'): cur.execute( "create index gist_pointidx2 on gist_point_tbl using gist(p) with (buffering = on)" @@ -48,13 +45,13 @@ def test_gist_buffering_build(postgres: PostgresFactory, # Flush the layers from memory to disk. This is included in the reported # time and I/O - pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 1000000") + pscur.execute(f"do_gc {env.initial_tenant} {timeline} 1000000") # Record peak memory usage - zenbenchmark.record("peak_mem", zenbenchmark.get_peak_mem(pageserver) / 1024, 'MB') + zenbenchmark.record("peak_mem", zenbenchmark.get_peak_mem(env.pageserver) / 1024, 'MB') # Report disk space used by the repository - timeline_size = zenbenchmark.get_timeline_size(repo_dir, - pageserver.initial_tenant, + timeline_size = zenbenchmark.get_timeline_size(env.repo_dir, + env.initial_tenant, timeline) zenbenchmark.record('size', timeline_size / (1024 * 1024), 'MB') diff --git a/test_runner/performance/test_perf_pgbench.py b/test_runner/performance/test_perf_pgbench.py index 388ac4314c..dc50587a82 100644 --- a/test_runner/performance/test_perf_pgbench.py +++ b/test_runner/performance/test_perf_pgbench.py @@ -1,6 +1,6 @@ import os from contextlib import closing -from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver +from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture") @@ -15,21 +15,17 @@ pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture") # 2. Time to run 5000 pgbench transactions # 3. Disk space used # -def test_pgbench(postgres: PostgresFactory, - pageserver: ZenithPageserver, - pg_bin, - zenith_cli, - zenbenchmark, - repo_dir: str): +def test_pgbench(zenith_simple_env: ZenithEnv, pg_bin, zenbenchmark): + env = zenith_simple_env # Create a branch for us - zenith_cli.run(["branch", "test_pgbench_perf", "empty"]) + env.zenith_cli(["branch", "test_pgbench_perf", "empty"]) - pg = postgres.create_start('test_pgbench_perf') + pg = env.postgres.create_start('test_pgbench_perf') log.info("postgres is running on 'test_pgbench_perf' branch") # Open a connection directly to the page server that we'll use to force # flushing the layers to disk - psconn = pageserver.connect() + psconn = env.pageserver.connect() pscur = psconn.cursor() # Get the timeline ID of our branch. 
We need it for the 'do_gc' command @@ -41,13 +37,13 @@ def test_pgbench(postgres: PostgresFactory, connstr = pg.connstr() # Initialize pgbench database, recording the time and I/O it takes - with zenbenchmark.record_pageserver_writes(pageserver, 'pageserver_writes'): + with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'): with zenbenchmark.record_duration('init'): pg_bin.run_capture(['pgbench', '-s5', '-i', connstr]) # Flush the layers from memory to disk. This is included in the reported # time and I/O - pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") + pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") # Run pgbench for 5000 transactions with zenbenchmark.record_duration('5000_xacts'): @@ -55,8 +51,8 @@ def test_pgbench(postgres: PostgresFactory, # Flush the layers to disk again. This is *not' included in the reported time, # though. - pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") + pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") # Report disk space used by the repository - timeline_size = zenbenchmark.get_timeline_size(repo_dir, pageserver.initial_tenant, timeline) + timeline_size = zenbenchmark.get_timeline_size(env.repo_dir, env.initial_tenant, timeline) zenbenchmark.record('size', timeline_size / (1024 * 1024), 'MB') diff --git a/test_runner/performance/test_write_amplification.py b/test_runner/performance/test_write_amplification.py index 2ac8ee225b..a5850e98f6 100644 --- a/test_runner/performance/test_write_amplification.py +++ b/test_runner/performance/test_write_amplification.py @@ -12,26 +12,23 @@ # Amplification problem at its finest. import os from contextlib import closing -from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver +from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture") -def test_write_amplification(postgres: PostgresFactory, - pageserver: ZenithPageserver, - zenith_cli, - zenbenchmark, - repo_dir: str): +def test_write_amplification(zenith_simple_env: ZenithEnv, zenbenchmark): + env = zenith_simple_env # Create a branch for us - zenith_cli.run(["branch", "test_write_amplification", "empty"]) + env.zenith_cli(["branch", "test_write_amplification", "empty"]) - pg = postgres.create_start('test_write_amplification') + pg = env.postgres.create_start('test_write_amplification') log.info("postgres is running on 'test_write_amplification' branch") # Open a connection directly to the page server that we'll use to force # flushing the layers to disk - psconn = pageserver.connect() + psconn = env.pageserver.connect() pscur = psconn.cursor() with closing(pg.connect()) as conn: @@ -40,7 +37,7 @@ def test_write_amplification(postgres: PostgresFactory, cur.execute("SHOW zenith.zenith_timeline") timeline = cur.fetchone()[0] - with zenbenchmark.record_pageserver_writes(pageserver, 'pageserver_writes'): + with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'): with zenbenchmark.record_duration('run'): # NOTE: Because each iteration updates every table already created, @@ -73,10 +70,10 @@ def test_write_amplification(postgres: PostgresFactory, # slower, adding some delays in this loop. But forcing # the the checkpointing and GC makes the test go faster, # with the same total I/O effect. 
- pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") + pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0") # Report disk space used by the repository - timeline_size = zenbenchmark.get_timeline_size(repo_dir, - pageserver.initial_tenant, + timeline_size = zenbenchmark.get_timeline_size(env.repo_dir, + env.initial_tenant, timeline) zenbenchmark.record('size', timeline_size / (1024 * 1024), 'MB') diff --git a/test_runner/test_broken.py b/test_runner/test_broken.py index 66bfe1192c..6582b43519 100644 --- a/test_runner/test_broken.py +++ b/test_runner/test_broken.py @@ -1,6 +1,7 @@ import pytest import os +from fixtures.zenith_fixtures import ZenithEnv from fixtures.log_helper import log pytest_plugins = ("fixtures.zenith_fixtures") @@ -19,11 +20,13 @@ run_broken = pytest.mark.skipif(os.environ.get('RUN_BROKEN') is None, @run_broken -def test_broken(zenith_cli, pageserver, postgres, pg_bin): - # Create a branch for us - zenith_cli.run(["branch", "test_broken", "empty"]) +def test_broken(zenith_simple_env: ZenithEnv, pg_bin): + env = zenith_simple_env - postgres.create_start("test_broken") + # Create a branch for us + env.zenith_cli(["branch", "test_broken", "empty"]) + + env.postgres.create_start("test_broken") log.info('postgres is running') log.info('THIS NEXT COMMAND WILL FAIL:')