Merge branch 'main' into bojan-get-page-tests

This commit is contained in:
Bojan Serafimov
2022-04-27 13:05:27 -04:00
187 changed files with 5704 additions and 4343 deletions

View File

@@ -0,0 +1,118 @@
import subprocess
import asyncio
from contextlib import closing
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.zenith_fixtures import ZenithEnvBuilder
#
# Create ancestor branches off the main branch.
#
def test_ancestor_branch(zenith_env_builder: ZenithEnvBuilder):
# Builds a chain of timelines main -> branch1 -> branch2, inserting 100k rows on
# each one, then asks the pageserver to compact branch1 and checks that every
# branch still reports the row count it had accumulated.
#
# Use safekeeper in this test to avoid a subtle race condition.
# Without safekeeper, walreceiver reconnection can get stuck
# because of IO deadlock.
#
# See https://github.com/zenithdb/zenith/issues/1068
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
# Override defaults, 1M gc_horizon and 4M checkpoint_distance.
# Extend compaction_period and gc_period to disable background compaction and gc.
tenant = env.zenith_cli.create_tenant(
conf={
'gc_period': '10 m',
'gc_horizon': '1048576',
'checkpoint_distance': '4194304',
'compaction_period': '10 m',
'compaction_threshold': '2',
'compaction_target_size': '4194304',
})
# Create the 'main' timeline for the new tenant and start a compute node on it.
env.zenith_cli.create_timeline(f'main', tenant_id=tenant)
pg_branch0 = env.postgres.create_start('main', tenant_id=tenant)
# NOTE(review): the connection behind this cursor is never closed explicitly;
# the cursor is reused until the final assertions.
branch0_cur = pg_branch0.connect().cursor()
branch0_cur.execute("SHOW zenith.zenith_timeline")
branch0_timeline = branch0_cur.fetchone()[0]
log.info(f"b0 timeline {branch0_timeline}")
# Create table, and insert 100k rows.
# The starting LSN is only logged; it is not used later in the test.
branch0_cur.execute('SELECT pg_current_wal_insert_lsn()')
branch0_lsn = branch0_cur.fetchone()[0]
log.info(f"b0 at lsn {branch0_lsn}")
branch0_cur.execute('CREATE TABLE foo (t text) WITH (autovacuum_enabled = off)')
branch0_cur.execute('''
INSERT INTO foo
SELECT '00112233445566778899AABBCCDDEEFF' || ':branch0:' || g
FROM generate_series(1, 100000) g
''')
# Remember the LSN after the first 100k rows: branch1 is created at this point.
branch0_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_100 = branch0_cur.fetchone()[0]
log.info(f'LSN after 100k rows: {lsn_100}')
# Create branch1.
env.zenith_cli.create_branch('branch1', 'main', tenant_id=tenant, ancestor_start_lsn=lsn_100)
pg_branch1 = env.postgres.create_start('branch1', tenant_id=tenant)
log.info("postgres is running on 'branch1' branch")
branch1_cur = pg_branch1.connect().cursor()
# The timeline id of branch1 is needed for the compact command below.
branch1_cur.execute("SHOW zenith.zenith_timeline")
branch1_timeline = branch1_cur.fetchone()[0]
log.info(f"b1 timeline {branch1_timeline}")
branch1_cur.execute('SELECT pg_current_wal_insert_lsn()')
branch1_lsn = branch1_cur.fetchone()[0]
log.info(f"b1 at lsn {branch1_lsn}")
# Insert 100k rows.
branch1_cur.execute('''
INSERT INTO foo
SELECT '00112233445566778899AABBCCDDEEFF' || ':branch1:' || g
FROM generate_series(1, 100000) g
''')
# Remember the LSN after branch1's rows: branch2 is created at this point and
# the same LSN is passed to the compact command.
branch1_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_200 = branch1_cur.fetchone()[0]
log.info(f'LSN after 200k rows: {lsn_200}')
# Create branch2.
env.zenith_cli.create_branch('branch2', 'branch1', tenant_id=tenant, ancestor_start_lsn=lsn_200)
pg_branch2 = env.postgres.create_start('branch2', tenant_id=tenant)
log.info("postgres is running on 'branch2' branch")
branch2_cur = pg_branch2.connect().cursor()
branch2_cur.execute("SHOW zenith.zenith_timeline")
branch2_timeline = branch2_cur.fetchone()[0]
log.info(f"b2 timeline {branch2_timeline}")
branch2_cur.execute('SELECT pg_current_wal_insert_lsn()')
branch2_lsn = branch2_cur.fetchone()[0]
log.info(f"b2 at lsn {branch2_lsn}")
# Insert 100k rows.
branch2_cur.execute('''
INSERT INTO foo
SELECT '00112233445566778899AABBCCDDEEFF' || ':branch2:' || g
FROM generate_series(1, 100000) g
''')
# lsn_300 is only logged; nothing below depends on it.
branch2_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_300 = branch2_cur.fetchone()[0]
log.info(f'LSN after 300k rows: {lsn_300}')
# Run compaction on branch1.
# The command is sent over a pageserver connection, addressed by tenant id,
# branch1's timeline id and the LSN at which branch2 was forked.
psconn = env.pageserver.connect()
log.info(f'compact {tenant.hex} {branch1_timeline} {lsn_200}')
psconn.cursor().execute(f'''compact {tenant.hex} {branch1_timeline} {lsn_200}''')
# Each branch must still see its own 100k rows plus those inherited from its
# ancestors: 100k on main, 200k on branch1, 300k on branch2.
branch0_cur.execute('SELECT count(*) FROM foo')
assert branch0_cur.fetchone() == (100000, )
branch1_cur.execute('SELECT count(*) FROM foo')
assert branch1_cur.fetchone() == (200000, )
branch2_cur.execute('SELECT count(*) FROM foo')
assert branch2_cur.fetchone() == (300000, )

