Bump safekeeper control file version and allow reading the previous one.

Should have been a part of cba4da3f4d to provide an upgrade path for previously
existing clusters. Separates the version-independent header (magic + version) out of
SafeKeeperState so that the reader can choose what to deserialize.
This commit is contained in:
Arseny Sher
2021-12-06 18:08:16 +03:00
parent 0a8c672630
commit bd34d7ecfc
4 changed files with 187 additions and 119 deletions

View File

@@ -13,6 +13,7 @@ pub mod s3_offload;
pub mod safekeeper;
pub mod send_wal;
pub mod timeline;
pub mod upgrade;
pub mod wal_service;
pub mod defaults {

View File

@@ -30,7 +30,7 @@ use zenith_utils::pq_proto::SystemId;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 1;
pub const SK_FORMAT_VERSION: u32 = 2;
const SK_PROTOCOL_VERSION: u32 = 1;
const UNKNOWN_SERVER_VERSION: u32 = 0;
@@ -102,7 +102,7 @@ impl fmt::Debug for TermHistory {
}
/// Unique id of proposer. Not needed for correctness, used for monitoring.
type PgUuid = [u8; 16];
pub type PgUuid = [u8; 16];
/// Persistent consensus state of the acceptor.
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -140,12 +140,9 @@ pub struct ServerInfo {
}
/// Persistent information stored on safekeeper node
/// On disk data is prefixed by magic and format version and followed by checksum.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SafeKeeperState {
/// magic for verifying content the control file
pub magic: u32,
/// safekeeper format version
pub format_version: u32,
/// persistent acceptor state
pub acceptor_state: AcceptorState,
/// information about server
@@ -166,8 +163,6 @@ pub struct SafeKeeperState {
impl SafeKeeperState {
pub fn new() -> SafeKeeperState {
SafeKeeperState {
magic: SK_MAGIC,
format_version: SK_FORMAT_VERSION,
acceptor_state: AcceptorState {
term: 0,
term_history: TermHistory::empty(),

View File

@@ -2,6 +2,7 @@
//! persistence and support for interaction between sending and receiving wal.
use anyhow::{anyhow, bail, ensure, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use fs2::FileExt;
use lazy_static::lazy_static;
use log::*;
@@ -23,6 +24,7 @@ use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo,
Storage, SK_FORMAT_VERSION, SK_MAGIC,
};
use crate::upgrade::upgrade_control_file;
use crate::SafeKeeperConf;
use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
use std::convert::TryInto;
@@ -34,7 +36,7 @@ const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
// dedicated lockfile to prevent running several safekeepers on the same data
const LOCK_FILE_NAME: &str = "safekeeper.lock";
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
/// Replica status: host standby feedback + disk consistent lsn
#[derive(Debug, Clone, Copy)]
@@ -134,7 +136,7 @@ impl SharedState {
timelineid: ZTimelineId,
create: CreateControlFile,
) -> Result<Self> {
let (file_storage, state) = SharedState::load_from_control_file(conf, timelineid, create)
let (file_storage, state) = FileStorage::load_from_control_file(conf, timelineid, create)
.with_context(|| "failed to load from control file")?;
let flush_lsn = if state.server.wal_seg_size != 0 {
let wal_dir = conf.timeline_dir(&timelineid);
@@ -155,111 +157,6 @@ impl SharedState {
replicas: Vec::new(),
})
}
/// Fetch and lock control file (prevent running more than one instance of safekeeper)
/// If create=false and file doesn't exist, bails out.
///
/// Returns the `FileStorage` holding the acquired lock together with the
/// state read from disk, or a fresh `SafeKeeperState::new()` when the control
/// file is empty and creation is allowed.
///
/// # Errors
///
/// Fails if the lock file cannot be created or locked, if the control file
/// cannot be opened, inspected or read, if it is empty while `create` is
/// `CreateControlFile::False`, if it is too short to contain a checksum, if
/// the checksum does not match, or if the stored magic / format version is
/// not the expected one.
fn load_from_control_file(
    conf: &SafeKeeperConf,
    timelineid: ZTimelineId,
    create: CreateControlFile,
) -> Result<(FileStorage, SafeKeeperState)> {
    let timeline_dir = conf.timeline_dir(&timelineid);

    let control_file_path = timeline_dir.join(CONTROL_FILE_NAME);
    let lock_file_path = timeline_dir.join(LOCK_FILE_NAME);

    info!(
        "loading control file {}, create={:?} lock file {:?}",
        control_file_path.display(),
        create,
        lock_file_path.display(),
    );

    let lock_file = File::create(&lock_file_path).with_context(|| "failed to open lockfile")?;

    // Lock file to prevent two or more active safekeepers
    lock_file.try_lock_exclusive().map_err(|e| {
        anyhow!(
            "control file {:?} is locked by some other process: {}",
            &control_file_path,
            e
        )
    })?;

    let mut control_file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(matches!(create, CreateControlFile::True))
        .open(&control_file_path)
        .with_context(|| {
            format!(
                "failed to open control file at {}",
                control_file_path.display(),
            )
        })?;

    // Report a metadata failure as an error instead of panicking.
    let control_file_len = control_file
        .metadata()
        .with_context(|| "failed to read control file metadata")?
        .len();

    // Empty file is legit on 'create', don't try to deser from it.
    let state = if control_file_len == 0 {
        if let CreateControlFile::False = create {
            bail!("control file is empty");
        }
        SafeKeeperState::new()
    } else {
        let mut buf = Vec::new();
        control_file
            .read_to_end(&mut buf)
            .with_context(|| "failed to read control file")?;

        // A truncated file must not underflow the length arithmetic below.
        ensure!(
            buf.len() >= CHECKSUM_SIZE,
            "control file is too short to contain a checksum: {} bytes",
            buf.len()
        );
        let (payload, checksum_bytes) = buf.split_at(buf.len() - CHECKSUM_SIZE);

        let calculated_checksum = crc32c::crc32c(payload);
        let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] = checksum_bytes.try_into()?;
        let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);

        ensure!(
            calculated_checksum == expected_checksum,
            format!(
                "safe keeper state checksum mismatch expected {} got {}",
                expected_checksum, calculated_checksum
            )
        );

        let state = SafeKeeperState::des(payload).with_context(|| {
            format!(
                "failed to deserialize safe keeper state from control file at {}",
                control_file_path.display(),
            )
        })?;
        if state.magic != SK_MAGIC {
            bail!("bad control file magic: {}", state.magic);
        }
        if state.format_version != SK_FORMAT_VERSION {
            bail!(
                "Got incompatible format version, expected {}, got {}",
                SK_FORMAT_VERSION,
                state.format_version,
            );
        }
        state
    };

    let timelineid_str = format!("{}", timelineid);

    Ok((
        FileStorage {
            lock_file,
            timeline_dir,
            conf: conf.clone(),
            persist_sync_control_file_seconds: PERSIST_SYNC_CONTROL_FILE_SECONDS
                .with_label_values(&[&timelineid_str]),
            persist_nosync_control_file_seconds: PERSIST_NOSYNC_CONTROL_FILE_SECONDS
                .with_label_values(&[&timelineid_str]),
        },
        state,
    ))
}
}
/// Database instance (tenant)
@@ -445,6 +342,117 @@ struct FileStorage {
persist_nosync_control_file_seconds: Histogram,
}
impl FileStorage {
// Check the magic/version in the on-disk data and deserialize it, if possible.
fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
// Read the version independent part
let magic = buf.read_u32::<LittleEndian>()?;
if magic != SK_MAGIC {
bail!(
"bad control file magic: {:X}, expected {:X}",
magic,
SK_MAGIC
);
}
let version = buf.read_u32::<LittleEndian>()?;
if version == SK_FORMAT_VERSION {
let res = SafeKeeperState::des(buf)?;
return Ok(res);
}
// try to upgrade
upgrade_control_file(buf, version)
}
/// Fetch and lock control file (prevent running more than one instance of safekeeper)
/// If create=false and file doesn't exist, bails out.
fn load_from_control_file(
conf: &SafeKeeperConf,
timelineid: ZTimelineId,
create: CreateControlFile,
) -> Result<(FileStorage, SafeKeeperState)> {
let timeline_dir = conf.timeline_dir(&timelineid);
let control_file_path = timeline_dir.join(CONTROL_FILE_NAME);
let lock_file_path = timeline_dir.join(LOCK_FILE_NAME);
info!(
"loading control file {}, create={:?} lock file {:?}",
control_file_path.display(),
create,
lock_file_path.display(),
);
let lock_file = File::create(&lock_file_path).with_context(|| "failed to open lockfile")?;
// Lock file to prevent two or more active safekeepers
lock_file.try_lock_exclusive().map_err(|e| {
anyhow!(
"control file {:?} is locked by some other process: {}",
&control_file_path,
e
)
})?;
let mut control_file = OpenOptions::new()
.read(true)
.write(true)
.create(matches!(create, CreateControlFile::True))
.open(&control_file_path)
.with_context(|| {
format!(
"failed to open control file at {}",
control_file_path.display(),
)
})?;
// Empty file is legit on 'create', don't try to deser from it.
let state = if control_file.metadata().unwrap().len() == 0 {
if let CreateControlFile::False = create {
bail!("control file is empty");
}
SafeKeeperState::new()
} else {
let mut buf = Vec::new();
control_file
.read_to_end(&mut buf)
.with_context(|| "failed to read control file")?;
let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
ensure!(
calculated_checksum == expected_checksum,
format!(
"safekeeper control file checksum mismatch: expected {} got {}",
expected_checksum, calculated_checksum
)
);
FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE]).with_context(
|| format!("while reading control file {}", control_file_path.display(),),
)?
};
let timelineid_str = format!("{}", timelineid);
Ok((
FileStorage {
lock_file,
timeline_dir,
conf: conf.clone(),
persist_sync_control_file_seconds: PERSIST_SYNC_CONTROL_FILE_SECONDS
.with_label_values(&[&timelineid_str]),
persist_nosync_control_file_seconds: PERSIST_NOSYNC_CONTROL_FILE_SECONDS
.with_label_values(&[&timelineid_str]),
},
state,
))
}
}
impl Storage for FileStorage {
// persists state durably to underlying storage
// for description see https://lwn.net/Articles/457667/
@@ -464,7 +472,11 @@ impl Storage for FileStorage {
&control_partial_path.display()
)
})?;
let mut buf = s.ser().with_context(|| "failed to serialize state")?;
let mut buf: Vec<u8> = Vec::new();
buf.write_u32::<LittleEndian>(SK_MAGIC)?;
buf.write_u32::<LittleEndian>(SK_FORMAT_VERSION)?;
s.ser_into(&mut buf)?;
// calculate checksum before resize
let checksum = crc32c::crc32c(&buf);
buf.extend_from_slice(&checksum.to_le_bytes());
@@ -682,7 +694,7 @@ mod test {
use super::FileStorage;
use crate::{
safekeeper::{SafeKeeperState, Storage},
timeline::{CreateControlFile, SharedState, CONTROL_FILE_NAME},
timeline::{CreateControlFile, CONTROL_FILE_NAME},
SafeKeeperConf,
};
use anyhow::Result;
@@ -704,7 +716,7 @@ mod test {
) -> Result<(FileStorage, SafeKeeperState)> {
fs::create_dir_all(&conf.timeline_dir(&timeline_id))
.expect("failed to create timeline dir");
SharedState::load_from_control_file(conf, timeline_id, create)
FileStorage::load_from_control_file(conf, timeline_id, create)
}
#[test]
@@ -749,7 +761,7 @@ mod test {
match load_from_control_file(&conf, timeline_id, CreateControlFile::False) {
Err(err) => assert!(err
.to_string()
.contains("safe keeper state checksum mismatch")),
.contains("safekeeper control file checksum mismatch")),
Ok(_) => panic!("expected error"),
}
}

