From 191c5e56419ef7318ae99e25530d66e9e2ece453 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Tue, 12 Apr 2022 11:47:08 +0000 Subject: [PATCH] Create ElectedProposerInfo struct --- walkeeper/src/safekeeper.rs | 50 ++++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 22a8481e45..070f6f29cc 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -203,14 +203,33 @@ pub struct SafeKeeperState { } #[derive(Debug, Clone)] -// In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values -// are not flushed yet. +/// In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values +/// are not flushed yet. pub struct SafekeeperMemState { pub commit_lsn: Lsn, pub s3_wal_lsn: Lsn, // TODO: keep only persistent version pub peer_horizon_lsn: Lsn, pub remote_consistent_lsn: Lsn, - pub proposer_uuid: PgUuid, +} + +#[derive(Debug, Clone)] +/// In memory state for last elected proposer. +struct ElectedProposerInfo { + /// UUID of a elected proposer + proposer_uuid: PgUuid, + + /// LSN since the proposer safekeeper currently talking to appends WAL; + /// determines epoch switch point. + epoch_start_lsn: Lsn, +} + +impl ElectedProposerInfo { + pub fn empty() -> Self { + Self { + proposer_uuid: [0; 16], + epoch_start_lsn: Lsn(0), + } + } } impl SafeKeeperState { @@ -493,18 +512,20 @@ impl SafeKeeperMetrics { /// SafeKeeper which consumes events (messages from compute) and provides /// replies. pub struct SafeKeeper { - // Cached metrics so we don't have to recompute labels on each update. + /// Cached metrics so we don't have to recompute labels on each update. metrics: SafeKeeperMetrics, /// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn. pub global_commit_lsn: Lsn, - /// LSN since the proposer safekeeper currently talking to appends WAL; - /// determines epoch switch point. - epoch_start_lsn: Lsn, + /// In-memory info about the elected proposer. + elected: ElectedProposerInfo, - pub inmem: SafekeeperMemState, // in memory part - pub state: CTRL, // persistent state storage + /// In-memory consensus state, not yet persisted to disk. + pub inmem: SafekeeperMemState, + /// SafeKeeperState persisted to disk and methods for update. + pub state: CTRL, + /// WAL storage with write functions. pub wal_store: WAL, } @@ -529,13 +550,12 @@ where Ok(SafeKeeper { metrics: SafeKeeperMetrics::new(state.tenant_id, ztli), global_commit_lsn: state.commit_lsn, - epoch_start_lsn: Lsn(0), + elected: ElectedProposerInfo::empty(), inmem: SafekeeperMemState { commit_lsn: state.commit_lsn, s3_wal_lsn: state.s3_wal_lsn, peer_horizon_lsn: state.peer_horizon_lsn, remote_consistent_lsn: state.remote_consistent_lsn, - proposer_uuid: state.proposer_uuid, }, state, wal_store, @@ -724,7 +744,7 @@ where // Also note that commit_lsn can reach epoch_start_lsn earlier // that we receive new epoch_start_lsn, and we still need to sync // control file in this case. - if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn { + if commit_lsn == self.elected.epoch_start_lsn && self.state.commit_lsn != commit_lsn { self.persist_control_file()?; } @@ -744,7 +764,7 @@ where state.commit_lsn = self.inmem.commit_lsn; state.peer_horizon_lsn = self.inmem.peer_horizon_lsn; - state.proposer_uuid = self.inmem.proposer_uuid; + state.proposer_uuid = self.elected.proposer_uuid; self.state.persist(&state) } @@ -768,8 +788,8 @@ where // Now we know that we are in the same term as the proposer, // processing the message. - self.epoch_start_lsn = msg.h.epoch_start_lsn; - self.inmem.proposer_uuid = msg.h.proposer_uuid; + self.elected.epoch_start_lsn = msg.h.epoch_start_lsn; + self.elected.proposer_uuid = msg.h.proposer_uuid; // do the job if !msg.wal_data.is_empty() {