diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index 591bfea182..f1daddd7c3 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -13,13 +13,16 @@ use std::time::Instant; use crate::control_file_upgrade::upgrade_control_file; use crate::metrics::PERSIST_CONTROL_FILE_SECONDS; -use crate::safekeeper::{SafeKeeperState, SK_FORMAT_VERSION, SK_MAGIC}; +use crate::state::TimelinePersistentState; use utils::{bin_ser::LeSer, id::TenantTimelineId}; use crate::SafeKeeperConf; use std::convert::TryInto; +pub const SK_MAGIC: u32 = 0xcafeceefu32; +pub const SK_FORMAT_VERSION: u32 = 7; + // contains persistent metadata for safekeeper const CONTROL_FILE_NAME: &str = "safekeeper.control"; // needed to atomically update the state using `rename` @@ -29,9 +32,9 @@ pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>(); /// Storage should keep actual state inside of it. It should implement Deref /// trait to access state fields and have persist method for updating that state. #[async_trait::async_trait] -pub trait Storage: Deref<Target = SafeKeeperState> { +pub trait Storage: Deref<Target = TimelinePersistentState> { /// Persist safekeeper state on disk and update internal state. - async fn persist(&mut self, s: &SafeKeeperState) -> Result<()>; + async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()>; /// Timestamp of last persist. fn last_persist_at(&self) -> Instant; @@ -44,7 +47,7 @@ pub struct FileStorage { conf: SafeKeeperConf, /// Last state persisted to disk. - state: SafeKeeperState, + state: TimelinePersistentState, /// Not preserved across restarts. last_persist_at: Instant, } @@ -68,7 +71,7 @@ impl FileStorage { pub fn create_new( timeline_dir: Utf8PathBuf, conf: &SafeKeeperConf, - state: SafeKeeperState, + state: TimelinePersistentState, ) -> Result<FileStorage> { let store = FileStorage { timeline_dir, @@ -81,7 +84,7 @@ impl FileStorage { } /// Check the magic/version in the on-disk data and deserialize it, if possible. - fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> { + fn deser_sk_state(buf: &mut &[u8]) -> Result<TimelinePersistentState> { // Read the version independent part let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?; if magic != SK_MAGIC { @@ -93,7 +96,7 @@ } let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?; if version == SK_FORMAT_VERSION { - let res = SafeKeeperState::des(buf)?; + let res = TimelinePersistentState::des(buf)?; return Ok(res); } // try to upgrade @@ -104,13 +107,15 @@ pub fn load_control_file_conf( conf: &SafeKeeperConf, ttid: &TenantTimelineId, - ) -> Result<SafeKeeperState> { + ) -> Result<TimelinePersistentState> { let path = conf.timeline_dir(ttid).join(CONTROL_FILE_NAME); Self::load_control_file(path) } /// Read in the control file. - pub fn load_control_file<P: AsRef<Path>>(control_file_path: P) -> Result<SafeKeeperState> { + pub fn load_control_file<P: AsRef<Path>>( + control_file_path: P, + ) -> Result<TimelinePersistentState> { let mut control_file = std::fs::OpenOptions::new() .read(true) .write(true) @@ -153,7 +158,7 @@ } impl Deref for FileStorage { - type Target = SafeKeeperState; + type Target = TimelinePersistentState; fn deref(&self) -> &Self::Target { &self.state @@ -165,7 +170,7 @@ impl Storage for FileStorage { /// Persists state durably to the underlying storage. /// /// For a description, see . 
- async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { + async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> { let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer(); // write data to safekeeper.control.partial @@ -242,7 +247,7 @@ impl Storage for FileStorage { mod test { use super::FileStorage; use super::*; - use crate::{safekeeper::SafeKeeperState, SafeKeeperConf}; + use crate::SafeKeeperConf; use anyhow::Result; use utils::{id::TenantTimelineId, lsn::Lsn}; @@ -257,7 +262,7 @@ async fn load_from_control_file( conf: &SafeKeeperConf, ttid: &TenantTimelineId, - ) -> Result<(FileStorage, SafeKeeperState)> { + ) -> Result<(FileStorage, TimelinePersistentState)> { fs::create_dir_all(conf.timeline_dir(ttid)) .await .expect("failed to create timeline dir"); @@ -270,11 +275,11 @@ async fn create( conf: &SafeKeeperConf, ttid: &TenantTimelineId, - ) -> Result<(FileStorage, SafeKeeperState)> { + ) -> Result<(FileStorage, TimelinePersistentState)> { fs::create_dir_all(conf.timeline_dir(ttid)) .await .expect("failed to create timeline dir"); - let state = SafeKeeperState::empty(); + let state = TimelinePersistentState::empty(); let timeline_dir = conf.timeline_dir(ttid); let storage = FileStorage::create_new(timeline_dir, conf, state.clone())?; Ok((storage, state)) diff --git a/safekeeper/src/control_file_upgrade.rs b/safekeeper/src/control_file_upgrade.rs index a0be2b2054..2fd719326d 100644 --- a/safekeeper/src/control_file_upgrade.rs +++ b/safekeeper/src/control_file_upgrade.rs @@ -1,6 +1,7 @@ //! Code to deal with safekeeper control file upgrades -use crate::safekeeper::{ - AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermLsn, +use crate::{ + safekeeper::{AcceptorState, PgUuid, ServerInfo, Term, TermHistory, TermLsn}, + state::{PersistedPeers, TimelinePersistentState}, }; use anyhow::{bail, Result}; use pq_proto::SystemId; @@ -137,7 +138,7 @@ pub struct SafeKeeperStateV4 { pub peers: PersistedPeers, } -pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> { +pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersistentState> { // migrate to storing full term history if version == 1 { info!("reading safekeeper control file version {}", version); @@ -149,7 +150,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> lsn: Lsn(0), }]), }; - return Ok(SafeKeeperState { + return Ok(TimelinePersistentState { tenant_id: oldstate.server.tenant_id, timeline_id: oldstate.server.timeline_id, acceptor_state: ac, @@ -176,7 +177,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> system_id: oldstate.server.system_id, wal_seg_size: oldstate.server.wal_seg_size, }; - return Ok(SafeKeeperState { + return Ok(TimelinePersistentState { tenant_id: oldstate.server.tenant_id, timeline_id: oldstate.server.timeline_id, acceptor_state: oldstate.acceptor_state, @@ -199,7 +200,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> system_id: oldstate.server.system_id, wal_seg_size: oldstate.server.wal_seg_size, }; - return Ok(SafeKeeperState { + return Ok(TimelinePersistentState { tenant_id: oldstate.server.tenant_id, timeline_id: oldstate.server.timeline_id, acceptor_state: oldstate.acceptor_state, @@ -222,7 +223,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> system_id: oldstate.server.system_id, wal_seg_size: oldstate.server.wal_seg_size, }; - return Ok(SafeKeeperState { + return Ok(TimelinePersistentState { tenant_id: oldstate.tenant_id, timeline_id: 
oldstate.timeline_id, acceptor_state: oldstate.acceptor_state, @@ -238,7 +239,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> }); } else if version == 5 { info!("reading safekeeper control file version {}", version); - let mut oldstate = SafeKeeperState::des(&buf[..buf.len()])?; + let mut oldstate = TimelinePersistentState::des(&buf[..buf.len()])?; if oldstate.timeline_start_lsn != Lsn(0) { return Ok(oldstate); } @@ -251,7 +252,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> return Ok(oldstate); } else if version == 6 { info!("reading safekeeper control file version {}", version); - let mut oldstate = SafeKeeperState::des(&buf[..buf.len()])?; + let mut oldstate = TimelinePersistentState::des(&buf[..buf.len()])?; if oldstate.server.pg_version != 0 { return Ok(oldstate); } diff --git a/safekeeper/src/copy_timeline.rs b/safekeeper/src/copy_timeline.rs index ef88eb27e3..5bc877adbd 100644 --- a/safekeeper/src/copy_timeline.rs +++ b/safekeeper/src/copy_timeline.rs @@ -14,7 +14,7 @@ use utils::{id::TenantTimelineId, lsn::Lsn}; use crate::{ control_file::{FileStorage, Storage}, pull_timeline::{create_temp_timeline_dir, load_temp_timeline, validate_temp_timeline}, - safekeeper::SafeKeeperState, + state::TimelinePersistentState, timeline::{Timeline, TimelineError}, wal_backup::copy_s3_segments, wal_storage::{wal_file_paths, WalReader}, @@ -137,7 +137,7 @@ pub async fn handle_request(request: Request) -> Result<()> { ) .await?; - let mut new_state = SafeKeeperState::new( + let mut new_state = TimelinePersistentState::new( &request.destination_ttid, state.server.clone(), vec![], @@ -160,7 +160,7 @@ pub async fn handle_request(request: Request) -> Result<()> { async fn copy_disk_segments( conf: &SafeKeeperConf, - persisted_state: &SafeKeeperState, + persisted_state: &TimelinePersistentState, wal_seg_size: usize, source_ttid: &TenantTimelineId, start_lsn: Lsn, diff --git a/safekeeper/src/debug_dump.rs b/safekeeper/src/debug_dump.rs index c9ff1afdea..b50f2e1158 100644 --- a/safekeeper/src/debug_dump.rs +++ b/safekeeper/src/debug_dump.rs @@ -22,14 +22,13 @@ use utils::id::TenantTimelineId; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; -use crate::safekeeper::SafeKeeperState; -use crate::safekeeper::SafekeeperMemState; use crate::safekeeper::TermHistory; -use crate::SafeKeeperConf; - use crate::send_wal::WalSenderState; +use crate::state::TimelineMemState; +use crate::state::TimelinePersistentState; use crate::wal_storage::WalReader; use crate::GlobalTimelines; +use crate::SafeKeeperConf; /// Various filters that influence the resulting JSON output. #[derive(Debug, Serialize, Deserialize, Clone)] @@ -143,7 +142,7 @@ pub struct Config { pub struct Timeline { pub tenant_id: TenantId, pub timeline_id: TimelineId, - pub control_file: Option<SafeKeeperState>, + pub control_file: Option<TimelinePersistentState>, pub memory: Option<Memory>, pub disk_content: Option<DiskContent>, } @@ -158,7 +157,7 @@ pub struct Memory { pub num_computes: u32, pub last_removed_segno: XLogSegNo, pub epoch_start_lsn: Lsn, - pub mem_state: SafekeeperMemState, + pub mem_state: TimelineMemState, // PhysicalStorage state. 
pub write_lsn: Lsn, diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 5283ea19c1..ec715f6d2e 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -160,7 +160,7 @@ async fn timeline_status_handler(request: Request) -> Result Self { + pub fn new() -> Self { Self { backup_lsn: Lsn::INVALID, term: INVALID_TERM, @@ -232,111 +231,10 @@ impl PersistedPeerInfo { } } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>); - -/// Persistent information stored on safekeeper node -/// On disk data is prefixed by magic and format version and followed by checksum. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct SafeKeeperState { - #[serde(with = "hex")] - pub tenant_id: TenantId, - #[serde(with = "hex")] - pub timeline_id: TimelineId, - /// persistent acceptor state - pub acceptor_state: AcceptorState, - /// information about server - pub server: ServerInfo, - /// Unique id of the last *elected* proposer we dealt with. Not needed - /// for correctness, exists for monitoring purposes. - #[serde(with = "hex")] - pub proposer_uuid: PgUuid, - /// Since which LSN this timeline generally starts. Safekeeper might have - /// joined later. - pub timeline_start_lsn: Lsn, - /// Since which LSN safekeeper has (had) WAL for this timeline. - /// All WAL segments next to one containing local_start_lsn are - /// filled with data from the beginning. - pub local_start_lsn: Lsn, - /// Part of WAL acknowledged by quorum *and available locally*. Always points - /// to record boundary. - pub commit_lsn: Lsn, - /// LSN that points to the end of the last backed up segment. Useful to - /// persist to avoid finding out offloading progress on boot. - pub backup_lsn: Lsn, - /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn - /// of last record streamed to everyone). Persisting it helps skipping - /// recovery in walproposer, generally we compute it from peers. In - /// walproposer proto called 'truncate_lsn'. Updates are currently drived - /// only by walproposer. - pub peer_horizon_lsn: Lsn, - /// LSN of the oldest known checkpoint made by pageserver and successfully - /// pushed to s3. We don't remove WAL beyond it. Persisted only for - /// informational purposes, we receive it from pageserver (or broker). - pub remote_consistent_lsn: Lsn, - // Peers and their state as we remember it. Knowing peers themselves is - // fundamental; but state is saved here only for informational purposes and - // obviously can be stale. (Currently not saved at all, but let's provision - // place to have less file version upgrades). - pub peers: PersistedPeers, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -// In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values -// are not flushed yet. 
-pub struct SafekeeperMemState { - pub commit_lsn: Lsn, - pub backup_lsn: Lsn, - pub peer_horizon_lsn: Lsn, - #[serde(with = "hex")] - pub proposer_uuid: PgUuid, -} - -impl SafeKeeperState { - pub fn new( - ttid: &TenantTimelineId, - server_info: ServerInfo, - peers: Vec<NodeId>, - commit_lsn: Lsn, - local_start_lsn: Lsn, - ) -> SafeKeeperState { - SafeKeeperState { - tenant_id: ttid.tenant_id, - timeline_id: ttid.timeline_id, - acceptor_state: AcceptorState { - term: 0, - term_history: TermHistory::empty(), - }, - server: server_info, - proposer_uuid: [0; 16], - timeline_start_lsn: Lsn(0), - local_start_lsn, - commit_lsn, - backup_lsn: local_start_lsn, - peer_horizon_lsn: local_start_lsn, - remote_consistent_lsn: Lsn(0), - peers: PersistedPeers( - peers - .iter() - .map(|p| (*p, PersistedPeerInfo::new())) - .collect(), - ), - } - } - - #[cfg(test)] - pub fn empty() -> Self { - SafeKeeperState::new( - &TenantTimelineId::empty(), - ServerInfo { - pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */ - system_id: 0, /* Postgres system identifier */ - wal_seg_size: 0, - }, - vec![], - Lsn::INVALID, - Lsn::INVALID, - ) +// make clippy happy +impl Default for PersistedPeerInfo { + fn default() -> Self { + Self::new() } } @@ -583,9 +481,7 @@ pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> { /// determines epoch switch point. pub epoch_start_lsn: Lsn, - pub inmem: SafekeeperMemState, // in memory part - pub state: CTRL, // persistent state storage - + pub state: TimelineState<CTRL>, // persistent state storage pub wal_store: WAL, node_id: NodeId, // safekeeper's node id @@ -612,13 +508,7 @@ where Ok(SafeKeeper { epoch_start_lsn: Lsn(0), - inmem: SafekeeperMemState { - commit_lsn: state.commit_lsn, - backup_lsn: state.backup_lsn, - peer_horizon_lsn: state.peer_horizon_lsn, - proposer_uuid: state.proposer_uuid, - }, - state, + state: TimelineState::new(state), wal_store, node_id, }) @@ -726,12 +616,12 @@ where ); } - let mut state = self.state.clone(); + let mut state = self.state.start_change(); state.server.system_id = msg.system_id; if msg.pg_version != UNKNOWN_SERVER_VERSION { state.server.pg_version = msg.pg_version; } - self.state.persist(&state).await?; + self.state.finish_change(&state).await?; } info!( @@ -766,15 +656,15 @@ where term: self.state.acceptor_state.term, vote_given: false as u64, flush_lsn: self.flush_lsn(), - truncate_lsn: self.inmem.peer_horizon_lsn, + truncate_lsn: self.state.inmem.peer_horizon_lsn, term_history: self.get_term_history(), timeline_start_lsn: self.state.timeline_start_lsn, }; if self.state.acceptor_state.term < msg.term { - let mut state = self.state.clone(); + let mut state = self.state.start_change(); state.acceptor_state.term = msg.term; // persist vote before sending it out - self.state.persist(&state).await?; + self.state.finish_change(&state).await?; resp.term = self.state.acceptor_state.term; resp.vote_given = true as u64; @@ -803,9 +693,9 @@ ) -> Result<Option<AcceptorProposerMessage>> { info!("received ProposerElected {:?}", msg); if self.state.acceptor_state.term < msg.term { - let mut state = self.state.clone(); + let mut state = self.state.start_change(); state.acceptor_state.term = msg.term; - self.state.persist(&state).await?; + self.state.finish_change(&state).await?; } // If our term is higher, ignore the message (next feedback will inform the compute) @@ -825,10 +715,10 @@ } // Otherwise we must never attempt to truncate committed data. 
assert!( - msg.start_streaming_at >= self.inmem.commit_lsn, + msg.start_streaming_at >= self.state.inmem.commit_lsn, "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}", msg.start_streaming_at, - self.inmem.commit_lsn + self.state.inmem.commit_lsn ); // TODO: cross check divergence point, check if msg.start_streaming_at corresponds to @@ -839,7 +729,7 @@ where // and now adopt term history from proposer { - let mut state = self.state.clone(); + let mut state = self.state.start_change(); // Here we learn initial LSN for the first time, set fields // interested in that. @@ -863,13 +753,13 @@ where // NB: on new clusters, this happens at the same time as // timeline_start_lsn initialization, it is taken outside to provide // upgrade. - self.inmem.commit_lsn = max(self.inmem.commit_lsn, state.timeline_start_lsn); + state.commit_lsn = max(state.commit_lsn, state.timeline_start_lsn); // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment. - self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn); + state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn); state.acceptor_state.term_history = msg.term_history.clone(); - self.persist_control_file(state).await?; + self.state.finish_change(&state).await?; } info!("start receiving WAL since {:?}", msg.start_streaming_at); @@ -892,63 +782,41 @@ where async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> { // Both peers and walproposer communicate this value, we might already // have a fresher (higher) version. - candidate = max(candidate, self.inmem.commit_lsn); + candidate = max(candidate, self.state.inmem.commit_lsn); let commit_lsn = min(candidate, self.flush_lsn()); assert!( - commit_lsn >= self.inmem.commit_lsn, + commit_lsn >= self.state.inmem.commit_lsn, "commit_lsn monotonicity violated: old={} new={}", - self.inmem.commit_lsn, + self.state.inmem.commit_lsn, commit_lsn ); - self.inmem.commit_lsn = commit_lsn; + self.state.inmem.commit_lsn = commit_lsn; // If new commit_lsn reached epoch switch, force sync of control // file: walproposer in sync mode is very interested when this // happens. Note: this is for sync-safekeepers mode only, as // otherwise commit_lsn might jump over epoch_start_lsn. if commit_lsn >= self.epoch_start_lsn && self.state.commit_lsn < self.epoch_start_lsn { - self.persist_control_file(self.state.clone()).await?; + self.state.flush().await?; } Ok(()) } - /// Persist in-memory state of control file to disk. - // - // TODO: passing inmem_remote_consistent_lsn everywhere is ugly, better - // separate state completely and give Arc to all those who need it. - pub async fn persist_inmem(&mut self, inmem_remote_consistent_lsn: Lsn) -> Result<()> { - let mut state = self.state.clone(); - state.remote_consistent_lsn = inmem_remote_consistent_lsn; - self.persist_control_file(state).await - } - - /// Persist in-memory state to the disk, taking other data from state. - async fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> { - state.commit_lsn = self.inmem.commit_lsn; - state.backup_lsn = self.inmem.backup_lsn; - state.peer_horizon_lsn = self.inmem.peer_horizon_lsn; - state.proposer_uuid = self.inmem.proposer_uuid; - self.state.persist(&state).await - } - /// Persist control file if there is something to save and enough time /// passed after the last save. 
- pub async fn maybe_persist_inmem_control_file( - &mut self, - inmem_remote_consistent_lsn: Lsn, - ) -> Result<()> { + pub async fn maybe_persist_inmem_control_file(&mut self) -> Result<()> { const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300); - if self.state.last_persist_at().elapsed() < CF_SAVE_INTERVAL { + if self.state.pers.last_persist_at().elapsed() < CF_SAVE_INTERVAL { return Ok(()); } - let need_persist = self.inmem.commit_lsn > self.state.commit_lsn - || self.inmem.backup_lsn > self.state.backup_lsn - || self.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn - || inmem_remote_consistent_lsn > self.state.remote_consistent_lsn; + let need_persist = self.state.inmem.commit_lsn > self.state.commit_lsn + || self.state.inmem.backup_lsn > self.state.backup_lsn + || self.state.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn + || self.state.inmem.remote_consistent_lsn > self.state.remote_consistent_lsn; if need_persist { - self.persist_inmem(inmem_remote_consistent_lsn).await?; + self.state.flush().await?; trace!("saved control file: {CF_SAVE_INTERVAL:?} passed"); } Ok(()) @@ -974,7 +842,7 @@ where // Now we know that we are in the same term as the proposer, // processing the message. - self.inmem.proposer_uuid = msg.h.proposer_uuid; + self.state.inmem.proposer_uuid = msg.h.proposer_uuid; // do the job if !msg.wal_data.is_empty() { @@ -998,15 +866,16 @@ where // - if we make safekeepers always send persistent value, // any compute restart would pull it down. // Thus, take max before adopting. - self.inmem.peer_horizon_lsn = max(self.inmem.peer_horizon_lsn, msg.h.truncate_lsn); + self.state.inmem.peer_horizon_lsn = + max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn); // Update truncate and commit LSN in control file. // To avoid negative impact on performance of extra fsync, do it only - // when truncate_lsn delta exceeds WAL segment size. - if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64) - < self.inmem.peer_horizon_lsn + // when commit_lsn delta exceeds WAL segment size. + if self.state.commit_lsn + (self.state.server.wal_seg_size as u64) + < self.state.inmem.commit_lsn { - self.persist_control_file(self.state.clone()).await?; + self.state.flush().await?; } trace!( @@ -1048,27 +917,27 @@ where } } - let new_backup_lsn = max(Lsn(sk_info.backup_lsn), self.inmem.backup_lsn); - sync_control_file |= - self.state.backup_lsn + (self.state.server.wal_seg_size as u64) < new_backup_lsn; - self.inmem.backup_lsn = new_backup_lsn; + self.state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), self.state.inmem.backup_lsn); + sync_control_file |= self.state.backup_lsn + (self.state.server.wal_seg_size as u64) + < self.state.inmem.backup_lsn; - // value in sk_info should be maximized over our local in memory value. 
- let new_remote_consistent_lsn = Lsn(sk_info.remote_consistent_lsn); - assert!(self.state.remote_consistent_lsn <= new_remote_consistent_lsn); + self.state.inmem.remote_consistent_lsn = max( + Lsn(sk_info.remote_consistent_lsn), + self.state.inmem.remote_consistent_lsn, + ); sync_control_file |= self.state.remote_consistent_lsn + (self.state.server.wal_seg_size as u64) - < new_remote_consistent_lsn; + < self.state.inmem.remote_consistent_lsn; - let new_peer_horizon_lsn = max(Lsn(sk_info.peer_horizon_lsn), self.inmem.peer_horizon_lsn); + self.state.inmem.peer_horizon_lsn = max( + Lsn(sk_info.peer_horizon_lsn), + self.state.inmem.peer_horizon_lsn, + ); sync_control_file |= self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64) - < new_peer_horizon_lsn; - self.inmem.peer_horizon_lsn = new_peer_horizon_lsn; + < self.state.inmem.peer_horizon_lsn; if sync_control_file { - let mut state = self.state.clone(); - state.remote_consistent_lsn = new_remote_consistent_lsn; - self.persist_control_file(state).await?; + self.state.flush().await?; } Ok(()) } @@ -1096,17 +965,20 @@ mod tests { use postgres_ffi::WAL_SEGMENT_SIZE; use super::*; - use crate::wal_storage::Storage; + use crate::{ + state::{PersistedPeers, TimelinePersistentState}, + wal_storage::Storage, + }; use std::{ops::Deref, str::FromStr, time::Instant}; // fake storage for tests struct InMemoryState { - persisted_state: SafeKeeperState, + persisted_state: TimelinePersistentState, } #[async_trait::async_trait] impl control_file::Storage for InMemoryState { - async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { + async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> { self.persisted_state = s.clone(); Ok(()) } @@ -1117,15 +989,15 @@ mod tests { } impl Deref for InMemoryState { - type Target = SafeKeeperState; + type Target = TimelinePersistentState; fn deref(&self) -> &Self::Target { &self.persisted_state } } - fn test_sk_state() -> SafeKeeperState { - let mut state = SafeKeeperState::empty(); + fn test_sk_state() -> TimelinePersistentState { + let mut state = TimelinePersistentState::empty(); state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32; state.tenant_id = TenantId::from([1u8; 16]); state.timeline_id = TimelineId::from([1u8; 16]); @@ -1182,7 +1054,7 @@ mod tests { } // reboot... - let state = sk.state.persisted_state.clone(); + let state = sk.state.deref().clone(); let storage = InMemoryState { persisted_state: state, }; @@ -1321,7 +1193,7 @@ mod tests { use utils::Hex; let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap(); let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap(); - let state = SafeKeeperState { + let state = TimelinePersistentState { tenant_id, timeline_id, acceptor_state: AcceptorState { @@ -1405,7 +1277,7 @@ mod tests { assert_eq!(Hex(&ser), Hex(&expected)); - let deser = SafeKeeperState::des(&ser).unwrap(); + let deser = TimelinePersistentState::des(&ser).unwrap(); assert_eq!(deser, state); } diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 9a5657a40d..879b805796 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -19,7 +19,6 @@ use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; use utils::failpoint_support; use utils::id::TenantTimelineId; -use utils::lsn::AtomicLsn; use utils::pageserver_feedback::PageserverFeedback; use std::cmp::{max, min}; @@ -90,16 +89,12 @@ pub struct StandbyFeedback { /// WalSenders registry. 
Timeline holds it (wrapped in Arc). pub struct WalSenders { - /// Lsn maximized over all walsenders *and* peer data, so might be higher - /// than what we receive from replicas. - remote_consistent_lsn: AtomicLsn, mutex: Mutex<WalSendersShared>, } impl WalSenders { - pub fn new(remote_consistent_lsn: Lsn) -> Arc<WalSenders> { + pub fn new() -> Arc<WalSenders> { Arc::new(WalSenders { - remote_consistent_lsn: AtomicLsn::from(remote_consistent_lsn), mutex: Mutex::new(WalSendersShared::new()), }) } @@ -157,7 +152,6 @@ impl WalSenders { let mut shared = self.mutex.lock(); shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback); shared.update_ps_feedback(); - self.update_remote_consistent_lsn(shared.agg_ps_feedback.remote_consistent_lsn); } /// Record standby reply. @@ -202,18 +196,6 @@ impl WalSenders { } } - /// Get remote_consistent_lsn maximized across all walsenders and peers. - pub fn get_remote_consistent_lsn(self: &Arc<Self>) -> Lsn { - self.remote_consistent_lsn.load() - } - - /// Update maximized remote_consistent_lsn, return new (potentially) value. - pub fn update_remote_consistent_lsn(self: &Arc<Self>, candidate: Lsn) -> Lsn { - self.remote_consistent_lsn - .fetch_max(candidate) - .max(candidate) - } - /// Unregister walsender. fn unregister(self: &Arc<Self>, id: WalSenderId) { let mut shared = self.mutex.lock(); @@ -444,7 +426,11 @@ impl SafekeeperPostgresHandler { wal_reader, send_buf: [0; MAX_SEND_SIZE], }; - let mut reply_reader = ReplyReader { reader, ws_guard }; + let mut reply_reader = ReplyReader { + reader, + ws_guard, + tli, + }; let res = tokio::select! { // todo: add read|write .context to these errors @@ -638,17 +624,18 @@ impl WalSender<'_, IO> { struct ReplyReader<IO> { reader: PostgresBackendReader<IO>, ws_guard: Arc<WalSenderGuard>, + tli: Arc<Timeline>, } impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> { async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> { loop { let msg = self.reader.read_copy_message().await?; - self.handle_feedback(&msg)? + self.handle_feedback(&msg).await? } } - fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> { + async fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> { match msg.first().cloned() { Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => { // Note: deserializing is on m[1..] because we skip the tag byte. @@ -675,6 +662,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> { self.ws_guard .walsenders .record_ps_feedback(self.ws_guard.id, &ps_feedback); + self.tli + .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn) + .await; // in principle new remote_consistent_lsn could allow to // deactivate the timeline, but we check that regularly through // broker updated, not need to do it here diff --git a/safekeeper/src/state.rs b/safekeeper/src/state.rs new file mode 100644 index 0000000000..82f7954051 --- /dev/null +++ b/safekeeper/src/state.rs @@ -0,0 +1,197 @@ +//! Defines per timeline data stored persistently (TimelinePersistentState) +//! and its wrapper with in memory layer (TimelineState). + +use std::ops::Deref; + +use anyhow::Result; +use serde::{Deserialize, Serialize}; +use utils::{ + id::{NodeId, TenantId, TenantTimelineId, TimelineId}, + lsn::Lsn, +}; + +use crate::{ + control_file, + safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory}, +}; + +/// Persistent information stored on safekeeper node about timeline. +/// On disk data is prefixed by magic and format version and followed by checksum. 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct TimelinePersistentState { + #[serde(with = "hex")] + pub tenant_id: TenantId, + #[serde(with = "hex")] + pub timeline_id: TimelineId, + /// persistent acceptor state + pub acceptor_state: AcceptorState, + /// information about server + pub server: ServerInfo, + /// Unique id of the last *elected* proposer we dealt with. Not needed + /// for correctness, exists for monitoring purposes. + #[serde(with = "hex")] + pub proposer_uuid: PgUuid, + /// Since which LSN this timeline generally starts. Safekeeper might have + /// joined later. + pub timeline_start_lsn: Lsn, + /// Since which LSN safekeeper has (had) WAL for this timeline. + /// All WAL segments next to one containing local_start_lsn are + /// filled with data from the beginning. + pub local_start_lsn: Lsn, + /// Part of WAL acknowledged by quorum *and available locally*. Always points + /// to record boundary. + pub commit_lsn: Lsn, + /// LSN that points to the end of the last backed up segment. Useful to + /// persist to avoid finding out offloading progress on boot. + pub backup_lsn: Lsn, + /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn + /// of last record streamed to everyone). Persisting it helps skipping + /// recovery in walproposer, generally we compute it from peers. In + /// walproposer proto called 'truncate_lsn'. Updates are currently driven + /// only by walproposer. + pub peer_horizon_lsn: Lsn, + /// LSN of the oldest known checkpoint made by pageserver and successfully + /// pushed to s3. We don't remove WAL beyond it. Persisted only for + /// informational purposes, we receive it from pageserver (or broker). + pub remote_consistent_lsn: Lsn, + // Peers and their state as we remember it. Knowing peers themselves is + // fundamental; but state is saved here only for informational purposes and + // obviously can be stale. (Currently not saved at all, but let's provision + // place to have less file version upgrades). + pub peers: PersistedPeers, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct PersistedPeers(pub Vec<(NodeId, PersistedPeerInfo)>); + +impl TimelinePersistentState { + pub fn new( + ttid: &TenantTimelineId, + server_info: ServerInfo, + peers: Vec<NodeId>, + commit_lsn: Lsn, + local_start_lsn: Lsn, + ) -> TimelinePersistentState { + TimelinePersistentState { + tenant_id: ttid.tenant_id, + timeline_id: ttid.timeline_id, + acceptor_state: AcceptorState { + term: 0, + term_history: TermHistory::empty(), + }, + server: server_info, + proposer_uuid: [0; 16], + timeline_start_lsn: Lsn(0), + local_start_lsn, + commit_lsn, + backup_lsn: local_start_lsn, + peer_horizon_lsn: local_start_lsn, + remote_consistent_lsn: Lsn(0), + peers: PersistedPeers( + peers + .iter() + .map(|p| (*p, PersistedPeerInfo::new())) + .collect(), + ), + } + } + + #[cfg(test)] + pub fn empty() -> Self { + use crate::safekeeper::UNKNOWN_SERVER_VERSION; + + TimelinePersistentState::new( + &TenantTimelineId::empty(), + ServerInfo { + pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */ + system_id: 0, /* Postgres system identifier */ + wal_seg_size: 0, + }, + vec![], + Lsn::INVALID, + Lsn::INVALID, + ) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +// In memory safekeeper state. Fields mirror ones in `TimelinePersistentState`; values +// are not flushed yet. 
+pub struct TimelineMemState { + pub commit_lsn: Lsn, + pub backup_lsn: Lsn, + pub peer_horizon_lsn: Lsn, + pub remote_consistent_lsn: Lsn, + #[serde(with = "hex")] + pub proposer_uuid: PgUuid, +} + +/// Safekeeper persistent state plus in memory layer, to avoid frequent fsyncs +/// when we update fields like commit_lsn which don't need immediate +/// persistence. Provides a transaction-like API to atomically update the state. +/// +/// Implements Deref into *persistent* part. +pub struct TimelineState<CTRL: control_file::Storage> { + pub inmem: TimelineMemState, + pub pers: CTRL, // persistent +} + +impl<CTRL> TimelineState<CTRL> +where + CTRL: control_file::Storage, +{ + pub fn new(state: CTRL) -> Self { + TimelineState { + inmem: TimelineMemState { + commit_lsn: state.commit_lsn, + backup_lsn: state.backup_lsn, + peer_horizon_lsn: state.peer_horizon_lsn, + remote_consistent_lsn: state.remote_consistent_lsn, + proposer_uuid: state.proposer_uuid, + }, + pers: state, + } + } + + /// Start atomic change. Returns TimelinePersistentState with in memory + /// values applied; the protocol is to 1) change returned struct as desired + /// 2) atomically persist it with finish_change. + pub fn start_change(&self) -> TimelinePersistentState { + let mut s = self.pers.clone(); + s.commit_lsn = self.inmem.commit_lsn; + s.backup_lsn = self.inmem.backup_lsn; + s.peer_horizon_lsn = self.inmem.peer_horizon_lsn; + s.remote_consistent_lsn = self.inmem.remote_consistent_lsn; + s.proposer_uuid = self.inmem.proposer_uuid; + s + } + + /// Persist given state. cf. start_change. + pub async fn finish_change(&mut self, s: &TimelinePersistentState) -> Result<()> { + self.pers.persist(s).await?; + // keep in memory values up to date + self.inmem.commit_lsn = s.commit_lsn; + self.inmem.backup_lsn = s.backup_lsn; + self.inmem.peer_horizon_lsn = s.peer_horizon_lsn; + self.inmem.remote_consistent_lsn = s.remote_consistent_lsn; + self.inmem.proposer_uuid = s.proposer_uuid; + Ok(()) + } + + /// Flush in memory values. 
+ pub async fn flush(&mut self) -> Result<()> { + let s = self.start_change(); + self.finish_change(&s).await + } +} + +impl<CTRL> Deref for TimelineState<CTRL> +where + CTRL: control_file::Storage, +{ + type Target = TimelinePersistentState; + + fn deref(&self) -> &Self::Target { + &self.pers + } +} diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 2f284abe8c..1a8df92828 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -28,10 +28,11 @@ use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use crate::receive_wal::WalReceivers; use crate::recovery::{recovery_main, Donor, RecoveryNeededInfo}; use crate::safekeeper::{ - AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, - SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM, + AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn, + INVALID_TERM, }; use crate::send_wal::WalSenders; +use crate::state::{TimelineMemState, TimelinePersistentState}; use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION}; use crate::metrics::FullTimelineInfo; @@ -121,7 +122,7 @@ impl SharedState { fn create_new( conf: &SafeKeeperConf, ttid: &TenantTimelineId, - state: SafeKeeperState, + state: TimelinePersistentState, ) -> Result<Self> { if state.server.wal_seg_size == 0 { bail!(TimelineError::UninitializedWalSegSize(*ttid)); @@ -175,30 +176,28 @@ impl SharedState { }) } - fn is_active(&self, num_computes: usize, remote_consistent_lsn: Lsn) -> bool { + fn is_active(&self, num_computes: usize) -> bool { self.is_wal_backup_required(num_computes) // FIXME: add tracking of relevant pageservers and check them here individually, // otherwise migration won't work (we suspend too early). - || remote_consistent_lsn < self.sk.inmem.commit_lsn + || self.sk.state.inmem.remote_consistent_lsn < self.sk.state.inmem.commit_lsn } /// Mark timeline active/inactive and return whether s3 offloading requires /// start/stop action. If timeline is deactivated, control file is persisted /// as maintenance task does that only for active timelines. - async fn update_status( - &mut self, - num_computes: usize, - remote_consistent_lsn: Lsn, - ttid: TenantTimelineId, - ) -> bool { - let is_active = self.is_active(num_computes, remote_consistent_lsn); + async fn update_status(&mut self, num_computes: usize, ttid: TenantTimelineId) -> bool { + let is_active = self.is_active(num_computes); if self.active != is_active { info!( "timeline {} active={} now, remote_consistent_lsn={}, commit_lsn={}", - ttid, is_active, remote_consistent_lsn, self.sk.inmem.commit_lsn + ttid, + is_active, + self.sk.state.inmem.remote_consistent_lsn, + self.sk.state.inmem.commit_lsn ); if !is_active { - if let Err(e) = self.sk.persist_inmem(remote_consistent_lsn).await { + if let Err(e) = self.sk.state.flush().await { warn!("control file save in update_status failed: {:?}", e); } } @@ -212,8 +211,8 @@ impl SharedState { let seg_size = self.get_wal_seg_size(); num_computes > 0 || // Currently only the whole segment is offloaded, so compare segment numbers. - (self.sk.inmem.commit_lsn.segment_number(seg_size) > - self.sk.inmem.backup_lsn.segment_number(seg_size)) + (self.sk.state.inmem.commit_lsn.segment_number(seg_size) > + self.sk.state.inmem.backup_lsn.segment_number(seg_size)) } /// Is current state of s3 offloading is not what it ought to be? 
@@ -227,7 +226,7 @@ impl SharedState { }; trace!( "timeline {} s3 offloading action {} pending: num_computes={}, commit_lsn={}, backup_lsn={}", - self.sk.state.timeline_id, action_pending, num_computes, self.sk.inmem.commit_lsn, self.sk.inmem.backup_lsn + self.sk.state.timeline_id, action_pending, num_computes, self.sk.state.inmem.commit_lsn, self.sk.state.inmem.backup_lsn ); } res @@ -248,7 +247,6 @@ impl SharedState { &self, ttid: &TenantTimelineId, conf: &SafeKeeperConf, - remote_consistent_lsn: Lsn, ) -> SafekeeperTimelineInfo { SafekeeperTimelineInfo { safekeeper_id: conf.my_id.0, @@ -260,15 +258,15 @@ impl SharedState { last_log_term: self.sk.get_epoch(), flush_lsn: self.sk.flush_lsn().0, // note: this value is not flushed to control file yet and can be lost - commit_lsn: self.sk.inmem.commit_lsn.0, - remote_consistent_lsn: remote_consistent_lsn.0, - peer_horizon_lsn: self.sk.inmem.peer_horizon_lsn.0, + commit_lsn: self.sk.state.inmem.commit_lsn.0, + remote_consistent_lsn: self.sk.state.inmem.remote_consistent_lsn.0, + peer_horizon_lsn: self.sk.state.inmem.peer_horizon_lsn.0, safekeeper_connstr: conf .advertise_pg_addr .to_owned() .unwrap_or(conf.listen_pg_addr.clone()), http_connstr: conf.listen_http_addr.to_owned(), - backup_lsn: self.sk.inmem.backup_lsn.0, + backup_lsn: self.sk.state.inmem.backup_lsn.0, local_start_lsn: self.sk.state.local_start_lsn.0, availability_zone: conf.availability_zone.clone(), } @@ -366,7 +364,6 @@ impl Timeline { let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered(); let shared_state = SharedState::restore(conf, &ttid)?; - let rcl = shared_state.sk.state.remote_consistent_lsn; let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(shared_state.sk.state.commit_lsn); let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from(( @@ -383,7 +380,7 @@ impl Timeline { term_flush_lsn_watch_tx, term_flush_lsn_watch_rx, mutex: Mutex::new(shared_state), - walsenders: WalSenders::new(rcl), + walsenders: WalSenders::new(), walreceivers: WalReceivers::new(), cancellation_rx, cancellation_tx, @@ -404,7 +401,8 @@ impl Timeline { let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID))); let (cancellation_tx, cancellation_rx) = watch::channel(false); - let state = SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn); + let state = + TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn); Ok(Timeline { ttid, @@ -414,7 +412,7 @@ impl Timeline { term_flush_lsn_watch_tx, term_flush_lsn_watch_rx, mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?), - walsenders: WalSenders::new(Lsn(0)), + walsenders: WalSenders::new(), walreceivers: WalReceivers::new(), cancellation_rx, cancellation_tx, @@ -448,7 +446,7 @@ impl Timeline { fs::create_dir_all(&self.timeline_dir).await?; // Write timeline to disk and start background tasks. - if let Err(e) = shared_state.sk.persist_inmem(Lsn::INVALID).await { + if let Err(e) = shared_state.sk.state.flush().await { // Bootstrap failed, cancel timeline and remove timeline directory. 
self.cancel(shared_state); @@ -523,11 +521,7 @@ impl Timeline { async fn update_status(&self, shared_state: &mut SharedState) -> bool { shared_state - .update_status( - self.walreceivers.get_num(), - self.get_walsenders().get_remote_consistent_lsn(), - self.ttid, - ) + .update_status(self.walreceivers.get_num(), self.ttid) .await } @@ -558,8 +552,8 @@ impl Timeline { } let shared_state = self.write_shared_state().await; if self.walreceivers.get_num() == 0 { - return shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet - reported_remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn; + return shared_state.sk.state.inmem.commit_lsn == Lsn(0) || // no data at all yet + reported_remote_consistent_lsn >= shared_state.sk.state.inmem.commit_lsn; } false } @@ -623,7 +617,7 @@ impl Timeline { resp.pageserver_feedback = ps_feedback; } - commit_lsn = shared_state.sk.inmem.commit_lsn; + commit_lsn = shared_state.sk.state.inmem.commit_lsn; term_flush_lsn = TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn())); } @@ -647,14 +641,14 @@ impl Timeline { } /// Returns state of the timeline. - pub async fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) { + pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) { let state = self.write_shared_state().await; - (state.sk.inmem.clone(), state.sk.state.clone()) + (state.sk.state.inmem.clone(), state.sk.state.clone()) } /// Returns latest backup_lsn. pub async fn get_wal_backup_lsn(&self) -> Lsn { - self.write_shared_state().await.sk.inmem.backup_lsn + self.write_shared_state().await.sk.state.inmem.backup_lsn } /// Sets backup_lsn to the given value. @@ -664,7 +658,7 @@ impl Timeline { } let mut state = self.write_shared_state().await; - state.sk.inmem.backup_lsn = max(state.sk.inmem.backup_lsn, backup_lsn); + state.sk.state.inmem.backup_lsn = max(state.sk.state.inmem.backup_lsn, backup_lsn); // we should check whether to shut down offloader, but this will be done // soon by peer communication anyway. Ok(()) @@ -673,21 +667,11 @@ impl Timeline { /// Get safekeeper info for broadcasting to broker and other peers. pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo { let shared_state = self.write_shared_state().await; - shared_state.get_safekeeper_info( - &self.ttid, - conf, - self.walsenders.get_remote_consistent_lsn(), - ) + shared_state.get_safekeeper_info(&self.ttid, conf) } /// Update timeline state with peer safekeeper data. - pub async fn record_safekeeper_info(&self, mut sk_info: SafekeeperTimelineInfo) -> Result<()> { - // Update local remote_consistent_lsn in memory (in .walsenders) and in - // sk_info to pass it down to control file. - sk_info.remote_consistent_lsn = self - .walsenders - .update_remote_consistent_lsn(Lsn(sk_info.remote_consistent_lsn)) - .0; + pub async fn record_safekeeper_info(&self, sk_info: SafekeeperTimelineInfo) -> Result<()> { let is_wal_backup_action_pending: bool; let commit_lsn: Lsn; { @@ -696,7 +680,7 @@ impl Timeline { let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now()); shared_state.peers_info.upsert(&peer_info); is_wal_backup_action_pending = self.update_status(&mut shared_state).await; - commit_lsn = shared_state.sk.inmem.commit_lsn; + commit_lsn = shared_state.sk.state.inmem.commit_lsn; } self.commit_lsn_watch_tx.send(commit_lsn)?; // Wake up wal backup launcher, if it is time to stop the offloading. @@ -706,6 +690,13 @@ impl Timeline { Ok(()) } + /// Update in memory remote consistent lsn. 
+ pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) { + let mut shared_state = self.write_shared_state().await; + shared_state.sk.state.inmem.remote_consistent_lsn = + max(shared_state.sk.state.inmem.remote_consistent_lsn, candidate); + } + pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> { let shared_state = self.write_shared_state().await; shared_state.get_peers(conf.heartbeat_timeout) @@ -836,11 +827,10 @@ impl Timeline { /// to date so that storage nodes restart doesn't cause many pageserver -> /// safekeeper reconnections. pub async fn maybe_persist_control_file(&self) -> Result<()> { - let remote_consistent_lsn = self.walsenders.get_remote_consistent_lsn(); self.write_shared_state() .await .sk - .maybe_persist_inmem_control_file(remote_consistent_lsn) + .maybe_persist_inmem_control_file() .await } @@ -862,10 +852,9 @@ impl Timeline { num_computes: self.walreceivers.get_num() as u32, last_removed_segno: state.last_removed_segno, epoch_start_lsn: state.sk.epoch_start_lsn, - mem_state: state.sk.inmem.clone(), + mem_state: state.sk.state.inmem.clone(), persisted_state: state.sk.state.clone(), flush_lsn: state.sk.wal_store.flush_lsn(), - remote_consistent_lsn: self.get_walsenders().get_remote_consistent_lsn(), wal_storage: state.sk.wal_store.get_metrics(), }) } else { @@ -889,7 +878,7 @@ impl Timeline { num_computes: self.walreceivers.get_num() as u32, last_removed_segno: state.last_removed_segno, epoch_start_lsn: state.sk.epoch_start_lsn, - mem_state: state.sk.inmem.clone(), + mem_state: state.sk.state.inmem.clone(), write_lsn, write_record_lsn, flush_lsn, diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 8d138c701f..ed6190042a 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -23,7 +23,7 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tracing::*; use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS}; -use crate::safekeeper::SafeKeeperState; +use crate::state::TimelinePersistentState; use crate::wal_backup::read_object; use crate::SafeKeeperConf; use postgres_ffi::waldecoder::WalStreamDecoder; @@ -125,7 +125,7 @@ impl PhysicalStorage { ttid: &TenantTimelineId, timeline_dir: Utf8PathBuf, conf: &SafeKeeperConf, - state: &SafeKeeperState, + state: &TimelinePersistentState, ) -> Result<PhysicalStorage> { let wal_seg_size = state.server.wal_seg_size as usize; @@ -525,7 +525,7 @@ impl WalReader { pub fn new( workdir: Utf8PathBuf, timeline_dir: Utf8PathBuf, - state: &SafeKeeperState, + state: &TimelinePersistentState, start_pos: Lsn, enable_remote_read: bool, ) -> Result<Self> {
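Note: control_file.rs above frames the serialized state as magic, format version, payload, and a trailing u32 checksum (CHECKSUM_SIZE = size_of::<u32>()), all little-endian; deser_sk_state checks the magic, then either deserializes directly (version == SK_FORMAT_VERSION) or falls back to upgrade_control_file. The std-only sketch below illustrates that framing only; the checksum function is a stand-in (the real algorithm lives in parts of control_file.rs not shown in this diff), and the assumption that the checksum covers the magic/version prefix is not confirmed by these hunks.

const SK_MAGIC: u32 = 0xcafeceef;
const SK_FORMAT_VERSION: u32 = 7;
const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();

// Placeholder checksum, only to demonstrate the framing; not the real algorithm.
fn checksum(data: &[u8]) -> u32 {
    data.iter()
        .fold(0u32, |acc, b| acc.wrapping_mul(31).wrapping_add(*b as u32))
}

// [ magic: u32 LE ][ version: u32 LE ][ payload ][ checksum: u32 LE ]
fn frame(payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(8 + payload.len() + CHECKSUM_SIZE);
    buf.extend_from_slice(&SK_MAGIC.to_le_bytes());
    buf.extend_from_slice(&SK_FORMAT_VERSION.to_le_bytes());
    buf.extend_from_slice(payload);
    let sum = checksum(&buf); // assumed to cover everything before it
    buf.extend_from_slice(&sum.to_le_bytes());
    buf
}

fn unframe(buf: &[u8]) -> Result<(u32, &[u8]), String> {
    if buf.len() < 8 + CHECKSUM_SIZE {
        return Err("file too short".into());
    }
    let (body, sum_bytes) = buf.split_at(buf.len() - CHECKSUM_SIZE);
    let expected = u32::from_le_bytes(sum_bytes.try_into().unwrap());
    if checksum(body) != expected {
        return Err("checksum mismatch".into());
    }
    let magic = u32::from_le_bytes(body[0..4].try_into().unwrap());
    if magic != SK_MAGIC {
        return Err(format!("bad magic {magic:#x}"));
    }
    let version = u32::from_le_bytes(body[4..8].try_into().unwrap());
    // deser_sk_state deserializes the payload when version matches,
    // otherwise hands it to upgrade_control_file(payload, version).
    Ok((version, &body[8..]))
}

fn main() {
    let framed = frame(b"state bytes");
    let (version, payload) = unframe(&framed).unwrap();
    assert_eq!(version, SK_FORMAT_VERSION);
    assert_eq!(payload, b"state bytes");
}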
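Note: the "safekeeper.control.partial" comment in persist refers to the usual crash-safe update pattern: write a sidecar file, fsync it, rename it over the real control file, then fsync the directory. The file names below match the diff; the exact fsync sequence is an assumption about the persist body, which these hunks do not show in full. persist_atomic is an illustrative name, not the crate's API.

use std::fs::{self, File, OpenOptions};
use std::io::Write;
use std::path::Path;

fn persist_atomic(dir: &Path, bytes: &[u8]) -> std::io::Result<()> {
    let partial = dir.join("safekeeper.control.partial");
    let final_path = dir.join("safekeeper.control");

    let mut f = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(&partial)?;
    f.write_all(bytes)?;
    f.sync_all()?; // data must be durable before the rename publishes it

    // rename is atomic on POSIX filesystems: readers see either the old
    // or the new control file, never a torn one.
    fs::rename(&partial, &final_path)?;
    File::open(dir)?.sync_all()?; // fsync the directory so the rename survives a crash (Unix)
    Ok(())
}

fn main() -> std::io::Result<()> {
    persist_atomic(&std::env::temp_dir(), b"framed control file bytes")
}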
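Note: the core of this change is the TimelineState wrapper in state.rs: cheap fields (commit_lsn, backup_lsn, peer_horizon_lsn, remote_consistent_lsn, proposer_uuid) advance in the in-memory overlay without an fsync, and a single start_change/finish_change (or flush) round trip persists them all at once. Below is a minimal, self-contained model of that pattern; Store, State and the synchronous signatures are simplified stand-ins for control_file::Storage and TimelinePersistentState, not the crate's real types.

#[derive(Clone, Debug, PartialEq)]
struct State {
    commit_lsn: u64,
    backup_lsn: u64,
}

trait Store {
    // counterpart of control_file::Storage::persist (sync here for brevity)
    fn persist(&mut self, s: &State) -> std::io::Result<()>;
    fn last(&self) -> &State;
}

struct MockStore {
    persisted: State,
}

impl Store for MockStore {
    fn persist(&mut self, s: &State) -> std::io::Result<()> {
        self.persisted = s.clone(); // a real impl writes and fsyncs first
        Ok(())
    }
    fn last(&self) -> &State {
        &self.persisted
    }
}

struct TimelineState<S: Store> {
    inmem: State, // dirty values, not yet flushed
    pers: S,      // last durably persisted state
}

impl<S: Store> TimelineState<S> {
    // 1) take a copy with the in-memory values applied ...
    fn start_change(&self) -> State {
        self.inmem.clone()
    }
    // 2) ... mutate it as desired, then persist atomically and sync the overlay
    fn finish_change(&mut self, s: &State) -> std::io::Result<()> {
        self.pers.persist(s)?;
        self.inmem = s.clone();
        Ok(())
    }
    fn flush(&mut self) -> std::io::Result<()> {
        let s = self.start_change();
        self.finish_change(&s)
    }
}

fn main() -> std::io::Result<()> {
    let init = State { commit_lsn: 0, backup_lsn: 0 };
    let mut ts = TimelineState {
        inmem: init.clone(),
        pers: MockStore { persisted: init },
    };
    ts.inmem.commit_lsn = 42; // cheap in-memory advance, no fsync
    assert_eq!(ts.pers.last().commit_lsn, 0);
    ts.flush()?; // one persist call captures every dirty field
    assert_eq!(ts.pers.last().commit_lsn, 42);
    Ok(())
}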