diff --git a/Cargo.lock b/Cargo.lock
index 47ad0d3bed..84e91449a3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2360,6 +2360,7 @@ dependencies = [
  "rust-s3",
  "serde",
  "serde_json",
+ "tempfile",
  "tokio",
  "tokio-stream",
  "walkdir",
diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml
index 2e2e435236..539a925ebd 100644
--- a/walkeeper/Cargo.toml
+++ b/walkeeper/Cargo.toml
@@ -38,3 +38,6 @@ postgres_ffi = { path = "../postgres_ffi" }
 workspace_hack = { path = "../workspace_hack" }
 zenith_metrics = { path = "../zenith_metrics" }
 zenith_utils = { path = "../zenith_utils" }
+
+[dev-dependencies]
+tempfile = "3.2"
diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs
index 02b7e5059b..e85d49a8c6 100644
--- a/walkeeper/src/bin/safekeeper.rs
+++ b/walkeeper/src/bin/safekeeper.rs
@@ -6,7 +6,6 @@ use clap::{App, Arg};
 use const_format::formatcp;
 use daemonize::Daemonize;
 use log::*;
-use std::env;
 use std::path::{Path, PathBuf};
 use std::thread;
 use zenith_utils::http::endpoint;
@@ -78,20 +77,7 @@ fn main() -> Result<()> {
         )
         .get_matches();

-    let mut conf = SafeKeeperConf {
-        // Always set to './'. We will chdir into the directory specified on the
-        // command line, so that when the server is running, all paths are relative
-        // to that.
-        workdir: PathBuf::from("./"),
-        daemonize: false,
-        no_sync: false,
-        pageserver_addr: None,
-        listen_pg_addr: DEFAULT_PG_LISTEN_ADDR.to_string(),
-        listen_http_addr: DEFAULT_HTTP_LISTEN_ADDR.to_string(),
-        ttl: None,
-        recall_period: None,
-        pageserver_auth_token: env::var("PAGESERVER_AUTH_TOKEN").ok(),
-    };
+    let mut conf: SafeKeeperConf = Default::default();

     if let Some(dir) = arg_matches.value_of("datadir") {
         // change into the data directory.
diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs
index a961cb5dfe..25b5388370 100644
--- a/walkeeper/src/lib.rs
+++ b/walkeeper/src/lib.rs
@@ -2,6 +2,9 @@
 use std::path::PathBuf;
 use std::time::Duration;

+use std::env;
+use zenith_utils::zid::ZTimelineId;
+
 pub mod http;
 pub mod json_ctrl;
 pub mod receive_wal;
@@ -42,3 +45,28 @@ pub struct SafeKeeperConf {
     pub ttl: Option<Duration>,
     pub recall_period: Option<Duration>,
 }
+
+impl SafeKeeperConf {
+    pub fn timeline_dir(&self, timelineid: &ZTimelineId) -> PathBuf {
+        self.workdir.join(timelineid.to_string())
+    }
+}
+
+impl Default for SafeKeeperConf {
+    fn default() -> Self {
+        SafeKeeperConf {
+            // Always set to './'. We will chdir into the directory specified on the
+            // command line, so that when the server is running, all paths are relative
+            // to that.
+            workdir: PathBuf::from("./"),
+            daemonize: false,
+            no_sync: false,
+            pageserver_addr: None,
+            listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
+            listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
+            ttl: None,
+            recall_period: None,
+            pageserver_auth_token: env::var("PAGESERVER_AUTH_TOKEN").ok(),
+        }
+    }
+}
diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs
index 8665fd224e..9498980802 100644
--- a/walkeeper/src/receive_wal.rs
+++ b/walkeeper/src/receive_wal.rs
@@ -73,12 +73,12 @@ fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTe
 }

 impl<'pg> ReceiveWalConn<'pg> {
-    pub fn new(pg: &'pg mut PostgresBackend) -> Result<ReceiveWalConn<'pg>> {
+    pub fn new(pg: &'pg mut PostgresBackend) -> ReceiveWalConn<'pg> {
         let peer_addr = *pg.get_peer_addr();
-        Ok(ReceiveWalConn {
+        ReceiveWalConn {
             pg_backend: pg,
             peer_addr,
-        })
+        }
     }

     // Read and extract the bytes of a `CopyData` message from the postgres instance
@@ -142,7 +142,11 @@ impl<'pg> ReceiveWalConn<'pg> {
         }

         loop {
-            let reply = swh.timeline.get().process_msg(&msg)?;
+            let reply = swh
+                .timeline
+                .get()
+                .process_msg(&msg)
+                .with_context(|| "failed to process ProposerAcceptorMessage")?;
             self.write_msg(&reply)?;
             msg = self.read_msg()?;
         }
diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs
index b6cbfdec12..caf4b4ef6a 100644
--- a/walkeeper/src/replication.rs
+++ b/walkeeper/src/replication.rs
@@ -228,8 +228,8 @@ impl ReplicationConn {
                 // Open a new file.
                 let segno = start_pos.segment_number(wal_seg_size);
                 let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
-                let timeline_id = swh.timeline.get().timelineid.to_string();
-                let wal_file_path = swh.conf.workdir.join(timeline_id).join(wal_file_name);
+                let timeline_id = swh.timeline.get().timelineid;
+                let wal_file_path = swh.conf.timeline_dir(&timeline_id).join(wal_file_name);
                 Self::open_wal_file(&wal_file_path)?
             }
         };
diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs
index d2590c12b6..7d471af2fa 100644
--- a/walkeeper/src/safekeeper.rs
+++ b/walkeeper/src/safekeeper.rs
@@ -1,5 +1,6 @@
 //! Acceptor part of proposer-acceptor consensus algorithm.

+use anyhow::Context;
 use anyhow::{anyhow, bail, Result};
 use byteorder::LittleEndian;
 use byteorder::ReadBytesExt;
@@ -423,7 +424,9 @@ where
         self.s.server.ztli = msg.ztli;
         self.s.server.tli = msg.tli;
         self.s.server.wal_seg_size = msg.wal_seg_size;
-        self.storage.persist(&self.s, true)?;
+        self.storage
+            .persist(&self.s, true)
+            .with_context(|| "failed to persist shared state")?;

         self.metrics = SafeKeeperMetrics::new(self.s.server.ztli);
diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs
index 565ef201b8..e849a4e76d 100644
--- a/walkeeper/src/send_wal.rs
+++ b/walkeeper/src/send_wal.rs
@@ -7,7 +7,7 @@
 use crate::receive_wal::ReceiveWalConn;
 use crate::replication::ReplicationConn;
 use crate::timeline::{Timeline, TimelineTools};
 use crate::SafeKeeperConf;
-use anyhow::{anyhow, bail, Result};
+use anyhow::{anyhow, bail, Context, Result};
 use bytes::Bytes;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -74,7 +74,9 @@ impl postgres_backend::Handler for SendWalHandler {
         } else if query_string.starts_with(b"START_REPLICATION") {
             ReplicationConn::new(pgb).run(self, pgb, &query_string)?;
         } else if query_string.starts_with(b"START_WAL_PUSH") {
-            ReceiveWalConn::new(pgb)?.run(self)?;
+            ReceiveWalConn::new(pgb)
+                .run(self)
+                .with_context(|| "failed to run ReceiveWalConn")?;
         } else if query_string.starts_with(b"JSON_CTRL") {
             handle_json_ctrl(self, pgb, &query_string)?;
         } else {
diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs
index c0edc34d00..d548c7799a 100644
--- a/walkeeper/src/timeline.rs
+++ b/walkeeper/src/timeline.rs
@@ -1,7 +1,7 @@
 //! This module contains timeline id -> safekeeper state map with file-backed
 //! persistence and support for interaction between sending and receiving wal.

-use anyhow::{bail, Result};
+use anyhow::{anyhow, bail, ensure, Context, Result};
 use fs2::FileExt;
 use lazy_static::lazy_static;
 use log::*;
@@ -9,7 +9,8 @@ use postgres_ffi::xlog_utils::find_end_of_wal;
 use std::cmp::{max, min};
 use std::collections::HashMap;
 use std::fs::{self, File, OpenOptions};
-use std::io::{Seek, SeekFrom, Write};
+use std::io::{Read, Seek, SeekFrom, Write};
+use std::path::PathBuf;
 use std::sync::{Arc, Condvar, Mutex};
 use std::time::Duration;
 use zenith_metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
@@ -24,9 +25,16 @@ use crate::safekeeper::{
 };
 use crate::SafeKeeperConf;
 use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
+use std::convert::TryInto;

+// contains persistent metadata for safekeeper
 const CONTROL_FILE_NAME: &str = "safekeeper.control";
+// needed to atomically update the state using `rename`
+const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
+// dedicated lockfile to prevent running several safekeepers on the same data
+const LOCK_FILE_NAME: &str = "safekeeper.lock";
 const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
+const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();

 /// Replica status: hot standby feedback + disk consistent lsn
 #[derive(Debug, Clone, Copy)]
@@ -126,18 +134,10 @@ impl SharedState {
         timelineid: ZTimelineId,
         create: CreateControlFile,
     ) -> Result<Self> {
-        let (cf, state) = SharedState::load_control_file(conf, timelineid, create)?;
-        let timelineid_str = format!("{}", timelineid);
-        let storage = FileStorage {
-            control_file: cf,
-            conf: conf.clone(),
-            persist_sync_control_file_seconds: PERSIST_SYNC_CONTROL_FILE_SECONDS
-                .with_label_values(&[&timelineid_str]),
-            persist_nosync_control_file_seconds: PERSIST_NOSYNC_CONTROL_FILE_SECONDS
-                .with_label_values(&[&timelineid_str]),
-        };
+        let (file_storage, state) = SharedState::load_from_control_file(conf, timelineid, create)
+            .with_context(|| "failed to load from control file")?;
         let (flush_lsn, tli) = if state.server.wal_seg_size != 0 {
-            let wal_dir = conf.workdir.join(format!("{}", timelineid));
+            let wal_dir = conf.timeline_dir(&timelineid);
             find_end_of_wal(
                 &wal_dir,
                 state.server.wal_seg_size as usize,
@@ -150,80 +150,114 @@ impl SharedState {

         Ok(Self {
             notified_commit_lsn: Lsn(0),
-            sk: SafeKeeper::new(Lsn(flush_lsn), tli, storage, state),
+            sk: SafeKeeper::new(Lsn(flush_lsn), tli, file_storage, state),
             replicas: Vec::new(),
         })
     }

     /// Fetch and lock control file (prevent running more than one instance of safekeeper)
     /// If create=false and file doesn't exist, bails out.
-    fn load_control_file(
+    fn load_from_control_file(
         conf: &SafeKeeperConf,
         timelineid: ZTimelineId,
         create: CreateControlFile,
-    ) -> Result<(File, SafeKeeperState)> {
-        let control_file_path = conf
-            .workdir
-            .join(timelineid.to_string())
-            .join(CONTROL_FILE_NAME);
+    ) -> Result<(FileStorage, SafeKeeperState)> {
+        let timeline_dir = conf.timeline_dir(&timelineid);
+
+        let control_file_path = timeline_dir.join(CONTROL_FILE_NAME);
+        let lock_file_path = timeline_dir.join(LOCK_FILE_NAME);
+
         info!(
-            "loading control file {}, create={:?}",
+            "loading control file {}, create={:?}, lock file {:?}",
             control_file_path.display(),
-            create
+            create,
+            lock_file_path.display(),
         );
-        let mut opts = OpenOptions::new();
-        opts.read(true).write(true);
-        if let CreateControlFile::True = create {
-            opts.create(true);
-        }
-        match opts.open(&control_file_path) {
-            Ok(mut file) => {
-                // Lock file to prevent two or more active safekeepers
-                match file.try_lock_exclusive() {
-                    Ok(()) => {}
-                    Err(e) => {
-                        bail!(
-                            "control file {:?} is locked by some other process: {}",
-                            &control_file_path,
-                            e
-                        );
-                    }
-                }
-                // Empty file is legit on 'create', don't try to deser from it.
-                if file.metadata().unwrap().len() == 0 {
-                    if let CreateControlFile::False = create {
-                        bail!("control file is empty");
-                    }
-                    Ok((file, SafeKeeperState::new()))
-                } else {
-                    match SafeKeeperState::des_from(&mut file) {
-                        Err(e) => {
-                            bail!("failed to read control file {:?}: {}", control_file_path, e);
-                        }
-                        Ok(s) => {
-                            if s.magic != SK_MAGIC {
-                                bail!("bad control file magic: {}", s.magic);
-                            }
-                            if s.format_version != SK_FORMAT_VERSION {
-                                bail!(
-                                    "incompatible format version: {} vs. {}",
-                                    s.format_version,
-                                    SK_FORMAT_VERSION
-                                );
-                            }
-                            Ok((file, s))
-                        }
-                    }
-                }
-            }
-            Err(e) => {
-                bail!(
-                    "failed to open control file {:?}: {}",
-                    &control_file_path,
-                    e
-                );
-            }
-        }
+
+        let lock_file = File::create(&lock_file_path).with_context(|| "failed to open lockfile")?;
+
+        // Lock file to prevent two or more active safekeepers
+        lock_file.try_lock_exclusive().map_err(|e| {
+            anyhow!(
+                "lock file {:?} is locked by some other process: {}",
+                &lock_file_path,
+                e
+            )
+        })?;
+
+        let mut control_file = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(matches!(create, CreateControlFile::True))
+            .open(&control_file_path)
+            .with_context(|| {
+                format!(
+                    "failed to open control file at {}",
+                    control_file_path.display(),
+                )
+            })?;
+
+        // Empty file is legit on 'create', don't try to deser from it.
+        let state = if control_file.metadata().unwrap().len() == 0 {
+            if let CreateControlFile::False = create {
+                bail!("control file is empty");
+            }
+            SafeKeeperState::new()
+        } else {
+            let mut buf = Vec::new();
+            control_file
+                .read_to_end(&mut buf)
+                .with_context(|| "failed to read control file")?;
+
+            let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
+
+            let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
+                buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
+            let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
+
+            ensure!(
+                calculated_checksum == expected_checksum,
+                "safe keeper state checksum mismatch: expected {}, got {}",
+                expected_checksum,
+                calculated_checksum
+            );
+
+            let state =
+                SafeKeeperState::des(&buf[..buf.len() - CHECKSUM_SIZE]).with_context(|| {
+                    format!(
+                        "failed to deserialize safe keeper state from control file at {}",
+                        control_file_path.display(),
+                    )
+                })?;
+
+            if state.magic != SK_MAGIC {
+                bail!("bad control file magic: {}", state.magic);
+            }
+            if state.format_version != SK_FORMAT_VERSION {
+                bail!(
+                    "incompatible format version: expected {}, got {}",
+                    SK_FORMAT_VERSION,
+                    state.format_version,
+                );
+            }
+            state
+        };
+
+        let timelineid_str = format!("{}", timelineid);
+
+        Ok((
+            FileStorage {
+                lock_file,
+                timeline_dir,
+                conf: conf.clone(),
+                persist_sync_control_file_seconds: PERSIST_SYNC_CONTROL_FILE_SECONDS
+                    .with_label_values(&[&timelineid_str]),
+                persist_nosync_control_file_seconds: PERSIST_NOSYNC_CONTROL_FILE_SECONDS
+                    .with_label_values(&[&timelineid_str]),
+            },
+            state,
+        ))
     }
 }

@@ -385,7 +419,8 @@ impl GlobalTimelines {
         );

         fs::create_dir_all(timeline_id.to_string())?;
-        let shared_state = SharedState::create_restore(conf, timeline_id, create)?;
+        let shared_state = SharedState::create_restore(conf, timeline_id, create)
+            .with_context(|| "failed to restore shared state")?;

         let new_tli = Arc::new(Timeline::new(timeline_id, shared_state));
         timelines.insert((tenant_id, timeline_id), Arc::clone(&new_tli));
@@ -397,13 +432,18 @@ impl GlobalTimelines {

 #[derive(Debug)]
 struct FileStorage {
-    control_file: File,
+    // file used to prevent concurrent safekeepers running on the same data
+    lock_file: File,
+    // save timeline dir to avoid reconstructing it every time
+    timeline_dir: PathBuf,
     conf: SafeKeeperConf,
     persist_sync_control_file_seconds: Histogram,
     persist_nosync_control_file_seconds: Histogram,
 }

 impl Storage for FileStorage {
+    // persists state durably to underlying storage
+    // for description see https://lwn.net/Articles/457667/
     fn persist(&mut self, s: &SafeKeeperState, sync: bool) -> Result<()> {
         let _timer = if sync {
             &self.persist_sync_control_file_seconds
@@ -411,10 +451,56 @@ impl Storage for FileStorage {
         } else {
             &self.persist_nosync_control_file_seconds
         }
         .start_timer();
-        self.control_file.seek(SeekFrom::Start(0))?;
-        s.ser_into(&mut self.control_file)?;
+
+        // write data to safekeeper.control.partial
+        let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
+        let mut control_partial = File::create(&control_partial_path).with_context(|| {
+            format!(
+                "failed to create partial control file at: {}",
+                &control_partial_path.display()
+            )
+        })?;
+        let mut buf = s.ser().with_context(|| "failed to serialize state")?;
+        // calculate the checksum before appending it to the buffer
+        let checksum = crc32c::crc32c(&buf);
+        buf.extend_from_slice(&checksum.to_le_bytes());
+
+        control_partial.write_all(&buf).with_context(|| {
+            format!(
+                "failed to write safekeeper state into control file at: {}",
+                control_partial_path.display()
+            )
+        })?;
+
         if sync {
-            self.control_file.sync_all()?;
+            // fsync the file
+            control_partial.sync_all().with_context(|| {
+                format!(
+                    "failed to sync partial control file at {}",
+                    control_partial_path.display()
+                )
+            })?;
+        }
+
+        let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
+
+        // rename should be atomic
+        fs::rename(&control_partial_path, &control_path)?;
+        if sync {
+            // this sync is not required by any standard but postgres does this (see durable_rename)
+            File::open(&control_path)
+                .and_then(|f| f.sync_all())
+                .with_context(|| {
+                    format!(
+                        "failed to sync control file at: {}",
+                        &control_path.display()
+                    )
+                })?;
+
+            // fsync the directory (linux specific)
+            File::open(&self.timeline_dir)
+                .and_then(|f| f.sync_all())
+                .with_context(|| "failed to sync control file directory")?;
         }
         Ok(())
     }
@@ -448,15 +534,10 @@ impl Storage for FileStorage {
         let segno = start_pos.segment_number(wal_seg_size);
         // note: we basically don't support changing pg timeline
         let wal_file_name = XLogFileName(server.tli, segno, wal_seg_size);
-        let wal_file_path = self
-            .conf
-            .workdir
-            .join(ztli.to_string())
-            .join(wal_file_name.clone());
+        let wal_file_path = self.conf.timeline_dir(&ztli).join(wal_file_name.clone());
         let wal_file_partial_path = self
             .conf
-            .workdir
-            .join(ztli.to_string())
+            .timeline_dir(&ztli)
             .join(wal_file_name.clone() + ".partial");

         {
@@ -515,3 +596,81 @@ impl Storage for FileStorage {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod test {
+    use super::FileStorage;
+    use crate::{
+        safekeeper::{SafeKeeperState, Storage},
+        timeline::{CreateControlFile, SharedState, CONTROL_FILE_NAME},
+        SafeKeeperConf,
+    };
+    use anyhow::Result;
+    use std::fs;
+    use zenith_utils::{lsn::Lsn, zid::ZTimelineId};
+
+    fn stub_conf() -> SafeKeeperConf {
+        let workdir = tempfile::tempdir().unwrap().into_path();
+        SafeKeeperConf {
+            workdir,
+            ..Default::default()
+        }
+    }
+
+    fn load_from_control_file(
+        conf: &SafeKeeperConf,
+        timeline_id: ZTimelineId,
+        create: CreateControlFile,
+    ) -> Result<(FileStorage, SafeKeeperState)> {
+        fs::create_dir_all(&conf.timeline_dir(&timeline_id))
+            .expect("failed to create timeline dir");
+        SharedState::load_from_control_file(conf, timeline_id, create)
+    }
+
+    #[test]
+    fn test_read_write_safekeeper_state() {
+        let conf = stub_conf();
+        let timeline_id = ZTimelineId::generate();
+        {
+            let (mut storage, mut state) =
+                load_from_control_file(&conf, timeline_id, CreateControlFile::True)
+                    .expect("failed to read state");
+            // change something
+            state.wal_start_lsn = Lsn(42);
+            storage
+                .persist(&state, true)
+                .expect("failed to persist state");
+        }
+
+        let (_, state) = load_from_control_file(&conf, timeline_id, CreateControlFile::False)
+            .expect("failed to read state");
+        assert_eq!(state.wal_start_lsn, Lsn(42));
+    }
+
+    #[test]
+    fn test_safekeeper_state_checksum_mismatch() {
+        let conf = stub_conf();
+        let timeline_id = ZTimelineId::generate();
+        {
+            let (mut storage, mut state) =
+                load_from_control_file(&conf, timeline_id, CreateControlFile::True)
+                    .expect("failed to read state");
+            // change something
+            state.wal_start_lsn = Lsn(42);
+            storage
+                .persist(&state, true)
+                .expect("failed to persist state");
+        }
+        let control_path = conf.timeline_dir(&timeline_id).join(CONTROL_FILE_NAME);
+        let mut data = fs::read(&control_path).unwrap();
+        data[0] += 1; // change the first byte of the file to fail checksum validation
+        fs::write(&control_path, &data).expect("failed to write control file");
+
+        match load_from_control_file(&conf, timeline_id, CreateControlFile::False) {
+            Err(err) => assert!(err
+                .to_string()
+                .contains("safe keeper state checksum mismatch")),
+            Ok(_) => panic!("expected error"),
+        }
+    }
+}
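
Reviewer note: the new FileStorage::persist follows the classic durable-update recipe: serialize, append a crc32c checksum, write to a partial file, fsync it, rename over the target, then fsync the file and its directory (see https://lwn.net/Articles/457667/). Below is a minimal standalone sketch of that pattern, assuming only std, the anyhow crate, and crc32c::crc32c; the function names and paths are illustrative and are not code from this patch.

use std::convert::TryInto;
use std::fs::{self, File};
use std::io::{Read, Write};
use std::path::Path;

use anyhow::{ensure, Context, Result};

const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();

// Durably replace the file at `path` with `data` plus a trailing crc32c checksum.
fn persist_checksummed(path: &Path, data: &[u8]) -> Result<()> {
    let file_name = path
        .file_name()
        .and_then(|n| n.to_str())
        .context("path has no file name")?;
    let partial_path = path.with_file_name(format!("{}.partial", file_name));

    // Append the checksum so torn or corrupted writes are detectable on load.
    let mut buf = data.to_vec();
    buf.extend_from_slice(&crc32c::crc32c(data).to_le_bytes());

    let mut partial = File::create(&partial_path).context("failed to create partial file")?;
    partial.write_all(&buf)?;
    partial.sync_all()?; // data must reach disk before the rename publishes it

    fs::rename(&partial_path, path)?; // atomic replacement on POSIX filesystems
    File::open(path)?.sync_all()?;
    // fsync the directory so the rename itself survives a crash (Linux-specific)
    File::open(path.parent().context("path has no parent")?)?.sync_all()?;
    Ok(())
}

// Load the file back, verifying the trailing checksum before returning the payload.
fn load_checksummed(path: &Path) -> Result<Vec<u8>> {
    let mut buf = Vec::new();
    File::open(path)?.read_to_end(&mut buf)?;
    ensure!(buf.len() >= CHECKSUM_SIZE, "file too short for a checksum");

    let (payload, tail) = buf.split_at(buf.len() - CHECKSUM_SIZE);
    let expected = u32::from_le_bytes(tail.try_into()?);
    let actual = crc32c::crc32c(payload);
    ensure!(
        actual == expected,
        "checksum mismatch: expected {}, got {}",
        expected,
        actual
    );
    Ok(payload.to_vec())
}

The load side mirrors load_from_control_file above: a payload whose trailing crc32c does not match is rejected, which is exactly the failure that test_safekeeper_state_checksum_mismatch provokes by flipping one byte of the control file.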