Add term history to safekeepers.

Persist full history of term switches on safekeepers instead of storing only the
single term of the highest entry (called epoch). This allows easily and
correctly find the divergence point of two logs and truncate the obsolete part
before overwriting it with entries of the newer proposer(s).

Full history of the proposer is transferred in separate message before proposer
starts streaming; it is immediately persisted by safekeeper, though he might not
yet have entries for some older terms there. That's because we can't atomically
append to WAL and update the control file anyway, so locally available WAL must
be taken into account when looking at the history.

We should sometimes purge term history entries beyond truncate_lsn; this is not
done here.

Per https://github.com/zenithdb/rfcs/pull/12

Closes #296.

Bumps vendor/postgres.
This commit is contained in:
Arseny Sher
2021-11-05 13:39:41 +03:00
parent 2669d140f8
commit cba4da3f4d
7 changed files with 417 additions and 121 deletions

View File

@@ -392,6 +392,7 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder, pg_bin: PgBin):
"lm_prefix": "prefix",
"lm_message": "message",
"set_commit_lsn": True,
"send_proposer_elected": True,
"term": 2,
"begin_lsn": begin_lsn,
"epoch_start_lsn": epoch_start_lsn,

View File

@@ -7,7 +7,8 @@ use std::fmt::Display;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
use crate::safekeeper::AcceptorState;
use crate::safekeeper::Term;
use crate::safekeeper::TermHistory;
use crate::timeline::CreateControlFile;
use crate::timeline::GlobalTimelines;
use crate::SafeKeeperConf;
@@ -29,6 +30,7 @@ fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
.as_ref()
}
/// Serialize through Display trait.
fn display_serialize<S, F>(z: &F, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
@@ -37,6 +39,14 @@ where
s.serialize_str(&format!("{}", z))
}
/// Augment AcceptorState with epoch for convenience
#[derive(Debug, Serialize)]
struct AcceptorStateStatus {
term: Term,
epoch: Term,
term_history: TermHistory,
}
/// Info about timeline on safekeeper ready for reporting.
#[derive(Debug, Serialize)]
struct TimelineStatus {
@@ -44,7 +54,7 @@ struct TimelineStatus {
tenant_id: ZTenantId,
#[serde(serialize_with = "display_serialize")]
timeline_id: ZTimelineId,
acceptor_state: AcceptorState,
acceptor_state: AcceptorStateStatus,
#[serde(serialize_with = "display_serialize")]
commit_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
@@ -68,10 +78,16 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
let sk_state = tli.get_info();
let flush_lsn = tli.get_end_of_wal();
let acc_state = AcceptorStateStatus {
term: sk_state.acceptor_state.term,
epoch: sk_state.acceptor_state.get_epoch(flush_lsn),
term_history: sk_state.acceptor_state.term_history,
};
let status = TimelineStatus {
tenant_id,
timeline_id,
acceptor_state: sk_state.acceptor_state,
acceptor_state: acc_state,
commit_lsn: sk_state.commit_lsn,
truncate_lsn: sk_state.truncate_lsn,
flush_lsn,

View File

@@ -14,9 +14,9 @@ use serde::{Deserialize, Serialize};
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
use crate::safekeeper::{
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerGreeting,
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, ProposerGreeting,
};
use crate::safekeeper::{SafeKeeperState, Term};
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
use crate::send_wal::SendWalHandler;
use crate::timeline::TimelineTools;
use postgres_ffi::pg_constants;
@@ -35,6 +35,9 @@ struct AppendLogicalMessage {
// if true, commit_lsn will match flush_lsn after append
set_commit_lsn: bool,
// if true, ProposerElected will be sent before append
send_proposer_elected: bool,
// fields from AppendRequestHeader
term: Term,
epoch_start_lsn: Lsn,
@@ -70,6 +73,11 @@ pub fn handle_json_ctrl(
// need to init safekeeper state before AppendRequest
prepare_safekeeper(swh)?;
// if send_proposer_elected is true, we need to update local history
if append_request.send_proposer_elected {
send_proposer_elected(swh, append_request.term, append_request.epoch_start_lsn)?;
}
let inserted_wal = append_logical_message(swh, append_request)?;
let response = AppendResult {
state: swh.timeline.get().get_info(),
@@ -104,11 +112,29 @@ fn prepare_safekeeper(swh: &mut SendWalHandler) -> Result<()> {
let response = swh.timeline.get().process_msg(&greeting_request)?;
match response {
AcceptorProposerMessage::Greeting(_) => Ok(()),
Some(AcceptorProposerMessage::Greeting(_)) => Ok(()),
_ => anyhow::bail!("not GreetingResponse"),
}
}
fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Result<()> {
// add new term to existing history
let history = swh.timeline.get().get_info().acceptor_state.term_history;
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
let mut history_entries = history.0;
history_entries.push(TermSwitchEntry { term, lsn });
let history = TermHistory(history_entries);
let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
term,
start_streaming_at: lsn,
term_history: history,
});
swh.timeline.get().process_msg(&proposer_elected_request)?;
Ok(())
}
#[derive(Serialize, Deserialize)]
struct InsertedWAL {
begin_lsn: Lsn,
@@ -150,7 +176,7 @@ fn append_logical_message(
let response = swh.timeline.get().process_msg(&append_request)?;
let append_response = match response {
AcceptorProposerMessage::AppendResponse(resp) => resp,
Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
_ => anyhow::bail!("not AppendResponse"),
};

View File

@@ -4,6 +4,7 @@
use anyhow::{bail, Context, Result};
use bytes::Bytes;
use bytes::BytesMut;
use log::*;
use postgres::{Client, Config, NoTls};
@@ -98,7 +99,7 @@ impl<'pg> ReceiveWalConn<'pg> {
// Send message to the postgres
fn write_msg(&mut self, msg: &AcceptorProposerMessage) -> Result<()> {
let mut buf = Vec::new();
let mut buf = BytesMut::with_capacity(128);
msg.serialize(&mut buf)?;
self.pg_backend.write_message(&BeMessage::CopyData(&buf))?;
Ok(())
@@ -147,7 +148,9 @@ impl<'pg> ReceiveWalConn<'pg> {
.get()
.process_msg(&msg)
.with_context(|| "failed to process ProposerAcceptorMessage")?;
self.write_msg(&reply)?;
if let Some(reply) = reply {
self.write_msg(&reply)?;
}
msg = self.read_msg()?;
}
}

View File

@@ -4,16 +4,16 @@ use anyhow::Context;
use anyhow::{anyhow, bail, Result};
use byteorder::LittleEndian;
use byteorder::ReadBytesExt;
use byteorder::WriteBytesExt;
use bytes::Buf;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use log::*;
use pageserver::waldecoder::WalStreamDecoder;
use postgres_ffi::xlog_utils::TimeLineID;
use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::cmp::min;
use std::io;
use std::fmt;
use std::io::Read;
use lazy_static::lazy_static;
@@ -37,6 +37,70 @@ const UNKNOWN_SERVER_VERSION: u32 = 0;
/// Consensus logical timestamp.
pub type Term = u64;
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct TermSwitchEntry {
pub term: Term,
pub lsn: Lsn,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct TermHistory(pub Vec<TermSwitchEntry>);
impl TermHistory {
pub fn empty() -> TermHistory {
TermHistory(Vec::new())
}
// Parse TermHistory as n_entries followed by TermSwitchEntry pairs
pub fn from_bytes(mut bytes: Bytes) -> Result<TermHistory> {
if bytes.remaining() < 4 {
bail!("TermHistory misses len");
}
let n_entries = bytes.get_u32_le();
let mut res = Vec::with_capacity(n_entries as usize);
for _ in 0..n_entries {
if bytes.remaining() < 16 {
bail!("TermHistory is incomplete");
}
res.push(TermSwitchEntry {
term: bytes.get_u64_le(),
lsn: bytes.get_u64_le().into(),
})
}
Ok(TermHistory(res))
}
/// Return copy of self with switches happening strictly after up_to
/// truncated.
pub fn up_to(&self, up_to: Lsn) -> TermHistory {
let mut res = Vec::with_capacity(self.0.len());
for e in &self.0 {
if e.lsn > up_to {
break;
}
res.push(*e);
}
TermHistory(res)
}
}
/// Display only latest entries for Debug.
impl fmt::Debug for TermHistory {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let n_printed = 20;
write!(
fmt,
"{}{:?}",
if self.0.len() > n_printed { "... " } else { "" },
self.0
.iter()
.rev()
.take(n_printed)
.map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
.collect::<Vec<_>>()
)
}
}
/// Unique id of proposer. Not needed for correctness, used for monitoring.
type PgUuid = [u8; 16];
@@ -45,8 +109,21 @@ type PgUuid = [u8; 16];
pub struct AcceptorState {
/// acceptor's last term it voted for (advanced in 1 phase)
pub term: Term,
/// acceptor's epoch (advanced, i.e. bumped to 'term' when VCL is reached).
pub epoch: Term,
/// History of term switches for safekeeper's WAL.
/// Actually it often goes *beyond* WAL contents as we adopt term history
/// from the proposer before recovery.
pub term_history: TermHistory,
}
impl AcceptorState {
/// acceptor's epoch is the term of the highest entry in the log
pub fn get_epoch(&self, flush_lsn: Lsn) -> Term {
let th = self.term_history.up_to(flush_lsn);
match th.0.last() {
Some(e) => e.term,
None => 0,
}
}
}
/// Information about Postgres. Safekeeper gets it once and then verifies
@@ -91,7 +168,10 @@ impl SafeKeeperState {
SafeKeeperState {
magic: SK_MAGIC,
format_version: SK_FORMAT_VERSION,
acceptor_state: AcceptorState { term: 0, epoch: 0 },
acceptor_state: AcceptorState {
term: 0,
term_history: TermHistory::empty(),
},
server: ServerInfo {
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
system_id: 0, /* Postgres system identifier */
@@ -147,16 +227,28 @@ pub struct VoteRequest {
/// Vote itself, sent from safekeeper to proposer
#[derive(Debug, Serialize)]
pub struct VoteResponse {
term: Term, // not really needed, just a sanity check
term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
vote_given: u64, // fixme u64 due to padding
/// Safekeeper's log position, to let proposer choose the most advanced one
epoch: Term,
// Safekeeper flush_lsn (end of WAL) + history of term switches allow
// proposer to choose the most advanced one.
flush_lsn: Lsn,
truncate_lsn: Lsn,
term_history: TermHistory,
}
/*
* Proposer -> Acceptor message announcing proposer is elected and communicating
* term history to it.
*/
#[derive(Debug)]
pub struct ProposerElected {
pub term: Term,
pub start_streaming_at: Lsn,
pub term_history: TermHistory,
}
/// Request with WAL message sent from proposer to safekeeper. Along the way it
/// announces 1) successful election (with epoch_start_lsn); 2) commit_lsn.
/// communicates commit_lsn.
#[derive(Debug)]
pub struct AppendRequest {
pub h: AppendRequestHeader,
@@ -164,6 +256,7 @@ pub struct AppendRequest {
}
#[derive(Debug, Clone, Deserialize)]
pub struct AppendRequestHeader {
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term,
// LSN since the proposer appends WAL; determines epoch switch point.
pub epoch_start_lsn: Lsn,
@@ -185,7 +278,6 @@ pub struct AppendResponse {
// Current term of the safekeeper; if it is higher than proposer's, the
// compute is out of date.
pub term: Term,
pub epoch: Term,
// NOTE: this is physical end of wal on safekeeper; currently it doesn't
// make much sense without taking epoch into account, as history can be
// diverged.
@@ -198,19 +290,32 @@ pub struct AppendResponse {
pub hs_feedback: HotStandbyFeedback,
}
impl AppendResponse {
fn term_only(term: Term) -> AppendResponse {
AppendResponse {
term,
flush_lsn: Lsn(0),
commit_lsn: Lsn(0),
disk_consistent_lsn: Lsn(0),
hs_feedback: HotStandbyFeedback::empty(),
}
}
}
/// Proposer -> Acceptor messages
#[derive(Debug)]
pub enum ProposerAcceptorMessage {
Greeting(ProposerGreeting),
VoteRequest(VoteRequest),
Elected(ProposerElected),
AppendRequest(AppendRequest),
}
impl ProposerAcceptorMessage {
/// Parse proposer message.
pub fn parse(msg: Bytes) -> Result<ProposerAcceptorMessage> {
pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
// xxx using Reader is inefficient but easy to work with bincode
let mut stream = msg.reader();
let mut stream = msg_bytes.reader();
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
match tag {
@@ -222,6 +327,21 @@ impl ProposerAcceptorMessage {
let msg = VoteRequest::des_from(&mut stream)?;
Ok(ProposerAcceptorMessage::VoteRequest(msg))
}
'e' => {
let mut msg_bytes = stream.into_inner();
if msg_bytes.remaining() < 16 {
bail!("ProposerElected message is not complete");
}
let term = msg_bytes.get_u64_le();
let start_streaming_at = msg_bytes.get_u64_le().into();
let term_history = TermHistory::from_bytes(msg_bytes)?;
let msg = ProposerElected {
term,
start_streaming_at,
term_history,
};
Ok(ProposerAcceptorMessage::Elected(msg))
}
'a' => {
// read header followed by wal data
let hdr = AppendRequestHeader::des_from(&mut stream)?;
@@ -259,19 +379,33 @@ pub enum AcceptorProposerMessage {
impl AcceptorProposerMessage {
/// Serialize acceptor -> proposer message.
pub fn serialize(&self, stream: &mut impl io::Write) -> Result<()> {
pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
match self {
AcceptorProposerMessage::Greeting(msg) => {
stream.write_u64::<LittleEndian>('g' as u64)?;
msg.ser_into(stream)?;
buf.put_u64_le('g' as u64);
buf.put_u64_le(msg.term);
}
AcceptorProposerMessage::VoteResponse(msg) => {
stream.write_u64::<LittleEndian>('v' as u64)?;
msg.ser_into(stream)?;
buf.put_u64_le('v' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.vote_given);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.truncate_lsn.into());
buf.put_u32_le(msg.term_history.0.len() as u32);
for e in &msg.term_history.0 {
buf.put_u64_le(e.term);
buf.put_u64_le(e.lsn.into());
}
}
AcceptorProposerMessage::AppendResponse(msg) => {
stream.write_u64::<LittleEndian>('a' as u64)?;
msg.ser_into(stream)?;
buf.put_u64_le('a' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.commit_lsn.into());
buf.put_u64_le(msg.disk_consistent_lsn.into());
buf.put_i64_le(msg.hs_feedback.ts);
buf.put_u64_le(msg.hs_feedback.xmin);
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
}
}
@@ -284,6 +418,8 @@ pub trait Storage {
fn persist(&mut self, s: &SafeKeeperState, sync: bool) -> Result<()>;
/// Write piece of wal in buf to disk and sync it.
fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()>;
// Truncate WAL at specified LSN
fn truncate_wal(&mut self, s: &ServerInfo, endpos: Lsn) -> Result<()>;
}
lazy_static! {
@@ -357,8 +493,7 @@ pub struct SafeKeeper<ST: Storage> {
pub commit_lsn: Lsn,
pub truncate_lsn: Lsn,
pub storage: ST,
pub s: SafeKeeperState, // persistent part
pub elected_proposer_term: Term, // for monitoring/debugging
pub s: SafeKeeperState, // persistent part
decoder: WalStreamDecoder,
}
@@ -375,27 +510,40 @@ where
truncate_lsn: state.truncate_lsn,
storage,
s: state,
elected_proposer_term: 0,
decoder: WalStreamDecoder::new(Lsn(0)),
}
}
/// Get history of term switches for the available WAL
fn get_term_history(&self) -> TermHistory {
self.s.acceptor_state.term_history.up_to(self.flush_lsn)
}
#[cfg(test)]
fn get_epoch(&self) -> Term {
self.s.acceptor_state.get_epoch(self.flush_lsn)
}
/// Process message from proposer and possibly form reply. Concurrent
/// callers must exclude each other.
pub fn process_msg(
&mut self,
msg: &ProposerAcceptorMessage,
) -> Result<AcceptorProposerMessage> {
) -> Result<Option<AcceptorProposerMessage>> {
match msg {
ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg),
ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg),
ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg),
ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg),
}
}
/// Handle initial message from proposer: check its sanity and send my
/// current term.
fn handle_greeting(&mut self, msg: &ProposerGreeting) -> Result<AcceptorProposerMessage> {
fn handle_greeting(
&mut self,
msg: &ProposerGreeting,
) -> Result<Option<AcceptorProposerMessage>> {
/* Check protocol compatibility */
if msg.protocol_version != SK_PROTOCOL_VERSION {
bail!(
@@ -429,64 +577,106 @@ where
"processed greeting from proposer {:?}, sending term {:?}",
msg.proposer_id, self.s.acceptor_state.term
);
Ok(AcceptorProposerMessage::Greeting(AcceptorGreeting {
Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
term: self.s.acceptor_state.term,
}))
})))
}
/// Give vote for the given term, if we haven't done that previously.
fn handle_vote_request(&mut self, msg: &VoteRequest) -> Result<AcceptorProposerMessage> {
fn handle_vote_request(
&mut self,
msg: &VoteRequest,
) -> Result<Option<AcceptorProposerMessage>> {
// initialize with refusal
let mut resp = VoteResponse {
term: msg.term,
term: self.s.acceptor_state.term,
vote_given: false as u64,
epoch: 0,
flush_lsn: Lsn(0),
truncate_lsn: Lsn(0),
flush_lsn: self.flush_lsn,
truncate_lsn: self.s.truncate_lsn,
term_history: self.get_term_history(),
};
if self.s.acceptor_state.term < msg.term {
self.s.acceptor_state.term = msg.term;
// persist vote before sending it out
self.storage.persist(&self.s, true)?;
resp.term = self.s.acceptor_state.term;
resp.vote_given = true as u64;
resp.epoch = self.s.acceptor_state.epoch;
resp.flush_lsn = self.flush_lsn;
resp.truncate_lsn = self.s.truncate_lsn;
}
info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
Ok(AcceptorProposerMessage::VoteResponse(resp))
Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
}
/// Bump our term if received a note from elected proposer with higher one
fn bump_if_higher(&mut self, term: Term) -> Result<()> {
if self.s.acceptor_state.term < term {
self.s.acceptor_state.term = term;
self.storage.persist(&self.s, true)?;
}
Ok(())
}
/// Form AppendResponse from current state.
fn append_response(&self) -> AppendResponse {
AppendResponse {
term: self.s.acceptor_state.term,
flush_lsn: self.flush_lsn,
commit_lsn: self.s.commit_lsn,
disk_consistent_lsn: Lsn(0),
// will be filled by the upper code to avoid bothering safekeeper
hs_feedback: HotStandbyFeedback::empty(),
}
}
fn handle_elected(&mut self, msg: &ProposerElected) -> Result<Option<AcceptorProposerMessage>> {
info!("received ProposerElected {:?}", msg);
self.bump_if_higher(msg.term)?;
// If our term is higher, ignore the message (next feedback will inform the compute)
if self.s.acceptor_state.term > msg.term {
return Ok(None);
}
// TODO: cross check divergence point
// streaming must not create a hole
assert!(self.flush_lsn == Lsn(0) || self.flush_lsn >= msg.start_streaming_at);
// truncate obsolete part of WAL
if self.flush_lsn != Lsn(0) {
self.storage
.truncate_wal(&self.s.server, msg.start_streaming_at)?;
}
// update our end of WAL pointer
self.flush_lsn = msg.start_streaming_at;
// and now adopt term history from proposer
self.s.acceptor_state.term_history = msg.term_history.clone();
self.storage.persist(&self.s, true)?;
info!("start receiving WAL since {:?}", msg.start_streaming_at);
Ok(None)
}
/// Handle request to append WAL.
#[allow(clippy::comparison_chain)]
fn handle_append_request(&mut self, msg: &AppendRequest) -> Result<AcceptorProposerMessage> {
// log first AppendRequest from this proposer
if self.elected_proposer_term < msg.h.term {
info!(
"start accepting WAL from timeline {}, tenant {}, term {}, epochStartLsn {:?}",
self.s.server.ztli, self.s.server.tenant_id, msg.h.term, msg.h.epoch_start_lsn,
);
self.elected_proposer_term = msg.h.term;
fn handle_append_request(
&mut self,
msg: &AppendRequest,
) -> Result<Option<AcceptorProposerMessage>> {
if self.s.acceptor_state.term < msg.h.term {
bail!("got AppendRequest before ProposerElected");
}
// If our term is lower than elected proposer one, bump it.
if self.s.acceptor_state.term < msg.h.term {
self.s.acceptor_state.term = msg.h.term;
self.storage.persist(&self.s, true)?;
}
// OTOH, if it is higher, immediately refuse the message.
else if self.s.acceptor_state.term > msg.h.term {
let resp = AppendResponse {
term: self.s.acceptor_state.term,
epoch: self.s.acceptor_state.epoch,
commit_lsn: Lsn(0),
flush_lsn: Lsn(0),
disk_consistent_lsn: Lsn(0),
hs_feedback: HotStandbyFeedback::empty(),
};
return Ok(AcceptorProposerMessage::AppendResponse(resp));
// If our term is higher, immediately refuse the message.
if self.s.acceptor_state.term > msg.h.term {
let resp = AppendResponse::term_only(self.s.acceptor_state.term);
return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
}
// After ProposerElected, which performs truncation, we should get only
// indeed append requests (but flush_lsn is advanced only on record
// boundary, so might be less).
assert!(self.flush_lsn <= msg.h.begin_lsn);
self.s.proposer_uuid = msg.h.proposer_uuid;
let mut sync_control_file = false;
@@ -530,48 +720,21 @@ where
}
}
/*
* Epoch switch happen when written WAL record cross the boundary.
* The boundary is maximum of last WAL position at this node (FlushLSN) and global
* maximum (vcl) determined by WAL proposer during handshake.
* Switching epoch means that node completes recovery and start writing in the WAL new data.
* XXX: this is wrong, we must actively truncate not matching part of log.
*
* The non-strict inequality is important for us, as proposer in --sync mode doesn't
* generate new records, but to advance commit_lsn epoch switch must happen on majority.
* We can regard this as commit of empty entry in new epoch, this should be safe.
*/
if self.s.acceptor_state.epoch < msg.h.term
&& msg.h.end_lsn >= max(self.flush_lsn, msg.h.epoch_start_lsn)
{
info!(
"switched to new epoch {} on receival of request end_lsn={:?}, len={:?}",
msg.h.term,
msg.h.end_lsn,
msg.wal_data.len(),
);
self.s.acceptor_state.epoch = msg.h.term; /* bump epoch */
sync_control_file = true;
}
if last_rec_lsn > self.flush_lsn {
self.flush_lsn = last_rec_lsn;
self.metrics.flush_lsn.set(u64::from(self.flush_lsn) as f64);
}
// Advance commit_lsn taking into account what we have locally. xxx this
// is wrapped into epoch check because we overwrite wal instead of
// truncating it, so without it commit_lsn might include wrong part.
// Anyway, nobody is much interested in our commit_lsn while epoch
// switch hasn't happened, right?
//
// Advance commit_lsn taking into account what we have locally.
// commit_lsn can be 0, being unknown to new walproposer while he hasn't
// collected majority of its epoch acks yet, ignore it in this case.
if self.s.acceptor_state.epoch == msg.h.term && msg.h.commit_lsn != Lsn(0) {
if msg.h.commit_lsn != Lsn(0) {
let commit_lsn = min(msg.h.commit_lsn, self.flush_lsn);
// If new commit_lsn reached epoch switch, force sync of control file:
// walproposer in sync mode is very interested when this happens.
sync_control_file |=
commit_lsn >= msg.h.epoch_start_lsn && self.s.commit_lsn < msg.h.epoch_start_lsn;
// If new commit_lsn reached epoch switch, force sync of control
// file: walproposer in sync mode is very interested when this
// happens. Note: this is for sync-safekeepers mode only, as
// otherwise commit_lsn might jump over epoch_start_lsn.
sync_control_file |= commit_lsn == msg.h.epoch_start_lsn;
self.commit_lsn = commit_lsn;
self.metrics
.commit_lsn
@@ -592,15 +755,7 @@ where
}
self.storage.persist(&self.s, sync_control_file)?;
let resp = AppendResponse {
term: self.s.acceptor_state.term,
epoch: self.s.acceptor_state.epoch,
flush_lsn: self.flush_lsn,
commit_lsn: self.s.commit_lsn,
disk_consistent_lsn: Lsn(0),
// will be filled by caller code to avoid bothering safekeeper
hs_feedback: HotStandbyFeedback::empty(),
};
let resp = self.append_response();
info!(
"processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, resp {:?}",
msg.wal_data.len(),
@@ -609,7 +764,7 @@ where
msg.h.truncate_lsn,
&resp,
);
Ok(AcceptorProposerMessage::AppendResponse(resp))
Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
}
}
@@ -631,6 +786,10 @@ mod tests {
fn write_wal(&mut self, _server: &ServerInfo, _startpos: Lsn, _buf: &[u8]) -> Result<()> {
Ok(())
}
fn truncate_wal(&mut self, _server: &ServerInfo, _end_pos: Lsn) -> Result<()> {
Ok(())
}
}
#[test]
@@ -644,7 +803,7 @@ mod tests {
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
let mut vote_resp = sk.process_msg(&vote_request);
match vote_resp.unwrap() {
AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given != 0),
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
r => panic!("unexpected response: {:?}", r),
}
@@ -658,7 +817,7 @@ mod tests {
// and ensure voting second time for 1 is not ok
vote_resp = sk.process_msg(&vote_request);
match vote_resp.unwrap() {
AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given == 0),
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
r => panic!("unexpected response: {:?}", r),
}
}
@@ -684,10 +843,21 @@ mod tests {
wal_data: Bytes::from_static(b"b"),
};
let pem = ProposerElected {
term: 1,
start_streaming_at: Lsn(1),
term_history: TermHistory(vec![TermSwitchEntry {
term: 1,
lsn: Lsn(3),
}]),
};
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
.unwrap();
// check that AppendRequest before epochStartLsn doesn't switch epoch
let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request));
assert!(resp.is_ok());
assert_eq!(sk.storage.persisted_state.acceptor_state.epoch, 0);
assert_eq!(sk.get_epoch(), 0);
// but record at epochStartLsn does the switch
ar_hdr.begin_lsn = Lsn(2);
@@ -698,6 +868,7 @@ mod tests {
};
let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request));
assert!(resp.is_ok());
assert_eq!(sk.storage.persisted_state.acceptor_state.epoch, 1);
sk.flush_lsn = Lsn(3); // imitate the complete record at 3 %)
assert_eq!(sk.get_epoch(), 1);
}
}

View File

@@ -317,8 +317,11 @@ impl Timeline {
}
/// Pass arrived message to the safekeeper.
pub fn process_msg(&self, msg: &ProposerAcceptorMessage) -> Result<AcceptorProposerMessage> {
let mut rmsg: AcceptorProposerMessage;
pub fn process_msg(
&self,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
let mut rmsg: Option<AcceptorProposerMessage>;
let commit_lsn: Lsn;
{
let mut shared_state = self.mutex.lock().unwrap();
@@ -328,7 +331,7 @@ impl Timeline {
commit_lsn = shared_state.sk.commit_lsn;
// if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
if let AcceptorProposerMessage::AppendResponse(ref mut resp) = rmsg {
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
let state = shared_state.get_replicas_state();
resp.hs_feedback = state.hs_feedback;
resp.disk_consistent_lsn = state.disk_consistent_lsn;
@@ -596,6 +599,82 @@ impl Storage for FileStorage {
}
Ok(())
}
fn truncate_wal(&mut self, server: &ServerInfo, end_pos: Lsn) -> Result<()> {
let partial;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
let wal_seg_size = server.wal_seg_size as usize;
let ztli = server.ztli;
/* Extract WAL location for this block */
let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
/* Open file */
let mut segno = end_pos.segment_number(wal_seg_size);
// note: we basically don't support changing pg timeline
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = self
.conf
.workdir
.join(ztli.to_string())
.join(wal_file_name.clone());
let wal_file_partial_path = self
.conf
.workdir
.join(ztli.to_string())
.join(wal_file_name + ".partial");
{
let mut wal_file: File;
/* Try to open already completed segment */
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
wal_file = file;
partial = false;
} else {
wal_file = OpenOptions::new()
.write(true)
.open(&wal_file_partial_path)?;
partial = true;
}
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
while xlogoff < wal_seg_size {
let bytes_to_write = min(XLOG_BLCKSZ, wal_seg_size - xlogoff);
wal_file.write_all(&ZERO_BLOCK[0..bytes_to_write])?;
xlogoff += bytes_to_write;
}
// Flush file, if not said otherwise
if !self.conf.no_sync {
wal_file.sync_all()?;
}
}
if !partial {
// Make segment partial once again
fs::rename(&wal_file_path, &wal_file_partial_path)?;
}
// Remove all subsequent segments
loop {
segno += 1;
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = self
.conf
.workdir
.join(ztli.to_string())
.join(wal_file_name.clone());
let wal_file_partial_path = self
.conf
.workdir
.join(ztli.to_string())
.join(wal_file_name.clone() + ".partial");
// TODO: better use fs::try_exists which is currenty avaialble only in nightly build
if wal_file_path.exists() {
fs::remove_file(&wal_file_path)?;
} else if wal_file_partial_path.exists() {
fs::remove_file(&wal_file_partial_path)?;
} else {
break;
}
}
Ok(())
}
}
#[cfg(test)]