From 5b297745324f759c4aa16037a165bef251fc8252 Mon Sep 17 00:00:00 2001
From: Arseny Sher
Date: Mon, 4 Apr 2022 15:42:13 +0400
Subject: [PATCH] Small refactoring after ec3bc741653d.

Move record_safekeeper_info into safekeeper.rs, fix the commit_lsn update,
and sync the control file.
---
 walkeeper/src/safekeeper.rs | 52 ++++++++++++++++++++++++++++++++++---
 walkeeper/src/timeline.rs   | 46 ++++++--------------------
 2 files changed, 57 insertions(+), 41 deletions(-)

diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs
index 22a8481e45..cf56261ee6 100644
--- a/walkeeper/src/safekeeper.rs
+++ b/walkeeper/src/safekeeper.rs
@@ -6,6 +6,7 @@
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use postgres_ffi::xlog_utils::TimeLineID;
 use serde::{Deserialize, Serialize};
+use std::cmp::max;
 use std::cmp::min;
 use std::fmt;
 use std::io::Read;
@@ -15,6 +16,7 @@ use zenith_utils::zid::ZTenantTimelineId;
 
 use lazy_static::lazy_static;
 
+use crate::broker::SafekeeperInfo;
 use crate::control_file;
 use crate::send_wal::HotStandbyFeedback;
 use crate::wal_storage;
@@ -497,6 +499,8 @@ pub struct SafeKeeper {
     metrics: SafeKeeperMetrics,
 
     /// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn.
+    /// Note: be careful to set this only if we are sure our WAL (term history)
+    /// matches the committed one.
     pub global_commit_lsn: Lsn,
     /// LSN since the proposer safekeeper currently talking to appends WAL;
     /// determines epoch switch point.
@@ -743,7 +747,9 @@ where
         let mut state = self.state.clone();
 
         state.commit_lsn = self.inmem.commit_lsn;
+        state.s3_wal_lsn = self.inmem.s3_wal_lsn;
         state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
+        state.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
         state.proposer_uuid = self.inmem.proposer_uuid;
         self.state.persist(&state)
     }
@@ -788,10 +794,10 @@ where
             self.wal_store.flush_wal()?;
         }
 
-        // Update global_commit_lsn, verifying that it cannot decrease.
+        // Update global_commit_lsn.
        if msg.h.commit_lsn != Lsn(0) {
-            assert!(msg.h.commit_lsn >= self.global_commit_lsn);
-            self.global_commit_lsn = msg.h.commit_lsn;
+            // We also obtain commit_lsn from peers, so the value arriving here may be stale (lower).
+            self.global_commit_lsn = max(self.global_commit_lsn, msg.h.commit_lsn);
         }
 
         self.inmem.peer_horizon_lsn = msg.h.truncate_lsn;
@@ -835,6 +841,46 @@ where
             self.append_response(),
         )))
     }
+
+    /// Update timeline state with peer safekeeper data.
+    pub fn record_safekeeper_info(&mut self, sk_info: &SafekeeperInfo) -> Result<()> {
+        let mut sync_control_file = false;
+        if let (Some(commit_lsn), Some(last_log_term)) = (sk_info.commit_lsn, sk_info.last_log_term)
+        {
+            // Note: the check is too restrictive; generally we can update the local
+            // commit_lsn if our history matches (is a part of) the history of the
+            // advanced commit_lsn provider.
+            if last_log_term == self.get_epoch() {
+                self.global_commit_lsn = max(commit_lsn, self.global_commit_lsn);
+                self.update_commit_lsn()?;
+            }
+        }
+        if let Some(s3_wal_lsn) = sk_info.s3_wal_lsn {
+            let new_s3_wal_lsn = max(s3_wal_lsn, self.inmem.s3_wal_lsn);
+            sync_control_file |=
+                self.state.s3_wal_lsn + (self.state.server.wal_seg_size as u64) < new_s3_wal_lsn;
+            self.inmem.s3_wal_lsn = new_s3_wal_lsn;
+        }
+        if let Some(remote_consistent_lsn) = sk_info.remote_consistent_lsn {
+            let new_remote_consistent_lsn =
+                max(remote_consistent_lsn, self.inmem.remote_consistent_lsn);
+            sync_control_file |= self.state.remote_consistent_lsn
+                + (self.state.server.wal_seg_size as u64)
+                < new_remote_consistent_lsn;
+            self.inmem.remote_consistent_lsn = new_remote_consistent_lsn;
+        }
+        if let Some(peer_horizon_lsn) = sk_info.peer_horizon_lsn {
+            let new_peer_horizon_lsn = max(peer_horizon_lsn, self.inmem.peer_horizon_lsn);
+            sync_control_file |= self.state.peer_horizon_lsn
+                + (self.state.server.wal_seg_size as u64)
+                < new_peer_horizon_lsn;
+            self.inmem.peer_horizon_lsn = new_peer_horizon_lsn;
+        }
+        if sync_control_file {
+            self.persist_control_file()?;
+        }
+        Ok(())
+    }
 }
 
 #[cfg(test)]
diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs
index a2941a9a5c..777db7eb2b 100644
--- a/walkeeper/src/timeline.rs
+++ b/walkeeper/src/timeline.rs
@@ -375,10 +375,9 @@ impl Timeline {
     }
 
     // Notify caught-up WAL senders about new WAL data received
-    pub fn notify_wal_senders(&self, commit_lsn: Lsn) {
-        let mut shared_state = self.mutex.lock().unwrap();
-        if shared_state.notified_commit_lsn < commit_lsn {
-            shared_state.notified_commit_lsn = commit_lsn;
+    fn notify_wal_senders(&self, shared_state: &mut MutexGuard<SharedState>) {
+        if shared_state.notified_commit_lsn < shared_state.sk.inmem.commit_lsn {
+            shared_state.notified_commit_lsn = shared_state.sk.inmem.commit_lsn;
             self.cond.notify_all();
         }
     }
@@ -389,13 +388,9 @@ impl Timeline {
         msg: &ProposerAcceptorMessage,
     ) -> Result<Option<AcceptorProposerMessage>> {
         let mut rmsg: Option<AcceptorProposerMessage>;
-        let commit_lsn: Lsn;
         {
             let mut shared_state = self.mutex.lock().unwrap();
             rmsg = shared_state.sk.process_msg(msg)?;
-            // locally available commit lsn. flush_lsn can be smaller than
-            // commit_lsn if we are catching up safekeeper.
-            commit_lsn = shared_state.sk.inmem.commit_lsn;
 
             // if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
             if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
@@ -405,9 +400,10 @@ impl Timeline {
                     resp.zenith_feedback = zenith_feedback;
                 }
             }
+
+            // Ping wal sender that new data might be available.
+            self.notify_wal_senders(&mut shared_state);
         }
-        // Ping wal sender that new data might be available.
-        self.notify_wal_senders(commit_lsn);
 
         Ok(rmsg)
     }
@@ -437,34 +433,8 @@ impl Timeline {
     /// Update timeline state with peer safekeeper data.
     pub fn record_safekeeper_info(&self, sk_info: &SafekeeperInfo, _sk_id: ZNodeId) -> Result<()> {
         let mut shared_state = self.mutex.lock().unwrap();
-        // Note: the check is too restrictive, generally we can update local
-        // commit_lsn if our history matches (is part of) history of advanced
-        // commit_lsn provider.
-        if let (Some(commit_lsn), Some(last_log_term)) = (sk_info.commit_lsn, sk_info.last_log_term)
-        {
-            if last_log_term == shared_state.sk.get_epoch() {
-                shared_state.sk.global_commit_lsn =
-                    max(commit_lsn, shared_state.sk.global_commit_lsn);
-                shared_state.sk.update_commit_lsn()?;
-                let local_commit_lsn = min(commit_lsn, shared_state.sk.wal_store.flush_lsn());
-                shared_state.sk.inmem.commit_lsn =
-                    max(local_commit_lsn, shared_state.sk.inmem.commit_lsn);
-            }
-        }
-        if let Some(s3_wal_lsn) = sk_info.s3_wal_lsn {
-            shared_state.sk.inmem.s3_wal_lsn = max(s3_wal_lsn, shared_state.sk.inmem.s3_wal_lsn);
-        }
-        if let Some(remote_consistent_lsn) = sk_info.remote_consistent_lsn {
-            shared_state.sk.inmem.remote_consistent_lsn = max(
-                remote_consistent_lsn,
-                shared_state.sk.inmem.remote_consistent_lsn,
-            );
-        }
-        if let Some(peer_horizon_lsn) = sk_info.peer_horizon_lsn {
-            shared_state.sk.inmem.peer_horizon_lsn =
-                max(peer_horizon_lsn, shared_state.sk.inmem.peer_horizon_lsn);
-        }
-        // TODO: sync control file
+        shared_state.sk.record_safekeeper_info(sk_info)?;
+        self.notify_wal_senders(&mut shared_state);
         Ok(())
     }
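
Note on the control-file sync introduced above: SafeKeeper::record_safekeeper_info always folds
peer-reported values into the in-memory state, but it persists the control file only when the
stored s3_wal_lsn, remote_consistent_lsn or peer_horizon_lsn has fallen more than one WAL segment
behind the new value, so broker gossip does not turn into an fsync per message. Below is a minimal
standalone sketch of that max-then-threshold rule, assuming plain u64 LSNs and a hypothetical
advance_lsn helper; it is not the actual SafeKeeper code.

    /// Advance an in-memory LSN with a peer-provided value and report whether the
    /// persisted copy is now more than one WAL segment behind (i.e. worth a sync).
    /// LSNs are modeled as plain u64 byte positions; `wal_seg_size` mirrors
    /// state.server.wal_seg_size in the patch.
    fn advance_lsn(inmem: &mut u64, persisted: u64, incoming: u64, wal_seg_size: u64) -> bool {
        // Peer data may be stale, so never move the in-memory value backwards.
        *inmem = incoming.max(*inmem);
        // Request a control-file sync only when the on-disk copy lags by more than a segment.
        persisted + wal_seg_size < *inmem
    }

    fn main() {
        let wal_seg_size: u64 = 16 * 1024 * 1024; // 16 MiB WAL segments
        let persisted: u64 = 64 * 1024 * 1024; // value currently stored in the control file
        let mut inmem: u64 = persisted;

        // A small advance is kept in memory but does not warrant a sync yet.
        assert!(!advance_lsn(&mut inmem, persisted, persisted + 8192, wal_seg_size));

        // Once the on-disk copy is more than a segment behind, a sync is requested.
        assert!(advance_lsn(&mut inmem, persisted, persisted + 2 * wal_seg_size, wal_seg_size));

        // Stale (lower) peer values never move the in-memory LSN backwards.
        let before = inmem;
        advance_lsn(&mut inmem, persisted, persisted, wal_seg_size);
        assert_eq!(inmem, before);
    }

The same keep-the-max idea is why the AppendRequest path above can drop the old assert on
msg.h.commit_lsn: a stale (lower) value simply loses the max and changes nothing.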