diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs
index 25b5388370..bfb7c0cc82 100644
--- a/walkeeper/src/lib.rs
+++ b/walkeeper/src/lib.rs
@@ -13,6 +13,7 @@ pub mod s3_offload;
 pub mod safekeeper;
 pub mod send_wal;
 pub mod timeline;
+pub mod upgrade;
 pub mod wal_service;
 
 pub mod defaults {
diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs
index 2a15bb3fc6..fb11f60d74 100644
--- a/walkeeper/src/safekeeper.rs
+++ b/walkeeper/src/safekeeper.rs
@@ -30,7 +30,7 @@ use zenith_utils::pq_proto::SystemId;
 use zenith_utils::zid::{ZTenantId, ZTimelineId};
 
 pub const SK_MAGIC: u32 = 0xcafeceefu32;
-pub const SK_FORMAT_VERSION: u32 = 1;
+pub const SK_FORMAT_VERSION: u32 = 2;
 const SK_PROTOCOL_VERSION: u32 = 1;
 const UNKNOWN_SERVER_VERSION: u32 = 0;
 
@@ -102,7 +102,7 @@ impl fmt::Debug for TermHistory {
 }
 
 /// Unique id of proposer. Not needed for correctness, used for monitoring.
-type PgUuid = [u8; 16];
+pub type PgUuid = [u8; 16];
 
 /// Persistent consensus state of the acceptor.
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -140,12 +140,9 @@ pub struct ServerInfo {
 }
 
 /// Persistent information stored on safekeeper node
+/// On disk data is prefixed by magic and format version and followed by checksum.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct SafeKeeperState {
-    /// magic for verifying content the control file
-    pub magic: u32,
-    /// safekeeper format version
-    pub format_version: u32,
     /// persistent acceptor state
     pub acceptor_state: AcceptorState,
     /// information about server
@@ -166,8 +163,6 @@ pub struct SafeKeeperState {
 impl SafeKeeperState {
     pub fn new() -> SafeKeeperState {
         SafeKeeperState {
-            magic: SK_MAGIC,
-            format_version: SK_FORMAT_VERSION,
             acceptor_state: AcceptorState {
                 term: 0,
                 term_history: TermHistory::empty(),
diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs
index 9e48a833d4..a612a3f727 100644
--- a/walkeeper/src/timeline.rs
+++ b/walkeeper/src/timeline.rs
@@ -2,6 +2,7 @@
 //! persistence and support for interaction between sending and receiving wal.
 
 use anyhow::{anyhow, bail, ensure, Context, Result};
+use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
 use fs2::FileExt;
 use lazy_static::lazy_static;
 use log::*;
@@ -23,6 +24,7 @@ use crate::safekeeper::{
     AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo,
     Storage, SK_FORMAT_VERSION, SK_MAGIC,
 };
+use crate::upgrade::upgrade_control_file;
 use crate::SafeKeeperConf;
 use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
 use std::convert::TryInto;
@@ -34,7 +36,7 @@ const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
 // dedicated lockfile to prevent running several safekeepers on the same data
 const LOCK_FILE_NAME: &str = "safekeeper.lock";
 const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
-const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
+pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
 
 /// Replica status: host standby feedback + disk consistent lsn
 #[derive(Debug, Clone, Copy)]
@@ -134,7 +136,7 @@ impl SharedState {
         timelineid: ZTimelineId,
         create: CreateControlFile,
     ) -> Result<Self> {
-        let (file_storage, state) = SharedState::load_from_control_file(conf, timelineid, create)
+        let (file_storage, state) = FileStorage::load_from_control_file(conf, timelineid, create)
             .with_context(|| "failed to load from control file")?;
         let flush_lsn = if state.server.wal_seg_size != 0 {
             let wal_dir = conf.timeline_dir(&timelineid);
@@ -155,111 +157,6 @@ impl SharedState {
             replicas: Vec::new(),
         })
     }
-
-    /// Fetch and lock control file (prevent running more than one instance of safekeeper)
-    /// If create=false and file doesn't exist, bails out.
-    fn load_from_control_file(
-        conf: &SafeKeeperConf,
-        timelineid: ZTimelineId,
-        create: CreateControlFile,
-    ) -> Result<(FileStorage, SafeKeeperState)> {
-        let timeline_dir = conf.timeline_dir(&timelineid);
-
-        let control_file_path = timeline_dir.join(CONTROL_FILE_NAME);
-        let lock_file_path = timeline_dir.join(LOCK_FILE_NAME);
-
-        info!(
-            "loading control file {}, create={:?} lock file {:?}",
-            control_file_path.display(),
-            create,
-            lock_file_path.display(),
-        );
-
-        let lock_file = File::create(&lock_file_path).with_context(|| "failed to open lockfile")?;
-
-        // Lock file to prevent two or more active safekeepers
-        lock_file.try_lock_exclusive().map_err(|e| {
-            anyhow!(
-                "control file {:?} is locked by some other process: {}",
-                &control_file_path,
-                e
-            )
-        })?;
-
-        let mut control_file = OpenOptions::new()
-            .read(true)
-            .write(true)
-            .create(matches!(create, CreateControlFile::True))
-            .open(&control_file_path)
-            .with_context(|| {
-                format!(
-                    "failed to open control file at {}",
-                    control_file_path.display(),
-                )
-            })?;
-
-        // Empty file is legit on 'create', don't try to deser from it.
-        let state = if control_file.metadata().unwrap().len() == 0 {
-            if let CreateControlFile::False = create {
-                bail!("control file is empty");
-            }
-            SafeKeeperState::new()
-        } else {
-            let mut buf = Vec::new();
-            control_file
-                .read_to_end(&mut buf)
-                .with_context(|| "failed to read control file")?;
-
-            let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
-
-            let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
-                buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
-            let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
-
-            ensure!(
-                calculated_checksum == expected_checksum,
-                format!(
-                    "safe keeper state checksum mismatch expected {} got {}",
-                    expected_checksum, calculated_checksum
-                )
-            );
-
-            let state =
-                SafeKeeperState::des(&buf[..buf.len() - CHECKSUM_SIZE]).with_context(|| {
-                    format!(
-                        "failed to deserialize safe keeper state from control file at {}",
-                        control_file_path.display(),
-                    )
-                })?;
-
-            if state.magic != SK_MAGIC {
-                bail!("bad control file magic: {}", state.magic);
-            }
-            if state.format_version != SK_FORMAT_VERSION {
-                bail!(
-                    "Got incompatible format version, expected {}, got {}",
-                    SK_FORMAT_VERSION,
-                    state.format_version,
-                );
-            }
-            state
-        };
-
-        let timelineid_str = format!("{}", timelineid);
-
-        Ok((
-            FileStorage {
-                lock_file,
-                timeline_dir,
-                conf: conf.clone(),
-                persist_sync_control_file_seconds: PERSIST_SYNC_CONTROL_FILE_SECONDS
-                    .with_label_values(&[&timelineid_str]),
-                persist_nosync_control_file_seconds: PERSIST_NOSYNC_CONTROL_FILE_SECONDS
-                    .with_label_values(&[&timelineid_str]),
-            },
-            state,
-        ))
-    }
 }
 
 /// Database instance (tenant)
@@ -445,6 +342,121 @@ struct FileStorage {
     persist_nosync_control_file_seconds: Histogram,
 }
 
+impl FileStorage {
+    /// Check the magic/version in the on-disk data and deserialize it, if possible.
+    fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
+        // Read the version independent part
+        let magic = buf.read_u32::<LittleEndian>()?;
+        if magic != SK_MAGIC {
+            bail!(
+                "bad control file magic: {:X}, expected {:X}",
+                magic,
+                SK_MAGIC
+            );
+        }
+        let version = buf.read_u32::<LittleEndian>()?;
+        if version == SK_FORMAT_VERSION {
+            let res = SafeKeeperState::des(buf)?;
+            return Ok(res);
+        }
+        // try to upgrade
+        upgrade_control_file(buf, version)
+    }
+
+    /// Fetch and lock control file (prevent running more than one instance of safekeeper)
+    /// If create=false and file doesn't exist, bails out.
+    fn load_from_control_file(
+        conf: &SafeKeeperConf,
+        timelineid: ZTimelineId,
+        create: CreateControlFile,
+    ) -> Result<(FileStorage, SafeKeeperState)> {
+        let timeline_dir = conf.timeline_dir(&timelineid);
+
+        let control_file_path = timeline_dir.join(CONTROL_FILE_NAME);
+        let lock_file_path = timeline_dir.join(LOCK_FILE_NAME);
+
+        info!(
+            "loading control file {}, create={:?} lock file {:?}",
+            control_file_path.display(),
+            create,
+            lock_file_path.display(),
+        );
+
+        let lock_file = File::create(&lock_file_path).with_context(|| "failed to open lockfile")?;
+
+        // Lock file to prevent two or more active safekeepers
+        lock_file.try_lock_exclusive().map_err(|e| {
+            anyhow!(
+                "control file {:?} is locked by some other process: {}",
+                &control_file_path,
+                e
+            )
+        })?;
+
+        let mut control_file = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(matches!(create, CreateControlFile::True))
+            .open(&control_file_path)
+            .with_context(|| {
+                format!(
+                    "failed to open control file at {}",
+                    control_file_path.display(),
+                )
+            })?;
+
+        // Empty file is legit on 'create', don't try to deser from it.
+        let state = if control_file.metadata()?.len() == 0 {
+            if let CreateControlFile::False = create {
+                bail!("control file is empty");
+            }
+            SafeKeeperState::new()
+        } else {
+            let mut buf = Vec::new();
+            control_file
+                .read_to_end(&mut buf)
+                .with_context(|| "failed to read control file")?;
+
+            // A non-empty file shorter than the checksum cannot be valid; bail out here
+            // instead of panicking on the slice arithmetic below.
+            ensure!(buf.len() >= CHECKSUM_SIZE, "control file is too short: {} bytes", buf.len());
+
+            let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
+
+            let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
+                buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
+            let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
+
+            ensure!(
+                calculated_checksum == expected_checksum,
+                format!(
+                    "safekeeper control file checksum mismatch: expected {} got {}",
+                    expected_checksum, calculated_checksum
+                )
+            );
+
+            FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE]).with_context(
+                || format!("while reading control file {}", control_file_path.display(),),
+            )?
+        };
+
+        let timelineid_str = format!("{}", timelineid);
+
+        Ok((
+            FileStorage {
+                lock_file,
+                timeline_dir,
+                conf: conf.clone(),
+                persist_sync_control_file_seconds: PERSIST_SYNC_CONTROL_FILE_SECONDS
+                    .with_label_values(&[&timelineid_str]),
+                persist_nosync_control_file_seconds: PERSIST_NOSYNC_CONTROL_FILE_SECONDS
+                    .with_label_values(&[&timelineid_str]),
+            },
+            state,
+        ))
+    }
+}
+
 impl Storage for FileStorage {
     // persists state durably to underlying storage
     // for description see https://lwn.net/Articles/457667/
@@ -464,7 +476,11 @@ impl Storage for FileStorage {
                 &control_partial_path.display()
             )
         })?;
-        let mut buf = s.ser().with_context(|| "failed to serialize state")?;
+        let mut buf: Vec<u8> = Vec::new();
+        buf.write_u32::<LittleEndian>(SK_MAGIC)?;
+        buf.write_u32::<LittleEndian>(SK_FORMAT_VERSION)?;
+        s.ser_into(&mut buf).with_context(|| "failed to serialize state")?;
+
         // calculate checksum before resize
         let checksum = crc32c::crc32c(&buf);
         buf.extend_from_slice(&checksum.to_le_bytes());
@@ -682,7 +698,7 @@ mod test {
     use super::FileStorage;
     use crate::{
         safekeeper::{SafeKeeperState, Storage},
-        timeline::{CreateControlFile, SharedState, CONTROL_FILE_NAME},
+        timeline::{CreateControlFile, CONTROL_FILE_NAME},
         SafeKeeperConf,
     };
     use anyhow::Result;
@@ -704,7 +720,7 @@ mod test {
     ) -> Result<(FileStorage, SafeKeeperState)> {
         fs::create_dir_all(&conf.timeline_dir(&timeline_id))
             .expect("failed to create timeline dir");
-        SharedState::load_from_control_file(conf, timeline_id, create)
+        FileStorage::load_from_control_file(conf, timeline_id, create)
     }
 
     #[test]
@@ -749,7 +765,7 @@ mod test {
         match load_from_control_file(&conf, timeline_id, CreateControlFile::False) {
             Err(err) => assert!(err
                 .to_string()
-                .contains("safe keeper state checksum mismatch")),
+                .contains("safekeeper control file checksum mismatch")),
             Ok(_) => panic!("expected error"),
         }
     }
diff --git a/walkeeper/src/upgrade.rs b/walkeeper/src/upgrade.rs
new file mode 100644
index 0000000000..31ede7d4b4
--- /dev/null
+++ b/walkeeper/src/upgrade.rs
@@ -0,0 +1,66 @@
+//! Code to deal with safekeeper control file upgrades
+use crate::safekeeper::{
+    AcceptorState, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermSwitchEntry,
+};
+use anyhow::{bail, Result};
+use log::*;
+use serde::{Deserialize, Serialize};
+use zenith_utils::{bin_ser::LeSer, lsn::Lsn};
+
+/// Persistent consensus state of the acceptor.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct AcceptorStateV1 {
+    /// acceptor's last term it voted for (advanced in 1 phase)
+    term: Term,
+    /// acceptor's epoch (advanced, i.e. bumped to 'term' when VCL is reached).
+    epoch: Term,
+}
+
+/// Persistent safekeeper state as stored by format version 1, minus the magic/version prefix.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct SafeKeeperStateV1 {
+    /// persistent acceptor state
+    acceptor_state: AcceptorStateV1,
+    /// information about server
+    server: ServerInfo,
+    /// Unique id of the last *elected* proposer we dealt with. Not needed
+    /// for correctness, exists for monitoring purposes.
+    proposer_uuid: PgUuid,
+    /// part of WAL acknowledged by quorum and available locally
+    commit_lsn: Lsn,
+    /// minimal LSN which may be needed for recovery of some safekeeper (end_lsn
+    /// of last record streamed to everyone)
+    truncate_lsn: Lsn,
+    // Safekeeper starts receiving WAL from this LSN, zeros before it ought to
+    // be skipped during decoding.
+    wal_start_lsn: Lsn,
+}
+
+/// Read a control file body stored in an older format `version` and convert it
+/// to the current `SafeKeeperState`.
+///
+/// `buf` is the serialized state only: the caller has already consumed the magic
+/// and version prefix and stripped the checksum. Fails on unsupported versions.
+pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> {
+    // migrate to storing full term history
+    if version == 1 {
+        info!("reading safekeeper control file version {}", version);
+        let oldstate = SafeKeeperStateV1::des(buf)?;
+        let ac = AcceptorState {
+            term: oldstate.acceptor_state.term,
+            term_history: TermHistory(vec![TermSwitchEntry {
+                term: oldstate.acceptor_state.epoch,
+                lsn: Lsn(0),
+            }]),
+        };
+        return Ok(SafeKeeperState {
+            acceptor_state: ac,
+            server: oldstate.server,
+            proposer_uuid: oldstate.proposer_uuid,
+            commit_lsn: oldstate.commit_lsn,
+            truncate_lsn: oldstate.truncate_lsn,
+            wal_start_lsn: oldstate.wal_start_lsn,
+        });
+    }
+    bail!("unsupported safekeeper control file version {}", version)
+}