Implement holding off WAL removal for pull_timeline.

This commit is contained in:
Arseny Sher
2024-06-13 16:10:05 +03:00
committed by Arseny Sher
parent d8b2a49c55
commit 29a41fc7b9
5 changed files with 34 additions and 10 deletions

View File

@@ -15,6 +15,7 @@ use tokio::{
fs::{File, OpenOptions},
io::AsyncWrite,
sync::mpsc,
task,
};
use tokio_tar::{Archive, Builder};
use tokio_util::{
@@ -66,7 +67,11 @@ pub struct SnapshotContext {
impl Drop for SnapshotContext {
fn drop(&mut self) {
// todo: spawn task removing WAL gc hold off
let tli = self.tli.clone();
task::spawn(async move {
let mut shared_state = tli.write_shared_state().await;
shared_state.wal_removal_on_hold = false;
});
}
}
@@ -150,7 +155,7 @@ impl FullAccessTimeline {
&self,
ar: &mut tokio_tar::Builder<W>,
) -> Result<SnapshotContext> {
let shared_state = self.read_shared_state().await;
let mut shared_state = self.write_shared_state().await;
let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME);
let mut cf = File::open(cf_path).await?;
@@ -183,7 +188,14 @@ impl FullAccessTimeline {
);
}
// TODO: set WAL hold off.
// Prevent WAL removal while we're streaming data.
//
// Since this a flag, not a counter just bail out if already set; we
// shouldn't need concurrent snapshotting.
if shared_state.wal_removal_on_hold {
bail!("wal_removal_on_hold is already true");
}
shared_state.wal_removal_on_hold = true;
let bctx = SnapshotContext {
from_segno,

View File

@@ -168,6 +168,9 @@ pub struct SharedState {
pub(crate) sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
/// In memory list containing state of peers sent in latest messages from them.
pub(crate) peers_info: PeersInfo,
// True value hinders old WAL removal; this is used by snapshotting. We
// could make it a counter, but there is no need to.
pub(crate) wal_removal_on_hold: bool,
}
impl SharedState {
@@ -205,6 +208,7 @@ impl SharedState {
Ok(Self {
sk,
peers_info: PeersInfo(vec![]),
wal_removal_on_hold: false,
})
}
@@ -222,6 +226,7 @@ impl SharedState {
Ok(Self {
sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
peers_info: PeersInfo(vec![]),
wal_removal_on_hold: false,
})
}

View File

@@ -39,6 +39,7 @@ pub struct StateSnapshot {
// misc
pub cfile_last_persist_at: Instant,
pub inmem_flush_pending: bool,
pub wal_removal_on_hold: bool,
pub peers: Vec<PeerInfo>,
}
@@ -54,6 +55,7 @@ impl StateSnapshot {
cfile_backup_lsn: read_guard.sk.state.backup_lsn,
cfile_last_persist_at: read_guard.sk.state.pers.last_persist_at(),
inmem_flush_pending: Self::has_unflushed_inmem_state(&read_guard),
wal_removal_on_hold: read_guard.wal_removal_on_hold,
peers: read_guard.get_peers(heartbeat_timeout),
}
}
@@ -324,8 +326,8 @@ async fn update_wal_removal(
last_removed_segno: u64,
wal_removal_task: &mut Option<JoinHandle<anyhow::Result<u64>>>,
) {
if wal_removal_task.is_some() {
// WAL removal is already in progress
if wal_removal_task.is_some() || state.wal_removal_on_hold {
// WAL removal is already in progress or hold off
return;
}

View File

@@ -3897,11 +3897,13 @@ class Safekeeper(LogUtils):
segments.sort()
return segments
def checkpoint_up_to(self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
def checkpoint_up_to(
self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn, wait_wal_removal=True
):
"""
Assuming pageserver(s) uploaded to s3 up to `lsn`,
1) wait for remote_consistent_lsn and wal_backup_lsn on safekeeper to reach it.
2) checkpoint timeline on safekeeper, which should remove WAL before this LSN.
2) checkpoint timeline on safekeeper, which should remove WAL before this LSN; optionally wait for that.
"""
cli = self.http_client()
@@ -3925,7 +3927,8 @@ class Safekeeper(LogUtils):
# pageserver to this safekeeper
wait_until(30, 1, are_lsns_advanced)
cli.checkpoint(tenant_id, timeline_id)
wait_until(30, 1, are_segments_removed)
if wait_wal_removal:
wait_until(30, 1, are_segments_removed)
def wait_until_paused(self, failpoint: str):
msg = f"at failpoint {failpoint}"

View File

@@ -1816,7 +1816,6 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
# 4) Do some write, verify integrity with timeline_digest.
# Expected to fail while holding off WAL gc plus fetching commit_lsn WAL
# segment is not implemented.
@pytest.mark.xfail
def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
@@ -1850,13 +1849,16 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
lsn = last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
assert lsn > Lsn("0/2000000")
# Checkpoint timeline beyond lsn.
src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn)
src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn, wait_wal_removal=False)
first_segment_p = src_sk.timeline_dir(tenant_id, timeline_id) / "000000010000000000000001"
log.info(f"first segment exist={os.path.exists(first_segment_p)}")
src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "off"))
pt_handle.join()
# after pull_timeline is finished WAL should be removed on donor
src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn, wait_wal_removal=True)
timeline_start_lsn = src_sk.get_timeline_start_lsn(tenant_id, timeline_id)
dst_flush_lsn = dst_sk.get_flush_lsn(tenant_id, timeline_id)
log.info(f"flush_lsn on dst after pull_timeline: {dst_flush_lsn}")