mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 05:52:55 +00:00
This PR implements a safekeeper migration algorithm from RFC-035:
https://github.com/neondatabase/neon/blob/main/docs/rfcs/035-safekeeper-dynamic-membership-change.md#change-algorithm

- Closes: https://github.com/neondatabase/neon/issues/11823

It is not production-ready yet, but I think it's good enough to commit and start testing.

There are some known issues which will be addressed in later PRs:
- https://github.com/neondatabase/neon/issues/12186
- https://github.com/neondatabase/neon/issues/12187
- https://github.com/neondatabase/neon/issues/12188
- https://github.com/neondatabase/neon/issues/12189
- https://github.com/neondatabase/neon/issues/12190
- https://github.com/neondatabase/neon/issues/12191
- https://github.com/neondatabase/neon/issues/12192

## Summary of changes
- Implement the `tenant_timeline_safekeeper_migrate` handler to drive the migration
- Add the possibility to specify the number of safekeepers per timeline in tests (`timeline_safekeeper_count`)
- Add `term` and `flush_lsn` to `TimelineMembershipSwitchResponse`
- Implement a compare-and-swap (CAS) operation over the timeline in the DB for updating the membership configuration safely
- Write a simple test to verify that the migration code works
300 lines
11 KiB
Rust
300 lines
11 KiB
Rust
//! Defines per timeline data stored persistently (`TimelinePersistentState`)
//! and its wrapper with an in-memory layer (`TimelineState`).
|
|
|
|
use std::cmp::max;
|
|
use std::ops::Deref;
|
|
use std::time::SystemTime;
|
|
|
|
use anyhow::{Result, bail};
|
|
use postgres_ffi::WAL_SEGMENT_SIZE;
|
|
use postgres_versioninfo::{PgMajorVersion, PgVersionId};
|
|
use safekeeper_api::membership::Configuration;
|
|
use safekeeper_api::models::TimelineTermBumpResponse;
|
|
use safekeeper_api::{INITIAL_TERM, ServerInfo, Term};
|
|
use serde::{Deserialize, Serialize};
|
|
use tracing::info;
|
|
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
|
use utils::lsn::Lsn;
|
|
|
|
use crate::control_file;
|
|
use crate::safekeeper::{AcceptorState, PgUuid, TermHistory, TermLsn, UNKNOWN_SERVER_VERSION};
|
|
use crate::timeline::TimelineError;
|
|
use crate::wal_backup_partial::{self};
|
|
|
|
/// Persistent information stored on safekeeper node about timeline.
|
|
/// On disk data is prefixed by magic and format version and followed by checksum.
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
pub struct TimelinePersistentState {
|
|
#[serde(with = "hex")]
|
|
pub tenant_id: TenantId,
|
|
#[serde(with = "hex")]
|
|
pub timeline_id: TimelineId,
|
|
/// Membership configuration.
|
|
pub mconf: Configuration,
|
|
/// persistent acceptor state
|
|
pub acceptor_state: AcceptorState,
|
|
/// information about server
|
|
pub server: ServerInfo,
|
|
/// Unique id of the last *elected* proposer we dealt with. Not needed
|
|
/// for correctness, exists for monitoring purposes.
|
|
#[serde(with = "hex")]
|
|
pub proposer_uuid: PgUuid,
|
|
/// Since which LSN this timeline generally starts. Safekeeper might have
|
|
/// joined later.
|
|
pub timeline_start_lsn: Lsn,
|
|
/// Since which LSN safekeeper has (had) WAL for this timeline.
|
|
/// All WAL segments next to one containing local_start_lsn are
|
|
/// filled with data from the beginning.
|
|
pub local_start_lsn: Lsn,
|
|
/// Part of WAL acknowledged by quorum *and available locally*. Always points
|
|
/// to record boundary.
|
|
pub commit_lsn: Lsn,
|
|
/// LSN that points to the end of the last backed up segment. Useful to
|
|
/// persist to avoid finding out offloading progress on boot.
|
|
pub backup_lsn: Lsn,
|
|
/// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
|
|
/// of last record streamed to everyone). Persisting it helps skipping
|
|
/// recovery in walproposer, generally we compute it from peers. In
|
|
/// walproposer proto called 'truncate_lsn'. Updates are currently drived
|
|
/// only by walproposer.
|
|
pub peer_horizon_lsn: Lsn,
|
|
/// LSN of the oldest known checkpoint made by pageserver and successfully
|
|
/// pushed to s3. We don't remove WAL beyond it. Persisted only for
|
|
/// informational purposes, we receive it from pageserver (or broker).
|
|
pub remote_consistent_lsn: Lsn,
|
|
/// Holds names of partial segments uploaded to remote storage. Used to
|
|
/// clean up old objects without leaving garbage in remote storage.
|
|
pub partial_backup: wal_backup_partial::State,
|
|
/// Eviction state of the timeline. If it's Offloaded, we should download
|
|
/// WAL files from remote storage to serve the timeline.
|
|
pub eviction_state: EvictionState,
|
|
pub creation_ts: SystemTime,
|
|
}
|
|
|
|
/// State of the local WAL files. Used to track current timeline state,
|
|
/// that can be either WAL files are present on disk or last partial segment
|
|
/// is offloaded to remote storage.
|
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
|
|
pub enum EvictionState {
|
|
/// WAL files are present on disk.
|
|
Present,
|
|
/// Last partial segment is offloaded to remote storage.
|
|
/// Contains flush_lsn of the last offloaded segment.
|
|
Offloaded(Lsn),
|
|
}
|
|
|
|
pub struct MembershipSwitchResult {
|
|
pub previous_conf: Configuration,
|
|
pub current_conf: Configuration,
|
|
}
|
|
|
|
impl TimelinePersistentState {
|
|
/// commit_lsn is the same as start_lsn in the normal creaiton; see
|
|
/// `TimelineCreateRequest` comments.`
|
|
pub fn new(
|
|
ttid: &TenantTimelineId,
|
|
mconf: Configuration,
|
|
server_info: ServerInfo,
|
|
start_lsn: Lsn,
|
|
commit_lsn: Lsn,
|
|
) -> anyhow::Result<TimelinePersistentState> {
|
|
if server_info.wal_seg_size == 0 {
|
|
bail!(TimelineError::UninitializedWalSegSize(*ttid));
|
|
}
|
|
|
|
if server_info.pg_version == UNKNOWN_SERVER_VERSION {
|
|
bail!(TimelineError::UninitialinzedPgVersion(*ttid));
|
|
}
|
|
|
|
if commit_lsn < start_lsn {
|
|
bail!(
|
|
"commit_lsn {} is smaller than start_lsn {}",
|
|
commit_lsn,
|
|
start_lsn
|
|
);
|
|
}
|
|
|
|
// If we are given with init LSN, initialize term history with it. It
|
|
// ensures that walproposer always must be able to find a common point
|
|
// in histories; if it can't something is corrupted. Not having LSN here
|
|
// is so far left for legacy case where timeline is created by compute
|
|
// and LSN during creation is not known yet.
|
|
let term_history = if commit_lsn != Lsn::INVALID {
|
|
TermHistory(vec![TermLsn {
|
|
term: INITIAL_TERM,
|
|
lsn: start_lsn,
|
|
}])
|
|
} else {
|
|
TermHistory::empty()
|
|
};
|
|
|
|
Ok(TimelinePersistentState {
|
|
tenant_id: ttid.tenant_id,
|
|
timeline_id: ttid.timeline_id,
|
|
mconf,
|
|
acceptor_state: AcceptorState {
|
|
term: INITIAL_TERM,
|
|
term_history,
|
|
},
|
|
server: server_info,
|
|
proposer_uuid: [0; 16],
|
|
timeline_start_lsn: start_lsn,
|
|
local_start_lsn: start_lsn,
|
|
commit_lsn,
|
|
backup_lsn: start_lsn,
|
|
peer_horizon_lsn: start_lsn,
|
|
remote_consistent_lsn: Lsn(0),
|
|
partial_backup: wal_backup_partial::State::default(),
|
|
eviction_state: EvictionState::Present,
|
|
creation_ts: SystemTime::now(),
|
|
})
|
|
}
|
|
|
|
pub fn empty() -> Self {
|
|
TimelinePersistentState::new(
|
|
&TenantTimelineId::empty(),
|
|
Configuration::empty(),
|
|
ServerInfo {
|
|
pg_version: PgVersionId::from(PgMajorVersion::PG17),
|
|
system_id: 0, /* Postgres system identifier */
|
|
wal_seg_size: WAL_SEGMENT_SIZE as u32,
|
|
},
|
|
Lsn::INVALID,
|
|
Lsn::INVALID,
|
|
)
|
|
.unwrap()
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
// In memory safekeeper state. Fields mirror ones in `SafeKeeperPersistentState`; values
|
|
// are not flushed yet.
|
|
pub struct TimelineMemState {
|
|
pub commit_lsn: Lsn,
|
|
pub backup_lsn: Lsn,
|
|
pub peer_horizon_lsn: Lsn,
|
|
pub remote_consistent_lsn: Lsn,
|
|
#[serde(with = "hex")]
|
|
pub proposer_uuid: PgUuid,
|
|
}
|
|
|
|
/// Safekeeper persistent state plus in memory layer.
|
|
///
|
|
/// Allows us to avoid frequent fsyncs when we update fields like commit_lsn
|
|
/// which don't need immediate persistence. Provides transactional like API
|
|
/// to atomically update the state.
|
|
///
|
|
/// Implements Deref into *persistent* part.
|
|
pub struct TimelineState<CTRL: control_file::Storage> {
|
|
pub inmem: TimelineMemState,
|
|
pub pers: CTRL, // persistent
|
|
}
|
|
|
|
impl<CTRL> TimelineState<CTRL>
|
|
where
|
|
CTRL: control_file::Storage,
|
|
{
|
|
pub fn new(state: CTRL) -> Self {
|
|
TimelineState {
|
|
inmem: TimelineMemState {
|
|
commit_lsn: state.commit_lsn,
|
|
backup_lsn: state.backup_lsn,
|
|
peer_horizon_lsn: state.peer_horizon_lsn,
|
|
remote_consistent_lsn: state.remote_consistent_lsn,
|
|
proposer_uuid: state.proposer_uuid,
|
|
},
|
|
pers: state,
|
|
}
|
|
}
|
|
|
|
/// Start atomic change. Returns SafeKeeperPersistentState with in memory
|
|
/// values applied; the protocol is to 1) change returned struct as desired
|
|
/// 2) atomically persist it with finish_change.
|
|
pub fn start_change(&self) -> TimelinePersistentState {
|
|
let mut s = self.pers.clone();
|
|
s.commit_lsn = self.inmem.commit_lsn;
|
|
s.backup_lsn = self.inmem.backup_lsn;
|
|
s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
|
|
s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
|
|
s.proposer_uuid = self.inmem.proposer_uuid;
|
|
s
|
|
}
|
|
|
|
/// Persist given state. c.f. start_change.
|
|
pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
|
|
if s.eq(&*self.pers) {
|
|
// nothing to do if state didn't change
|
|
} else {
|
|
self.pers.persist(s).await?;
|
|
}
|
|
|
|
// keep in memory values up to date
|
|
self.inmem.commit_lsn = s.commit_lsn;
|
|
self.inmem.backup_lsn = s.backup_lsn;
|
|
self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
|
|
self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
|
|
self.inmem.proposer_uuid = s.proposer_uuid;
|
|
Ok(())
|
|
}
|
|
|
|
/// Flush in memory values.
|
|
pub async fn flush(&mut self) -> Result<()> {
|
|
let s = self.start_change();
|
|
self.finish_change(&s).await
|
|
}
|
|
|
|
/// Make term at least as `to`. If `to` is None, increment current one. This
|
|
/// is not in safekeeper.rs because we want to be able to do it even if
|
|
/// timeline is offloaded.
|
|
pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
|
|
let before = self.acceptor_state.term;
|
|
let mut state = self.start_change();
|
|
let new = match to {
|
|
Some(to) => max(state.acceptor_state.term, to),
|
|
None => state.acceptor_state.term + 1,
|
|
};
|
|
if new > state.acceptor_state.term {
|
|
state.acceptor_state.term = new;
|
|
self.finish_change(&state).await?;
|
|
}
|
|
let after = self.acceptor_state.term;
|
|
Ok(TimelineTermBumpResponse {
|
|
previous_term: before,
|
|
current_term: after,
|
|
})
|
|
}
|
|
|
|
/// Switch into membership configuration `to` if it is higher than the
|
|
/// current one.
|
|
pub async fn membership_switch(&mut self, to: Configuration) -> Result<MembershipSwitchResult> {
|
|
let before = self.mconf.clone();
|
|
// Is switch allowed?
|
|
if to.generation <= self.mconf.generation {
|
|
info!(
|
|
"ignoring request to switch membership conf to {}, current conf {}",
|
|
to, self.mconf
|
|
);
|
|
} else {
|
|
let mut state = self.start_change();
|
|
state.mconf = to.clone();
|
|
self.finish_change(&state).await?;
|
|
info!("switched membership conf to {} from {}", to, before);
|
|
}
|
|
Ok(MembershipSwitchResult {
|
|
previous_conf: before,
|
|
current_conf: self.mconf.clone(),
|
|
})
|
|
}
|
|
}
|
|
|
|
impl<CTRL> Deref for TimelineState<CTRL>
|
|
where
|
|
CTRL: control_file::Storage,
|
|
{
|
|
type Target = TimelinePersistentState;
|
|
|
|
fn deref(&self) -> &Self::Target {
|
|
&self.pers
|
|
}
|
|
}
|