60
walkeeper/src/upgrade.rs Normal file
View File

@@ -0,0 +1,60 @@
//! Code to deal with safekeeper control file upgrades
use crate::safekeeper::{
AcceptorState, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermSwitchEntry,
};
use anyhow::{bail, Result};
use log::*;
use serde::{Deserialize, Serialize};
use zenith_utils::{bin_ser::LeSer, lsn::Lsn};
/// Persistent consensus state of the acceptor, in the shape that
/// `upgrade_control_file` deserializes for control file version 1.
// NOTE(review): presumably the field order defines the serialized layout, so
// fields must not be reordered — confirm against the `LeSer` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct AcceptorStateV1 {
/// acceptor's last term it voted for (advanced in 1 phase)
term: Term,
/// acceptor's epoch (advanced, i.e. bumped to 'term' when VCL is reached).
epoch: Term,
}
/// Safekeeper state in the shape that `upgrade_control_file` deserializes for
/// control file version 1. It differs from the current `SafeKeeperState` built
/// there only in the acceptor state, which holds a single epoch instead of a
/// term history.
// NOTE(review): presumably the field order defines the serialized layout, so
// fields must not be reordered — confirm against the `LeSer` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SafeKeeperStateV1 {
/// persistent acceptor state
acceptor_state: AcceptorStateV1,
/// information about server
server: ServerInfo,
/// Unique id of the last *elected* proposer we dealt with. Not needed
/// for correctness, exists for monitoring purposes.
proposer_uuid: PgUuid,
/// part of WAL acknowledged by quorum and available locally
commit_lsn: Lsn,
/// minimal LSN which may be needed for recovery of some safekeeper (end_lsn
/// of last record streamed to everyone)
truncate_lsn: Lsn,
/// Safekeeper starts receiving WAL from this LSN, zeros before it ought to
/// be skipped during decoding.
wal_start_lsn: Lsn,
}
/// Deserialize control file data written in an older format and convert it
/// into the current [`SafeKeeperState`].
///
/// `buf` is the serialized state to decode and `version` is the format
/// version it was written with. Only version 1 is supported: its single
/// `epoch` becomes a one-entry term history at `Lsn(0)`, and all other fields
/// are carried over unchanged.
///
/// # Errors
///
/// Fails if `version` is not 1 or if `buf` cannot be deserialized as a
/// version 1 state.
pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> {
    if version != 1 {
        bail!("unsupported safekeeper control file version {}", version)
    }

    // migrate to storing full term history
    info!("reading safekeeper control file version {}", version);
    let oldstate = SafeKeeperStateV1::des(buf)?;

    let acceptor_state = AcceptorState {
        term: oldstate.acceptor_state.term,
        term_history: TermHistory(vec![TermSwitchEntry {
            term: oldstate.acceptor_state.epoch,
            lsn: Lsn(0),
        }]),
    };

    // `oldstate` is owned, so its fields are moved out rather than cloned.
    Ok(SafeKeeperState {
        acceptor_state,
        server: oldstate.server,
        proposer_uuid: oldstate.proposer_uuid,
        commit_lsn: oldstate.commit_lsn,
        truncate_lsn: oldstate.truncate_lsn,
        wal_start_lsn: oldstate.wal_start_lsn,
    })
}