Small refactoring after ec3bc74165.

Move record_safekeeper_info inside safekeeper.rs, fix commit_lsn update, sync
control file.
This commit is contained in:
Arseny Sher
2022-04-04 15:42:13 +04:00
parent 0ca2bd929b
commit 5b29774532
2 changed files with 57 additions and 41 deletions

View File

@@ -6,6 +6,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_ffi::xlog_utils::TimeLineID;
use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::cmp::min;
use std::fmt;
use std::io::Read;
@@ -15,6 +16,7 @@ use zenith_utils::zid::ZTenantTimelineId;
use lazy_static::lazy_static;
use crate::broker::SafekeeperInfo;
use crate::control_file;
use crate::send_wal::HotStandbyFeedback;
use crate::wal_storage;
@@ -497,6 +499,8 @@ pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
metrics: SafeKeeperMetrics,
/// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn.
/// Note: be careful to set only if we are sure our WAL (term history) matches
/// committed one.
pub global_commit_lsn: Lsn,
/// LSN since the proposer safekeeper currently talking to appends WAL;
/// determines epoch switch point.
@@ -743,7 +747,9 @@ where
let mut state = self.state.clone();
state.commit_lsn = self.inmem.commit_lsn;
state.s3_wal_lsn = self.inmem.s3_wal_lsn;
state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
state.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
state.proposer_uuid = self.inmem.proposer_uuid;
self.state.persist(&state)
}
@@ -788,10 +794,10 @@ where
self.wal_store.flush_wal()?;
}
// Update global_commit_lsn, verifying that it cannot decrease.
// Update global_commit_lsn
if msg.h.commit_lsn != Lsn(0) {
assert!(msg.h.commit_lsn >= self.global_commit_lsn);
self.global_commit_lsn = msg.h.commit_lsn;
// We also obtain commit lsn from peers, so value arrived here might be stale (less)
self.global_commit_lsn = max(self.global_commit_lsn, msg.h.commit_lsn);
}
self.inmem.peer_horizon_lsn = msg.h.truncate_lsn;
@@ -835,6 +841,46 @@ where
self.append_response(),
)))
}
/// Update timeline state with peer safekeeper data.
pub fn record_safekeeper_info(&mut self, sk_info: &SafekeeperInfo) -> Result<()> {
let mut sync_control_file = false;
if let (Some(commit_lsn), Some(last_log_term)) = (sk_info.commit_lsn, sk_info.last_log_term)
{
// Note: the check is too restrictive, generally we can update local
// commit_lsn if our history matches (is part of) history of advanced
// commit_lsn provider.
if last_log_term == self.get_epoch() {
self.global_commit_lsn = max(commit_lsn, self.global_commit_lsn);
self.update_commit_lsn()?;
}
}
if let Some(s3_wal_lsn) = sk_info.s3_wal_lsn {
let new_s3_wal_lsn = max(s3_wal_lsn, self.inmem.s3_wal_lsn);
sync_control_file |=
self.state.s3_wal_lsn + (self.state.server.wal_seg_size as u64) < new_s3_wal_lsn;
self.inmem.s3_wal_lsn = new_s3_wal_lsn;
}
if let Some(remote_consistent_lsn) = sk_info.remote_consistent_lsn {
let new_remote_consistent_lsn =
max(remote_consistent_lsn, self.inmem.remote_consistent_lsn);
sync_control_file |= self.state.remote_consistent_lsn
+ (self.state.server.wal_seg_size as u64)
< new_remote_consistent_lsn;
self.inmem.remote_consistent_lsn = new_remote_consistent_lsn;
}
if let Some(peer_horizon_lsn) = sk_info.peer_horizon_lsn {
let new_peer_horizon_lsn = max(peer_horizon_lsn, self.inmem.peer_horizon_lsn);
sync_control_file |= self.state.peer_horizon_lsn
+ (self.state.server.wal_seg_size as u64)
< new_peer_horizon_lsn;
self.inmem.peer_horizon_lsn = new_peer_horizon_lsn;
}
if sync_control_file {
self.persist_control_file()?;
}
Ok(())
}
}
#[cfg(test)]

View File

@@ -375,10 +375,9 @@ impl Timeline {
}
// Notify caught-up WAL senders about new WAL data received
pub fn notify_wal_senders(&self, commit_lsn: Lsn) {
let mut shared_state = self.mutex.lock().unwrap();
if shared_state.notified_commit_lsn < commit_lsn {
shared_state.notified_commit_lsn = commit_lsn;
fn notify_wal_senders(&self, shared_state: &mut MutexGuard<SharedState>) {
if shared_state.notified_commit_lsn < shared_state.sk.inmem.commit_lsn {
shared_state.notified_commit_lsn = shared_state.sk.inmem.commit_lsn;
self.cond.notify_all();
}
}
@@ -389,13 +388,9 @@ impl Timeline {
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
let mut rmsg: Option<AcceptorProposerMessage>;
let commit_lsn: Lsn;
{
let mut shared_state = self.mutex.lock().unwrap();
rmsg = shared_state.sk.process_msg(msg)?;
// locally available commit lsn. flush_lsn can be smaller than
// commit_lsn if we are catching up safekeeper.
commit_lsn = shared_state.sk.inmem.commit_lsn;
// if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
@@ -405,9 +400,10 @@ impl Timeline {
resp.zenith_feedback = zenith_feedback;
}
}
// Ping wal sender that new data might be available.
self.notify_wal_senders(&mut shared_state);
}
// Ping wal sender that new data might be available.
self.notify_wal_senders(commit_lsn);
Ok(rmsg)
}
@@ -437,34 +433,8 @@ impl Timeline {
/// Update timeline state with peer safekeeper data.
pub fn record_safekeeper_info(&self, sk_info: &SafekeeperInfo, _sk_id: ZNodeId) -> Result<()> {
let mut shared_state = self.mutex.lock().unwrap();
// Note: the check is too restrictive, generally we can update local
// commit_lsn if our history matches (is part of) history of advanced
// commit_lsn provider.
if let (Some(commit_lsn), Some(last_log_term)) = (sk_info.commit_lsn, sk_info.last_log_term)
{
if last_log_term == shared_state.sk.get_epoch() {
shared_state.sk.global_commit_lsn =
max(commit_lsn, shared_state.sk.global_commit_lsn);
shared_state.sk.update_commit_lsn()?;
let local_commit_lsn = min(commit_lsn, shared_state.sk.wal_store.flush_lsn());
shared_state.sk.inmem.commit_lsn =
max(local_commit_lsn, shared_state.sk.inmem.commit_lsn);
}
}
if let Some(s3_wal_lsn) = sk_info.s3_wal_lsn {
shared_state.sk.inmem.s3_wal_lsn = max(s3_wal_lsn, shared_state.sk.inmem.s3_wal_lsn);
}
if let Some(remote_consistent_lsn) = sk_info.remote_consistent_lsn {
shared_state.sk.inmem.remote_consistent_lsn = max(
remote_consistent_lsn,
shared_state.sk.inmem.remote_consistent_lsn,
);
}
if let Some(peer_horizon_lsn) = sk_info.peer_horizon_lsn {
shared_state.sk.inmem.peer_horizon_lsn =
max(peer_horizon_lsn, shared_state.sk.inmem.peer_horizon_lsn);
}
// TODO: sync control file
shared_state.sk.record_safekeeper_info(sk_info)?;
self.notify_wal_senders(&mut shared_state);
Ok(())
}