Use types in zenith cli invocations in Python tests

This commit is contained in:
Kirill Bulatov
2022-02-16 11:15:48 +02:00
committed by Kirill Bulatov
parent 9512e21b9e
commit e5bf520b18
31 changed files with 267 additions and 203 deletions

View File

@@ -57,7 +57,7 @@ def test_compute_auth_to_pageserver(zenith_env_builder: ZenithEnvBuilder, with_w
env = zenith_env_builder.init()
branch = f"test_compute_auth_to_pageserver{with_wal_acceptors}"
env.zenith_cli(["branch", branch, "main"])
env.zenith_cli.create_branch(branch, "main")
pg = env.postgres.create_start(branch)

View File

@@ -24,7 +24,7 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
env = zenith_env_builder.init()
# Branch at the point where only 100 rows were inserted
env.zenith_cli(["branch", "test_branch_behind", "main"])
env.zenith_cli.create_branch("test_branch_behind", "main")
pgmain = env.postgres.create_start('test_branch_behind')
log.info("postgres is running on 'test_branch_behind' branch")
@@ -62,7 +62,7 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
log.info(f'LSN after 200100 rows: {lsn_b}')
# Branch at the point where only 100 rows were inserted
env.zenith_cli(["branch", "test_branch_behind_hundred", "test_branch_behind@" + lsn_a])
env.zenith_cli.create_branch("test_branch_behind_hundred", "test_branch_behind@" + lsn_a)
# Insert many more rows. This generates enough WAL to fill a few segments.
main_cur.execute('''
@@ -77,7 +77,7 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
log.info(f'LSN after 400100 rows: {lsn_c}')
# Branch at the point where only 200100 rows were inserted
env.zenith_cli(["branch", "test_branch_behind_more", "test_branch_behind@" + lsn_b])
env.zenith_cli.create_branch("test_branch_behind_more", "test_branch_behind@" + lsn_b)
pg_hundred = env.postgres.create_start("test_branch_behind_hundred")
pg_more = env.postgres.create_start("test_branch_behind_more")
@@ -101,7 +101,7 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
# Check bad lsn's for branching
# branch at segment boundary
env.zenith_cli(["branch", "test_branch_segment_boundary", "test_branch_behind@0/3000000"])
env.zenith_cli.create_branch("test_branch_segment_boundary", "test_branch_behind@0/3000000")
pg = env.postgres.create_start("test_branch_segment_boundary")
cur = pg.connect().cursor()
cur.execute('SELECT 1')
@@ -109,11 +109,11 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
# branch at pre-initdb lsn
with pytest.raises(Exception, match="invalid branch start lsn"):
env.zenith_cli(["branch", "test_branch_preinitdb", "main@0/42"])
env.zenith_cli.create_branch("test_branch_preinitdb", "main@0/42")
# branch at pre-ancestor lsn
with pytest.raises(Exception, match="less than timeline ancestor lsn"):
env.zenith_cli(["branch", "test_branch_preinitdb", "test_branch_behind@0/42"])
env.zenith_cli.create_branch("test_branch_preinitdb", "test_branch_behind@0/42")
# check that we cannot create branch based on garbage collected data
with closing(env.pageserver.connect()) as psconn:
@@ -125,7 +125,7 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
with pytest.raises(Exception, match="invalid branch start lsn"):
# this gced_lsn is pretty random, so if gc is disabled this woudln't fail
env.zenith_cli(["branch", "test_branch_create_fail", f"test_branch_behind@{gced_lsn}"])
env.zenith_cli.create_branch("test_branch_create_fail", f"test_branch_behind@{gced_lsn}")
# check that after gc everything is still there
hundred_cur.execute('SELECT count(*) FROM foo')

View File

@@ -14,8 +14,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
def test_clog_truncate(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
# Create a branch for us
env.zenith_cli(["branch", "test_clog_truncate", "empty"])
env.zenith_cli.create_branch("test_clog_truncate", "empty")
# set agressive autovacuum to make sure that truncation will happen
config = [
@@ -65,8 +64,8 @@ def test_clog_truncate(zenith_simple_env: ZenithEnv):
# create new branch after clog truncation and start a compute node on it
log.info(f'create branch at lsn_after_truncation {lsn_after_truncation}')
env.zenith_cli(
["branch", "test_clog_truncate_new", "test_clog_truncate@" + lsn_after_truncation])
env.zenith_cli.create_branch("test_clog_truncate_new",
"test_clog_truncate@" + lsn_after_truncation)
pg2 = env.postgres.create_start('test_clog_truncate_new')
log.info('postgres is running on test_clog_truncate_new branch')

View File

@@ -11,8 +11,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
def test_config(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
# Create a branch for us
env.zenith_cli(["branch", "test_config", "empty"])
env.zenith_cli.create_branch("test_config", "empty")
# change config
pg = env.postgres.create_start('test_config', config_lines=['log_min_messages=debug1'])

View File

@@ -13,7 +13,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
def test_createdb(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
env.zenith_cli(["branch", "test_createdb", "empty"])
env.zenith_cli.create_branch("test_createdb", "empty")
pg = env.postgres.create_start('test_createdb')
log.info("postgres is running on 'test_createdb' branch")
@@ -29,7 +29,7 @@ def test_createdb(zenith_simple_env: ZenithEnv):
lsn = cur.fetchone()[0]
# Create a branch
env.zenith_cli(["branch", "test_createdb2", "test_createdb@" + lsn])
env.zenith_cli.create_branch("test_createdb2", "test_createdb@" + lsn)
pg2 = env.postgres.create_start('test_createdb2')
@@ -43,7 +43,7 @@ def test_createdb(zenith_simple_env: ZenithEnv):
#
def test_dropdb(zenith_simple_env: ZenithEnv, test_output_dir):
env = zenith_simple_env
env.zenith_cli(["branch", "test_dropdb", "empty"])
env.zenith_cli.create_branch("test_dropdb", "empty")
pg = env.postgres.create_start('test_dropdb')
log.info("postgres is running on 'test_dropdb' branch")
@@ -68,10 +68,10 @@ def test_dropdb(zenith_simple_env: ZenithEnv, test_output_dir):
lsn_after_drop = cur.fetchone()[0]
# Create two branches before and after database drop.
env.zenith_cli(["branch", "test_before_dropdb", "test_dropdb@" + lsn_before_drop])
env.zenith_cli.create_branch("test_before_dropdb", "test_dropdb@" + lsn_before_drop)
pg_before = env.postgres.create_start('test_before_dropdb')
env.zenith_cli(["branch", "test_after_dropdb", "test_dropdb@" + lsn_after_drop])
env.zenith_cli.create_branch("test_after_dropdb", "test_dropdb@" + lsn_after_drop)
pg_after = env.postgres.create_start('test_after_dropdb')
# Test that database exists on the branch before drop

View File

@@ -11,7 +11,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
def test_createuser(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
env.zenith_cli(["branch", "test_createuser", "empty"])
env.zenith_cli.create_branch("test_createuser", "empty")
pg = env.postgres.create_start('test_createuser')
log.info("postgres is running on 'test_createuser' branch")
@@ -27,7 +27,7 @@ def test_createuser(zenith_simple_env: ZenithEnv):
lsn = cur.fetchone()[0]
# Create a branch
env.zenith_cli(["branch", "test_createuser2", "test_createuser@" + lsn])
env.zenith_cli.create_branch("test_createuser2", "test_createuser@" + lsn)
pg2 = env.postgres.create_start('test_createuser2')

View File

@@ -57,9 +57,7 @@ async def update_and_gc(env: ZenithEnv, pg: Postgres, timeline: str):
#
def test_gc_aggressive(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
# Create a branch for us
env.zenith_cli(["branch", "test_gc_aggressive", "empty"])
env.zenith_cli.create_branch("test_gc_aggressive", "empty")
pg = env.postgres.create_start('test_gc_aggressive')
log.info('postgres is running on test_gc_aggressive branch')

View File

@@ -12,8 +12,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
def test_multixact(zenith_simple_env: ZenithEnv, test_output_dir):
env = zenith_simple_env
# Create a branch for us
env.zenith_cli(["branch", "test_multixact", "empty"])
env.zenith_cli.create_branch("test_multixact", "empty")
pg = env.postgres.create_start('test_multixact')
log.info("postgres is running on 'test_multixact' branch")
@@ -63,7 +62,7 @@ def test_multixact(zenith_simple_env: ZenithEnv, test_output_dir):
assert int(next_multixact_id) > int(next_multixact_id_old)
# Branch at this point
env.zenith_cli(["branch", "test_multixact_new", "test_multixact@" + lsn])
env.zenith_cli.create_branch("test_multixact_new", "test_multixact@" + lsn)
pg_new = env.postgres.create_start('test_multixact_new')
log.info("postgres is running on 'test_multixact_new' branch")

View File

@@ -18,8 +18,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
def test_old_request_lsn(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
# Create a branch for us
env.zenith_cli(["branch", "test_old_request_lsn", "empty"])
env.zenith_cli.create_branch("test_old_request_lsn", "empty")
pg = env.postgres.create_start('test_old_request_lsn')
log.info('postgres is running on test_old_request_lsn branch')

View File

@@ -18,7 +18,7 @@ def test_pageserver_catchup_while_compute_down(zenith_env_builder: ZenithEnvBuil
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
env.zenith_cli(["branch", "test_pageserver_catchup_while_compute_down", "main"])
env.zenith_cli.create_branch("test_pageserver_catchup_while_compute_down", "main")
pg = env.postgres.create_start('test_pageserver_catchup_while_compute_down')
pg_conn = pg.connect()

View File

@@ -17,7 +17,7 @@ def test_pageserver_restart(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init()
env.zenith_cli(["branch", "test_pageserver_restart", "main"])
env.zenith_cli.create_branch("test_pageserver_restart", "main")
pg = env.postgres.create_start('test_pageserver_restart')
pg_conn = pg.connect()

View File

@@ -39,9 +39,7 @@ async def parallel_load_same_table(pg: Postgres, n_parallel: int):
# Load data into one table with COPY TO from 5 parallel connections
def test_parallel_copy(zenith_simple_env: ZenithEnv, n_parallel=5):
env = zenith_simple_env
# Create a branch for us
env.zenith_cli(["branch", "test_parallel_copy", "empty"])
env.zenith_cli.create_branch("test_parallel_copy", "empty")
pg = env.postgres.create_start('test_parallel_copy')
log.info("postgres is running on 'test_parallel_copy' branch")

View File

@@ -6,9 +6,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
def test_pgbench(zenith_simple_env: ZenithEnv, pg_bin):
env = zenith_simple_env
# Create a branch for us
env.zenith_cli(["branch", "test_pgbench", "empty"])
env.zenith_cli.create_branch("test_pgbench", "empty")
pg = env.postgres.create_start('test_pgbench')
log.info("postgres is running on 'test_pgbench' branch")

View File

@@ -13,7 +13,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
def test_readonly_node(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
env.zenith_cli(["branch", "test_readonly_node", "empty"])
env.zenith_cli.create_branch("test_readonly_node", "empty")
pgmain = env.postgres.create_start('test_readonly_node')
log.info("postgres is running on 'test_readonly_node' branch")
@@ -88,4 +88,5 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
# Create node at pre-initdb lsn
with pytest.raises(Exception, match="invalid basebackup lsn"):
# compute node startup with invalid LSN should fail
env.zenith_cli(["pg", "start", "test_readonly_node_preinitdb", "test_readonly_node@0/42"])
env.zenith_cli.pg_start("test_readonly_node_preinitdb",
timeline_spec="test_readonly_node@0/42")

View File

@@ -17,7 +17,7 @@ def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptor
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
env.zenith_cli(["branch", "test_restart_compute", "main"])
env.zenith_cli.create_branch("test_restart_compute", "main")
pg = env.postgres.create_start('test_restart_compute')
log.info("postgres is running on 'test_restart_compute' branch")

View File

@@ -16,7 +16,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
env.zenith_cli(["branch", "test_layerfiles_gc", "empty"])
env.zenith_cli.create_branch("test_layerfiles_gc", "empty")
pg = env.postgres.create_start('test_layerfiles_gc')
with closing(pg.connect()) as conn:

View File

@@ -12,8 +12,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
# CLOG.
def test_subxacts(zenith_simple_env: ZenithEnv, test_output_dir):
env = zenith_simple_env
# Create a branch for us
env.zenith_cli(["branch", "test_subxacts", "empty"])
env.zenith_cli.create_branch("test_subxacts", "empty")
pg = env.postgres.create_start('test_subxacts')
log.info("postgres is running on 'test_subxacts' branch")

View File

@@ -130,7 +130,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
tenant = env.create_tenant("74ee8b079a0e437eb0afea7d26a07209")
log.info("tenant to relocate %s", tenant)
env.zenith_cli(["branch", "test_tenant_relocation", "main", f"--tenantid={tenant}"])
env.zenith_cli.create_branch("test_tenant_relocation", "main", tenant_id=tenant)
tenant_pg = env.postgres.create_start(
"test_tenant_relocation",

View File

@@ -15,18 +15,12 @@ def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_wal_acce
tenant_1 = env.create_tenant()
tenant_2 = env.create_tenant()
env.zenith_cli([
"branch",
f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}",
"main",
f"--tenantid={tenant_1}"
])
env.zenith_cli([
"branch",
f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}",
"main",
f"--tenantid={tenant_2}"
])
env.zenith_cli.create_branch(f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}",
"main",
tenant_id=tenant_1)
env.zenith_cli.create_branch(f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}",
"main",
tenant_id=tenant_2)
pg_tenant1 = env.postgres.create_start(
f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}",

View File

@@ -10,7 +10,7 @@ import time
def test_timeline_size(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
# Branch at the point where only 100 rows were inserted
env.zenith_cli(["branch", "test_timeline_size", "empty"])
env.zenith_cli.create_branch("test_timeline_size", "empty")
client = env.pageserver.http_client()
res = client.branch_detail(UUID(env.initial_tenant), "test_timeline_size")
@@ -68,7 +68,7 @@ def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60
def test_timeline_size_quota(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init()
env.zenith_cli(["branch", "test_timeline_size_quota", "main"])
env.zenith_cli.create_branch("test_timeline_size_quota", "main")
client = env.pageserver.http_client()
res = client.branch_detail(UUID(env.initial_tenant), "test_timeline_size_quota")

View File

@@ -11,7 +11,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
def test_twophase(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
env.zenith_cli(["branch", "test_twophase", "empty"])
env.zenith_cli.create_branch("test_twophase", "empty")
pg = env.postgres.create_start('test_twophase', config_lines=['max_prepared_transactions=5'])
log.info("postgres is running on 'test_twophase' branch")
@@ -58,7 +58,7 @@ def test_twophase(zenith_simple_env: ZenithEnv):
assert len(twophase_files) == 2
# Create a branch with the transaction in prepared state
env.zenith_cli(["branch", "test_twophase_prepared", "test_twophase"])
env.zenith_cli.create_branch("test_twophase_prepared", "test_twophase")
# Start compute on the new branch
pg2 = env.postgres.create_start(

View File

@@ -11,8 +11,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
def test_vm_bit_clear(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
# Create a branch for us
env.zenith_cli(["branch", "test_vm_bit_clear", "empty"])
env.zenith_cli.create_branch("test_vm_bit_clear", "empty")
pg = env.postgres.create_start('test_vm_bit_clear')
log.info("postgres is running on 'test_vm_bit_clear' branch")
@@ -36,7 +35,7 @@ def test_vm_bit_clear(zenith_simple_env: ZenithEnv):
cur.execute('UPDATE vmtest_update SET id = 5000 WHERE id = 1')
# Branch at this point, to test that later
env.zenith_cli(["branch", "test_vm_bit_clear_new", "test_vm_bit_clear"])
env.zenith_cli.create_branch("test_vm_bit_clear_new", "test_vm_bit_clear")
# Clear the buffer cache, to force the VM page to be re-fetched from
# the page server

View File

@@ -26,7 +26,7 @@ def test_normal_work(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
env.zenith_cli(["branch", "test_wal_acceptors_normal_work", "main"])
env.zenith_cli.create_branch("test_wal_acceptors_normal_work", "main")
pg = env.postgres.create_start('test_wal_acceptors_normal_work')
@@ -62,7 +62,7 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
# start postgres on each timeline
pgs = []
for branch in branches:
env.zenith_cli(["branch", branch, "main"])
env.zenith_cli.create_branch(branch, "main")
pgs.append(env.postgres.create_start(branch))
tenant_id = uuid.UUID(env.initial_tenant)
@@ -185,7 +185,7 @@ def test_restarts(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = n_acceptors
env = zenith_env_builder.init()
env.zenith_cli(["branch", "test_wal_acceptors_restarts", "main"])
env.zenith_cli.create_branch("test_wal_acceptors_restarts", "main")
pg = env.postgres.create_start('test_wal_acceptors_restarts')
# we rely upon autocommit after each statement
@@ -222,7 +222,7 @@ def test_unavailability(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 2
env = zenith_env_builder.init()
env.zenith_cli(["branch", "test_wal_acceptors_unavailability", "main"])
env.zenith_cli.create_branch("test_wal_acceptors_unavailability", "main")
pg = env.postgres.create_start('test_wal_acceptors_unavailability')
# we rely upon autocommit after each statement
@@ -293,7 +293,7 @@ def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
env.zenith_cli(["branch", "test_wal_acceptors_race_conditions", "main"])
env.zenith_cli.create_branch("test_wal_acceptors_race_conditions", "main")
pg = env.postgres.create_start('test_wal_acceptors_race_conditions')
# we rely upon autocommit after each statement
@@ -458,7 +458,7 @@ def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init()
env.zenith_cli(["branch", "test_timeline_status", "main"])
env.zenith_cli.create_branch("test_timeline_status", "main")
pg = env.postgres.create_start('test_timeline_status')
wa = env.safekeepers[0]
@@ -636,7 +636,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 4
env = zenith_env_builder.init()
env.zenith_cli(["branch", "test_replace_safekeeper", "main"])
env.zenith_cli.create_branch("test_replace_safekeeper", "main")
log.info("Use only first 3 safekeepers")
env.safekeepers[3].stop()

View File

@@ -203,7 +203,7 @@ def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
env.zenith_cli(["branch", "test_wal_acceptors_restarts_under_load", "main"])
env.zenith_cli.create_branch("test_wal_acceptors_restarts_under_load", "main")
pg = env.postgres.create_start('test_wal_acceptors_restarts_under_load')
asyncio.run(run_restarts_under_load(pg, env.safekeepers))

View File

@@ -20,13 +20,11 @@ def helper_compare_branch_list(pageserver_http_client: ZenithPageserverHttpClien
branches_api = sorted(map(lambda b: cast(str, b['name']), branches))
branches_api = [b for b in branches_api if b.startswith('test_cli_') or b in ('empty', 'main')]
res = env.zenith_cli(["branch"])
res.check_returncode()
res = env.zenith_cli.list_branches()
branches_cli = sorted(map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n")))
branches_cli = [b for b in branches_cli if b.startswith('test_cli_') or b in ('empty', 'main')]
res = env.zenith_cli(["branch", f"--tenantid={initial_tenant}"])
res.check_returncode()
res = env.zenith_cli.list_branches(tenant_id=initial_tenant)
branches_cli_with_tenant_arg = sorted(
map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n")))
branches_cli_with_tenant_arg = [
@@ -42,19 +40,17 @@ def test_cli_branch_list(zenith_simple_env: ZenithEnv):
# Initial sanity check
helper_compare_branch_list(pageserver_http_client, env, env.initial_tenant)
# Create a branch for us
res = env.zenith_cli(["branch", "test_cli_branch_list_main", "empty"])
assert res.stderr == ''
env.zenith_cli.create_branch("test_cli_branch_list_main", "empty")
helper_compare_branch_list(pageserver_http_client, env, env.initial_tenant)
# Create a nested branch
res = env.zenith_cli(["branch", "test_cli_branch_list_nested", "test_cli_branch_list_main"])
res = env.zenith_cli.create_branch("test_cli_branch_list_nested", "test_cli_branch_list_main")
assert res.stderr == ''
helper_compare_branch_list(pageserver_http_client, env, env.initial_tenant)
# Check that all new branches are visible via CLI
res = env.zenith_cli(["branch"])
res = env.zenith_cli.list_branches()
assert res.stderr == ''
branches_cli = sorted(map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n")))
@@ -66,7 +62,7 @@ def helper_compare_tenant_list(pageserver_http_client: ZenithPageserverHttpClien
tenants = pageserver_http_client.tenant_list()
tenants_api = sorted(map(lambda t: cast(str, t['id']), tenants))
res = env.zenith_cli(["tenant", "list"])
res = env.zenith_cli.list_tenants()
assert res.stderr == ''
tenants_cli = sorted(map(lambda t: t.split()[0], res.stdout.splitlines()))
@@ -81,22 +77,19 @@ def test_cli_tenant_list(zenith_simple_env: ZenithEnv):
# Create new tenant
tenant1 = uuid.uuid4().hex
res = env.zenith_cli(["tenant", "create", tenant1])
res.check_returncode()
env.zenith_cli.create_tenant(tenant1)
# check tenant1 appeared
helper_compare_tenant_list(pageserver_http_client, env)
# Create new tenant
tenant2 = uuid.uuid4().hex
res = env.zenith_cli(["tenant", "create", tenant2])
res.check_returncode()
env.zenith_cli.create_tenant(tenant2)
# check tenant2 appeared
helper_compare_tenant_list(pageserver_http_client, env)
res = env.zenith_cli(["tenant", "list"])
res.check_returncode()
res = env.zenith_cli.list_tenants()
tenants = sorted(map(lambda t: t.split()[0], res.stdout.splitlines()))
assert env.initial_tenant in tenants

View File

@@ -9,9 +9,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
def test_isolation(zenith_simple_env: ZenithEnv, test_output_dir, pg_bin, capsys):
env = zenith_simple_env
# Create a branch for us
env.zenith_cli(["branch", "test_isolation", "empty"])
env.zenith_cli.create_branch("test_isolation", "empty")
# Connect to postgres and create a database called "regression".
# isolation tests use prepared transactions, so enable them
pg = env.postgres.create_start('test_isolation', config_lines=['max_prepared_transactions=100'])

View File

@@ -9,9 +9,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
def test_pg_regress(zenith_simple_env: ZenithEnv, test_output_dir: str, pg_bin, capsys):
env = zenith_simple_env
# Create a branch for us
env.zenith_cli(["branch", "test_pg_regress", "empty"])
env.zenith_cli.create_branch("test_pg_regress", "empty")
# Connect to postgres and create a database called "regression".
pg = env.postgres.create_start('test_pg_regress')
pg.safe_psql('CREATE DATABASE regression')

View File

@@ -13,9 +13,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
def test_zenith_regress(zenith_simple_env: ZenithEnv, test_output_dir, pg_bin, capsys):
env = zenith_simple_env
# Create a branch for us
env.zenith_cli(["branch", "test_zenith_regress", "empty"])
env.zenith_cli.create_branch("test_zenith_regress", "empty")
# Connect to postgres and create a database called "regression".
pg = env.postgres.create_start('test_zenith_regress')
pg.safe_psql('CREATE DATABASE regression')

View File

@@ -517,6 +517,7 @@ class ZenithEnv:
self.rust_log_override = config.rust_log_override
self.port_distributor = config.port_distributor
self.s3_mock_server = config.s3_mock_server
self.zenith_cli = ZenithCli(env=self)
self.postgres = PostgresFactory(self)
@@ -573,15 +574,7 @@ sync = false # Disable fsyncs to make the tests go faster
log.info(f"Config: {toml}")
# Run 'zenith init' using the config file we constructed
with tempfile.NamedTemporaryFile(mode='w+') as tmp:
tmp.write(toml)
tmp.flush()
cmd = ['init', f'--config={tmp.name}']
append_pageserver_param_overrides(cmd, config.pageserver_remote_storage)
self.zenith_cli(cmd)
self.zenith_cli.init(toml)
# Start up the page server and all the safekeepers
self.pageserver.start()
@@ -596,66 +589,9 @@ sync = false # Disable fsyncs to make the tests go faster
def create_tenant(self, tenant_id: Optional[str] = None):
if tenant_id is None:
tenant_id = uuid.uuid4().hex
res = self.zenith_cli(['tenant', 'create', tenant_id])
res.check_returncode()
self.zenith_cli.create_tenant(tenant_id)
return tenant_id
def zenith_cli(self, arguments: List[str]) -> 'subprocess.CompletedProcess[str]':
"""
Run "zenith" with the specified arguments.
Arguments must be in list form, e.g. ['pg', 'create']
Return both stdout and stderr, which can be accessed as
>>> result = env.zenith_cli(...)
>>> assert result.stderr == ""
>>> log.info(result.stdout)
"""
assert type(arguments) == list
bin_zenith = os.path.join(str(zenith_binpath), 'zenith')
args = [bin_zenith] + arguments
log.info('Running command "{}"'.format(' '.join(args)))
log.info(f'Running in "{self.repo_dir}"')
env_vars = os.environ.copy()
env_vars['ZENITH_REPO_DIR'] = str(self.repo_dir)
env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir)
if self.rust_log_override is not None:
env_vars['RUST_LOG'] = self.rust_log_override
# Pass coverage settings
var = 'LLVM_PROFILE_FILE'
val = os.environ.get(var)
if val:
env_vars[var] = val
# Intercept CalledProcessError and print more info
try:
res = subprocess.run(args,
env=env_vars,
check=True,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
log.info(f"Run success: {res.stdout}")
except subprocess.CalledProcessError as exc:
# this way command output will be in recorded and shown in CI in failure message
msg = f"""\
Run failed: {exc}
stdout: {exc.stdout}
stderr: {exc.stderr}
"""
log.info(msg)
raise Exception(msg) from exc
return res
@cached_property
def auth_keys(self) -> AuthKeys:
pub = (Path(self.repo_dir) / 'auth_public_key.pem').read_bytes()
@@ -683,7 +619,7 @@ def _shared_simple_env(request: Any, port_distributor) -> Iterator[ZenithEnv]:
env = builder.init()
# For convenience in tests, create a branch from the freshly-initialized cluster.
env.zenith_cli(["branch", "empty", "main"])
env.zenith_cli.create_branch("empty", "main")
# Return the builder to the caller
yield env
@@ -817,7 +753,7 @@ class ZenithPageserverHttpClient(requests.Session):
assert isinstance(res_json, list)
return res_json
def timeline_detail(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID):
def timeline_detail(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]:
res = self.get(
f"http://localhost:{self.port}/v1/timeline/{tenant_id.hex}/{timeline_id.hex}")
self.verbose_error(res)
@@ -854,6 +790,185 @@ class S3Storage:
RemoteStorage = Union[LocalFsStorage, S3Storage]
class ZenithCli:
"""
A typed wrapper around the `zenith` CLI tool.
Supports main commands via typed methods and a way to run arbitrary command directly via CLI.
"""
def __init__(self, env: ZenithEnv) -> None:
self.env = env
pass
def create_tenant(self, tenant_id: Optional[str] = None) -> uuid.UUID:
if tenant_id is None:
tenant_id = uuid.uuid4().hex
self.raw_cli(['tenant', 'create', tenant_id])
return uuid.UUID(tenant_id)
def list_tenants(self) -> 'subprocess.CompletedProcess[str]':
return self.raw_cli(['tenant', 'list'])
def create_branch(self,
branch_name: str,
starting_point: str,
tenant_id: Optional[str] = None) -> 'subprocess.CompletedProcess[str]':
args = ['branch']
if tenant_id is not None:
args.extend(['--tenantid', tenant_id])
args.extend([branch_name, starting_point])
return self.raw_cli(args)
def list_branches(self, tenant_id: Optional[str] = None) -> 'subprocess.CompletedProcess[str]':
args = ['branch']
if tenant_id is not None:
args.extend(['--tenantid', tenant_id])
return self.raw_cli(args)
def init(self, config_toml: str) -> 'subprocess.CompletedProcess[str]':
with tempfile.NamedTemporaryFile(mode='w+') as tmp:
tmp.write(config_toml)
tmp.flush()
cmd = ['init', f'--config={tmp.name}']
append_pageserver_param_overrides(cmd, self.env.pageserver.remote_storage)
return self.raw_cli(cmd)
def pageserver_start(self) -> 'subprocess.CompletedProcess[str]':
start_args = ['pageserver', 'start']
append_pageserver_param_overrides(start_args, self.env.pageserver.remote_storage)
return self.raw_cli(start_args)
def pageserver_stop(self, immediate=False) -> 'subprocess.CompletedProcess[str]':
cmd = ['pageserver', 'stop']
if immediate:
cmd.extend(['-m', 'immediate'])
log.info(f"Stopping pageserver with {cmd}")
return self.raw_cli(cmd)
def safekeeper_start(self, name: str) -> 'subprocess.CompletedProcess[str]':
return self.raw_cli(['safekeeper', 'start', name])
def safekeeper_stop(self, name: str, immediate=False) -> 'subprocess.CompletedProcess[str]':
args = ['safekeeper', 'stop']
if immediate:
args.extend(['-m', 'immediate'])
args.append(name)
return self.raw_cli(args)
def pg_create(
self,
node_name: str,
tenant_id: Optional[str] = None,
timeline_spec: Optional[str] = None,
port: Optional[int] = None,
) -> 'subprocess.CompletedProcess[str]':
args = ['pg', 'create']
if tenant_id is not None:
args.extend(['--tenantid', tenant_id])
if port is not None:
args.append(f'--port={port}')
args.append(node_name)
if timeline_spec is not None:
args.append(timeline_spec)
return self.raw_cli(args)
def pg_start(
self,
node_name: str,
tenant_id: Optional[str] = None,
timeline_spec: Optional[str] = None,
port: Optional[int] = None,
) -> 'subprocess.CompletedProcess[str]':
args = ['pg', 'start']
if tenant_id is not None:
args.extend(['--tenantid', tenant_id])
if port is not None:
args.append(f'--port={port}')
args.append(node_name)
if timeline_spec is not None:
args.append(timeline_spec)
return self.raw_cli(args)
def pg_stop(
self,
node_name: str,
tenant_id: Optional[str] = None,
destroy=False,
) -> 'subprocess.CompletedProcess[str]':
args = ['pg', 'stop']
if tenant_id is not None:
args.extend(['--tenantid', tenant_id])
if destroy:
args.append('--destroy')
args.append(node_name)
return self.raw_cli(args)
def raw_cli(self,
arguments: List[str],
check_return_code=True) -> 'subprocess.CompletedProcess[str]':
"""
Run "zenith" with the specified arguments.
Arguments must be in list form, e.g. ['pg', 'create']
Return both stdout and stderr, which can be accessed as
>>> result = env.zenith_cli.raw_cli(...)
>>> assert result.stderr == ""
>>> log.info(result.stdout)
"""
assert type(arguments) == list
bin_zenith = os.path.join(str(zenith_binpath), 'zenith')
args = [bin_zenith] + arguments
log.info('Running command "{}"'.format(' '.join(args)))
log.info(f'Running in "{self.env.repo_dir}"')
env_vars = os.environ.copy()
env_vars['ZENITH_REPO_DIR'] = str(self.env.repo_dir)
env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir)
if self.env.rust_log_override is not None:
env_vars['RUST_LOG'] = self.env.rust_log_override
# Pass coverage settings
var = 'LLVM_PROFILE_FILE'
val = os.environ.get(var)
if val:
env_vars[var] = val
# Intercept CalledProcessError and print more info
try:
res = subprocess.run(args,
env=env_vars,
check=True,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
log.info(f"Run success: {res.stdout}")
except subprocess.CalledProcessError as exc:
# this way command output will be in recorded and shown in CI in failure message
msg = f"""\
Run failed: {exc}
stdout: {exc.stdout}
stderr: {exc.stderr}
"""
log.info(msg)
raise Exception(msg) from exc
if check_return_code:
res.check_returncode()
return res
class ZenithPageserver(PgProtocol):
"""
An object representing a running pageserver.
@@ -878,10 +993,7 @@ class ZenithPageserver(PgProtocol):
"""
assert self.running == False
start_args = ['pageserver', 'start']
append_pageserver_param_overrides(start_args, self.remote_storage)
self.env.zenith_cli(start_args)
self.env.zenith_cli.pageserver_start()
self.running = True
return self
@@ -890,13 +1002,8 @@ class ZenithPageserver(PgProtocol):
Stop the page server.
Returns self.
"""
cmd = ['pageserver', 'stop']
if immediate:
cmd.extend(['-m', 'immediate'])
log.info(f"Stopping pageserver with {cmd}")
if self.running:
self.env.zenith_cli(cmd)
self.env.zenith_cli.pageserver_stop(immediate)
self.running = False
return self
@@ -1076,14 +1183,10 @@ class Postgres(PgProtocol):
if branch is None:
branch = node_name
self.env.zenith_cli([
'pg',
'create',
f'--tenantid={self.tenant_id}',
f'--port={self.port}',
node_name,
branch
])
self.env.zenith_cli.pg_create(node_name,
tenant_id=self.tenant_id,
port=self.port,
timeline_spec=branch)
self.node_name = node_name
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.node_name
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
@@ -1104,8 +1207,9 @@ class Postgres(PgProtocol):
log.info(f"Starting postgres node {self.node_name}")
run_result = self.env.zenith_cli(
['pg', 'start', f'--tenantid={self.tenant_id}', f'--port={self.port}', self.node_name])
run_result = self.env.zenith_cli.pg_start(self.node_name,
tenant_id=self.tenant_id,
port=self.port)
self.running = True
log.info(f"stdout: {run_result.stdout}")
@@ -1175,7 +1279,7 @@ class Postgres(PgProtocol):
if self.running:
assert self.node_name is not None
self.env.zenith_cli(['pg', 'stop', self.node_name, f'--tenantid={self.tenant_id}'])
self.env.zenith_cli.pg_stop(self.node_name, tenant_id=self.tenant_id)
self.running = False
return self
@@ -1187,8 +1291,7 @@ class Postgres(PgProtocol):
"""
assert self.node_name is not None
self.env.zenith_cli(
['pg', 'stop', '--destroy', self.node_name, f'--tenantid={self.tenant_id}'])
self.env.zenith_cli.pg_stop(self.node_name, self.tenant_id, destroy=True)
self.node_name = None
return self
@@ -1295,7 +1398,7 @@ class Safekeeper:
auth_token: Optional[str] = None
def start(self) -> 'Safekeeper':
self.env.zenith_cli(['safekeeper', 'start', self.name])
self.env.zenith_cli.safekeeper_start(self.name)
# wait for wal acceptor start by checking its status
started_at = time.time()
@@ -1314,13 +1417,8 @@ class Safekeeper:
return self
def stop(self, immediate=False) -> 'Safekeeper':
cmd = ['safekeeper', 'stop']
if immediate:
cmd.extend(['-m', 'immediate'])
cmd.append(self.name)
log.info('Stopping safekeeper {}'.format(self.name))
self.env.zenith_cli(cmd)
self.env.zenith_cli.safekeeper_stop(self.name, immediate)
return self
def append_logical_message(self, tenant_id: str, timeline_id: str,

View File

@@ -33,12 +33,10 @@ def test_bulk_tenant_create(
start = timeit.default_timer()
tenant = env.create_tenant()
env.zenith_cli([
"branch",
env.zenith_cli.create_branch(
f"test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}",
"main",
f"--tenantid={tenant}"
])
tenant_id=tenant)
# FIXME: We used to start new safekeepers here. Did that make sense? Should we do it now?
#if use_wal_acceptors == 'with_wa':

View File

@@ -23,9 +23,7 @@ run_broken = pytest.mark.skipif(os.environ.get('RUN_BROKEN') is None,
def test_broken(zenith_simple_env: ZenithEnv, pg_bin):
env = zenith_simple_env
# Create a branch for us
env.zenith_cli(["branch", "test_broken", "empty"])
env.zenith_cli.create_branch("test_broken", "empty")
env.postgres.create_start("test_broken")
log.info('postgres is running')