diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index ba5e453e41..b1b0c032d7 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -7,6 +7,7 @@ use std::fs::{self, File, OpenOptions}; use std::io::{Read, Write}; use std::ops::Deref; use std::path::{Path, PathBuf}; +use std::time::Instant; use crate::control_file_upgrade::upgrade_control_file; use crate::metrics::PERSIST_CONTROL_FILE_SECONDS; @@ -28,6 +29,9 @@ pub const CHECKSUM_SIZE: usize = std::mem::size_of::(); pub trait Storage: Deref { /// Persist safekeeper state on disk and update internal state. fn persist(&mut self, s: &SafeKeeperState) -> Result<()>; + + /// Timestamp of last persist. + fn last_persist_at(&self) -> Instant; } #[derive(Debug)] @@ -38,6 +42,8 @@ pub struct FileStorage { /// Last state persisted to disk. state: SafeKeeperState, + /// Not preserved across restarts. + last_persist_at: Instant, } impl FileStorage { @@ -51,6 +57,7 @@ impl FileStorage { timeline_dir, conf: conf.clone(), state, + last_persist_at: Instant::now(), }) } @@ -66,6 +73,7 @@ impl FileStorage { timeline_dir, conf: conf.clone(), state, + last_persist_at: Instant::now(), }; Ok(store) @@ -216,6 +224,10 @@ impl Storage for FileStorage { self.state = s.clone(); Ok(()) } + + fn last_persist_at(&self) -> Instant { + self.last_persist_at + } } #[cfg(test)] diff --git a/safekeeper/src/remove_wal.rs b/safekeeper/src/remove_wal.rs index b6d497f34e..ad9d655fae 100644 --- a/safekeeper/src/remove_wal.rs +++ b/safekeeper/src/remove_wal.rs @@ -17,6 +17,9 @@ pub fn thread_main(conf: SafeKeeperConf) { let ttid = tli.ttid; let _enter = info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id).entered(); + if let Err(e) = tli.maybe_pesist_control_file() { + warn!("failed to persist control file: {e}"); + } if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) { warn!("failed to remove WAL: {}", e); } diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index eb434136d4..7378ccb994 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -10,6 +10,7 @@ use std::cmp::max; use std::cmp::min; use std::fmt; use std::io::Read; +use std::time::Duration; use storage_broker::proto::SafekeeperTimelineInfo; use tracing::*; @@ -837,6 +838,26 @@ where self.state.persist(&state) } + /// Persist control file if there is something to save and enough time + /// passed after the last save. + pub fn maybe_persist_control_file(&mut self, inmem_remote_consistent_lsn: Lsn) -> Result<()> { + const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300); + if self.state.last_persist_at().elapsed() < CF_SAVE_INTERVAL { + return Ok(()); + } + let need_persist = self.inmem.commit_lsn > self.state.commit_lsn + || self.inmem.backup_lsn > self.state.backup_lsn + || self.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn + || inmem_remote_consistent_lsn > self.state.remote_consistent_lsn; + if need_persist { + let mut state = self.state.clone(); + state.remote_consistent_lsn = inmem_remote_consistent_lsn; + self.persist_control_file(state)?; + trace!("saved control file: {CF_SAVE_INTERVAL:?} passed"); + } + Ok(()) + } + /// Handle request to append WAL. #[allow(clippy::comparison_chain)] fn handle_append_request( @@ -949,9 +970,8 @@ where if sync_control_file { let mut state = self.state.clone(); - // Note: we do not persist remote_consistent_lsn in other paths of - // persisting cf -- that is not much needed currently. We could do - // that by storing Arc to walsenders in Safekeeper. + // Note: we could make remote_consistent_lsn update in cf common by + // storing Arc to walsenders in Safekeeper. state.remote_consistent_lsn = new_remote_consistent_lsn; self.persist_control_file(state)?; } @@ -981,7 +1001,7 @@ mod tests { use super::*; use crate::wal_storage::Storage; - use std::ops::Deref; + use std::{ops::Deref, time::Instant}; // fake storage for tests struct InMemoryState { @@ -993,6 +1013,10 @@ mod tests { self.persisted_state = s.clone(); Ok(()) } + + fn last_persist_at(&self) -> Instant { + Instant::now() + } } impl Deref for InMemoryState { diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 2dbf215998..941f8dae54 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -234,7 +234,6 @@ impl SharedState { flush_lsn: self.sk.wal_store.flush_lsn().0, // note: this value is not flushed to control file yet and can be lost commit_lsn: self.sk.inmem.commit_lsn.0, - // TODO: rework feedbacks to avoid max here remote_consistent_lsn: remote_consistent_lsn.0, peer_horizon_lsn: self.sk.inmem.peer_horizon_lsn.0, safekeeper_connstr: conf.listen_pg_addr.clone(), @@ -673,6 +672,17 @@ impl Timeline { Ok(()) } + /// Persist control file if there is something to save and enough time + /// passed after the last save. This helps to keep remote_consistent_lsn up + /// to date so that storage nodes restart doesn't cause many pageserver -> + /// safekeeper reconnections. + pub fn maybe_pesist_control_file(&self) -> Result<()> { + let remote_consistent_lsn = self.walsenders.get_remote_consistent_lsn(); + self.write_shared_state() + .sk + .maybe_persist_control_file(remote_consistent_lsn) + } + /// Returns full timeline info, required for the metrics. If the timeline is /// not active, returns None instead. pub fn info_for_metrics(&self) -> Option {