Refactor pytest fixtures

Instead of having a lot of separate fixtures for setting up the page
server, the compute nodes, the safekeepers etc., have one big ZenithEnv
object that encapsulates the whole environment. Every test either uses
a shared "zenith_simple_env" fixture, which contains the default setup
of a pageserver with no authentication, and no safekeepers. Tests that
want to use safekeepers or authentication set up a custom test-specific
ZenithEnv fixture.

Gathering information about the whole environment into one object makes
some things simpler. For example, when a new compute node is created,
you no longer need to pass the 'wal_acceptors' connection string as
argument to the 'postgres.create_start' function. The 'create_start'
function fetches that information directly from the ZenithEnv object.
This commit is contained in:
Heikki Linnakangas
2021-10-25 14:14:47 +03:00
parent 28af3e5008
commit 66ec135676
33 changed files with 723 additions and 808 deletions

View File

@@ -65,36 +65,44 @@ Exit after the first test failure:
`pytest -x ...`
(there are many more pytest options; run `pytest -h` to see them.)
### Writing a test
### Building new tests
Every test needs a Zenith Environment, or ZenithEnv to operate in. A Zenith Environment
is like a little cloud-in-a-box, and consists of a Pageserver, 0-N Safekeepers, and
compute Postgres nodes. The connections between them can be configured to use JWT
authentication tokens, and some other configuration options can be tweaked too.
The tests make heavy use of pytest fixtures. You can read about how they work here: https://docs.pytest.org/en/stable/fixture.html
The easiest way to get access to a Zenith Environment is by using the `zenith_simple_env`
fixture. The 'simple' env may be shared across multiple tests, so don't shut down the nodes
or make other destructive changes in that environment. Also don't assume that
there are no tenants or branches or data in the cluster. For convenience, there is a
branch called `empty`, though. The convention is to create a test-specific branch of
that and load any test data there, instead of the 'main' branch.
Essentially, this means that each time you see a fixture named as an input parameter, the function with that name will be run and passed as a parameter to the function.
So this code:
For more complicated cases, you can build a custom Zenith Environment, with the `zenith_env`
fixture:
```python
def test_something(zenith_cli, pg_bin):
pass
def test_foobar(zenith_env_builder: ZenithEnvBuilder):
# Prescribe the environment.
# We want to have 3 safekeeper nodes, and use JWT authentication in the
# connections to the page server
zenith_env_builder.num_safekeepers = 3
zenith_env_builder.set_pageserver_auth(True)
# Now create the environment. This initializes the repository, and starts
# up the page server and the safekeepers
env = zenith_env_builder.init()
# Run the test
...
```
... will run the fixtures called `zenith_cli` and `pg_bin` and deliver those results to the test function.
Fixtures can't be imported using the normal python syntax. Instead, use this:
```python
pytest_plugins = ("fixtures.something")
```
That will make all the fixtures in the `fixtures/something.py` file available.
Anything that's likely to be used in multiple tests should be built into a fixture.
Note that fixtures can clean up after themselves if they use the `yield` syntax.
Cleanup will happen even if the test fails (raises an unhandled exception).
Python destructors, e.g. `__del__()` aren't recommended for cleanup.
For more information about pytest fixtures, see https://docs.pytest.org/en/stable/fixture.html
At the end of a test, all the nodes in the environment are automatically stopped, so you
don't need to worry about cleaning up. Logs and test data are preserved for the analysis,
in a directory under `../test_output/<testname>`
### Before submitting a patch
#### Obligatory checks

View File

