diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs index 48f0e8a443..ea5d0cba14 100644 --- a/walkeeper/src/bin/safekeeper.rs +++ b/walkeeper/src/bin/safekeeper.rs @@ -10,7 +10,7 @@ use std::fs::File; use std::path::{Path, PathBuf}; use std::thread; use tracing::*; -use walkeeper::timeline::{CreateControlFile, FileStorage}; +use walkeeper::control_file::{self, CreateControlFile}; use zenith_utils::http::endpoint; use zenith_utils::{logging, tcp_listener, GIT_VERSION}; @@ -96,7 +96,10 @@ fn main() -> Result<()> { .get_matches(); if let Some(addr) = arg_matches.value_of("dump-control-file") { - let state = FileStorage::load_control_file(Path::new(addr), CreateControlFile::False)?; + let state = control_file::FileStorage::load_control_file( + Path::new(addr), + CreateControlFile::False, + )?; let json = serde_json::to_string(&state)?; print!("{}", json); return Ok(()); diff --git a/walkeeper/src/control_file.rs b/walkeeper/src/control_file.rs new file mode 100644 index 0000000000..9732cdca14 --- /dev/null +++ b/walkeeper/src/control_file.rs @@ -0,0 +1,297 @@ +//! Control file serialization, deserialization and persistence. + +use anyhow::{bail, ensure, Context, Result}; +use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use lazy_static::lazy_static; + +use std::fs::{self, File, OpenOptions}; +use std::io::{Read, Write}; +use std::path::{Path, PathBuf}; + +use tracing::*; +use zenith_metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS}; +use zenith_utils::bin_ser::LeSer; + +use zenith_utils::zid::ZTenantTimelineId; + +use crate::control_file_upgrade::upgrade_control_file; +use crate::safekeeper::{SafeKeeperState, SK_FORMAT_VERSION, SK_MAGIC}; + +use crate::SafeKeeperConf; + +use std::convert::TryInto; + +// contains persistent metadata for safekeeper +const CONTROL_FILE_NAME: &str = "safekeeper.control"; +// needed to atomically update the state using `rename` +const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial"; +pub const CHECKSUM_SIZE: usize = std::mem::size_of::(); + +// A named boolean. +#[derive(Debug)] +pub enum CreateControlFile { + True, + False, +} + +lazy_static! { + static ref PERSIST_CONTROL_FILE_SECONDS: HistogramVec = register_histogram_vec!( + "safekeeper_persist_control_file_seconds", + "Seconds to persist and sync control file, grouped by timeline", + &["tenant_id", "timeline_id"], + DISK_WRITE_SECONDS_BUCKETS.to_vec() + ) + .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec"); +} + +pub trait Storage { + /// Persist safekeeper state on disk. + fn persist(&mut self, s: &SafeKeeperState) -> Result<()>; +} + +#[derive(Debug)] +pub struct FileStorage { + // save timeline dir to avoid reconstructing it every time + timeline_dir: PathBuf, + conf: SafeKeeperConf, + persist_control_file_seconds: Histogram, +} + +impl FileStorage { + pub fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> FileStorage { + let timeline_dir = conf.timeline_dir(zttid); + let tenant_id = zttid.tenant_id.to_string(); + let timeline_id = zttid.timeline_id.to_string(); + FileStorage { + timeline_dir, + conf: conf.clone(), + persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS + .with_label_values(&[&tenant_id, &timeline_id]), + } + } + + // Check the magic/version in the on-disk data and deserialize it, if possible. + fn deser_sk_state(buf: &mut &[u8]) -> Result { + // Read the version independent part + let magic = buf.read_u32::()?; + if magic != SK_MAGIC { + bail!( + "bad control file magic: {:X}, expected {:X}", + magic, + SK_MAGIC + ); + } + let version = buf.read_u32::()?; + if version == SK_FORMAT_VERSION { + let res = SafeKeeperState::des(buf)?; + return Ok(res); + } + // try to upgrade + upgrade_control_file(buf, version) + } + + // Load control file for given zttid at path specified by conf. + pub fn load_control_file_conf( + conf: &SafeKeeperConf, + zttid: &ZTenantTimelineId, + create: CreateControlFile, + ) -> Result { + let path = conf.timeline_dir(zttid).join(CONTROL_FILE_NAME); + Self::load_control_file(path, create) + } + + /// Read in the control file. + /// If create=false and file doesn't exist, bails out. + pub fn load_control_file>( + control_file_path: P, + create: CreateControlFile, + ) -> Result { + info!( + "loading control file {}, create={:?}", + control_file_path.as_ref().display(), + create, + ); + + let mut control_file = OpenOptions::new() + .read(true) + .write(true) + .create(matches!(create, CreateControlFile::True)) + .open(&control_file_path) + .with_context(|| { + format!( + "failed to open control file at {}", + control_file_path.as_ref().display(), + ) + })?; + + // Empty file is legit on 'create', don't try to deser from it. + let state = if control_file.metadata().unwrap().len() == 0 { + if let CreateControlFile::False = create { + bail!("control file is empty"); + } + SafeKeeperState::new() + } else { + let mut buf = Vec::new(); + control_file + .read_to_end(&mut buf) + .context("failed to read control file")?; + + let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]); + + let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] = + buf[buf.len() - CHECKSUM_SIZE..].try_into()?; + let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes); + + ensure!( + calculated_checksum == expected_checksum, + format!( + "safekeeper control file checksum mismatch: expected {} got {}", + expected_checksum, calculated_checksum + ) + ); + + FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE]).with_context( + || { + format!( + "while reading control file {}", + control_file_path.as_ref().display(), + ) + }, + )? + }; + Ok(state) + } +} + +impl Storage for FileStorage { + // persists state durably to underlying storage + // for description see https://lwn.net/Articles/457667/ + fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { + let _timer = &self.persist_control_file_seconds.start_timer(); + + // write data to safekeeper.control.partial + let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL); + let mut control_partial = File::create(&control_partial_path).with_context(|| { + format!( + "failed to create partial control file at: {}", + &control_partial_path.display() + ) + })?; + let mut buf: Vec = Vec::new(); + buf.write_u32::(SK_MAGIC)?; + buf.write_u32::(SK_FORMAT_VERSION)?; + s.ser_into(&mut buf)?; + + // calculate checksum before resize + let checksum = crc32c::crc32c(&buf); + buf.extend_from_slice(&checksum.to_le_bytes()); + + control_partial.write_all(&buf).with_context(|| { + format!( + "failed to write safekeeper state into control file at: {}", + control_partial_path.display() + ) + })?; + + // fsync the file + control_partial.sync_all().with_context(|| { + format!( + "failed to sync partial control file at {}", + control_partial_path.display() + ) + })?; + + let control_path = self.timeline_dir.join(CONTROL_FILE_NAME); + + // rename should be atomic + fs::rename(&control_partial_path, &control_path)?; + // this sync is not required by any standard but postgres does this (see durable_rename) + File::open(&control_path) + .and_then(|f| f.sync_all()) + .with_context(|| { + format!( + "failed to sync control file at: {}", + &control_path.display() + ) + })?; + + // fsync the directory (linux specific) + File::open(&self.timeline_dir) + .and_then(|f| f.sync_all()) + .context("failed to sync control file directory")?; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::FileStorage; + use super::*; + use crate::{safekeeper::SafeKeeperState, SafeKeeperConf, ZTenantTimelineId}; + use anyhow::Result; + use std::fs; + use zenith_utils::lsn::Lsn; + + fn stub_conf() -> SafeKeeperConf { + let workdir = tempfile::tempdir().unwrap().into_path(); + SafeKeeperConf { + workdir, + ..Default::default() + } + } + + fn load_from_control_file( + conf: &SafeKeeperConf, + zttid: &ZTenantTimelineId, + create: CreateControlFile, + ) -> Result<(FileStorage, SafeKeeperState)> { + fs::create_dir_all(&conf.timeline_dir(zttid)).expect("failed to create timeline dir"); + Ok(( + FileStorage::new(zttid, conf), + FileStorage::load_control_file_conf(conf, zttid, create)?, + )) + } + + #[test] + fn test_read_write_safekeeper_state() { + let conf = stub_conf(); + let zttid = ZTenantTimelineId::generate(); + { + let (mut storage, mut state) = + load_from_control_file(&conf, &zttid, CreateControlFile::True) + .expect("failed to read state"); + // change something + state.wal_start_lsn = Lsn(42); + storage.persist(&state).expect("failed to persist state"); + } + + let (_, state) = load_from_control_file(&conf, &zttid, CreateControlFile::False) + .expect("failed to read state"); + assert_eq!(state.wal_start_lsn, Lsn(42)); + } + + #[test] + fn test_safekeeper_state_checksum_mismatch() { + let conf = stub_conf(); + let zttid = ZTenantTimelineId::generate(); + { + let (mut storage, mut state) = + load_from_control_file(&conf, &zttid, CreateControlFile::True) + .expect("failed to read state"); + // change something + state.wal_start_lsn = Lsn(42); + storage.persist(&state).expect("failed to persist state"); + } + let control_path = conf.timeline_dir(&zttid).join(CONTROL_FILE_NAME); + let mut data = fs::read(&control_path).unwrap(); + data[0] += 1; // change the first byte of the file to fail checksum validation + fs::write(&control_path, &data).expect("failed to write control file"); + + match load_from_control_file(&conf, &zttid, CreateControlFile::False) { + Err(err) => assert!(err + .to_string() + .contains("safekeeper control file checksum mismatch")), + Ok(_) => panic!("expected error"), + } + } +} diff --git a/walkeeper/src/upgrade.rs b/walkeeper/src/control_file_upgrade.rs similarity index 100% rename from walkeeper/src/upgrade.rs rename to walkeeper/src/control_file_upgrade.rs diff --git a/walkeeper/src/handler.rs b/walkeeper/src/handler.rs index 21890764db..5367954842 100644 --- a/walkeeper/src/handler.rs +++ b/walkeeper/src/handler.rs @@ -19,7 +19,7 @@ use zenith_utils::pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; use crate::callmemaybe::CallmeEvent; -use crate::timeline::CreateControlFile; +use crate::control_file::CreateControlFile; use tokio::sync::mpsc::UnboundedSender; /// Safekeeper handler of postgres commands diff --git a/walkeeper/src/http/routes.rs b/walkeeper/src/http/routes.rs index b8d9a4ae36..11a29ac6d3 100644 --- a/walkeeper/src/http/routes.rs +++ b/walkeeper/src/http/routes.rs @@ -7,9 +7,9 @@ use zenith_utils::http::{RequestExt, RouterBuilder}; use zenith_utils::lsn::Lsn; use zenith_utils::zid::ZTenantTimelineId; +use crate::control_file::CreateControlFile; use crate::safekeeper::Term; use crate::safekeeper::TermHistory; -use crate::timeline::CreateControlFile; use crate::timeline::GlobalTimelines; use crate::SafeKeeperConf; use zenith_utils::http::endpoint; diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index c29ef79e90..6c3e0b264e 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -5,6 +5,8 @@ use std::time::Duration; use zenith_utils::zid::ZTenantTimelineId; pub mod callmemaybe; +pub mod control_file; +pub mod control_file_upgrade; pub mod handler; pub mod http; pub mod json_ctrl; @@ -13,8 +15,8 @@ pub mod s3_offload; pub mod safekeeper; pub mod send_wal; pub mod timeline; -pub mod upgrade; pub mod wal_service; +pub mod wal_storage; pub mod defaults { use const_format::formatcp; diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index bb22672f98..981a0f4d57 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -3,7 +3,7 @@ use anyhow::{bail, Context, Result}; use byteorder::{LittleEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use postgres_ffi::waldecoder::WalStreamDecoder; + use postgres_ffi::xlog_utils::TimeLineID; use serde::{Deserialize, Serialize}; use std::cmp::min; @@ -13,12 +13,11 @@ use tracing::*; use lazy_static::lazy_static; +use crate::control_file; use crate::send_wal::HotStandbyFeedback; +use crate::wal_storage; use postgres_ffi::xlog_utils::MAX_SEND_SIZE; -use zenith_metrics::{ - register_gauge_vec, register_histogram_vec, Gauge, GaugeVec, Histogram, HistogramVec, - DISK_WRITE_SECONDS_BUCKETS, -}; +use zenith_metrics::{register_gauge_vec, Gauge, GaugeVec}; use zenith_utils::bin_ser::LeSer; use zenith_utils::lsn::Lsn; use zenith_utils::pq_proto::SystemId; @@ -407,133 +406,87 @@ impl AcceptorProposerMessage { } } -pub trait Storage { - /// Persist safekeeper state on disk. - fn persist(&mut self, s: &SafeKeeperState) -> Result<()>; - /// Write piece of wal in buf to disk and sync it. - fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()>; - // Truncate WAL at specified LSN - fn truncate_wal(&mut self, s: &ServerInfo, endpos: Lsn) -> Result<()>; -} - lazy_static! { // The prometheus crate does not support u64 yet, i64 only (see `IntGauge`). // i64 is faster than f64, so update to u64 when available. - static ref FLUSH_LSN_GAUGE: GaugeVec = register_gauge_vec!( - "safekeeper_flush_lsn", - "Current flush_lsn, grouped by timeline", - &["tenant_id", "timeline_id"] - ) - .expect("Failed to register safekeeper_flush_lsn gauge vec"); static ref COMMIT_LSN_GAUGE: GaugeVec = register_gauge_vec!( "safekeeper_commit_lsn", "Current commit_lsn (not necessarily persisted to disk), grouped by timeline", &["tenant_id", "timeline_id"] ) .expect("Failed to register safekeeper_commit_lsn gauge vec"); - static ref WRITE_WAL_BYTES: HistogramVec = register_histogram_vec!( - "safekeeper_write_wal_bytes", - "Bytes written to WAL in a single request, grouped by timeline", - &["tenant_id", "timeline_id"], - vec![1.0, 10.0, 100.0, 1024.0, 8192.0, 128.0 * 1024.0, 1024.0 * 1024.0, 10.0 * 1024.0 * 1024.0] - ) - .expect("Failed to register safekeeper_write_wal_bytes histogram vec"); - static ref WRITE_WAL_SECONDS: HistogramVec = register_histogram_vec!( - "safekeeper_write_wal_seconds", - "Seconds spent writing and syncing WAL to a disk in a single request, grouped by timeline", - &["tenant_id", "timeline_id"], - DISK_WRITE_SECONDS_BUCKETS.to_vec() - ) - .expect("Failed to register safekeeper_write_wal_seconds histogram vec"); } struct SafeKeeperMetrics { - flush_lsn: Gauge, commit_lsn: Gauge, - write_wal_bytes: Histogram, - write_wal_seconds: Histogram, } -struct SafeKeeperMetricsBuilder { - tenant_id: ZTenantId, - timeline_id: ZTimelineId, - flush_lsn: Lsn, - commit_lsn: Lsn, -} - -impl SafeKeeperMetricsBuilder { - fn build(self) -> SafeKeeperMetrics { - let tenant_id = self.tenant_id.to_string(); - let timeline_id = self.timeline_id.to_string(); - let m = SafeKeeperMetrics { - flush_lsn: FLUSH_LSN_GAUGE.with_label_values(&[&tenant_id, &timeline_id]), +impl SafeKeeperMetrics { + fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId, commit_lsn: Lsn) -> Self { + let tenant_id = tenant_id.to_string(); + let timeline_id = timeline_id.to_string(); + let m = Self { commit_lsn: COMMIT_LSN_GAUGE.with_label_values(&[&tenant_id, &timeline_id]), - write_wal_bytes: WRITE_WAL_BYTES.with_label_values(&[&tenant_id, &timeline_id]), - write_wal_seconds: WRITE_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]), }; - m.flush_lsn.set(u64::from(self.flush_lsn) as f64); - m.commit_lsn.set(u64::from(self.commit_lsn) as f64); + m.commit_lsn.set(u64::from(commit_lsn) as f64); m } } /// SafeKeeper which consumes events (messages from compute) and provides /// replies. -pub struct SafeKeeper { - /// Locally flushed part of WAL with full records (end_lsn of last record). - /// Established by reading wal. - pub flush_lsn: Lsn, +pub struct SafeKeeper { // Cached metrics so we don't have to recompute labels on each update. metrics: SafeKeeperMetrics, + /// not-yet-flushed pairs of same named fields in s.* pub commit_lsn: Lsn, pub truncate_lsn: Lsn, - pub storage: ST, pub s: SafeKeeperState, // persistent part - decoder: WalStreamDecoder, + + pub control_store: CTRL, + pub wal_store: WAL, } -impl SafeKeeper +impl SafeKeeper where - ST: Storage, + CTRL: control_file::Storage, + WAL: wal_storage::Storage, { // constructor pub fn new( ztli: ZTimelineId, - flush_lsn: Lsn, - storage: ST, + control_store: CTRL, + wal_store: WAL, state: SafeKeeperState, - ) -> SafeKeeper { + ) -> SafeKeeper { if state.server.timeline_id != ZTimelineId::from([0u8; 16]) && ztli != state.server.timeline_id { panic!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.server.timeline_id); } + SafeKeeper { - flush_lsn, - metrics: SafeKeeperMetricsBuilder { - tenant_id: state.server.tenant_id, - timeline_id: ztli, - flush_lsn, - commit_lsn: state.commit_lsn, - } - .build(), + metrics: SafeKeeperMetrics::new(state.server.tenant_id, ztli, state.commit_lsn), commit_lsn: state.commit_lsn, truncate_lsn: state.truncate_lsn, - storage, s: state, - decoder: WalStreamDecoder::new(Lsn(0)), + control_store, + wal_store, } } /// Get history of term switches for the available WAL fn get_term_history(&self) -> TermHistory { - self.s.acceptor_state.term_history.up_to(self.flush_lsn) + self.s + .acceptor_state + .term_history + .up_to(self.wal_store.flush_lsn()) } #[cfg(test)] fn get_epoch(&self) -> Term { - self.s.acceptor_state.get_epoch(self.flush_lsn) + self.s.acceptor_state.get_epoch(self.wal_store.flush_lsn()) } /// Process message from proposer and possibly form reply. Concurrent @@ -575,21 +528,20 @@ where } // set basic info about server, if not yet + // TODO: verify that is doesn't change after self.s.server.system_id = msg.system_id; self.s.server.tenant_id = msg.tenant_id; self.s.server.timeline_id = msg.ztli; self.s.server.wal_seg_size = msg.wal_seg_size; - self.storage + self.control_store .persist(&self.s) .context("failed to persist shared state")?; - self.metrics = SafeKeeperMetricsBuilder { - tenant_id: self.s.server.tenant_id, - timeline_id: self.s.server.timeline_id, - flush_lsn: self.flush_lsn, - commit_lsn: self.commit_lsn, - } - .build(); + // pass wal_seg_size to read WAL and find flush_lsn + self.wal_store.init_storage(&self.s)?; + + // update tenant_id/timeline_id in metrics + self.metrics = SafeKeeperMetrics::new(msg.tenant_id, msg.ztli, self.commit_lsn); info!( "processed greeting from proposer {:?}, sending term {:?}", @@ -609,14 +561,14 @@ where let mut resp = VoteResponse { term: self.s.acceptor_state.term, vote_given: false as u64, - flush_lsn: self.flush_lsn, + flush_lsn: self.wal_store.flush_lsn(), truncate_lsn: self.s.truncate_lsn, term_history: self.get_term_history(), }; if self.s.acceptor_state.term < msg.term { self.s.acceptor_state.term = msg.term; // persist vote before sending it out - self.storage.persist(&self.s)?; + self.control_store.persist(&self.s)?; resp.term = self.s.acceptor_state.term; resp.vote_given = true as u64; } @@ -628,7 +580,7 @@ where fn bump_if_higher(&mut self, term: Term) -> Result<()> { if self.s.acceptor_state.term < term { self.s.acceptor_state.term = term; - self.storage.persist(&self.s)?; + self.control_store.persist(&self.s)?; } Ok(()) } @@ -637,7 +589,7 @@ where fn append_response(&self) -> AppendResponse { AppendResponse { term: self.s.acceptor_state.term, - flush_lsn: self.flush_lsn, + flush_lsn: self.wal_store.flush_lsn(), commit_lsn: self.s.commit_lsn, // will be filled by the upper code to avoid bothering safekeeper hs_feedback: HotStandbyFeedback::empty(), @@ -653,22 +605,12 @@ where return Ok(None); } - // TODO: cross check divergence point + // truncate wal, update the lsns + self.wal_store.truncate_wal(msg.start_streaming_at)?; - // streaming must not create a hole - assert!(self.flush_lsn == Lsn(0) || self.flush_lsn >= msg.start_streaming_at); - - // truncate obsolete part of WAL - if self.flush_lsn != Lsn(0) { - self.storage - .truncate_wal(&self.s.server, msg.start_streaming_at)?; - } - // update our end of WAL pointer - self.flush_lsn = msg.start_streaming_at; - self.metrics.flush_lsn.set(u64::from(self.flush_lsn) as f64); // and now adopt term history from proposer self.s.acceptor_state.term_history = msg.term_history.clone(); - self.storage.persist(&self.s)?; + self.control_store.persist(&self.s)?; info!("start receiving WAL since {:?}", msg.start_streaming_at); @@ -694,42 +636,14 @@ where // After ProposerElected, which performs truncation, we should get only // indeed append requests (but flush_lsn is advanced only on record // boundary, so might be less). - assert!(self.flush_lsn <= msg.h.begin_lsn); + assert!(self.wal_store.flush_lsn() <= msg.h.begin_lsn); self.s.proposer_uuid = msg.h.proposer_uuid; let mut sync_control_file = false; // do the job - let mut last_rec_lsn = Lsn(0); if !msg.wal_data.is_empty() { - self.metrics - .write_wal_bytes - .observe(msg.wal_data.len() as f64); - { - let _timer = self.metrics.write_wal_seconds.start_timer(); - self.storage - .write_wal(&self.s.server, msg.h.begin_lsn, &msg.wal_data)?; - } - - // figure out last record's end lsn for reporting (if we got the - // whole record) - if self.decoder.available() != msg.h.begin_lsn { - info!( - "restart decoder from {} to {}", - self.decoder.available(), - msg.h.begin_lsn, - ); - self.decoder = WalStreamDecoder::new(msg.h.begin_lsn); - } - self.decoder.feed_bytes(&msg.wal_data); - loop { - match self.decoder.poll_decode()? { - None => break, // no full record yet - Some((lsn, _rec)) => { - last_rec_lsn = lsn; - } - } - } + self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?; // If this was the first record we ever receieved, remember LSN to help // find_end_of_wal skip the hole in the beginning. @@ -739,16 +653,11 @@ where } } - if last_rec_lsn > self.flush_lsn { - self.flush_lsn = last_rec_lsn; - self.metrics.flush_lsn.set(u64::from(self.flush_lsn) as f64); - } - // Advance commit_lsn taking into account what we have locally. // commit_lsn can be 0, being unknown to new walproposer while he hasn't // collected majority of its epoch acks yet, ignore it in this case. if msg.h.commit_lsn != Lsn(0) { - let commit_lsn = min(msg.h.commit_lsn, self.flush_lsn); + let commit_lsn = min(msg.h.commit_lsn, self.wal_store.flush_lsn()); // If new commit_lsn reached epoch switch, force sync of control // file: walproposer in sync mode is very interested when this // happens. Note: this is for sync-safekeepers mode only, as @@ -774,7 +683,7 @@ where } if sync_control_file { - self.storage.persist(&self.s)?; + self.control_store.persist(&self.s)?; } let resp = self.append_response(); @@ -793,34 +702,52 @@ where #[cfg(test)] mod tests { use super::*; + use crate::wal_storage::Storage; // fake storage for tests - struct InMemoryStorage { + struct InMemoryState { persisted_state: SafeKeeperState, } - impl Storage for InMemoryStorage { + impl control_file::Storage for InMemoryState { fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { self.persisted_state = s.clone(); Ok(()) } + } - fn write_wal(&mut self, _server: &ServerInfo, _startpos: Lsn, _buf: &[u8]) -> Result<()> { + struct DummyWalStore { + lsn: Lsn, + } + + impl wal_storage::Storage for DummyWalStore { + fn flush_lsn(&self) -> Lsn { + self.lsn + } + + fn init_storage(&mut self, _state: &SafeKeeperState) -> Result<()> { Ok(()) } - fn truncate_wal(&mut self, _server: &ServerInfo, _end_pos: Lsn) -> Result<()> { + fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { + self.lsn = startpos + buf.len() as u64; + Ok(()) + } + + fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> { + self.lsn = end_pos; Ok(()) } } #[test] fn test_voting() { - let storage = InMemoryStorage { + let storage = InMemoryState { persisted_state: SafeKeeperState::new(), }; + let wal_store = DummyWalStore { lsn: Lsn(0) }; let ztli = ZTimelineId::from([0u8; 16]); - let mut sk = SafeKeeper::new(ztli, Lsn(0), storage, SafeKeeperState::new()); + let mut sk = SafeKeeper::new(ztli, storage, wal_store, SafeKeeperState::new()); // check voting for 1 is ok let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 }); @@ -831,11 +758,11 @@ mod tests { } // reboot... - let state = sk.storage.persisted_state.clone(); - let storage = InMemoryStorage { + let state = sk.control_store.persisted_state.clone(); + let storage = InMemoryState { persisted_state: state.clone(), }; - sk = SafeKeeper::new(ztli, Lsn(0), storage, state); + sk = SafeKeeper::new(ztli, storage, sk.wal_store, state); // and ensure voting second time for 1 is not ok vote_resp = sk.process_msg(&vote_request); @@ -847,11 +774,12 @@ mod tests { #[test] fn test_epoch_switch() { - let storage = InMemoryStorage { + let storage = InMemoryState { persisted_state: SafeKeeperState::new(), }; + let wal_store = DummyWalStore { lsn: Lsn(0) }; let ztli = ZTimelineId::from([0u8; 16]); - let mut sk = SafeKeeper::new(ztli, Lsn(0), storage, SafeKeeperState::new()); + let mut sk = SafeKeeper::new(ztli, storage, wal_store, SafeKeeperState::new()); let mut ar_hdr = AppendRequestHeader { term: 1, @@ -892,7 +820,7 @@ mod tests { }; let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); assert!(resp.is_ok()); - sk.flush_lsn = Lsn(3); // imitate the complete record at 3 %) + sk.wal_store.truncate_wal(Lsn(3)).unwrap(); // imitate the complete record at 3 %) assert_eq!(sk.get_epoch(), 1); } } diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index a8b4e33ad2..1febd71842 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -3,20 +3,16 @@ use crate::handler::SafekeeperPostgresHandler; use crate::timeline::{ReplicaState, Timeline, TimelineTools}; +use crate::wal_storage::WalReader; use anyhow::{bail, Context, Result}; -use postgres_ffi::xlog_utils::{ - get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE, PG_TLI, -}; +use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE}; use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey}; use bytes::Bytes; use serde::{Deserialize, Serialize}; use std::cmp::min; -use std::fs::File; -use std::io::{Read, Seek, SeekFrom}; use std::net::Shutdown; -use std::path::Path; use std::sync::Arc; use std::thread::sleep; use std::time::Duration; @@ -194,24 +190,6 @@ impl ReplicationConn { Ok(()) } - /// Helper function for opening a wal file. - fn open_wal_file(wal_file_path: &Path) -> Result { - // First try to open the .partial file. - let mut partial_path = wal_file_path.to_owned(); - partial_path.set_extension("partial"); - if let Ok(opened_file) = File::open(&partial_path) { - return Ok(opened_file); - } - - // If that failed, try it without the .partial extension. - File::open(&wal_file_path) - .with_context(|| format!("Failed to open WAL file {:?}", wal_file_path)) - .map_err(|e| { - error!("{}", e); - e - }) - } - /// /// Handle START_REPLICATION replication command /// @@ -311,7 +289,15 @@ impl ReplicationConn { pgb.write_message(&BeMessage::CopyBothResponse)?; let mut end_pos = Lsn(0); - let mut wal_file: Option = None; + + let mut wal_reader = WalReader::new( + spg.conf.timeline_dir(&spg.timeline.get().zttid), + wal_seg_size, + start_pos, + ); + + // buffer for wal sending, limited by MAX_SEND_SIZE + let mut send_buf = vec![0u8; MAX_SEND_SIZE]; loop { if let Some(stop_pos) = stop_pos { @@ -345,53 +331,26 @@ impl ReplicationConn { } } - // Take the `File` from `wal_file`, or open a new file. - let mut file = match wal_file.take() { - Some(file) => file, - None => { - // Open a new file. - let segno = start_pos.segment_number(wal_seg_size); - let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); - let wal_file_path = spg - .conf - .timeline_dir(&spg.timeline.get().zttid) - .join(wal_file_name); - Self::open_wal_file(&wal_file_path)? - } - }; - - let xlogoff = start_pos.segment_offset(wal_seg_size) as usize; - - // How much to read and send in message? We cannot cross the WAL file - // boundary, and we don't want send more than MAX_SEND_SIZE. let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize; - let send_size = min(send_size, wal_seg_size - xlogoff); - let send_size = min(send_size, MAX_SEND_SIZE); + let send_size = min(send_size, send_buf.len()); - // Read some data from the file. - let mut file_buf = vec![0u8; send_size]; - file.seek(SeekFrom::Start(xlogoff as u64)) - .and_then(|_| file.read_exact(&mut file_buf)) - .context("Failed to read data from WAL file")?; + let send_buf = &mut send_buf[..send_size]; + + // read wal into buffer + let send_size = wal_reader.read(send_buf)?; + let send_buf = &send_buf[..send_size]; // Write some data to the network socket. pgb.write_message(&BeMessage::XLogData(XLogDataBody { wal_start: start_pos.0, wal_end: end_pos.0, timestamp: get_current_timestamp(), - data: &file_buf, + data: send_buf, })) .context("Failed to send XLogData")?; start_pos += send_size as u64; - trace!("sent WAL up to {}", start_pos); - - // Decide whether to reuse this file. If we don't set wal_file here - // a new file will be opened next time. - if start_pos.segment_offset(wal_seg_size) != 0 { - wal_file = Some(file); - } } Ok(()) } diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index ff3e8f8183..c639e81b79 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -1,42 +1,35 @@ //! This module contains timeline id -> safekeeper state map with file-backed //! persistence and support for interaction between sending and receiving wal. -use anyhow::{bail, ensure, Context, Result}; -use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use anyhow::{Context, Result}; + use lazy_static::lazy_static; -use postgres_ffi::xlog_utils::{find_end_of_wal, XLogSegNo, PG_TLI}; + use std::cmp::{max, min}; use std::collections::HashMap; -use std::fs::{self, File, OpenOptions}; -use std::io::{Read, Seek, SeekFrom, Write}; -use std::path::{Path, PathBuf}; +use std::fs::{self}; + use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; use tokio::sync::mpsc::UnboundedSender; use tracing::*; -use zenith_metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS}; -use zenith_utils::bin_ser::LeSer; + use zenith_utils::lsn::Lsn; use zenith_utils::zid::ZTenantTimelineId; use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey}; +use crate::control_file::{self, CreateControlFile}; + use crate::safekeeper::{ - AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo, - Storage, SK_FORMAT_VERSION, SK_MAGIC, + AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, }; use crate::send_wal::HotStandbyFeedback; -use crate::upgrade::upgrade_control_file; +use crate::wal_storage::{self, Storage}; use crate::SafeKeeperConf; -use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ}; -use std::convert::TryInto; + use zenith_utils::pq_proto::ZenithFeedback; -// contains persistent metadata for safekeeper -const CONTROL_FILE_NAME: &str = "safekeeper.control"; -// needed to atomically update the state using `rename` -const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial"; const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1); -pub const CHECKSUM_SIZE: usize = std::mem::size_of::(); /// Replica status update + hot standby feedback #[derive(Debug, Clone, Copy)] @@ -75,7 +68,7 @@ impl ReplicaState { /// Shared state associated with database instance struct SharedState { /// Safekeeper object - sk: SafeKeeper, + sk: SafeKeeper, /// For receiving-sending wal cooperation /// quorum commit LSN we've notified walsenders about notified_commit_lsn: Lsn, @@ -93,23 +86,6 @@ struct SharedState { pageserver_connstr: Option, } -// A named boolean. -#[derive(Debug)] -pub enum CreateControlFile { - True, - False, -} - -lazy_static! { - static ref PERSIST_CONTROL_FILE_SECONDS: HistogramVec = register_histogram_vec!( - "safekeeper_persist_control_file_seconds", - "Seconds to persist and sync control file, grouped by timeline", - &["tenant_id", "timeline_id"], - DISK_WRITE_SECONDS_BUCKETS.to_vec() - ) - .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec"); -} - impl SharedState { /// Restore SharedState from control file. /// If create=false and file doesn't exist, bails out. @@ -118,32 +94,18 @@ impl SharedState { zttid: &ZTenantTimelineId, create: CreateControlFile, ) -> Result { - let state = FileStorage::load_control_file_conf(conf, zttid, create) + let state = control_file::FileStorage::load_control_file_conf(conf, zttid, create) .context("failed to load from control file")?; - let file_storage = FileStorage::new(zttid, conf); - let flush_lsn = if state.server.wal_seg_size != 0 { - let wal_dir = conf.timeline_dir(zttid); - Lsn(find_end_of_wal( - &wal_dir, - state.server.wal_seg_size as usize, - true, - state.wal_start_lsn, - )? - .0) - } else { - Lsn(0) - }; - info!( - "timeline {} created or restored: flush_lsn={}, commit_lsn={}, truncate_lsn={}", - zttid.timeline_id, flush_lsn, state.commit_lsn, state.truncate_lsn, - ); - if flush_lsn < state.commit_lsn || flush_lsn < state.truncate_lsn { - warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or truncate_lsn from control file", zttid.timeline_id); - } + + let control_store = control_file::FileStorage::new(zttid, conf); + + let wal_store = wal_storage::PhysicalStorage::new(zttid, conf); + + info!("timeline {} created or restored", zttid.timeline_id); Ok(Self { notified_commit_lsn: Lsn(0), - sk: SafeKeeper::new(zttid.timeline_id, flush_lsn, file_storage, state), + sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store, state), replicas: Vec::new(), active: false, num_computes: 0, @@ -450,7 +412,7 @@ impl Timeline { pub fn get_end_of_wal(&self) -> Lsn { let shared_state = self.mutex.lock().unwrap(); - shared_state.sk.flush_lsn + shared_state.sk.wal_store.flush_lsn() } } @@ -526,398 +488,3 @@ impl GlobalTimelines { } } } - -#[derive(Debug)] -pub struct FileStorage { - // save timeline dir to avoid reconstructing it every time - timeline_dir: PathBuf, - conf: SafeKeeperConf, - persist_control_file_seconds: Histogram, -} - -impl FileStorage { - fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> FileStorage { - let timeline_dir = conf.timeline_dir(zttid); - let tenant_id = zttid.tenant_id.to_string(); - let timeline_id = zttid.timeline_id.to_string(); - FileStorage { - timeline_dir, - conf: conf.clone(), - persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS - .with_label_values(&[&tenant_id, &timeline_id]), - } - } - - // Check the magic/version in the on-disk data and deserialize it, if possible. - fn deser_sk_state(buf: &mut &[u8]) -> Result { - // Read the version independent part - let magic = buf.read_u32::()?; - if magic != SK_MAGIC { - bail!( - "bad control file magic: {:X}, expected {:X}", - magic, - SK_MAGIC - ); - } - let version = buf.read_u32::()?; - if version == SK_FORMAT_VERSION { - let res = SafeKeeperState::des(buf)?; - return Ok(res); - } - // try to upgrade - upgrade_control_file(buf, version) - } - - // Load control file for given zttid at path specified by conf. - fn load_control_file_conf( - conf: &SafeKeeperConf, - zttid: &ZTenantTimelineId, - create: CreateControlFile, - ) -> Result { - let path = conf.timeline_dir(zttid).join(CONTROL_FILE_NAME); - Self::load_control_file(path, create) - } - - /// Read in the control file. - /// If create=false and file doesn't exist, bails out. - pub fn load_control_file>( - control_file_path: P, - create: CreateControlFile, - ) -> Result { - info!( - "loading control file {}, create={:?}", - control_file_path.as_ref().display(), - create, - ); - - let mut control_file = OpenOptions::new() - .read(true) - .write(true) - .create(matches!(create, CreateControlFile::True)) - .open(&control_file_path) - .with_context(|| { - format!( - "failed to open control file at {}", - control_file_path.as_ref().display(), - ) - })?; - - // Empty file is legit on 'create', don't try to deser from it. - let state = if control_file.metadata().unwrap().len() == 0 { - if let CreateControlFile::False = create { - bail!("control file is empty"); - } - SafeKeeperState::new() - } else { - let mut buf = Vec::new(); - control_file - .read_to_end(&mut buf) - .context("failed to read control file")?; - - let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]); - - let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] = - buf[buf.len() - CHECKSUM_SIZE..].try_into()?; - let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes); - - ensure!( - calculated_checksum == expected_checksum, - format!( - "safekeeper control file checksum mismatch: expected {} got {}", - expected_checksum, calculated_checksum - ) - ); - - FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE]).with_context( - || { - format!( - "while reading control file {}", - control_file_path.as_ref().display(), - ) - }, - )? - }; - Ok(state) - } - - /// Helper returning full path to WAL segment file and its .partial brother. - fn wal_file_paths(&self, segno: XLogSegNo, wal_seg_size: usize) -> (PathBuf, PathBuf) { - let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); - let wal_file_path = self.timeline_dir.join(wal_file_name.clone()); - let wal_file_partial_path = self.timeline_dir.join(wal_file_name + ".partial"); - (wal_file_path, wal_file_partial_path) - } -} - -impl Storage for FileStorage { - // persists state durably to underlying storage - // for description see https://lwn.net/Articles/457667/ - fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { - let _timer = &self.persist_control_file_seconds.start_timer(); - - // write data to safekeeper.control.partial - let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL); - let mut control_partial = File::create(&control_partial_path).with_context(|| { - format!( - "failed to create partial control file at: {}", - &control_partial_path.display() - ) - })?; - let mut buf: Vec = Vec::new(); - buf.write_u32::(SK_MAGIC)?; - buf.write_u32::(SK_FORMAT_VERSION)?; - s.ser_into(&mut buf)?; - - // calculate checksum before resize - let checksum = crc32c::crc32c(&buf); - buf.extend_from_slice(&checksum.to_le_bytes()); - - control_partial.write_all(&buf).with_context(|| { - format!( - "failed to write safekeeper state into control file at: {}", - control_partial_path.display() - ) - })?; - - // fsync the file - control_partial.sync_all().with_context(|| { - format!( - "failed to sync partial control file at {}", - control_partial_path.display() - ) - })?; - - let control_path = self.timeline_dir.join(CONTROL_FILE_NAME); - - // rename should be atomic - fs::rename(&control_partial_path, &control_path)?; - // this sync is not required by any standard but postgres does this (see durable_rename) - File::open(&control_path) - .and_then(|f| f.sync_all()) - .with_context(|| { - format!( - "failed to sync control file at: {}", - &control_path.display() - ) - })?; - - // fsync the directory (linux specific) - File::open(&self.timeline_dir) - .and_then(|f| f.sync_all()) - .context("failed to sync control file directory")?; - Ok(()) - } - - fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()> { - let mut bytes_left: usize = buf.len(); - let mut bytes_written: usize = 0; - let mut partial; - let mut start_pos = startpos; - const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; - let wal_seg_size = server.wal_seg_size as usize; - - /* Extract WAL location for this block */ - let mut xlogoff = start_pos.segment_offset(wal_seg_size) as usize; - - while bytes_left != 0 { - let bytes_to_write; - - /* - * If crossing a WAL boundary, only write up until we reach wal - * segment size. - */ - if xlogoff + bytes_left > wal_seg_size { - bytes_to_write = wal_seg_size - xlogoff; - } else { - bytes_to_write = bytes_left; - } - - /* Open file */ - let segno = start_pos.segment_number(wal_seg_size); - let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno, wal_seg_size); - { - let mut wal_file: File; - /* Try to open already completed segment */ - if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { - wal_file = file; - partial = false; - } else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) - { - /* Try to open existed partial file */ - wal_file = file; - partial = true; - } else { - /* Create and fill new partial file */ - partial = true; - match OpenOptions::new() - .create(true) - .write(true) - .open(&wal_file_partial_path) - { - Ok(mut file) => { - for _ in 0..(wal_seg_size / XLOG_BLCKSZ) { - file.write_all(ZERO_BLOCK)?; - } - wal_file = file; - } - Err(e) => { - error!("Failed to open log file {:?}: {}", &wal_file_path, e); - return Err(e.into()); - } - } - } - wal_file.seek(SeekFrom::Start(xlogoff as u64))?; - wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?; - - // Flush file, if not said otherwise - if !self.conf.no_sync { - wal_file.sync_all()?; - } - } - /* Write was successful, advance our position */ - bytes_written += bytes_to_write; - bytes_left -= bytes_to_write; - start_pos += bytes_to_write as u64; - xlogoff += bytes_to_write; - - /* Did we reach the end of a WAL segment? */ - if start_pos.segment_offset(wal_seg_size) == 0 { - xlogoff = 0; - if partial { - fs::rename(&wal_file_partial_path, &wal_file_path)?; - } - } - } - Ok(()) - } - - fn truncate_wal(&mut self, server: &ServerInfo, end_pos: Lsn) -> Result<()> { - let partial; - const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; - let wal_seg_size = server.wal_seg_size as usize; - - /* Extract WAL location for this block */ - let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize; - - /* Open file */ - let mut segno = end_pos.segment_number(wal_seg_size); - let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno, wal_seg_size); - { - let mut wal_file: File; - /* Try to open already completed segment */ - if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { - wal_file = file; - partial = false; - } else { - wal_file = OpenOptions::new() - .write(true) - .open(&wal_file_partial_path)?; - partial = true; - } - wal_file.seek(SeekFrom::Start(xlogoff as u64))?; - while xlogoff < wal_seg_size { - let bytes_to_write = min(XLOG_BLCKSZ, wal_seg_size - xlogoff); - wal_file.write_all(&ZERO_BLOCK[0..bytes_to_write])?; - xlogoff += bytes_to_write; - } - // Flush file, if not said otherwise - if !self.conf.no_sync { - wal_file.sync_all()?; - } - } - if !partial { - // Make segment partial once again - fs::rename(&wal_file_path, &wal_file_partial_path)?; - } - // Remove all subsequent segments - loop { - segno += 1; - let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno, wal_seg_size); - // TODO: better use fs::try_exists which is currenty avaialble only in nightly build - if wal_file_path.exists() { - fs::remove_file(&wal_file_path)?; - } else if wal_file_partial_path.exists() { - fs::remove_file(&wal_file_partial_path)?; - } else { - break; - } - } - Ok(()) - } -} - -#[cfg(test)] -mod test { - use super::FileStorage; - use crate::{ - safekeeper::{SafeKeeperState, Storage}, - timeline::{CreateControlFile, CONTROL_FILE_NAME}, - SafeKeeperConf, ZTenantTimelineId, - }; - use anyhow::Result; - use std::fs; - use zenith_utils::lsn::Lsn; - - fn stub_conf() -> SafeKeeperConf { - let workdir = tempfile::tempdir().unwrap().into_path(); - SafeKeeperConf { - workdir, - ..Default::default() - } - } - - fn load_from_control_file( - conf: &SafeKeeperConf, - zttid: &ZTenantTimelineId, - create: CreateControlFile, - ) -> Result<(FileStorage, SafeKeeperState)> { - fs::create_dir_all(&conf.timeline_dir(zttid)).expect("failed to create timeline dir"); - Ok(( - FileStorage::new(zttid, conf), - FileStorage::load_control_file_conf(conf, zttid, create)?, - )) - } - - #[test] - fn test_read_write_safekeeper_state() { - let conf = stub_conf(); - let zttid = ZTenantTimelineId::generate(); - { - let (mut storage, mut state) = - load_from_control_file(&conf, &zttid, CreateControlFile::True) - .expect("failed to read state"); - // change something - state.wal_start_lsn = Lsn(42); - storage.persist(&state).expect("failed to persist state"); - } - - let (_, state) = load_from_control_file(&conf, &zttid, CreateControlFile::False) - .expect("failed to read state"); - assert_eq!(state.wal_start_lsn, Lsn(42)); - } - - #[test] - fn test_safekeeper_state_checksum_mismatch() { - let conf = stub_conf(); - let zttid = ZTenantTimelineId::generate(); - { - let (mut storage, mut state) = - load_from_control_file(&conf, &zttid, CreateControlFile::True) - .expect("failed to read state"); - // change something - state.wal_start_lsn = Lsn(42); - storage.persist(&state).expect("failed to persist state"); - } - let control_path = conf.timeline_dir(&zttid).join(CONTROL_FILE_NAME); - let mut data = fs::read(&control_path).unwrap(); - data[0] += 1; // change the first byte of the file to fail checksum validation - fs::write(&control_path, &data).expect("failed to write control file"); - - match load_from_control_file(&conf, &zttid, CreateControlFile::False) { - Err(err) => assert!(err - .to_string() - .contains("safekeeper control file checksum mismatch")), - Ok(_) => panic!("expected error"), - } - } -} diff --git a/walkeeper/src/wal_storage.rs b/walkeeper/src/wal_storage.rs new file mode 100644 index 0000000000..f8abc26af9 --- /dev/null +++ b/walkeeper/src/wal_storage.rs @@ -0,0 +1,493 @@ +//! This module has everything to deal with WAL -- reading and writing to disk. +//! +//! Safekeeper WAL is stored in the timeline directory, in format similar to pg_wal. +//! PG timeline is always 1, so WAL segments are usually have names like this: +//! - 000000010000000000000001 +//! - 000000010000000000000002.partial +//! +//! Note that last file has `.partial` suffix, that's different from postgres. + +use anyhow::{anyhow, Context, Result}; +use std::io::{Read, Seek, SeekFrom}; + +use lazy_static::lazy_static; +use postgres_ffi::xlog_utils::{find_end_of_wal, XLogSegNo, PG_TLI}; +use std::cmp::min; + +use std::fs::{self, File, OpenOptions}; +use std::io::Write; +use std::path::{Path, PathBuf}; + +use tracing::*; + +use zenith_utils::lsn::Lsn; +use zenith_utils::zid::ZTenantTimelineId; + +use crate::safekeeper::SafeKeeperState; + +use crate::SafeKeeperConf; +use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ}; + +use postgres_ffi::waldecoder::WalStreamDecoder; + +use zenith_metrics::{ + register_gauge_vec, register_histogram_vec, Gauge, GaugeVec, Histogram, HistogramVec, + DISK_WRITE_SECONDS_BUCKETS, +}; + +lazy_static! { + // The prometheus crate does not support u64 yet, i64 only (see `IntGauge`). + // i64 is faster than f64, so update to u64 when available. + static ref FLUSH_LSN_GAUGE: GaugeVec = register_gauge_vec!( + "safekeeper_flush_lsn", + "Current flush_lsn, grouped by timeline", + &["tenant_id", "timeline_id"] + ) + .expect("Failed to register safekeeper_flush_lsn gauge vec"); + static ref WRITE_WAL_BYTES: HistogramVec = register_histogram_vec!( + "safekeeper_write_wal_bytes", + "Bytes written to WAL in a single request, grouped by timeline", + &["tenant_id", "timeline_id"], + vec![1.0, 10.0, 100.0, 1024.0, 8192.0, 128.0 * 1024.0, 1024.0 * 1024.0, 10.0 * 1024.0 * 1024.0] + ) + .expect("Failed to register safekeeper_write_wal_bytes histogram vec"); + static ref WRITE_WAL_SECONDS: HistogramVec = register_histogram_vec!( + "safekeeper_write_wal_seconds", + "Seconds spent writing and syncing WAL to a disk in a single request, grouped by timeline", + &["tenant_id", "timeline_id"], + DISK_WRITE_SECONDS_BUCKETS.to_vec() + ) + .expect("Failed to register safekeeper_write_wal_seconds histogram vec"); +} + +struct WalStorageMetrics { + flush_lsn: Gauge, + write_wal_bytes: Histogram, + write_wal_seconds: Histogram, +} + +impl WalStorageMetrics { + fn new(zttid: &ZTenantTimelineId) -> Self { + let tenant_id = zttid.tenant_id.to_string(); + let timeline_id = zttid.timeline_id.to_string(); + Self { + flush_lsn: FLUSH_LSN_GAUGE.with_label_values(&[&tenant_id, &timeline_id]), + write_wal_bytes: WRITE_WAL_BYTES.with_label_values(&[&tenant_id, &timeline_id]), + write_wal_seconds: WRITE_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]), + } + } +} + +pub trait Storage { + /// lsn of last durably stored WAL record. + fn flush_lsn(&self) -> Lsn; + + /// Init storage with wal_seg_size and read WAL from disk to get latest lsn. + fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()>; + + /// Write piece of wal in buf to disk and sync it. + fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>; + + // Truncate WAL at specified LSN. + fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>; +} + +pub struct PhysicalStorage { + metrics: WalStorageMetrics, + zttid: ZTenantTimelineId, + timeline_dir: PathBuf, + conf: SafeKeeperConf, + + // fields below are filled upon initialization + + // None if unitialized, Some(lsn) if storage is initialized + wal_seg_size: Option, + + // Relationship of lsns: + // `write_lsn` >= `write_record_lsn` >= `flush_record_lsn` + // + // All lsns are zeroes, if storage is just created, and there are no segments on disk. + + // Written to disk, but possibly still in the cache and not fully persisted. + // Also can be ahead of record_lsn, if happen to be in the middle of a WAL record. + write_lsn: Lsn, + + // The LSN of the last WAL record written to disk. Still can be not fully flushed. + write_record_lsn: Lsn, + + // The LSN of the last WAL record flushed to disk. + flush_record_lsn: Lsn, + + // Decoder is required for detecting boundaries of WAL records. + decoder: WalStreamDecoder, +} + +impl PhysicalStorage { + pub fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> PhysicalStorage { + let timeline_dir = conf.timeline_dir(zttid); + PhysicalStorage { + metrics: WalStorageMetrics::new(zttid), + zttid: *zttid, + timeline_dir, + conf: conf.clone(), + wal_seg_size: None, + write_lsn: Lsn(0), + write_record_lsn: Lsn(0), + flush_record_lsn: Lsn(0), + decoder: WalStreamDecoder::new(Lsn(0)), + } + } + + // wrapper for flush_lsn updates that also updates metrics + fn update_flush_lsn(&mut self) { + self.flush_record_lsn = self.write_record_lsn; + self.metrics.flush_lsn.set(self.flush_record_lsn.0 as f64); + } + + /// Helper returning full path to WAL segment file and its .partial brother. + fn wal_file_paths(&self, segno: XLogSegNo) -> Result<(PathBuf, PathBuf)> { + let wal_seg_size = self + .wal_seg_size + .ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?; + + let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); + let wal_file_path = self.timeline_dir.join(wal_file_name.clone()); + let wal_file_partial_path = self.timeline_dir.join(wal_file_name + ".partial"); + Ok((wal_file_path, wal_file_partial_path)) + } + + // TODO: this function is going to be refactored soon, what will change: + // - flush will be called separately from write_wal, this function + // will only write bytes to disk + // - File will be cached in PhysicalStorage, to remove extra syscalls, + // such as open(), seek(), close() + fn write_and_flush(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { + let wal_seg_size = self + .wal_seg_size + .ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?; + + let mut bytes_left: usize = buf.len(); + let mut bytes_written: usize = 0; + let mut partial; + let mut start_pos = startpos; + const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; + + /* Extract WAL location for this block */ + let mut xlogoff = start_pos.segment_offset(wal_seg_size) as usize; + + while bytes_left != 0 { + let bytes_to_write; + + /* + * If crossing a WAL boundary, only write up until we reach wal + * segment size. + */ + if xlogoff + bytes_left > wal_seg_size { + bytes_to_write = wal_seg_size - xlogoff; + } else { + bytes_to_write = bytes_left; + } + + /* Open file */ + let segno = start_pos.segment_number(wal_seg_size); + let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno)?; + { + let mut wal_file: File; + /* Try to open already completed segment */ + if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { + wal_file = file; + partial = false; + } else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) + { + /* Try to open existed partial file */ + wal_file = file; + partial = true; + } else { + /* Create and fill new partial file */ + partial = true; + match OpenOptions::new() + .create(true) + .write(true) + .open(&wal_file_partial_path) + { + Ok(mut file) => { + for _ in 0..(wal_seg_size / XLOG_BLCKSZ) { + file.write_all(ZERO_BLOCK)?; + } + wal_file = file; + } + Err(e) => { + error!("Failed to open log file {:?}: {}", &wal_file_path, e); + return Err(e.into()); + } + } + } + wal_file.seek(SeekFrom::Start(xlogoff as u64))?; + wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?; + + // Flush file, if not said otherwise + if !self.conf.no_sync { + wal_file.sync_all()?; + } + } + /* Write was successful, advance our position */ + bytes_written += bytes_to_write; + bytes_left -= bytes_to_write; + start_pos += bytes_to_write as u64; + xlogoff += bytes_to_write; + + /* Did we reach the end of a WAL segment? */ + if start_pos.segment_offset(wal_seg_size) == 0 { + xlogoff = 0; + if partial { + fs::rename(&wal_file_partial_path, &wal_file_path)?; + } + } + } + Ok(()) + } +} + +impl Storage for PhysicalStorage { + // flush_lsn returns lsn of last durably stored WAL record. + fn flush_lsn(&self) -> Lsn { + self.flush_record_lsn + } + + // Storage needs to know wal_seg_size to know which segment to read/write, but + // wal_seg_size is not always known at the moment of storage creation. This method + // allows to postpone its initialization. + fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()> { + if state.server.wal_seg_size == 0 { + // wal_seg_size is still unknown + return Ok(()); + } + + if let Some(wal_seg_size) = self.wal_seg_size { + // physical storage is already initialized + assert_eq!(wal_seg_size, state.server.wal_seg_size as usize); + return Ok(()); + } + + // initialize physical storage + let wal_seg_size = state.server.wal_seg_size as usize; + self.wal_seg_size = Some(wal_seg_size); + + // we need to read WAL from disk to know which LSNs are stored on disk + self.write_lsn = + Lsn(find_end_of_wal(&self.timeline_dir, wal_seg_size, true, state.wal_start_lsn)?.0); + + self.write_record_lsn = self.write_lsn; + + // TODO: do we really know that write_lsn is fully flushed to disk? + // If not, maybe it's better to call fsync() here to be sure? + self.update_flush_lsn(); + + info!( + "initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, truncate_lsn={}", + self.zttid.timeline_id, self.flush_record_lsn, state.commit_lsn, state.truncate_lsn, + ); + if self.flush_record_lsn < state.commit_lsn || self.flush_record_lsn < state.truncate_lsn { + warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or truncate_lsn from control file", self.zttid.timeline_id); + } + + Ok(()) + } + + // Write and flush WAL to disk. + fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { + if self.write_lsn > startpos { + warn!( + "write_wal rewrites WAL written before, write_lsn={}, startpos={}", + self.write_lsn, startpos + ); + } + if self.write_lsn < startpos { + warn!( + "write_wal creates gap in written WAL, write_lsn={}, startpos={}", + self.write_lsn, startpos + ); + // TODO: return error if write_lsn is not zero + } + + { + let _timer = self.metrics.write_wal_seconds.start_timer(); + self.write_and_flush(startpos, buf)?; + } + + // WAL is written and flushed, updating lsns + self.write_lsn = startpos + buf.len() as u64; + self.metrics.write_wal_bytes.observe(buf.len() as f64); + + // figure out last record's end lsn for reporting (if we got the + // whole record) + if self.decoder.available() != startpos { + info!( + "restart decoder from {} to {}", + self.decoder.available(), + startpos, + ); + self.decoder = WalStreamDecoder::new(startpos); + } + self.decoder.feed_bytes(buf); + loop { + match self.decoder.poll_decode()? { + None => break, // no full record yet + Some((lsn, _rec)) => { + self.write_record_lsn = lsn; + } + } + } + + self.update_flush_lsn(); + Ok(()) + } + + // Truncate written WAL by removing all WAL segments after the given LSN. + // end_pos must point to the end of the WAL record. + fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> { + let wal_seg_size = self + .wal_seg_size + .ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?; + + // TODO: cross check divergence point + + // nothing to truncate + if self.write_lsn == Lsn(0) { + return Ok(()); + } + + // Streaming must not create a hole, so truncate cannot be called on non-written lsn + assert!(self.write_lsn >= end_pos); + + // open segment files and delete or fill end with zeroes + + let partial; + const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; + + /* Extract WAL location for this block */ + let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize; + + /* Open file */ + let mut segno = end_pos.segment_number(wal_seg_size); + let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno)?; + { + let mut wal_file: File; + /* Try to open already completed segment */ + if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { + wal_file = file; + partial = false; + } else { + wal_file = OpenOptions::new() + .write(true) + .open(&wal_file_partial_path)?; + partial = true; + } + wal_file.seek(SeekFrom::Start(xlogoff as u64))?; + while xlogoff < wal_seg_size { + let bytes_to_write = min(XLOG_BLCKSZ, wal_seg_size - xlogoff); + wal_file.write_all(&ZERO_BLOCK[0..bytes_to_write])?; + xlogoff += bytes_to_write; + } + // Flush file, if not said otherwise + if !self.conf.no_sync { + wal_file.sync_all()?; + } + } + if !partial { + // Make segment partial once again + fs::rename(&wal_file_path, &wal_file_partial_path)?; + } + // Remove all subsequent segments + loop { + segno += 1; + let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno)?; + // TODO: better use fs::try_exists which is currenty avaialble only in nightly build + if wal_file_path.exists() { + fs::remove_file(&wal_file_path)?; + } else if wal_file_partial_path.exists() { + fs::remove_file(&wal_file_partial_path)?; + } else { + break; + } + } + + // Update lsns + self.write_lsn = end_pos; + self.write_record_lsn = end_pos; + self.update_flush_lsn(); + Ok(()) + } +} + +pub struct WalReader { + timeline_dir: PathBuf, + wal_seg_size: usize, + pos: Lsn, + file: Option, +} + +impl WalReader { + pub fn new(timeline_dir: PathBuf, wal_seg_size: usize, pos: Lsn) -> Self { + Self { + timeline_dir, + wal_seg_size, + pos, + file: None, + } + } + + pub fn read(&mut self, buf: &mut [u8]) -> Result { + // Take the `File` from `wal_file`, or open a new file. + let mut file = match self.file.take() { + Some(file) => file, + None => { + // Open a new file. + let segno = self.pos.segment_number(self.wal_seg_size); + let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size); + let wal_file_path = self.timeline_dir.join(wal_file_name); + Self::open_wal_file(&wal_file_path)? + } + }; + + let xlogoff = self.pos.segment_offset(self.wal_seg_size) as usize; + + // How much to read and send in message? We cannot cross the WAL file + // boundary, and we don't want send more than provided buffer. + let send_size = min(buf.len(), self.wal_seg_size - xlogoff); + + // Read some data from the file. + let buf = &mut buf[0..send_size]; + file.seek(SeekFrom::Start(xlogoff as u64)) + .and_then(|_| file.read_exact(buf)) + .context("Failed to read data from WAL file")?; + + self.pos += send_size as u64; + + // Decide whether to reuse this file. If we don't set wal_file here + // a new file will be opened next time. + if self.pos.segment_offset(self.wal_seg_size) != 0 { + self.file = Some(file); + } + + Ok(send_size) + } + + /// Helper function for opening a wal file. + fn open_wal_file(wal_file_path: &Path) -> Result { + // First try to open the .partial file. + let mut partial_path = wal_file_path.to_owned(); + partial_path.set_extension("partial"); + if let Ok(opened_file) = File::open(&partial_path) { + return Ok(opened_file); + } + + // If that failed, try it without the .partial extension. + File::open(&wal_file_path) + .with_context(|| format!("Failed to open WAL file {:?}", wal_file_path)) + .map_err(|e| { + error!("{}", e); + e + }) + } +}