diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 4099a324f9..2c4cc836f7 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -15,6 +15,7 @@ use tokio::{ fs::{File, OpenOptions}, io::AsyncWrite, sync::mpsc, + task, }; use tokio_tar::{Archive, Builder}; use tokio_util::{ @@ -66,7 +67,11 @@ pub struct SnapshotContext { impl Drop for SnapshotContext { fn drop(&mut self) { - // todo: spawn task removing WAL gc hold off + let tli = self.tli.clone(); + task::spawn(async move { + let mut shared_state = tli.write_shared_state().await; + shared_state.wal_removal_on_hold = false; + }); } } @@ -150,7 +155,7 @@ impl FullAccessTimeline { &self, ar: &mut tokio_tar::Builder, ) -> Result { - let shared_state = self.read_shared_state().await; + let mut shared_state = self.write_shared_state().await; let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME); let mut cf = File::open(cf_path).await?; @@ -183,7 +188,14 @@ impl FullAccessTimeline { ); } - // TODO: set WAL hold off. + // Prevent WAL removal while we're streaming data. + // + // Since this a flag, not a counter just bail out if already set; we + // shouldn't need concurrent snapshotting. + if shared_state.wal_removal_on_hold { + bail!("wal_removal_on_hold is already true"); + } + shared_state.wal_removal_on_hold = true; let bctx = SnapshotContext { from_segno, diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index e510a05a32..544ffdbb36 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -168,6 +168,9 @@ pub struct SharedState { pub(crate) sk: SafeKeeper, /// In memory list containing state of peers sent in latest messages from them. pub(crate) peers_info: PeersInfo, + // True value hinders old WAL removal; this is used by snapshotting. We + // could make it a counter, but there is no need to. + pub(crate) wal_removal_on_hold: bool, } impl SharedState { @@ -205,6 +208,7 @@ impl SharedState { Ok(Self { sk, peers_info: PeersInfo(vec![]), + wal_removal_on_hold: false, }) } @@ -222,6 +226,7 @@ impl SharedState { Ok(Self { sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?, peers_info: PeersInfo(vec![]), + wal_removal_on_hold: false, }) } diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index 087b988c69..592426bba3 100644 --- a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -39,6 +39,7 @@ pub struct StateSnapshot { // misc pub cfile_last_persist_at: Instant, pub inmem_flush_pending: bool, + pub wal_removal_on_hold: bool, pub peers: Vec, } @@ -54,6 +55,7 @@ impl StateSnapshot { cfile_backup_lsn: read_guard.sk.state.backup_lsn, cfile_last_persist_at: read_guard.sk.state.pers.last_persist_at(), inmem_flush_pending: Self::has_unflushed_inmem_state(&read_guard), + wal_removal_on_hold: read_guard.wal_removal_on_hold, peers: read_guard.get_peers(heartbeat_timeout), } } @@ -324,8 +326,8 @@ async fn update_wal_removal( last_removed_segno: u64, wal_removal_task: &mut Option>>, ) { - if wal_removal_task.is_some() { - // WAL removal is already in progress + if wal_removal_task.is_some() || state.wal_removal_on_hold { + // WAL removal is already in progress or hold off return; } diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 394f5283f3..12fda5468f 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3897,11 +3897,13 @@ class Safekeeper(LogUtils): segments.sort() return segments - def checkpoint_up_to(self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn): + def checkpoint_up_to( + self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn, wait_wal_removal=True + ): """ Assuming pageserver(s) uploaded to s3 up to `lsn`, 1) wait for remote_consistent_lsn and wal_backup_lsn on safekeeper to reach it. - 2) checkpoint timeline on safekeeper, which should remove WAL before this LSN. + 2) checkpoint timeline on safekeeper, which should remove WAL before this LSN; optionally wait for that. """ cli = self.http_client() @@ -3925,7 +3927,8 @@ class Safekeeper(LogUtils): # pageserver to this safekeeper wait_until(30, 1, are_lsns_advanced) cli.checkpoint(tenant_id, timeline_id) - wait_until(30, 1, are_segments_removed) + if wait_wal_removal: + wait_until(30, 1, are_segments_removed) def wait_until_paused(self, failpoint: str): msg = f"at failpoint {failpoint}" diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 11aeb8f182..300a6b7115 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -1816,7 +1816,6 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder): # 4) Do some write, verify integrity with timeline_digest. # Expected to fail while holding off WAL gc plus fetching commit_lsn WAL # segment is not implemented. -@pytest.mark.xfail def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 3 neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage()) @@ -1850,13 +1849,16 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder): lsn = last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id) assert lsn > Lsn("0/2000000") # Checkpoint timeline beyond lsn. - src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn) + src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn, wait_wal_removal=False) first_segment_p = src_sk.timeline_dir(tenant_id, timeline_id) / "000000010000000000000001" log.info(f"first segment exist={os.path.exists(first_segment_p)}") src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "off")) pt_handle.join() + # after pull_timeline is finished WAL should be removed on donor + src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn, wait_wal_removal=True) + timeline_start_lsn = src_sk.get_timeline_start_lsn(tenant_id, timeline_id) dst_flush_lsn = dst_sk.get_flush_lsn(tenant_id, timeline_id) log.info(f"flush_lsn on dst after pull_timeline: {dst_flush_lsn}")