Merge branch 'main' into bojan-psbench-over-kvstore

This commit is contained in:
Bojan Serafimov
2022-04-12 13:04:59 -04:00
124 changed files with 9381 additions and 3770 deletions

View File

@@ -10,6 +10,8 @@ Prerequisites:
below to run from other directories.
- The zenith git repo, including the postgres submodule
(for some tests, e.g. `pg_regress`)
- Some tests (involving storage nodes coordination) require etcd installed. Follow
[`the guide`](https://etcd.io/docs/v3.5/install/) to obtain it.
### Test Organization

View File

@@ -1,10 +1,7 @@
from contextlib import closing
import asyncio
import asyncpg
import random
from fixtures.zenith_fixtures import ZenithEnv, Postgres, Safekeeper
from fixtures.zenith_fixtures import ZenithEnv, Postgres
from fixtures.log_helper import log
# Test configuration
@@ -76,5 +73,5 @@ def test_gc_aggressive(zenith_simple_env: ZenithEnv):
asyncio.run(update_and_gc(env, pg, timeline))
row = cur.execute('SELECT COUNT(*), SUM(counter) FROM foo')
cur.execute('SELECT COUNT(*), SUM(counter) FROM foo')
assert cur.fetchone() == (num_rows, updates_to_perform)

View File

@@ -1,9 +1,6 @@
import pytest
import random
import time
from fixtures.zenith_fixtures import ZenithEnvBuilder
from fixtures.log_helper import log
# Test restarting page server, while safekeeper and compute node keep

View File

@@ -1,5 +1,3 @@
from contextlib import closing
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log

View File

@@ -1,6 +1,6 @@
from uuid import uuid4, UUID
import pytest
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient, zenith_binpath
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient
# test that we cannot override node id
@@ -39,10 +39,14 @@ def check_client(client: ZenithPageserverHttpClient, initial_tenant: UUID):
timeline_id_str = str(timeline['timeline_id'])
timeline_details = client.timeline_detail(tenant_id=tenant_id,
timeline_id=UUID(timeline_id_str))
assert timeline_details['kind'] == 'Local'
assert timeline_details['tenant_id'] == tenant_id.hex
assert timeline_details['timeline_id'] == timeline_id_str
local_timeline_details = timeline_details.get('local')
assert local_timeline_details is not None
assert local_timeline_details['timeline_state'] == 'Loaded'
def test_pageserver_http_api_client(zenith_simple_env: ZenithEnv):
env = zenith_simple_env

View File

@@ -1,11 +1,4 @@
import pytest
import random
import time
from contextlib import closing
from multiprocessing import Process, Value
from fixtures.zenith_fixtures import ZenithEnvBuilder
from fixtures.log_helper import log
# Test safekeeper sync and pageserver catch up
@@ -17,7 +10,9 @@ def test_pageserver_catchup_while_compute_down(zenith_env_builder: ZenithEnvBuil
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_pageserver_catchup_while_compute_down')
pg = env.postgres.create_start('test_pageserver_catchup_while_compute_down')
# Make shared_buffers large to ensure we won't query pageserver while it is down.
pg = env.postgres.create_start('test_pageserver_catchup_while_compute_down',
config_lines=['shared_buffers=512MB'])
pg_conn = pg.connect()
cur = pg_conn.cursor()

View File

@@ -1,9 +1,3 @@
import pytest
import random
import time
from contextlib import closing
from multiprocessing import Process, Value
from fixtures.zenith_fixtures import ZenithEnvBuilder
from fixtures.log_helper import log

View File

@@ -0,0 +1,183 @@
from contextlib import closing
from fixtures.zenith_fixtures import ZenithEnv
from fixtures.log_helper import log
from psycopg2.errors import UndefinedTable
from psycopg2.errors import IoError
pytest_plugins = ("fixtures.zenith_fixtures")
extensions = ["pageinspect", "zenith_test_utils", "pg_buffercache"]
#
# Validation of reading different page versions
#
def test_read_validation(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
env.zenith_cli.create_branch("test_read_validation", "empty")
pg = env.postgres.create_start("test_read_validation")
log.info("postgres is running on 'test_read_validation' branch")
with closing(pg.connect()) as con:
with con.cursor() as c:
for e in extensions:
c.execute("create extension if not exists {};".format(e))
c.execute("create table foo (c int) with (autovacuum_enabled = false)")
c.execute("insert into foo values (1)")
c.execute("select lsn, lower, upper from page_header(get_raw_page('foo', 'main', 0));")
first = c.fetchone()
c.execute("select relfilenode from pg_class where relname = 'foo'")
relfilenode = c.fetchone()[0]
c.execute("insert into foo values (2);")
c.execute("select lsn, lower, upper from page_header(get_raw_page('foo', 'main', 0));")
second = c.fetchone()
assert first != second, "Failed to update page"
log.info("Test table is populated, validating buffer cache")
c.execute(
"select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode))
assert c.fetchone()[0] > 0, "No buffers cached for the test relation"
c.execute(
"select reltablespace, reldatabase, relfilenode from pg_buffercache where relfilenode = {}"
.format(relfilenode))
reln = c.fetchone()
log.info("Clear buffer cache to ensure no stale pages are brought into the cache")
c.execute("select clear_buffer_cache()")
c.execute(
"select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode))
assert c.fetchone()[0] == 0, "Failed to clear buffer cache"
log.info("Cache is clear, reading stale page version")
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '{}'))"
.format(first[0]))
direct_first = c.fetchone()
assert first == direct_first, "Failed fetch page at historic lsn"
c.execute(
"select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode))
assert c.fetchone()[0] == 0, "relation buffers detected after invalidation"
log.info("Cache is clear, reading latest page version without cache")
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, NULL))"
)
direct_latest = c.fetchone()
assert second == direct_latest, "Failed fetch page at latest lsn"
c.execute(
"select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode))
assert c.fetchone()[0] == 0, "relation buffers detected after invalidation"
log.info(
"Cache is clear, reading stale page version without cache using relation identifiers"
)
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn( {}, {}, {}, 0, 0, '{}' ))"
.format(reln[0], reln[1], reln[2], first[0]))
direct_first = c.fetchone()
assert first == direct_first, "Failed fetch page at historic lsn using oid"
log.info(
"Cache is clear, reading latest page version without cache using relation identifiers"
)
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn( {}, {}, {}, 0, 0, NULL ))"
.format(reln[0], reln[1], reln[2]))
direct_latest = c.fetchone()
assert second == direct_latest, "Failed fetch page at latest lsn"
c.execute('drop table foo;')
log.info(
"Relation dropped, attempting reading stale page version without cache using relation identifiers"
)
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn( {}, {}, {}, 0, 0, '{}' ))"
.format(reln[0], reln[1], reln[2], first[0]))
direct_first = c.fetchone()
assert first == direct_first, "Failed fetch page at historic lsn using oid"
log.info("Validation page inspect won't allow reading pages of dropped relations")
try:
c.execute("select * from page_header(get_raw_page('foo', 'main', 0));")
assert False, "query should have failed"
except UndefinedTable as e:
log.info("Caught an expected failure: {}".format(e))
def test_read_validation_neg(zenith_simple_env: ZenithEnv):
env = zenith_simple_env
env.zenith_cli.create_branch("test_read_validation_neg", "empty")
pg = env.postgres.create_start("test_read_validation_neg")
log.info("postgres is running on 'test_read_validation_neg' branch")
with closing(pg.connect()) as con:
with con.cursor() as c:
for e in extensions:
c.execute("create extension if not exists {};".format(e))
log.info("read a page of a missing relation")
try:
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('Unknown', 'main', 0, '0/0'))"
)
assert False, "query should have failed"
except UndefinedTable as e:
log.info("Caught an expected failure: {}".format(e))
c.execute("create table foo (c int) with (autovacuum_enabled = false)")
c.execute("insert into foo values (1)")
log.info("read a page at lsn 0")
try:
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '0/0'))"
)
assert False, "query should have failed"
except IoError as e:
log.info("Caught an expected failure: {}".format(e))
log.info("Pass NULL as an input")
expected = (None, None, None)
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn(NULL, 'main', 0, '0/0'))"
)
assert c.fetchone() == expected, "Expected null output"
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', NULL, 0, '0/0'))"
)
assert c.fetchone() == expected, "Expected null output"
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', NULL, '0/0'))"
)
assert c.fetchone() == expected, "Expected null output"
# This check is currently failing, reading beyond EOF is returning a 0-page
log.info("Read beyond EOF")
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 1, NULL))"
)

