mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-14 22:00:38 +00:00
Compare commits
2 Commits
split-prox
...
test-ps-ca
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f164085515 | ||
|
|
cc3acd4201 |
63
test_runner/fixtures/sk_utils.py
Normal file
63
test_runner/fixtures/sk_utils.py
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
# safekeeper utils
|
||||||
|
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
|
||||||
|
from fixtures.log_helper import log
|
||||||
|
from fixtures.neon_fixtures import Safekeeper, get_dir_size
|
||||||
|
from fixtures.types import Lsn, TenantId, TimelineId
|
||||||
|
|
||||||
|
|
||||||
|
# Naming note: read as a shortening of "is it true that segments do not exist".
def is_segs_not_exist(segs, http_cli, tenant_id, timeline_id):
    """Return True once none of the given WAL segment paths exist on disk.

    Logs the safekeeper timeline status alongside per-segment existence so a
    wait loop around this predicate produces useful progress output.

    Args:
        segs: iterable of filesystem paths of WAL segments expected to vanish.
        http_cli: safekeeper HTTP client exposing timeline_status().
        tenant_id: tenant to query the status of.
        timeline_id: timeline to query the status of.
    """
    # Stat each path exactly once and reuse the result for both the log line
    # and the verdict, so the two can never disagree (the original stat'ed
    # twice, racing against concurrent segment removal).
    existence = {seg: os.path.exists(seg) for seg in segs}
    segs_existence = [f"{f}: {exists}" for f, exists in existence.items()]
    log.info(
        f"waiting for segments removal, sk info: {http_cli.timeline_status(tenant_id=tenant_id, timeline_id=timeline_id)}, segments_existence: {segs_existence}"
    )
    return all(not exists for exists in existence.values())
|
||||||
|
|
||||||
|
|
||||||
|
def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
    """Check whether the safekeeper's flush_lsn has reached the given lsn."""
    status = sk.http_client().timeline_status(tenant_id, timeline_id)
    log.info(f"sk status is {status}")
    return status.flush_lsn >= lsn
|
||||||
|
|
||||||
|
|
||||||
|
def is_commit_lsn_equals_flush_lsn(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId):
    """Check whether the timeline's commit_lsn has caught up with its flush_lsn."""
    status = sk.http_client().timeline_status(tenant_id, timeline_id)
    log.info(f"sk status is {status}")
    return status.flush_lsn == status.commit_lsn
|
||||||
|
|
||||||
|
|
||||||
|
def is_segment_offloaded(
    sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, seg_end: Lsn
):
    """Check whether WAL up to seg_end has been offloaded (backed up) by the safekeeper."""
    status = sk.http_client().timeline_status(tenant_id, timeline_id)
    log.info(f"sk status is {status}")
    return status.backup_lsn >= seg_end
|
||||||
|
|
||||||
|
|
||||||
|
def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
    """Check whether the safekeeper's on-disk WAL for the timeline is at most target_size_mb."""
    status = sk.http_client().timeline_status(tenant_id, timeline_id)
    timeline_dir = os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id))
    size_mb = get_dir_size(timeline_dir) / 1024 / 1024
    log.info(f"Safekeeper id={sk.id} wal_size={size_mb:.2f}MB status={status}")
    return size_mb <= target_size_mb
|
||||||
|
|
||||||
|
|
||||||
|
# Wait for something, defined as f() returning True, raising an error if this
# doesn't happen within `timeout` seconds.
# TODO: unite with wait_until preserving good logs
def wait(f, desc, timeout=30):
    """Poll f() every 0.5 seconds until it returns a truthy value.

    Args:
        f: zero-argument predicate to poll.
        desc: human-readable description used in the timeout error message.
        timeout: seconds to keep polling before giving up.

    Raises:
        RuntimeError: if f() has not returned truthy after `timeout` seconds.
    """
    # Use a monotonic clock for elapsed-time measurement: time.time() is the
    # wall clock and jumps under NTP/DST adjustments, which could make the
    # wait expire early or (worse) never.
    started_at = time.monotonic()
    while not f():
        elapsed = time.monotonic() - started_at
        if elapsed > timeout:
            raise RuntimeError(f"timed out waiting {elapsed:.0f}s for {desc}")
        time.sleep(0.5)