View File

@@ -52,14 +52,14 @@ def test_pageserver_auth(zenith_env_builder: ZenithEnvBuilder):
tenant_http_client.tenant_create()
@pytest.mark.parametrize('with_wal_acceptors', [False, True])
def test_compute_auth_to_pageserver(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptors: bool):
@pytest.mark.parametrize('with_safekeepers', [False, True])
def test_compute_auth_to_pageserver(zenith_env_builder: ZenithEnvBuilder, with_safekeepers: bool):
zenith_env_builder.pageserver_auth_enabled = True
if with_wal_acceptors:
if with_safekeepers:
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
branch = f'test_compute_auth_to_pageserver{with_wal_acceptors}'
branch = f'test_compute_auth_to_pageserver{with_safekeepers}'
env.zenith_cli.create_branch(branch)
pg = env.postgres.create_start(branch)

View File

@@ -18,6 +18,7 @@ import pytest
# * starts a pageserver with remote storage, stores specific data in its tables
# * triggers a checkpoint (which produces a local data scheduled for backup), gets the corresponding timeline id
# * polls the timeline status to ensure it's copied remotely
# * inserts more data in the pageserver and repeats the process, to check multiple checkpoints case
# * stops the pageserver, clears all local directories
#
# 2. Second pageserver
@@ -50,27 +51,30 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute(f'''
CREATE TABLE t1(id int primary key, secret text);
INSERT INTO t1 VALUES ({data_id}, '{data_secret}');
''')
cur.execute("SELECT pg_current_wal_flush_lsn()")
current_lsn = lsn_from_hex(cur.fetchone()[0])
checkpoint_numbers = range(1, 3)
# wait until pageserver receives that data
wait_for_last_record_lsn(client, UUID(tenant_id), UUID(timeline_id), current_lsn)
for checkpoint_number in checkpoint_numbers:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute(f'''
CREATE TABLE t{checkpoint_number}(id int primary key, secret text);
INSERT INTO t{checkpoint_number} VALUES ({data_id}, '{data_secret}|{checkpoint_number}');
''')
cur.execute("SELECT pg_current_wal_flush_lsn()")
current_lsn = lsn_from_hex(cur.fetchone()[0])
# run checkpoint manually to be sure that data landed in remote storage
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
# wait until pageserver receives that data
wait_for_last_record_lsn(client, UUID(tenant_id), UUID(timeline_id), current_lsn)
log.info("waiting for upload")
# wait until pageserver successfully uploaded a checkpoint to remote storage
wait_for_upload(client, UUID(tenant_id), UUID(timeline_id), current_lsn)
log.info("upload is done")
# run checkpoint manually to be sure that data landed in remote storage
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
log.info(f'waiting for checkpoint {checkpoint_number} upload')
# wait until pageserver successfully uploaded a checkpoint to remote storage
wait_for_upload(client, UUID(tenant_id), UUID(timeline_id), current_lsn)
log.info(f'upload of checkpoint {checkpoint_number} is done')
##### Stop the first pageserver instance, erase all its data
env.postgres.stop_all()
@@ -93,5 +97,6 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
pg = env.postgres.create_start('main')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute(f'SELECT secret FROM t1 WHERE id = {data_id};')
assert cur.fetchone() == (data_secret, )
for checkpoint_number in checkpoint_numbers:
cur.execute(f'SELECT secret FROM t{checkpoint_number} WHERE id = {data_id};')
assert cur.fetchone() == (f'{data_secret}|{checkpoint_number}', )

View File

@@ -8,10 +8,10 @@ from fixtures.log_helper import log
#
# Test restarting and recreating a postgres instance
#
@pytest.mark.parametrize('with_wal_acceptors', [False, True])
def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptors: bool):
@pytest.mark.parametrize('with_safekeepers', [False, True])
def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_safekeepers: bool):
zenith_env_builder.pageserver_auth_enabled = True
if with_wal_acceptors:
if with_safekeepers:
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()

View File