View File

@@ -1,12 +1,13 @@
# It's possible to run any regular test with the local fs remote storage via
# env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/zenith_zzz/'}" poetry ......
import time, shutil, os
import shutil, os
from contextlib import closing
from pathlib import Path
from uuid import UUID
from fixtures.zenith_fixtures import ZenithEnvBuilder
from fixtures.zenith_fixtures import ZenithEnvBuilder, assert_local, wait_for, wait_for_last_record_lsn, wait_for_upload
from fixtures.log_helper import log
from fixtures.utils import lsn_from_hex
import pytest
@@ -26,7 +27,6 @@ import pytest
# * queries the specific data, ensuring that it matches the one stored before
#
# The tests are done for all types of remote storage pageserver supports.
@pytest.mark.skip(reason="will be fixed with https://github.com/zenithdb/zenith/issues/1193")
@pytest.mark.parametrize('storage_type', ['local_fs', 'mock_s3'])
def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder, storage_type: str):
zenith_env_builder.rust_log_override = 'debug'
@@ -45,6 +45,8 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
env = zenith_env_builder.init_start()
pg = env.postgres.create_start('main')
client = env.pageserver.http_client()
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
@@ -54,13 +56,21 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
CREATE TABLE t1(id int primary key, secret text);
INSERT INTO t1 VALUES ({data_id}, '{data_secret}');
''')
cur.execute("SELECT pg_current_wal_flush_lsn()")
current_lsn = lsn_from_hex(cur.fetchone()[0])
# wait until pageserver receives that data
wait_for_last_record_lsn(client, UUID(tenant_id), UUID(timeline_id), current_lsn)
# run checkpoint manually to be sure that data landed in remote storage
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"do_gc {tenant_id} {timeline_id}")
log.info("waiting for upload") # TODO api to check if upload is done
time.sleep(2)
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
log.info("waiting for upload")
# wait until pageserver successfully uploaded a checkpoint to remote storage
wait_for_upload(client, UUID(tenant_id), UUID(timeline_id), current_lsn)
log.info("upload is done")
##### Stop the first pageserver instance, erase all its data
env.postgres.stop_all()
@@ -73,26 +83,12 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
##### Second start, restore the data and ensure it's the same
env.pageserver.start()
client = env.pageserver.http_client()
client.timeline_attach(UUID(tenant_id), UUID(timeline_id))
# FIXME cannot handle duplicate download requests (which might be caused by repeated timeline detail calls)
# subject to fix in https://github.com/zenithdb/zenith/issues/997
time.sleep(5)
log.info("waiting for timeline redownload")
attempts = 0
while True:
timeline_details = client.timeline_detail(UUID(tenant_id), UUID(timeline_id))
assert timeline_details['timeline_id'] == timeline_id
assert timeline_details['tenant_id'] == tenant_id
if timeline_details['kind'] == 'Local':
log.info("timeline downloaded, checking its data")
break
attempts += 1
if attempts > 10:
raise Exception("timeline redownload failed")
log.debug("still waiting")
time.sleep(1)
wait_for(number_of_iterations=10,
interval=1,
func=lambda: assert_local(client, UUID(tenant_id), UUID(timeline_id)))
pg = env.postgres.create_start('main')
with closing(pg.connect()) as conn:

View File

@@ -5,15 +5,15 @@ import subprocess
import threading
from uuid import UUID
from fixtures.log_helper import log
import time
import signal
import pytest
from fixtures.zenith_fixtures import PgProtocol, PortDistributor, Postgres, ZenithEnvBuilder, ZenithPageserverHttpClient, zenith_binpath, pg_distrib_dir
from fixtures.zenith_fixtures import PgProtocol, PortDistributor, Postgres, ZenithEnvBuilder, ZenithPageserverHttpClient, assert_local, wait_for, wait_for_last_record_lsn, wait_for_upload, zenith_binpath, pg_distrib_dir
from fixtures.utils import lsn_from_hex
def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
assert abs(a - b) / a < margin_ratio, (a, b, margin_ratio)
assert abs(a - b) / a < margin_ratio, abs(a - b) / a
@contextmanager
@@ -34,6 +34,7 @@ def new_pageserver_helper(new_pageserver_dir: pathlib.Path,
f"-c listen_pg_addr='localhost:{pg_port}'",
f"-c listen_http_addr='localhost:{http_port}'",
f"-c pg_distrib_dir='{pg_distrib_dir}'",
f"-c id=2",
f"-c remote_storage={{local_path='{remote_storage_mock_path}'}}",
]
@@ -57,20 +58,6 @@ def new_pageserver_helper(new_pageserver_dir: pathlib.Path,
os.kill(pid, signal.SIGQUIT)
def wait_for(number_of_iterations: int, interval: int, func):
last_exception = None
for i in range(number_of_iterations):
try:
res = func()
except Exception as e:
log.info("waiting for %s iteration %s failed", func, i + 1)
last_exception = e
time.sleep(interval)
continue
return res
raise Exception("timed out while waiting for %s" % func) from last_exception
@contextmanager
def pg_cur(pg):
with closing(pg.connect()) as conn:
@@ -108,13 +95,6 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve
log.info('load thread stopped')
def assert_local(pageserver_http_client: ZenithPageserverHttpClient, tenant: UUID, timeline: str):
timeline_detail = pageserver_http_client.timeline_detail(tenant, UUID(timeline))
assert timeline_detail.get('type') == "Local", timeline_detail
return timeline_detail
@pytest.mark.skip(reason="will be fixed with https://github.com/zenithdb/zenith/issues/1193")
@pytest.mark.parametrize('with_load', ['with_load', 'without_load'])
def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
port_distributor: PortDistributor,
@@ -129,7 +109,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
tenant = env.zenith_cli.create_tenant(UUID("74ee8b079a0e437eb0afea7d26a07209"))
log.info("tenant to relocate %s", tenant)
env.zenith_cli.create_root_branch('main', tenant_id=tenant)
env.zenith_cli.create_branch('test_tenant_relocation', tenant_id=tenant)
tenant_pg = env.postgres.create_start(branch_name='main',
@@ -141,8 +121,8 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
with conn.cursor() as cur:
# save timeline for later gc call
cur.execute("SHOW zenith.zenith_timeline")
timeline = cur.fetchone()[0]
log.info("timeline to relocate %s", timeline)
timeline = UUID(cur.fetchone()[0])
log.info("timeline to relocate %s", timeline.hex)
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
@@ -150,6 +130,15 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
cur.execute("INSERT INTO t SELECT generate_series(1,1000), 'some payload'")
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (500500, )
cur.execute("SELECT pg_current_wal_flush_lsn()")
current_lsn = lsn_from_hex(cur.fetchone()[0])
pageserver_http = env.pageserver.http_client()
# wait until pageserver receives that data
wait_for_last_record_lsn(pageserver_http, tenant, timeline, current_lsn)
timeline_detail = assert_local(pageserver_http, tenant, timeline)
if with_load == 'with_load':
# create load table
@@ -165,12 +154,10 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
# run checkpoint manually to be sure that data landed in remote storage
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"do_gc {tenant.hex} {timeline}")
pscur.execute(f"checkpoint {tenant.hex} {timeline.hex}")
# ensure upload is completed
pageserver_http_client = env.pageserver.http_client()
timeline_detail = pageserver_http_client.timeline_detail(tenant, UUID(timeline))
assert timeline_detail['disk_consistent_lsn'] == timeline_detail['timeline_state']['Ready']
# wait until pageserver successfully uploaded a checkpoint to remote storage
wait_for_upload(pageserver_http, tenant, timeline, current_lsn)
log.info("inititalizing new pageserver")
# bootstrap second pageserver
@@ -182,8 +169,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
log.info("new pageserver ports pg %s http %s", new_pageserver_pg_port, new_pageserver_http_port)
pageserver_bin = pathlib.Path(zenith_binpath) / 'pageserver'
new_pageserver_http_client = ZenithPageserverHttpClient(port=new_pageserver_http_port,
auth_token=None)
new_pageserver_http = ZenithPageserverHttpClient(port=new_pageserver_http_port, auth_token=None)
with new_pageserver_helper(new_pageserver_dir,
pageserver_bin,
@@ -192,25 +178,18 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
new_pageserver_http_port):
# call to attach timeline to new pageserver
new_pageserver_http_client.timeline_attach(tenant, UUID(timeline))
# FIXME cannot handle duplicate download requests, subject to fix in https://github.com/zenithdb/zenith/issues/997
time.sleep(5)
# new pageserver should in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
new_pageserver_http.timeline_attach(tenant, timeline)
# new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
new_timeline_detail = wait_for(
number_of_iterations=5,
interval=1,
func=lambda: assert_local(new_pageserver_http_client, tenant, timeline))
assert new_timeline_detail['timeline_state'].get('Ready'), new_timeline_detail
func=lambda: assert_local(new_pageserver_http, tenant, timeline))
# when load is active these checks can break because lsns are not static
# so lets check with some margin
if with_load == 'without_load':
# TODO revisit this once https://github.com/zenithdb/zenith/issues/1049 is fixed
assert_abs_margin_ratio(new_timeline_detail['disk_consistent_lsn'],
timeline_detail['disk_consistent_lsn'],
0.01)
assert_abs_margin_ratio(new_timeline_detail['timeline_state']['Ready'],
timeline_detail['timeline_state']['Ready'],
0.01)
assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
0.03)
# callmemaybe to start replication from safekeeper to the new pageserver
# when there is no load there is a clean checkpoint and no wal delta
@@ -219,7 +198,9 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
with pg_cur(PgProtocol(host='localhost', port=new_pageserver_pg_port)) as cur:
# "callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'"
safekeeper_connstring = f"host=localhost port={env.safekeepers[0].port.pg} options='-c ztimelineid={timeline} ztenantid={tenant} pageserver_connstr=postgresql://no_user:@localhost:{new_pageserver_pg_port}'"
cur.execute("callmemaybe {} {} {}".format(tenant, timeline, safekeeper_connstring))
cur.execute("callmemaybe {} {} {}".format(tenant.hex,
timeline.hex,
safekeeper_connstring))
tenant_pg.stop()
@@ -239,7 +220,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
# detach tenant from old pageserver before we check
# that all the data is there to be sure that old pageserver
# is no longer involved, and if it is, we will see the errors
pageserver_http_client.timeline_detach(tenant, UUID(timeline))
pageserver_http.timeline_detach(tenant, timeline)
with pg_cur(tenant_pg) as cur:
# check that data is still there
@@ -251,10 +232,10 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
assert cur.fetchone() == (2001000, )
if with_load == 'with_load':
assert load_ok_event.wait(1)
assert load_ok_event.wait(3)
log.info('stopping load thread')
load_stop_event.set()
load_thread.join()
load_thread.join(timeout=10)
log.info('load thread stopped')
# bring old pageserver back for clean shutdown via zenith cli

View File

@@ -1,8 +1,7 @@
from contextlib import closing
from uuid import UUID
import psycopg2.extras
import psycopg2.errors
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, Postgres
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, Postgres, assert_local
from fixtures.log_helper import log
import time
@@ -13,8 +12,9 @@ def test_timeline_size(zenith_simple_env: ZenithEnv):
new_timeline_id = env.zenith_cli.create_branch('test_timeline_size', 'empty')
client = env.pageserver.http_client()
res = client.timeline_detail(tenant_id=env.initial_tenant, timeline_id=new_timeline_id)
assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
timeline_details = assert_local(client, env.initial_tenant, new_timeline_id)
assert timeline_details['local']['current_logical_size'] == timeline_details['local'][
'current_logical_size_non_incremental']
pgmain = env.postgres.create_start("test_timeline_size")
log.info("postgres is running on 'test_timeline_size' branch")
@@ -31,12 +31,16 @@ def test_timeline_size(zenith_simple_env: ZenithEnv):
FROM generate_series(1, 10) g
""")
res = client.timeline_detail(tenant_id=env.initial_tenant, timeline_id=new_timeline_id)
assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
res = assert_local(client, env.initial_tenant, new_timeline_id)
local_details = res['local']
assert local_details["current_logical_size"] == local_details[
"current_logical_size_non_incremental"]
cur.execute("TRUNCATE foo")
res = client.timeline_detail(tenant_id=env.initial_tenant, timeline_id=new_timeline_id)
assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
res = assert_local(client, env.initial_tenant, new_timeline_id)
local_details = res['local']
assert local_details["current_logical_size"] == local_details[
"current_logical_size_non_incremental"]
# wait until received_lsn_lag is 0
@@ -71,8 +75,9 @@ def test_timeline_size_quota(zenith_env_builder: ZenithEnvBuilder):
new_timeline_id = env.zenith_cli.create_branch('test_timeline_size_quota')
client = env.pageserver.http_client()
res = client.timeline_detail(tenant_id=env.initial_tenant, timeline_id=new_timeline_id)
assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
res = assert_local(client, env.initial_tenant, new_timeline_id)
assert res['local']["current_logical_size"] == res['local'][
"current_logical_size_non_incremental"]
pgmain = env.postgres.create_start(
"test_timeline_size_quota",

View File

@@ -13,7 +13,7 @@ from dataclasses import dataclass, field
from multiprocessing import Process, Value
from pathlib import Path
from fixtures.zenith_fixtures import PgBin, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
from fixtures.utils import lsn_to_hex, mkdir_if_needed, lsn_from_hex
from fixtures.utils import etcd_path, lsn_to_hex, mkdir_if_needed, lsn_from_hex
from fixtures.log_helper import log
from typing import List, Optional, Any
@@ -22,6 +22,7 @@ from typing import List, Optional, Any
# succeed and data is written
def test_normal_work(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
zenith_env_builder.broker = True
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_wal_acceptors_normal_work')
@@ -89,29 +90,33 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
sk_metrics = [sk.http_client().get_metrics() for sk in env.safekeepers]
timeline_metrics = []
with env.pageserver.http_client() as pageserver_http:
for timeline_detail in timeline_details:
timeline_id: str = timeline_detail["timeline_id"]
for timeline_detail in timeline_details:
timeline_id: str = timeline_detail["timeline_id"]
m = TimelineMetrics(
timeline_id=timeline_id,
last_record_lsn=lsn_from_hex(timeline_detail["last_record_lsn"]),
)
for sk_m in sk_metrics:
m.flush_lsns.append(sk_m.flush_lsn_inexact[(tenant_id.hex, timeline_id)])
m.commit_lsns.append(sk_m.commit_lsn_inexact[(tenant_id.hex, timeline_id)])
local_timeline_detail = timeline_detail.get('local')
if local_timeline_detail is None:
log.debug(f"Timeline {timeline_id} is not present locally, skipping")
continue
for flush_lsn, commit_lsn in zip(m.flush_lsns, m.commit_lsns):
# Invariant. May be < when transaction is in progress.
assert commit_lsn <= flush_lsn
# We only call collect_metrics() after a transaction is confirmed by
# the compute node, which only happens after a consensus of safekeepers
# has confirmed the transaction. We assume majority consensus here.
assert (2 * sum(m.last_record_lsn <= lsn
for lsn in m.flush_lsns) > zenith_env_builder.num_safekeepers)
assert (2 * sum(m.last_record_lsn <= lsn
for lsn in m.commit_lsns) > zenith_env_builder.num_safekeepers)
timeline_metrics.append(m)
m = TimelineMetrics(
timeline_id=timeline_id,
last_record_lsn=lsn_from_hex(local_timeline_detail['last_record_lsn']),
)
for sk_m in sk_metrics:
m.flush_lsns.append(sk_m.flush_lsn_inexact[(tenant_id.hex, timeline_id)])
m.commit_lsns.append(sk_m.commit_lsn_inexact[(tenant_id.hex, timeline_id)])
for flush_lsn, commit_lsn in zip(m.flush_lsns, m.commit_lsns):
# Invariant. May be < when transaction is in progress.
assert commit_lsn <= flush_lsn, f"timeline_id={timeline_id}, timeline_detail={timeline_detail}, sk_metrics={sk_metrics}"
# We only call collect_metrics() after a transaction is confirmed by
# the compute node, which only happens after a consensus of safekeepers
# has confirmed the transaction. We assume majority consensus here.
assert (2 * sum(m.last_record_lsn <= lsn
for lsn in m.flush_lsns) > zenith_env_builder.num_safekeepers), f"timeline_id={timeline_id}, timeline_detail={timeline_detail}, sk_metrics={sk_metrics}"
assert (2 * sum(m.last_record_lsn <= lsn
for lsn in m.commit_lsns) > zenith_env_builder.num_safekeepers), f"timeline_id={timeline_id}, timeline_detail={timeline_detail}, sk_metrics={sk_metrics}"
timeline_metrics.append(m)
log.info(f"{message}: {timeline_metrics}")
return timeline_metrics
@@ -322,6 +327,49 @@ def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value):
proc.join()
# Test that safekeepers push their info to the broker and learn peer status from it
@pytest.mark.skipif(etcd_path() is None, reason="requires etcd which is not present in PATH")
def test_broker(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
zenith_env_builder.broker = True
zenith_env_builder.enable_local_fs_remote_storage()
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_broker", "main")
pg = env.postgres.create_start('test_broker')
pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
# learn zenith timeline from compute
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
# wait until remote_consistent_lsn gets advanced on all safekeepers
clients = [sk.http_client() for sk in env.safekeepers]
stat_before = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
log.info(f"statuses is {stat_before}")
pg.safe_psql("INSERT INTO t SELECT generate_series(1,100), 'payload'")
# force checkpoint to advance remote_consistent_lsn
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
# and wait till remote_consistent_lsn propagates to all safekeepers
started_at = time.time()
while True:
stat_after = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
if all(
lsn_from_hex(s_after.remote_consistent_lsn) > lsn_from_hex(
s_before.remote_consistent_lsn) for s_after,
s_before in zip(stat_after, stat_before)):
break
elapsed = time.time() - started_at
if elapsed > 20:
raise RuntimeError(
f"timed out waiting {elapsed:.0f}s for remote_consistent_lsn propagation: status before {stat_before}, status current {stat_after}"
)
time.sleep(0.5)
class ProposerPostgres(PgProtocol):
"""Object for running postgres without ZenithEnv"""
def __init__(self,

View File

@@ -1,9 +1,10 @@
import asyncio
import uuid
import asyncpg
import random
import time
from fixtures.zenith_fixtures import ZenithEnvBuilder, Postgres, Safekeeper
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, Postgres, Safekeeper
from fixtures.log_helper import getLogger
from fixtures.utils import lsn_from_hex, lsn_to_hex
from typing import List
@@ -30,10 +31,6 @@ class BankClient(object):
await self.conn.execute('DROP TABLE IF EXISTS bank_log')
await self.conn.execute('CREATE TABLE bank_log(from_uid int, to_uid int, amount int)')
# TODO: Remove when https://github.com/zenithdb/zenith/issues/644 is fixed
await self.conn.execute('ALTER TABLE bank_accs SET (autovacuum_enabled = false)')
await self.conn.execute('ALTER TABLE bank_log SET (autovacuum_enabled = false)')
async def check_invariant(self):
row = await self.conn.fetchrow('SELECT sum(amount) AS sum FROM bank_accs')
assert row['sum'] == self.n_accounts * self.init_amount
@@ -139,12 +136,15 @@ async def wait_for_lsn(safekeeper: Safekeeper,
# On each iteration 1 acceptor is stopped, and 2 others should allow
# background workers execute transactions. In the end, state should remain
# consistent.
async def run_restarts_under_load(pg: Postgres, acceptors: List[Safekeeper], n_workers=10):
async def run_restarts_under_load(env: ZenithEnv,
pg: Postgres,
acceptors: List[Safekeeper],
n_workers=10):
n_accounts = 100
init_amount = 100000
max_transfer = 100
period_time = 10
iterations = 6
period_time = 4
iterations = 10
# Set timeout for this test at 5 minutes. It should be enough for test to complete
# and less than CircleCI's no_output_timeout, taking into account that this timeout
@@ -176,6 +176,11 @@ async def run_restarts_under_load(pg: Postgres, acceptors: List[Safekeeper], n_w
flush_lsn = lsn_to_hex(flush_lsn)
log.info(f'Postgres flush_lsn {flush_lsn}')
pageserver_lsn = env.pageserver.http_client().timeline_detail(
uuid.UUID(tenant_id), uuid.UUID((timeline_id)))["local"]["last_record_lsn"]
sk_ps_lag = lsn_from_hex(flush_lsn) - lsn_from_hex(pageserver_lsn)
log.info(f'Pageserver last_record_lsn={pageserver_lsn} lag={sk_ps_lag / 1024}kb')
# Wait until alive safekeepers catch up with postgres
for idx, safekeeper in enumerate(acceptors):
if idx != victim_idx:
@@ -203,9 +208,8 @@ def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder):
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_wal_acceptors_restarts_under_load')
pg = env.postgres.create_start('test_wal_acceptors_restarts_under_load')
# Enable backpressure with 1MB maximal lag, because we don't want to block on `wait_for_lsn()` for too long
pg = env.postgres.create_start('test_wal_acceptors_restarts_under_load',
config_lines=['max_replication_write_lag=1MB'])
asyncio.run(run_restarts_under_load(pg, env.safekeepers))
# TODO: Remove when https://github.com/zenithdb/zenith/issues/644 is fixed
pg.stop()
asyncio.run(run_restarts_under_load(env, pg, env.safekeepers))

View File

@@ -0,0 +1,38 @@
import os
import subprocess
from fixtures.utils import mkdir_if_needed
from fixtures.zenith_fixtures import (ZenithEnvBuilder,
VanillaPostgres,
PortDistributor,
PgBin,
base_dir,
vanilla_pg,
pg_distrib_dir)
from fixtures.log_helper import log
def test_wal_restore(zenith_env_builder: ZenithEnvBuilder,
test_output_dir,
port_distributor: PortDistributor):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_wal_restore")
pg = env.postgres.create_start('test_wal_restore')
pg.safe_psql("create table t as select generate_series(1,1000000)")
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
env.zenith_cli.pageserver_stop()
port = port_distributor.get_port()
data_dir = os.path.join(test_output_dir, 'pgsql.restored')
restored = VanillaPostgres(data_dir, PgBin(test_output_dir), port)
subprocess.call([
'bash',
os.path.join(base_dir, 'zenith_utils/scripts/restore_from_wal.sh'),
os.path.join(pg_distrib_dir, 'bin'),
os.path.join(test_output_dir, 'repo/safekeepers/sk1/{}/*'.format(tenant_id)),
data_dir,
str(port)
])
restored.start()
assert restored.safe_psql('select count(*) from t') == [(1000000, )]
restored.stop()

View File

@@ -1,8 +1,6 @@
import json
import uuid
import requests
from psycopg2.extensions import cursor as PgCursor
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient
from typing import cast

View File

@@ -1,4 +1,5 @@
import os
import shutil
import subprocess
from typing import Any, List
@@ -76,3 +77,8 @@ def print_gc_result(row):
log.info(
" total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}"
.format_map(row))
# path to etcd binary or None if not present.
def etcd_path():
return shutil.which("etcd")

View File

@@ -33,7 +33,7 @@ from typing_extensions import Literal
import requests
import backoff # type: ignore
from .utils import (get_self_dir, mkdir_if_needed, subprocess_capture)
from .utils import (etcd_path, get_self_dir, mkdir_if_needed, subprocess_capture, lsn_from_hex)
from fixtures.log_helper import log
"""
This file contains pytest fixtures. A fixture is a test resource that can be
@@ -257,7 +257,8 @@ class PgProtocol:
dbname: Optional[str] = None,
schema: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None) -> str:
password: Optional[str] = None,
statement_timeout_ms: Optional[int] = None) -> str:
"""
Build a libpq connection string for the Postgres instance.
"""
@@ -277,16 +278,23 @@ class PgProtocol:
if schema:
res = f"{res} options='-c search_path={schema}'"
if statement_timeout_ms:
res = f"{res} options='-c statement_timeout={statement_timeout_ms}'"
return res
# autocommit=True here by default because that's what we need most of the time
def connect(self,
*,
autocommit=True,
dbname: Optional[str] = None,
schema: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None) -> PgConnection:
def connect(
self,
*,
autocommit=True,
dbname: Optional[str] = None,
schema: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
# individual statement timeout in seconds, 2 minutes should be enough for our tests
statement_timeout: Optional[int] = 120
) -> PgConnection:
"""
Connect to the node.
Returns psycopg2's connection object.
@@ -294,12 +302,12 @@ class PgProtocol:
"""
conn = psycopg2.connect(
self.connstr(
dbname=dbname,
schema=schema,
username=username,
password=password,
))
self.connstr(dbname=dbname,
schema=schema,
username=username,
password=password,
statement_timeout_ms=statement_timeout *
1000 if statement_timeout else None))
# WARNING: this setting affects *all* tests!
conn.autocommit = autocommit
return conn
@@ -425,7 +433,8 @@ class ZenithEnvBuilder:
num_safekeepers: int = 0,
pageserver_auth_enabled: bool = False,
rust_log_override: Optional[str] = None,
default_branch_name=DEFAULT_BRANCH_NAME):
default_branch_name=DEFAULT_BRANCH_NAME,
broker: bool = False):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
self.port_distributor = port_distributor
@@ -434,6 +443,7 @@ class ZenithEnvBuilder:
self.num_safekeepers = num_safekeepers
self.pageserver_auth_enabled = pageserver_auth_enabled
self.default_branch_name = default_branch_name
self.broker = broker
self.env: Optional[ZenithEnv] = None
self.s3_mock_server: Optional[MockS3Server] = None
@@ -509,6 +519,8 @@ class ZenithEnvBuilder:
self.env.pageserver.stop(immediate=True)
if self.s3_mock_server:
self.s3_mock_server.kill()
if self.env.broker is not None:
self.env.broker.stop()
class ZenithEnv:
@@ -561,6 +573,16 @@ class ZenithEnv:
default_tenant_id = '{self.initial_tenant.hex}'
""")
self.broker = None
if config.broker:
# keep etcd datadir inside 'repo'
self.broker = Etcd(datadir=os.path.join(self.repo_dir, "etcd"),
port=self.port_distributor.get_port(),
peer_port=self.port_distributor.get_port())
toml += textwrap.dedent(f"""
broker_endpoints = 'http://127.0.0.1:{self.broker.port}'
""")
# Create config for pageserver
pageserver_port = PageserverPort(
pg=self.port_distributor.get_port(),
@@ -603,12 +625,15 @@ class ZenithEnv:
self.zenith_cli.init(toml)
def start(self):
# Start up the page server and all the safekeepers
# Start up the page server, all the safekeepers and the broker
self.pageserver.start()
for safekeeper in self.safekeepers:
safekeeper.start()
if self.broker is not None:
self.broker.start()
def get_safekeeper_connstrs(self) -> str:
""" Get list of safekeeper endpoints suitable for wal_acceptors GUC """
return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
@@ -873,6 +898,30 @@ class ZenithCli:
return uuid.UUID(created_timeline_id)
def create_root_branch(self, branch_name: str, tenant_id: Optional[uuid.UUID] = None):
cmd = [
'timeline',
'create',
'--branch-name',
branch_name,
'--tenant-id',
(tenant_id or self.env.initial_tenant).hex,
]
res = self.raw_cli(cmd)
res.check_returncode()
matches = CREATE_TIMELINE_ID_EXTRACTOR.search(res.stdout)
created_timeline_id = None
if matches is not None:
created_timeline_id = matches.group('timeline_id')
if created_timeline_id is None:
raise Exception('could not find timeline id after `zenith timeline create` invocation')
else:
return uuid.UUID(created_timeline_id)
def create_branch(self,
new_branch_name: str = DEFAULT_BRANCH_NAME,
ancestor_branch_name: Optional[str] = None,
@@ -1649,6 +1698,7 @@ class Safekeeper:
class SafekeeperTimelineStatus:
acceptor_epoch: int
flush_lsn: str
remote_consistent_lsn: str
@dataclass
@@ -1672,7 +1722,8 @@ class SafekeeperHttpClient(requests.Session):
res.raise_for_status()
resj = res.json()
return SafekeeperTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'],
flush_lsn=resj['flush_lsn'])
flush_lsn=resj['flush_lsn'],
remote_consistent_lsn=resj['remote_consistent_lsn'])
def get_metrics(self) -> SafekeeperMetrics:
request_result = self.get(f"http://localhost:{self.port}/metrics")
@@ -1693,6 +1744,54 @@ class SafekeeperHttpClient(requests.Session):
return metrics
@dataclass
class Etcd:
""" An object managing etcd instance """
datadir: str
port: int
peer_port: int
handle: Optional[subprocess.Popen[Any]] = None # handle of running daemon
def check_status(self):
s = requests.Session()
s.mount('http://', requests.adapters.HTTPAdapter(max_retries=1)) # do not retry
s.get(f"http://localhost:{self.port}/health").raise_for_status()
def start(self):
pathlib.Path(self.datadir).mkdir(exist_ok=True)
etcd_full_path = etcd_path()
if etcd_full_path is None:
raise Exception('etcd not found')
with open(os.path.join(self.datadir, "etcd.log"), "wb") as log_file:
args = [
etcd_full_path,
f"--data-dir={self.datadir}",
f"--listen-client-urls=http://localhost:{self.port}",
f"--advertise-client-urls=http://localhost:{self.port}",
f"--listen-peer-urls=http://localhost:{self.peer_port}"
]
self.handle = subprocess.Popen(args, stdout=log_file, stderr=log_file)
# wait for start
started_at = time.time()
while True:
try:
self.check_status()
except Exception as e:
elapsed = time.time() - started_at
if elapsed > 5:
raise RuntimeError(f"timed out waiting {elapsed:.0f}s for etcd start: {e}")
time.sleep(0.5)
else:
break # success
def stop(self):
if self.handle is not None:
self.handle.terminate()
self.handle.wait()
def get_test_output_dir(request: Any) -> str:
""" Compute the working directory for an individual test. """
test_name = request.node.name
@@ -1846,3 +1945,63 @@ def check_restored_datadir_content(test_output_dir: str, env: ZenithEnv, pg: Pos
subprocess.run([cmd], stdout=stdout_f, shell=True)
assert (mismatch, error) == ([], [])
def wait_for(number_of_iterations: int, interval: int, func):
last_exception = None
for i in range(number_of_iterations):
try:
res = func()
except Exception as e:
log.info("waiting for %s iteration %s failed", func, i + 1)
last_exception = e
time.sleep(interval)
continue
return res
raise Exception("timed out while waiting for %s" % func) from last_exception
def assert_local(pageserver_http_client: ZenithPageserverHttpClient,
tenant: uuid.UUID,
timeline: uuid.UUID):
timeline_detail = pageserver_http_client.timeline_detail(tenant, timeline)
assert timeline_detail.get('local', {}).get("disk_consistent_lsn"), timeline_detail
return timeline_detail
def remote_consistent_lsn(pageserver_http_client: ZenithPageserverHttpClient,
tenant: uuid.UUID,
timeline: uuid.UUID) -> int:
detail = pageserver_http_client.timeline_detail(tenant, timeline)
lsn_str = detail['remote']['remote_consistent_lsn']
assert isinstance(lsn_str, str)
return lsn_from_hex(lsn_str)
def wait_for_upload(pageserver_http_client: ZenithPageserverHttpClient,
tenant: uuid.UUID,
timeline: uuid.UUID,
lsn: int):
"""waits for local timeline upload up to specified lsn"""
wait_for(10, 1, lambda: remote_consistent_lsn(pageserver_http_client, tenant, timeline) >= lsn)
def last_record_lsn(pageserver_http_client: ZenithPageserverHttpClient,
tenant: uuid.UUID,
timeline: uuid.UUID) -> int:
detail = pageserver_http_client.timeline_detail(tenant, timeline)
lsn_str = detail['local']['last_record_lsn']
assert isinstance(lsn_str, str)
return lsn_from_hex(lsn_str)
def wait_for_last_record_lsn(pageserver_http_client: ZenithPageserverHttpClient,
tenant: uuid.UUID,
timeline: uuid.UUID,
lsn: int):
"""waits for pageserver to catch up to a certain lsn"""
wait_for(10, 1, lambda: last_record_lsn(pageserver_http_client, tenant, timeline) >= lsn)

View File

@@ -49,7 +49,15 @@ def test_random_writes(zenith_with_baseline: PgCompare):
count integer default 0
);
""")
cur.execute(f"INSERT INTO Big (pk) values (generate_series(1,{n_rows}))")
# Insert n_rows in batches to avoid query timeouts
rows_inserted = 0
while rows_inserted < n_rows:
rows_to_insert = min(1000 * 1000, n_rows - rows_inserted)
low = rows_inserted + 1
high = rows_inserted + rows_to_insert
cur.execute(f"INSERT INTO Big (pk) values (generate_series({low},{high}))")
rows_inserted += rows_to_insert
# Get table size (can't be predicted because padding and alignment)
cur.execute("SELECT pg_relation_size('Big');")

View File

@@ -17,8 +17,8 @@ import pytest
# into memory in the page server.
pytest.param(100000, 100, 0),
# Also test with a larger table, with and without parallelism
pytest.param(10000000, 1, 0, marks=pytest.mark.slow),
pytest.param(10000000, 1, 4, marks=pytest.mark.slow)
pytest.param(10000000, 1, 0),
pytest.param(10000000, 1, 4)
])
def test_seqscans(zenith_with_baseline: PgCompare, rows: int, iters: int, workers: int):
env = zenith_with_baseline