From 7474cfac08491d8486dad312655d2d645baf8dde Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 27 Aug 2021 14:55:21 +0300 Subject: [PATCH] Rename VCL to epochStartLsn and restart_lsn to truncate_lsn. epochStartLsn is the LSN since which new proposer writes its WAL in its epoch, let's be more explicit here. truncate_lsn is LSN still needed by the most lagging safekeeper. restart_lsn is terminology from pg_replicaton_slots, but here we don't really have 'restart'; hopefully truncate word makes it clearer. --- walkeeper/src/safekeeper.rs | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index b9fcae80f3..d85a901696 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -67,10 +67,12 @@ pub struct SafeKeeperState { /// Unique id of the last *elected* proposer we dealed with. Not needed /// correctness, exists for monitoring purposes. pub proposer_uuid: PgUuid, - /// part of WAL acknowledged by quorum + /// part of WAL acknowledged by quorum (note that we might not have wal to + /// up this point locally) pub commit_lsn: Lsn, - /// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers - pub restart_lsn: Lsn, + /// minimal LSN which may be needed for recovery of some safekeeper (end lsn + /// + 1 of last record streamed to everyone) + pub truncate_lsn: Lsn, } impl SafeKeeperState { @@ -87,8 +89,8 @@ impl SafeKeeperState { wal_seg_size: 0, }, proposer_uuid: [0; 16], - commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */ - restart_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */ + commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */ + truncate_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */ } } } @@ -136,7 +138,7 @@ pub struct VoteResponse { } /// Request with WAL message sent from proposer to safekeeper. Along the way it -/// announces 1) successful election (with VCL); 2) commit_lsn. +/// announces 1) successful election (with epoch_start_lsn); 2) commit_lsn. #[derive(Debug, Serialize, Deserialize)] pub struct AppendRequest { h: AppendRequestHeader, @@ -145,8 +147,8 @@ pub struct AppendRequest { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AppendRequestHeader { term: Term, - /// volume commit LSN - vcl: Lsn, + // LSN since the proposer appends WAL; determines epoch switch point. + epoch_start_lsn: Lsn, /// start position of message in WAL begin_lsn: Lsn, /// end position of message in WAL @@ -272,7 +274,7 @@ pub struct SafeKeeper { /// reading wal. pub flush_lsn: Lsn, pub tli: u32, - pub flushed_restart_lsn: Lsn, + pub flushed_truncate_lsn: Lsn, pub storage: ST, pub s: SafeKeeperState, // persistent part pub elected_proposer_term: Term, // for monitoring/debugging @@ -287,7 +289,7 @@ where SafeKeeper { flush_lsn, tli, - flushed_restart_lsn: Lsn(0), + flushed_truncate_lsn: Lsn(0), storage, s: state, elected_proposer_term: 0, @@ -369,7 +371,7 @@ where resp.vote_given = true as u64; resp.epoch = self.s.acceptor_state.epoch; resp.flush_lsn = self.flush_lsn; - resp.restart_lsn = self.s.restart_lsn; + resp.restart_lsn = self.s.truncate_lsn; } info!("processed VoteRequest for term {}: {:?}", msg.term, &resp); Ok(AcceptorProposerMessage::VoteResponse(resp)) @@ -414,7 +416,7 @@ where * XXX: this is wrong, we must actively truncate not matching part of log. */ if self.s.acceptor_state.epoch < msg.h.term - && msg.h.end_lsn > max(self.flush_lsn, msg.h.vcl) + && msg.h.end_lsn > max(self.flush_lsn, msg.h.epoch_start_lsn) { info!("switched to new epoch {}", msg.h.term); self.s.acceptor_state.epoch = msg.h.term; /* bump epoch */ @@ -426,7 +428,7 @@ where self.s.proposer_uuid = msg.h.proposer_uuid; self.s.commit_lsn = msg.h.commit_lsn; - self.s.restart_lsn = msg.h.restart_lsn; + self.s.truncate_lsn = msg.h.restart_lsn; /* * Update restart LSN in control file. @@ -434,10 +436,10 @@ where * when restart_lsn delta exceeds WAL segment size. */ sync_control_file |= - self.flushed_restart_lsn + (self.s.server.wal_seg_size as u64) < self.s.restart_lsn; + self.flushed_truncate_lsn + (self.s.server.wal_seg_size as u64) < self.s.truncate_lsn; self.storage.persist(&self.s, sync_control_file)?; if sync_control_file { - self.flushed_restart_lsn = self.s.restart_lsn; + self.flushed_truncate_lsn = self.s.truncate_lsn; } let resp = AppendResponse { @@ -517,7 +519,7 @@ mod tests { let mut ar_hdr = AppendRequestHeader { term: 1, - vcl: Lsn(2), + epoch_start_lsn: Lsn(2), begin_lsn: Lsn(1), end_lsn: Lsn(2), commit_lsn: Lsn(0),