Use a more robust way to persist the safekeeper control file.

The safekeeper control file is now updated in the following way:
1. Write the data to a temporary file.
2. Fsync the temporary file (if the sync option is specified).
3. Rename the temporary file to the actual control file.
4. Fsync the containing directory (if the sync option is specified).
5. Fsync the file again after the rename (if the sync option is specified).

Note that step 5 is not documented anywhere as required, but postgres
does it this way (see durable_rename). A sketch of the whole pattern
is shown below.
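
For illustration, here is a minimal Rust sketch of the pattern. The
helper name and error handling are hypothetical; the real implementation
is FileStorage::persist further down in this diff.

    use std::fs::{self, File};
    use std::io::Write;
    use std::path::Path;

    fn durable_overwrite(path: &Path, data: &[u8], sync: bool) -> std::io::Result<()> {
        // 1. write the data to a temporary ".partial" file next to the target
        let name = path.file_name().unwrap().to_string_lossy();
        let tmp_path = path.with_file_name(format!("{}.partial", name));
        let mut tmp = File::create(&tmp_path)?;
        tmp.write_all(data)?;
        // 2. fsync the temporary file so its contents reach the disk
        if sync {
            tmp.sync_all()?;
        }
        // 3. atomically replace the old file with the new one
        fs::rename(&tmp_path, path)?;
        if sync {
            // 4. fsync the containing directory so the rename itself is durable
            File::open(path.parent().unwrap())?.sync_all()?;
            // 5. fsync the renamed file, as postgres' durable_rename does
            File::open(path)?.sync_all()?;
        }
        Ok(())
    }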

Also, because the rename machinery replaces the control file on every
update, the control file itself can no longer serve as the lock target;
switch to a dedicated lock file to prevent running several safekeepers
concurrently on the same data (see the sketch below).
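
A minimal sketch of the locking scheme, using the fs2 crate as the diff
does (the helper name is hypothetical; the real code lives in
load_from_control_file):

    use anyhow::{anyhow, Result};
    use fs2::FileExt;
    use std::fs::File;
    use std::path::Path;

    fn acquire_data_dir_lock(lock_file_path: &Path) -> Result<File> {
        let lock_file = File::create(lock_file_path)?;
        // try_lock_exclusive takes an advisory lock and fails immediately
        // (instead of blocking) if another process already holds it
        lock_file.try_lock_exclusive().map_err(|e| {
            anyhow!(
                "lock file {:?} is locked by some other process: {}",
                lock_file_path,
                e
            )
        })?;
        // keep the returned File alive for as long as the lock must be
        // held; dropping it releases the lock
        Ok(lock_file)
    }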

Commit 07dddfed28 (parent 229dc7704f), authored and committed by
Dmitry Rodionov, 2021-10-21 20:04:38 +03:00.
9 changed files with 293 additions and 107 deletions

Cargo.lock (generated)

@@ -2360,6 +2360,7 @@ dependencies = [
"rust-s3",
"serde",
"serde_json",
"tempfile",
"tokio",
"tokio-stream",
"walkdir",


@@ -38,3 +38,6 @@ postgres_ffi = { path = "../postgres_ffi" }
workspace_hack = { path = "../workspace_hack" }
zenith_metrics = { path = "../zenith_metrics" }
zenith_utils = { path = "../zenith_utils" }
[dev-dependencies]
tempfile = "3.2"


@@ -6,7 +6,6 @@ use clap::{App, Arg};
use const_format::formatcp;
use daemonize::Daemonize;
use log::*;
use std::env;
use std::path::{Path, PathBuf};
use std::thread;
use zenith_utils::http::endpoint;
@@ -78,20 +77,7 @@ fn main() -> Result<()> {
)
.get_matches();
let mut conf = SafeKeeperConf {
// Always set to './'. We will chdir into the directory specified on the
// command line, so that when the server is running, all paths are relative
// to that.
workdir: PathBuf::from("./"),
daemonize: false,
no_sync: false,
pageserver_addr: None,
listen_pg_addr: DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: DEFAULT_HTTP_LISTEN_ADDR.to_string(),
ttl: None,
recall_period: None,
pageserver_auth_token: env::var("PAGESERVER_AUTH_TOKEN").ok(),
};
let mut conf: SafeKeeperConf = Default::default();
if let Some(dir) = arg_matches.value_of("datadir") {
// change into the data directory.


@@ -2,6 +2,9 @@
use std::path::PathBuf;
use std::time::Duration;
use std::env;
use zenith_utils::zid::ZTimelineId;
pub mod http;
pub mod json_ctrl;
pub mod receive_wal;
@@ -42,3 +45,28 @@ pub struct SafeKeeperConf {
pub ttl: Option<Duration>,
pub recall_period: Option<Duration>,
}
impl SafeKeeperConf {
pub fn timeline_dir(&self, timelineid: &ZTimelineId) -> PathBuf {
self.workdir.join(timelineid.to_string())
}
}
impl Default for SafeKeeperConf {
fn default() -> Self {
SafeKeeperConf {
// Always set to './'. We will chdir into the directory specified on the
// command line, so that when the server is running, all paths are relative
// to that.
workdir: PathBuf::from("./"),
daemonize: false,
no_sync: false,
pageserver_addr: None,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
ttl: None,
recall_period: None,
pageserver_auth_token: env::var("PAGESERVER_AUTH_TOKEN").ok(),
}
}
}


@@ -73,12 +73,12 @@ fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTe
}
impl<'pg> ReceiveWalConn<'pg> {
pub fn new(pg: &'pg mut PostgresBackend) -> Result<ReceiveWalConn<'pg>> {
pub fn new(pg: &'pg mut PostgresBackend) -> ReceiveWalConn<'pg> {
let peer_addr = *pg.get_peer_addr();
Ok(ReceiveWalConn {
ReceiveWalConn {
pg_backend: pg,
peer_addr,
})
}
}
// Read and extract the bytes of a `CopyData` message from the postgres instance
@@ -142,7 +142,11 @@ impl<'pg> ReceiveWalConn<'pg> {
}
loop {
let reply = swh.timeline.get().process_msg(&msg)?;
let reply = swh
.timeline
.get()
.process_msg(&msg)
.with_context(|| "failed to process ProposerAcceptorMessage")?;
self.write_msg(&reply)?;
msg = self.read_msg()?;
}


@@ -228,8 +228,8 @@ impl ReplicationConn {
// Open a new file.
let segno = start_pos.segment_number(wal_seg_size);
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
let timeline_id = swh.timeline.get().timelineid.to_string();
let wal_file_path = swh.conf.workdir.join(timeline_id).join(wal_file_name);
let timeline_id = swh.timeline.get().timelineid;
let wal_file_path = swh.conf.timeline_dir(&timeline_id).join(wal_file_name);
Self::open_wal_file(&wal_file_path)?
}
};


@@ -1,5 +1,6 @@
//! Acceptor part of proposer-acceptor consensus algorithm.
use anyhow::Context;
use anyhow::{anyhow, bail, Result};
use byteorder::LittleEndian;
use byteorder::ReadBytesExt;
@@ -423,7 +424,9 @@ where
self.s.server.ztli = msg.ztli;
self.s.server.tli = msg.tli;
self.s.server.wal_seg_size = msg.wal_seg_size;
self.storage.persist(&self.s, true)?;
self.storage
.persist(&self.s, true)
.with_context(|| "failed to persist shared state")?;
self.metrics = SafeKeeperMetrics::new(self.s.server.ztli);


@@ -7,7 +7,7 @@ use crate::receive_wal::ReceiveWalConn;
use crate::replication::ReplicationConn;
use crate::timeline::{Timeline, TimelineTools};
use crate::SafeKeeperConf;
use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, bail, Context, Result};
use bytes::Bytes;
use std::str::FromStr;
use std::sync::Arc;
@@ -74,7 +74,9 @@ impl postgres_backend::Handler for SendWalHandler {
} else if query_string.starts_with(b"START_REPLICATION") {
ReplicationConn::new(pgb).run(self, pgb, &query_string)?;
} else if query_string.starts_with(b"START_WAL_PUSH") {
ReceiveWalConn::new(pgb)?.run(self)?;
ReceiveWalConn::new(pgb)
.run(self)
.with_context(|| "failed to run ReceiveWalConn")?;
} else if query_string.starts_with(b"JSON_CTRL") {
handle_json_ctrl(self, pgb, &query_string)?;
} else {


@@ -1,7 +1,7 @@
//! This module contains timeline id -> safekeeper state map with file-backed
//! persistence and support for interaction between sending and receiving wal.
use anyhow::{bail, Result};
use anyhow::{anyhow, bail, ensure, Context, Result};
use fs2::FileExt;
use lazy_static::lazy_static;
use log::*;
@@ -9,7 +9,8 @@ use postgres_ffi::xlog_utils::find_end_of_wal;
use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{Seek, SeekFrom, Write};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::PathBuf;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use zenith_metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
@@ -24,9 +25,16 @@ use crate::safekeeper::{
};
use crate::SafeKeeperConf;
use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
use std::convert::TryInto;
// contains persistent metadata for safekeeper
const CONTROL_FILE_NAME: &str = "safekeeper.control";
// needed to atomically update the state using `rename`
const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
// dedicated lockfile to prevent running several safekeepers on the same data
const LOCK_FILE_NAME: &str = "safekeeper.lock";
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
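// the control file ends with a little-endian CRC32C checksum of all preceding bytes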
const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
/// Replica status: hot standby feedback + disk consistent lsn
#[derive(Debug, Clone, Copy)]
@@ -126,18 +134,10 @@ impl SharedState {
timelineid: ZTimelineId,
create: CreateControlFile,
) -> Result<Self> {
let (cf, state) = SharedState::load_control_file(conf, timelineid, create)?;
let timelineid_str = format!("{}", timelineid);
let storage = FileStorage {
control_file: cf,
conf: conf.clone(),
persist_sync_control_file_seconds: PERSIST_SYNC_CONTROL_FILE_SECONDS
.with_label_values(&[&timelineid_str]),
persist_nosync_control_file_seconds: PERSIST_NOSYNC_CONTROL_FILE_SECONDS
.with_label_values(&[&timelineid_str]),
};
let (file_storage, state) = SharedState::load_from_control_file(conf, timelineid, create)
.with_context(|| "failed to load from control file")?;
let (flush_lsn, tli) = if state.server.wal_seg_size != 0 {
let wal_dir = conf.workdir.join(format!("{}", timelineid));
let wal_dir = conf.timeline_dir(&timelineid);
find_end_of_wal(
&wal_dir,
state.server.wal_seg_size as usize,
@@ -150,80 +150,114 @@ impl SharedState {
Ok(Self {
notified_commit_lsn: Lsn(0),
sk: SafeKeeper::new(Lsn(flush_lsn), tli, storage, state),
sk: SafeKeeper::new(Lsn(flush_lsn), tli, file_storage, state),
replicas: Vec::new(),
})
}
/// Load the safekeeper state from the control file and take an exclusive
/// lock on the dedicated lock file (prevents running more than one
/// safekeeper instance on the same data).
/// If create=false and the control file doesn't exist, bails out.
fn load_control_file(
fn load_from_control_file(
conf: &SafeKeeperConf,
timelineid: ZTimelineId,
create: CreateControlFile,
) -> Result<(File, SafeKeeperState)> {
let control_file_path = conf
.workdir
.join(timelineid.to_string())
.join(CONTROL_FILE_NAME);
) -> Result<(FileStorage, SafeKeeperState)> {
let timeline_dir = conf.timeline_dir(&timelineid);
let control_file_path = timeline_dir.join(CONTROL_FILE_NAME);
let lock_file_path = timeline_dir.join(LOCK_FILE_NAME);
info!(
"loading control file {}, create={:?}",
"loading control file {}, create={:?} lock file {:?}",
control_file_path.display(),
create
create,
lock_file_path.display(),
);
let mut opts = OpenOptions::new();
opts.read(true).write(true);
if let CreateControlFile::True = create {
opts.create(true);
}
match opts.open(&control_file_path) {
Ok(mut file) => {
// Lock file to prevent two or more active safekeepers
match file.try_lock_exclusive() {
Ok(()) => {}
Err(e) => {
bail!(
"control file {:?} is locked by some other process: {}",
&control_file_path,
e
);
}
}
// Empty file is legit on 'create', don't try to deser from it.
if file.metadata().unwrap().len() == 0 {
if let CreateControlFile::False = create {
bail!("control file is empty");
}
Ok((file, SafeKeeperState::new()))
} else {
match SafeKeeperState::des_from(&mut file) {
Err(e) => {
bail!("failed to read control file {:?}: {}", control_file_path, e);
}
Ok(s) => {
if s.magic != SK_MAGIC {
bail!("bad control file magic: {}", s.magic);
}
if s.format_version != SK_FORMAT_VERSION {
bail!(
"incompatible format version: {} vs. {}",
s.format_version,
SK_FORMAT_VERSION
);
}
Ok((file, s))
}
}
}
let lock_file = File::create(&lock_file_path).with_context(|| "failed to create lock file")?;
// Lock file to prevent two or more active safekeepers
lock_file.try_lock_exclusive().map_err(|e| {
anyhow!(
"control file {:?} is locked by some other process: {}",
&control_file_path,
e
)
})?;
let mut control_file = OpenOptions::new()
.read(true)
.write(true)
.create(matches!(create, CreateControlFile::True))
.open(&control_file_path)
.with_context(|| {
format!(
"failed to open control file at {}",
control_file_path.display(),
)
})?;
// Empty file is legit on 'create', don't try to deser from it.
let state = if control_file.metadata().unwrap().len() == 0 {
if let CreateControlFile::False = create {
bail!("control file is empty");
}
SafeKeeperState::new()
} else {
let mut buf = Vec::new();
control_file
.read_to_end(&mut buf)
.with_context(|| "failed to read control file")?;
let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
ensure!(
    calculated_checksum == expected_checksum,
    "safe keeper state checksum mismatch: expected {}, got {}",
    expected_checksum,
    calculated_checksum
);
let state =
SafeKeeperState::des(&buf[..buf.len() - CHECKSUM_SIZE]).with_context(|| {
format!(
"failed to deserialize safe keeper state from control file at {}",
control_file_path.display(),
)
})?;
if state.magic != SK_MAGIC {
bail!("bad control file magic: {}", state.magic);
}
if state.format_version != SK_FORMAT_VERSION {
bail!(
"failed to open control file {:?}: {}",
&control_file_path,
e
"Got incompatible format version, expected {}, got {}",
SK_FORMAT_VERSION,
state.format_version,
);
}
state
};
let timelineid_str = format!("{}", timelineid);
Ok((
FileStorage {
lock_file,
timeline_dir,
conf: conf.clone(),
persist_sync_control_file_seconds: PERSIST_SYNC_CONTROL_FILE_SECONDS
.with_label_values(&[&timelineid_str]),
persist_nosync_control_file_seconds: PERSIST_NOSYNC_CONTROL_FILE_SECONDS
.with_label_values(&[&timelineid_str]),
},
state,
))
}
}
@@ -385,7 +419,8 @@ impl GlobalTimelines {
);
fs::create_dir_all(timeline_id.to_string())?;
let shared_state = SharedState::create_restore(conf, timeline_id, create)?;
let shared_state = SharedState::create_restore(conf, timeline_id, create)
.with_context(|| "failed to restore shared state")?;
let new_tli = Arc::new(Timeline::new(timeline_id, shared_state));
timelines.insert((tenant_id, timeline_id), Arc::clone(&new_tli));
@@ -397,13 +432,18 @@ impl GlobalTimelines {
#[derive(Debug)]
struct FileStorage {
control_file: File,
// file used to prevent concurrent safekeepers running on the same data
lock_file: File,
// save timeline dir to avoid reconstructing it every time
timeline_dir: PathBuf,
conf: SafeKeeperConf,
persist_sync_control_file_seconds: Histogram,
persist_nosync_control_file_seconds: Histogram,
}
impl Storage for FileStorage {
// persists state durably to underlying storage
// for description see https://lwn.net/Articles/457667/
fn persist(&mut self, s: &SafeKeeperState, sync: bool) -> Result<()> {
let _timer = if sync {
&self.persist_sync_control_file_seconds
@@ -411,10 +451,56 @@ impl Storage for FileStorage {
&self.persist_nosync_control_file_seconds
}
.start_timer();
self.control_file.seek(SeekFrom::Start(0))?;
s.ser_into(&mut self.control_file)?;
// write data to safekeeper.control.partial
let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
let mut control_partial = File::create(&control_partial_path).with_context(|| {
format!(
"failed to create partial control file at: {}",
&control_partial_path.display()
)
})?;
let mut buf = s.ser().with_context(|| "failed to serialize state")?;
// calculate the checksum before appending it to the buffer
let checksum = crc32c::crc32c(&buf);
buf.extend_from_slice(&checksum.to_le_bytes());
control_partial.write_all(&buf).with_context(|| {
format!(
"failed to write safekeeper state into control file at: {}",
control_partial_path.display()
)
})?;
if sync {
self.control_file.sync_all()?;
// fsync the file
control_partial.sync_all().with_context(|| {
format!(
"failed to sync partial control file at {}",
control_partial_path.display()
)
})?;
}
let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
// rename should be atomic
fs::rename(&control_partial_path, &control_path)?;
if sync {
// this sync is not required by any standard but postgres does this (see durable_rename)
File::open(&control_path)
.and_then(|f| f.sync_all())
.with_context(|| {
format!(
"failed to sync control file at: {}",
&control_path.display()
)
})?;
// fsync the directory (Linux-specific)
File::open(&self.timeline_dir)
.and_then(|f| f.sync_all())
.with_context(|| "failed to sync control file directory")?;
}
Ok(())
}
@@ -448,15 +534,10 @@ impl Storage for FileStorage {
let segno = start_pos.segment_number(wal_seg_size);
// note: we basically don't support changing pg timeline
let wal_file_name = XLogFileName(server.tli, segno, wal_seg_size);
let wal_file_path = self
.conf
.workdir
.join(ztli.to_string())
.join(wal_file_name.clone());
let wal_file_path = self.conf.timeline_dir(&ztli).join(wal_file_name.clone());
let wal_file_partial_path = self
.conf
.workdir
.join(ztli.to_string())
.timeline_dir(&ztli)
.join(wal_file_name.clone() + ".partial");
{
@@ -515,3 +596,81 @@ impl Storage for FileStorage {
Ok(())
}
}
#[cfg(test)]
mod test {
use super::FileStorage;
use crate::{
safekeeper::{SafeKeeperState, Storage},
timeline::{CreateControlFile, SharedState, CONTROL_FILE_NAME},
SafeKeeperConf,
};
use anyhow::Result;
use std::fs;
use zenith_utils::{lsn::Lsn, zid::ZTimelineId};
fn stub_conf() -> SafeKeeperConf {
let workdir = tempfile::tempdir().unwrap().into_path();
SafeKeeperConf {
workdir,
..Default::default()
}
}
fn load_from_control_file(
conf: &SafeKeeperConf,
timeline_id: ZTimelineId,
create: CreateControlFile,
) -> Result<(FileStorage, SafeKeeperState)> {
fs::create_dir_all(&conf.timeline_dir(&timeline_id))
.expect("failed to create timeline dir");
SharedState::load_from_control_file(conf, timeline_id, create)
}
#[test]
fn test_read_write_safekeeper_state() {
let conf = stub_conf();
let timeline_id = ZTimelineId::generate();
{
let (mut storage, mut state) =
load_from_control_file(&conf, timeline_id, CreateControlFile::True)
.expect("failed to read state");
// change something
state.wal_start_lsn = Lsn(42);
storage
.persist(&state, true)
.expect("failed to persist state");
}
let (_, state) = load_from_control_file(&conf, timeline_id, CreateControlFile::False)
.expect("failed to read state");
assert_eq!(state.wal_start_lsn, Lsn(42));
}
#[test]
fn test_safekeeper_state_checksum_mismatch() {
let conf = stub_conf();
let timeline_id = ZTimelineId::generate();
{
let (mut storage, mut state) =
load_from_control_file(&conf, timeline_id, CreateControlFile::True)
.expect("failed to read state");
// change something
state.wal_start_lsn = Lsn(42);
storage
.persist(&state, true)
.expect("failed to persist state");
}
let control_path = conf.timeline_dir(&timeline_id).join(CONTROL_FILE_NAME);
let mut data = fs::read(&control_path).unwrap();
data[0] = data[0].wrapping_add(1); // change the first byte of the file to fail checksum validation
fs::write(&control_path, &data).expect("failed to write control file");
match load_from_control_file(&conf, timeline_id, CreateControlFile::False) {
Err(err) => assert!(err
.to_string()
.contains("safe keeper state checksum mismatch")),
Ok(_) => panic!("expected error"),
}
}
}