mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
Persist safekeeper control file once in a while.
This should keep remote_consistent_lsn up to date on projects that are not actively writing, which removes the spike of pageserver -> safekeeper reconnections when storage nodes restart.
This commit is contained in:
@@ -7,6 +7,7 @@ use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{Read, Write};
|
||||
use std::ops::Deref;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::control_file_upgrade::upgrade_control_file;
|
||||
use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
|
||||
@@ -28,6 +29,9 @@ pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
|
||||
pub trait Storage: Deref<Target = SafeKeeperState> {
|
||||
/// Persist safekeeper state on disk and update internal state.
|
||||
fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
|
||||
|
||||
/// Timestamp of last persist.
|
||||
fn last_persist_at(&self) -> Instant;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -38,6 +42,8 @@ pub struct FileStorage {
|
||||
|
||||
/// Last state persisted to disk.
|
||||
state: SafeKeeperState,
|
||||
/// Not preserved across restarts.
|
||||
last_persist_at: Instant,
|
||||
}
|
||||
|
||||
impl FileStorage {
|
||||
@@ -51,6 +57,7 @@ impl FileStorage {
|
||||
timeline_dir,
|
||||
conf: conf.clone(),
|
||||
state,
|
||||
last_persist_at: Instant::now(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -66,6 +73,7 @@ impl FileStorage {
|
||||
timeline_dir,
|
||||
conf: conf.clone(),
|
||||
state,
|
||||
last_persist_at: Instant::now(),
|
||||
};
|
||||
|
||||
Ok(store)
|
||||
@@ -216,6 +224,10 @@ impl Storage for FileStorage {
|
||||
self.state = s.clone();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn last_persist_at(&self) -> Instant {
|
||||
self.last_persist_at
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -17,6 +17,9 @@ pub fn thread_main(conf: SafeKeeperConf) {
|
||||
let ttid = tli.ttid;
|
||||
let _enter =
|
||||
info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id).entered();
|
||||
if let Err(e) = tli.maybe_pesist_control_file() {
|
||||
warn!("failed to persist control file: {e}");
|
||||
}
|
||||
if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) {
|
||||
warn!("failed to remove WAL: {}", e);
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ use std::cmp::max;
|
||||
use std::cmp::min;
|
||||
use std::fmt;
|
||||
use std::io::Read;
|
||||
use std::time::Duration;
|
||||
use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
|
||||
use tracing::*;
|
||||
@@ -837,6 +838,26 @@ where
|
||||
self.state.persist(&state)
|
||||
}
|
||||
|
||||
/// Persist control file if there is something to save and enough time
|
||||
/// passed after the last save.
|
||||
pub fn maybe_persist_control_file(&mut self, inmem_remote_consistent_lsn: Lsn) -> Result<()> {
|
||||
const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
|
||||
if self.state.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
|
||||
return Ok(());
|
||||
}
|
||||
let need_persist = self.inmem.commit_lsn > self.state.commit_lsn
|
||||
|| self.inmem.backup_lsn > self.state.backup_lsn
|
||||
|| self.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn
|
||||
|| inmem_remote_consistent_lsn > self.state.remote_consistent_lsn;
|
||||
if need_persist {
|
||||
let mut state = self.state.clone();
|
||||
state.remote_consistent_lsn = inmem_remote_consistent_lsn;
|
||||
self.persist_control_file(state)?;
|
||||
trace!("saved control file: {CF_SAVE_INTERVAL:?} passed");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle request to append WAL.
|
||||
#[allow(clippy::comparison_chain)]
|
||||
fn handle_append_request(
|
||||
@@ -949,9 +970,8 @@ where
|
||||
|
||||
if sync_control_file {
|
||||
let mut state = self.state.clone();
|
||||
// Note: we do not persist remote_consistent_lsn in other paths of
|
||||
// persisting cf -- that is not much needed currently. We could do
|
||||
// that by storing Arc to walsenders in Safekeeper.
|
||||
// Note: we could make remote_consistent_lsn update in cf common by
|
||||
// storing Arc to walsenders in Safekeeper.
|
||||
state.remote_consistent_lsn = new_remote_consistent_lsn;
|
||||
self.persist_control_file(state)?;
|
||||
}
|
||||
@@ -981,7 +1001,7 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::wal_storage::Storage;
|
||||
use std::ops::Deref;
|
||||
use std::{ops::Deref, time::Instant};
|
||||
|
||||
// fake storage for tests
|
||||
struct InMemoryState {
|
||||
@@ -993,6 +1013,10 @@ mod tests {
|
||||
self.persisted_state = s.clone();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn last_persist_at(&self) -> Instant {
|
||||
Instant::now()
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for InMemoryState {
|
||||
|
||||
@@ -234,7 +234,6 @@ impl SharedState {
|
||||
flush_lsn: self.sk.wal_store.flush_lsn().0,
|
||||
// note: this value is not flushed to control file yet and can be lost
|
||||
commit_lsn: self.sk.inmem.commit_lsn.0,
|
||||
// TODO: rework feedbacks to avoid max here
|
||||
remote_consistent_lsn: remote_consistent_lsn.0,
|
||||
peer_horizon_lsn: self.sk.inmem.peer_horizon_lsn.0,
|
||||
safekeeper_connstr: conf.listen_pg_addr.clone(),
|
||||
@@ -673,6 +672,17 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Persist control file if there is something to save and enough time
|
||||
/// passed after the last save. This helps to keep remote_consistent_lsn up
|
||||
/// to date so that storage nodes restart doesn't cause many pageserver ->
|
||||
/// safekeeper reconnections.
|
||||
pub fn maybe_pesist_control_file(&self) -> Result<()> {
|
||||
let remote_consistent_lsn = self.walsenders.get_remote_consistent_lsn();
|
||||
self.write_shared_state()
|
||||
.sk
|
||||
.maybe_persist_control_file(remote_consistent_lsn)
|
||||
}
|
||||
|
||||
/// Returns full timeline info, required for the metrics. If the timeline is
|
||||
/// not active, returns None instead.
|
||||
pub fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
|
||||
|
||||
Reference in New Issue
Block a user