@@ -2,18 +2,21 @@ from contextlib import closing
from typing import Iterator
from uuid import uuid4
import psycopg2
from fixtures.zenith_fixtures import PortDistributor, Postgres, ZenithCli, ZenithPageserver, PgBin
from fixtures.zenith_fixtures import ZenithEnvBuilder
import pytest
pytest_plugins = ("fixtures.zenith_fixtures")
def test_pageserver_auth(pageserver_auth_enabled: ZenithPageserver):
ps = pageserver_auth_enabled
def test_pageserver_auth(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.pageserver_auth_enabled = True
env = zenith_env_builder.init()
tenant_token = ps.auth_keys.generate_tenant_token(ps.initial_tenant)
invalid_tenant_token = ps.auth_keys.generate_tenant_token(uuid4().hex)
management_token = ps.auth_keys.generate_management_token()
ps = env.pageserver
tenant_token = env.auth_keys.generate_tenant_token(env.initial_tenant)
invalid_tenant_token = env.auth_keys.generate_tenant_token(uuid4().hex)
management_token = env.auth_keys.generate_management_token()
# this does not invoke auth check and only decodes jwt and checks it for validity
# check both tokens
@@ -21,13 +24,13 @@ def test_pageserver_auth(pageserver_auth_enabled: ZenithPageserver):
ps.safe_psql("status", password=management_token)
# tenant can create branches
ps.safe_psql(f"branch_create {ps.initial_tenant} new1 main", password=tenant_token)
ps.safe_psql(f"branch_create {env.initial_tenant} new1 main", password=tenant_token)
# console can create branches for tenant
ps.safe_psql(f"branch_create {ps.initial_tenant} new2 main", password=management_token)
ps.safe_psql(f"branch_create {env.initial_tenant} new2 main", password=management_token)
# fail to create branch using token with different tenantid
with pytest.raises(psycopg2.DatabaseError, match='Tenant id mismatch. Permission denied'):
ps.safe_psql(f"branch_create {ps.initial_tenant} new2 main", password=invalid_tenant_token)
ps.safe_psql(f"branch_create {env.initial_tenant} new2 main", password=invalid_tenant_token)
# create tenant using management token
ps.safe_psql(f"tenant_create {uuid4().hex}", password=management_token)
@@ -40,38 +43,22 @@ def test_pageserver_auth(pageserver_auth_enabled: ZenithPageserver):
@pytest.mark.parametrize('with_wal_acceptors', [False, True])
def test_compute_auth_to_pageserver(
zenith_cli: ZenithCli,
wa_factory,
pageserver_auth_enabled: ZenithPageserver,
repo_dir: str,
with_wal_acceptors: bool,
port_distributor: PortDistributor,
):
ps = pageserver_auth_enabled
# since we are in progress of refactoring protocols between compute safekeeper and page server
# use hardcoded management token in safekeeper
management_token = ps.auth_keys.generate_management_token()
def test_compute_auth_to_pageserver(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptors: bool):
zenith_env_builder.pageserver_auth_enabled = True
if with_wal_acceptors:
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
branch = f"test_compute_auth_to_pageserver{with_wal_acceptors}"
zenith_cli.run(["branch", branch, "empty"])
if with_wal_acceptors:
wa_factory.start_n_new(3, management_token)
env.zenith_cli(["branch", branch, "main"])
with Postgres(
zenith_cli=zenith_cli,
repo_dir=repo_dir,
tenant_id=ps.initial_tenant,
port=port_distributor.get_port(),
).create_start(
branch,
wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None,
) as pg:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
cur.execute('CREATE TABLE t(key int primary key, value text)')
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute('SELECT sum(key) FROM t')
assert cur.fetchone() == (5000050000, )
pg = env.postgres.create_start(branch)
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
cur.execute('CREATE TABLE t(key int primary key, value text)')
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute('SELECT sum(key) FROM t')
assert cur.fetchone() == (5000050000, )

View File

@@ -1,5 +1,5 @@
import subprocess
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -8,11 +8,12 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
# Create a couple of branches off the main branch, at a historical point in time.
#
def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin):
def test_branch_behind(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
# Branch at the point where only 100 rows were inserted
zenith_cli.run(["branch", "test_branch_behind", "empty"])
env.zenith_cli(["branch", "test_branch_behind", "empty"])
pgmain = postgres.create_start('test_branch_behind')
pgmain = env.postgres.create_start('test_branch_behind')
log.info("postgres is running on 'test_branch_behind' branch")
main_pg_conn = pgmain.connect()
@@ -40,7 +41,7 @@ def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: Postg
log.info(f'LSN after 200100 rows: {lsn_b}')
# Branch at the point where only 100 rows were inserted
zenith_cli.run(["branch", "test_branch_behind_hundred", "test_branch_behind@" + lsn_a])
env.zenith_cli(["branch", "test_branch_behind_hundred", "test_branch_behind@" + lsn_a])
# Insert many more rows. This generates enough WAL to fill a few segments.
main_cur.execute('''
@@ -55,10 +56,10 @@ def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: Postg
log.info(f'LSN after 400100 rows: {lsn_c}')
# Branch at the point where only 200100 rows were inserted
zenith_cli.run(["branch", "test_branch_behind_more", "test_branch_behind@" + lsn_b])
env.zenith_cli(["branch", "test_branch_behind_more", "test_branch_behind@" + lsn_b])
pg_hundred = postgres.create_start("test_branch_behind_hundred")
pg_more = postgres.create_start("test_branch_behind_more")
pg_hundred = env.postgres.create_start("test_branch_behind_hundred")
pg_more = env.postgres.create_start("test_branch_behind_more")
# On the 'hundred' branch, we should see only 100 rows
hundred_pg_conn = pg_hundred.connect()
@@ -79,8 +80,8 @@ def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: Postg
# Check bad lsn's for branching
# branch at segment boundary
zenith_cli.run(["branch", "test_branch_segment_boundary", "test_branch_behind@0/3000000"])
pg = postgres.create_start("test_branch_segment_boundary")
env.zenith_cli(["branch", "test_branch_segment_boundary", "test_branch_behind@0/3000000"])
pg = env.postgres.create_start("test_branch_segment_boundary")
cur = pg.connect().cursor()
cur.execute('SELECT 1')
assert cur.fetchone() == (1, )
@@ -89,7 +90,7 @@ def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: Postg
#
# FIXME: This works currently, but probably shouldn't be allowed
try:
zenith_cli.run(["branch", "test_branch_preinitdb", "test_branch_behind@0/42"])
env.zenith_cli(["branch", "test_branch_preinitdb", "test_branch_behind@0/42"])
# FIXME: assert false, "branch with invalid LSN should have failed"
except subprocess.CalledProcessError:
log.info("Branch creation with pre-initdb LSN failed (as expected)")

View File

@@ -3,7 +3,7 @@ import os
from contextlib import closing
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -12,9 +12,10 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test compute node start after clog truncation
#
def test_clog_truncate(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin):
def test_clog_truncate(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
# Create a branch for us
zenith_cli.run(["branch", "test_clog_truncate", "empty"])
env.zenith_cli(["branch", "test_clog_truncate", "empty"])
# set agressive autovacuum to make sure that truncation will happen
config = [
@@ -27,7 +28,7 @@ def test_clog_truncate(zenith_cli, pageserver: ZenithPageserver, postgres: Postg
'autovacuum_freeze_max_age=100000'
]
pg = postgres.create_start('test_clog_truncate', config_lines=config)
pg = env.postgres.create_start('test_clog_truncate', config_lines=config)
log.info('postgres is running on test_clog_truncate branch')
# Install extension containing function needed for test
@@ -64,10 +65,10 @@ def test_clog_truncate(zenith_cli, pageserver: ZenithPageserver, postgres: Postg
# create new branch after clog truncation and start a compute node on it
log.info(f'create branch at lsn_after_truncation {lsn_after_truncation}')
zenith_cli.run(
env.zenith_cli(
["branch", "test_clog_truncate_new", "test_clog_truncate@" + lsn_after_truncation])
pg2 = postgres.create_start('test_clog_truncate_new')
pg2 = env.postgres.create_start('test_clog_truncate_new')
log.info('postgres is running on test_clog_truncate_new branch')
# check that new node doesn't contain truncated segment

View File

@@ -1,6 +1,6 @@
from contextlib import closing
from fixtures.zenith_fixtures import PostgresFactory
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -9,12 +9,13 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test starting Postgres with custom options
#
def test_config(zenith_cli, postgres: PostgresFactory):
def test_config(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
# Create a branch for us
zenith_cli.run(["branch", "test_config", "empty"])
env.zenith_cli(["branch", "test_config", "empty"])
# change config
pg = postgres.create_start('test_config', config_lines=['log_min_messages=debug1'])
pg = env.postgres.create_start('test_config', config_lines=['log_min_messages=debug1'])
log.info('postgres is running on test_config branch')
with closing(pg.connect()) as conn:

View File

@@ -2,7 +2,7 @@ import os
import pathlib
from contextlib import closing
from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory, ZenithCli, check_restored_datadir_content
from fixtures.zenith_fixtures import ZenithEnv, check_restored_datadir_content
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -11,15 +11,11 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test CREATE DATABASE when there have been relmapper changes
#
def test_createdb(
zenith_cli: ZenithCli,
pageserver: ZenithPageserver,
postgres: PostgresFactory,
pg_bin,
):
zenith_cli.run(["branch", "test_createdb", "empty"])
def test_createdb(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
env.zenith_cli(["branch", "test_createdb", "empty"])
pg = postgres.create_start('test_createdb')
pg = env.postgres.create_start('test_createdb')
log.info("postgres is running on 'test_createdb' branch")
with closing(pg.connect()) as conn:
@@ -33,9 +29,9 @@ def test_createdb(
lsn = cur.fetchone()[0]
# Create a branch
zenith_cli.run(["branch", "test_createdb2", "test_createdb@" + lsn])
env.zenith_cli(["branch", "test_createdb2", "test_createdb@" + lsn])
pg2 = postgres.create_start('test_createdb2')
pg2 = env.postgres.create_start('test_createdb2')
# Test that you can connect to the new database on both branches
for db in (pg, pg2):
@@ -45,16 +41,11 @@ def test_createdb(
#
# Test DROP DATABASE
#
def test_dropdb(
zenith_cli: ZenithCli,
pageserver: ZenithPageserver,
postgres: PostgresFactory,
pg_bin,
test_output_dir,
):
zenith_cli.run(["branch", "test_dropdb", "empty"])
def test_dropdb(zenith_simple_env: ZenithEnv, test_output_dir):
env = zenith_simple_env
env.zenith_cli(["branch", "test_dropdb", "empty"])
pg = postgres.create_start('test_dropdb')
pg = env.postgres.create_start('test_dropdb')
log.info("postgres is running on 'test_dropdb' branch")
with closing(pg.connect()) as conn:
@@ -77,11 +68,11 @@ def test_dropdb(
lsn_after_drop = cur.fetchone()[0]
# Create two branches before and after database drop.
zenith_cli.run(["branch", "test_before_dropdb", "test_dropdb@" + lsn_before_drop])
pg_before = postgres.create_start('test_before_dropdb')
env.zenith_cli(["branch", "test_before_dropdb", "test_dropdb@" + lsn_before_drop])
pg_before = env.postgres.create_start('test_before_dropdb')
zenith_cli.run(["branch", "test_after_dropdb", "test_dropdb@" + lsn_after_drop])
pg_after = postgres.create_start('test_after_dropdb')
env.zenith_cli(["branch", "test_after_dropdb", "test_dropdb@" + lsn_after_drop])
pg_after = env.postgres.create_start('test_after_dropdb')
# Test that database exists on the branch before drop
pg_before.connect(dbname='foodb').close()
@@ -101,4 +92,4 @@ def test_dropdb(
assert os.path.isdir(dbpath) == False
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, pg, pageserver.service_port.pg)
check_restored_datadir_content(test_output_dir, env, pg)

View File

@@ -1,6 +1,6 @@
from contextlib import closing
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -9,10 +9,11 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test CREATE USER to check shared catalog restore
#
def test_createuser(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin):
zenith_cli.run(["branch", "test_createuser", "empty"])
def test_createuser(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
env.zenith_cli(["branch", "test_createuser", "empty"])
pg = postgres.create_start('test_createuser')
pg = env.postgres.create_start('test_createuser')
log.info("postgres is running on 'test_createuser' branch")
with closing(pg.connect()) as conn:
@@ -26,9 +27,9 @@ def test_createuser(zenith_cli, pageserver: ZenithPageserver, postgres: Postgres
lsn = cur.fetchone()[0]
# Create a branch
zenith_cli.run(["branch", "test_createuser2", "test_createuser@" + lsn])
env.zenith_cli(["branch", "test_createuser2", "test_createuser@" + lsn])
pg2 = postgres.create_start('test_createuser2')
pg2 = env.postgres.create_start('test_createuser2')
# Test that you can connect to new branch as a new user
assert pg2.safe_psql('select current_user', username='testuser') == [('testuser', )]

View File

@@ -1,4 +1,4 @@
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver, check_restored_datadir_content
from fixtures.zenith_fixtures import ZenithEnv, check_restored_datadir_content
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -10,15 +10,11 @@ pytest_plugins = ("fixtures.zenith_fixtures")
# it only checks next_multixact_id field in restored pg_control,
# since we don't have functions to check multixact internals.
#
def test_multixact(pageserver: ZenithPageserver,
postgres: PostgresFactory,
pg_bin,
zenith_cli,
base_dir,
test_output_dir):
def test_multixact(zenith_simple_env: ZenithEnv, test_output_dir):
env = zenith_simple_env
# Create a branch for us
zenith_cli.run(["branch", "test_multixact", "empty"])
pg = postgres.create_start('test_multixact')
env.zenith_cli(["branch", "test_multixact", "empty"])
pg = env.postgres.create_start('test_multixact')
log.info("postgres is running on 'test_multixact' branch")
pg_conn = pg.connect()
@@ -57,8 +53,8 @@ def test_multixact(pageserver: ZenithPageserver,
assert int(next_multixact_id) > int(next_multixact_id_old)
# Branch at this point
zenith_cli.run(["branch", "test_multixact_new", "test_multixact@" + lsn])
pg_new = postgres.create_start('test_multixact_new')
env.zenith_cli(["branch", "test_multixact_new", "test_multixact@" + lsn])
pg_new = env.postgres.create_start('test_multixact_new')
log.info("postgres is running on 'test_multixact_new' branch")
pg_new_conn = pg_new.connect()
@@ -71,4 +67,4 @@ def test_multixact(pageserver: ZenithPageserver,
assert next_multixact_id_new == next_multixact_id
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, pg_new, pageserver.service_port.pg)
check_restored_datadir_content(test_output_dir, env, pg_new)

View File

@@ -1,6 +1,6 @@
from contextlib import closing
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -16,13 +16,11 @@ pytest_plugins = ("fixtures.zenith_fixtures")
# just a hint that the page hasn't been modified since that LSN, and the page
# server should return the latest page version regardless of the LSN.
#
def test_old_request_lsn(zenith_cli,
pageserver: ZenithPageserver,
postgres: PostgresFactory,
pg_bin):
def test_old_request_lsn(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
# Create a branch for us
zenith_cli.run(["branch", "test_old_request_lsn", "empty"])
pg = postgres.create_start('test_old_request_lsn')
env.zenith_cli(["branch", "test_old_request_lsn", "empty"])
pg = env.postgres.create_start('test_old_request_lsn')
log.info('postgres is running on test_old_request_lsn branch')
pg_conn = pg.connect()
@@ -32,7 +30,7 @@ def test_old_request_lsn(zenith_cli,
cur.execute("SHOW zenith.zenith_timeline")
timeline = cur.fetchone()[0]
psconn = pageserver.connect()
psconn = env.pageserver.connect()
pscur = psconn.cursor()
# Create table, and insert some rows. Make it big enough that it doesn't fit in
@@ -59,7 +57,7 @@ def test_old_request_lsn(zenith_cli,
# Make a lot of updates on a single row, generating a lot of WAL. Trigger
# garbage collections so that the page server will remove old page versions.
for i in range(10):
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
for j in range(100):
cur.execute('UPDATE foo SET val = val + 1 WHERE id = 1;')

View File

@@ -3,26 +3,28 @@ from uuid import uuid4
import pytest
import psycopg2
import requests
from fixtures.zenith_fixtures import ZenithCli, ZenithPageserver, ZenithPageserverHttpClient
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient
from typing import cast
pytest_plugins = ("fixtures.zenith_fixtures")
def test_status_psql(pageserver):
assert pageserver.safe_psql('status') == [
def test_status_psql(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
assert env.pageserver.safe_psql('status') == [
('hello world', ),
]
def test_branch_list_psql(pageserver: ZenithPageserver, zenith_cli):
def test_branch_list_psql(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
# Create a branch for us
zenith_cli.run(["branch", "test_branch_list_main", "empty"])
env.zenith_cli(["branch", "test_branch_list_main", "empty"])
conn = pageserver.connect()
conn = env.pageserver.connect()
cur = conn.cursor()
cur.execute(f'branch_list {pageserver.initial_tenant}')
cur.execute(f'branch_list {env.initial_tenant}')
branches = json.loads(cur.fetchone()[0])
# Filter out branches created by other tests
branches = [x for x in branches if x['name'].startswith('test_branch_list')]
@@ -35,10 +37,10 @@ def test_branch_list_psql(pageserver: ZenithPageserver, zenith_cli):
assert 'ancestor_lsn' in branches[0]
# Create another branch, and start Postgres on it
zenith_cli.run(['branch', 'test_branch_list_experimental', 'test_branch_list_main'])
zenith_cli.run(['pg', 'create', 'test_branch_list_experimental'])
env.zenith_cli(['branch', 'test_branch_list_experimental', 'test_branch_list_main'])
env.zenith_cli(['pg', 'create', 'test_branch_list_experimental'])
cur.execute(f'branch_list {pageserver.initial_tenant}')
cur.execute(f'branch_list {env.initial_tenant}')
new_branches = json.loads(cur.fetchone()[0])
# Filter out branches created by other tests
new_branches = [x for x in new_branches if x['name'].startswith('test_branch_list')]
@@ -54,19 +56,22 @@ def test_branch_list_psql(pageserver: ZenithPageserver, zenith_cli):
conn.close()
def test_tenant_list_psql(pageserver: ZenithPageserver, zenith_cli: ZenithCli):
res = zenith_cli.run(["tenant", "list"])
def test_tenant_list_psql(zenith_env_builder: ZenithEnvBuilder):
# don't use zenith_simple_env, because there might be other tenants there,
# left over from other tests.
env = zenith_env_builder.init()
res = env.zenith_cli(["tenant", "list"])
res.check_returncode()
tenants = sorted(map(lambda t: t.split()[0], res.stdout.splitlines()))
assert tenants == [pageserver.initial_tenant]
assert tenants == [env.initial_tenant]
conn = pageserver.connect()
conn = env.pageserver.connect()
cur = conn.cursor()
# check same tenant cannot be created twice
with pytest.raises(psycopg2.DatabaseError,
match=f'tenant {pageserver.initial_tenant} already exists'):
cur.execute(f'tenant_create {pageserver.initial_tenant}')
with pytest.raises(psycopg2.DatabaseError, match=f'tenant {env.initial_tenant} already exists'):
cur.execute(f'tenant_create {env.initial_tenant}')
# create one more tenant
tenant1 = uuid4().hex
@@ -76,7 +81,7 @@ def test_tenant_list_psql(pageserver: ZenithPageserver, zenith_cli: ZenithCli):
# compare tenants list
new_tenants = sorted(map(lambda t: cast(str, t['id']), json.loads(cur.fetchone()[0])))
assert sorted([pageserver.initial_tenant, tenant1]) == new_tenants
assert sorted([env.initial_tenant, tenant1]) == new_tenants
def check_client(client: ZenithPageserverHttpClient, initial_tenant: str):
@@ -98,12 +103,17 @@ def check_client(client: ZenithPageserverHttpClient, initial_tenant: str):
assert branch_name in {b['name'] for b in client.branch_list(tenant_id)}
def test_pageserver_http_api_client(pageserver: ZenithPageserver):
client = pageserver.http_client()
check_client(client, pageserver.initial_tenant)
def test_pageserver_http_api_client(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
client = env.pageserver.http_client()
check_client(client, env.initial_tenant)
def test_pageserver_http_api_client_auth_enabled(pageserver_auth_enabled: ZenithPageserver):
client = pageserver_auth_enabled.http_client(
auth_token=pageserver_auth_enabled.auth_keys.generate_management_token())
check_client(client, pageserver_auth_enabled.initial_tenant)
def test_pageserver_http_api_client_auth_enabled(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.pageserver_auth_enabled = True
env = zenith_env_builder.init()
management_token = env.auth_keys.generate_management_token()
client = env.pageserver.http_client(auth_token=management_token)
check_client(client, env.initial_tenant)

View File

@@ -4,7 +4,7 @@ import time
from contextlib import closing
from multiprocessing import Process, Value
from fixtures.zenith_fixtures import WalAcceptorFactory, ZenithPageserver, PostgresFactory
from fixtures.zenith_fixtures import ZenithEnvBuilder
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -13,16 +13,13 @@ pytest_plugins = ("fixtures.zenith_fixtures")
# Check that dead minority doesn't prevent the commits: execute insert n_inserts
# times, with fault_probability chance of getting a wal acceptor down or up
# along the way. 2 of 3 are always alive, so the work keeps going.
def test_pageserver_restart(zenith_cli,
pageserver: ZenithPageserver,
postgres: PostgresFactory,
wa_factory: WalAcceptorFactory):
def test_pageserver_restart(zenith_env_builder: ZenithEnvBuilder):
# One safekeeper is enough for this test.
wa_factory.start_n_new(1)
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init()
zenith_cli.run(["branch", "test_pageserver_restart", "empty"])
pg = postgres.create_start('test_pageserver_restart', wal_acceptors=wa_factory.get_connstrs())
env.zenith_cli(["branch", "test_pageserver_restart", "main"])
pg = env.postgres.create_start('test_pageserver_restart')
pg_conn = pg.connect()
cur = pg_conn.cursor()
@@ -50,8 +47,8 @@ def test_pageserver_restart(zenith_cli,
# Stop and restart pageserver. This is a more or less graceful shutdown, although
# the page server doesn't currently have a shutdown routine so there's no difference
# between stopping and crashing.
pageserver.stop()
pageserver.start()
env.pageserver.stop()
env.pageserver.start()
# Stopping the pageserver breaks the connection from the postgres backend to
# the page server, and causes the next query on the connection to fail. Start a new
@@ -65,5 +62,5 @@ def test_pageserver_restart(zenith_cli,
assert cur.fetchone() == (100000, )
# Stop the page server by force, and restart it
pageserver.stop()
pageserver.start()
env.pageserver.stop()
env.pageserver.start()

View File

@@ -1,14 +1,15 @@
from fixtures.zenith_fixtures import PostgresFactory
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
def test_pgbench(postgres: PostgresFactory, pg_bin, zenith_cli):
def test_pgbench(zenith_simple_env: ZenithEnv, pg_bin):
env = zenith_simple_env
# Create a branch for us
zenith_cli.run(["branch", "test_pgbench", "empty"])
env.zenith_cli(["branch", "test_pgbench", "empty"])
pg = postgres.create_start('test_pgbench')
pg = env.postgres.create_start('test_pgbench')
log.info("postgres is running on 'test_pgbench' branch")
connstr = pg.connstr()

View File

@@ -1,5 +1,5 @@
import subprocess
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
from fixtures.zenith_fixtures import ZenithEnv
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -10,10 +10,11 @@ pytest_plugins = ("fixtures.zenith_fixtures")
# This is very similar to the 'test_branch_behind' test, but instead of
# creating branches, creates read-only nodes.
#
def test_readonly_node(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin):
zenith_cli.run(["branch", "test_readonly_node", "empty"])
def test_readonly_node(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
env.zenith_cli(["branch", "test_readonly_node", "empty"])
pgmain = postgres.create_start('test_readonly_node')
pgmain = env.postgres.create_start('test_readonly_node')
print("postgres is running on 'test_readonly_node' branch")
main_pg_conn = pgmain.connect()
@@ -52,11 +53,12 @@ def test_readonly_node(zenith_cli, pageserver: ZenithPageserver, postgres: Postg
print('LSN after 400100 rows: ' + lsn_c)
# Create first read-only node at the point where only 100 rows were inserted
pg_hundred = postgres.create_start("test_readonly_node_hundred",
branch=f'test_readonly_node@{lsn_a}')
pg_hundred = env.postgres.create_start("test_readonly_node_hundred",
branch=f'test_readonly_node@{lsn_a}')
# And another at the point where 200100 rows were inserted
pg_more = postgres.create_start("test_readonly_node_more", branch=f'test_readonly_node@{lsn_b}')
pg_more = env.postgres.create_start("test_readonly_node_more",
branch=f'test_readonly_node@{lsn_b}')
# On the 'hundred' node, we should see only 100 rows
hundred_pg_conn = pg_hundred.connect()
@@ -75,15 +77,15 @@ def test_readonly_node(zenith_cli, pageserver: ZenithPageserver, postgres: Postg
assert main_cur.fetchone() == (400100, )
# Check creating a node at segment boundary
pg = postgres.create_start("test_branch_segment_boundary",
branch="test_readonly_node@0/3000000")
pg = env.postgres.create_start("test_branch_segment_boundary",
branch="test_readonly_node@0/3000000")
cur = pg.connect().cursor()
cur.execute('SELECT 1')
assert cur.fetchone() == (1, )
# Create node at pre-initdb lsn
try:
zenith_cli.run(["pg", "start", "test_branch_preinitdb", "test_readonly_node@0/42"])
env.zenith_cli(["pg", "start", "test_branch_preinitdb", "test_readonly_node@0/42"])
assert False, "compute node startup with invalid LSN should have failed"
except Exception:
print("Node creation with pre-initdb LSN failed (as expected)")

View File

@@ -1,7 +1,7 @@
import pytest
from contextlib import closing
from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory
from fixtures.zenith_fixtures import ZenithEnvBuilder
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -11,22 +11,15 @@ pytest_plugins = ("fixtures.zenith_fixtures")
# Test restarting and recreating a postgres instance
#
@pytest.mark.parametrize('with_wal_acceptors', [False, True])
def test_restart_compute(
zenith_cli,
pageserver: ZenithPageserver,
postgres: PostgresFactory,
pg_bin,
wa_factory,
with_wal_acceptors: bool,
):
wal_acceptor_connstrs = None
zenith_cli.run(["branch", "test_restart_compute", "empty"])
def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptors: bool):
zenith_env_builder.pageserver_auth_enabled = True
if with_wal_acceptors:
wa_factory.start_n_new(3)
wal_acceptor_connstrs = wa_factory.get_connstrs()
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
pg = postgres.create_start('test_restart_compute', wal_acceptors=wal_acceptor_connstrs)
env.zenith_cli(["branch", "test_restart_compute", "main"])
pg = env.postgres.create_start('test_restart_compute')
log.info("postgres is running on 'test_restart_compute' branch")
with closing(pg.connect()) as conn:
@@ -39,7 +32,7 @@ def test_restart_compute(
log.info(f"res = {r}")
# Remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute', wal_acceptors=wal_acceptor_connstrs)
pg.stop_and_destroy().create_start('test_restart_compute')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
@@ -58,7 +51,7 @@ def test_restart_compute(
log.info(f"res = {r}")
# Again remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute', wal_acceptors=wal_acceptor_connstrs)
pg.stop_and_destroy().create_start('test_restart_compute')
# That select causes lots of FPI's and increases probability of wakeepers
# lagging behind after query completion
@@ -72,7 +65,7 @@ def test_restart_compute(
log.info(f"res = {r}")
# And again remove data directory and restart
pg.stop_and_destroy().create_start('test_restart_compute', wal_acceptors=wal_acceptor_connstrs)
pg.stop_and_destroy().create_start('test_restart_compute')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:

View File

@@ -1,6 +1,7 @@
from contextlib import closing
import psycopg2.extras
import time
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -22,13 +23,14 @@ def print_gc_result(row):
# This test is pretty tightly coupled with the current implementation of layered
# storage, in layered_repository.rs.
#
def test_layerfiles_gc(zenith_cli, pageserver, postgres, pg_bin):
zenith_cli.run(["branch", "test_layerfiles_gc", "empty"])
pg = postgres.create_start('test_layerfiles_gc')
def test_layerfiles_gc(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
env.zenith_cli(["branch", "test_layerfiles_gc", "empty"])
pg = env.postgres.create_start('test_layerfiles_gc')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
with closing(pageserver.connect()) as psconn:
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
# Get the timeline ID of our branch. We need it for the 'do_gc' command
@@ -57,7 +59,7 @@ def test_layerfiles_gc(zenith_cli, pageserver, postgres, pg_bin):
cur.execute("DELETE FROM foo")
log.info("Running GC before test")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
# remember the number of files
@@ -70,7 +72,7 @@ def test_layerfiles_gc(zenith_cli, pageserver, postgres, pg_bin):
# removing the old image and delta layer.
log.info("Inserting one row and running GC")
cur.execute("INSERT INTO foo VALUES (1)")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
@@ -84,7 +86,7 @@ def test_layerfiles_gc(zenith_cli, pageserver, postgres, pg_bin):
cur.execute("INSERT INTO foo VALUES (2)")
cur.execute("INSERT INTO foo VALUES (3)")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
@@ -96,7 +98,7 @@ def test_layerfiles_gc(zenith_cli, pageserver, postgres, pg_bin):
cur.execute("INSERT INTO foo VALUES (2)")
cur.execute("INSERT INTO foo VALUES (3)")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
assert row['layer_relfiles_total'] == layer_relfiles_remain + 2
@@ -105,7 +107,7 @@ def test_layerfiles_gc(zenith_cli, pageserver, postgres, pg_bin):
# Run GC again, with no changes in the database. Should not remove anything.
log.info("Run GC again, with nothing to do")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
assert row['layer_relfiles_total'] == layer_relfiles_remain
@@ -118,7 +120,7 @@ def test_layerfiles_gc(zenith_cli, pageserver, postgres, pg_bin):
log.info("Drop table and run GC again")
cur.execute("DROP TABLE foo")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)

View File

@@ -2,51 +2,41 @@ from contextlib import closing
import pytest
from fixtures.zenith_fixtures import (
TenantFactory,
ZenithCli,
PostgresFactory,
)
from fixtures.zenith_fixtures import ZenithEnvBuilder
@pytest.mark.parametrize('with_wal_acceptors', [False, True])
def test_tenants_normal_work(
zenith_cli: ZenithCli,
tenant_factory: TenantFactory,
postgres: PostgresFactory,
wa_factory,
with_wal_acceptors: bool,
):
"""Tests tenants with and without wal acceptors"""
tenant_1 = tenant_factory.create()
tenant_2 = tenant_factory.create()
def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptors: bool):
if with_wal_acceptors:
zenith_env_builder.num_safekeepers = 3
zenith_cli.run([
env = zenith_env_builder.init()
"""Tests tenants with and without wal acceptors"""
tenant_1 = env.create_tenant()
tenant_2 = env.create_tenant()
env.zenith_cli([
"branch",
f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}",
"main",
f"--tenantid={tenant_1}"
])
zenith_cli.run([
env.zenith_cli([
"branch",
f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}",
"main",
f"--tenantid={tenant_2}"
])
if with_wal_acceptors:
wa_factory.start_n_new(3)
pg_tenant1 = postgres.create_start(
pg_tenant1 = env.postgres.create_start(
f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}",
None, # branch name, None means same as node name
tenant_1,
wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None,
)
pg_tenant2 = postgres.create_start(
pg_tenant2 = env.postgres.create_start(
f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}",
None, # branch name, None means same as node name
tenant_2,
wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None,
)
for pg in [pg_tenant1, pg_tenant2]:

View File

@@ -1,19 +1,20 @@
from contextlib import closing
from uuid import UUID
import psycopg2.extras
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
def test_timeline_size(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin):
def test_timeline_size(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
# Branch at the point where only 100 rows were inserted
zenith_cli.run(["branch", "test_timeline_size", "empty"])
env.zenith_cli(["branch", "test_timeline_size", "empty"])
client = pageserver.http_client()
res = client.branch_detail(UUID(pageserver.initial_tenant), "test_timeline_size")
client = env.pageserver.http_client()
res = client.branch_detail(UUID(env.initial_tenant), "test_timeline_size")
assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
pgmain = postgres.create_start("test_timeline_size")
pgmain = env.postgres.create_start("test_timeline_size")
log.info("postgres is running on 'test_timeline_size' branch")
with closing(pgmain.connect()) as conn:
@@ -28,9 +29,9 @@ def test_timeline_size(zenith_cli, pageserver: ZenithPageserver, postgres: Postg
FROM generate_series(1, 10) g
""")
res = client.branch_detail(UUID(pageserver.initial_tenant), "test_timeline_size")
res = client.branch_detail(UUID(env.initial_tenant), "test_timeline_size")
assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
cur.execute("TRUNCATE foo")
res = client.branch_detail(UUID(pageserver.initial_tenant), "test_timeline_size")
res = client.branch_detail(UUID(env.initial_tenant), "test_timeline_size")
assert res["current_logical_size"] == res["current_logical_size_non_incremental"]

View File

@@ -1,6 +1,6 @@
import os
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver, PgBin
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -9,13 +9,11 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test branching, when a transaction is in prepared state
#
def test_twophase(zenith_cli,
pageserver: ZenithPageserver,
postgres: PostgresFactory,
pg_bin: PgBin):
zenith_cli.run(["branch", "test_twophase", "empty"])
def test_twophase(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
env.zenith_cli(["branch", "test_twophase", "empty"])
pg = postgres.create_start('test_twophase', config_lines=['max_prepared_transactions=5'])
pg = env.postgres.create_start('test_twophase', config_lines=['max_prepared_transactions=5'])
log.info("postgres is running on 'test_twophase' branch")
conn = pg.connect()
@@ -60,10 +58,10 @@ def test_twophase(zenith_cli,
assert len(twophase_files) == 2
# Create a branch with the transaction in prepared state
zenith_cli.run(["branch", "test_twophase_prepared", "test_twophase"])
env.zenith_cli(["branch", "test_twophase_prepared", "test_twophase"])
# Start compute on the new branch
pg2 = postgres.create_start(
pg2 = env.postgres.create_start(
'test_twophase_prepared',
config_lines=['max_prepared_transactions=5'],
)

View File

@@ -1,4 +1,4 @@
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -8,14 +8,12 @@ pytest_plugins = ("fixtures.zenith_fixtures")
# Test that the VM bit is cleared correctly at a HEAP_DELETE and
# HEAP_UPDATE record.
#
def test_vm_bit_clear(pageserver: ZenithPageserver,
postgres: PostgresFactory,
pg_bin,
zenith_cli,
base_dir):
def test_vm_bit_clear(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
# Create a branch for us
zenith_cli.run(["branch", "test_vm_bit_clear", "empty"])
pg = postgres.create_start('test_vm_bit_clear')
env.zenith_cli(["branch", "test_vm_bit_clear", "empty"])
pg = env.postgres.create_start('test_vm_bit_clear')
log.info("postgres is running on 'test_vm_bit_clear' branch")
pg_conn = pg.connect()
@@ -38,7 +36,7 @@ def test_vm_bit_clear(pageserver: ZenithPageserver,
cur.execute('UPDATE vmtest_update SET id = 5000 WHERE id = 1')
# Branch at this point, to test that later
zenith_cli.run(["branch", "test_vm_bit_clear_new", "test_vm_bit_clear"])
env.zenith_cli(["branch", "test_vm_bit_clear_new", "test_vm_bit_clear"])
# Clear the buffer cache, to force the VM page to be re-fetched from
# the page server
@@ -66,7 +64,7 @@ def test_vm_bit_clear(pageserver: ZenithPageserver,
# a dirty VM page is evicted. If the VM bit was not correctly cleared by the
# earlier WAL record, the full-page image hides the problem. Starting a new
# server at the right point-in-time avoids that full-page image.
pg_new = postgres.create_start('test_vm_bit_clear_new')
pg_new = env.postgres.create_start('test_vm_bit_clear_new')
log.info("postgres is running on 'test_vm_bit_clear_new' branch")
pg_new_conn = pg_new.connect()

View File

@@ -7,7 +7,7 @@ import uuid
from contextlib import closing
from multiprocessing import Process, Value
from fixtures.zenith_fixtures import WalAcceptorFactory, ZenithPageserver, PostgresFactory, PgBin
from fixtures.zenith_fixtures import PgBin, ZenithEnv, ZenithEnvBuilder
from fixtures.utils import lsn_to_hex, mkdir_if_needed
from fixtures.log_helper import log
@@ -16,14 +16,13 @@ pytest_plugins = ("fixtures.zenith_fixtures")
# basic test, write something in setup with wal acceptors, ensure that commits
# succeed and data is written
def test_normal_work(zenith_cli,
pageserver: ZenithPageserver,
postgres: PostgresFactory,
wa_factory):
zenith_cli.run(["branch", "test_wal_acceptors_normal_work", "empty"])
wa_factory.start_n_new(3)
pg = postgres.create_start('test_wal_acceptors_normal_work',
wal_acceptors=wa_factory.get_connstrs())
def test_normal_work(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
env.zenith_cli(["branch", "test_wal_acceptors_normal_work", "main"])
pg = env.postgres.create_start('test_wal_acceptors_normal_work')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
@@ -37,21 +36,19 @@ def test_normal_work(zenith_cli,
# Run page server and multiple acceptors, and multiple compute nodes running
# against different timelines.
def test_many_timelines(zenith_cli,
pageserver: ZenithPageserver,
postgres: PostgresFactory,
wa_factory):
n_timelines = 2
def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
wa_factory.start_n_new(3)
n_timelines = 2
branches = ["test_wal_acceptors_many_timelines_{}".format(tlin) for tlin in range(n_timelines)]
# start postgres on each timeline
pgs = []
for branch in branches:
zenith_cli.run(["branch", branch, "empty"])
pgs.append(postgres.create_start(branch, wal_acceptors=wa_factory.get_connstrs()))
env.zenith_cli(["branch", branch, "main"])
pgs.append(env.postgres.create_start(branch))
# Do everything in different loops to have actions on different timelines
# interleaved.
@@ -72,19 +69,16 @@ def test_many_timelines(zenith_cli,
# Check that dead minority doesn't prevent the commits: execute insert n_inserts
# times, with fault_probability chance of getting a wal acceptor down or up
# along the way. 2 of 3 are always alive, so the work keeps going.
def test_restarts(zenith_cli,
pageserver: ZenithPageserver,
postgres: PostgresFactory,
wa_factory: WalAcceptorFactory):
def test_restarts(zenith_env_builder: ZenithEnvBuilder):
fault_probability = 0.01
n_inserts = 1000
n_acceptors = 3
wa_factory.start_n_new(n_acceptors)
zenith_env_builder.num_safekeepers = n_acceptors
env = zenith_env_builder.init()
zenith_cli.run(["branch", "test_wal_acceptors_restarts", "empty"])
pg = postgres.create_start('test_wal_acceptors_restarts',
wal_acceptors=wa_factory.get_connstrs())
env.zenith_cli(["branch", "test_wal_acceptors_restarts", "main"])
pg = env.postgres.create_start('test_wal_acceptors_restarts')
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
@@ -98,7 +92,7 @@ def test_restarts(zenith_cli,
if random.random() <= fault_probability:
if failed_node is None:
failed_node = wa_factory.instances[random.randrange(0, n_acceptors)]
failed_node = env.safekeepers[random.randrange(0, n_acceptors)]
failed_node.stop()
else:
failed_node.start()
@@ -116,12 +110,12 @@ def delayed_wal_acceptor_start(wa):
# When majority of acceptors is offline, commits are expected to be frozen
def test_unavailability(zenith_cli, postgres: PostgresFactory, wa_factory):
wa_factory.start_n_new(2)
def test_unavailability(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 2
env = zenith_env_builder.init()
zenith_cli.run(["branch", "test_wal_acceptors_unavailability", "empty"])
pg = postgres.create_start('test_wal_acceptors_unavailability',
wal_acceptors=wa_factory.get_connstrs())
env.zenith_cli(["branch", "test_wal_acceptors_unavailability", "main"])
pg = env.postgres.create_start('test_wal_acceptors_unavailability')
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
@@ -133,9 +127,9 @@ def test_unavailability(zenith_cli, postgres: PostgresFactory, wa_factory):
cur.execute("INSERT INTO t values (1, 'payload')")
# shutdown one of two acceptors, that is, majority
wa_factory.instances[0].stop()
env.safekeepers[0].stop()
proc = Process(target=delayed_wal_acceptor_start, args=(wa_factory.instances[0], ))
proc = Process(target=delayed_wal_acceptor_start, args=(env.safekeepers[0], ))
proc.start()
start = time.time()
@@ -145,9 +139,9 @@ def test_unavailability(zenith_cli, postgres: PostgresFactory, wa_factory):
proc.join()
# for the world's balance, do the same with second acceptor
wa_factory.instances[1].stop()
env.safekeepers[1].stop()
proc = Process(target=delayed_wal_acceptor_start, args=(wa_factory.instances[1], ))
proc = Process(target=delayed_wal_acceptor_start, args=(env.safekeepers[1], ))
proc.start()
start = time.time()
@@ -186,17 +180,13 @@ def stop_value():
# do inserts while concurrently getting up/down subsets of acceptors
def test_race_conditions(zenith_cli,
pageserver: ZenithPageserver,
postgres: PostgresFactory,
wa_factory,
stop_value):
def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value):
wa_factory.start_n_new(3)
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
zenith_cli.run(["branch", "test_wal_acceptors_race_conditions", "empty"])
pg = postgres.create_start('test_wal_acceptors_race_conditions',
wal_acceptors=wa_factory.get_connstrs())
env.zenith_cli(["branch", "test_wal_acceptors_race_conditions", "main"])
pg = env.postgres.create_start('test_wal_acceptors_race_conditions')
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
@@ -205,7 +195,7 @@ def test_race_conditions(zenith_cli,
cur.execute('CREATE TABLE t(key int primary key, value text)')
proc = Process(target=xmas_garland, args=(wa_factory.instances, stop_value))
proc = Process(target=xmas_garland, args=(env.safekeepers, stop_value))
proc.start()
for i in range(1000):
@@ -220,7 +210,8 @@ def test_race_conditions(zenith_cli,
class ProposerPostgres:
"""Object for running safekeepers sync with walproposer"""
def __init__(self, pgdata_dir: str, pg_bin: PgBin, timeline_id: str, tenant_id: str):
def __init__(self, env: ZenithEnv, pgdata_dir: str, pg_bin, timeline_id: str, tenant_id: str):
self.env = env
self.pgdata_dir: str = pgdata_dir
self.pg_bin: PgBin = pg_bin
self.timeline_id: str = timeline_id
@@ -266,16 +257,20 @@ class ProposerPostgres:
# insert wal in all safekeepers and run sync on proposer
def test_sync_safekeepers(repo_dir: str, pg_bin: PgBin, wa_factory: WalAcceptorFactory):
wa_factory.start_n_new(3)
def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder, pg_bin: PgBin):
# We don't really need the full environment for this test, just the
# safekeepers would be enough.
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
timeline_id = uuid.uuid4().hex
tenant_id = uuid.uuid4().hex
# write config for proposer
pgdata_dir = os.path.join(repo_dir, "proposer_pgdata")
pg = ProposerPostgres(pgdata_dir, pg_bin, timeline_id, tenant_id)
pg.create_dir_config(wa_factory.get_connstrs())
pgdata_dir = os.path.join(env.repo_dir, "proposer_pgdata")
pg = ProposerPostgres(env, pgdata_dir, pg_bin, timeline_id, tenant_id)
pg.create_dir_config(env.get_safekeeper_connstrs())
# valid lsn, which is not in the segment start, nor in zero segment
epoch_start_lsn = 0x16B9188 # 0/16B9188
@@ -284,7 +279,7 @@ def test_sync_safekeepers(repo_dir: str, pg_bin: PgBin, wa_factory: WalAcceptorF
# append and commit WAL
lsn_after_append = []
for i in range(3):
res = wa_factory.instances[i].append_logical_message(
res = env.safekeepers[i].append_logical_message(
tenant_id,
timeline_id,
{
@@ -308,13 +303,15 @@ def test_sync_safekeepers(repo_dir: str, pg_bin: PgBin, wa_factory: WalAcceptorF
assert all(lsn_after_sync == lsn for lsn in lsn_after_append)
def test_timeline_status(zenith_cli, pageserver, postgres, wa_factory: WalAcceptorFactory):
wa_factory.start_n_new(1)
def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
zenith_cli.run(["branch", "test_timeline_status", "empty"])
pg = postgres.create_start('test_timeline_status', wal_acceptors=wa_factory.get_connstrs())
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init()
wa = wa_factory.instances[0]
env.zenith_cli(["branch", "test_timeline_status", "main"])
pg = env.postgres.create_start('test_timeline_status')
wa = env.safekeepers[0]
wa_http_cli = wa.http_client()
wa_http_cli.check_status()

View File

@@ -3,7 +3,7 @@ import asyncpg
import random
import time
from fixtures.zenith_fixtures import WalAcceptor, WalAcceptorFactory, ZenithPageserver, PostgresFactory, Postgres
from fixtures.zenith_fixtures import ZenithEnvBuilder, Postgres, Safekeeper
from fixtures.log_helper import getLogger
from fixtures.utils import lsn_from_hex, lsn_to_hex
from typing import List
@@ -104,7 +104,7 @@ async def run_random_worker(stats: WorkerStats, pg: Postgres, worker_id, n_accou
await pg_conn.close()
async def wait_for_lsn(safekeeper: WalAcceptor,
async def wait_for_lsn(safekeeper: Safekeeper,
tenant_id: str,
timeline_id: str,
wait_lsn: str,
@@ -140,7 +140,7 @@ async def wait_for_lsn(safekeeper: WalAcceptor,
# On each iteration 1 acceptor is stopped, and 2 others should allow
# background workers execute transactions. In the end, state should remain
# consistent.
async def run_restarts_under_load(pg: Postgres, acceptors: List[WalAcceptor], n_workers=10):
async def run_restarts_under_load(pg: Postgres, acceptors: List[Safekeeper], n_workers=10):
n_accounts = 100
init_amount = 100000
max_transfer = 100
@@ -192,18 +192,14 @@ async def run_restarts_under_load(pg: Postgres, acceptors: List[WalAcceptor], n_
# restart acceptors one by one, while executing and validating bank transactions
def test_restarts_under_load(zenith_cli,
pageserver: ZenithPageserver,
postgres: PostgresFactory,
wa_factory: WalAcceptorFactory):
def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
wa_factory.start_n_new(3)
env.zenith_cli(["branch", "test_wal_acceptors_restarts_under_load", "main"])
pg = env.postgres.create_start('test_wal_acceptors_restarts_under_load')
zenith_cli.run(["branch", "test_wal_acceptors_restarts_under_load", "empty"])
pg = postgres.create_start('test_wal_acceptors_restarts_under_load',
wal_acceptors=wa_factory.get_connstrs())
asyncio.run(run_restarts_under_load(pg, wa_factory.instances))
asyncio.run(run_restarts_under_load(pg, env.safekeepers))
# TODO: Remove when https://github.com/zenithdb/zenith/issues/644 is fixed
pg.stop()

View File

@@ -2,15 +2,13 @@ import json
import uuid
from psycopg2.extensions import cursor as PgCursor
from fixtures.zenith_fixtures import ZenithCli, ZenithPageserver
from fixtures.zenith_fixtures import ZenithEnv
from typing import cast
pytest_plugins = ("fixtures.zenith_fixtures")
def helper_compare_branch_list(page_server_cur: PgCursor,
zenith_cli: ZenithCli,
initial_tenant: str):
def helper_compare_branch_list(page_server_cur: PgCursor, env: ZenithEnv, initial_tenant: str):
"""
Compare branches list returned by CLI and directly via API.
Filters out branches created by other tests.
@@ -21,12 +19,12 @@ def helper_compare_branch_list(page_server_cur: PgCursor,
map(lambda b: cast(str, b['name']), json.loads(page_server_cur.fetchone()[0])))
branches_api = [b for b in branches_api if b.startswith('test_cli_') or b in ('empty', 'main')]
res = zenith_cli.run(["branch"])
res = env.zenith_cli(["branch"])
res.check_returncode()
branches_cli = sorted(map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n")))
branches_cli = [b for b in branches_cli if b.startswith('test_cli_') or b in ('empty', 'main')]
res = zenith_cli.run(["branch", f"--tenantid={initial_tenant}"])
res = env.zenith_cli(["branch", f"--tenantid={initial_tenant}"])
res.check_returncode()
branches_cli_with_tenant_arg = sorted(
map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n")))
@@ -37,25 +35,26 @@ def helper_compare_branch_list(page_server_cur: PgCursor,
assert branches_api == branches_cli == branches_cli_with_tenant_arg
def test_cli_branch_list(pageserver: ZenithPageserver, zenith_cli: ZenithCli):
page_server_conn = pageserver.connect()
def test_cli_branch_list(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
page_server_conn = env.pageserver.connect()
page_server_cur = page_server_conn.cursor()
# Initial sanity check
helper_compare_branch_list(page_server_cur, zenith_cli, pageserver.initial_tenant)
helper_compare_branch_list(page_server_cur, env, env.initial_tenant)
# Create a branch for us
res = zenith_cli.run(["branch", "test_cli_branch_list_main", "main"])
res = env.zenith_cli(["branch", "test_cli_branch_list_main", "empty"])
assert res.stderr == ''
helper_compare_branch_list(page_server_cur, zenith_cli, pageserver.initial_tenant)
helper_compare_branch_list(page_server_cur, env, env.initial_tenant)
# Create a nested branch
res = zenith_cli.run(["branch", "test_cli_branch_list_nested", "test_cli_branch_list_main"])
res = env.zenith_cli(["branch", "test_cli_branch_list_nested", "test_cli_branch_list_main"])
assert res.stderr == ''
helper_compare_branch_list(page_server_cur, zenith_cli, pageserver.initial_tenant)
helper_compare_branch_list(page_server_cur, env, env.initial_tenant)
# Check that all new branches are visible via CLI
res = zenith_cli.run(["branch"])
res = env.zenith_cli(["branch"])
assert res.stderr == ''
branches_cli = sorted(map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n")))
@@ -63,45 +62,46 @@ def test_cli_branch_list(pageserver: ZenithPageserver, zenith_cli: ZenithCli):
assert 'test_cli_branch_list_nested' in branches_cli
def helper_compare_tenant_list(page_server_cur: PgCursor, zenith_cli: ZenithCli):
def helper_compare_tenant_list(page_server_cur: PgCursor, env: ZenithEnv):
page_server_cur.execute(f'tenant_list')
tenants_api = sorted(
map(lambda t: cast(str, t['id']), json.loads(page_server_cur.fetchone()[0])))
res = zenith_cli.run(["tenant", "list"])
res = env.zenith_cli(["tenant", "list"])
assert res.stderr == ''
tenants_cli = sorted(map(lambda t: t.split()[0], res.stdout.splitlines()))
assert tenants_api == tenants_cli
def test_cli_tenant_list(pageserver: ZenithPageserver, zenith_cli: ZenithCli):
page_server_conn = pageserver.connect()
def test_cli_tenant_list(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
page_server_conn = env.pageserver.connect()
page_server_cur = page_server_conn.cursor()
# Initial sanity check
helper_compare_tenant_list(page_server_cur, zenith_cli)
helper_compare_tenant_list(page_server_cur, env)
# Create new tenant
tenant1 = uuid.uuid4().hex
res = zenith_cli.run(["tenant", "create", tenant1])
res = env.zenith_cli(["tenant", "create", tenant1])
res.check_returncode()
# check tenant1 appeared
helper_compare_tenant_list(page_server_cur, zenith_cli)
helper_compare_tenant_list(page_server_cur, env)
# Create new tenant
tenant2 = uuid.uuid4().hex
res = zenith_cli.run(["tenant", "create", tenant2])
res = env.zenith_cli(["tenant", "create", tenant2])
res.check_returncode()
# check tenant2 appeared
helper_compare_tenant_list(page_server_cur, zenith_cli)
helper_compare_tenant_list(page_server_cur, env)
res = zenith_cli.run(["tenant", "list"])
res = env.zenith_cli(["tenant", "list"])
res.check_returncode()
tenants = sorted(map(lambda t: t.split()[0], res.stdout.splitlines()))
assert pageserver.initial_tenant in tenants
assert env.initial_tenant in tenants
assert tenant1 in tenants
assert tenant2 in tenants

View File

@@ -1,24 +1,20 @@
import os
from fixtures.utils import mkdir_if_needed
from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory, base_dir, pg_distrib_dir
from fixtures.zenith_fixtures import ZenithEnv, base_dir, pg_distrib_dir
pytest_plugins = ("fixtures.zenith_fixtures")
def test_isolation(pageserver: ZenithPageserver,
postgres: PostgresFactory,
pg_bin,
zenith_cli,
test_output_dir,
capsys):
def test_isolation(zenith_simple_env: ZenithEnv, test_output_dir, pg_bin, capsys):
env = zenith_simple_env
# Create a branch for us
zenith_cli.run(["branch", "test_isolation", "empty"])
env.zenith_cli(["branch", "test_isolation", "empty"])
# Connect to postgres and create a database called "regression".
# isolation tests use prepared transactions, so enable them
pg = postgres.create_start('test_isolation', config_lines=['max_prepared_transactions=100'])
pg = env.postgres.create_start('test_isolation', config_lines=['max_prepared_transactions=100'])
pg.safe_psql('CREATE DATABASE isolation_regression')
# Create some local directories for pg_isolation_regress to run in.
@@ -42,7 +38,7 @@ def test_isolation(pageserver: ZenithPageserver,
'--schedule={}'.format(schedule),
]
env = {
env_vars = {
'PGPORT': str(pg.port),
'PGUSER': pg.username,
'PGHOST': pg.host,
@@ -52,4 +48,4 @@ def test_isolation(pageserver: ZenithPageserver,
# We don't capture the output. It's not too chatty, and it always
# logs the exact same data to `regression.out` anyway.
with capsys.disabled():
pg_bin.run(pg_isolation_regress_command, env=env, cwd=runpath)
pg_bin.run(pg_isolation_regress_command, env=env_vars, cwd=runpath)

View File

@@ -1,23 +1,19 @@
import os
from fixtures.utils import mkdir_if_needed
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver, check_restored_datadir_content, base_dir, pg_distrib_dir
from fixtures.zenith_fixtures import ZenithEnv, check_restored_datadir_content, base_dir, pg_distrib_dir
pytest_plugins = ("fixtures.zenith_fixtures")
def test_pg_regress(pageserver: ZenithPageserver,
postgres: PostgresFactory,
pg_bin,
zenith_cli,
test_output_dir,
capsys):
def test_pg_regress(zenith_simple_env: ZenithEnv, test_output_dir: str, pg_bin, capsys):
env = zenith_simple_env
# Create a branch for us
zenith_cli.run(["branch", "test_pg_regress", "empty"])
env.zenith_cli(["branch", "test_pg_regress", "empty"])
# Connect to postgres and create a database called "regression".
pg = postgres.create_start('test_pg_regress')
pg = env.postgres.create_start('test_pg_regress')
pg.safe_psql('CREATE DATABASE regression')
# Create some local directories for pg_regress to run in.
@@ -42,7 +38,7 @@ def test_pg_regress(pageserver: ZenithPageserver,
'--inputdir={}'.format(src_path),
]
env = {
env_vars = {
'PGPORT': str(pg.port),
'PGUSER': pg.username,
'PGHOST': pg.host,
@@ -52,11 +48,11 @@ def test_pg_regress(pageserver: ZenithPageserver,
# We don't capture the output. It's not too chatty, and it always
# logs the exact same data to `regression.out` anyway.
with capsys.disabled():
pg_bin.run(pg_regress_command, env=env, cwd=runpath)
pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
# checkpoint one more time to ensure that the lsn we get is the latest one
pg.safe_psql('CHECKPOINT')
lsn = pg.safe_psql('select pg_current_wal_insert_lsn()')[0][0]
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, pg, pageserver.service_port.pg)
check_restored_datadir_content(test_output_dir, env, pg)

View File

@@ -1,8 +1,7 @@
import os
from fixtures.utils import mkdir_if_needed
from fixtures.zenith_fixtures import (PageserverPort,
PostgresFactory,
from fixtures.zenith_fixtures import (ZenithEnv,
check_restored_datadir_content,
base_dir,
pg_distrib_dir)
@@ -11,18 +10,14 @@ from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
def test_zenith_regress(postgres: PostgresFactory,
pg_bin,
zenith_cli,
test_output_dir,
capsys,
pageserver_port: PageserverPort):
def test_zenith_regress(zenith_simple_env: ZenithEnv, test_output_dir, pg_bin, capsys):
env = zenith_simple_env
# Create a branch for us
zenith_cli.run(["branch", "test_zenith_regress", "empty"])
env.zenith_cli(["branch", "test_zenith_regress", "empty"])
# Connect to postgres and create a database called "regression".
pg = postgres.create_start('test_zenith_regress')
pg = env.postgres.create_start('test_zenith_regress')
pg.safe_psql('CREATE DATABASE regression')
# Create some local directories for pg_regress to run in.
@@ -48,7 +43,7 @@ def test_zenith_regress(postgres: PostgresFactory,
]
log.info(pg_regress_command)
env = {
env_vars = {
'PGPORT': str(pg.port),
'PGUSER': pg.username,
'PGHOST': pg.host,
@@ -58,11 +53,11 @@ def test_zenith_regress(postgres: PostgresFactory,
# We don't capture the output. It's not too chatty, and it always
# logs the exact same data to `regression.out` anyway.
with capsys.disabled():
pg_bin.run(pg_regress_command, env=env, cwd=runpath)
pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
# checkpoint one more time to ensure that the lsn we get is the latest one
pg.safe_psql('CHECKPOINT')
lsn = pg.safe_psql('select pg_current_wal_insert_lsn()')[0][0]
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, pg, pageserver_port.pg)
check_restored_datadir_content(test_output_dir, env, pg)

View File

@@ -31,11 +31,11 @@ To use, declare the 'zenbenchmark' fixture in the test function. Run the
bencmark, and then record the result by calling zenbenchmark.record. For example:
import timeit
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
from fixtures.zenith_fixtures import ZenithEnv
pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
def test_mybench(postgres: PostgresFactory, pageserver: ZenithPageserver, zenbenchmark):
def test_mybench(zenith_simple_env: env, zenbenchmark):
# Initialize the test
...

View File

@@ -1,3 +1,5 @@
from __future__ import annotations
from dataclasses import dataclass
from cached_property import cached_property
import asyncpg
@@ -137,6 +139,41 @@ def zenfixture(func: Fn) -> Fn:
return pytest.fixture(func, scope=scope)
@zenfixture
def worker_seq_no(worker_id: str):
# worker_id is a pytest-xdist fixture
# it can be master or gw<number>
# parse it to always get a number
if worker_id == 'master':
return 0
assert worker_id.startswith('gw')
return int(worker_id[2:])
@zenfixture
def worker_base_port(worker_seq_no: int):
# so we divide ports in ranges of 100 ports
# so workers have disjoint set of ports for services
return BASE_PORT + worker_seq_no * WORKER_PORT_NUM
class PortDistributor:
def __init__(self, base_port: int, port_number: int) -> None:
self.iterator = iter(range(base_port, base_port + port_number))
def get_port(self) -> int:
try:
return next(self.iterator)
except StopIteration:
raise RuntimeError(
'port range configured for test is exhausted, consider enlarging the range')
@zenfixture
def port_distributor(worker_base_port):
return PortDistributor(base_port=worker_base_port, port_number=WORKER_PORT_NUM)
class PgProtocol:
""" Reusable connection logic """
def __init__(self, host: str, port: int, username: Optional[str] = None):
@@ -214,21 +251,158 @@ class PgProtocol:
return cast(List[Any], cur.fetchall())
class ZenithCli:
"""
An object representing the CLI binary named "zenith".
@dataclass
class AuthKeys:
pub: bytes
priv: bytes
We also store an environment that will tell the CLI to operate
on a particular ZENITH_REPO_DIR.
def generate_management_token(self):
token = jwt.encode({"scope": "pageserverapi"}, self.priv, algorithm="RS256")
# jwt.encode can return 'bytes' or 'str', depending on Python version or type
# hinting or something (not sure what). If it returned 'bytes', convert it to 'str'
# explicitly.
if isinstance(token, bytes):
token = token.decode()
return token
def generate_tenant_token(self, tenant_id):
token = jwt.encode({
"scope": "tenant", "tenant_id": tenant_id
},
self.priv,
algorithm="RS256")
if isinstance(token, bytes):
token = token.decode()
return token
class ZenithEnvBuilder:
"""
def __init__(self, repo_dir: str):
self.bin_zenith = os.path.join(zenith_binpath, 'zenith')
Builder object to create a Zenith runtime environment
You should use the `zenith_env_builder` or `zenith_simple_env` pytest
fixture to create the ZenithEnv object. That way, the repository is
created in the right directory, based on the test name, and it's properly
cleaned up after the test has finished.
"""
def __init__(self,
repo_dir: Path,
port_distributor: PortDistributor,
num_safekeepers: int = 0,
pageserver_auth_enabled: bool = False):
self.repo_dir = repo_dir
self.env = os.environ.copy()
self.env['ZENITH_REPO_DIR'] = repo_dir
self.env['POSTGRES_DISTRIB_DIR'] = pg_distrib_dir
self.port_distributor = port_distributor
self.num_safekeepers = num_safekeepers
self.pageserver_auth_enabled = pageserver_auth_enabled
self.env: Optional[ZenithEnv] = None
def run(self, arguments: List[str]) -> 'subprocess.CompletedProcess[str]':
def init(self) -> ZenithEnv:
# Cannot create more than one environment from one builder
assert self.env is None, "environment already initialized"
self.env = ZenithEnv(self)
return self.env
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
# After the yield comes any cleanup code we need. Stop all the nodes.
if self.env:
log.info('Cleaning up all storage and compute nodes')
self.env.postgres.stop_all()
for sk in self.env.safekeepers:
sk.stop()
self.env.pageserver.stop(immediate=True)
class ZenithEnv:
"""
An object representing the Zenith runtime environment. It consists of
the page server, 0-N safekeepers, and the compute nodes.
ZenithEnv contains functions for stopping/starting nodes in the
environment, checking their status, creating tenants, connecting to the
nodes, creating and destroying compute nodes, etc. The page server and
the safekeepers are considered fixed in the environment, you cannot
create or destroy them after the environment is initialized. (That will
likely change in the future, as we start supporting multiple page
servers and adding/removing safekeepers on the fly).
Some notable functions and fields in ZenithEnv:
postgres - A factory object for creating postgres compute nodes.
pageserver - An object that contains functions for manipulating and
connecting to the pageserver
safekeepers - An array containing objects representing the safekeepers
pg_bin - pg_bin.run() can be used to execute Postgres client binaries,
like psql or pg_dump
initial_tenant - tenant ID of the initial tenant created in the repository
zenith_cli() - zenith_cli() can be used to run the 'zenith' CLI tool
create_tenant() - initializes a new tenant in the page server, returns
the tenant id
"""
def __init__(self, config: ZenithEnvBuilder):
self.repo_dir = config.repo_dir
self.port_distributor = config.port_distributor
self.postgres = PostgresFactory(self)
self.safekeepers: List[Safekeeper] = []
# Create and start up the pageserver, and safekeepers if any
pageserver_port = PageserverPort(
pg=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
)
self.pageserver = ZenithPageserver(self,
port=pageserver_port,
enable_auth=config.pageserver_auth_enabled)
self.pageserver.start()
# since we are in progress of refactoring protocols between compute safekeeper
# and page server use hardcoded management token in safekeeper
management_token = self.auth_keys.generate_management_token() \
if config.pageserver_auth_enabled else None
# get newly created tenant id
self.initial_tenant = self.zenith_cli(['tenant', 'list']).stdout.split()[0]
# Start up safekeepers
for wa_num in range(config.num_safekeepers):
wa = Safekeeper(env=self,
data_dir=Path(os.path.join(self.repo_dir, f"safekeeper_{wa_num}")),
port=SafekeeperPort(
pg=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
),
num=wa_num,
auth_token=management_token)
wa.start()
self.safekeepers.append(wa)
def get_safekeeper_connstrs(self) -> str:
""" Get list of safekeeper endpoints suitable for wal_acceptors GUC """
return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
def create_tenant(self, tenant_id: Optional[str] = None):
if tenant_id is None:
tenant_id = uuid.uuid4().hex
res = self.zenith_cli(['tenant', 'create', tenant_id])
res.check_returncode()
return tenant_id
def zenith_cli(self, arguments: List[str]) -> 'subprocess.CompletedProcess[str]':
"""
Run "zenith" with the specified arguments.
@@ -236,24 +410,32 @@ class ZenithCli:
Return both stdout and stderr, which can be accessed as
>>> result = zenith_cli.run(...)
>>> result = env.zenith_cli(...)
>>> assert result.stderr == ""
>>> log.info(result.stdout)
"""
assert type(arguments) == list
args = [self.bin_zenith] + arguments
log.info('Running command "{}"'.format(' '.join(args)))
bin_zenith = os.path.join(str(zenith_binpath), 'zenith')
# Interceipt CalledProcessError and print more info
args = [bin_zenith] + arguments
log.info('Running command "{}"'.format(' '.join(args)))
log.info(f'Running in "{self.repo_dir}"')
env_vars = os.environ.copy()
env_vars['ZENITH_REPO_DIR'] = str(self.repo_dir)
env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir)
# Intercept CalledProcessError and print more info
try:
res = subprocess.run(args,
env=self.env,
env=env_vars,
check=True,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
log.info(f"Run success: {res.stdout}")
except subprocess.CalledProcessError as exc:
# this way command output will be in recorded and shown in CI in failure message
msg = f"""\
@@ -267,10 +449,75 @@ class ZenithCli:
return res
@cached_property
def auth_keys(self) -> AuthKeys:
pub = (Path(self.repo_dir) / 'auth_public_key.pem').read_bytes()
priv = (Path(self.repo_dir) / 'auth_private_key.pem').read_bytes()
return AuthKeys(pub=pub, priv=priv)
@zenfixture
def zenith_cli(repo_dir: str) -> ZenithCli:
return ZenithCli(repo_dir)
def _shared_simple_env(request: Any, port_distributor) -> Iterator[ZenithEnv]:
"""
Internal fixture backing the `zenith_simple_env` fixture. If TEST_SHARED_FIXTURES
is set, this is shared by all tests using `zenith_simple_env`.
"""
if os.environ.get('TEST_SHARED_FIXTURES') is None:
# Create the environment in the per-test output directory
repo_dir = os.path.join(get_test_output_dir(request), "repo")
else:
# We're running shared fixtures. Share a single directory.
repo_dir = os.path.join(str(top_output_dir), "shared_repo")
shutil.rmtree(repo_dir, ignore_errors=True)
with ZenithEnvBuilder(Path(repo_dir), port_distributor) as builder:
env = builder.init()
# For convenience in tests, create a branch from the freshly-initialized cluster.
env.zenith_cli(["branch", "empty", "main"])
# Return the builder to the caller
yield env
@pytest.fixture(scope='function')
def zenith_simple_env(_shared_simple_env: ZenithEnv) -> Iterator[ZenithEnv]:
"""
Simple Zenith environment, with no authentication and no safekeepers.
If TEST_SHARED_FIXTURES environment variable is set, we reuse the same
environment for all tests that use 'zenith_simple_env', keeping the
page server and safekeepers running. Any compute nodes are stopped after
each the test, however.
"""
yield _shared_simple_env
_shared_simple_env.postgres.stop_all()
@pytest.fixture(scope='function')
def zenith_env_builder(test_output_dir, port_distributor) -> Iterator[ZenithEnvBuilder]:
"""
Fixture to create a Zenith environment for test.
To use, define 'zenith_env_builder' fixture in your test to get access to the
builder object. Set properties on it to describe the environment.
Finally, initialize and start up the environment by calling
zenith_env_builder.init().
After the initialization, you can launch compute nodes by calling
the functions in the 'env.postgres' factory object, stop/start the
nodes, etc.
"""
# Create the environment in the test-specific output dir
repo_dir = os.path.join(test_output_dir, "repo")
# Return the builder to the caller
with ZenithEnvBuilder(Path(repo_dir), port_distributor) as builder:
yield builder
class ZenithPageserverHttpClient(requests.Session):
@@ -334,70 +581,6 @@ class ZenithPageserverHttpClient(requests.Session):
return res.text
@dataclass
class AuthKeys:
pub: bytes
priv: bytes
def generate_management_token(self):
token = jwt.encode({"scope": "pageserverapi"}, self.priv, algorithm="RS256")
# jwt.encode can return 'bytes' or 'str', depending on Python version or type
# hinting or something (not sure what). If it returned 'bytes', convert it to 'str'
# explicitly.
if isinstance(token, bytes):
token = token.decode()
return token
def generate_tenant_token(self, tenant_id):
token = jwt.encode({
"scope": "tenant", "tenant_id": tenant_id
},
self.priv,
algorithm="RS256")
if isinstance(token, bytes):
token = token.decode()
return token
@zenfixture
def worker_seq_no(worker_id: str):
# worker_id is a pytest-xdist fixture
# it can be master or gw<number>
# parse it to always get a number
if worker_id == 'master':
return 0
assert worker_id.startswith('gw')
return int(worker_id[2:])
@zenfixture
def worker_base_port(worker_seq_no: int):
# so we divide ports in ranges of 100 ports
# so workers have disjoint set of ports for services
return BASE_PORT + worker_seq_no * WORKER_PORT_NUM
class PortDistributor:
def __init__(self, base_port: int, port_number: int) -> None:
self.iterator = iter(range(base_port, base_port + port_number))
def get_port(self) -> int:
try:
return next(self.iterator)
except StopIteration:
raise RuntimeError(
'port range configured for test is exhausted, consider enlarging the range')
@zenfixture
def port_distributor(worker_base_port):
return PortDistributor(base_port=worker_base_port, port_number=WORKER_PORT_NUM)
@dataclass
class PageserverPort:
pg: int
@@ -408,18 +591,12 @@ class ZenithPageserver(PgProtocol):
"""
An object representing a running pageserver.
Initializes the repository via `zenith init` and starts page server.
Initializes the repository via `zenith init`.
"""
def __init__(self,
zenith_cli: ZenithCli,
repo_dir: str,
port: PageserverPort,
enable_auth=False):
def __init__(self, env: ZenithEnv, port: PageserverPort, enable_auth=False):
super().__init__(host='localhost', port=port.pg)
self.zenith_cli = zenith_cli
self.env = env
self.running = False
self.initial_tenant: str = cast(str, None) # Will be fixed by self.start() below
self.repo_dir = repo_dir
self.service_port = port # do not shadow PgProtocol.port which is just int
cmd = [
@@ -429,9 +606,7 @@ class ZenithPageserver(PgProtocol):
]
if enable_auth:
cmd.append('--enable-auth')
self.zenith_cli.run(cmd)
self.start() # Required, otherwise self.initial_tenant is of wrong type
self.env.zenith_cli(cmd)
def start(self) -> 'ZenithPageserver':
"""
@@ -440,14 +615,8 @@ class ZenithPageserver(PgProtocol):
"""
assert self.running == False
self.zenith_cli.run(['start'])
self.env.zenith_cli(['start'])
self.running = True
# get newly created tenant id
current_tenant = self.zenith_cli.run(['tenant', 'list']).stdout.split()[0]
if self.initial_tenant is None:
self.initial_tenant = current_tenant
else:
assert self.initial_tenant == current_tenant
return self
def stop(self, immediate=False) -> 'ZenithPageserver':
@@ -461,7 +630,7 @@ class ZenithPageserver(PgProtocol):
log.info(f"Stopping pageserver with {cmd}")
if self.running:
self.zenith_cli.run(cmd)
self.env.zenith_cli(cmd)
self.running = False
return self
@@ -472,12 +641,6 @@ class ZenithPageserver(PgProtocol):
def __exit__(self, exc_type, exc, tb):
self.stop(True)
@cached_property
def auth_keys(self) -> AuthKeys:
pub = (Path(self.repo_dir) / 'auth_public_key.pem').read_bytes()
priv = (Path(self.repo_dir) / 'auth_private_key.pem').read_bytes()
return AuthKeys(pub=pub, priv=priv)
def http_client(self, auth_token: Optional[str] = None):
return ZenithPageserverHttpClient(
port=self.service_port.http,
@@ -485,47 +648,13 @@ class ZenithPageserver(PgProtocol):
)
@zenfixture
def pageserver_port(port_distributor: PortDistributor) -> PageserverPort:
pg = port_distributor.get_port()
http = port_distributor.get_port()
log.info(f"pageserver_port: pg={pg} http={http}")
return PageserverPort(pg=pg, http=http)
@zenfixture
def pageserver(zenith_cli: ZenithCli, repo_dir: str,
pageserver_port: PageserverPort) -> Iterator[ZenithPageserver]:
"""
The 'pageserver' fixture provides a Page Server that's up and running.
If TEST_SHARED_FIXTURES is set, the Page Server instance is shared by all
the tests. To avoid clashing with other tests, don't use the 'main' branch in
the tests directly. Instead, create a branch off the 'empty' branch and use
that.
By convention, the test branches are named after the tests. For example,
test called 'test_foo' would create and use branches with the 'test_foo' prefix.
"""
ps = ZenithPageserver(zenith_cli=zenith_cli, repo_dir=repo_dir, port=pageserver_port)
# For convenience in tests, create a branch from the freshly-initialized cluster.
zenith_cli.run(["branch", "empty", "main"])
yield ps
# After the yield comes any cleanup code we need.
log.info('Starting pageserver cleanup')
ps.stop(True)
class PgBin:
""" A helper class for executing postgres binaries """
def __init__(self, log_dir: str):
self.log_dir = log_dir
self.pg_install_path = pg_distrib_dir
self.pg_bin_path = os.path.join(self.pg_install_path, 'bin')
self.pg_bin_path = os.path.join(str(pg_distrib_dir), 'bin')
self.env = os.environ.copy()
self.env['LD_LIBRARY_PATH'] = os.path.join(self.pg_install_path, 'lib')
self.env['LD_LIBRARY_PATH'] = os.path.join(str(pg_distrib_dir), 'lib')
def _fixpath(self, command: List[str]) -> None:
if '/' not in command[0]:
@@ -580,25 +709,13 @@ def pg_bin(test_output_dir: str) -> PgBin:
return PgBin(test_output_dir)
@pytest.fixture
def pageserver_auth_enabled(zenith_cli: ZenithCli, repo_dir: str, pageserver_port: PageserverPort):
with ZenithPageserver(zenith_cli=zenith_cli,
repo_dir=repo_dir,
port=pageserver_port,
enable_auth=True) as ps:
# For convenience in tests, create a branch from the freshly-initialized cluster.
zenith_cli.run(["branch", "empty", "main"])
yield ps
class Postgres(PgProtocol):
""" An object representing a running postgres daemon. """
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, tenant_id: str, port: int):
def __init__(self, env: ZenithEnv, tenant_id: str, port: int):
super().__init__(host='localhost', port=port)
self.zenith_cli = zenith_cli
self.env = env
self.running = False
self.repo_dir = repo_dir
self.node_name: Optional[str] = None # dubious, see asserts below
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
self.tenant_id = tenant_id
@@ -608,13 +725,10 @@ class Postgres(PgProtocol):
self,
node_name: str,
branch: Optional[str] = None,
wal_acceptors: Optional[str] = None,
config_lines: Optional[List[str]] = None,
) -> 'Postgres':
"""
Create the pg data directory.
If wal_acceptors is not None, node will use wal acceptors; config is
adjusted accordingly.
Returns self.
"""
@@ -624,7 +738,7 @@ class Postgres(PgProtocol):
if branch is None:
branch = node_name
self.zenith_cli.run([
self.env.zenith_cli([
'pg',
'create',
f'--tenantid={self.tenant_id}',
@@ -634,10 +748,10 @@ class Postgres(PgProtocol):
])
self.node_name = node_name
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.node_name
self.pgdata_dir = os.path.join(self.repo_dir, path)
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
if wal_acceptors is not None:
self.adjust_for_wal_acceptors(wal_acceptors)
if self.env.safekeepers:
self.adjust_for_wal_acceptors(self.env.get_safekeeper_connstrs())
if config_lines is None:
config_lines = []
self.config(config_lines)
@@ -654,7 +768,7 @@ class Postgres(PgProtocol):
log.info(f"Starting postgres node {self.node_name}")
run_result = self.zenith_cli.run(
run_result = self.env.zenith_cli(
['pg', 'start', f'--tenantid={self.tenant_id}', f'--port={self.port}', self.node_name])
self.running = True
@@ -666,7 +780,7 @@ class Postgres(PgProtocol):
""" Path to data directory """
assert self.node_name
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.node_name
return os.path.join(self.repo_dir, path)
return os.path.join(self.env.repo_dir, path)
def pg_xact_dir_path(self) -> str:
""" Path to pg_xact dir """
@@ -723,7 +837,7 @@ class Postgres(PgProtocol):
if self.running:
assert self.node_name is not None
self.zenith_cli.run(['pg', 'stop', self.node_name, f'--tenantid={self.tenant_id}'])
self.env.zenith_cli(['pg', 'stop', self.node_name, f'--tenantid={self.tenant_id}'])
self.running = False
return self
@@ -735,7 +849,7 @@ class Postgres(PgProtocol):
"""
assert self.node_name is not None
self.zenith_cli.run(
self.env.zenith_cli(
['pg', 'stop', '--destroy', self.node_name, f'--tenantid={self.tenant_id}'])
self.node_name = None
@@ -745,7 +859,6 @@ class Postgres(PgProtocol):
self,
node_name: str,
branch: Optional[str] = None,
wal_acceptors: Optional[str] = None,
config_lines: Optional[List[str]] = None,
) -> 'Postgres':
"""
@@ -757,7 +870,6 @@ class Postgres(PgProtocol):
self.create(
node_name=node_name,
branch=branch,
wal_acceptors=wal_acceptors,
config_lines=config_lines,
).start()
@@ -772,30 +884,21 @@ class Postgres(PgProtocol):
class PostgresFactory:
""" An object representing multiple running postgres daemons. """
def __init__(self,
zenith_cli: ZenithCli,
repo_dir: str,
initial_tenant: str,
port_distributor: PortDistributor):
self.zenith_cli = zenith_cli
self.repo_dir = repo_dir
def __init__(self, env: ZenithEnv):
self.env = env
self.num_instances = 0
self.instances: List[Postgres] = []
self.initial_tenant: str = initial_tenant
self.port_distributor = port_distributor
def create_start(self,
node_name: str = "main",
branch: Optional[str] = None,
tenant_id: Optional[str] = None,
wal_acceptors: Optional[str] = None,
config_lines: Optional[List[str]] = None) -> Postgres:
pg = Postgres(
zenith_cli=self.zenith_cli,
repo_dir=self.repo_dir,
tenant_id=tenant_id or self.initial_tenant,
port=self.port_distributor.get_port(),
self.env,
tenant_id=tenant_id or self.env.initial_tenant,
port=self.env.port_distributor.get_port(),
)
self.num_instances += 1
self.instances.append(pg)
@@ -803,7 +906,6 @@ class PostgresFactory:
return pg.create_start(
node_name=node_name,
branch=branch,
wal_acceptors=wal_acceptors,
config_lines=config_lines,
)
@@ -811,14 +913,12 @@ class PostgresFactory:
node_name: str = "main",
branch: Optional[str] = None,
tenant_id: Optional[str] = None,
wal_acceptors: Optional[str] = None,
config_lines: Optional[List[str]] = None) -> Postgres:
pg = Postgres(
zenith_cli=self.zenith_cli,
repo_dir=self.repo_dir,
tenant_id=tenant_id or self.initial_tenant,
port=self.port_distributor.get_port(),
self.env,
tenant_id=tenant_id or self.env.initial_tenant,
port=self.env.port_distributor.get_port(),
)
self.num_instances += 1
@@ -827,7 +927,6 @@ class PostgresFactory:
return pg.create(
node_name=node_name,
branch=branch,
wal_acceptors=wal_acceptors,
config_lines=config_lines,
)
@@ -838,65 +937,40 @@ class PostgresFactory:
return self
@zenfixture
def initial_tenant(pageserver: ZenithPageserver):
return pageserver.initial_tenant
@pytest.fixture(scope='function')
def postgres(zenith_cli: ZenithCli,
initial_tenant: str,
repo_dir: str,
port_distributor: PortDistributor) -> Iterator[PostgresFactory]:
pgfactory = PostgresFactory(
zenith_cli=zenith_cli,
repo_dir=repo_dir,
initial_tenant=initial_tenant,
port_distributor=port_distributor,
)
yield pgfactory
# After the yield comes any cleanup code we need.
log.info('Starting postgres cleanup')
pgfactory.stop_all()
def read_pid(path: Path) -> int:
""" Read content of file into number """
return int(path.read_text())
@dataclass
class WalAcceptorPort:
class SafekeeperPort:
pg: int
http: int
@dataclass
class WalAcceptor:
""" An object representing a running wal acceptor daemon. """
wa_bin_path: Path
class Safekeeper:
""" An object representing a running safekeeper daemon. """
env: ZenithEnv
data_dir: Path
port: WalAcceptorPort
port: SafekeeperPort
num: int # identifier for logging
pageserver_port: int
auth_token: Optional[str] = None
def start(self) -> 'WalAcceptor':
def start(self) -> 'Safekeeper':
# create data directory if not exists
self.data_dir.mkdir(parents=True, exist_ok=True)
with suppress(FileNotFoundError):
self.pidfile.unlink()
cmd = [str(self.wa_bin_path)]
cmd = [os.path.join(str(zenith_binpath), "safekeeper")]
cmd.extend(["-D", str(self.data_dir)])
cmd.extend(["--listen-pg", f"localhost:{self.port.pg}"])
cmd.extend(["--listen-http", f"localhost:{self.port.http}"])
cmd.append("--daemonize")
cmd.append("--no-sync")
# Tell page server it can receive WAL from this WAL safekeeper
cmd.extend(["--pageserver", f"localhost:{self.pageserver_port}"])
cmd.extend(["--pageserver", f"localhost:{self.env.pageserver.service_port.pg}"])
cmd.extend(["--recall", "1 second"])
log.info('Running command "{}"'.format(' '.join(cmd)))
env = {'PAGESERVER_AUTH_TOKEN': self.auth_token} if self.auth_token else None
@@ -933,7 +1007,7 @@ class WalAcceptor:
return pid
def stop(self) -> 'WalAcceptor':
def stop(self) -> 'Safekeeper':
log.info('Stopping wal acceptor {}'.format(self.num))
pid = self.get_pid()
if pid is None:
@@ -950,7 +1024,7 @@ class WalAcceptor:
def append_logical_message(self, tenant_id: str, timeline_id: str,
request: Dict[str, Any]) -> Dict[str, Any]:
"""
Send JSON_CTRL query to append LogicalMessage to WAL and modify
Send JSON_CTRL query to append LogicalMessage to WAL and modify
safekeeper state. It will construct LogicalMessage from provided
prefix and message, and then will write it to WAL.
"""
@@ -973,69 +1047,7 @@ class WalAcceptor:
return res
def http_client(self):
return WalAcceptorHttpClient(port=self.port.http)
class WalAcceptorFactory:
""" An object representing multiple running wal acceptors. """
def __init__(self, data_dir: Path, pageserver_port: int, port_distributor: PortDistributor):
self.wa_bin_path = Path(os.path.join(zenith_binpath, 'safekeeper'))
self.data_dir = data_dir
self.instances: List[WalAcceptor] = []
self.port_distributor = port_distributor
self.pageserver_port = pageserver_port
def start_new(self, auth_token: Optional[str] = None) -> WalAcceptor:
"""
Start new wal acceptor.
"""
wa_num = len(self.instances)
wa = WalAcceptor(
wa_bin_path=self.wa_bin_path,
data_dir=self.data_dir / "wal_acceptor_{}".format(wa_num),
port=WalAcceptorPort(
pg=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
),
num=wa_num,
pageserver_port=self.pageserver_port,
auth_token=auth_token,
)
wa.start()
self.instances.append(wa)
return wa
def start_n_new(self, n: int, auth_token: Optional[str] = None) -> None:
"""
Start n new wal acceptors.
"""
for _ in range(n):
self.start_new(auth_token)
def stop_all(self) -> 'WalAcceptorFactory':
for wa in self.instances:
wa.stop()
return self
def get_connstrs(self) -> str:
""" Get list of wal acceptor endpoints suitable for wal_acceptors GUC """
return ','.join(["localhost:{}".format(wa.port.pg) for wa in self.instances])
@zenfixture
def wa_factory(repo_dir: str, pageserver_port: PageserverPort,
port_distributor: PortDistributor) -> Iterator[WalAcceptorFactory]:
""" Gives WalAcceptorFactory providing wal acceptors. """
wafactory = WalAcceptorFactory(
data_dir=Path(repo_dir) / "wal_acceptors",
pageserver_port=pageserver_port.pg,
port_distributor=port_distributor,
)
yield wafactory
# After the yield comes any cleanup code we need.
log.info('Starting wal acceptors cleanup')
wafactory.stop_all()
return SafekeeperHttpClient(port=self.port.http)
@dataclass
@@ -1044,7 +1056,7 @@ class SafekeeperTimelineStatus:
flush_lsn: str
class WalAcceptorHttpClient(requests.Session):
class SafekeeperHttpClient(requests.Session):
def __init__(self, port: int) -> None:
super().__init__()
self.port = port
@@ -1063,14 +1075,14 @@ class WalAcceptorHttpClient(requests.Session):
def get_test_output_dir(request: Any) -> str:
""" Compute the working directory for an individual test. """
test_name = request.node.name
test_dir = os.path.join(top_output_dir, test_name)
test_dir = os.path.join(str(top_output_dir), test_name)
log.info(f'get_test_output_dir is {test_dir}')
return test_dir
# This is autouse, so the test output directory always gets created, even
# if a test doesn't put anything there. It also solves a problem with the
# repo_dir() fixture: if TEST_SHARED_FIXTURES is not set, repo_dir()
# zenith_simple_env fixture: if TEST_SHARED_FIXTURES is not set, it
# creates the repo in the test output directory. But it cannot depend on
# 'test_output_dir' fixture, because when TEST_SHARED_FIXTURES is not set,
# it has 'session' scope and cannot access fixtures with 'function'
@@ -1089,45 +1101,6 @@ def test_output_dir(request: Any) -> str:
return test_dir
@zenfixture
def repo_dir(request: Any) -> Path:
"""
Compute the test repo_dir.
"repo_dir" is the place where all of the pageserver files will go.
It doesn't have anything to do with the git repo.
"""
if os.environ.get('TEST_SHARED_FIXTURES') is None:
# Create the environment in the per-test output directory
# The 'test_output_dir' fixture should have created it already
repo_dir = os.path.join(get_test_output_dir(request), "repo")
assert os.path.exists(repo_dir)
else:
# We're running shared fixtures. Share a single directory.
repo_dir = os.path.join(top_output_dir, "shared")
shutil.rmtree(repo_dir, ignore_errors=True)
return Path(repo_dir)
class TenantFactory:
def __init__(self, cli: ZenithCli):
self.cli = cli
def create(self, tenant_id: Optional[str] = None):
if tenant_id is None:
tenant_id = uuid.uuid4().hex
res = self.cli.run(['tenant', 'create', tenant_id])
res.check_returncode()
return tenant_id
@zenfixture
def tenant_factory(zenith_cli: ZenithCli):
return TenantFactory(zenith_cli)
#
# Test helpers
#
@@ -1157,7 +1130,7 @@ def list_files_to_compare(pgdata_dir: str):
# pg is the existing and running compute node, that we want to compare with a basebackup
def check_restored_datadir_content(test_output_dir: str, pg: Postgres, pageserver_pg_port: int):
def check_restored_datadir_content(test_output_dir: str, env: ZenithEnv, pg: Postgres):
# Get the timeline ID of our branch. We need it for the 'basebackup' command
with closing(pg.connect()) as conn:
@@ -1169,7 +1142,7 @@ def check_restored_datadir_content(test_output_dir: str, pg: Postgres, pageserve
pg.stop()
# Take a basebackup from pageserver
restored_dir_path = os.path.join(test_output_dir, f"{pg.node_name}_restored_datadir")
restored_dir_path = os.path.join(env.repo_dir, f"{pg.node_name}_restored_datadir")
mkdir_if_needed(restored_dir_path)
pg_bin = PgBin(test_output_dir)
@@ -1178,7 +1151,7 @@ def check_restored_datadir_content(test_output_dir: str, pg: Postgres, pageserve
cmd = rf"""
{psql_path} \
--no-psqlrc \
postgres://localhost:{pageserver_pg_port} \
postgres://localhost:{env.pageserver.service_port.pg} \
-c 'basebackup {pg.tenant_id} {timeline}' \
| tar -x -C {restored_dir_path}
"""

View File

@@ -1,6 +1,6 @@
import os
from contextlib import closing
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
@@ -16,20 +16,17 @@ pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
# 3. Disk space used
# 4. Peak memory usage
#
def test_bulk_insert(postgres: PostgresFactory,
pageserver: ZenithPageserver,
zenith_cli,
zenbenchmark,
repo_dir: str):
def test_bulk_insert(zenith_simple_env: ZenithEnv, zenbenchmark):
env = zenith_simple_env
# Create a branch for us
zenith_cli.run(["branch", "test_bulk_insert", "empty"])
env.zenith_cli(["branch", "test_bulk_insert", "empty"])
pg = postgres.create_start('test_bulk_insert')
pg = env.postgres.create_start('test_bulk_insert')
log.info("postgres is running on 'test_bulk_insert' branch")
# Open a connection directly to the page server that we'll use to force
# flushing the layers to disk
psconn = pageserver.connect()
psconn = env.pageserver.connect()
pscur = psconn.cursor()
# Get the timeline ID of our branch. We need it for the 'do_gc' command
@@ -41,19 +38,19 @@ def test_bulk_insert(postgres: PostgresFactory,
cur.execute("create table huge (i int, j int);")
# Run INSERT, recording the time and I/O it takes
with zenbenchmark.record_pageserver_writes(pageserver, 'pageserver_writes'):
with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'):
with zenbenchmark.record_duration('insert'):
cur.execute("insert into huge values (generate_series(1, 5000000), 0);")
# Flush the layers from memory to disk. This is included in the reported
# time and I/O
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
# Record peak memory usage
zenbenchmark.record("peak_mem", zenbenchmark.get_peak_mem(pageserver) / 1024, 'MB')
zenbenchmark.record("peak_mem", zenbenchmark.get_peak_mem(env.pageserver) / 1024, 'MB')
# Report disk space used by the repository
timeline_size = zenbenchmark.get_timeline_size(repo_dir,
pageserver.initial_tenant,
timeline_size = zenbenchmark.get_timeline_size(env.repo_dir,
env.initial_tenant,
timeline)
zenbenchmark.record('size', timeline_size / (1024 * 1024), 'MB')

View File

@@ -1,11 +1,7 @@
import timeit
import pytest
from fixtures.zenith_fixtures import (
TenantFactory,
ZenithCli,
PostgresFactory,
)
from fixtures.zenith_fixtures import ZenithEnvBuilder
pytest_plugins = ("fixtures.benchmark_fixture")
@@ -20,37 +16,37 @@ pytest_plugins = ("fixtures.benchmark_fixture")
@pytest.mark.parametrize('tenants_count', [1, 5, 10])
@pytest.mark.parametrize('use_wal_acceptors', ['with_wa', 'without_wa'])
def test_bulk_tenant_create(
zenith_cli: ZenithCli,
tenant_factory: TenantFactory,
postgres: PostgresFactory,
wa_factory,
zenith_env_builder: ZenithEnvBuilder,
use_wal_acceptors: str,
tenants_count: int,
zenbenchmark,
):
"""Measure tenant creation time (with and without wal acceptors)"""
if use_wal_acceptors == 'with_wa':
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
time_slices = []
for i in range(tenants_count):
start = timeit.default_timer()
tenant = tenant_factory.create()
zenith_cli.run([
tenant = env.create_tenant()
env.zenith_cli([
"branch",
f"test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}",
"main",
f"--tenantid={tenant}"
])
if use_wal_acceptors == 'with_wa':
wa_factory.start_n_new(3)
# FIXME: We used to start new safekeepers here. Did that make sense? Should we do it now?
#if use_wal_acceptors == 'with_wa':
# wa_factory.start_n_new(3)
pg_tenant = postgres.create_start(
pg_tenant = env.postgres.create_start(
f"test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}",
None, # branch name, None means same as node name
tenant,
wal_acceptors=wa_factory.get_connstrs() if use_wal_acceptors == 'with_wa' else None,
)
end = timeit.default_timer()

View File

@@ -1,6 +1,6 @@
import os
from contextlib import closing
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
@@ -11,20 +11,17 @@ pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
# As of this writing, we're duplicate those giant WAL records for each page,
# which makes the delta layer about 32x larger than it needs to be.
#
def test_gist_buffering_build(postgres: PostgresFactory,
pageserver: ZenithPageserver,
zenith_cli,
zenbenchmark,
repo_dir: str):
def test_gist_buffering_build(zenith_simple_env: ZenithEnv, zenbenchmark):
env = zenith_simple_env
# Create a branch for us
zenith_cli.run(["branch", "test_gist_buffering_build", "empty"])
env.zenith_cli(["branch", "test_gist_buffering_build", "empty"])
pg = postgres.create_start('test_gist_buffering_build')
pg = env.postgres.create_start('test_gist_buffering_build')
log.info("postgres is running on 'test_gist_buffering_build' branch")
# Open a connection directly to the page server that we'll use to force
# flushing the layers to disk
psconn = pageserver.connect()
psconn = env.pageserver.connect()
pscur = psconn.cursor()
# Get the timeline ID of our branch. We need it for the 'do_gc' command
@@ -40,7 +37,7 @@ def test_gist_buffering_build(postgres: PostgresFactory,
)
# Build the index.
with zenbenchmark.record_pageserver_writes(pageserver, 'pageserver_writes'):
with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'):
with zenbenchmark.record_duration('build'):
cur.execute(
"create index gist_pointidx2 on gist_point_tbl using gist(p) with (buffering = on)"
@@ -48,13 +45,13 @@ def test_gist_buffering_build(postgres: PostgresFactory,
# Flush the layers from memory to disk. This is included in the reported
# time and I/O
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 1000000")
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 1000000")
# Record peak memory usage
zenbenchmark.record("peak_mem", zenbenchmark.get_peak_mem(pageserver) / 1024, 'MB')
zenbenchmark.record("peak_mem", zenbenchmark.get_peak_mem(env.pageserver) / 1024, 'MB')
# Report disk space used by the repository
timeline_size = zenbenchmark.get_timeline_size(repo_dir,
pageserver.initial_tenant,
timeline_size = zenbenchmark.get_timeline_size(env.repo_dir,
env.initial_tenant,
timeline)
zenbenchmark.record('size', timeline_size / (1024 * 1024), 'MB')

View File

@@ -1,6 +1,6 @@
import os
from contextlib import closing
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
@@ -15,21 +15,17 @@ pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
# 2. Time to run 5000 pgbench transactions
# 3. Disk space used
#
def test_pgbench(postgres: PostgresFactory,
pageserver: ZenithPageserver,
pg_bin,
zenith_cli,
zenbenchmark,
repo_dir: str):
def test_pgbench(zenith_simple_env: ZenithEnv, pg_bin, zenbenchmark):
env = zenith_simple_env
# Create a branch for us
zenith_cli.run(["branch", "test_pgbench_perf", "empty"])
env.zenith_cli(["branch", "test_pgbench_perf", "empty"])
pg = postgres.create_start('test_pgbench_perf')
pg = env.postgres.create_start('test_pgbench_perf')
log.info("postgres is running on 'test_pgbench_perf' branch")
# Open a connection directly to the page server that we'll use to force
# flushing the layers to disk
psconn = pageserver.connect()
psconn = env.pageserver.connect()
pscur = psconn.cursor()
# Get the timeline ID of our branch. We need it for the 'do_gc' command
@@ -41,13 +37,13 @@ def test_pgbench(postgres: PostgresFactory,
connstr = pg.connstr()
# Initialize pgbench database, recording the time and I/O it takes
with zenbenchmark.record_pageserver_writes(pageserver, 'pageserver_writes'):
with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'):
with zenbenchmark.record_duration('init'):
pg_bin.run_capture(['pgbench', '-s5', '-i', connstr])
# Flush the layers from memory to disk. This is included in the reported
# time and I/O
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
# Run pgbench for 5000 transactions
with zenbenchmark.record_duration('5000_xacts'):
@@ -55,8 +51,8 @@ def test_pgbench(postgres: PostgresFactory,
# Flush the layers to disk again. This is *not' included in the reported time,
# though.
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
# Report disk space used by the repository
timeline_size = zenbenchmark.get_timeline_size(repo_dir, pageserver.initial_tenant, timeline)
timeline_size = zenbenchmark.get_timeline_size(env.repo_dir, env.initial_tenant, timeline)
zenbenchmark.record('size', timeline_size / (1024 * 1024), 'MB')

View File

@@ -12,26 +12,23 @@
# Amplification problem at its finest.
import os
from contextlib import closing
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures", "fixtures.benchmark_fixture")
def test_write_amplification(postgres: PostgresFactory,
pageserver: ZenithPageserver,
zenith_cli,
zenbenchmark,
repo_dir: str):
def test_write_amplification(zenith_simple_env: ZenithEnv, zenbenchmark):
env = zenith_simple_env
# Create a branch for us
zenith_cli.run(["branch", "test_write_amplification", "empty"])
env.zenith_cli(["branch", "test_write_amplification", "empty"])
pg = postgres.create_start('test_write_amplification')
pg = env.postgres.create_start('test_write_amplification')
log.info("postgres is running on 'test_write_amplification' branch")
# Open a connection directly to the page server that we'll use to force
# flushing the layers to disk
psconn = pageserver.connect()
psconn = env.pageserver.connect()
pscur = psconn.cursor()
with closing(pg.connect()) as conn:
@@ -40,7 +37,7 @@ def test_write_amplification(postgres: PostgresFactory,
cur.execute("SHOW zenith.zenith_timeline")
timeline = cur.fetchone()[0]
with zenbenchmark.record_pageserver_writes(pageserver, 'pageserver_writes'):
with zenbenchmark.record_pageserver_writes(env.pageserver, 'pageserver_writes'):
with zenbenchmark.record_duration('run'):
# NOTE: Because each iteration updates every table already created,
@@ -73,10 +70,10 @@ def test_write_amplification(postgres: PostgresFactory,
# slower, adding some delays in this loop. But forcing
# the the checkpointing and GC makes the test go faster,
# with the same total I/O effect.
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
# Report disk space used by the repository
timeline_size = zenbenchmark.get_timeline_size(repo_dir,
pageserver.initial_tenant,
timeline_size = zenbenchmark.get_timeline_size(env.repo_dir,
env.initial_tenant,
timeline)
zenbenchmark.record('size', timeline_size / (1024 * 1024), 'MB')

View File

@@ -1,6 +1,7 @@
import pytest
import os
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -19,11 +20,13 @@ run_broken = pytest.mark.skipif(os.environ.get('RUN_BROKEN') is None,
@run_broken
def test_broken(zenith_cli, pageserver, postgres, pg_bin):
# Create a branch for us
zenith_cli.run(["branch", "test_broken", "empty"])
def test_broken(zenith_simple_env: ZenithEnv, pg_bin):
env = zenith_simple_env
postgres.create_start("test_broken")
# Create a branch for us
env.zenith_cli(["branch", "test_broken", "empty"])
env.postgres.create_start("test_broken")
log.info('postgres is running')
log.info('THIS NEXT COMMAND WILL FAIL:')