mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 03:50:37 +00:00
Introduce safekeeper peer recovery.
Implements fetching of WAL by safekeeper from another safekeeper by imitating behaviour of last elected leader. This allows to avoid WAL accumulation on compute and facilitates faster compute startup as it doesn't need to download any WAL. Actually removing WAL download in walproposer is a matter of another patch though. There is a per timeline task which always runs, checking regularly if it should start recovery frome someone, meaning there is something to fetch and there is no streaming compute. It then proceeds with fetching, finishing when there is nothing more to receive. Implements https://github.com/neondatabase/neon/pull/4875
This commit is contained in:
@@ -980,6 +980,81 @@ def test_restart_endpoint(neon_env_builder: NeonEnvBuilder):
|
||||
endpoint.start()
|
||||
|
||||
|
||||
# is timeline flush_lsn equal on provided safekeepers?
|
||||
def is_flush_lsn_aligned(sk1_http_cli, sk2_http_cli, tenant_id, timeline_id):
|
||||
return (
|
||||
sk1_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn
|
||||
== sk2_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn
|
||||
)
|
||||
|
||||
|
||||
# Test behaviour with one safekeeper down and missing a lot of WAL. Namely, that
|
||||
# 1) walproposer can't recover node if it misses WAL written by previous computes, but
|
||||
# still starts up and functions normally if two other sks are ok.
|
||||
# 2) walproposer doesn't keep WAL after some threshold (pg_wal bloat is limited), but functions
|
||||
# normally if two other sks are ok.
|
||||
# 3) Lagged safekeeper can still recover by peer recovery.
|
||||
def test_one_sk_down(neon_env_builder: NeonEnvBuilder):
|
||||
pass
|
||||
|
||||
|
||||
# Smaller version of test_one_sk_down testing peer recovery in isolation: that
|
||||
# it works without compute at all.
|
||||
def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.neon_cli.create_branch("test_peer_recovery")
|
||||
endpoint = env.endpoints.create_start("test_peer_recovery")
|
||||
|
||||
endpoint.safe_psql("create table t(key int, value text)")
|
||||
|
||||
sk1 = env.safekeepers[0]
|
||||
sk1.stop()
|
||||
|
||||
# roughly fills one segment
|
||||
endpoint.safe_psql("insert into t select generate_series(1,250000), 'payload'")
|
||||
|
||||
endpoint.stop() # stop compute
|
||||
|
||||
# now start safekeeper, but with peer recovery disabled
|
||||
sk1.start(extra_opts=["--peer-recovery=false"])
|
||||
# it should lag for about a segment
|
||||
sk1_http_cli = sk1.http_client()
|
||||
sk2 = env.safekeepers[1]
|
||||
sk2_http_cli = sk2.http_client()
|
||||
sk1_tli_status = sk1_http_cli.timeline_status(tenant_id, timeline_id)
|
||||
sk2_tli_status = sk2_http_cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(
|
||||
f"flush_lsns after insertion: sk1={sk1_tli_status.flush_lsn}, sk2={sk2_tli_status.flush_lsn}"
|
||||
)
|
||||
assert sk2_tli_status.flush_lsn - sk1_tli_status.flush_lsn >= 16 * 1024 * 1024
|
||||
|
||||
# wait a bit, lsns shouldn't change
|
||||
# time.sleep(5)
|
||||
sk1_tli_status = sk1_http_cli.timeline_status(tenant_id, timeline_id)
|
||||
sk2_tli_status = sk2_http_cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(
|
||||
f"flush_lsns after waiting: sk1={sk1_tli_status.flush_lsn}, sk2={sk2_tli_status.flush_lsn}"
|
||||
)
|
||||
assert sk2_tli_status.flush_lsn - sk1_tli_status.flush_lsn >= 16 * 1024 * 1024
|
||||
|
||||
# now restart safekeeper with peer recovery enabled and wait for recovery
|
||||
sk1.stop().start()
|
||||
wait(
|
||||
partial(is_flush_lsn_aligned, sk1_http_cli, sk2_http_cli, tenant_id, timeline_id),
|
||||
"flush_lsn to get aligned",
|
||||
wait_f=lambda sk1_http_cli=sk1_http_cli, sk2_http_cli=sk2_http_cli, tenant_id=tenant_id, timeline_id=timeline_id: log.info(
|
||||
f"waiting for flush_lsn alignment, sk1.flush_lsn={sk1_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn}, sk2.flush_lsn={sk2_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn}"
|
||||
),
|
||||
)
|
||||
# stop one of safekeepers which weren't recovering and insert a bit more
|
||||
env.safekeepers[2].stop()
|
||||
endpoint = env.endpoints.create_start("test_peer_recovery")
|
||||
endpoint.safe_psql("insert into t select generate_series(1,100), 'payload'")
|
||||
|
||||
|
||||
class SafekeeperEnv:
|
||||
def __init__(
|
||||
self,
|
||||
|
||||
Reference in New Issue
Block a user