mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 04:20:39 +00:00
safekeeper: don't pass conf into storage constructors (#9523)
## Problem The storage components take an entire `SafekeeperConf` during construction, but only actually use the `no_sync` field. This makes it hard to understand the storage inputs (which fields do they actually care about?), and is also inconvenient for tests and benchmarks that need to set up a lot of unnecessary boilerplate. ## Summary of changes * Don't take the entire config, but pass in the `no_sync` field explicitly. * Take the timeline dir instead of `ttid` as an input, since it's the only thing it cares about. * Fix a couple of tests to not leak tempdirs. * Various minor tweaks.
This commit is contained in:
@@ -14,12 +14,10 @@ use std::path::Path;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::control_file_upgrade::downgrade_v9_to_v8;
|
||||
use crate::control_file_upgrade::upgrade_control_file;
|
||||
use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
|
||||
use crate::state::{EvictionState, TimelinePersistentState};
|
||||
use crate::{control_file_upgrade::upgrade_control_file, timeline::get_timeline_dir};
|
||||
use utils::{bin_ser::LeSer, id::TenantTimelineId};
|
||||
|
||||
use crate::SafeKeeperConf;
|
||||
use utils::bin_ser::LeSer;
|
||||
|
||||
pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
||||
pub const SK_FORMAT_VERSION: u32 = 9;
|
||||
@@ -54,13 +52,12 @@ pub struct FileStorage {
|
||||
|
||||
impl FileStorage {
|
||||
/// Initialize storage by loading state from disk.
|
||||
pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
|
||||
let timeline_dir = get_timeline_dir(conf, ttid);
|
||||
let state = Self::load_control_file_from_dir(&timeline_dir)?;
|
||||
pub fn restore_new(timeline_dir: &Utf8Path, no_sync: bool) -> Result<FileStorage> {
|
||||
let state = Self::load_control_file_from_dir(timeline_dir)?;
|
||||
|
||||
Ok(FileStorage {
|
||||
timeline_dir,
|
||||
no_sync: conf.no_sync,
|
||||
timeline_dir: timeline_dir.to_path_buf(),
|
||||
no_sync,
|
||||
state,
|
||||
last_persist_at: Instant::now(),
|
||||
})
|
||||
@@ -71,16 +68,16 @@ impl FileStorage {
|
||||
/// Note: we normally call this in temp directory for atomic init, so
|
||||
/// interested in FileStorage as a result only in tests.
|
||||
pub async fn create_new(
|
||||
dir: Utf8PathBuf,
|
||||
conf: &SafeKeeperConf,
|
||||
timeline_dir: &Utf8Path,
|
||||
state: TimelinePersistentState,
|
||||
no_sync: bool,
|
||||
) -> Result<FileStorage> {
|
||||
// we don't support creating new timelines in offloaded state
|
||||
assert!(matches!(state.eviction_state, EvictionState::Present));
|
||||
|
||||
let mut store = FileStorage {
|
||||
timeline_dir: dir,
|
||||
no_sync: conf.no_sync,
|
||||
timeline_dir: timeline_dir.to_path_buf(),
|
||||
no_sync,
|
||||
state: state.clone(),
|
||||
last_persist_at: Instant::now(),
|
||||
};
|
||||
@@ -239,89 +236,46 @@ mod test {
|
||||
use tokio::fs;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
fn stub_conf() -> SafeKeeperConf {
|
||||
let workdir = camino_tempfile::tempdir().unwrap().into_path();
|
||||
SafeKeeperConf {
|
||||
workdir,
|
||||
..SafeKeeperConf::dummy()
|
||||
}
|
||||
}
|
||||
const NO_SYNC: bool = true;
|
||||
|
||||
async fn load_from_control_file(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: &TenantTimelineId,
|
||||
) -> Result<(FileStorage, TimelinePersistentState)> {
|
||||
let timeline_dir = get_timeline_dir(conf, ttid);
|
||||
fs::create_dir_all(&timeline_dir)
|
||||
.await
|
||||
.expect("failed to create timeline dir");
|
||||
Ok((
|
||||
FileStorage::restore_new(ttid, conf)?,
|
||||
FileStorage::load_control_file_from_dir(&timeline_dir)?,
|
||||
))
|
||||
}
|
||||
#[tokio::test]
|
||||
async fn test_read_write_safekeeper_state() -> anyhow::Result<()> {
|
||||
let tempdir = camino_tempfile::tempdir()?;
|
||||
let mut state = TimelinePersistentState::empty();
|
||||
let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;
|
||||
|
||||
async fn create(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: &TenantTimelineId,
|
||||
) -> Result<(FileStorage, TimelinePersistentState)> {
|
||||
let timeline_dir = get_timeline_dir(conf, ttid);
|
||||
fs::create_dir_all(&timeline_dir)
|
||||
.await
|
||||
.expect("failed to create timeline dir");
|
||||
let state = TimelinePersistentState::empty();
|
||||
let storage = FileStorage::create_new(timeline_dir, conf, state.clone()).await?;
|
||||
Ok((storage, state))
|
||||
// Make a change.
|
||||
state.commit_lsn = Lsn(42);
|
||||
storage.persist(&state).await?;
|
||||
|
||||
// Reload the state. It should match the previously persisted state.
|
||||
let loaded_state = FileStorage::load_control_file_from_dir(tempdir.path())?;
|
||||
assert_eq!(loaded_state, state);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_write_safekeeper_state() {
|
||||
let conf = stub_conf();
|
||||
let ttid = TenantTimelineId::generate();
|
||||
{
|
||||
let (mut storage, mut state) =
|
||||
create(&conf, &ttid).await.expect("failed to create state");
|
||||
// change something
|
||||
state.commit_lsn = Lsn(42);
|
||||
storage
|
||||
.persist(&state)
|
||||
.await
|
||||
.expect("failed to persist state");
|
||||
}
|
||||
|
||||
let (_, state) = load_from_control_file(&conf, &ttid)
|
||||
.await
|
||||
.expect("failed to read state");
|
||||
assert_eq!(state.commit_lsn, Lsn(42));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_safekeeper_state_checksum_mismatch() {
|
||||
let conf = stub_conf();
|
||||
let ttid = TenantTimelineId::generate();
|
||||
{
|
||||
let (mut storage, mut state) =
|
||||
create(&conf, &ttid).await.expect("failed to read state");
|
||||
|
||||
// change something
|
||||
state.commit_lsn = Lsn(42);
|
||||
storage
|
||||
.persist(&state)
|
||||
.await
|
||||
.expect("failed to persist state");
|
||||
}
|
||||
let control_path = get_timeline_dir(&conf, &ttid).join(CONTROL_FILE_NAME);
|
||||
let mut data = fs::read(&control_path).await.unwrap();
|
||||
data[0] += 1; // change the first byte of the file to fail checksum validation
|
||||
fs::write(&control_path, &data)
|
||||
.await
|
||||
.expect("failed to write control file");
|
||||
|
||||
match load_from_control_file(&conf, &ttid).await {
|
||||
Err(err) => assert!(err
|
||||
.to_string()
|
||||
.contains("safekeeper control file checksum mismatch")),
|
||||
Ok(_) => panic!("expected error"),
|
||||
async fn test_safekeeper_state_checksum_mismatch() -> anyhow::Result<()> {
|
||||
let tempdir = camino_tempfile::tempdir()?;
|
||||
let mut state = TimelinePersistentState::empty();
|
||||
let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;
|
||||
|
||||
// Make a change.
|
||||
state.commit_lsn = Lsn(42);
|
||||
storage.persist(&state).await?;
|
||||
|
||||
// Change the first byte to fail checksum validation.
|
||||
let ctrl_path = tempdir.path().join(CONTROL_FILE_NAME);
|
||||
let mut data = fs::read(&ctrl_path).await?;
|
||||
data[0] += 1;
|
||||
fs::write(&ctrl_path, &data).await?;
|
||||
|
||||
// Loading the file should fail checksum validation.
|
||||
if let Err(err) = FileStorage::load_control_file_from_dir(tempdir.path()) {
|
||||
assert!(err.to_string().contains("control file checksum mismatch"))
|
||||
} else {
|
||||
panic!("expected checksum error")
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -154,7 +154,7 @@ pub async fn handle_request(request: Request) -> Result<()> {
|
||||
new_state.peer_horizon_lsn = request.until_lsn;
|
||||
new_state.backup_lsn = new_backup_lsn;
|
||||
|
||||
FileStorage::create_new(tli_dir_path.clone(), conf, new_state.clone()).await?;
|
||||
FileStorage::create_new(&tli_dir_path, new_state.clone(), conf.no_sync).await?;
|
||||
|
||||
// now we have a ready timeline in a temp directory
|
||||
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
|
||||
|
||||
@@ -113,6 +113,7 @@ impl SafeKeeperConf {
|
||||
|
||||
impl SafeKeeperConf {
|
||||
#[cfg(test)]
|
||||
#[allow(unused)]
|
||||
fn dummy() -> Self {
|
||||
SafeKeeperConf {
|
||||
workdir: Utf8PathBuf::from("./"),
|
||||
|
||||
@@ -328,15 +328,19 @@ impl SharedState {
|
||||
/// Restore SharedState from control file. If file doesn't exist, bails out.
|
||||
fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
|
||||
let timeline_dir = get_timeline_dir(conf, ttid);
|
||||
let control_store = control_file::FileStorage::restore_new(ttid, conf)?;
|
||||
let control_store = control_file::FileStorage::restore_new(&timeline_dir, conf.no_sync)?;
|
||||
if control_store.server.wal_seg_size == 0 {
|
||||
bail!(TimelineError::UninitializedWalSegSize(*ttid));
|
||||
}
|
||||
|
||||
let sk = match control_store.eviction_state {
|
||||
EvictionState::Present => {
|
||||
let wal_store =
|
||||
wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?;
|
||||
let wal_store = wal_storage::PhysicalStorage::new(
|
||||
ttid,
|
||||
&timeline_dir,
|
||||
&control_store,
|
||||
conf.no_sync,
|
||||
)?;
|
||||
StateSK::Loaded(SafeKeeper::new(
|
||||
TimelineState::new(control_store),
|
||||
wal_store,
|
||||
@@ -1046,9 +1050,9 @@ impl ManagerTimeline {
|
||||
// trying to restore WAL storage
|
||||
let wal_store = wal_storage::PhysicalStorage::new(
|
||||
&self.ttid,
|
||||
self.timeline_dir.clone(),
|
||||
&conf,
|
||||
&self.timeline_dir,
|
||||
shared.sk.state(),
|
||||
conf.no_sync,
|
||||
)?;
|
||||
|
||||
// updating control file
|
||||
|
||||
@@ -244,7 +244,7 @@ impl GlobalTimelines {
|
||||
// immediately initialize first WAL segment as well.
|
||||
let state =
|
||||
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?;
|
||||
control_file::FileStorage::create_new(tmp_dir_path.clone(), &conf, state).await?;
|
||||
control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
|
||||
let timeline = GlobalTimelines::load_temp_timeline(ttid, &tmp_dir_path, true).await?;
|
||||
Ok(timeline)
|
||||
}
|
||||
@@ -596,7 +596,7 @@ pub async fn validate_temp_timeline(
|
||||
bail!("wal_seg_size is not set");
|
||||
}
|
||||
|
||||
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;
|
||||
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;
|
||||
|
||||
let commit_lsn = control_store.commit_lsn;
|
||||
let flush_lsn = wal_store.flush_lsn();
|
||||
|
||||
@@ -29,7 +29,6 @@ use crate::metrics::{
|
||||
};
|
||||
use crate::state::TimelinePersistentState;
|
||||
use crate::wal_backup::{read_object, remote_timeline_path};
|
||||
use crate::SafeKeeperConf;
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use postgres_ffi::XLogFileName;
|
||||
use postgres_ffi::XLOG_BLCKSZ;
|
||||
@@ -87,7 +86,9 @@ pub trait Storage {
|
||||
pub struct PhysicalStorage {
|
||||
metrics: WalStorageMetrics,
|
||||
timeline_dir: Utf8PathBuf,
|
||||
conf: SafeKeeperConf,
|
||||
|
||||
/// Disables fsync if true.
|
||||
no_sync: bool,
|
||||
|
||||
/// Size of WAL segment in bytes.
|
||||
wal_seg_size: usize,
|
||||
@@ -151,9 +152,9 @@ impl PhysicalStorage {
|
||||
/// the disk. Otherwise, all LSNs are set to zero.
|
||||
pub fn new(
|
||||
ttid: &TenantTimelineId,
|
||||
timeline_dir: Utf8PathBuf,
|
||||
conf: &SafeKeeperConf,
|
||||
timeline_dir: &Utf8Path,
|
||||
state: &TimelinePersistentState,
|
||||
no_sync: bool,
|
||||
) -> Result<PhysicalStorage> {
|
||||
let wal_seg_size = state.server.wal_seg_size as usize;
|
||||
|
||||
@@ -198,8 +199,8 @@ impl PhysicalStorage {
|
||||
|
||||
Ok(PhysicalStorage {
|
||||
metrics: WalStorageMetrics::default(),
|
||||
timeline_dir,
|
||||
conf: conf.clone(),
|
||||
timeline_dir: timeline_dir.to_path_buf(),
|
||||
no_sync,
|
||||
wal_seg_size,
|
||||
pg_version: state.server.pg_version,
|
||||
system_id: state.server.system_id,
|
||||
@@ -224,7 +225,7 @@ impl PhysicalStorage {
|
||||
|
||||
/// Call fdatasync if config requires so.
|
||||
async fn fdatasync_file(&mut self, file: &File) -> Result<()> {
|
||||
if !self.conf.no_sync {
|
||||
if !self.no_sync {
|
||||
self.metrics
|
||||
.observe_flush_seconds(time_io_closure(file.sync_data()).await?);
|
||||
}
|
||||
@@ -263,9 +264,7 @@ impl PhysicalStorage {
|
||||
|
||||
// Note: this doesn't get into observe_flush_seconds metric. But
|
||||
// segment init should be separate metric, if any.
|
||||
if let Err(e) =
|
||||
durable_rename(&tmp_path, &wal_file_partial_path, !self.conf.no_sync).await
|
||||
{
|
||||
if let Err(e) = durable_rename(&tmp_path, &wal_file_partial_path, !self.no_sync).await {
|
||||
// Probably rename succeeded, but fsync of it failed. Remove
|
||||
// the file then to avoid using it.
|
||||
remove_file(wal_file_partial_path)
|
||||
|
||||
Reference in New Issue
Block a user