diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index a70ae247b7..5b1b686529 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -182,7 +182,7 @@ pub struct SafeKeeperState { /// All WAL segments next to one containing local_start_lsn are /// filled with data from the beginning. pub local_start_lsn: Lsn, - /// Part of WAL acknowledged by quorum and available locally. Always points + /// Part of WAL acknowledged by quorum *and available locally*. Always points /// to record boundary. pub commit_lsn: Lsn, /// LSN that points to the end of the last backed up segment. Useful to @@ -501,10 +501,6 @@ impl AcceptorProposerMessage { /// - messages from compute (proposers) and provides replies /// - messages from broker peers pub struct SafeKeeper { - /// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn. - /// Note: be careful to set only if we are sure our WAL (term history) matches - /// committed one. - pub global_commit_lsn: Lsn, /// LSN since the proposer safekeeper currently talking to appends WAL; /// determines epoch switch point. pub epoch_start_lsn: Lsn, @@ -537,7 +533,6 @@ where } Ok(SafeKeeper { - global_commit_lsn: state.commit_lsn, epoch_start_lsn: Lsn(0), inmem: SafekeeperMemState { commit_lsn: state.commit_lsn, @@ -777,7 +772,6 @@ where // NB: on new clusters, this happens at the same time as // timeline_start_lsn initialization, it is taken outside to provide // upgrade. - self.global_commit_lsn = max(self.global_commit_lsn, state.timeline_start_lsn); self.inmem.commit_lsn = max(self.inmem.commit_lsn, state.timeline_start_lsn); // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment. 
@@ -796,10 +790,21 @@ where Ok(None) } - /// Advance commit_lsn taking into account what we have locally - fn update_commit_lsn(&mut self) -> Result<()> { - let commit_lsn = min(self.global_commit_lsn, self.flush_lsn()); - assert!(commit_lsn >= self.inmem.commit_lsn); + /// Advance commit_lsn taking into account what we have locally. + /// + /// Note: it is assumed that 'WAL we have is from the right term' check has + /// already been done outside. + fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> { + // Both peers and walproposer communicate this value, we might already + // have a fresher (higher) version. + candidate = max(candidate, self.inmem.commit_lsn); + let commit_lsn = min(candidate, self.flush_lsn()); + assert!( + commit_lsn >= self.inmem.commit_lsn, + "commit_lsn monotonicity violated: old={} new={}", + self.inmem.commit_lsn, + commit_lsn + ); self.inmem.commit_lsn = commit_lsn; @@ -865,14 +870,11 @@ where self.wal_store.flush_wal()?; } - // Update global_commit_lsn + // Update commit_lsn. if msg.h.commit_lsn != Lsn(0) { - // We also obtain commit lsn from peers, so value arrived here might be stale (less) - self.global_commit_lsn = max(self.global_commit_lsn, msg.h.commit_lsn); + self.update_commit_lsn(msg.h.commit_lsn)?; } - self.inmem.peer_horizon_lsn = msg.h.truncate_lsn; - self.update_commit_lsn()?; // Update truncate and commit LSN in control file. // To avoid negative impact on performance of extra fsync, do it only @@ -904,10 +906,6 @@ where /// Flush WAL to disk. Return AppendResponse with latest LSNs. fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> { self.wal_store.flush_wal()?; - - // commit_lsn can be updated because we have new flushed data locally. - self.update_commit_lsn()?; - Ok(Some(AcceptorProposerMessage::AppendResponse( self.append_response(), ))) @@ -922,8 +920,7 @@ where // commit_lsn if our history matches (is part of) history of advanced // commit_lsn provider. 
if sk_info.last_log_term == self.get_epoch() { - self.global_commit_lsn = max(Lsn(sk_info.commit_lsn), self.global_commit_lsn); - self.update_commit_lsn()?; + self.update_commit_lsn(Lsn(sk_info.commit_lsn))?; } }