From 86453b422d2078e2f034b92b77b1e8fd1f1b3754 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Thu, 16 May 2024 23:58:32 +0000 Subject: [PATCH] Add comments --- safekeeper/src/timeline.rs | 4 +++- safekeeper/src/timeline_manager.rs | 14 +++++++++----- safekeeper/src/timelines_set.rs | 2 +- safekeeper/src/wal_backup.rs | 13 +++++-------- 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 1b22f1747e..1b9201a11b 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -97,6 +97,7 @@ impl PeersInfo { } pub type ReadGuardSharedState<'a> = RwLockReadGuard<'a, SharedState>; + /// WriteGuardSharedState is a wrapper around RwLockWriteGuard that /// automatically updates `watch::Sender` channels with state on drop. pub struct WriteGuardSharedState<'a> { @@ -147,6 +148,7 @@ impl<'a> Drop for WriteGuardSharedState<'a> { } }); + // send notification about shared state update self.tli.shared_state_version_tx.send_modify(|old| { *old += 1; }); @@ -477,7 +479,7 @@ impl Timeline { Ok(()) } - /// Bootstrap new or existing timeline starting background stasks. + /// Bootstrap new or existing timeline starting background tasks. pub fn bootstrap( self: &Arc, conf: &SafeKeeperConf, diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index 698c42e8d7..fc86f655b8 100644 --- a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -18,6 +18,7 @@ pub struct StateSnapshot { } impl StateSnapshot { + /// Create a new snapshot of the timeline state. fn new(read_guard: ReadGuardSharedState, heartbeat_timeout: Duration) -> Self { Self { commit_lsn: read_guard.sk.state.inmem.commit_lsn, @@ -71,7 +72,7 @@ pub async fn main_task( // list of background tasks let mut backup_task: Option = None; - loop { + let last_state = 'outer: loop { let state_snapshot = StateSnapshot::new(tli.read_shared_state().await, heartbeat_timeout); let num_computes = *num_computes_rx.borrow(); @@ -89,12 +90,10 @@ pub async fn main_task( .await; } - // FIXME: add tracking of relevant pageservers and check them here individually, - // otherwise migration won't work (we suspend too early). let is_active = is_wal_backup_required || state_snapshot.remote_consistent_lsn < state_snapshot.commit_lsn; - // update the broker map + // update the broker timeline set if tli_broker_active.set(is_active) { // write log if state has changed info!( @@ -123,7 +122,7 @@ pub async fn main_task( tokio::select! { _ = cancellation_rx.changed() => { // timeline was deleted - return; + break 'outer state_snapshot; } _ = state_version_rx.changed() => { // state was updated @@ -132,5 +131,10 @@ pub async fn main_task( // number of connected computes was updated } } + }; + + // shutdown background tasks + if conf.is_wal_backup_enabled() { + wal_backup::update_task(&conf, ttid, false, &last_state, &mut backup_task).await; } } diff --git a/safekeeper/src/timelines_set.rs b/safekeeper/src/timelines_set.rs index 9a48773447..ea8e23bb72 100644 --- a/safekeeper/src/timelines_set.rs +++ b/safekeeper/src/timelines_set.rs @@ -9,7 +9,7 @@ use crate::timeline::Timeline; /// - remove timeline /// - clone the set /// -/// Used for keeping subset of active (for which broker push is required) timelines. +/// Usually used for keeping subset of timelines. For example active timelines that require broker push. pub struct TimelinesSet { timelines: std::sync::Mutex>>, } diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 72e7c78326..8a5f1b7df7 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -68,14 +68,11 @@ pub async fn update_task( state: &StateSnapshot, entry: &mut Option, ) { - let (should_task_run, election_dbg_str) = if need_backup { - let (offloader, election_dbg_str) = - determine_offloader(&state.peers, state.backup_lsn, ttid, conf); - let elected_me = Some(conf.my_id) == offloader; - (elected_me, election_dbg_str) - } else { - (false, String::new()) - }; + let (offloader, election_dbg_str) = + determine_offloader(&state.peers, state.backup_lsn, ttid, conf); + let elected_me = Some(conf.my_id) == offloader; + + let should_task_run = need_backup && elected_me; // start or stop the task if should_task_run != (entry.is_some()) {