mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-06 04:52:55 +00:00
Rename VCL to epochStartLsn and restart_lsn to truncate_lsn.
epochStartLsn is the LSN since which new proposer writes its WAL in its epoch, let's be more explicit here. truncate_lsn is LSN still needed by the most lagging safekeeper. restart_lsn is terminology from pg_replicaton_slots, but here we don't really have 'restart'; hopefully truncate word makes it clearer.
This commit is contained in:
@@ -67,10 +67,12 @@ pub struct SafeKeeperState {
|
||||
/// Unique id of the last *elected* proposer we dealed with. Not needed
|
||||
/// correctness, exists for monitoring purposes.
|
||||
pub proposer_uuid: PgUuid,
|
||||
/// part of WAL acknowledged by quorum
|
||||
/// part of WAL acknowledged by quorum (note that we might not have wal to
|
||||
/// up this point locally)
|
||||
pub commit_lsn: Lsn,
|
||||
/// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers
|
||||
pub restart_lsn: Lsn,
|
||||
/// minimal LSN which may be needed for recovery of some safekeeper (end lsn
|
||||
/// + 1 of last record streamed to everyone)
|
||||
pub truncate_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl SafeKeeperState {
|
||||
@@ -87,8 +89,8 @@ impl SafeKeeperState {
|
||||
wal_seg_size: 0,
|
||||
},
|
||||
proposer_uuid: [0; 16],
|
||||
commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */
|
||||
restart_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */
|
||||
commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */
|
||||
truncate_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -136,7 +138,7 @@ pub struct VoteResponse {
|
||||
}
|
||||
|
||||
/// Request with WAL message sent from proposer to safekeeper. Along the way it
|
||||
/// announces 1) successful election (with VCL); 2) commit_lsn.
|
||||
/// announces 1) successful election (with epoch_start_lsn); 2) commit_lsn.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct AppendRequest {
|
||||
h: AppendRequestHeader,
|
||||
@@ -145,8 +147,8 @@ pub struct AppendRequest {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AppendRequestHeader {
|
||||
term: Term,
|
||||
/// volume commit LSN
|
||||
vcl: Lsn,
|
||||
// LSN since the proposer appends WAL; determines epoch switch point.
|
||||
epoch_start_lsn: Lsn,
|
||||
/// start position of message in WAL
|
||||
begin_lsn: Lsn,
|
||||
/// end position of message in WAL
|
||||
@@ -272,7 +274,7 @@ pub struct SafeKeeper<ST: Storage> {
|
||||
/// reading wal.
|
||||
pub flush_lsn: Lsn,
|
||||
pub tli: u32,
|
||||
pub flushed_restart_lsn: Lsn,
|
||||
pub flushed_truncate_lsn: Lsn,
|
||||
pub storage: ST,
|
||||
pub s: SafeKeeperState, // persistent part
|
||||
pub elected_proposer_term: Term, // for monitoring/debugging
|
||||
@@ -287,7 +289,7 @@ where
|
||||
SafeKeeper {
|
||||
flush_lsn,
|
||||
tli,
|
||||
flushed_restart_lsn: Lsn(0),
|
||||
flushed_truncate_lsn: Lsn(0),
|
||||
storage,
|
||||
s: state,
|
||||
elected_proposer_term: 0,
|
||||
@@ -369,7 +371,7 @@ where
|
||||
resp.vote_given = true as u64;
|
||||
resp.epoch = self.s.acceptor_state.epoch;
|
||||
resp.flush_lsn = self.flush_lsn;
|
||||
resp.restart_lsn = self.s.restart_lsn;
|
||||
resp.restart_lsn = self.s.truncate_lsn;
|
||||
}
|
||||
info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
|
||||
Ok(AcceptorProposerMessage::VoteResponse(resp))
|
||||
@@ -414,7 +416,7 @@ where
|
||||
* XXX: this is wrong, we must actively truncate not matching part of log.
|
||||
*/
|
||||
if self.s.acceptor_state.epoch < msg.h.term
|
||||
&& msg.h.end_lsn > max(self.flush_lsn, msg.h.vcl)
|
||||
&& msg.h.end_lsn > max(self.flush_lsn, msg.h.epoch_start_lsn)
|
||||
{
|
||||
info!("switched to new epoch {}", msg.h.term);
|
||||
self.s.acceptor_state.epoch = msg.h.term; /* bump epoch */
|
||||
@@ -426,7 +428,7 @@ where
|
||||
|
||||
self.s.proposer_uuid = msg.h.proposer_uuid;
|
||||
self.s.commit_lsn = msg.h.commit_lsn;
|
||||
self.s.restart_lsn = msg.h.restart_lsn;
|
||||
self.s.truncate_lsn = msg.h.restart_lsn;
|
||||
|
||||
/*
|
||||
* Update restart LSN in control file.
|
||||
@@ -434,10 +436,10 @@ where
|
||||
* when restart_lsn delta exceeds WAL segment size.
|
||||
*/
|
||||
sync_control_file |=
|
||||
self.flushed_restart_lsn + (self.s.server.wal_seg_size as u64) < self.s.restart_lsn;
|
||||
self.flushed_truncate_lsn + (self.s.server.wal_seg_size as u64) < self.s.truncate_lsn;
|
||||
self.storage.persist(&self.s, sync_control_file)?;
|
||||
if sync_control_file {
|
||||
self.flushed_restart_lsn = self.s.restart_lsn;
|
||||
self.flushed_truncate_lsn = self.s.truncate_lsn;
|
||||
}
|
||||
|
||||
let resp = AppendResponse {
|
||||
@@ -517,7 +519,7 @@ mod tests {
|
||||
|
||||
let mut ar_hdr = AppendRequestHeader {
|
||||
term: 1,
|
||||
vcl: Lsn(2),
|
||||
epoch_start_lsn: Lsn(2),
|
||||
begin_lsn: Lsn(1),
|
||||
end_lsn: Lsn(2),
|
||||
commit_lsn: Lsn(0),
|
||||
|
||||
Reference in New Issue
Block a user