@@ -0,0 +1,49 @@
from contextlib import closing
import pytest
from fixtures.zenith_fixtures import ZenithEnvBuilder
def test_tenant_config(zenith_env_builder: ZenithEnvBuilder):
    """Test per tenant configuration.

    Creates a tenant with explicit config overrides and checks that the
    pageserver's `show <tenant>` command reports them. Then updates the
    overrides through `zenith_cli.config_tenant` and checks that the reported
    values follow.
    """
    env = zenith_env_builder.init_start()
    tenant = env.zenith_cli.create_tenant(
        conf={
            'checkpoint_distance': '10000',
            'compaction_target_size': '1048576',
            'compaction_period': '60sec',
            'compaction_threshold': '20',
            'gc_horizon': '1024',
            'gc_period': '100sec',
            'pitr_interval': '3600sec',
        })

    env.zenith_cli.create_timeline('test_tenant_conf', tenant_id=tenant)
    # The compute node is started only for its side effect; the handle is not used.
    env.postgres.create_start(
        "test_tenant_conf",
        "main",
        tenant,
    )

    # The expected row lists the values in this order: checkpoint_distance,
    # compaction_target_size, compaction_period, compaction_threshold,
    # gc_horizon, gc_period, pitr_interval (durations as plain numbers).
    with closing(env.pageserver.connect()) as psconn:
        with psconn.cursor() as pscur:
            pscur.execute(f"show {tenant.hex}")
            assert pscur.fetchone() == (10000, 1048576, 60, 20, 1024, 100, 3600)

    # update the config and ensure that it has changed
    env.zenith_cli.config_tenant(tenant_id=tenant,
                                 conf={
                                     'checkpoint_distance': '100000',
                                     'compaction_target_size': '1048576',
                                     'compaction_period': '30sec',
                                     'compaction_threshold': '15',
                                     'gc_horizon': '256',
                                     'gc_period': '10sec',
                                     'pitr_interval': '360sec',
                                 })

    with closing(env.pageserver.connect()) as psconn:
        with psconn.cursor() as pscur:
            pscur.execute(f"show {tenant.hex}")
            assert pscur.fetchone() == (100000, 1048576, 30, 15, 256, 10, 360)

View File

@@ -5,9 +5,9 @@ import pytest
from fixtures.zenith_fixtures import ZenithEnvBuilder
@pytest.mark.parametrize('with_wal_acceptors', [False, True])
def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptors: bool):
if with_wal_acceptors:
@pytest.mark.parametrize('with_safekeepers', [False, True])
def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_safekeepers: bool):
if with_safekeepers:
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
@@ -15,17 +15,17 @@ def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_wal_acce
tenant_1 = env.zenith_cli.create_tenant()
tenant_2 = env.zenith_cli.create_tenant()
env.zenith_cli.create_timeline(
f'test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}', tenant_id=tenant_1)
env.zenith_cli.create_timeline(
f'test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}', tenant_id=tenant_2)
env.zenith_cli.create_timeline(f'test_tenants_normal_work_with_safekeepers{with_safekeepers}',
tenant_id=tenant_1)
env.zenith_cli.create_timeline(f'test_tenants_normal_work_with_safekeepers{with_safekeepers}',
tenant_id=tenant_2)
pg_tenant1 = env.postgres.create_start(
f'test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}',
f'test_tenants_normal_work_with_safekeepers{with_safekeepers}',
tenant_id=tenant_1,
)
pg_tenant2 = env.postgres.create_start(
f'test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}',
f'test_tenants_normal_work_with_safekeepers{with_safekeepers}',
tenant_id=tenant_2,
)

View File

