diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 0a7adb96b6..c254f2c57c 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -576,13 +576,16 @@ where self.state .acceptor_state .term_history - .up_to(self.wal_store.flush_lsn()) + .up_to(self.flush_lsn()) } pub fn get_epoch(&self) -> Term { - self.state - .acceptor_state - .get_epoch(self.wal_store.flush_lsn()) + self.state.acceptor_state.get_epoch(self.flush_lsn()) + } + + /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet. + fn flush_lsn(&self) -> Lsn { + max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn) } /// Process message from proposer and possibly form reply. Concurrent @@ -671,7 +674,7 @@ where let mut resp = VoteResponse { term: self.state.acceptor_state.term, vote_given: false as u64, - flush_lsn: self.wal_store.flush_lsn(), + flush_lsn: self.flush_lsn(), truncate_lsn: self.state.peer_horizon_lsn, term_history: self.get_term_history(), timeline_start_lsn: self.state.timeline_start_lsn, @@ -703,7 +706,7 @@ where fn append_response(&self) -> AppendResponse { let ar = AppendResponse { term: self.state.acceptor_state.term, - flush_lsn: self.wal_store.flush_lsn(), + flush_lsn: self.flush_lsn(), commit_lsn: self.state.commit_lsn, // will be filled by the upper code to avoid bothering safekeeper hs_feedback: HotStandbyFeedback::empty(), @@ -770,7 +773,7 @@ where /// Advance commit_lsn taking into account what we have locally pub fn update_commit_lsn(&mut self) -> Result<()> { - let commit_lsn = min(self.global_commit_lsn, self.wal_store.flush_lsn()); + let commit_lsn = min(self.global_commit_lsn, self.flush_lsn()); assert!(commit_lsn >= self.inmem.commit_lsn); self.inmem.commit_lsn = commit_lsn;