Add test_lagging_sk.
@@ -365,6 +365,12 @@ class PgProtocol:
                         result.append(cur.fetchall())
         return result
 
+    def safe_psql_scalar(self, query) -> Any:
+        """
+        Execute query returning single row with single column.
+        """
+        return self.safe_psql(query)[0][0]
+
 
 @dataclass
 class AuthKeys:
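For context, the new helper is used later in this same commit (by WalChangeLogger) to read single values; a minimal usage sketch, with `endpoint` standing in for any PgProtocol-derived fixture:

    # Fetch a single scalar, e.g. the current WAL insert position.
    current_lsn = endpoint.safe_psql_scalar("select pg_current_wal_lsn()")
    log.info(f"current LSN is {current_lsn}")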
@@ -2733,6 +2739,13 @@ class Endpoint(PgProtocol):
     ):
         self.stop()
 
+    # Checkpoints running endpoint and returns pg_wal size in MB.
+    def get_pg_wal_size(self):
+        log.info(f'checkpointing at LSN {self.safe_psql("select pg_current_wal_lsn()")[0][0]}')
+        self.safe_psql("checkpoint")
+        assert self.pgdata_dir is not None  # please mypy
+        return get_dir_size(os.path.join(self.pgdata_dir, "pg_wal")) / 1024 / 1024
+
 
 class EndpointFactory:
     """An object representing multiple compute endpoints."""
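The new method is used below in test_lagging_sk to assert that pg_wal stays bounded while a safekeeper lags; a condensed version of that check (fill_segment and ep are defined in the test further down, and a 16 MB segment size is assumed, as in the test):

    # After each ~segment-sized insert a checkpoint runs, so pg_wal should
    # never hold much more than two 16 MB segments (plus archive_status files).
    for _ in range(5):
        fill_segment(ep)
        assert ep.get_pg_wal_size() < 16 * 2.5  # i.e. under 40 MB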
@@ -2931,6 +2944,13 @@ class Safekeeper:
         return segments
 
 
+# Walreceiver as returned by sk's timeline status endpoint.
+@dataclass
+class Walreceiver:
+    conn_id: int
+    state: str
+
+
 @dataclass
 class SafekeeperTimelineStatus:
     acceptor_epoch: int
@@ -2941,6 +2961,7 @@ class SafekeeperTimelineStatus:
     backup_lsn: Lsn
     peer_horizon_lsn: Lsn
     remote_consistent_lsn: Lsn
+    walreceivers: List[Walreceiver]
 
 
 @dataclass
@@ -3002,6 +3023,7 @@ class SafekeeperHttpClient(requests.Session):
         res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}")
         res.raise_for_status()
         resj = res.json()
+        walreceivers = [Walreceiver(wr["conn_id"], wr["status"]) for wr in resj["walreceivers"]]
         return SafekeeperTimelineStatus(
             acceptor_epoch=resj["acceptor_state"]["epoch"],
             pg_version=resj["pg_info"]["pg_version"],
@@ -3011,6 +3033,7 @@ class SafekeeperHttpClient(requests.Session):
             backup_lsn=Lsn(resj["backup_lsn"]),
             peer_horizon_lsn=Lsn(resj["peer_horizon_lsn"]),
             remote_consistent_lsn=Lsn(resj["remote_consistent_lsn"]),
+            walreceivers=walreceivers,
         )
 
     def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body):
@@ -419,7 +419,8 @@ def wait(f, desc, timeout=30, wait_f=None):
         try:
             if f():
                 break
-        except Exception:
+        except Exception as e:
+            log.info(f"got exception while waiting for {desc}: {e}")
             pass
         elapsed = time.time() - started_at
         if elapsed > timeout:
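Only the changed lines of the wait() helper appear in the hunk above; for readability, here is a sketch of the polling loop it plausibly belongs to (the loop scaffolding, sleep interval, and timeout error below are assumptions, not part of the commit; log is the test-suite logger used throughout the diff):

    import time

    def wait(f, desc, timeout=30, wait_f=None):
        # Poll f() until it returns True, retrying on exceptions, until timeout.
        started_at = time.time()
        while True:
            try:
                if f():
                    break
            except Exception as e:
                log.info(f"got exception while waiting for {desc}: {e}")
                pass
            elapsed = time.time() - started_at
            if elapsed > timeout:
                raise RuntimeError(f"timed out in {elapsed:.0f}s waiting for {desc}")
            time.sleep(0.5)
            if wait_f is not None:
                wait_f()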
@@ -1001,8 +1002,40 @@ def test_restart_endpoint(neon_env_builder: NeonEnvBuilder):
     endpoint.start()
 
 
+# Context manager which logs passed time on exit.
+class DurationLogger:
+    def __init__(self, desc):
+        self.desc = desc
+
+    def __enter__(self):
+        self.ts_before = time.time()
+
+    def __exit__(self, *exc):
+        log.info(f"{self.desc} finished in {time.time() - self.ts_before}s")
+
+
+# Context manager which logs WAL position change on exit.
+class WalChangeLogger:
+    def __init__(self, ep, desc_before):
+        self.ep = ep
+        self.desc_before = desc_before
+
+    def __enter__(self):
+        self.ts_before = time.time()
+        self.lsn_before = Lsn(self.ep.safe_psql_scalar("select pg_current_wal_lsn()"))
+        log.info(f"{self.desc_before}, lsn_before={self.lsn_before}")
+
+    def __exit__(self, *exc):
+        lsn_after = Lsn(self.ep.safe_psql_scalar("select pg_current_wal_lsn()"))
+        log.info(
+            f"inserted {((lsn_after - self.lsn_before) / 1024 / 1024):.3f} MB of WAL in {(time.time() - self.ts_before):.3f}s"
+        )
+
+
 # Test that we can create timeline with one safekeeper down and initialize it
-# later when some data already had been written.
+# later when some data already had been written. It is strictly weaker than
+# test_lagging_sk, but also is the simplest test to trigger WAL sk -> compute
+# download (recovery) and as such useful for development/testing.
 def test_late_init(neon_env_builder: NeonEnvBuilder):
     neon_env_builder.num_safekeepers = 3
     env = neon_env_builder.init_start()
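Both context managers are exercised further down in this diff; their intended usage, as it appears in test_lagging_sk below:

    # Measure how much WAL a large insert produces and how long it takes.
    with WalChangeLogger(ep, "doing large insert with sk1 down"):
        for _ in range(0, 5):
            fill_segment(ep)

    # Measure how long the recovery-triggering insert takes.
    with DurationLogger("recovery"):
        ep.safe_psql("insert into t select generate_series(1,100), 'payload'")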
@@ -1010,12 +1043,13 @@ def test_late_init(neon_env_builder: NeonEnvBuilder):
     sk1 = env.safekeepers[0]
     sk1.stop()
 
-    # create and insert smth while safekeeper is down...
-    env.neon_cli.create_branch("test_late_init")
+    tenant_id = env.initial_tenant
+    timeline_id = env.neon_cli.create_branch("test_late_init")
     endpoint = env.endpoints.create_start("test_late_init")
+    # create and insert smth while safekeeper is down...
     endpoint.safe_psql("create table t(key int, value text)")
-    endpoint.safe_psql("insert into t select generate_series(1, 1000), 'payload'")
-    log.info("insert with safekeeper down done")
+    with WalChangeLogger(endpoint, "doing insert with sk1 down"):
+        endpoint.safe_psql("insert into t select generate_series(1, 1000), 'payload'")
     endpoint.stop()  # stop compute
 
     # stop another safekeeper, and start one which missed timeline creation
@@ -1024,28 +1058,213 @@ def test_late_init(neon_env_builder: NeonEnvBuilder):
     sk1.start()

     # insert some more
-    endpoint = env.endpoints.create_start("test_late_init")
+    with DurationLogger("recovery"):
+        endpoint = env.endpoints.create_start("test_late_init")
     endpoint.safe_psql("insert into t select generate_series(1,100), 'payload'")

+    wait_flush_lsn_align_by_ep(
+        env, "test_late_init", tenant_id, timeline_id, endpoint, [sk1, env.safekeepers[2]]
+    )
+    # Check that WALs are the same.
+    cmp_sk_wal([sk1, env.safekeepers[2]], tenant_id, timeline_id)


 # is timeline flush_lsn equal on provided safekeepers?
-def is_flush_lsn_aligned(sk1_http_cli, sk2_http_cli, tenant_id, timeline_id):
-    status1 = sk1_http_cli.timeline_status(tenant_id, timeline_id)
-    status2 = sk2_http_cli.timeline_status(tenant_id, timeline_id)
-    log.info(
-        f"waiting for flush_lsn alignment, sk1.flush_lsn={status1.flush_lsn}, sk2.flush_lsn={status2.flush_lsn}"
+def is_flush_lsn_aligned(sk_http_clis, tenant_id, timeline_id):
+    flush_lsns = [
+        sk_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn
+        for sk_http_cli in sk_http_clis
+    ]
+    log.info(f"waiting for flush_lsn alignment, flush_lsns={flush_lsns}")
+    return all([flush_lsns[0] == flsn for flsn in flush_lsns])


+def are_walreceivers_absent(sk_http_cli, tenant_id: TenantId, timeline_id: TimelineId):
+    status = sk_http_cli.timeline_status(tenant_id, timeline_id)
+    log.info(f"waiting for walreceivers to be gone, currently {status.walreceivers}")
+    return len(status.walreceivers) == 0


+# Assert by xxd that WAL on given safekeepers is identical. No compute must be
+# running for this to be reliable.
+def cmp_sk_wal(sks: List[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId):
+    assert len(sks) >= 2, "cmp_sk_wal makes sense with >= 2 safekeepers passed"
+    sk_http_clis = [sk.http_client() for sk in sks]

+    # First check that term / flush_lsn are the same: it is easier to
+    # report/understand if WALs are different due to that.
+    statuses = [sk_http_cli.timeline_status(tenant_id, timeline_id) for sk_http_cli in sk_http_clis]
+    term_flush_lsns = [(s.acceptor_epoch, s.flush_lsn) for s in statuses]
+    for tfl, sk in zip(term_flush_lsns[1:], sks[1:]):
+        assert (
+            term_flush_lsns[0] == tfl
+        ), f"(term, flush_lsn) are not equal on sks {sks[0].id} and {sk.id}: {term_flush_lsns[0]} != {tfl}"

+    # check that WALs are identical.
+    segs = [sk.list_segments(tenant_id, timeline_id) for sk in sks]
+    for cmp_segs, sk in zip(segs[1:], sks[1:]):
+        assert (
+            segs[0] == cmp_segs
+        ), f"lists of segments on sks {sks[0].id} and {sk.id} are not identical: {segs[0]} and {cmp_segs}"
+    log.info(f"comparing segs {segs[0]}")

+    sk0 = sks[0]
+    for sk in sks[1:]:
+        (_, mismatch, not_regular) = filecmp.cmpfiles(
+            sk0.timeline_dir(tenant_id, timeline_id),
+            sk.timeline_dir(tenant_id, timeline_id),
+            segs[0],
+            shallow=False,
+        )
+        log.info(
+            f"filecmp result mismatch and not regular files:\n\t mismatch={mismatch}\n\t not_regular={not_regular}"
+        )

+        for f in mismatch:
+            f1 = os.path.join(sk0.timeline_dir(tenant_id, timeline_id), f)
+            f2 = os.path.join(sk.timeline_dir(tenant_id, timeline_id), f)
+            stdout_filename = "{}.filediff".format(f2)

+            with open(stdout_filename, "w") as stdout_f:
+                subprocess.run("xxd {} > {}.hex ".format(f1, f1), shell=True)
+                subprocess.run("xxd {} > {}.hex ".format(f2, f2), shell=True)

+                cmd = "diff {}.hex {}.hex".format(f1, f2)
+                subprocess.run([cmd], stdout=stdout_f, shell=True)

+        assert (mismatch, not_regular) == (
+            [],
+            [],
+        ), f"WAL segs {f1} and {f2} on sks {sks[0].id} and {sk.id} are not identical"


+# Wait until flush_lsn on given sks becomes equal, assuming endpoint ep is
+# running. ep is stopped by this function. This is used in tests which check
+# binary equality of WAL segments on safekeepers, which is inherently racy as
+# shutting down the endpoint might always write some WAL which can get to only one
+# safekeeper. So here we recheck flush_lsn again after ep shutdown and retry if
+# it has changed.
+def wait_flush_lsn_align_by_ep(env, branch, tenant_id, timeline_id, ep, sks):
+    sk_http_clis = [sk.http_client() for sk in sks]
+    # First wait for the alignment.
+    wait(
+        partial(is_flush_lsn_aligned, sk_http_clis, tenant_id, timeline_id),
+        "flush_lsn to get aligned",
     )
-    return status1.flush_lsn == status2.flush_lsn
+    ep.stop()  # then stop endpoint
+    # Even if there is no compute, there might be some in-flight data; ensure
+    # all walreceivers die before rechecking.
+    for sk_http_cli in sk_http_clis:
+        wait(
+            partial(are_walreceivers_absent, sk_http_cli, tenant_id, timeline_id),
+            "walreceivers to be gone",
+        )
+    # Now recheck flush_lsn again and exit if it is good
+    if is_flush_lsn_aligned(sk_http_clis, tenant_id, timeline_id):
+        return
+    # Otherwise repeat.
+    log.info("flush_lsn changed during endpoint shutdown; retrying alignment")
+    ep = env.endpoints.create_start(branch)


-# Test behaviour with one safekeeper down and missing a lot of WAL. Namely, that
-# 1) walproposer can't recover node if it misses WAL written by previous computes, but
-#    still starts up and functions normally if two other sks are ok.
-# 2) walproposer doesn't keep WAL after some threshold (pg_wal bloat is limited), but functions
-#    normally if two other sks are ok.
-# 3) Lagged safekeeper can still recover by peer recovery.
-def test_one_sk_down(neon_env_builder: NeonEnvBuilder):
-    pass
+# Test behaviour with one safekeeper down and missing a lot of WAL, exercising
+# neon_walreader and checking that pg_wal never bloats. Namely, ensures that
+# compute doesn't keep much WAL for the lagging sk, but can still recover it with
+# neon_walreader, in two scenarios: a) WAL never existed on compute (it started
+# on a basebackup LSN later than the lagging sk position) though the segment file exists
+# b) WAL had been recycled on it and the segment file doesn't exist.
+#
+# Also checks along the way that whenever there are two sks alive, compute
+# should be able to commit.
+def test_lagging_sk(neon_env_builder: NeonEnvBuilder):
+    # inserts ~20MB of WAL, a bit more than a segment.
+    def fill_segment(ep):
+        ep.safe_psql("insert into t select generate_series(1, 180000), 'payload'")

+    neon_env_builder.num_safekeepers = 3
+    env = neon_env_builder.init_start()

+    (sk1, sk2, sk3) = env.safekeepers

+    # create and insert smth while safekeeper is down...
+    sk1.stop()
+    tenant_id = env.initial_tenant
+    timeline_id = env.neon_cli.create_branch("test_lagging_sk")
+    ep = env.endpoints.create_start("test_lagging_sk")
+    ep.safe_psql("create table t(key int, value text)")
+    # make small insert to be on the same segment
+    ep.safe_psql("insert into t select generate_series(1, 1000), 'payload'")
+    log.info("insert with safekeeper down done")
+    ep.stop()  # stop compute

+    # Stop another safekeeper, and start one which missed timeline creation.
+    sk2.stop()
+    sk1.start()

+    # Start new ep and insert some more. neon_walreader should download WAL for
+    # sk1 because it should be filled since the horizon (initial LSN) which is
+    # earlier than basebackup LSN.
+    ep = env.endpoints.create_start("test_lagging_sk")
+    ep.safe_psql("insert into t select generate_series(1,100), 'payload'")
+    # stop ep and ensure WAL is identical after recovery.
+    wait_flush_lsn_align_by_ep(env, "test_lagging_sk", tenant_id, timeline_id, ep, [sk1, sk3])
+    # Check that WALs are the same.
+    cmp_sk_wal([sk1, sk3], tenant_id, timeline_id)

+    # Now repeat the insertion with sk1 down, but insert more data to check
+    # that WAL on the compute is removed.
+    sk1.stop()
+    sk2.start()

+    # min_wal_size must be at least 2x segment size.
+    min_wal_config = [
+        "min_wal_size=32MB",
+        "max_wal_size=32MB",
+        "wal_keep_size=0",
+        "log_checkpoints=on",
+    ]
+    ep = env.endpoints.create_start(
+        "test_lagging_sk",
+        config_lines=min_wal_config,
+    )
+    with WalChangeLogger(ep, "doing large insert with sk1 down"):
+        for _ in range(0, 5):
+            fill_segment(ep)
+            # there shouldn't be more than 2 WAL segments (but dir may have archive_status files)
+            assert ep.get_pg_wal_size() < 16 * 2.5

+    sk2.stop()  # stop another sk to ensure sk1 and sk3 can work
+    sk1.start()
+    with DurationLogger("recovery"):
+        ep.safe_psql("insert into t select generate_series(1,100), 'payload'")  # forces recovery
+    # stop ep and ensure WAL is identical after recovery.
+    wait_flush_lsn_align_by_ep(env, "test_lagging_sk", tenant_id, timeline_id, ep, [sk1, sk3])
+    # Check that WALs are the same.
+    cmp_sk_wal([sk1, sk3], tenant_id, timeline_id)

+    # Now do the same with a different safekeeper (sk2) down, and restart ep
+    # before recovery (again the scenario where recovery starts below basebackup_lsn,
+    # but multi-segment now).
+    ep = env.endpoints.create_start(
+        "test_lagging_sk",
+        config_lines=["min_wal_size=32MB", "max_wal_size=32MB", "log_checkpoints=on"],
+    )
+    with WalChangeLogger(ep, "doing large insert with sk2 down"):
+        for _ in range(0, 5):
+            fill_segment(ep)
+            # there shouldn't be more than 2 WAL segments (but dir may have archive_status files)
+            assert ep.get_pg_wal_size() < 16 * 2.5

+    ep.stop()
+    ep = env.endpoints.create_start(
+        "test_lagging_sk",
+        config_lines=min_wal_config,
+    )
+    sk2.start()
+    with DurationLogger("recovery"):
+        wait_flush_lsn_align_by_ep(env, "test_lagging_sk", tenant_id, timeline_id, ep, [sk2, sk3])
+    # Check that WALs are the same.
+    cmp_sk_wal([sk1, sk2, sk3], tenant_id, timeline_id)


 # Smaller version of test_one_sk_down testing peer recovery in isolation: that
@@ -1065,7 +1284,7 @@ def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
     sk2_http_cli = sk2.http_client()
     # ensure tli gets created on sk1, peer recovery won't do that
     wait(
-        partial(is_flush_lsn_aligned, sk1_http_cli, sk2_http_cli, tenant_id, timeline_id),
+        partial(is_flush_lsn_aligned, [sk1_http_cli, sk2_http_cli], tenant_id, timeline_id),
         "flush_lsn to get aligned",
     )

@@ -1087,7 +1306,7 @@ def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
     assert sk2_tli_status.flush_lsn - sk1_tli_status.flush_lsn >= 16 * 1024 * 1024

     # wait a bit, lsns shouldn't change
-    # time.sleep(5)
+    time.sleep(2)
     sk1_tli_status = sk1_http_cli.timeline_status(tenant_id, timeline_id)
     sk2_tli_status = sk2_http_cli.timeline_status(tenant_id, timeline_id)
     log.info(
@@ -1098,37 +1317,11 @@ def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
     # now restart safekeeper with peer recovery enabled and wait for recovery
     sk1.stop().start(extra_opts=["--peer-recovery=true"])
     wait(
-        partial(is_flush_lsn_aligned, sk1_http_cli, sk2_http_cli, tenant_id, timeline_id),
+        partial(is_flush_lsn_aligned, [sk1_http_cli, sk2_http_cli], tenant_id, timeline_id),
         "flush_lsn to get aligned",
     )

-    # check that WALs are identical after recovery
-    segs = sk1.list_segments(tenant_id, timeline_id)
-    log.info(f"segs are {segs}")

-    (_, mismatch, not_regular) = filecmp.cmpfiles(
-        sk1.timeline_dir(tenant_id, timeline_id),
-        sk2.timeline_dir(tenant_id, timeline_id),
-        segs,
-        shallow=False,
-    )
-    log.info(
-        f"filecmp result mismatch and not regular files:\n\t mismatch={mismatch}\n\t not_regular={not_regular}"
-    )

-    for f in mismatch:
-        f1 = os.path.join(sk1.timeline_dir(tenant_id, timeline_id), f)
-        f2 = os.path.join(sk2.timeline_dir(tenant_id, timeline_id), f)
-        stdout_filename = "{}.filediff".format(f2)

-        with open(stdout_filename, "w") as stdout_f:
-            subprocess.run("xxd {} > {}.hex ".format(f1, f1), shell=True)
-            subprocess.run("xxd {} > {}.hex ".format(f2, f2), shell=True)

-            cmd = "diff {}.hex {}.hex".format(f1, f2)
-            subprocess.run([cmd], stdout=stdout_f, shell=True)

-    assert (mismatch, not_regular) == ([], [])
+    cmp_sk_wal([sk1, sk2], tenant_id, timeline_id)

     # stop one of safekeepers which weren't recovering and insert a bit more to check we can commit
     env.safekeepers[2].stop()