@@ -13,7 +13,7 @@ from dataclasses import dataclass, field
from multiprocessing import Process, Value
from pathlib import Path
from fixtures.zenith_fixtures import PgBin, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
from fixtures.utils import etcd_path, lsn_to_hex, mkdir_if_needed, lsn_from_hex
from fixtures.utils import etcd_path, get_dir_size, lsn_to_hex, mkdir_if_needed, lsn_from_hex
from fixtures.log_helper import log
from typing import List, Optional, Any
@@ -25,8 +25,8 @@ def test_normal_work(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.broker = True
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_wal_acceptors_normal_work')
pg = env.postgres.create_start('test_wal_acceptors_normal_work')
env.zenith_cli.create_branch('test_safekeepers_normal_work')
pg = env.postgres.create_start('test_safekeepers_normal_work')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
@@ -56,7 +56,7 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
n_timelines = 3
branch_names = [
"test_wal_acceptors_many_timelines_{}".format(tlin) for tlin in range(n_timelines)
"test_safekeepers_many_timelines_{}".format(tlin) for tlin in range(n_timelines)
]
# pageserver, safekeeper operate timelines via their ids (can be represented in hex as 'ad50847381e248feaac9876cc71ae418')
# that's not really human readable, so the branch names are introduced in Zenith CLI.
@@ -196,8 +196,8 @@ def test_restarts(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = n_acceptors
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_wal_acceptors_restarts')
pg = env.postgres.create_start('test_wal_acceptors_restarts')
env.zenith_cli.create_branch('test_safekeepers_restarts')
pg = env.postgres.create_start('test_safekeepers_restarts')
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
@@ -223,7 +223,7 @@ def test_restarts(zenith_env_builder: ZenithEnvBuilder):
start_delay_sec = 2
def delayed_wal_acceptor_start(wa):
def delayed_safekeeper_start(wa):
time.sleep(start_delay_sec)
wa.start()
@@ -233,8 +233,8 @@ def test_unavailability(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 2
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_wal_acceptors_unavailability')
pg = env.postgres.create_start('test_wal_acceptors_unavailability')
env.zenith_cli.create_branch('test_safekeepers_unavailability')
pg = env.postgres.create_start('test_safekeepers_unavailability')
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
@@ -248,7 +248,7 @@ def test_unavailability(zenith_env_builder: ZenithEnvBuilder):
# shutdown one of two acceptors, that is, majority
env.safekeepers[0].stop()
proc = Process(target=delayed_wal_acceptor_start, args=(env.safekeepers[0], ))
proc = Process(target=delayed_safekeeper_start, args=(env.safekeepers[0], ))
proc.start()
start = time.time()
@@ -260,7 +260,7 @@ def test_unavailability(zenith_env_builder: ZenithEnvBuilder):
# for the world's balance, do the same with second acceptor
env.safekeepers[1].stop()
proc = Process(target=delayed_wal_acceptor_start, args=(env.safekeepers[1], ))
proc = Process(target=delayed_safekeeper_start, args=(env.safekeepers[1], ))
proc.start()
start = time.time()
@@ -304,8 +304,8 @@ def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_wal_acceptors_race_conditions')
pg = env.postgres.create_start('test_wal_acceptors_race_conditions')
env.zenith_cli.create_branch('test_safekeepers_race_conditions')
pg = env.postgres.create_start('test_safekeepers_race_conditions')
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
@@ -370,6 +370,55 @@ def test_broker(zenith_env_builder: ZenithEnvBuilder):
time.sleep(0.5)
# Test that old WAL consumed by peers and pageserver is removed from safekeepers.
@pytest.mark.skipif(etcd_path() is None, reason="requires etcd which is not present in PATH")
def test_wal_removal(zenith_env_builder: ZenithEnvBuilder):
# Writes some data through two safekeepers, forces a pageserver checkpoint,
# reports a fake s3 offload position to one safekeeper and then waits for the
# first WAL segment file to disappear from every safekeeper's data directory.
zenith_env_builder.num_safekeepers = 2
zenith_env_builder.broker = True
# to advance remote_consistent_lsn
zenith_env_builder.enable_local_fs_remote_storage()
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_safekeepers_wal_removal')
pg = env.postgres.create_start('test_safekeepers_wal_removal')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
cur.execute('CREATE TABLE t(key int primary key, value text)')
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
# Tenant and timeline ids are needed for the checkpoint command, the on-disk
# segment paths and the safekeeper HTTP call below.
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
# force checkpoint to advance remote_consistent_lsn
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
# We will wait for first segment removal. Make sure they exist for starter.
# One path per safekeeper: <data_dir>/<tenant_id>/<timeline_id>/<segment file name>.
first_segments = [
os.path.join(sk.data_dir(), tenant_id, timeline_id, '000000010000000000000001')
for sk in env.safekeepers
]
assert all(os.path.exists(p) for p in first_segments)
# Only the first safekeeper receives the HTTP call.
# NOTE(review): presumably the broker propagates it to the other one - confirm.
http_cli = env.safekeepers[0].http_client()
# Pretend WAL is offloaded to s3.
# NOTE(review): the LSN looks like a deliberately huge value so that every
# existing segment counts as offloaded - confirm.
http_cli.record_safekeeper_info(tenant_id, timeline_id, {'s3_wal_lsn': 'FFFFFFFF/FEFFFFFF'})
# wait till first segment is removed on all safekeepers
# Poll every 0.5 seconds and fail with RuntimeError once more than 20 seconds
# have elapsed.
started_at = time.time()
while True:
if all(not os.path.exists(p) for p in first_segments):
break
elapsed = time.time() - started_at
if elapsed > 20:
raise RuntimeError(f"timed out waiting {elapsed:.0f}s for first segment get removed")
time.sleep(0.5)
class ProposerPostgres(PgProtocol):
"""Object for running postgres without ZenithEnv"""
def __init__(self,
@@ -396,7 +445,7 @@ class ProposerPostgres(PgProtocol):
""" Path to postgresql.conf """
return os.path.join(self.pgdata_dir, 'postgresql.conf')
def create_dir_config(self, wal_acceptors: str):
def create_dir_config(self, safekeepers: str):
""" Create dir and config for running --sync-safekeepers """
mkdir_if_needed(self.pg_data_dir_path())
@@ -407,7 +456,7 @@ class ProposerPostgres(PgProtocol):
f"zenith.zenith_timeline = '{self.timeline_id.hex}'\n",
f"zenith.zenith_tenant = '{self.tenant_id.hex}'\n",
f"zenith.page_server_connstring = ''\n",
f"wal_acceptors = '{wal_acceptors}'\n",
f"wal_acceptors = '{safekeepers}'\n",
f"listen_addresses = '{self.listen_addr}'\n",
f"port = '{self.port}'\n",
]
@@ -692,7 +741,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
env.safekeepers[3].stop()
active_safekeepers = [1, 2, 3]
pg = env.postgres.create('test_replace_safekeeper')
pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
pg.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
pg.start()
# learn zenith timeline from compute
@@ -732,7 +781,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
pg.stop_and_destroy().create('test_replace_safekeeper')
active_safekeepers = [2, 3, 4]
env.safekeepers[3].start()
pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
pg.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
pg.start()
execute_payload(pg)
@@ -742,3 +791,56 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
env.safekeepers[1].stop(immediate=True)
execute_payload(pg)
show_statuses(env.safekeepers, tenant_id, timeline_id)
# We have `wal_keep_size=0`, so postgres should trim WAL once it's broadcasted
# to all safekeepers. This test checks that compute WAL can fit into small number
# of WAL segments.
def test_wal_deleted_after_broadcast(zenith_env_builder: ZenithEnvBuilder):
# Generates WAL on a compute node backed by three safekeepers, runs a
# CHECKPOINT, and asserts that the compute's pg_wal directory stays small.
#
# used to calculate delta in collect_stats
last_lsn = .0
# returns LSN and pg_wal size, all in MB
# Both values are divided by 1024 * 1024. The pg_wal size comes from
# get_dir_size on <pgdata_dir>/pg_wal. last_lsn is updated on every call,
# whether or not logging is enabled.
def collect_stats(pg: Postgres, cur, enable_logs=True):
nonlocal last_lsn
assert pg.pgdata_dir is not None
# NOTE(review): this message mentions an INSERT, but the function only
# queries the current LSN - the message looks misplaced.
log.info('executing INSERT to generate WAL')
cur.execute("select pg_current_wal_lsn()")
current_lsn = lsn_from_hex(cur.fetchone()[0]) / 1024 / 1024
pg_wal_size = get_dir_size(os.path.join(pg.pgdata_dir, 'pg_wal')) / 1024 / 1024
if enable_logs:
log.info(f"LSN delta: {current_lsn - last_lsn} MB, current WAL size: {pg_wal_size} MB")
last_lsn = current_lsn
return current_lsn, pg_wal_size
# generates about ~20MB of WAL, to create at least one new segment
def generate_wal(cur):
cur.execute("INSERT INTO t SELECT generate_series(1,300000), 'payload'")
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_wal_deleted_after_broadcast')
# Adjust checkpoint config to prevent keeping old WAL segments
pg = env.postgres.create_start(
'test_wal_deleted_after_broadcast',
config_lines=['min_wal_size=32MB', 'max_wal_size=32MB', 'log_checkpoints=on'])
# NOTE(review): this connection and cursor are never closed explicitly.
pg_conn = pg.connect()
cur = pg_conn.cursor()
cur.execute('CREATE TABLE t(key int, value text)')
# First call establishes the baseline for the logged LSN delta.
collect_stats(pg, cur)
# generate WAL to simulate normal workload
# The loop variable is unused; only the number of rounds matters.
for i in range(5):
generate_wal(cur)
collect_stats(pg, cur)
log.info('executing checkpoint')
cur.execute('CHECKPOINT')
# Index 1 of the returned tuple is the pg_wal directory size in MB.
wal_size_after_checkpoint = collect_stats(pg, cur)[1]
# there shouldn't be more than 2 WAL segments (but dir may have archive_status files)
# NOTE(review): 16 is presumably the WAL segment size in MB, and the 2.5 factor
# leaves slack over two segments - confirm.
assert wal_size_after_checkpoint < 16 * 2.5

View File

@@ -9,7 +9,7 @@ from fixtures.log_helper import getLogger
from fixtures.utils import lsn_from_hex, lsn_to_hex
from typing import List
log = getLogger('root.wal_acceptor_async')
log = getLogger('root.safekeeper_async')
class BankClient(object):
@@ -139,13 +139,12 @@ async def wait_for_lsn(safekeeper: Safekeeper,
async def run_restarts_under_load(env: ZenithEnv,
pg: Postgres,
acceptors: List[Safekeeper],
n_workers=10):
n_accounts = 100
init_amount = 100000
max_transfer = 100
period_time = 4
iterations = 10
n_workers=10,
n_accounts=100,
init_amount=100000,
max_transfer=100,
period_time=4,
iterations=10):
# Set timeout for this test at 5 minutes. It should be enough for test to complete
# and less than CircleCI's no_output_timeout, taking into account that this timeout
# is checked only at the beginning of every iteration.
@@ -202,14 +201,36 @@ async def run_restarts_under_load(env: ZenithEnv,
await pg_conn.close()
# restart acceptors one by one, while executing and validating bank transactions
# Restart acceptors one by one, while executing and validating bank transactions
def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_wal_acceptors_restarts_under_load')
env.zenith_cli.create_branch('test_safekeepers_restarts_under_load')
# Enable backpressure with 1MB maximal lag, because we don't want to block on `wait_for_lsn()` for too long
pg = env.postgres.create_start('test_wal_acceptors_restarts_under_load',
pg = env.postgres.create_start('test_safekeepers_restarts_under_load',
config_lines=['max_replication_write_lag=1MB'])
asyncio.run(run_restarts_under_load(env, pg, env.safekeepers))
# Restart acceptors one by one and test that everything is working as expected
# when checkpoints are triggered frequently by max_wal_size=32MB. Because we have
# wal_keep_size=0, there will be aggressive WAL segments recycling.
def test_restarts_frequent_checkpoints(zenith_env_builder: ZenithEnvBuilder):
    """Run the restarts-under-load scenario with a 32MB WAL size limit.

    Three safekeepers are used. The compute node is started with
    min_wal_size/max_wal_size pinned to 32MB and checkpoint logging enabled,
    then `run_restarts_under_load` is driven for 5 iterations of 15 seconds.
    """
    branch_name = 'test_restarts_frequent_checkpoints'
    # The 1MB write-lag limit enables backpressure, so that `wait_for_lsn()`
    # does not block for too long.
    compute_settings = [
        'max_replication_write_lag=1MB',
        'min_wal_size=32MB',
        'max_wal_size=32MB',
        'log_checkpoints=on',
    ]

    zenith_env_builder.num_safekeepers = 3
    env = zenith_env_builder.init_start()

    env.zenith_cli.create_branch(branch_name)
    pg = env.postgres.create_start(branch_name, config_lines=compute_settings)

    # Longer periods aim at a large (flush_lsn - truncate_lsn) lag, to test that
    # WAL segments are not removed before they are broadcast to all safekeepers
    # (with the help of a replication slot).
    scenario = run_restarts_under_load(env, pg, env.safekeepers, period_time=15, iterations=5)
    asyncio.run(scenario)

View File

@@ -1,7 +1,6 @@
import os
import subprocess
from fixtures.utils import mkdir_if_needed
from fixtures.zenith_fixtures import (ZenithEnvBuilder,
VanillaPostgres,
PortDistributor,
@@ -13,26 +12,25 @@ from fixtures.log_helper import log
def test_wal_restore(zenith_env_builder: ZenithEnvBuilder,
pg_bin: PgBin,
test_output_dir,
port_distributor: PortDistributor):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_wal_restore")
pg = env.postgres.create_start('test_wal_restore')
pg.safe_psql("create table t as select generate_series(1,1000000)")
pg.safe_psql("create table t as select generate_series(1,300000)")
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
env.zenith_cli.pageserver_stop()
port = port_distributor.get_port()
data_dir = os.path.join(test_output_dir, 'pgsql.restored')
restored = VanillaPostgres(data_dir, PgBin(test_output_dir), port)
subprocess.call([
'bash',
os.path.join(base_dir, 'zenith_utils/scripts/restore_from_wal.sh'),
os.path.join(pg_distrib_dir, 'bin'),
os.path.join(test_output_dir, 'repo/safekeepers/sk1/{}/*'.format(tenant_id)),
data_dir,
str(port)
])
restored.start()
assert restored.safe_psql('select count(*) from t') == [(1000000, )]
restored.stop()
with VanillaPostgres(data_dir, PgBin(test_output_dir), port) as restored:
pg_bin.run_capture([
os.path.join(base_dir, 'libs/utils/scripts/restore_from_wal.sh'),
os.path.join(pg_distrib_dir, 'bin'),
os.path.join(test_output_dir, 'repo/safekeepers/sk1/{}/*'.format(tenant_id)),
data_dir,
str(port)
])
restored.start()
assert restored.safe_psql('select count(*) from t', user='zenith_admin') == [(300000, )]

View File

@@ -25,7 +25,7 @@ LOGGING = {
"root": {
"level": "INFO"
},
"root.wal_acceptor_async": {
"root.safekeeper_async": {
"level": "INFO" # a lot of logs on DEBUG level
}
}

View File

@@ -82,3 +82,14 @@ def print_gc_result(row):
# path to etcd binary or None if not present.
def etcd_path():
    """Return the location of the `etcd` executable on PATH, or None if it is absent."""
    located = shutil.which("etcd")
    return located
# Traverse directory to get total size.
def get_dir_size(path: str) -> int:
    """Return the total size in bytes of all files under `path`.

    The directory is walked recursively with `os.walk`. Entries that cannot be
    found at stat time are skipped rather than raising: this covers files that
    are removed while the walk is in progress and dangling symlinks. A `path`
    that does not exist yields 0.
    """
    totalbytes = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            try:
                totalbytes += os.path.getsize(os.path.join(root, name))
            except FileNotFoundError:
                # The file vanished between the directory listing and the stat
                # call (or is a dangling symlink); it contributes no bytes.
                continue

    return totalbytes

View File

@@ -40,8 +40,8 @@ from fixtures.log_helper import log
This file contains pytest fixtures. A fixture is a test resource that can be
summoned by placing its name in the test's arguments.
A fixture is created with the decorator @zenfixture, which is a wrapper around
the standard pytest.fixture with some extra behavior.
A fixture is created with the @pytest.fixture decorator.
See docs: https://docs.pytest.org/en/6.2.x/fixture.html
There are several environment variables that can control the running of tests:
ZENITH_BIN, POSTGRES_DISTRIB_DIR, etc. See README.md for more information.
@@ -155,25 +155,30 @@ def pytest_configure(config):
raise Exception('zenith binaries not found at "{}"'.format(zenith_binpath))
def zenfixture(func: Fn) -> Fn:
def profiling_supported():
"""Return True if the pageserver was compiled with the 'profiling' feature
"""
This is a python decorator for fixtures with a flexible scope.
bin_pageserver = os.path.join(str(zenith_binpath), 'pageserver')
res = subprocess.run([bin_pageserver, '--version'],
check=True,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
return "profiling:true" in res.stdout
By default every test function will set up and tear down a new
database. In pytest, this is called fixtures "function" scope.
If the environment variable TEST_SHARED_FIXTURES is set, then all
tests will share the same database. State, logs, etc. will be
stored in a directory called "shared".
def shareable_scope(fixture_name, config) -> Literal["session", "function"]:
"""Return either session or function scope, depending on TEST_SHARED_FIXTURES envvar.
This function can be used as a scope like this:
@pytest.fixture(scope=shareable_scope)
def myfixture(...)
...
"""
scope: Literal['session', 'function'] = \
'function' if os.environ.get('TEST_SHARED_FIXTURES') is None else 'session'
return pytest.fixture(func, scope=scope)
return 'function' if os.environ.get('TEST_SHARED_FIXTURES') is None else 'session'
@zenfixture
@pytest.fixture(scope=shareable_scope)
def worker_seq_no(worker_id: str):
# worker_id is a pytest-xdist fixture
# it can be master or gw<number>
@@ -184,7 +189,7 @@ def worker_seq_no(worker_id: str):
return int(worker_id[2:])
@zenfixture
@pytest.fixture(scope=shareable_scope)
def worker_base_port(worker_seq_no: int):
# so we divide ports in ranges of 100 ports
# so workers have disjoint set of ports for services
@@ -237,7 +242,7 @@ class PortDistributor:
'port range configured for test is exhausted, consider enlarging the range')
@zenfixture
@pytest.fixture(scope=shareable_scope)
def port_distributor(worker_base_port):
return PortDistributor(base_port=worker_base_port, port_number=WORKER_PORT_NUM)
@@ -612,7 +617,7 @@ class ZenithEnv:
self.broker.start()
def get_safekeeper_connstrs(self) -> str:
""" Get list of safekeeper endpoints suitable for wal_acceptors GUC """
""" Get list of safekeeper endpoints suitable for safekeepers GUC """
return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
@cached_property
@@ -622,7 +627,7 @@ class ZenithEnv:
return AuthKeys(pub=pub, priv=priv)
@zenfixture
@pytest.fixture(scope=shareable_scope)
def _shared_simple_env(request: Any, port_distributor) -> Iterator[ZenithEnv]:
"""
Internal fixture backing the `zenith_simple_env` fixture. If TEST_SHARED_FIXTURES
@@ -830,16 +835,35 @@ class ZenithCli:
self.env = env
pass
def create_tenant(self, tenant_id: Optional[uuid.UUID] = None) -> uuid.UUID:
def create_tenant(self,
tenant_id: Optional[uuid.UUID] = None,
conf: Optional[Dict[str, str]] = None) -> uuid.UUID:
"""
Creates a new tenant, returns its id and its initial timeline's id.
"""
if tenant_id is None:
tenant_id = uuid.uuid4()
res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex])
if conf is None:
res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex])
else:
res = self.raw_cli(
['tenant', 'create', '--tenant-id', tenant_id.hex] +
sum(list(map(lambda kv: (['-c', kv[0] + ':' + kv[1]]), conf.items())), []))
res.check_returncode()
return tenant_id
def config_tenant(self, tenant_id: uuid.UUID, conf: Optional[Dict[str, str]]):
    """
    Update tenant config.

    Runs `tenant config --tenant-id <hex id>` through `raw_cli`, appending one
    `-c key:value` pair for every entry of `conf`. When `conf` is None or
    empty, the command is run without any `-c` arguments. The completed
    process is checked with `check_returncode()`, which raises if the CLI
    exited with a non-zero status.
    """
    cli_args = ['tenant', 'config', '--tenant-id', tenant_id.hex]
    # Each override becomes its own `-c key:value` pair, in dict order.
    for key, value in (conf or {}).items():
        cli_args.extend(['-c', key + ':' + value])

    res = self.raw_cli(cli_args)
    res.check_returncode()
def list_tenants(self) -> 'subprocess.CompletedProcess[str]':
res = self.raw_cli(['tenant', 'list'])
res.check_returncode()
@@ -1305,10 +1329,14 @@ class VanillaPostgres(PgProtocol):
with open(os.path.join(self.pgdatadir, 'postgresql.conf'), 'a') as conf_file:
conf_file.writelines(options)
def start(self):
def start(self, log_path: Optional[str] = None):
assert not self.running
self.running = True
self.pg_bin.run_capture(['pg_ctl', '-D', self.pgdatadir, 'start'])
if log_path is None:
log_path = os.path.join(self.pgdatadir, "pg.log")
self.pg_bin.run_capture(['pg_ctl', '-D', self.pgdatadir, '-l', log_path, 'start'])
def stop(self):
assert self.running
@@ -1509,7 +1537,7 @@ class Postgres(PgProtocol):
""" Path to postgresql.conf """
return os.path.join(self.pg_data_dir_path(), 'postgresql.conf')
def adjust_for_wal_acceptors(self, wal_acceptors: str) -> 'Postgres':
def adjust_for_safekeepers(self, safekeepers: str) -> 'Postgres':
"""
Adjust instance config for working with wal acceptors instead of
pageserver (pre-configured by CLI) directly.
@@ -1524,12 +1552,12 @@ class Postgres(PgProtocol):
if ("synchronous_standby_names" in cfg_line or
# don't ask pageserver to fetch WAL from compute
"callmemaybe_connstring" in cfg_line or
# don't repeat wal_acceptors multiple times
# don't repeat safekeepers/wal_acceptors multiple times
"wal_acceptors" in cfg_line):
continue
f.write(cfg_line)
f.write("synchronous_standby_names = 'walproposer'\n")
f.write("wal_acceptors = '{}'\n".format(wal_acceptors))
f.write("wal_acceptors = '{}'\n".format(safekeepers))
return self
def config(self, lines: List[str]) -> 'Postgres':
@@ -1735,6 +1763,9 @@ class Safekeeper:
def http_client(self) -> SafekeeperHttpClient:
    """
    Build a new SafekeeperHttpClient bound to this safekeeper's HTTP port.
    """
    http_port = self.port.http
    return SafekeeperHttpClient(port=http_port)
def data_dir(self) -> str:
    """
    Return the path `<repo_dir>/safekeepers/sk<id>` for this safekeeper,
    built from `self.env.repo_dir` and `self.id`.
    """
    sk_dirname = f"sk{self.id}"
    return os.path.join(self.env.repo_dir, "safekeepers", sk_dirname)
@dataclass
class SafekeeperTimelineStatus:
@@ -1767,6 +1798,12 @@ class SafekeeperHttpClient(requests.Session):
flush_lsn=resj['flush_lsn'],
remote_consistent_lsn=resj['remote_consistent_lsn'])
def record_safekeeper_info(self, tenant_id: str, timeline_id: str, body):
    """
    POST `body` as JSON to the local safekeeper endpoint
    /v1/record_safekeeper_info/<tenant_id>/<timeline_id> on `self.port`.

    `raise_for_status()` is called on the response, so an error status is
    surfaced as an exception instead of being ignored.
    """
    url = f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}"
    response = self.post(url, json=body)
    response.raise_for_status()
def get_metrics(self) -> SafekeeperMetrics:
request_result = self.get(f"http://localhost:{self.port}/metrics")
request_result.raise_for_status()

View File

@@ -13,15 +13,15 @@ from fixtures.zenith_fixtures import ZenithEnvBuilder
@pytest.mark.parametrize('tenants_count', [1, 5, 10])
@pytest.mark.parametrize('use_wal_acceptors', ['with_wa', 'without_wa'])
@pytest.mark.parametrize('use_safekeepers', ['with_wa', 'without_wa'])
def test_bulk_tenant_create(
zenith_env_builder: ZenithEnvBuilder,
use_wal_acceptors: str,
use_safekeepers: str,
tenants_count: int,
zenbenchmark,
):
"""Measure tenant creation time (with and without wal acceptors)"""
if use_wal_acceptors == 'with_wa':
if use_safekeepers == 'with_wa':
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
@@ -32,14 +32,14 @@ def test_bulk_tenant_create(
tenant = env.zenith_cli.create_tenant()
env.zenith_cli.create_timeline(
f'test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}', tenant_id=tenant)
f'test_bulk_tenant_create_{tenants_count}_{i}_{use_safekeepers}', tenant_id=tenant)
# FIXME: We used to start new safekeepers here. Did that make sense? Should we do it now?
#if use_wal_acceptors == 'with_wa':
#if use_safekeepers == 'with_wa':
# wa_factory.start_n_new(3)
pg_tenant = env.postgres.create_start(
f'test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}', tenant_id=tenant)
f'test_bulk_tenant_create_{tenants_count}_{i}_{use_safekeepers}', tenant_id=tenant)
end = timeit.default_timer()
time_slices.append(end - start)

View File

@@ -1,5 +1,5 @@
from contextlib import closing
from fixtures.zenith_fixtures import PgBin, VanillaPostgres, ZenithEnv
from fixtures.zenith_fixtures import PgBin, VanillaPostgres, ZenithEnv, profiling_supported
from fixtures.compare_fixtures import PgCompare, VanillaCompare, ZenithCompare
from fixtures.benchmark_fixture import PgBenchRunResult, MetricReport, ZenithBenchmarker
@@ -106,6 +106,28 @@ def test_pgbench(zenith_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(zenith_with_baseline, scale, duration)
# Run the pgbench tests, and generate a flamegraph from it
# This requires that the pageserver was built with the 'profiling' feature.
#
# TODO: If the profiling is cheap enough, there's no need to run the same test
# twice, with and without profiling. But for now, run it separately, so that we
# can see how much overhead the profiling adds.
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench_flamegraph(zenbenchmark, pg_bin, zenith_env_builder, scale: int, duration: int):
    """
    Run the pgbench workload against an environment whose pageserver config
    override sets profiling="page_requests".

    The test is skipped when profiling_supported() reports that profiling is
    unavailable.
    """
    zenith_env_builder.num_safekeepers = 1
    zenith_env_builder.pageserver_config_override = '\nprofiling="page_requests"\n'

    if not profiling_supported():
        pytest.skip("pageserver was built without 'profiling' feature")

    env = zenith_env_builder.init_start()
    env.zenith_cli.create_branch("empty", "main")

    compare = ZenithCompare(zenbenchmark, env, pg_bin, "pgbench")
    run_test_pgbench(compare, scale, duration)
# Run the pgbench tests against an existing Postgres cluster
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())

View File

@@ -0,0 +1,48 @@
from contextlib import closing
from fixtures.zenith_fixtures import ZenithEnvBuilder
from fixtures.benchmark_fixture import ZenithBenchmarker
def test_startup(zenith_env_builder: ZenithEnvBuilder, zenbenchmark: ZenithBenchmarker):
    """
    Record how long compute startup, restart and table reads take.

    Durations are recorded through zenbenchmark under the names
    "startup_time", "restart_time", "read_time", "second_read_time",
    "restart_with_data" and "read_after_restart".
    """
    zenith_env_builder.num_safekeepers = 3
    env = zenith_env_builder.init_start()

    # First start on a freshly created branch
    env.zenith_cli.create_branch('test_startup')
    with zenbenchmark.record_duration("startup_time"):
        pg = env.postgres.create_start('test_startup')
        pg.safe_psql("select 1;")

    # Restart while the branch is still empty
    pg.stop_and_destroy()
    with zenbenchmark.record_duration("restart_time"):
        pg.create_start('test_startup')
        pg.safe_psql("select 1;")

    # Fill up: create the tables and load rows into each of them
    num_rows = 1000000  # 30 MB
    num_tables = 100
    with closing(pg.connect()) as conn, conn.cursor() as cur:
        for table_no in range(num_tables):
            cur.execute(f'create table t_{table_no} (i integer);')
            cur.execute(f'insert into t_{table_no} values (generate_series(1,{num_rows}));')

    # Read one table twice; each read is recorded separately
    with zenbenchmark.record_duration("read_time"):
        pg.safe_psql("select * from t_0;")

    with zenbenchmark.record_duration("second_read_time"):
        pg.safe_psql("select * from t_0;")

    # Restart again, now with data present
    pg.stop_and_destroy()
    with zenbenchmark.record_duration("restart_with_data"):
        pg.create_start('test_startup')
        pg.safe_psql("select 1;")

    # Read the same table after the restart
    with zenbenchmark.record_duration("read_after_restart"):
        pg.safe_psql("select * from t_0;")