# Patch overview. NOTE: this unified diff has had its line breaks and
# indentation collapsed; the hunks below are kept verbatim and would need
# their original line structure restored before the patch can be applied.
#
# safekeeper/src/wal_backup.rs (impl WalBackupTask):
#   * logs "acquiring leadership" right before broker::get_leader(&self.election)
#     is awaited, and "acquired leadership" after that match, just before the
#     "offload loop" (the error arm of the match does `continue`).
#   * rewords the message logged when the leader check yields `false`
#     from "leader has changed" to "lost leadership"; the `break` is unchanged.
#
# test_runner/batch_others/test_wal_acceptor.py:
#   * adds the helper wait_segment_offload(tenant_id, timeline_id, live_sk, seg_end).
#     It polls live_sk.http_client().timeline_status(tenant_id, timeline_id),
#     logs each status, and returns once
#     lsn_from_hex(tli_status.backup_lsn) >= lsn_from_hex(seg_end).
#     It sleeps 0.5 s between polls and raises RuntimeError once more than
#     20 s have elapsed since the helper was entered.
#   * test_wal_backup: the inline polling loop is replaced by a call to the
#     helper (the timeout message also gains the word "for"). After
#     victim.start() the test now stops env.safekeepers[0], recreates postgres
#     with pg.stop_and_destroy().create_start('test_safekeepers_wal_backup'),
#     inserts another 250000 rows and waits for offload up to '0/5000000'
#     as reported by env.safekeepers[1].
#
# NOTE(review): the 20 s timeout and the '0/5000000' target LSN are hard-coded
#   literals; presumably the LSN corresponds to the end of the segment filled
#   by the second insert — confirm against the configured WAL segment size.
# NOTE(review): how `victim` is chosen is not visible in these hunks, so it is
#   unclear whether env.safekeepers[0] is the same node that was just
#   restarted — verify against the full test.
diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index a4b779649d..1723d03ee3 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -204,6 +204,7 @@ impl WalBackupTask { l.give_up().await; } + info!("acquiring leadership"); match broker::get_leader(&self.election).await { Ok(l) => { self.leader = Some(l); @@ -214,6 +215,7 @@ impl WalBackupTask { continue; } } + info!("acquired leadership"); // offload loop loop { @@ -268,7 +270,7 @@ impl WalBackupTask { { Ok(leader) => { if !leader { - info!("leader has changed"); + info!("lost leadership"); break; } } diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 35b7d9585a..40a9b48a18 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -414,6 +414,22 @@ def test_wal_removal(zenith_env_builder: ZenithEnvBuilder): time.sleep(0.5) +def wait_segment_offload(tenant_id, timeline_id, live_sk, seg_end): + started_at = time.time() + http_cli = live_sk.http_client() + while True: + tli_status = http_cli.timeline_status(tenant_id, timeline_id) + log.info(f"live sk status is {tli_status}") + + if lsn_from_hex(tli_status.backup_lsn) >= lsn_from_hex(seg_end): + break + elapsed = time.time() - started_at + if elapsed > 20: + raise RuntimeError( + f"timed out waiting {elapsed:.0f}s for segment ending at {seg_end} get offloaded") + time.sleep(0.5) + + @pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs']) def test_wal_backup(zenith_env_builder: ZenithEnvBuilder, storage_type: str): zenith_env_builder.num_safekeepers = 3 @@ -446,23 +462,21 @@ def test_wal_backup(zenith_env_builder: ZenithEnvBuilder, storage_type: str): # roughly fills one segment cur.execute("insert into t select generate_series(1,250000), 'payload'") live_sk = [sk for sk in env.safekeepers if sk != victim][0] - http_cli = live_sk.http_client() - started_at = time.time() - while True: - 
tli_status = http_cli.timeline_status(tenant_id, timeline_id) - log.info(f"live sk status is {tli_status}") - - if lsn_from_hex(tli_status.backup_lsn) >= lsn_from_hex(seg_end): - break - elapsed = time.time() - started_at - if elapsed > 20: - raise RuntimeError( - f"timed out waiting {elapsed:.0f}s segment ending at {seg_end} get offloaded") - time.sleep(0.5) + wait_segment_offload(tenant_id, timeline_id, live_sk, seg_end) victim.start() + # put one of safekeepers down again + env.safekeepers[0].stop() + # restart postgres + pg.stop_and_destroy().create_start('test_safekeepers_wal_backup') + # and ensure offloading still works + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + cur.execute("insert into t select generate_series(1,250000), 'payload'") + wait_segment_offload(tenant_id, timeline_id, env.safekeepers[1], '0/5000000') + class ProposerPostgres(PgProtocol): """Object for running postgres without ZenithEnv"""