Compare commits

...

2 Commits

Author SHA1 Message Date
Arseny Sher
f164085515 Move sk waiting functions to sk_utils.py
Triggered by one of these now being used outside test_wal_acceptor.py
2023-08-31 07:58:42 +03:00
Arseny Sher
cc3acd4201 Make test_pageserver_catchup_while_compute_down exercise fastpath.
Fastpath here means avoiding sync-safekeepers on compute start. This uncovers a
bug we had with not (re)activating the timeline on safekeeper restart, fixed by
87f7d6bce3.
2023-08-30 14:14:11 +03:00
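
The fastpath in the second commit hinges on every safekeeper already reporting commit_lsn == flush_lsn before the compute restarts, which is exactly what the test change below waits for. A minimal, hypothetical sketch of that condition, using only the timeline_status fields shown in the files below (the helper name is an illustration, not part of this change):

# Hypothetical illustration: the condition that lets compute start skip
# sync-safekeepers -- every safekeeper has all received WAL committed.
def all_safekeepers_in_sync(safekeepers, tenant_id, timeline_id):
    for sk in safekeepers:
        status = sk.http_client().timeline_status(tenant_id, timeline_id)
        if status.flush_lsn != status.commit_lsn:
            return False
    return True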
3 changed files with 93 additions and 46 deletions

View File

@@ -0,0 +1,63 @@
# safekeeper utils
import os
import time

from fixtures.log_helper import log
from fixtures.neon_fixtures import Safekeeper, get_dir_size
from fixtures.types import Lsn, TenantId, TimelineId


# my grammar guard startles at the name, but let's consider it a shortening of "is it true that ...."
def is_segs_not_exist(segs, http_cli, tenant_id, timeline_id):
    segs_existence = [f"{f}: {os.path.exists(f)}" for f in segs]
    log.info(
        f"waiting for segments removal, sk info: {http_cli.timeline_status(tenant_id=tenant_id, timeline_id=timeline_id)}, segments_existence: {segs_existence}"
    )
    return all(not os.path.exists(p) for p in segs)


def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
    http_cli = sk.http_client()
    tli_status = http_cli.timeline_status(tenant_id, timeline_id)
    log.info(f"sk status is {tli_status}")
    return tli_status.flush_lsn >= lsn


def is_commit_lsn_equals_flush_lsn(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId):
    http_cli = sk.http_client()
    tli_status = http_cli.timeline_status(tenant_id, timeline_id)
    log.info(f"sk status is {tli_status}")
    return tli_status.flush_lsn == tli_status.commit_lsn


def is_segment_offloaded(
    sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, seg_end: Lsn
):
    http_cli = sk.http_client()
    tli_status = http_cli.timeline_status(tenant_id, timeline_id)
    log.info(f"sk status is {tli_status}")
    return tli_status.backup_lsn >= seg_end


def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
    http_cli = sk.http_client()
    tli_status = http_cli.timeline_status(tenant_id, timeline_id)
    sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id)))
    sk_wal_size_mb = sk_wal_size / 1024 / 1024
    log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}")
    return sk_wal_size_mb <= target_size_mb


# Wait for something, defined as f() returning True, raising an error if this
# doesn't happen within timeout seconds.
# TODO: unite with wait_until preserving good logs
def wait(f, desc, timeout=30):
    started_at = time.time()
    while True:
        if f():
            break
        elapsed = time.time() - started_at
        if elapsed > timeout:
            raise RuntimeError(f"timed out waiting {elapsed:.0f}s for {desc}")
        time.sleep(0.5)
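
These helpers are written to be used as predicates for wait() via functools.partial, as the test changes below do. A minimal usage sketch, assuming a test environment like the fixtures used in these tests (env, tenant_id, timeline_id, lsn and the wrapper function name are placeholders, not part of this change):

from functools import partial

from fixtures.sk_utils import is_flush_lsn_caught_up, wait

# Illustration only: block until every safekeeper has flushed WAL up to lsn,
# polling each one with the predicate above.
def wait_flush_lsn_on_all_safekeepers(env, tenant_id, timeline_id, lsn):
    for sk in env.safekeepers:
        wait(
            partial(is_flush_lsn_caught_up, sk, tenant_id, timeline_id, lsn),
            f"flush_lsn to reach {lsn} on safekeeper {sk.id}",
        )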

View File

@@ -1,4 +1,8 @@
import time
from functools import partial

from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.sk_utils import is_commit_lsn_equals_flush_lsn, wait

# Test safekeeper sync and pageserver catch up
@@ -9,7 +13,9 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
    neon_env_builder.num_safekeepers = 3
    env = neon_env_builder.init_start()

    env.neon_cli.create_branch("test_pageserver_catchup_while_compute_down")
    tenant_id = env.initial_tenant
    timeline_id = env.neon_cli.create_branch("test_pageserver_catchup_while_compute_down")

    # Make shared_buffers large to ensure we won't query pageserver while it is down.
    endpoint = env.endpoints.create_start(
        "test_pageserver_catchup_while_compute_down", config_lines=["shared_buffers=512MB"]
@@ -45,12 +51,26 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
        FROM generate_series(1, 10000) g
        """
    )

    endpoint.stop()

    # wait until safekeepers catch up. This forces/tests the fast path which avoids
    # sync-safekeepers on the next compute start.
    for sk in env.safekeepers:
        wait(
            partial(is_commit_lsn_equals_flush_lsn, sk, tenant_id, timeline_id),
            "commit_lsn to reach flush_lsn",
        )

    # stop safekeepers gracefully
    env.safekeepers[0].stop()
    env.safekeepers[1].stop()
    env.safekeepers[2].stop()

    # Wait until in-flight messages to the broker arrive, so the pageserver won't know
    # where to connect if the timeline is not activated on the safekeeper after restart
    # -- we had such a bug once.
    time.sleep(1)

    # start everything again
    # safekeepers must synchronize and pageserver must catch up
    env.pageserver.start()
@@ -59,7 +79,7 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
    env.safekeepers[2].start()

    # restart compute node
    endpoint.stop_and_destroy().create_start("test_pageserver_catchup_while_compute_down")
    endpoint = env.endpoints.create_start("test_pageserver_catchup_while_compute_down")

    # Ensure that basebackup went correctly and pageserver returned all data
    pg_conn = endpoint.connect()

View File

@@ -40,6 +40,13 @@ from fixtures.remote_storage import (
    RemoteStorageUsers,
    available_remote_storages,
)
from fixtures.sk_utils import (
    is_flush_lsn_caught_up,
    is_segment_offloaded,
    is_segs_not_exist,
    is_wal_trimmed,
    wait,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import get_dir_size, query_scalar, start_in_background
@@ -385,54 +392,11 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
    # wait till first segment is removed on all safekeepers
    wait(
        lambda first_segments=first_segments: all(not os.path.exists(p) for p in first_segments),
        partial(is_segs_not_exist, first_segments, http_cli, tenant_id, timeline_id),
        "first segment gets removed",
        wait_f=lambda http_cli=http_cli, tenant_id=tenant_id, timeline_id=timeline_id: log.info(
            f"waiting for segments removal, sk info: {http_cli.timeline_status(tenant_id=tenant_id, timeline_id=timeline_id)}"
        ),
    )

# Wait for something, defined as f() returning True, raising an error if this
# doesn't happen within timeout seconds, and calling wait_f while waiting.
def wait(f, desc, timeout=30, wait_f=None):
    started_at = time.time()
    while True:
        if f():
            break
        elapsed = time.time() - started_at
        if elapsed > timeout:
            raise RuntimeError(f"timed out waiting {elapsed:.0f}s for {desc}")
        time.sleep(0.5)
        if wait_f is not None:
            wait_f()


def is_segment_offloaded(
    sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, seg_end: Lsn
):
    http_cli = sk.http_client()
    tli_status = http_cli.timeline_status(tenant_id, timeline_id)
    log.info(f"sk status is {tli_status}")
    return tli_status.backup_lsn >= seg_end


def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
    http_cli = sk.http_client()
    tli_status = http_cli.timeline_status(tenant_id, timeline_id)
    log.info(f"sk status is {tli_status}")
    return tli_status.flush_lsn >= lsn


def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
    http_cli = sk.http_client()
    tli_status = http_cli.timeline_status(tenant_id, timeline_id)
    sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id)))
    sk_wal_size_mb = sk_wal_size / 1024 / 1024
    log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}")
    return sk_wal_size_mb <= target_size_mb

@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
    neon_env_builder.num_safekeepers = 3