Add comments

This commit is contained in:
Arthur Petukhovsky
2024-05-16 23:58:32 +00:00
parent 35d6599278
commit 86453b422d
4 changed files with 18 additions and 15 deletions

View File

@@ -97,6 +97,7 @@ impl PeersInfo {
}
pub type ReadGuardSharedState<'a> = RwLockReadGuard<'a, SharedState>;
/// WriteGuardSharedState is a wrapper around RwLockWriteGuard<SharedState> that
/// automatically updates `watch::Sender` channels with state on drop.
pub struct WriteGuardSharedState<'a> {
@@ -147,6 +148,7 @@ impl<'a> Drop for WriteGuardSharedState<'a> {
}
});
// send notification about shared state update
self.tli.shared_state_version_tx.send_modify(|old| {
*old += 1;
});
@@ -477,7 +479,7 @@ impl Timeline {
Ok(())
}
/// Bootstrap new or existing timeline starting background stasks.
/// Bootstrap new or existing timeline starting background tasks.
pub fn bootstrap(
self: &Arc<Timeline>,
conf: &SafeKeeperConf,

View File

@@ -18,6 +18,7 @@ pub struct StateSnapshot {
}
impl StateSnapshot {
/// Create a new snapshot of the timeline state.
fn new(read_guard: ReadGuardSharedState, heartbeat_timeout: Duration) -> Self {
Self {
commit_lsn: read_guard.sk.state.inmem.commit_lsn,
@@ -71,7 +72,7 @@ pub async fn main_task(
// list of background tasks
let mut backup_task: Option<WalBackupTaskHandle> = None;
loop {
let last_state = 'outer: loop {
let state_snapshot = StateSnapshot::new(tli.read_shared_state().await, heartbeat_timeout);
let num_computes = *num_computes_rx.borrow();
@@ -89,12 +90,10 @@ pub async fn main_task(
.await;
}
// FIXME: add tracking of relevant pageservers and check them here individually,
// otherwise migration won't work (we suspend too early).
let is_active = is_wal_backup_required
|| state_snapshot.remote_consistent_lsn < state_snapshot.commit_lsn;
// update the broker map
// update the broker timeline set
if tli_broker_active.set(is_active) {
// write log if state has changed
info!(
@@ -123,7 +122,7 @@ pub async fn main_task(
tokio::select! {
_ = cancellation_rx.changed() => {
// timeline was deleted
return;
break 'outer state_snapshot;
}
_ = state_version_rx.changed() => {
// state was updated
@@ -132,5 +131,10 @@ pub async fn main_task(
// number of connected computes was updated
}
}
};
// shutdown background tasks
if conf.is_wal_backup_enabled() {
wal_backup::update_task(&conf, ttid, false, &last_state, &mut backup_task).await;
}
}

View File

@@ -9,7 +9,7 @@ use crate::timeline::Timeline;
/// - remove timeline
/// - clone the set
///
/// Used for keeping subset of active (for which broker push is required) timelines.
/// Usually used for keeping subset of timelines. For example active timelines that require broker push.
pub struct TimelinesSet {
timelines: std::sync::Mutex<HashMap<TenantTimelineId, Arc<Timeline>>>,
}

View File

@@ -68,14 +68,11 @@ pub async fn update_task(
state: &StateSnapshot,
entry: &mut Option<WalBackupTaskHandle>,
) {
let (should_task_run, election_dbg_str) = if need_backup {
let (offloader, election_dbg_str) =
determine_offloader(&state.peers, state.backup_lsn, ttid, conf);
let elected_me = Some(conf.my_id) == offloader;
(elected_me, election_dbg_str)
} else {
(false, String::new())
};
let (offloader, election_dbg_str) =
determine_offloader(&state.peers, state.backup_lsn, ttid, conf);
let elected_me = Some(conf.my_id) == offloader;
let should_task_run = need_backup && elected_me;
// start or stop the task
if should_task_run != (entry.is_some()) {