|
||||||
@@ -1,4 +1,8 @@
|
|||||||
|
import time
|
||||||
|
from functools import partial
|
||||||
|
|
||||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||||
|
from fixtures.sk_utils import is_commit_lsn_equals_flush_lsn, wait
|
||||||
|
|
||||||
|
|
||||||
# Test safekeeper sync and pageserver catch up
|
# Test safekeeper sync and pageserver catch up
|
||||||
@@ -9,7 +13,9 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
|
|||||||
neon_env_builder.num_safekeepers = 3
|
neon_env_builder.num_safekeepers = 3
|
||||||
env = neon_env_builder.init_start()
|
env = neon_env_builder.init_start()
|
||||||
|
|
||||||
env.neon_cli.create_branch("test_pageserver_catchup_while_compute_down")
|
tenant_id = env.initial_tenant
|
||||||
|
|
||||||
|
timeline_id = env.neon_cli.create_branch("test_pageserver_catchup_while_compute_down")
|
||||||
# Make shared_buffers large to ensure we won't query pageserver while it is down.
|
# Make shared_buffers large to ensure we won't query pageserver while it is down.
|
||||||
endpoint = env.endpoints.create_start(
|
endpoint = env.endpoints.create_start(
|
||||||
"test_pageserver_catchup_while_compute_down", config_lines=["shared_buffers=512MB"]
|
"test_pageserver_catchup_while_compute_down", config_lines=["shared_buffers=512MB"]
|
||||||
@@ -45,12 +51,26 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
|
|||||||
FROM generate_series(1, 10000) g
|
FROM generate_series(1, 10000) g
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
endpoint.stop()
|
||||||
|
|
||||||
|
# wait until safekeepers catch up. This forces/tests fast path which avoids
|
||||||
|
# sync-safekeepers on next compute start.
|
||||||
|
for sk in env.safekeepers:
|
||||||
|
wait(
|
||||||
|
partial(is_commit_lsn_equals_flush_lsn, sk, tenant_id, timeline_id),
|
||||||
|
"commit_lsn to reach flush_lsn",
|
||||||
|
)
|
||||||
|
|
||||||
# stop safekeepers gracefully
|
# stop safekeepers gracefully
|
||||||
env.safekeepers[0].stop()
|
env.safekeepers[0].stop()
|
||||||
env.safekeepers[1].stop()
|
env.safekeepers[1].stop()
|
||||||
env.safekeepers[2].stop()
|
env.safekeepers[2].stop()
|
||||||
|
|
||||||
|
# Wait until in flight messages to broker arrive so pageserver won't know
|
||||||
|
# where to connect if timeline is not activated on safekeeper after restart
|
||||||
|
# -- we had such a bug once.
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
# start everything again
|
# start everything again
|
||||||
# safekeepers must synchronize and pageserver must catch up
|
# safekeepers must synchronize and pageserver must catch up
|
||||||
env.pageserver.start()
|
env.pageserver.start()
|
||||||
@@ -59,7 +79,7 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
|
|||||||
env.safekeepers[2].start()
|
env.safekeepers[2].start()
|
||||||
|
|
||||||
# restart compute node
|
# restart compute node
|
||||||
endpoint.stop_and_destroy().create_start("test_pageserver_catchup_while_compute_down")
|
endpoint = env.endpoints.create_start("test_pageserver_catchup_while_compute_down")
|
||||||
|
|
||||||
# Ensure that basebackup went correct and pageserver returned all data
|
# Ensure that basebackup went correct and pageserver returned all data
|
||||||
pg_conn = endpoint.connect()
|
pg_conn = endpoint.connect()
|
||||||
|
|||||||
@@ -40,6 +40,13 @@ from fixtures.remote_storage import (
|
|||||||
RemoteStorageUsers,
|
RemoteStorageUsers,
|
||||||
available_remote_storages,
|
available_remote_storages,
|
||||||
)
|
)
|
||||||
|
from fixtures.sk_utils import (
|
||||||
|
is_flush_lsn_caught_up,
|
||||||
|
is_segment_offloaded,
|
||||||
|
is_segs_not_exist,
|
||||||
|
is_wal_trimmed,
|
||||||
|
wait,
|
||||||
|
)
|
||||||
from fixtures.types import Lsn, TenantId, TimelineId
|
from fixtures.types import Lsn, TenantId, TimelineId
|
||||||
from fixtures.utils import get_dir_size, query_scalar, start_in_background
|
from fixtures.utils import get_dir_size, query_scalar, start_in_background
|
||||||
|
|
||||||
@@ -385,54 +392,11 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
|||||||
|
|
||||||
# wait till first segment is removed on all safekeepers
|
# wait till first segment is removed on all safekeepers
|
||||||
wait(
|
wait(
|
||||||
lambda first_segments=first_segments: all(not os.path.exists(p) for p in first_segments),
|
partial(is_segs_not_exist, first_segments, http_cli, tenant_id, timeline_id),
|
||||||
"first segment get removed",
|
"first segment get removed",
|
||||||
wait_f=lambda http_cli=http_cli, tenant_id=tenant_id, timeline_id=timeline_id: log.info(
|
|
||||||
f"waiting for segments removal, sk info: {http_cli.timeline_status(tenant_id=tenant_id, timeline_id=timeline_id)}"
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# Wait for something, defined as f() returning True, raising error if this
|
|
||||||
# doesn't happen without timeout seconds, and calling wait_f while waiting.
|
|
||||||
def wait(f, desc, timeout=30, wait_f=None):
|
|
||||||
started_at = time.time()
|
|
||||||
while True:
|
|
||||||
if f():
|
|
||||||
break
|
|
||||||
elapsed = time.time() - started_at
|
|
||||||
if elapsed > timeout:
|
|
||||||
raise RuntimeError(f"timed out waiting {elapsed:.0f}s for {desc}")
|
|
||||||
time.sleep(0.5)
|
|
||||||
if wait_f is not None:
|
|
||||||
wait_f()
|
|
||||||
|
|
||||||
|
|
||||||
def is_segment_offloaded(
|
|
||||||
sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, seg_end: Lsn
|
|
||||||
):
|
|
||||||
http_cli = sk.http_client()
|
|
||||||
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
|
|
||||||
log.info(f"sk status is {tli_status}")
|
|
||||||
return tli_status.backup_lsn >= seg_end
|
|
||||||
|
|
||||||
|
|
||||||
def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
|
|
||||||
http_cli = sk.http_client()
|
|
||||||
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
|
|
||||||
log.info(f"sk status is {tli_status}")
|
|
||||||
return tli_status.flush_lsn >= lsn
|
|
||||||
|
|
||||||
|
|
||||||
def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
|
|
||||||
http_cli = sk.http_client()
|
|
||||||
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
|
|
||||||
sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id)))
|
|
||||||
sk_wal_size_mb = sk_wal_size / 1024 / 1024
|
|
||||||
log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}")
|
|
||||||
return sk_wal_size_mb <= target_size_mb
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
|
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
|
||||||
def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
|
def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
|
||||||
neon_env_builder.num_safekeepers = 3
|
neon_env_builder.num_safekeepers = 3
|
||||||
|
|||||||
Reference in New Issue
Block a user