mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 08:30:37 +00:00
Add term history to safekeepers.
Persist full history of term switches on safekeepers instead of storing only the single term of the highest entry (called epoch). This allows easily and correctly find the divergence point of two logs and truncate the obsolete part before overwriting it with entries of the newer proposer(s). Full history of the proposer is transferred in separate message before proposer starts streaming; it is immediately persisted by safekeeper, though he might not yet have entries for some older terms there. That's because we can't atomically append to WAL and update the control file anyway, so locally available WAL must be taken into account when looking at the history. We should sometimes purge term history entries beyond truncate_lsn; this is not done here. Per https://github.com/zenithdb/rfcs/pull/12 Closes #296. Bumps vendor/postgres.
This commit is contained in:
@@ -14,9 +14,9 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
|
||||
use crate::safekeeper::{
|
||||
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerGreeting,
|
||||
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, ProposerGreeting,
|
||||
};
|
||||
use crate::safekeeper::{SafeKeeperState, Term};
|
||||
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
|
||||
use crate::send_wal::SendWalHandler;
|
||||
use crate::timeline::TimelineTools;
|
||||
use postgres_ffi::pg_constants;
|
||||
@@ -35,6 +35,9 @@ struct AppendLogicalMessage {
|
||||
// if true, commit_lsn will match flush_lsn after append
|
||||
set_commit_lsn: bool,
|
||||
|
||||
// if true, ProposerElected will be sent before append
|
||||
send_proposer_elected: bool,
|
||||
|
||||
// fields from AppendRequestHeader
|
||||
term: Term,
|
||||
epoch_start_lsn: Lsn,
|
||||
@@ -70,6 +73,11 @@ pub fn handle_json_ctrl(
|
||||
// need to init safekeeper state before AppendRequest
|
||||
prepare_safekeeper(swh)?;
|
||||
|
||||
// if send_proposer_elected is true, we need to update local history
|
||||
if append_request.send_proposer_elected {
|
||||
send_proposer_elected(swh, append_request.term, append_request.epoch_start_lsn)?;
|
||||
}
|
||||
|
||||
let inserted_wal = append_logical_message(swh, append_request)?;
|
||||
let response = AppendResult {
|
||||
state: swh.timeline.get().get_info(),
|
||||
@@ -104,11 +112,29 @@ fn prepare_safekeeper(swh: &mut SendWalHandler) -> Result<()> {
|
||||
|
||||
let response = swh.timeline.get().process_msg(&greeting_request)?;
|
||||
match response {
|
||||
AcceptorProposerMessage::Greeting(_) => Ok(()),
|
||||
Some(AcceptorProposerMessage::Greeting(_)) => Ok(()),
|
||||
_ => anyhow::bail!("not GreetingResponse"),
|
||||
}
|
||||
}
|
||||
|
||||
fn send_proposer_elected(swh: &mut SendWalHandler, term: Term, lsn: Lsn) -> Result<()> {
|
||||
// add new term to existing history
|
||||
let history = swh.timeline.get().get_info().acceptor_state.term_history;
|
||||
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
|
||||
let mut history_entries = history.0;
|
||||
history_entries.push(TermSwitchEntry { term, lsn });
|
||||
let history = TermHistory(history_entries);
|
||||
|
||||
let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
|
||||
term,
|
||||
start_streaming_at: lsn,
|
||||
term_history: history,
|
||||
});
|
||||
|
||||
swh.timeline.get().process_msg(&proposer_elected_request)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct InsertedWAL {
|
||||
begin_lsn: Lsn,
|
||||
@@ -150,7 +176,7 @@ fn append_logical_message(
|
||||
let response = swh.timeline.get().process_msg(&append_request)?;
|
||||
|
||||
let append_response = match response {
|
||||
AcceptorProposerMessage::AppendResponse(resp) => resp,
|
||||
Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
|
||||
_ => anyhow::bail!("not AppendResponse"),
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user