mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-26 23:59:58 +00:00
Extend test_wal_backup with compute restart.
This commit is contained in:
@@ -204,6 +204,7 @@ impl WalBackupTask {
|
||||
l.give_up().await;
|
||||
}
|
||||
|
||||
info!("acquiring leadership");
|
||||
match broker::get_leader(&self.election).await {
|
||||
Ok(l) => {
|
||||
self.leader = Some(l);
|
||||
@@ -214,6 +215,7 @@ impl WalBackupTask {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
info!("acquired leadership");
|
||||
|
||||
// offload loop
|
||||
loop {
|
||||
@@ -268,7 +270,7 @@ impl WalBackupTask {
|
||||
{
|
||||
Ok(leader) => {
|
||||
if !leader {
|
||||
info!("leader has changed");
|
||||
info!("lost leadership");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -414,6 +414,22 @@ def test_wal_removal(zenith_env_builder: ZenithEnvBuilder):
|
||||
time.sleep(0.5)
|
||||
|
||||
|
||||
def wait_segment_offload(tenant_id, timeline_id, live_sk, seg_end):
|
||||
started_at = time.time()
|
||||
http_cli = live_sk.http_client()
|
||||
while True:
|
||||
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(f"live sk status is {tli_status}")
|
||||
|
||||
if lsn_from_hex(tli_status.backup_lsn) >= lsn_from_hex(seg_end):
|
||||
break
|
||||
elapsed = time.time() - started_at
|
||||
if elapsed > 20:
|
||||
raise RuntimeError(
|
||||
f"timed out waiting {elapsed:.0f}s for segment ending at {seg_end} get offloaded")
|
||||
time.sleep(0.5)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs'])
|
||||
def test_wal_backup(zenith_env_builder: ZenithEnvBuilder, storage_type: str):
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
@@ -446,23 +462,21 @@ def test_wal_backup(zenith_env_builder: ZenithEnvBuilder, storage_type: str):
|
||||
# roughly fills one segment
|
||||
cur.execute("insert into t select generate_series(1,250000), 'payload'")
|
||||
live_sk = [sk for sk in env.safekeepers if sk != victim][0]
|
||||
http_cli = live_sk.http_client()
|
||||
|
||||
started_at = time.time()
|
||||
while True:
|
||||
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(f"live sk status is {tli_status}")
|
||||
|
||||
if lsn_from_hex(tli_status.backup_lsn) >= lsn_from_hex(seg_end):
|
||||
break
|
||||
elapsed = time.time() - started_at
|
||||
if elapsed > 20:
|
||||
raise RuntimeError(
|
||||
f"timed out waiting {elapsed:.0f}s segment ending at {seg_end} get offloaded")
|
||||
time.sleep(0.5)
|
||||
wait_segment_offload(tenant_id, timeline_id, live_sk, seg_end)
|
||||
|
||||
victim.start()
|
||||
|
||||
# put one of safekeepers down again
|
||||
env.safekeepers[0].stop()
|
||||
# restart postgres
|
||||
pg.stop_and_destroy().create_start('test_safekeepers_wal_backup')
|
||||
# and ensure offloading still works
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("insert into t select generate_series(1,250000), 'payload'")
|
||||
wait_segment_offload(tenant_id, timeline_id, env.safekeepers[1], '0/5000000')
|
||||
|
||||
|
||||
class ProposerPostgres(PgProtocol):
|
||||
"""Object for running postgres without ZenithEnv"""
|
||||
|
||||
Reference in New Issue
Block a user