Remove global_commit_lsn.

It is complicated and fragile to maintain and not really needed; update
commit_lsn locally only when we have enough WAL flushed.

ref https://github.com/neondatabase/neon/issues/3069
This commit is contained in:
Arseny Sher
2022-12-27 15:55:33 +04:00
committed by Arseny Sher
parent 1ad6e186bc
commit fee8bf3a17

View File

@@ -182,7 +182,7 @@ pub struct SafeKeeperState {
/// All WAL segments next to one containing local_start_lsn are
/// filled with data from the beginning.
pub local_start_lsn: Lsn,
/// Part of WAL acknowledged by quorum and available locally. Always points
/// Part of WAL acknowledged by quorum *and available locally*. Always points
/// to record boundary.
pub commit_lsn: Lsn,
/// LSN that points to the end of the last backed up segment. Useful to
@@ -501,10 +501,6 @@ impl AcceptorProposerMessage {
/// - messages from compute (proposers) and provides replies
/// - messages from broker peers
pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
/// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn.
/// Note: be careful to set only if we are sure our WAL (term history) matches
/// committed one.
pub global_commit_lsn: Lsn,
/// LSN since the proposer safekeeper currently talking to appends WAL;
/// determines epoch switch point.
pub epoch_start_lsn: Lsn,
@@ -537,7 +533,6 @@ where
}
Ok(SafeKeeper {
global_commit_lsn: state.commit_lsn,
epoch_start_lsn: Lsn(0),
inmem: SafekeeperMemState {
commit_lsn: state.commit_lsn,
@@ -777,7 +772,6 @@ where
// NB: on new clusters, this happens at the same time as
// timeline_start_lsn initialization, it is taken outside to provide
// upgrade.
self.global_commit_lsn = max(self.global_commit_lsn, state.timeline_start_lsn);
self.inmem.commit_lsn = max(self.inmem.commit_lsn, state.timeline_start_lsn);
// Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
@@ -796,10 +790,21 @@ where
Ok(None)
}
/// Advance commit_lsn taking into account what we have locally
fn update_commit_lsn(&mut self) -> Result<()> {
let commit_lsn = min(self.global_commit_lsn, self.flush_lsn());
assert!(commit_lsn >= self.inmem.commit_lsn);
/// Advance commit_lsn taking into account what we have locally.
///
/// Note: it is assumed that 'WAL we have is from the right term' check has
/// already been done outside.
fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
// Both peers and walproposer communicate this value, we might already
// have a fresher (higher) version.
candidate = max(candidate, self.inmem.commit_lsn);
let commit_lsn = min(candidate, self.flush_lsn());
assert!(
commit_lsn >= self.inmem.commit_lsn,
"commit_lsn monotonicity violated: old={} new={}",
self.inmem.commit_lsn,
commit_lsn
);
self.inmem.commit_lsn = commit_lsn;
@@ -865,14 +870,11 @@ where
self.wal_store.flush_wal()?;
}
// Update global_commit_lsn
// Update commit_lsn.
if msg.h.commit_lsn != Lsn(0) {
// We also obtain commit lsn from peers, so value arrived here might be stale (less)
self.global_commit_lsn = max(self.global_commit_lsn, msg.h.commit_lsn);
self.update_commit_lsn(msg.h.commit_lsn)?;
}
self.inmem.peer_horizon_lsn = msg.h.truncate_lsn;
self.update_commit_lsn()?;
// Update truncate and commit LSN in control file.
// To avoid negative impact on performance of extra fsync, do it only
@@ -904,10 +906,6 @@ where
/// Flush WAL to disk. Return AppendResponse with latest LSNs.
fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
self.wal_store.flush_wal()?;
// commit_lsn can be updated because we have new flushed data locally.
self.update_commit_lsn()?;
Ok(Some(AcceptorProposerMessage::AppendResponse(
self.append_response(),
)))
@@ -922,8 +920,7 @@ where
// commit_lsn if our history matches (is part of) history of advanced
// commit_lsn provider.
if sk_info.last_log_term == self.get_epoch() {
self.global_commit_lsn = max(Lsn(sk_info.commit_lsn), self.global_commit_lsn);
self.update_commit_lsn()?;
self.update_commit_lsn(Lsn(sk_info.commit_lsn))?;
}
}