Create ElectedProposerInfo struct

This commit is contained in:
Arthur Petukhovsky
2022-04-12 11:47:08 +00:00
parent 89755c045f
commit 191c5e5641

View File

@@ -203,14 +203,33 @@ pub struct SafeKeeperState {
}
#[derive(Debug, Clone)]
// In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values
// are not flushed yet.
/// In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values
/// are not flushed yet.
pub struct SafekeeperMemState {
pub commit_lsn: Lsn,
pub s3_wal_lsn: Lsn, // TODO: keep only persistent version
pub peer_horizon_lsn: Lsn,
pub remote_consistent_lsn: Lsn,
pub proposer_uuid: PgUuid,
}
#[derive(Debug, Clone)]
/// In memory state for last elected proposer.
struct ElectedProposerInfo {
/// UUID of a elected proposer
proposer_uuid: PgUuid,
/// LSN since the proposer safekeeper currently talking to appends WAL;
/// determines epoch switch point.
epoch_start_lsn: Lsn,
}
impl ElectedProposerInfo {
pub fn empty() -> Self {
Self {
proposer_uuid: [0; 16],
epoch_start_lsn: Lsn(0),
}
}
}
impl SafeKeeperState {
@@ -493,18 +512,20 @@ impl SafeKeeperMetrics {
/// SafeKeeper which consumes events (messages from compute) and provides
/// replies.
pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
// Cached metrics so we don't have to recompute labels on each update.
/// Cached metrics so we don't have to recompute labels on each update.
metrics: SafeKeeperMetrics,
/// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn.
pub global_commit_lsn: Lsn,
/// LSN since the proposer safekeeper currently talking to appends WAL;
/// determines epoch switch point.
epoch_start_lsn: Lsn,
/// In-memory info about the elected proposer.
elected: ElectedProposerInfo,
pub inmem: SafekeeperMemState, // in memory part
pub state: CTRL, // persistent state storage
/// In-memory consensus state, not yet persisted to disk.
pub inmem: SafekeeperMemState,
/// SafeKeeperState persisted to disk and methods for update.
pub state: CTRL,
/// WAL storage with write functions.
pub wal_store: WAL,
}
@@ -529,13 +550,12 @@ where
Ok(SafeKeeper {
metrics: SafeKeeperMetrics::new(state.tenant_id, ztli),
global_commit_lsn: state.commit_lsn,
epoch_start_lsn: Lsn(0),
elected: ElectedProposerInfo::empty(),
inmem: SafekeeperMemState {
commit_lsn: state.commit_lsn,
s3_wal_lsn: state.s3_wal_lsn,
peer_horizon_lsn: state.peer_horizon_lsn,
remote_consistent_lsn: state.remote_consistent_lsn,
proposer_uuid: state.proposer_uuid,
},
state,
wal_store,
@@ -724,7 +744,7 @@ where
// Also note that commit_lsn can reach epoch_start_lsn earlier
// that we receive new epoch_start_lsn, and we still need to sync
// control file in this case.
if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
if commit_lsn == self.elected.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
self.persist_control_file()?;
}
@@ -744,7 +764,7 @@ where
state.commit_lsn = self.inmem.commit_lsn;
state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
state.proposer_uuid = self.inmem.proposer_uuid;
state.proposer_uuid = self.elected.proposer_uuid;
self.state.persist(&state)
}
@@ -768,8 +788,8 @@ where
// Now we know that we are in the same term as the proposer,
// processing the message.
self.epoch_start_lsn = msg.h.epoch_start_lsn;
self.inmem.proposer_uuid = msg.h.proposer_uuid;
self.elected.epoch_start_lsn = msg.h.epoch_start_lsn;
self.elected.proposer_uuid = msg.h.proposer_uuid;
// do the job
if !msg.wal_data.is_empty() {