mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 02:20:42 +00:00
safekeeper: rename epoch to last_log_term.
`epoch` is a historical and potentially confusing name. Semantically it is lastLogTerm from the Raft paper, so let's use that name instead. This commit changes only internal naming, not the public (HTTP) interface.
This commit is contained in:
@@ -85,11 +85,11 @@ impl From<TermSwitchApiEntry> for TermLsn {
|
||||
}
|
||||
}
|
||||
|
||||
/// Augment AcceptorState with epoch for convenience
|
||||
/// Augment AcceptorState with last_log_term for convenience
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct AcceptorStateStatus {
|
||||
pub term: Term,
|
||||
pub epoch: Term,
|
||||
pub epoch: Term, // aka last_log_term
|
||||
pub term_history: Vec<TermSwitchApiEntry>,
|
||||
}
|
||||
|
||||
@@ -130,7 +130,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
let (inmem, state) = tli.get_state().await;
|
||||
let flush_lsn = tli.get_flush_lsn().await;
|
||||
|
||||
let epoch = state.acceptor_state.get_epoch(flush_lsn);
|
||||
let last_log_term = state.acceptor_state.get_last_log_term(flush_lsn);
|
||||
let term_history = state
|
||||
.acceptor_state
|
||||
.term_history
|
||||
@@ -143,7 +143,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
.collect();
|
||||
let acc_state = AcceptorStateStatus {
|
||||
term: state.acceptor_state.term,
|
||||
epoch,
|
||||
epoch: last_log_term,
|
||||
term_history,
|
||||
};
|
||||
|
||||
|
||||
@@ -165,7 +165,7 @@ pub async fn append_logical_message(
|
||||
let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest {
|
||||
h: AppendRequestHeader {
|
||||
term: msg.term,
|
||||
epoch_start_lsn: begin_lsn,
|
||||
term_start_lsn: begin_lsn,
|
||||
begin_lsn,
|
||||
end_lsn,
|
||||
commit_lsn,
|
||||
|
||||
@@ -337,7 +337,7 @@ async fn network_io(
|
||||
ReplicationMessage::XLogData(xlog_data) => {
|
||||
let ar_hdr = AppendRequestHeader {
|
||||
term: donor.term,
|
||||
epoch_start_lsn: Lsn::INVALID, // unused
|
||||
term_start_lsn: Lsn::INVALID, // unused
|
||||
begin_lsn: Lsn(xlog_data.wal_start()),
|
||||
end_lsn: Lsn(xlog_data.wal_start()) + xlog_data.data().len() as u64,
|
||||
commit_lsn: Lsn::INVALID, // do not attempt to advance, peer communication anyway does it
|
||||
|
||||
@@ -188,8 +188,8 @@ pub struct AcceptorState {
|
||||
}
|
||||
|
||||
impl AcceptorState {
|
||||
/// acceptor's epoch is the term of the highest entry in the log
|
||||
pub fn get_epoch(&self, flush_lsn: Lsn) -> Term {
|
||||
/// acceptor's last_log_term is the term of the highest entry in the log
|
||||
pub fn get_last_log_term(&self, flush_lsn: Lsn) -> Term {
|
||||
let th = self.term_history.up_to(flush_lsn);
|
||||
match th.0.last() {
|
||||
Some(e) => e.term,
|
||||
@@ -305,9 +305,9 @@ pub struct AppendRequest {
|
||||
pub struct AppendRequestHeader {
|
||||
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
|
||||
pub term: Term,
|
||||
// TODO: remove this field, it in unused -- LSN of term switch can be taken
|
||||
// from ProposerElected (as well as from term history).
|
||||
pub epoch_start_lsn: Lsn,
|
||||
// TODO: remove this field from the protocol, it is unused -- LSN of term
|
||||
// switch can be taken from ProposerElected (as well as from term history).
|
||||
pub term_start_lsn: Lsn,
|
||||
/// start position of message in WAL
|
||||
pub begin_lsn: Lsn,
|
||||
/// end position of message in WAL
|
||||
@@ -326,9 +326,10 @@ pub struct AppendResponse {
|
||||
// Current term of the safekeeper; if it is higher than proposer's, the
|
||||
// compute is out of date.
|
||||
pub term: Term,
|
||||
// NOTE: this is physical end of wal on safekeeper; currently it doesn't
|
||||
// make much sense without taking epoch into account, as history can be
|
||||
// diverged.
|
||||
// Flushed end of WAL on safekeeper; one should always be mindful of which
|
||||
// term history this value comes from, either checking history directly or
|
||||
// observing term being set to one for which WAL truncation is known to have
|
||||
// happened.
|
||||
pub flush_lsn: Lsn,
|
||||
// We report back our awareness about which WAL is committed, as this is
|
||||
// a criterion for walproposer --sync mode exit
|
||||
@@ -482,8 +483,8 @@ impl AcceptorProposerMessage {
|
||||
/// - messages from broker peers
|
||||
pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
|
||||
/// LSN starting from which the proposer this safekeeper is currently talking to appends WAL;
|
||||
/// determines epoch switch point.
|
||||
pub epoch_start_lsn: Lsn,
|
||||
/// determines last_log_term switch point.
|
||||
pub term_start_lsn: Lsn,
|
||||
|
||||
pub state: TimelineState<CTRL>, // persistent state storage
|
||||
pub wal_store: WAL,
|
||||
@@ -511,7 +512,7 @@ where
|
||||
}
|
||||
|
||||
Ok(SafeKeeper {
|
||||
epoch_start_lsn: Lsn(0),
|
||||
term_start_lsn: Lsn(0),
|
||||
state: TimelineState::new(state),
|
||||
wal_store,
|
||||
node_id,
|
||||
@@ -531,8 +532,10 @@ where
|
||||
self.state.acceptor_state.term
|
||||
}
|
||||
|
||||
pub fn get_epoch(&self) -> Term {
|
||||
self.state.acceptor_state.get_epoch(self.flush_lsn())
|
||||
pub fn get_last_log_term(&self) -> Term {
|
||||
self.state
|
||||
.acceptor_state
|
||||
.get_last_log_term(self.flush_lsn())
|
||||
}
|
||||
|
||||
/// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
|
||||
@@ -713,7 +716,7 @@ where
|
||||
// proceed, but to prevent commit_lsn surprisingly going down we should
|
||||
// either refuse the session (simpler) or skip the part we already have
|
||||
// from the stream (can be implemented).
|
||||
if msg.term == self.get_epoch() && self.flush_lsn() > msg.start_streaming_at {
|
||||
if msg.term == self.get_last_log_term() && self.flush_lsn() > msg.start_streaming_at {
|
||||
bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help",
|
||||
msg.term, self.flush_lsn(), msg.start_streaming_at)
|
||||
}
|
||||
@@ -788,7 +791,7 @@ where
|
||||
// Cache LSN where term starts to immediately fsync control file with
|
||||
// commit_lsn once we reach it -- sync-safekeepers finishes when
|
||||
// persisted commit_lsn on majority of safekeepers aligns.
|
||||
self.epoch_start_lsn = match msg.term_history.0.last() {
|
||||
self.term_start_lsn = match msg.term_history.0.last() {
|
||||
None => bail!("proposer elected with empty term history"),
|
||||
Some(term_lsn_start) => term_lsn_start.lsn,
|
||||
};
|
||||
@@ -814,11 +817,11 @@ where
|
||||
|
||||
self.state.inmem.commit_lsn = commit_lsn;
|
||||
|
||||
// If new commit_lsn reached epoch switch, force sync of control
|
||||
// If new commit_lsn reached term switch, force sync of control
|
||||
// file: walproposer in sync mode is very interested when this
|
||||
// happens. Note: this is for sync-safekeepers mode only, as
|
||||
// otherwise commit_lsn might jump over epoch_start_lsn.
|
||||
if commit_lsn >= self.epoch_start_lsn && self.state.commit_lsn < self.epoch_start_lsn {
|
||||
// otherwise commit_lsn might jump over term_start_lsn.
|
||||
if commit_lsn >= self.term_start_lsn && self.state.commit_lsn < self.term_start_lsn {
|
||||
self.state.flush().await?;
|
||||
}
|
||||
|
||||
@@ -933,7 +936,7 @@ where
|
||||
// Note: the check is too restrictive, generally we can update local
|
||||
// commit_lsn if our history matches (is part of) history of advanced
|
||||
// commit_lsn provider.
|
||||
if sk_info.last_log_term == self.get_epoch() {
|
||||
if sk_info.last_log_term == self.get_last_log_term() {
|
||||
self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
|
||||
}
|
||||
}
|
||||
@@ -1079,7 +1082,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_epoch_switch() {
|
||||
async fn test_last_log_term_switch() {
|
||||
let storage = InMemoryState {
|
||||
persisted_state: test_sk_state(),
|
||||
};
|
||||
@@ -1089,7 +1092,7 @@ mod tests {
|
||||
|
||||
let mut ar_hdr = AppendRequestHeader {
|
||||
term: 1,
|
||||
epoch_start_lsn: Lsn(3),
|
||||
term_start_lsn: Lsn(3),
|
||||
begin_lsn: Lsn(1),
|
||||
end_lsn: Lsn(2),
|
||||
commit_lsn: Lsn(0),
|
||||
@@ -1114,14 +1117,14 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// check that AppendRequest before epochStartLsn doesn't switch epoch
|
||||
// check that AppendRequest before term_start_lsn doesn't switch last_log_term.
|
||||
let resp = sk
|
||||
.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
|
||||
.await;
|
||||
assert!(resp.is_ok());
|
||||
assert_eq!(sk.get_epoch(), 0);
|
||||
assert_eq!(sk.get_last_log_term(), 0);
|
||||
|
||||
// but record at epochStartLsn does the switch
|
||||
// but record at term_start_lsn does the switch
|
||||
ar_hdr.begin_lsn = Lsn(2);
|
||||
ar_hdr.end_lsn = Lsn(3);
|
||||
append_request = AppendRequest {
|
||||
@@ -1133,7 +1136,7 @@ mod tests {
|
||||
.await;
|
||||
assert!(resp.is_ok());
|
||||
sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
|
||||
assert_eq!(sk.get_epoch(), 1);
|
||||
assert_eq!(sk.get_last_log_term(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -244,7 +244,7 @@ impl SharedState {
|
||||
timeline_id: ttid.timeline_id.as_ref().to_owned(),
|
||||
}),
|
||||
term: self.sk.state.acceptor_state.term,
|
||||
last_log_term: self.sk.get_epoch(),
|
||||
last_log_term: self.sk.get_last_log_term(),
|
||||
flush_lsn: self.sk.flush_lsn().0,
|
||||
// note: this value is not flushed to control file yet and can be lost
|
||||
commit_lsn: self.sk.state.inmem.commit_lsn.0,
|
||||
@@ -704,7 +704,7 @@ impl Timeline {
|
||||
pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo {
|
||||
let ss = self.read_shared_state().await;
|
||||
let term = ss.sk.state.acceptor_state.term;
|
||||
let last_log_term = ss.sk.get_epoch();
|
||||
let last_log_term = ss.sk.get_last_log_term();
|
||||
let flush_lsn = ss.sk.flush_lsn();
|
||||
// note that peers contain myself, but that's ok -- we are interested only in peers which are strictly ahead of us.
|
||||
let mut peers = ss.get_peers(heartbeat_timeout);
|
||||
@@ -844,7 +844,7 @@ impl Timeline {
|
||||
timeline_is_active: self.broker_active.load(Ordering::Relaxed),
|
||||
num_computes: self.walreceivers.get_num() as u32,
|
||||
last_removed_segno: state.last_removed_segno,
|
||||
epoch_start_lsn: state.sk.epoch_start_lsn,
|
||||
epoch_start_lsn: state.sk.term_start_lsn,
|
||||
mem_state: state.sk.state.inmem.clone(),
|
||||
persisted_state: state.sk.state.clone(),
|
||||
flush_lsn: state.sk.wal_store.flush_lsn(),
|
||||
@@ -867,7 +867,7 @@ impl Timeline {
|
||||
active: self.broker_active.load(Ordering::Relaxed),
|
||||
num_computes: self.walreceivers.get_num() as u32,
|
||||
last_removed_segno: state.last_removed_segno,
|
||||
epoch_start_lsn: state.sk.epoch_start_lsn,
|
||||
epoch_start_lsn: state.sk.term_start_lsn,
|
||||
mem_state: state.sk.state.inmem.clone(),
|
||||
write_lsn,
|
||||
write_record_lsn,
|
||||
|
||||
Reference in New Issue
Block a user