Add term history to safekeepers.

Persist full history of term switches on safekeepers instead of storing only the
single term of the highest entry (called epoch). This allows easily and
correctly find the divergence point of two logs and truncate the obsolete part
before overwriting it with entries of the newer proposer(s).

Full history of the proposer is transferred in separate message before proposer
starts streaming; it is immediately persisted by safekeeper, though he might not
yet have entries for some older terms there. That's because we can't atomically
append to WAL and update the control file anyway, so locally available WAL must
be taken into account when looking at the history.

We should sometimes purge term history entries beyond truncate_lsn; this is not
done here.

Per https://github.com/zenithdb/rfcs/pull/12

Closes #296.

Bumps vendor/postgres.
This commit is contained in:
Arseny Sher
2021-11-05 13:39:41 +03:00
parent 2669d140f8
commit cba4da3f4d
7 changed files with 417 additions and 121 deletions

View File

@@ -392,6 +392,7 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder, pg_bin: PgBin):
"lm_prefix": "prefix", "lm_prefix": "prefix",
"lm_message": "message", "lm_message": "message",
"set_commit_lsn": True, "set_commit_lsn": True,
"send_proposer_elected": True,
"term": 2, "term": 2,
"begin_lsn": begin_lsn, "begin_lsn": begin_lsn,
"epoch_start_lsn": epoch_start_lsn, "epoch_start_lsn": epoch_start_lsn,

View File

