Extract safekeeper per timeline state from safekeeper.rs

safekeeper.rs is mostly about consensus, but state is wider. Also form
SafekeeperState which encapsulates persistent part + in memory layer with API
for atomic updates.

Moves remote_consistent_lsn back to SafekeeperMemState, fixes its absense from
memory dump.

Also renames SafekeeperState to TimelinePersistentState, as TimelineMemState and
TimelinePersistent state are created.
This commit is contained in:
Arseny Sher
2023-12-26 00:06:05 +03:00
committed by Arseny Sher
parent 1eb30b40af
commit 7f828890cf
13 changed files with 372 additions and 318 deletions

View File

@@ -13,13 +13,16 @@ use std::time::Instant;
use crate::control_file_upgrade::upgrade_control_file;
use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
use crate::safekeeper::{SafeKeeperState, SK_FORMAT_VERSION, SK_MAGIC};
use crate::state::TimelinePersistentState;
use utils::{bin_ser::LeSer, id::TenantTimelineId};
use crate::SafeKeeperConf;
use std::convert::TryInto;
pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 7;
// contains persistent metadata for safekeeper
const CONTROL_FILE_NAME: &str = "safekeeper.control";
// needed to atomically update the state using `rename`
@@ -29,9 +32,9 @@ pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
/// Storage should keep actual state inside of it. It should implement Deref
/// trait to access state fields and have persist method for updating that state.
#[async_trait::async_trait]
pub trait Storage: Deref<Target = SafeKeeperState> {
pub trait Storage: Deref<Target = TimelinePersistentState> {
/// Persist safekeeper state on disk and update internal state.
async fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()>;
/// Timestamp of last persist.
fn last_persist_at(&self) -> Instant;
@@ -44,7 +47,7 @@ pub struct FileStorage {
conf: SafeKeeperConf,
/// Last state persisted to disk.
state: SafeKeeperState,
state: TimelinePersistentState,
/// Not preserved across restarts.
last_persist_at: Instant,
}
@@ -68,7 +71,7 @@ impl FileStorage {
pub fn create_new(
timeline_dir: Utf8PathBuf,
conf: &SafeKeeperConf,
state: SafeKeeperState,
state: TimelinePersistentState,
) -> Result<FileStorage> {
let store = FileStorage {
timeline_dir,
@@ -81,7 +84,7 @@ impl FileStorage {
}
/// Check the magic/version in the on-disk data and deserialize it, if possible.
fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
fn deser_sk_state(buf: &mut &[u8]) -> Result<TimelinePersistentState> {
// Read the version independent part
let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
if magic != SK_MAGIC {
@@ -93,7 +96,7 @@ impl FileStorage {
}
let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
if version == SK_FORMAT_VERSION {
let res = SafeKeeperState::des(buf)?;
let res = TimelinePersistentState::des(buf)?;
return Ok(res);
}
// try to upgrade
@@ -104,13 +107,15 @@ impl FileStorage {
pub fn load_control_file_conf(
conf: &SafeKeeperConf,
ttid: &TenantTimelineId,
) -> Result<SafeKeeperState> {
) -> Result<TimelinePersistentState> {
let path = conf.timeline_dir(ttid).join(CONTROL_FILE_NAME);
Self::load_control_file(path)
}
/// Read in the control file.
pub fn load_control_file<P: AsRef<Path>>(control_file_path: P) -> Result<SafeKeeperState> {
pub fn load_control_file<P: AsRef<Path>>(
control_file_path: P,
) -> Result<TimelinePersistentState> {
let mut control_file = std::fs::OpenOptions::new()
.read(true)
.write(true)
@@ -153,7 +158,7 @@ impl FileStorage {
}
impl Deref for FileStorage {
type Target = SafeKeeperState;
type Target = TimelinePersistentState;
fn deref(&self) -> &Self::Target {
&self.state
@@ -165,7 +170,7 @@ impl Storage for FileStorage {
/// Persists state durably to the underlying storage.
///
/// For a description, see <https://lwn.net/Articles/457667/>.
async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
// write data to safekeeper.control.partial
@@ -242,7 +247,7 @@ impl Storage for FileStorage {
mod test {
use super::FileStorage;
use super::*;
use crate::{safekeeper::SafeKeeperState, SafeKeeperConf};
use crate::SafeKeeperConf;
use anyhow::Result;
use utils::{id::TenantTimelineId, lsn::Lsn};
@@ -257,7 +262,7 @@ mod test {
async fn load_from_control_file(
conf: &SafeKeeperConf,
ttid: &TenantTimelineId,
) -> Result<(FileStorage, SafeKeeperState)> {
) -> Result<(FileStorage, TimelinePersistentState)> {
fs::create_dir_all(conf.timeline_dir(ttid))
.await
.expect("failed to create timeline dir");
@@ -270,11 +275,11 @@ mod test {
async fn create(
conf: &SafeKeeperConf,
ttid: &TenantTimelineId,
) -> Result<(FileStorage, SafeKeeperState)> {
) -> Result<(FileStorage, TimelinePersistentState)> {
fs::create_dir_all(conf.timeline_dir(ttid))
.await
.expect("failed to create timeline dir");
let state = SafeKeeperState::empty();
let state = TimelinePersistentState::empty();
let timeline_dir = conf.timeline_dir(ttid);
let storage = FileStorage::create_new(timeline_dir, conf, state.clone())?;
Ok((storage, state))

View File

@@ -1,6 +1,7 @@
//! Code to deal with safekeeper control file upgrades
use crate::safekeeper::{
AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermLsn,
use crate::{
safekeeper::{AcceptorState, PgUuid, ServerInfo, Term, TermHistory, TermLsn},
state::{PersistedPeers, TimelinePersistentState},
};
use anyhow::{bail, Result};
use pq_proto::SystemId;
@@ -137,7 +138,7 @@ pub struct SafeKeeperStateV4 {
pub peers: PersistedPeers,
}
pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> {
pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersistentState> {
// migrate to storing full term history
if version == 1 {
info!("reading safekeeper control file version {}", version);
@@ -149,7 +150,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
lsn: Lsn(0),
}]),
};
return Ok(SafeKeeperState {
return Ok(TimelinePersistentState {
tenant_id: oldstate.server.tenant_id,
timeline_id: oldstate.server.timeline_id,
acceptor_state: ac,
@@ -176,7 +177,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
system_id: oldstate.server.system_id,
wal_seg_size: oldstate.server.wal_seg_size,
};
return Ok(SafeKeeperState {
return Ok(TimelinePersistentState {
tenant_id: oldstate.server.tenant_id,
timeline_id: oldstate.server.timeline_id,
acceptor_state: oldstate.acceptor_state,
@@ -199,7 +200,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
system_id: oldstate.server.system_id,
wal_seg_size: oldstate.server.wal_seg_size,
};
return Ok(SafeKeeperState {
return Ok(TimelinePersistentState {
tenant_id: oldstate.server.tenant_id,
timeline_id: oldstate.server.timeline_id,
acceptor_state: oldstate.acceptor_state,
@@ -222,7 +223,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
system_id: oldstate.server.system_id,
wal_seg_size: oldstate.server.wal_seg_size,
};
return Ok(SafeKeeperState {
return Ok(TimelinePersistentState {
tenant_id: oldstate.tenant_id,
timeline_id: oldstate.timeline_id,
acceptor_state: oldstate.acceptor_state,
@@ -238,7 +239,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
});
} else if version == 5 {
info!("reading safekeeper control file version {}", version);
let mut oldstate = SafeKeeperState::des(&buf[..buf.len()])?;
let mut oldstate = TimelinePersistentState::des(&buf[..buf.len()])?;
if oldstate.timeline_start_lsn != Lsn(0) {
return Ok(oldstate);
}
@@ -251,7 +252,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
return Ok(oldstate);
} else if version == 6 {
info!("reading safekeeper control file version {}", version);
let mut oldstate = SafeKeeperState::des(&buf[..buf.len()])?;
let mut oldstate = TimelinePersistentState::des(&buf[..buf.len()])?;
if oldstate.server.pg_version != 0 {
return Ok(oldstate);
}

View File

@@ -14,7 +14,7 @@ use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::{
control_file::{FileStorage, Storage},
pull_timeline::{create_temp_timeline_dir, load_temp_timeline, validate_temp_timeline},
safekeeper::SafeKeeperState,
state::TimelinePersistentState,
timeline::{Timeline, TimelineError},
wal_backup::copy_s3_segments,
wal_storage::{wal_file_paths, WalReader},
@@ -137,7 +137,7 @@ pub async fn handle_request(request: Request) -> Result<()> {
)
.await?;
let mut new_state = SafeKeeperState::new(
let mut new_state = TimelinePersistentState::new(
&request.destination_ttid,
state.server.clone(),
vec![],
@@ -160,7 +160,7 @@ pub async fn handle_request(request: Request) -> Result<()> {
async fn copy_disk_segments(
conf: &SafeKeeperConf,
persisted_state: &SafeKeeperState,
persisted_state: &TimelinePersistentState,
wal_seg_size: usize,
source_ttid: &TenantTimelineId,
start_lsn: Lsn,

View File

@@ -22,14 +22,13 @@ use utils::id::TenantTimelineId;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use crate::safekeeper::SafeKeeperState;
use crate::safekeeper::SafekeeperMemState;
use crate::safekeeper::TermHistory;
use crate::SafeKeeperConf;
use crate::send_wal::WalSenderState;
use crate::state::TimelineMemState;
use crate::state::TimelinePersistentState;
use crate::wal_storage::WalReader;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
/// Various filters that influence the resulting JSON output.
#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -143,7 +142,7 @@ pub struct Config {
pub struct Timeline {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub control_file: Option<SafeKeeperState>,
pub control_file: Option<TimelinePersistentState>,
pub memory: Option<Memory>,
pub disk_content: Option<DiskContent>,
}
@@ -158,7 +157,7 @@ pub struct Memory {
pub num_computes: u32,
pub last_removed_segno: XLogSegNo,
pub epoch_start_lsn: Lsn,
pub mem_state: SafekeeperMemState,
pub mem_state: TimelineMemState,
// PhysicalStorage state.
pub write_lsn: Lsn,

View File

@@ -160,7 +160,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
commit_lsn: inmem.commit_lsn,
backup_lsn: inmem.backup_lsn,
peer_horizon_lsn: inmem.peer_horizon_lsn,
remote_consistent_lsn: tli.get_walsenders().get_remote_consistent_lsn(),
remote_consistent_lsn: inmem.remote_consistent_lsn,
peers: tli.get_peers(conf).await,
walsenders: tli.get_walsenders().get_all(),
walreceivers: tli.get_walreceivers().get_all(),

View File

@@ -21,7 +21,8 @@ use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
use crate::safekeeper::{
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
};
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermLsn};
use crate::safekeeper::{Term, TermHistory, TermLsn};
use crate::state::TimelinePersistentState;
use crate::timeline::Timeline;
use crate::GlobalTimelines;
use postgres_backend::PostgresBackend;
@@ -56,7 +57,7 @@ pub struct AppendLogicalMessage {
#[derive(Debug, Serialize)]
struct AppendResult {
// safekeeper state after append
state: SafeKeeperState,
state: TimelinePersistentState,
// info about new record in the WAL
inserted_wal: InsertedWAL,
}

View File

@@ -28,6 +28,7 @@ pub mod recovery;
pub mod remove_wal;
pub mod safekeeper;
pub mod send_wal;
pub mod state;
pub mod timeline;
pub mod wal_backup;
pub mod wal_service;

View File

@@ -21,7 +21,7 @@ use utils::pageserver_feedback::PageserverFeedback;
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::{
safekeeper::{SafeKeeperState, SafekeeperMemState},
state::{TimelineMemState, TimelinePersistentState},
GlobalTimelines,
};
@@ -308,11 +308,10 @@ pub struct FullTimelineInfo {
pub last_removed_segno: XLogSegNo,
pub epoch_start_lsn: Lsn,
pub mem_state: SafekeeperMemState,
pub persisted_state: SafeKeeperState,
pub mem_state: TimelineMemState,
pub persisted_state: TimelinePersistentState,
pub flush_lsn: Lsn,
pub remote_consistent_lsn: Lsn,
pub wal_storage: WalStorageMetrics,
}
@@ -608,7 +607,7 @@ impl Collector for TimelineCollector {
.set(tli.mem_state.peer_horizon_lsn.into());
self.remote_consistent_lsn
.with_label_values(labels)
.set(tli.remote_consistent_lsn.into());
.set(tli.mem_state.remote_consistent_lsn.into());
self.timeline_active
.with_label_values(labels)
.set(tli.timeline_is_active as u64);

View File

@@ -18,17 +18,16 @@ use tracing::*;
use crate::control_file;
use crate::send_wal::HotStandbyFeedback;
use crate::state::TimelineState;
use crate::wal_storage;
use pq_proto::SystemId;
use utils::pageserver_feedback::PageserverFeedback;
use utils::{
bin_ser::LeSer,
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
id::{NodeId, TenantId, TimelineId},
lsn::Lsn,
};
pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 7;
const SK_PROTOCOL_VERSION: u32 = 2;
pub const UNKNOWN_SERVER_VERSION: u32 = 0;
@@ -222,7 +221,7 @@ pub struct PersistedPeerInfo {
}
impl PersistedPeerInfo {
fn new() -> Self {
pub fn new() -> Self {
Self {
backup_lsn: Lsn::INVALID,
term: INVALID_TERM,
@@ -232,111 +231,10 @@ impl PersistedPeerInfo {
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
/// Persistent information stored on safekeeper node
/// On disk data is prefixed by magic and format version and followed by checksum.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SafeKeeperState {
#[serde(with = "hex")]
pub tenant_id: TenantId,
#[serde(with = "hex")]
pub timeline_id: TimelineId,
/// persistent acceptor state
pub acceptor_state: AcceptorState,
/// information about server
pub server: ServerInfo,
/// Unique id of the last *elected* proposer we dealt with. Not needed
/// for correctness, exists for monitoring purposes.
#[serde(with = "hex")]
pub proposer_uuid: PgUuid,
/// Since which LSN this timeline generally starts. Safekeeper might have
/// joined later.
pub timeline_start_lsn: Lsn,
/// Since which LSN safekeeper has (had) WAL for this timeline.
/// All WAL segments next to one containing local_start_lsn are
/// filled with data from the beginning.
pub local_start_lsn: Lsn,
/// Part of WAL acknowledged by quorum *and available locally*. Always points
/// to record boundary.
pub commit_lsn: Lsn,
/// LSN that points to the end of the last backed up segment. Useful to
/// persist to avoid finding out offloading progress on boot.
pub backup_lsn: Lsn,
/// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
/// of last record streamed to everyone). Persisting it helps skipping
/// recovery in walproposer, generally we compute it from peers. In
/// walproposer proto called 'truncate_lsn'. Updates are currently drived
/// only by walproposer.
pub peer_horizon_lsn: Lsn,
/// LSN of the oldest known checkpoint made by pageserver and successfully
/// pushed to s3. We don't remove WAL beyond it. Persisted only for
/// informational purposes, we receive it from pageserver (or broker).
pub remote_consistent_lsn: Lsn,
// Peers and their state as we remember it. Knowing peers themselves is
// fundamental; but state is saved here only for informational purposes and
// obviously can be stale. (Currently not saved at all, but let's provision
// place to have less file version upgrades).
pub peers: PersistedPeers,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
// In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values
// are not flushed yet.
pub struct SafekeeperMemState {
pub commit_lsn: Lsn,
pub backup_lsn: Lsn,
pub peer_horizon_lsn: Lsn,
#[serde(with = "hex")]
pub proposer_uuid: PgUuid,
}
impl SafeKeeperState {
pub fn new(
ttid: &TenantTimelineId,
server_info: ServerInfo,
peers: Vec<NodeId>,
commit_lsn: Lsn,
local_start_lsn: Lsn,
) -> SafeKeeperState {
SafeKeeperState {
tenant_id: ttid.tenant_id,
timeline_id: ttid.timeline_id,
acceptor_state: AcceptorState {
term: 0,
term_history: TermHistory::empty(),
},
server: server_info,
proposer_uuid: [0; 16],
timeline_start_lsn: Lsn(0),
local_start_lsn,
commit_lsn,
backup_lsn: local_start_lsn,
peer_horizon_lsn: local_start_lsn,
remote_consistent_lsn: Lsn(0),
peers: PersistedPeers(
peers
.iter()
.map(|p| (*p, PersistedPeerInfo::new()))
.collect(),
),
}
}
#[cfg(test)]
pub fn empty() -> Self {
SafeKeeperState::new(
&TenantTimelineId::empty(),
ServerInfo {
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
system_id: 0, /* Postgres system identifier */
wal_seg_size: 0,
},
vec![],
Lsn::INVALID,
Lsn::INVALID,
)
// make clippy happy
impl Default for PersistedPeerInfo {
fn default() -> Self {
Self::new()
}
}
@@ -583,9 +481,7 @@ pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
/// determines epoch switch point.
pub epoch_start_lsn: Lsn,
pub inmem: SafekeeperMemState, // in memory part
pub state: CTRL, // persistent state storage
pub state: TimelineState<CTRL>, // persistent state storage
pub wal_store: WAL,
node_id: NodeId, // safekeeper's node id
@@ -612,13 +508,7 @@ where
Ok(SafeKeeper {
epoch_start_lsn: Lsn(0),
inmem: SafekeeperMemState {
commit_lsn: state.commit_lsn,
backup_lsn: state.backup_lsn,
peer_horizon_lsn: state.peer_horizon_lsn,
proposer_uuid: state.proposer_uuid,
},
state,
state: TimelineState::new(state),
wal_store,
node_id,
})
@@ -726,12 +616,12 @@ where
);
}
let mut state = self.state.clone();
let mut state = self.state.start_change();
state.server.system_id = msg.system_id;
if msg.pg_version != UNKNOWN_SERVER_VERSION {
state.server.pg_version = msg.pg_version;
}
self.state.persist(&state).await?;
self.state.finish_change(&state).await?;
}
info!(
@@ -766,15 +656,15 @@ where
term: self.state.acceptor_state.term,
vote_given: false as u64,
flush_lsn: self.flush_lsn(),
truncate_lsn: self.inmem.peer_horizon_lsn,
truncate_lsn: self.state.inmem.peer_horizon_lsn,
term_history: self.get_term_history(),
timeline_start_lsn: self.state.timeline_start_lsn,
};
if self.state.acceptor_state.term < msg.term {
let mut state = self.state.clone();
let mut state = self.state.start_change();
state.acceptor_state.term = msg.term;
// persist vote before sending it out
self.state.persist(&state).await?;
self.state.finish_change(&state).await?;
resp.term = self.state.acceptor_state.term;
resp.vote_given = true as u64;
@@ -803,9 +693,9 @@ where
) -> Result<Option<AcceptorProposerMessage>> {
info!("received ProposerElected {:?}", msg);
if self.state.acceptor_state.term < msg.term {
let mut state = self.state.clone();
let mut state = self.state.start_change();
state.acceptor_state.term = msg.term;
self.state.persist(&state).await?;
self.state.finish_change(&state).await?;
}
// If our term is higher, ignore the message (next feedback will inform the compute)
@@ -825,10 +715,10 @@ where
}
// Otherwise we must never attempt to truncate committed data.
assert!(
msg.start_streaming_at >= self.inmem.commit_lsn,
msg.start_streaming_at >= self.state.inmem.commit_lsn,
"attempt to truncate committed data: start_streaming_at={}, commit_lsn={}",
msg.start_streaming_at,
self.inmem.commit_lsn
self.state.inmem.commit_lsn
);
// TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
@@ -839,7 +729,7 @@ where
// and now adopt term history from proposer
{
let mut state = self.state.clone();
let mut state = self.state.start_change();
// Here we learn initial LSN for the first time, set fields
// interested in that.
@@ -863,13 +753,13 @@ where
// NB: on new clusters, this happens at the same time as
// timeline_start_lsn initialization, it is taken outside to provide
// upgrade.
self.inmem.commit_lsn = max(self.inmem.commit_lsn, state.timeline_start_lsn);
state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn);
// Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
state.acceptor_state.term_history = msg.term_history.clone();
self.persist_control_file(state).await?;
self.state.finish_change(&state).await?;
}
info!("start receiving WAL since {:?}", msg.start_streaming_at);
@@ -892,63 +782,41 @@ where
async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
// Both peers and walproposer communicate this value, we might already
// have a fresher (higher) version.
candidate = max(candidate, self.inmem.commit_lsn);
candidate = max(candidate, self.state.inmem.commit_lsn);
let commit_lsn = min(candidate, self.flush_lsn());
assert!(
commit_lsn >= self.inmem.commit_lsn,
commit_lsn >= self.state.inmem.commit_lsn,
"commit_lsn monotonicity violated: old={} new={}",
self.inmem.commit_lsn,
self.state.inmem.commit_lsn,
commit_lsn
);
self.inmem.commit_lsn = commit_lsn;
self.state.inmem.commit_lsn = commit_lsn;
// If new commit_lsn reached epoch switch, force sync of control
// file: walproposer in sync mode is very interested when this
// happens. Note: this is for sync-safekeepers mode only, as
// otherwise commit_lsn might jump over epoch_start_lsn.
if commit_lsn >= self.epoch_start_lsn && self.state.commit_lsn < self.epoch_start_lsn {
self.persist_control_file(self.state.clone()).await?;
self.state.flush().await?;
}
Ok(())
}
/// Persist in-memory state of control file to disk.
//
// TODO: passing inmem_remote_consistent_lsn everywhere is ugly, better
// separate state completely and give Arc to all those who need it.
pub async fn persist_inmem(&mut self, inmem_remote_consistent_lsn: Lsn) -> Result<()> {
let mut state = self.state.clone();
state.remote_consistent_lsn = inmem_remote_consistent_lsn;
self.persist_control_file(state).await
}
/// Persist in-memory state to the disk, taking other data from state.
async fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
state.commit_lsn = self.inmem.commit_lsn;
state.backup_lsn = self.inmem.backup_lsn;
state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
state.proposer_uuid = self.inmem.proposer_uuid;
self.state.persist(&state).await
}
/// Persist control file if there is something to save and enough time
/// passed after the last save.
pub async fn maybe_persist_inmem_control_file(
&mut self,
inmem_remote_consistent_lsn: Lsn,
) -> Result<()> {
pub async fn maybe_persist_inmem_control_file(&mut self) -> Result<()> {
const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
if self.state.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
if self.state.pers.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
return Ok(());
}
let need_persist = self.inmem.commit_lsn > self.state.commit_lsn
|| self.inmem.backup_lsn > self.state.backup_lsn
|| self.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn
|| inmem_remote_consistent_lsn > self.state.remote_consistent_lsn;
let need_persist = self.state.inmem.commit_lsn > self.state.commit_lsn
|| self.state.inmem.backup_lsn > self.state.backup_lsn
|| self.state.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn
|| self.state.inmem.remote_consistent_lsn > self.state.remote_consistent_lsn;
if need_persist {
self.persist_inmem(inmem_remote_consistent_lsn).await?;
self.state.flush().await?;
trace!("saved control file: {CF_SAVE_INTERVAL:?} passed");
}
Ok(())
@@ -974,7 +842,7 @@ where
// Now we know that we are in the same term as the proposer,
// processing the message.
self.inmem.proposer_uuid = msg.h.proposer_uuid;
self.state.inmem.proposer_uuid = msg.h.proposer_uuid;
// do the job
if !msg.wal_data.is_empty() {
@@ -998,15 +866,16 @@ where
// - if we make safekeepers always send persistent value,
// any compute restart would pull it down.
// Thus, take max before adopting.
self.inmem.peer_horizon_lsn = max(self.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
self.state.inmem.peer_horizon_lsn =
max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
// Update truncate and commit LSN in control file.
// To avoid negative impact on performance of extra fsync, do it only
// when truncate_lsn delta exceeds WAL segment size.
if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
< self.inmem.peer_horizon_lsn
// when commit_lsn delta exceeds WAL segment size.
if self.state.commit_lsn + (self.state.server.wal_seg_size as u64)
< self.state.inmem.commit_lsn
{
self.persist_control_file(self.state.clone()).await?;
self.state.flush().await?;
}
trace!(
@@ -1048,27 +917,27 @@ where
}
}
let new_backup_lsn = max(Lsn(sk_info.backup_lsn), self.inmem.backup_lsn);
sync_control_file |=
self.state.backup_lsn + (self.state.server.wal_seg_size as u64) < new_backup_lsn;
self.inmem.backup_lsn = new_backup_lsn;
self.state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), self.state.inmem.backup_lsn);
sync_control_file |= self.state.backup_lsn + (self.state.server.wal_seg_size as u64)
< self.state.inmem.backup_lsn;
// value in sk_info should be maximized over our local in memory value.
let new_remote_consistent_lsn = Lsn(sk_info.remote_consistent_lsn);
assert!(self.state.remote_consistent_lsn <= new_remote_consistent_lsn);
self.state.inmem.remote_consistent_lsn = max(
Lsn(sk_info.remote_consistent_lsn),
self.state.inmem.remote_consistent_lsn,
);
sync_control_file |= self.state.remote_consistent_lsn
+ (self.state.server.wal_seg_size as u64)
< new_remote_consistent_lsn;
< self.state.inmem.remote_consistent_lsn;
let new_peer_horizon_lsn = max(Lsn(sk_info.peer_horizon_lsn), self.inmem.peer_horizon_lsn);
self.state.inmem.peer_horizon_lsn = max(
Lsn(sk_info.peer_horizon_lsn),
self.state.inmem.peer_horizon_lsn,
);
sync_control_file |= self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
< new_peer_horizon_lsn;
self.inmem.peer_horizon_lsn = new_peer_horizon_lsn;
< self.state.inmem.peer_horizon_lsn;
if sync_control_file {
let mut state = self.state.clone();
state.remote_consistent_lsn = new_remote_consistent_lsn;
self.persist_control_file(state).await?;
self.state.flush().await?;
}
Ok(())
}
@@ -1096,17 +965,20 @@ mod tests {
use postgres_ffi::WAL_SEGMENT_SIZE;
use super::*;
use crate::wal_storage::Storage;
use crate::{
state::{PersistedPeers, TimelinePersistentState},
wal_storage::Storage,
};
use std::{ops::Deref, str::FromStr, time::Instant};
// fake storage for tests
struct InMemoryState {
persisted_state: SafeKeeperState,
persisted_state: TimelinePersistentState,
}
#[async_trait::async_trait]
impl control_file::Storage for InMemoryState {
async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
self.persisted_state = s.clone();
Ok(())
}
@@ -1117,15 +989,15 @@ mod tests {
}
impl Deref for InMemoryState {
type Target = SafeKeeperState;
type Target = TimelinePersistentState;
fn deref(&self) -> &Self::Target {
&self.persisted_state
}
}
fn test_sk_state() -> SafeKeeperState {
let mut state = SafeKeeperState::empty();
fn test_sk_state() -> TimelinePersistentState {
let mut state = TimelinePersistentState::empty();
state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
state.tenant_id = TenantId::from([1u8; 16]);
state.timeline_id = TimelineId::from([1u8; 16]);
@@ -1182,7 +1054,7 @@ mod tests {
}
// reboot...
let state = sk.state.persisted_state.clone();
let state = sk.state.deref().clone();
let storage = InMemoryState {
persisted_state: state,
};
@@ -1321,7 +1193,7 @@ mod tests {
use utils::Hex;
let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();
let state = SafeKeeperState {
let state = TimelinePersistentState {
tenant_id,
timeline_id,
acceptor_state: AcceptorState {
@@ -1405,7 +1277,7 @@ mod tests {
assert_eq!(Hex(&ser), Hex(&expected));
let deser = SafeKeeperState::des(&ser).unwrap();
let deser = TimelinePersistentState::des(&ser).unwrap();
assert_eq!(deser, state);
}

View File

@@ -19,7 +19,6 @@ use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use utils::failpoint_support;
use utils::id::TenantTimelineId;
use utils::lsn::AtomicLsn;
use utils::pageserver_feedback::PageserverFeedback;
use std::cmp::{max, min};
@@ -90,16 +89,12 @@ pub struct StandbyFeedback {
/// WalSenders registry. Timeline holds it (wrapped in Arc).
pub struct WalSenders {
/// Lsn maximized over all walsenders *and* peer data, so might be higher
/// than what we receive from replicas.
remote_consistent_lsn: AtomicLsn,
mutex: Mutex<WalSendersShared>,
}
impl WalSenders {
pub fn new(remote_consistent_lsn: Lsn) -> Arc<WalSenders> {
pub fn new() -> Arc<WalSenders> {
Arc::new(WalSenders {
remote_consistent_lsn: AtomicLsn::from(remote_consistent_lsn),
mutex: Mutex::new(WalSendersShared::new()),
})
}
@@ -157,7 +152,6 @@ impl WalSenders {
let mut shared = self.mutex.lock();
shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
shared.update_ps_feedback();
self.update_remote_consistent_lsn(shared.agg_ps_feedback.remote_consistent_lsn);
}
/// Record standby reply.
@@ -202,18 +196,6 @@ impl WalSenders {
}
}
/// Get remote_consistent_lsn maximized across all walsenders and peers.
pub fn get_remote_consistent_lsn(self: &Arc<WalSenders>) -> Lsn {
self.remote_consistent_lsn.load()
}
/// Update maximized remote_consistent_lsn, return new (potentially) value.
pub fn update_remote_consistent_lsn(self: &Arc<WalSenders>, candidate: Lsn) -> Lsn {
self.remote_consistent_lsn
.fetch_max(candidate)
.max(candidate)
}
/// Unregister walsender.
fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
let mut shared = self.mutex.lock();
@@ -444,7 +426,11 @@ impl SafekeeperPostgresHandler {
wal_reader,
send_buf: [0; MAX_SEND_SIZE],
};
let mut reply_reader = ReplyReader { reader, ws_guard };
let mut reply_reader = ReplyReader {
reader,
ws_guard,
tli,
};
let res = tokio::select! {
// todo: add read|write .context to these errors
@@ -638,17 +624,18 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
struct ReplyReader<IO> {
reader: PostgresBackendReader<IO>,
ws_guard: Arc<WalSenderGuard>,
tli: Arc<Timeline>,
}
impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
loop {
let msg = self.reader.read_copy_message().await?;
self.handle_feedback(&msg)?
self.handle_feedback(&msg).await?
}
}
fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
async fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
match msg.first().cloned() {
Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
// Note: deserializing is on m[1..] because we skip the tag byte.
@@ -675,6 +662,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
self.ws_guard
.walsenders
.record_ps_feedback(self.ws_guard.id, &ps_feedback);
self.tli
.update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
.await;
// in principle new remote_consistent_lsn could allow to
// deactivate the timeline, but we check that regularly through
// broker updated, not need to do it here

197
safekeeper/src/state.rs Normal file
View File

@@ -0,0 +1,197 @@
//! Defines per timeline data stored persistently (SafeKeeperPersistentState)
//! and its wrapper with in memory layer (SafekeeperState).
use std::ops::Deref;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use utils::{
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
};
use crate::{
control_file,
safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
};
/// Persistent information stored on safekeeper node about timeline.
/// On disk data is prefixed by magic and format version and followed by checksum.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TimelinePersistentState {
#[serde(with = "hex")]
pub tenant_id: TenantId,
#[serde(with = "hex")]
pub timeline_id: TimelineId,
/// persistent acceptor state
pub acceptor_state: AcceptorState,
/// information about server
pub server: ServerInfo,
/// Unique id of the last *elected* proposer we dealt with. Not needed
/// for correctness, exists for monitoring purposes.
#[serde(with = "hex")]
pub proposer_uuid: PgUuid,
/// Since which LSN this timeline generally starts. Safekeeper might have
/// joined later.
pub timeline_start_lsn: Lsn,
/// Since which LSN safekeeper has (had) WAL for this timeline.
/// All WAL segments next to one containing local_start_lsn are
/// filled with data from the beginning.
pub local_start_lsn: Lsn,
/// Part of WAL acknowledged by quorum *and available locally*. Always points
/// to record boundary.
pub commit_lsn: Lsn,
/// LSN that points to the end of the last backed up segment. Useful to
/// persist to avoid finding out offloading progress on boot.
pub backup_lsn: Lsn,
/// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
/// of last record streamed to everyone). Persisting it helps skipping
/// recovery in walproposer, generally we compute it from peers. In
/// walproposer proto called 'truncate_lsn'. Updates are currently drived
/// only by walproposer.
pub peer_horizon_lsn: Lsn,
/// LSN of the oldest known checkpoint made by pageserver and successfully
/// pushed to s3. We don't remove WAL beyond it. Persisted only for
/// informational purposes, we receive it from pageserver (or broker).
pub remote_consistent_lsn: Lsn,
// Peers and their state as we remember it. Knowing peers themselves is
// fundamental; but state is saved here only for informational purposes and
// obviously can be stale. (Currently not saved at all, but let's provision
// place to have less file version upgrades).
pub peers: PersistedPeers,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>);
impl TimelinePersistentState {
pub fn new(
ttid: &TenantTimelineId,
server_info: ServerInfo,
peers: Vec<NodeId>,
commit_lsn: Lsn,
local_start_lsn: Lsn,
) -> TimelinePersistentState {
TimelinePersistentState {
tenant_id: ttid.tenant_id,
timeline_id: ttid.timeline_id,
acceptor_state: AcceptorState {
term: 0,
term_history: TermHistory::empty(),
},
server: server_info,
proposer_uuid: [0; 16],
timeline_start_lsn: Lsn(0),
local_start_lsn,
commit_lsn,
backup_lsn: local_start_lsn,
peer_horizon_lsn: local_start_lsn,
remote_consistent_lsn: Lsn(0),
peers: PersistedPeers(
peers
.iter()
.map(|p| (*p, PersistedPeerInfo::new()))
.collect(),
),
}
}
#[cfg(test)]
pub fn empty() -> Self {
use crate::safekeeper::UNKNOWN_SERVER_VERSION;
TimelinePersistentState::new(
&TenantTimelineId::empty(),
ServerInfo {
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
system_id: 0, /* Postgres system identifier */
wal_seg_size: 0,
},
vec![],
Lsn::INVALID,
Lsn::INVALID,
)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
// In memory safekeeper state. Fields mirror ones in `SafeKeeperPersistentState`; values
// are not flushed yet.
pub struct TimelineMemState {
pub commit_lsn: Lsn,
pub backup_lsn: Lsn,
pub peer_horizon_lsn: Lsn,
pub remote_consistent_lsn: Lsn,
#[serde(with = "hex")]
pub proposer_uuid: PgUuid,
}
/// Safekeeper persistent state plus in memory layer, to avoid frequent fsyncs
/// when we update fields like commit_lsn which don't need immediate
/// persistence. Provides transactional like API to atomically update the state.
///
/// Implements Deref into *persistent* part.
pub struct TimelineState<CTRL: control_file::Storage> {
pub inmem: TimelineMemState,
pub pers: CTRL, // persistent
}
impl<CTRL> TimelineState<CTRL>
where
CTRL: control_file::Storage,
{
pub fn new(state: CTRL) -> Self {
TimelineState {
inmem: TimelineMemState {
commit_lsn: state.commit_lsn,
backup_lsn: state.backup_lsn,
peer_horizon_lsn: state.peer_horizon_lsn,
remote_consistent_lsn: state.remote_consistent_lsn,
proposer_uuid: state.proposer_uuid,
},
pers: state,
}
}
/// Start atomic change. Returns SafeKeeperPersistentState with in memory
/// values applied; the protocol is to 1) change returned struct as desired
/// 2) atomically persist it with finish_change.
pub fn start_change(&self) -> TimelinePersistentState {
let mut s = self.pers.clone();
s.commit_lsn = self.inmem.commit_lsn;
s.backup_lsn = self.inmem.backup_lsn;
s.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
s.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
s.proposer_uuid = self.inmem.proposer_uuid;
s
}
/// Persist given state. c.f. start_change.
pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> {
self.pers.persist(s).await?;
// keep in memory values up to date
self.inmem.commit_lsn = s.commit_lsn;
self.inmem.backup_lsn = s.backup_lsn;
self.inmem.peer_horizon_lsn = s.peer_horizon_lsn;
self.inmem.remote_consistent_lsn = s.remote_consistent_lsn;
self.inmem.proposer_uuid = s.proposer_uuid;
Ok(())
}
/// Flush in memory values.
pub async fn flush(&mut self) -> Result<()> {
let s = self.start_change();
self.finish_change(&s).await
}
}
impl<CTRL> Deref for TimelineState<CTRL>
where
CTRL: control_file::Storage,
{
type Target = TimelinePersistentState;
fn deref(&self) -> &Self::Target {
&self.pers
}
}

View File

@@ -28,10 +28,11 @@ use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use crate::receive_wal::WalReceivers;
use crate::recovery::{recovery_main, Donor, RecoveryNeededInfo};
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM,
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn,
INVALID_TERM,
};
use crate::send_wal::WalSenders;
use crate::state::{TimelineMemState, TimelinePersistentState};
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
use crate::metrics::FullTimelineInfo;
@@ -121,7 +122,7 @@ impl SharedState {
fn create_new(
conf: &SafeKeeperConf,
ttid: &TenantTimelineId,
state: SafeKeeperState,
state: TimelinePersistentState,
) -> Result<Self> {
if state.server.wal_seg_size == 0 {
bail!(TimelineError::UninitializedWalSegSize(*ttid));
@@ -175,30 +176,28 @@ impl SharedState {
})
}
fn is_active(&self, num_computes: usize, remote_consistent_lsn: Lsn) -> bool {
fn is_active(&self, num_computes: usize) -> bool {
self.is_wal_backup_required(num_computes)
// FIXME: add tracking of relevant pageservers and check them here individually,
// otherwise migration won't work (we suspend too early).
|| remote_consistent_lsn < self.sk.inmem.commit_lsn
|| self.sk.state.inmem.remote_consistent_lsn < self.sk.state.inmem.commit_lsn
}
/// Mark timeline active/inactive and return whether s3 offloading requires
/// start/stop action. If timeline is deactivated, control file is persisted
/// as maintenance task does that only for active timelines.
async fn update_status(
&mut self,
num_computes: usize,
remote_consistent_lsn: Lsn,
ttid: TenantTimelineId,
) -> bool {
let is_active = self.is_active(num_computes, remote_consistent_lsn);
async fn update_status(&mut self, num_computes: usize, ttid: TenantTimelineId) -> bool {
let is_active = self.is_active(num_computes);
if self.active != is_active {
info!(
"timeline {} active={} now, remote_consistent_lsn={}, commit_lsn={}",
ttid, is_active, remote_consistent_lsn, self.sk.inmem.commit_lsn
ttid,
is_active,
self.sk.state.inmem.remote_consistent_lsn,
self.sk.state.inmem.commit_lsn
);
if !is_active {
if let Err(e) = self.sk.persist_inmem(remote_consistent_lsn).await {
if let Err(e) = self.sk.state.flush().await {
warn!("control file save in update_status failed: {:?}", e);
}
}
@@ -212,8 +211,8 @@ impl SharedState {
let seg_size = self.get_wal_seg_size();
num_computes > 0 ||
// Currently only the whole segment is offloaded, so compare segment numbers.
(self.sk.inmem.commit_lsn.segment_number(seg_size) >
self.sk.inmem.backup_lsn.segment_number(seg_size))
(self.sk.state.inmem.commit_lsn.segment_number(seg_size) >
self.sk.state.inmem.backup_lsn.segment_number(seg_size))
}
/// Is current state of s3 offloading is not what it ought to be?
@@ -227,7 +226,7 @@ impl SharedState {
};
trace!(
"timeline {} s3 offloading action {} pending: num_computes={}, commit_lsn={}, backup_lsn={}",
self.sk.state.timeline_id, action_pending, num_computes, self.sk.inmem.commit_lsn, self.sk.inmem.backup_lsn
self.sk.state.timeline_id, action_pending, num_computes, self.sk.state.inmem.commit_lsn, self.sk.state.inmem.backup_lsn
);
}
res
@@ -248,7 +247,6 @@ impl SharedState {
&self,
ttid: &TenantTimelineId,
conf: &SafeKeeperConf,
remote_consistent_lsn: Lsn,
) -> SafekeeperTimelineInfo {
SafekeeperTimelineInfo {
safekeeper_id: conf.my_id.0,
@@ -260,15 +258,15 @@ impl SharedState {
last_log_term: self.sk.get_epoch(),
flush_lsn: self.sk.flush_lsn().0,
// note: this value is not flushed to control file yet and can be lost
commit_lsn: self.sk.inmem.commit_lsn.0,
remote_consistent_lsn: remote_consistent_lsn.0,
peer_horizon_lsn: self.sk.inmem.peer_horizon_lsn.0,
commit_lsn: self.sk.state.inmem.commit_lsn.0,
remote_consistent_lsn: self.sk.state.inmem.remote_consistent_lsn.0,
peer_horizon_lsn: self.sk.state.inmem.peer_horizon_lsn.0,
safekeeper_connstr: conf
.advertise_pg_addr
.to_owned()
.unwrap_or(conf.listen_pg_addr.clone()),
http_connstr: conf.listen_http_addr.to_owned(),
backup_lsn: self.sk.inmem.backup_lsn.0,
backup_lsn: self.sk.state.inmem.backup_lsn.0,
local_start_lsn: self.sk.state.local_start_lsn.0,
availability_zone: conf.availability_zone.clone(),
}
@@ -366,7 +364,6 @@ impl Timeline {
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
let shared_state = SharedState::restore(conf, &ttid)?;
let rcl = shared_state.sk.state.remote_consistent_lsn;
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
watch::channel(shared_state.sk.state.commit_lsn);
let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
@@ -383,7 +380,7 @@ impl Timeline {
term_flush_lsn_watch_tx,
term_flush_lsn_watch_rx,
mutex: Mutex::new(shared_state),
walsenders: WalSenders::new(rcl),
walsenders: WalSenders::new(),
walreceivers: WalReceivers::new(),
cancellation_rx,
cancellation_tx,
@@ -404,7 +401,8 @@ impl Timeline {
let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
let (cancellation_tx, cancellation_rx) = watch::channel(false);
let state = SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
let state =
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
Ok(Timeline {
ttid,
@@ -414,7 +412,7 @@ impl Timeline {
term_flush_lsn_watch_tx,
term_flush_lsn_watch_rx,
mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
walsenders: WalSenders::new(Lsn(0)),
walsenders: WalSenders::new(),
walreceivers: WalReceivers::new(),
cancellation_rx,
cancellation_tx,
@@ -448,7 +446,7 @@ impl Timeline {
fs::create_dir_all(&self.timeline_dir).await?;
// Write timeline to disk and start background tasks.
if let Err(e) = shared_state.sk.persist_inmem(Lsn::INVALID).await {
if let Err(e) = shared_state.sk.state.flush().await {
// Bootstrap failed, cancel timeline and remove timeline directory.
self.cancel(shared_state);
@@ -523,11 +521,7 @@ impl Timeline {
async fn update_status(&self, shared_state: &mut SharedState) -> bool {
shared_state
.update_status(
self.walreceivers.get_num(),
self.get_walsenders().get_remote_consistent_lsn(),
self.ttid,
)
.update_status(self.walreceivers.get_num(), self.ttid)
.await
}
@@ -558,8 +552,8 @@ impl Timeline {
}
let shared_state = self.write_shared_state().await;
if self.walreceivers.get_num() == 0 {
return shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
reported_remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn;
return shared_state.sk.state.inmem.commit_lsn == Lsn(0) || // no data at all yet
reported_remote_consistent_lsn >= shared_state.sk.state.inmem.commit_lsn;
}
false
}
@@ -623,7 +617,7 @@ impl Timeline {
resp.pageserver_feedback = ps_feedback;
}
commit_lsn = shared_state.sk.inmem.commit_lsn;
commit_lsn = shared_state.sk.state.inmem.commit_lsn;
term_flush_lsn =
TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
}
@@ -647,14 +641,14 @@ impl Timeline {
}
/// Returns state of the timeline.
pub async fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
let state = self.write_shared_state().await;
(state.sk.inmem.clone(), state.sk.state.clone())
(state.sk.state.inmem.clone(), state.sk.state.clone())
}
/// Returns latest backup_lsn.
pub async fn get_wal_backup_lsn(&self) -> Lsn {
self.write_shared_state().await.sk.inmem.backup_lsn
self.write_shared_state().await.sk.state.inmem.backup_lsn
}
/// Sets backup_lsn to the given value.
@@ -664,7 +658,7 @@ impl Timeline {
}
let mut state = self.write_shared_state().await;
state.sk.inmem.backup_lsn = max(state.sk.inmem.backup_lsn, backup_lsn);
state.sk.state.inmem.backup_lsn = max(state.sk.state.inmem.backup_lsn, backup_lsn);
// we should check whether to shut down offloader, but this will be done
// soon by peer communication anyway.
Ok(())
@@ -673,21 +667,11 @@ impl Timeline {
/// Get safekeeper info for broadcasting to broker and other peers.
pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
let shared_state = self.write_shared_state().await;
shared_state.get_safekeeper_info(
&self.ttid,
conf,
self.walsenders.get_remote_consistent_lsn(),
)
shared_state.get_safekeeper_info(&self.ttid, conf)
}
/// Update timeline state with peer safekeeper data.
pub async fn record_safekeeper_info(&self, mut sk_info: SafekeeperTimelineInfo) -> Result<()> {
// Update local remote_consistent_lsn in memory (in .walsenders) and in
// sk_info to pass it down to control file.
sk_info.remote_consistent_lsn = self
.walsenders
.update_remote_consistent_lsn(Lsn(sk_info.remote_consistent_lsn))
.0;
pub async fn record_safekeeper_info(&self, sk_info: SafekeeperTimelineInfo) -> Result<()> {
let is_wal_backup_action_pending: bool;
let commit_lsn: Lsn;
{
@@ -696,7 +680,7 @@ impl Timeline {
let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
shared_state.peers_info.upsert(&peer_info);
is_wal_backup_action_pending = self.update_status(&mut shared_state).await;
commit_lsn = shared_state.sk.inmem.commit_lsn;
commit_lsn = shared_state.sk.state.inmem.commit_lsn;
}
self.commit_lsn_watch_tx.send(commit_lsn)?;
// Wake up wal backup launcher, if it is time to stop the offloading.
@@ -706,6 +690,13 @@ impl Timeline {
Ok(())
}
/// Update in memory remote consistent lsn.
pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
let mut shared_state = self.write_shared_state().await;
shared_state.sk.state.inmem.remote_consistent_lsn =
max(shared_state.sk.state.inmem.remote_consistent_lsn, candidate);
}
pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
let shared_state = self.write_shared_state().await;
shared_state.get_peers(conf.heartbeat_timeout)
@@ -836,11 +827,10 @@ impl Timeline {
/// to date so that storage nodes restart doesn't cause many pageserver ->
/// safekeeper reconnections.
pub async fn maybe_persist_control_file(&self) -> Result<()> {
let remote_consistent_lsn = self.walsenders.get_remote_consistent_lsn();
self.write_shared_state()
.await
.sk
.maybe_persist_inmem_control_file(remote_consistent_lsn)
.maybe_persist_inmem_control_file()
.await
}
@@ -862,10 +852,9 @@ impl Timeline {
num_computes: self.walreceivers.get_num() as u32,
last_removed_segno: state.last_removed_segno,
epoch_start_lsn: state.sk.epoch_start_lsn,
mem_state: state.sk.inmem.clone(),
mem_state: state.sk.state.inmem.clone(),
persisted_state: state.sk.state.clone(),
flush_lsn: state.sk.wal_store.flush_lsn(),
remote_consistent_lsn: self.get_walsenders().get_remote_consistent_lsn(),
wal_storage: state.sk.wal_store.get_metrics(),
})
} else {
@@ -889,7 +878,7 @@ impl Timeline {
num_computes: self.walreceivers.get_num() as u32,
last_removed_segno: state.last_removed_segno,
epoch_start_lsn: state.sk.epoch_start_lsn,
mem_state: state.sk.inmem.clone(),
mem_state: state.sk.state.inmem.clone(),
write_lsn,
write_record_lsn,
flush_lsn,

View File

@@ -23,7 +23,7 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tracing::*;
use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS};
use crate::safekeeper::SafeKeeperState;
use crate::state::TimelinePersistentState;
use crate::wal_backup::read_object;
use crate::SafeKeeperConf;
use postgres_ffi::waldecoder::WalStreamDecoder;
@@ -125,7 +125,7 @@ impl PhysicalStorage {
ttid: &TenantTimelineId,
timeline_dir: Utf8PathBuf,
conf: &SafeKeeperConf,
state: &SafeKeeperState,
state: &TimelinePersistentState,
) -> Result<PhysicalStorage> {
let wal_seg_size = state.server.wal_seg_size as usize;
@@ -525,7 +525,7 @@ impl WalReader {
pub fn new(
workdir: Utf8PathBuf,
timeline_dir: Utf8PathBuf,
state: &SafeKeeperState,
state: &TimelinePersistentState,
start_pos: Lsn,
enable_remote_read: bool,
) -> Result<Self> {