diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index cd82e43780..06e5afbf74 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -14,12 +14,10 @@ use std::path::Path; use std::time::Instant; use crate::control_file_upgrade::downgrade_v9_to_v8; +use crate::control_file_upgrade::upgrade_control_file; use crate::metrics::PERSIST_CONTROL_FILE_SECONDS; use crate::state::{EvictionState, TimelinePersistentState}; -use crate::{control_file_upgrade::upgrade_control_file, timeline::get_timeline_dir}; -use utils::{bin_ser::LeSer, id::TenantTimelineId}; - -use crate::SafeKeeperConf; +use utils::bin_ser::LeSer; pub const SK_MAGIC: u32 = 0xcafeceefu32; pub const SK_FORMAT_VERSION: u32 = 9; @@ -54,13 +52,12 @@ pub struct FileStorage { impl FileStorage { /// Initialize storage by loading state from disk. - pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result { - let timeline_dir = get_timeline_dir(conf, ttid); - let state = Self::load_control_file_from_dir(&timeline_dir)?; + pub fn restore_new(timeline_dir: &Utf8Path, no_sync: bool) -> Result { + let state = Self::load_control_file_from_dir(timeline_dir)?; Ok(FileStorage { - timeline_dir, - no_sync: conf.no_sync, + timeline_dir: timeline_dir.to_path_buf(), + no_sync, state, last_persist_at: Instant::now(), }) @@ -71,16 +68,16 @@ impl FileStorage { /// Note: we normally call this in temp directory for atomic init, so /// interested in FileStorage as a result only in tests. pub async fn create_new( - dir: Utf8PathBuf, - conf: &SafeKeeperConf, + timeline_dir: &Utf8Path, state: TimelinePersistentState, + no_sync: bool, ) -> Result { // we don't support creating new timelines in offloaded state assert!(matches!(state.eviction_state, EvictionState::Present)); let mut store = FileStorage { - timeline_dir: dir, - no_sync: conf.no_sync, + timeline_dir: timeline_dir.to_path_buf(), + no_sync, state: state.clone(), last_persist_at: Instant::now(), }; @@ -239,89 +236,46 @@ mod test { use tokio::fs; use utils::lsn::Lsn; - fn stub_conf() -> SafeKeeperConf { - let workdir = camino_tempfile::tempdir().unwrap().into_path(); - SafeKeeperConf { - workdir, - ..SafeKeeperConf::dummy() - } - } + const NO_SYNC: bool = true; - async fn load_from_control_file( - conf: &SafeKeeperConf, - ttid: &TenantTimelineId, - ) -> Result<(FileStorage, TimelinePersistentState)> { - let timeline_dir = get_timeline_dir(conf, ttid); - fs::create_dir_all(&timeline_dir) - .await - .expect("failed to create timeline dir"); - Ok(( - FileStorage::restore_new(ttid, conf)?, - FileStorage::load_control_file_from_dir(&timeline_dir)?, - )) - } + #[tokio::test] + async fn test_read_write_safekeeper_state() -> anyhow::Result<()> { + let tempdir = camino_tempfile::tempdir()?; + let mut state = TimelinePersistentState::empty(); + let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?; - async fn create( - conf: &SafeKeeperConf, - ttid: &TenantTimelineId, - ) -> Result<(FileStorage, TimelinePersistentState)> { - let timeline_dir = get_timeline_dir(conf, ttid); - fs::create_dir_all(&timeline_dir) - .await - .expect("failed to create timeline dir"); - let state = TimelinePersistentState::empty(); - let storage = FileStorage::create_new(timeline_dir, conf, state.clone()).await?; - Ok((storage, state)) + // Make a change. + state.commit_lsn = Lsn(42); + storage.persist(&state).await?; + + // Reload the state. It should match the previously persisted state. + let loaded_state = FileStorage::load_control_file_from_dir(tempdir.path())?; + assert_eq!(loaded_state, state); + Ok(()) } #[tokio::test] - async fn test_read_write_safekeeper_state() { - let conf = stub_conf(); - let ttid = TenantTimelineId::generate(); - { - let (mut storage, mut state) = - create(&conf, &ttid).await.expect("failed to create state"); - // change something - state.commit_lsn = Lsn(42); - storage - .persist(&state) - .await - .expect("failed to persist state"); - } - - let (_, state) = load_from_control_file(&conf, &ttid) - .await - .expect("failed to read state"); - assert_eq!(state.commit_lsn, Lsn(42)); - } - - #[tokio::test] - async fn test_safekeeper_state_checksum_mismatch() { - let conf = stub_conf(); - let ttid = TenantTimelineId::generate(); - { - let (mut storage, mut state) = - create(&conf, &ttid).await.expect("failed to read state"); - - // change something - state.commit_lsn = Lsn(42); - storage - .persist(&state) - .await - .expect("failed to persist state"); - } - let control_path = get_timeline_dir(&conf, &ttid).join(CONTROL_FILE_NAME); - let mut data = fs::read(&control_path).await.unwrap(); - data[0] += 1; // change the first byte of the file to fail checksum validation - fs::write(&control_path, &data) - .await - .expect("failed to write control file"); - - match load_from_control_file(&conf, &ttid).await { - Err(err) => assert!(err - .to_string() - .contains("safekeeper control file checksum mismatch")), - Ok(_) => panic!("expected error"), + async fn test_safekeeper_state_checksum_mismatch() -> anyhow::Result<()> { + let tempdir = camino_tempfile::tempdir()?; + let mut state = TimelinePersistentState::empty(); + let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?; + + // Make a change. + state.commit_lsn = Lsn(42); + storage.persist(&state).await?; + + // Change the first byte to fail checksum validation. + let ctrl_path = tempdir.path().join(CONTROL_FILE_NAME); + let mut data = fs::read(&ctrl_path).await?; + data[0] += 1; + fs::write(&ctrl_path, &data).await?; + + // Loading the file should fail checksum validation. + if let Err(err) = FileStorage::load_control_file_from_dir(tempdir.path()) { + assert!(err.to_string().contains("control file checksum mismatch")) + } else { + panic!("expected checksum error") } + Ok(()) } } diff --git a/safekeeper/src/copy_timeline.rs b/safekeeper/src/copy_timeline.rs index 52b13dc5e3..1bf0cc668f 100644 --- a/safekeeper/src/copy_timeline.rs +++ b/safekeeper/src/copy_timeline.rs @@ -154,7 +154,7 @@ pub async fn handle_request(request: Request) -> Result<()> { new_state.peer_horizon_lsn = request.until_lsn; new_state.backup_lsn = new_backup_lsn; - FileStorage::create_new(tli_dir_path.clone(), conf, new_state.clone()).await?; + FileStorage::create_new(&tli_dir_path, new_state.clone(), conf.no_sync).await?; // now we have a ready timeline in a temp directory validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?; diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 277becb96b..b1cddaf062 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -113,6 +113,7 @@ impl SafeKeeperConf { impl SafeKeeperConf { #[cfg(test)] + #[allow(unused)] fn dummy() -> Self { SafeKeeperConf { workdir: Utf8PathBuf::from("./"), diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index dd4d161226..c737dfcf9b 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -328,15 +328,19 @@ impl SharedState { /// Restore SharedState from control file. If file doesn't exist, bails out. fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result { let timeline_dir = get_timeline_dir(conf, ttid); - let control_store = control_file::FileStorage::restore_new(ttid, conf)?; + let control_store = control_file::FileStorage::restore_new(&timeline_dir, conf.no_sync)?; if control_store.server.wal_seg_size == 0 { bail!(TimelineError::UninitializedWalSegSize(*ttid)); } let sk = match control_store.eviction_state { EvictionState::Present => { - let wal_store = - wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?; + let wal_store = wal_storage::PhysicalStorage::new( + ttid, + &timeline_dir, + &control_store, + conf.no_sync, + )?; StateSK::Loaded(SafeKeeper::new( TimelineState::new(control_store), wal_store, @@ -1046,9 +1050,9 @@ impl ManagerTimeline { // trying to restore WAL storage let wal_store = wal_storage::PhysicalStorage::new( &self.ttid, - self.timeline_dir.clone(), - &conf, + &self.timeline_dir, shared.sk.state(), + conf.no_sync, )?; // updating control file diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index 538bb6e5d2..33d94da034 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -244,7 +244,7 @@ impl GlobalTimelines { // immediately initialize first WAL segment as well. let state = TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?; - control_file::FileStorage::create_new(tmp_dir_path.clone(), &conf, state).await?; + control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?; let timeline = GlobalTimelines::load_temp_timeline(ttid, &tmp_dir_path, true).await?; Ok(timeline) } @@ -596,7 +596,7 @@ pub async fn validate_temp_timeline( bail!("wal_seg_size is not set"); } - let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?; + let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?; let commit_lsn = control_store.commit_lsn; let flush_lsn = wal_store.flush_lsn(); diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 61d7825ae6..33b8bfe28e 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -29,7 +29,6 @@ use crate::metrics::{ }; use crate::state::TimelinePersistentState; use crate::wal_backup::{read_object, remote_timeline_path}; -use crate::SafeKeeperConf; use postgres_ffi::waldecoder::WalStreamDecoder; use postgres_ffi::XLogFileName; use postgres_ffi::XLOG_BLCKSZ; @@ -87,7 +86,9 @@ pub trait Storage { pub struct PhysicalStorage { metrics: WalStorageMetrics, timeline_dir: Utf8PathBuf, - conf: SafeKeeperConf, + + /// Disables fsync if true. + no_sync: bool, /// Size of WAL segment in bytes. wal_seg_size: usize, @@ -151,9 +152,9 @@ impl PhysicalStorage { /// the disk. Otherwise, all LSNs are set to zero. pub fn new( ttid: &TenantTimelineId, - timeline_dir: Utf8PathBuf, - conf: &SafeKeeperConf, + timeline_dir: &Utf8Path, state: &TimelinePersistentState, + no_sync: bool, ) -> Result { let wal_seg_size = state.server.wal_seg_size as usize; @@ -198,8 +199,8 @@ impl PhysicalStorage { Ok(PhysicalStorage { metrics: WalStorageMetrics::default(), - timeline_dir, - conf: conf.clone(), + timeline_dir: timeline_dir.to_path_buf(), + no_sync, wal_seg_size, pg_version: state.server.pg_version, system_id: state.server.system_id, @@ -224,7 +225,7 @@ impl PhysicalStorage { /// Call fdatasync if config requires so. async fn fdatasync_file(&mut self, file: &File) -> Result<()> { - if !self.conf.no_sync { + if !self.no_sync { self.metrics .observe_flush_seconds(time_io_closure(file.sync_data()).await?); } @@ -263,9 +264,7 @@ impl PhysicalStorage { // Note: this doesn't get into observe_flush_seconds metric. But // segment init should be separate metric, if any. - if let Err(e) = - durable_rename(&tmp_path, &wal_file_partial_path, !self.conf.no_sync).await - { + if let Err(e) = durable_rename(&tmp_path, &wal_file_partial_path, !self.no_sync).await { // Probably rename succeeded, but fsync of it failed. Remove // the file then to avoid using it. remove_file(wal_file_partial_path)