@@ -7,7 +7,8 @@ use std::fmt::Display;
use std::sync::Arc; use std::sync::Arc;
use zenith_utils::lsn::Lsn; use zenith_utils::lsn::Lsn;
use crate::safekeeper::AcceptorState; use crate::safekeeper::Term;
use crate::safekeeper::TermHistory;
use crate::timeline::CreateControlFile; use crate::timeline::CreateControlFile;
use crate::timeline::GlobalTimelines; use crate::timeline::GlobalTimelines;
use crate::SafeKeeperConf; use crate::SafeKeeperConf;
@@ -29,6 +30,7 @@ fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
.as_ref() .as_ref()
} }
/// Serialize through Display trait.
fn display_serialize<S, F>(z: &F, s: S) -> Result<S::Ok, S::Error> fn display_serialize<S, F>(z: &F, s: S) -> Result<S::Ok, S::Error>
where where
S: Serializer, S: Serializer,
@@ -37,6 +39,14 @@ where
s.serialize_str(&format!("{}", z)) s.serialize_str(&format!("{}", z))
} }
/// Augment AcceptorState with epoch for convenience
#[derive(Debug, Serialize)]
struct AcceptorStateStatus {
term: Term,
epoch: Term,
term_history: TermHistory,
}
/// Info about timeline on safekeeper ready for reporting. /// Info about timeline on safekeeper ready for reporting.
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
struct TimelineStatus { struct TimelineStatus {
@@ -44,7 +54,7 @@ struct TimelineStatus {
tenant_id: ZTenantId, tenant_id: ZTenantId,
#[serde(serialize_with = "display_serialize")] #[serde(serialize_with = "display_serialize")]
timeline_id: ZTimelineId, timeline_id: ZTimelineId,
acceptor_state: AcceptorState, acceptor_state: AcceptorStateStatus,
#[serde(serialize_with = "display_serialize")] #[serde(serialize_with = "display_serialize")]
commit_lsn: Lsn, commit_lsn: Lsn,
#[serde(serialize_with = "display_serialize")] #[serde(serialize_with = "display_serialize")]
@@ -68,10 +78,16 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
let sk_state = tli.get_info(); let sk_state = tli.get_info();
let flush_lsn = tli.get_end_of_wal(); let flush_lsn = tli.get_end_of_wal();
let acc_state = AcceptorStateStatus {
term: sk_state.acceptor_state.term,
epoch: sk_state.acceptor_state.get_epoch(flush_lsn),
term_history: sk_state.acceptor_state.term_history,
};
let status = TimelineStatus { let status = TimelineStatus {
tenant_id, tenant_id,
timeline_id, timeline_id,
acceptor_state: sk_state.acceptor_state, acceptor_state: acc_state,
commit_lsn: sk_state.commit_lsn, commit_lsn: sk_state.commit_lsn,
truncate_lsn: sk_state.truncate_lsn, truncate_lsn: sk_state.truncate_lsn,
flush_lsn, flush_lsn,

View File

@@ -14,9 +14,9 @@ use serde::{Deserialize, Serialize};
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse}; use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
use crate::safekeeper::{ use crate::safekeeper::{
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerGreeting, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, ProposerGreeting,
}; };
use crate::safekeeper::{SafeKeeperState, Term}; use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
use crate::send_wal::SendWalHandler; use crate::send_wal::SendWalHandler;
use crate::timeline::TimelineTools; use crate::timeline::TimelineTools;
use postgres_ffi::pg_constants; use postgres_ffi::pg_constants;
@@ -35,6 +35,9 @@ struct AppendLogicalMessage {
// if true, commit_lsn will match flush_lsn after append // if true, commit_lsn will match flush_lsn after append
set_commit_lsn: bool, set_commit_lsn: bool,
// if true, ProposerElected will be sent before append
send_proposer_elected: bool,
// fields from AppendRequestHeader // fields from AppendRequestHeader
term: Term, term: Term,
epoch_start_lsn: Lsn, epoch_start_lsn: Lsn,
@@ -70,6 +73,11 @@ pub fn handle_json_ctrl(
// need to init safekeeper state before AppendRequest // need to init safekeeper state before AppendRequest
prepare_safekeeper(swh)?; prepare_safekeeper(swh)?;
// if send_proposer_elected is true, we need to update local history
if append_request.send_proposer_elected {
send_proposer_elected(swh, append_request.term, append_request.epoch_start_lsn)?;
}
let inserted_wal = append_logical_message(swh, append_request)?; let inserted_wal = append_logical_message(swh, append_request)?;
let response = AppendResult { let response = AppendResult {
state: swh.timeline.get().get_info(), state: swh.timeline.get().get_info(),
@@ -104,11 +112,29 @@ fn prepare_safekeeper(swh: &mut SendWalHandler) -> Result<()> {
let response = swh.timeline.get().process_msg(&greeting_request)?; let response = swh.timeline.get().process_msg(&greeting_request)?;
match response { match response {
AcceptorProposerMessage::Greeting(_) => Ok(()), Some(AcceptorProposerMessage::Greeting(_)) => Ok(()),
_ => anyhow::bail!("not GreetingResponse"), _ => anyhow::bail!("not GreetingResponse"),
} }
} }
fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Result<()> {
// add new term to existing history
let history = swh.timeline.get().get_info().acceptor_state.term_history;
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
let mut history_entries = history.0;
history_entries.push(TermSwitchEntry { term, lsn });
let history = TermHistory(history_entries);
let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
term,
start_streaming_at: lsn,
term_history: history,
});
swh.timeline.get().process_msg(&proposer_elected_request)?;
Ok(())
}
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct InsertedWAL { struct InsertedWAL {
begin_lsn: Lsn, begin_lsn: Lsn,
@@ -150,7 +176,7 @@ fn append_logical_message(
let response = swh.timeline.get().process_msg(&append_request)?; let response = swh.timeline.get().process_msg(&append_request)?;
let append_response = match response { let append_response = match response {
AcceptorProposerMessage::AppendResponse(resp) => resp, Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
_ => anyhow::bail!("not AppendResponse"), _ => anyhow::bail!("not AppendResponse"),
}; };

View File

@@ -4,6 +4,7 @@
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use bytes::Bytes; use bytes::Bytes;
use bytes::BytesMut;
use log::*; use log::*;
use postgres::{Client, Config, NoTls}; use postgres::{Client, Config, NoTls};
@@ -98,7 +99,7 @@ impl<'pg> ReceiveWalConn<'pg> {
// Send message to the postgres // Send message to the postgres
fn write_msg(&mut self, msg: &AcceptorProposerMessage) -> Result<()> { fn write_msg(&mut self, msg: &AcceptorProposerMessage) -> Result<()> {
let mut buf = Vec::new(); let mut buf = BytesMut::with_capacity(128);
msg.serialize(&mut buf)?; msg.serialize(&mut buf)?;
self.pg_backend.write_message(&BeMessage::CopyData(&buf))?; self.pg_backend.write_message(&BeMessage::CopyData(&buf))?;
Ok(()) Ok(())
@@ -147,7 +148,9 @@ impl<'pg> ReceiveWalConn<'pg> {
.get() .get()
.process_msg(&msg) .process_msg(&msg)
.with_context(|| "failed to process ProposerAcceptorMessage")?; .with_context(|| "failed to process ProposerAcceptorMessage")?;
self.write_msg(&reply)?; if let Some(reply) = reply {
self.write_msg(&reply)?;
}
msg = self.read_msg()?; msg = self.read_msg()?;
} }
} }

View File

@@ -4,16 +4,16 @@ use anyhow::Context;
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Result};
use byteorder::LittleEndian; use byteorder::LittleEndian;
use byteorder::ReadBytesExt; use byteorder::ReadBytesExt;
use byteorder::WriteBytesExt;
use bytes::Buf; use bytes::Buf;
use bytes::BufMut;
use bytes::Bytes; use bytes::Bytes;
use bytes::BytesMut;
use log::*; use log::*;
use pageserver::waldecoder::WalStreamDecoder; use pageserver::waldecoder::WalStreamDecoder;
use postgres_ffi::xlog_utils::TimeLineID; use postgres_ffi::xlog_utils::TimeLineID;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::cmp::min; use std::cmp::min;
use std::io; use std::fmt;
use std::io::Read; use std::io::Read;
use lazy_static::lazy_static; use lazy_static::lazy_static;
@@ -37,6 +37,70 @@ const UNKNOWN_SERVER_VERSION: u32 = 0;
/// Consensus logical timestamp. /// Consensus logical timestamp.
pub type Term = u64; pub type Term = u64;
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct TermSwitchEntry {
pub term: Term,
pub lsn: Lsn,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct TermHistory(pub Vec<TermSwitchEntry>);
impl TermHistory {
pub fn empty() -> TermHistory {
TermHistory(Vec::new())
}
// Parse TermHistory as n_entries followed by TermSwitchEntry pairs
pub fn from_bytes(mut bytes: Bytes) -> Result<TermHistory> {
if bytes.remaining() < 4 {
bail!("TermHistory misses len");
}
let n_entries = bytes.get_u32_le();
let mut res = Vec::with_capacity(n_entries as usize);
for _ in 0..n_entries {
if bytes.remaining() < 16 {
bail!("TermHistory is incomplete");
}
res.push(TermSwitchEntry {
term: bytes.get_u64_le(),
lsn: bytes.get_u64_le().into(),
})
}
Ok(TermHistory(res))
}
/// Return copy of self with switches happening strictly after up_to
/// truncated.
pub fn up_to(&self, up_to: Lsn) -> TermHistory {
let mut res = Vec::with_capacity(self.0.len());
for e in &self.0 {
if e.lsn > up_to {
break;
}
res.push(*e);
}
TermHistory(res)
}
}
/// Display only latest entries for Debug.
impl fmt::Debug for TermHistory {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let n_printed = 20;
write!(
fmt,
"{}{:?}",
if self.0.len() > n_printed { "... " } else { "" },
self.0
.iter()
.rev()
.take(n_printed)
.map(|&e| (e.term, e.lsn)) // omit TermSwitchEntry
.collect::<Vec<_>>()
)
}
}
/// Unique id of proposer. Not needed for correctness, used for monitoring. /// Unique id of proposer. Not needed for correctness, used for monitoring.
type PgUuid = [u8; 16]; type PgUuid = [u8; 16];
@@ -45,8 +109,21 @@ type PgUuid = [u8; 16];
pub struct AcceptorState { pub struct AcceptorState {
/// acceptor's last term it voted for (advanced in 1 phase) /// acceptor's last term it voted for (advanced in 1 phase)
pub term: Term, pub term: Term,
/// acceptor's epoch (advanced, i.e. bumped to 'term' when VCL is reached). /// History of term switches for safekeeper's WAL.
pub epoch: Term, /// Actually it often goes *beyond* WAL contents as we adopt term history
/// from the proposer before recovery.
pub term_history: TermHistory,
}
impl AcceptorState {
/// acceptor's epoch is the term of the highest entry in the log
pub fn get_epoch(&self, flush_lsn: Lsn) -> Term {
let th = self.term_history.up_to(flush_lsn);
match th.0.last() {
Some(e) => e.term,
None => 0,
}
}
} }
/// Information about Postgres. Safekeeper gets it once and then verifies /// Information about Postgres. Safekeeper gets it once and then verifies
@@ -91,7 +168,10 @@ impl SafeKeeperState {
SafeKeeperState { SafeKeeperState {
magic: SK_MAGIC, magic: SK_MAGIC,
format_version: SK_FORMAT_VERSION, format_version: SK_FORMAT_VERSION,
acceptor_state: AcceptorState { term: 0, epoch: 0 }, acceptor_state: AcceptorState {
term: 0,
term_history: TermHistory::empty(),
},
server: ServerInfo { server: ServerInfo {
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */ pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
system_id: 0, /* Postgres system identifier */ system_id: 0, /* Postgres system identifier */
@@ -147,16 +227,28 @@ pub struct VoteRequest {
/// Vote itself, sent from safekeeper to proposer /// Vote itself, sent from safekeeper to proposer
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
pub struct VoteResponse { pub struct VoteResponse {
term: Term, // not really needed, just a sanity check term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
vote_given: u64, // fixme u64 due to padding vote_given: u64, // fixme u64 due to padding
/// Safekeeper's log position, to let proposer choose the most advanced one // Safekeeper flush_lsn (end of WAL) + history of term switches allow
epoch: Term, // proposer to choose the most advanced one.
flush_lsn: Lsn, flush_lsn: Lsn,
truncate_lsn: Lsn, truncate_lsn: Lsn,
term_history: TermHistory,
}
/*
* Proposer -> Acceptor message announcing proposer is elected and communicating
* term history to it.
*/
#[derive(Debug)]
pub struct ProposerElected {
pub term: Term,
pub start_streaming_at: Lsn,
pub term_history: TermHistory,
} }
/// Request with WAL message sent from proposer to safekeeper. Along the way it /// Request with WAL message sent from proposer to safekeeper. Along the way it
/// announces 1) successful election (with epoch_start_lsn); 2) commit_lsn. /// communicates commit_lsn.
#[derive(Debug)] #[derive(Debug)]
pub struct AppendRequest { pub struct AppendRequest {
pub h: AppendRequestHeader, pub h: AppendRequestHeader,
@@ -164,6 +256,7 @@ pub struct AppendRequest {
} }
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
pub struct AppendRequestHeader { pub struct AppendRequestHeader {
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term, pub term: Term,
// LSN since the proposer appends WAL; determines epoch switch point. // LSN since the proposer appends WAL; determines epoch switch point.
pub epoch_start_lsn: Lsn, pub epoch_start_lsn: Lsn,
@@ -185,7 +278,6 @@ pub struct AppendResponse {
// Current term of the safekeeper; if it is higher than proposer's, the // Current term of the safekeeper; if it is higher than proposer's, the
// compute is out of date. // compute is out of date.
pub term: Term, pub term: Term,
pub epoch: Term,
// NOTE: this is physical end of wal on safekeeper; currently it doesn't // NOTE: this is physical end of wal on safekeeper; currently it doesn't
// make much sense without taking epoch into account, as history can be // make much sense without taking epoch into account, as history can be
// diverged. // diverged.
@@ -198,19 +290,32 @@ pub struct AppendResponse {
pub hs_feedback: HotStandbyFeedback, pub hs_feedback: HotStandbyFeedback,
} }
impl AppendResponse {
fn term_only(term: Term) -> AppendResponse {
AppendResponse {
term,
flush_lsn: Lsn(0),
commit_lsn: Lsn(0),
disk_consistent_lsn: Lsn(0),
hs_feedback: HotStandbyFeedback::empty(),
}
}
}
/// Proposer -> Acceptor messages /// Proposer -> Acceptor messages
#[derive(Debug)] #[derive(Debug)]
pub enum ProposerAcceptorMessage { pub enum ProposerAcceptorMessage {
Greeting(ProposerGreeting), Greeting(ProposerGreeting),
VoteRequest(VoteRequest), VoteRequest(VoteRequest),
Elected(ProposerElected),
AppendRequest(AppendRequest), AppendRequest(AppendRequest),
} }
impl ProposerAcceptorMessage { impl ProposerAcceptorMessage {
/// Parse proposer message. /// Parse proposer message.
pub fn parse(msg: Bytes) -> Result<ProposerAcceptorMessage> { pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
// xxx using Reader is inefficient but easy to work with bincode // xxx using Reader is inefficient but easy to work with bincode
let mut stream = msg.reader(); let mut stream = msg_bytes.reader();
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
let tag = stream.read_u64::<LittleEndian>()? as u8 as char; let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
match tag { match tag {
@@ -222,6 +327,21 @@ impl ProposerAcceptorMessage {
let msg = VoteRequest::des_from(&mut stream)?; let msg = VoteRequest::des_from(&mut stream)?;
Ok(ProposerAcceptorMessage::VoteRequest(msg)) Ok(ProposerAcceptorMessage::VoteRequest(msg))
} }
'e' => {
let mut msg_bytes = stream.into_inner();
if msg_bytes.remaining() < 16 {
bail!("ProposerElected message is not complete");
}
let term = msg_bytes.get_u64_le();
let start_streaming_at = msg_bytes.get_u64_le().into();
let term_history = TermHistory::from_bytes(msg_bytes)?;
let msg = ProposerElected {
term,
start_streaming_at,
term_history,
};
Ok(ProposerAcceptorMessage::Elected(msg))
}
'a' => { 'a' => {
// read header followed by wal data // read header followed by wal data
let hdr = AppendRequestHeader::des_from(&mut stream)?; let hdr = AppendRequestHeader::des_from(&mut stream)?;
@@ -259,19 +379,33 @@ pub enum AcceptorProposerMessage {
impl AcceptorProposerMessage { impl AcceptorProposerMessage {
/// Serialize acceptor -> proposer message. /// Serialize acceptor -> proposer message.
pub fn serialize(&self, stream: &mut impl io::Write) -> Result<()> { pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
match self { match self {
AcceptorProposerMessage::Greeting(msg) => { AcceptorProposerMessage::Greeting(msg) => {
stream.write_u64::<LittleEndian>('g' as u64)?; buf.put_u64_le('g' as u64);
msg.ser_into(stream)?; buf.put_u64_le(msg.term);
} }
AcceptorProposerMessage::VoteResponse(msg) => { AcceptorProposerMessage::VoteResponse(msg) => {
stream.write_u64::<LittleEndian>('v' as u64)?; buf.put_u64_le('v' as u64);
msg.ser_into(stream)?; buf.put_u64_le(msg.term);
buf.put_u64_le(msg.vote_given);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.truncate_lsn.into());
buf.put_u32_le(msg.term_history.0.len() as u32);
for e in &msg.term_history.0 {
buf.put_u64_le(e.term);
buf.put_u64_le(e.lsn.into());
}
} }
AcceptorProposerMessage::AppendResponse(msg) => { AcceptorProposerMessage::AppendResponse(msg) => {
stream.write_u64::<LittleEndian>('a' as u64)?; buf.put_u64_le('a' as u64);
msg.ser_into(stream)?; buf.put_u64_le(msg.term);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.commit_lsn.into());
buf.put_u64_le(msg.disk_consistent_lsn.into());
buf.put_i64_le(msg.hs_feedback.ts);
buf.put_u64_le(msg.hs_feedback.xmin);
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
} }
} }
@@ -284,6 +418,8 @@ pub trait Storage {
fn persist(&mut self, s: &SafeKeeperState, sync: bool) -> Result<()>; fn persist(&mut self, s: &SafeKeeperState, sync: bool) -> Result<()>;
/// Write piece of wal in buf to disk and sync it. /// Write piece of wal in buf to disk and sync it.
fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()>; fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()>;
// Truncate WAL at specified LSN
fn truncate_wal(&mut self, s: &ServerInfo, endpos: Lsn) -> Result<()>;
} }
lazy_static! { lazy_static! {
@@ -357,8 +493,7 @@ pub struct SafeKeeper<ST: Storage> {
pub commit_lsn: Lsn, pub commit_lsn: Lsn,
pub truncate_lsn: Lsn, pub truncate_lsn: Lsn,
pub storage: ST, pub storage: ST,
pub s: SafeKeeperState, // persistent part pub s: SafeKeeperState, // persistent part
pub elected_proposer_term: Term, // for monitoring/debugging
decoder: WalStreamDecoder, decoder: WalStreamDecoder,
} }
@@ -375,27 +510,40 @@ where
truncate_lsn: state.truncate_lsn, truncate_lsn: state.truncate_lsn,
storage, storage,
s: state, s: state,
elected_proposer_term: 0,
decoder: WalStreamDecoder::new(Lsn(0)), decoder: WalStreamDecoder::new(Lsn(0)),
} }
} }
/// Get history of term switches for the available WAL
fn get_term_history(&self) -> TermHistory {
self.s.acceptor_state.term_history.up_to(self.flush_lsn)
}
#[cfg(test)]
fn get_epoch(&self) -> Term {
self.s.acceptor_state.get_epoch(self.flush_lsn)
}
/// Process message from proposer and possibly form reply. Concurrent /// Process message from proposer and possibly form reply. Concurrent
/// callers must exclude each other. /// callers must exclude each other.
pub fn process_msg( pub fn process_msg(
&mut self, &mut self,
msg: &ProposerAcceptorMessage, msg: &ProposerAcceptorMessage,
) -> Result<AcceptorProposerMessage> { ) -> Result<Option<AcceptorProposerMessage>> {
match msg { match msg {
ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg), ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg),
ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg), ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg),
ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg),
ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg), ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg),
} }
} }
/// Handle initial message from proposer: check its sanity and send my /// Handle initial message from proposer: check its sanity and send my
/// current term. /// current term.
fn handle_greeting(&mut self, msg: &ProposerGreeting) -> Result<AcceptorProposerMessage> { fn handle_greeting(
&mut self,
msg: &ProposerGreeting,
) -> Result<Option<AcceptorProposerMessage>> {
/* Check protocol compatibility */ /* Check protocol compatibility */
if msg.protocol_version != SK_PROTOCOL_VERSION { if msg.protocol_version != SK_PROTOCOL_VERSION {
bail!( bail!(
@@ -429,64 +577,106 @@ where
"processed greeting from proposer {:?}, sending term {:?}", "processed greeting from proposer {:?}, sending term {:?}",
msg.proposer_id, self.s.acceptor_state.term msg.proposer_id, self.s.acceptor_state.term
); );
Ok(AcceptorProposerMessage::Greeting(AcceptorGreeting { Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
term: self.s.acceptor_state.term, term: self.s.acceptor_state.term,
})) })))
} }
/// Give vote for the given term, if we haven't done that previously. /// Give vote for the given term, if we haven't done that previously.
fn handle_vote_request(&mut self, msg: &VoteRequest) -> Result<AcceptorProposerMessage> { fn handle_vote_request(
&mut self,
msg: &VoteRequest,
) -> Result<Option<AcceptorProposerMessage>> {
// initialize with refusal // initialize with refusal
let mut resp = VoteResponse { let mut resp = VoteResponse {
term: msg.term, term: self.s.acceptor_state.term,
vote_given: false as u64, vote_given: false as u64,
epoch: 0, flush_lsn: self.flush_lsn,
flush_lsn: Lsn(0), truncate_lsn: self.s.truncate_lsn,
truncate_lsn: Lsn(0), term_history: self.get_term_history(),
}; };
if self.s.acceptor_state.term < msg.term { if self.s.acceptor_state.term < msg.term {
self.s.acceptor_state.term = msg.term; self.s.acceptor_state.term = msg.term;
// persist vote before sending it out // persist vote before sending it out
self.storage.persist(&self.s, true)?; self.storage.persist(&self.s, true)?;
resp.term = self.s.acceptor_state.term;
resp.vote_given = true as u64; resp.vote_given = true as u64;
resp.epoch = self.s.acceptor_state.epoch;
resp.flush_lsn = self.flush_lsn;
resp.truncate_lsn = self.s.truncate_lsn;
} }
info!("processed VoteRequest for term {}: {:?}", msg.term, &resp); info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
Ok(AcceptorProposerMessage::VoteResponse(resp)) Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
}
/// Bump our term if received a note from elected proposer with higher one
fn bump_if_higher(&mut self, term: Term) -> Result<()> {
if self.s.acceptor_state.term < term {
self.s.acceptor_state.term = term;
self.storage.persist(&self.s, true)?;
}
Ok(())
}
/// Form AppendResponse from current state.
fn append_response(&self) -> AppendResponse {
AppendResponse {
term: self.s.acceptor_state.term,
flush_lsn: self.flush_lsn,
commit_lsn: self.s.commit_lsn,
disk_consistent_lsn: Lsn(0),
// will be filled by the upper code to avoid bothering safekeeper
hs_feedback: HotStandbyFeedback::empty(),
}
}
fn handle_elected(&mut self, msg: &ProposerElected) -> Result<Option<AcceptorProposerMessage>> {
info!("received ProposerElected {:?}", msg);
self.bump_if_higher(msg.term)?;
// If our term is higher, ignore the message (next feedback will inform the compute)
if self.s.acceptor_state.term > msg.term {
return Ok(None);
}
// TODO: cross check divergence point
// streaming must not create a hole
assert!(self.flush_lsn == Lsn(0) || self.flush_lsn >= msg.start_streaming_at);
// truncate obsolete part of WAL
if self.flush_lsn != Lsn(0) {
self.storage
.truncate_wal(&self.s.server, msg.start_streaming_at)?;
}
// update our end of WAL pointer
self.flush_lsn = msg.start_streaming_at;
// and now adopt term history from proposer
self.s.acceptor_state.term_history = msg.term_history.clone();
self.storage.persist(&self.s, true)?;
info!("start receiving WAL since {:?}", msg.start_streaming_at);
Ok(None)
} }
/// Handle request to append WAL. /// Handle request to append WAL.
#[allow(clippy::comparison_chain)] #[allow(clippy::comparison_chain)]
fn handle_append_request(&mut self, msg: &AppendRequest) -> Result<AcceptorProposerMessage> { fn handle_append_request(
// log first AppendRequest from this proposer &mut self,
if self.elected_proposer_term < msg.h.term { msg: &AppendRequest,
info!( ) -> Result<Option<AcceptorProposerMessage>> {
"start accepting WAL from timeline {}, tenant {}, term {}, epochStartLsn {:?}", if self.s.acceptor_state.term < msg.h.term {
self.s.server.ztli, self.s.server.tenant_id, msg.h.term, msg.h.epoch_start_lsn, bail!("got AppendRequest before ProposerElected");
);
self.elected_proposer_term = msg.h.term;
} }
// If our term is lower than elected proposer one, bump it. // If our term is higher, immediately refuse the message.
if self.s.acceptor_state.term < msg.h.term { if self.s.acceptor_state.term > msg.h.term {
self.s.acceptor_state.term = msg.h.term; let resp = AppendResponse::term_only(self.s.acceptor_state.term);
self.storage.persist(&self.s, true)?; return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
}
// OTOH, if it is higher, immediately refuse the message.
else if self.s.acceptor_state.term > msg.h.term {
let resp = AppendResponse {
term: self.s.acceptor_state.term,
epoch: self.s.acceptor_state.epoch,
commit_lsn: Lsn(0),
flush_lsn: Lsn(0),
disk_consistent_lsn: Lsn(0),
hs_feedback: HotStandbyFeedback::empty(),
};
return Ok(AcceptorProposerMessage::AppendResponse(resp));
} }
// After ProposerElected, which performs truncation, we should get only
// indeed append requests (but flush_lsn is advanced only on record
// boundary, so might be less).
assert!(self.flush_lsn <= msg.h.begin_lsn);
self.s.proposer_uuid = msg.h.proposer_uuid; self.s.proposer_uuid = msg.h.proposer_uuid;
let mut sync_control_file = false; let mut sync_control_file = false;
@@ -530,48 +720,21 @@ where
} }
} }
/*
* Epoch switch happen when written WAL record cross the boundary.
* The boundary is maximum of last WAL position at this node (FlushLSN) and global
* maximum (vcl) determined by WAL proposer during handshake.
* Switching epoch means that node completes recovery and start writing in the WAL new data.
* XXX: this is wrong, we must actively truncate not matching part of log.
*
* The non-strict inequality is important for us, as proposer in --sync mode doesn't
* generate new records, but to advance commit_lsn epoch switch must happen on majority.
* We can regard this as commit of empty entry in new epoch, this should be safe.
*/
if self.s.acceptor_state.epoch < msg.h.term
&& msg.h.end_lsn >= max(self.flush_lsn, msg.h.epoch_start_lsn)
{
info!(
"switched to new epoch {} on receival of request end_lsn={:?}, len={:?}",
msg.h.term,
msg.h.end_lsn,
msg.wal_data.len(),
);
self.s.acceptor_state.epoch = msg.h.term; /* bump epoch */
sync_control_file = true;
}
if last_rec_lsn > self.flush_lsn { if last_rec_lsn > self.flush_lsn {
self.flush_lsn = last_rec_lsn; self.flush_lsn = last_rec_lsn;
self.metrics.flush_lsn.set(u64::from(self.flush_lsn) as f64); self.metrics.flush_lsn.set(u64::from(self.flush_lsn) as f64);
} }
// Advance commit_lsn taking into account what we have locally. xxx this // Advance commit_lsn taking into account what we have locally.
// is wrapped into epoch check because we overwrite wal instead of
// truncating it, so without it commit_lsn might include wrong part.
// Anyway, nobody is much interested in our commit_lsn while epoch
// switch hasn't happened, right?
//
// commit_lsn can be 0, being unknown to new walproposer while he hasn't // commit_lsn can be 0, being unknown to new walproposer while he hasn't
// collected majority of its epoch acks yet, ignore it in this case. // collected majority of its epoch acks yet, ignore it in this case.
if self.s.acceptor_state.epoch == msg.h.term && msg.h.commit_lsn != Lsn(0) { if msg.h.commit_lsn != Lsn(0) {
let commit_lsn = min(msg.h.commit_lsn, self.flush_lsn); let commit_lsn = min(msg.h.commit_lsn, self.flush_lsn);
// If new commit_lsn reached epoch switch, force sync of control file: // If new commit_lsn reached epoch switch, force sync of control
// walproposer in sync mode is very interested when this happens. // file: walproposer in sync mode is very interested when this
sync_control_file |= // happens. Note: this is for sync-safekeepers mode only, as
commit_lsn >= msg.h.epoch_start_lsn && self.s.commit_lsn < msg.h.epoch_start_lsn; // otherwise commit_lsn might jump over epoch_start_lsn.
sync_control_file |= commit_lsn == msg.h.epoch_start_lsn;
self.commit_lsn = commit_lsn; self.commit_lsn = commit_lsn;
self.metrics self.metrics
.commit_lsn .commit_lsn
@@ -592,15 +755,7 @@ where
} }
self.storage.persist(&self.s, sync_control_file)?; self.storage.persist(&self.s, sync_control_file)?;
let resp = AppendResponse { let resp = self.append_response();
term: self.s.acceptor_state.term,
epoch: self.s.acceptor_state.epoch,
flush_lsn: self.flush_lsn,
commit_lsn: self.s.commit_lsn,
disk_consistent_lsn: Lsn(0),
// will be filled by caller code to avoid bothering safekeeper
hs_feedback: HotStandbyFeedback::empty(),
};
info!( info!(
"processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, resp {:?}", "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, resp {:?}",
msg.wal_data.len(), msg.wal_data.len(),
@@ -609,7 +764,7 @@ where
msg.h.truncate_lsn, msg.h.truncate_lsn,
&resp, &resp,
); );
Ok(AcceptorProposerMessage::AppendResponse(resp)) Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
} }
} }
@@ -631,6 +786,10 @@ mod tests {
fn write_wal(&mut self, _server: &ServerInfo, _startpos: Lsn, _buf: &[u8]) -> Result<()> { fn write_wal(&mut self, _server: &ServerInfo, _startpos: Lsn, _buf: &[u8]) -> Result<()> {
Ok(()) Ok(())
} }
fn truncate_wal(&mut self, _server: &ServerInfo, _end_pos: Lsn) -> Result<()> {
Ok(())
}
} }
#[test] #[test]
@@ -644,7 +803,7 @@ mod tests {
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 }); let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
let mut vote_resp = sk.process_msg(&vote_request); let mut vote_resp = sk.process_msg(&vote_request);
match vote_resp.unwrap() { match vote_resp.unwrap() {
AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given != 0), Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
r => panic!("unexpected response: {:?}", r), r => panic!("unexpected response: {:?}", r),
} }
@@ -658,7 +817,7 @@ mod tests {
// and ensure voting second time for 1 is not ok // and ensure voting second time for 1 is not ok
vote_resp = sk.process_msg(&vote_request); vote_resp = sk.process_msg(&vote_request);
match vote_resp.unwrap() { match vote_resp.unwrap() {
AcceptorProposerMessage::VoteResponse(resp) => assert!(resp.vote_given == 0), Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
r => panic!("unexpected response: {:?}", r), r => panic!("unexpected response: {:?}", r),
} }
} }
@@ -684,10 +843,21 @@ mod tests {
wal_data: Bytes::from_static(b"b"), wal_data: Bytes::from_static(b"b"),
}; };
let pem = ProposerElected {
term: 1,
start_streaming_at: Lsn(1),
term_history: TermHistory(vec![TermSwitchEntry {
term: 1,
lsn: Lsn(3),
}]),
};
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
.unwrap();
// check that AppendRequest before epochStartLsn doesn't switch epoch // check that AppendRequest before epochStartLsn doesn't switch epoch
let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request));
assert!(resp.is_ok()); assert!(resp.is_ok());
assert_eq!(sk.storage.persisted_state.acceptor_state.epoch, 0); assert_eq!(sk.get_epoch(), 0);
// but record at epochStartLsn does the switch // but record at epochStartLsn does the switch
ar_hdr.begin_lsn = Lsn(2); ar_hdr.begin_lsn = Lsn(2);
@@ -698,6 +868,7 @@ mod tests {
}; };
let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request));
assert!(resp.is_ok()); assert!(resp.is_ok());
assert_eq!(sk.storage.persisted_state.acceptor_state.epoch, 1); sk.flush_lsn = Lsn(3); // imitate the complete record at 3 %)
assert_eq!(sk.get_epoch(), 1);
} }
} }

