mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
reduce flakiness
This commit is contained in:
committed by
Dmitry Rodionov
parent
37c440c5d3
commit
39591ef627
@@ -12,6 +12,10 @@ import pytest
|
||||
from fixtures.zenith_fixtures import PgProtocol, PortDistributor, Postgres, ZenithEnvBuilder, ZenithPageserverHttpClient, zenith_binpath, pg_distrib_dir
|
||||
|
||||
|
||||
def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
|
||||
assert abs(a - b) / a < margin_ratio, (a, b, margin_ratio)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def new_pageserver_helper(new_pageserver_dir: pathlib.Path,
|
||||
pageserver_bin: pathlib.Path,
|
||||
@@ -95,7 +99,9 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve
|
||||
# if we recovered after failure verify that we have correct number of rows
|
||||
log.info("recovering at %s", inserted_ctr)
|
||||
cur.execute("SELECT count(*) FROM load")
|
||||
assert cur.fetchone()[0] == inserted_ctr
|
||||
# it seems that sometimes transaction gets commited before we can acknowledge
|
||||
# the result, so sometimes selected value is larger by one than we expect
|
||||
assert cur.fetchone()[0] - inserted_ctr <= 1
|
||||
log.info("successfully recovered %s", inserted_ctr)
|
||||
failed = False
|
||||
load_ok_event.set()
|
||||
@@ -197,22 +203,24 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
|
||||
func=lambda: assert_local(new_pageserver_http_client, tenant, timeline))
|
||||
assert new_timeline_detail['timeline_state'].get('Ready'), new_timeline_detail
|
||||
# when load is active these checks can break because lsns are not static
|
||||
# so lets check with some margin
|
||||
if with_load == 'without_load':
|
||||
assert new_timeline_detail['disk_consistent_lsn'] == timeline_detail[
|
||||
'disk_consistent_lsn']
|
||||
assert new_timeline_detail['timeline_state']['Ready'] == timeline_detail[
|
||||
'timeline_state']['Ready']
|
||||
# TODO revisit this once https://github.com/zenithdb/zenith/issues/1049 is fixed
|
||||
assert_abs_margin_ratio(new_timeline_detail['disk_consistent_lsn'],
|
||||
timeline_detail['disk_consistent_lsn'],
|
||||
0.01)
|
||||
assert_abs_margin_ratio(new_timeline_detail['timeline_state']['Ready'],
|
||||
timeline_detail['timeline_state']['Ready'],
|
||||
0.01)
|
||||
|
||||
# callmemaybe to start replication from safekeeper to the new pageserver
|
||||
# when there is no load there is a clean checkpoint and no wal delta
|
||||
# needs to be streamed to the new pageserver
|
||||
# TODO (rodionov) use attach to start replication
|
||||
with closing(PgProtocol(host='localhost',
|
||||
port=new_pageserver_pg_port).connect()) as new_pageserver_pg:
|
||||
with new_pageserver_pg.cursor() as cur:
|
||||
# "callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'"
|
||||
safekeeper_connstring = f"host=localhost port={env.safekeepers[0].port.pg} options='-c ztimelineid={timeline} ztenantid={tenant} pageserver_connstr=postgresql://no_user:@localhost:{new_pageserver_pg_port}'"
|
||||
cur.execute("callmemaybe {} {} {}".format(tenant, timeline, safekeeper_connstring))
|
||||
with pg_cur(PgProtocol(host='localhost', port=new_pageserver_pg_port)) as cur:
|
||||
# "callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'"
|
||||
safekeeper_connstring = f"host=localhost port={env.safekeepers[0].port.pg} options='-c ztimelineid={timeline} ztenantid={tenant} pageserver_connstr=postgresql://no_user:@localhost:{new_pageserver_pg_port}'"
|
||||
cur.execute("callmemaybe {} {} {}".format(tenant, timeline, safekeeper_connstring))
|
||||
|
||||
tenant_pg.stop()
|
||||
|
||||
@@ -234,18 +242,17 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
|
||||
# is no longer involved, and if it is, we will see the errors
|
||||
pageserver_http_client.timeline_detach(UUID(tenant), UUID(timeline))
|
||||
|
||||
with closing(tenant_pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# check that data is still there
|
||||
cur.execute("SELECT sum(key) FROM t")
|
||||
assert cur.fetchone() == (500500, )
|
||||
# check that we can write new data
|
||||
cur.execute("INSERT INTO t SELECT generate_series(1001,2000), 'some payload'")
|
||||
cur.execute("SELECT sum(key) FROM t")
|
||||
assert cur.fetchone() == (2001000, )
|
||||
with pg_cur(tenant_pg) as cur:
|
||||
# check that data is still there
|
||||
cur.execute("SELECT sum(key) FROM t")
|
||||
assert cur.fetchone() == (500500, )
|
||||
# check that we can write new data
|
||||
cur.execute("INSERT INTO t SELECT generate_series(1001,2000), 'some payload'")
|
||||
cur.execute("SELECT sum(key) FROM t")
|
||||
assert cur.fetchone() == (2001000, )
|
||||
|
||||
if with_load == 'with_load':
|
||||
assert load_ok_event.is_set()
|
||||
assert load_ok_event.wait(1)
|
||||
log.info('stopping load thread')
|
||||
load_stop_event.set()
|
||||
load_thread.join()
|
||||
|
||||
Reference in New Issue
Block a user