View File

@@ -317,8 +317,11 @@ impl Timeline {
} }
/// Pass arrived message to the safekeeper. /// Pass arrived message to the safekeeper.
pub fn process_msg(&self, msg: &ProposerAcceptorMessage) -> Result<AcceptorProposerMessage> { pub fn process_msg(
let mut rmsg: AcceptorProposerMessage; &self,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
let mut rmsg: Option<AcceptorProposerMessage>;
let commit_lsn: Lsn; let commit_lsn: Lsn;
{ {
let mut shared_state = self.mutex.lock().unwrap(); let mut shared_state = self.mutex.lock().unwrap();
@@ -328,7 +331,7 @@ impl Timeline {
commit_lsn = shared_state.sk.commit_lsn; commit_lsn = shared_state.sk.commit_lsn;
// if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn // if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
if let AcceptorProposerMessage::AppendResponse(ref mut resp) = rmsg { if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
let state = shared_state.get_replicas_state(); let state = shared_state.get_replicas_state();
resp.hs_feedback = state.hs_feedback; resp.hs_feedback = state.hs_feedback;
resp.disk_consistent_lsn = state.disk_consistent_lsn; resp.disk_consistent_lsn = state.disk_consistent_lsn;
@@ -596,6 +599,82 @@ impl Storage for FileStorage {
} }
Ok(()) Ok(())
} }
fn truncate_wal(&mut self, server: &ServerInfo, end_pos: Lsn) -> Result<()> {
let partial;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
let wal_seg_size = server.wal_seg_size as usize;
let ztli = server.ztli;
/* Extract WAL location for this block */
let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
/* Open file */
let mut segno = end_pos.segment_number(wal_seg_size);
// note: we basically don't support changing pg timeline
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = self
.conf
.workdir
.join(ztli.to_string())
.join(wal_file_name.clone());
let wal_file_partial_path = self
.conf
.workdir
.join(ztli.to_string())
.join(wal_file_name + ".partial");
{
let mut wal_file: File;
/* Try to open already completed segment */
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
wal_file = file;
partial = false;
} else {
wal_file = OpenOptions::new()
.write(true)
.open(&wal_file_partial_path)?;
partial = true;
}
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
while xlogoff < wal_seg_size {
let bytes_to_write = min(XLOG_BLCKSZ, wal_seg_size - xlogoff);
wal_file.write_all(&ZERO_BLOCK[0..bytes_to_write])?;
xlogoff += bytes_to_write;
}
// Flush file, if not said otherwise
if !self.conf.no_sync {
wal_file.sync_all()?;
}
}
if !partial {
// Make segment partial once again
fs::rename(&wal_file_path, &wal_file_partial_path)?;
}
// Remove all subsequent segments
loop {
segno += 1;
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = self
.conf
.workdir
.join(ztli.to_string())
.join(wal_file_name.clone());
let wal_file_partial_path = self
.conf
.workdir
.join(ztli.to_string())
.join(wal_file_name.clone() + ".partial");
// TODO: better use fs::try_exists which is currenty avaialble only in nightly build
if wal_file_path.exists() {
fs::remove_file(&wal_file_path)?;
} else if wal_file_partial_path.exists() {
fs::remove_file(&wal_file_partial_path)?;
} else {
break;
}
}
Ok(())
}
} }
#[cfg(test)] #[cfg(test)]