From dba1d36a4aa4c2c1176241711fa9a12f09c44277 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Mon, 21 Feb 2022 17:20:53 +0300 Subject: [PATCH] Refactor WAL utils in safekeeper (#1290) wal_storage.rs was split up from timeline.rs, safekeeper.rs and send_wal.rs, and now contains all WAL related code from the safekeeper. Now there are PhysicalStorage for persisting WAL to disk and WalReader for reading it. This allows optimizing PhysicalStorage without affecting too much of other code. Also there is a separate structure for persisting control file now in control_file.rs. --- walkeeper/src/bin/safekeeper.rs | 7 +- walkeeper/src/control_file.rs | 297 +++++++++++ .../{upgrade.rs => control_file_upgrade.rs} | 0 walkeeper/src/handler.rs | 2 +- walkeeper/src/http/routes.rs | 2 +- walkeeper/src/lib.rs | 4 +- walkeeper/src/safekeeper.rs | 230 +++----- walkeeper/src/send_wal.rs | 77 +-- walkeeper/src/timeline.rs | 475 +---------------- walkeeper/src/wal_storage.rs | 493 ++++++++++++++++++ 10 files changed, 918 insertions(+), 669 deletions(-) create mode 100644 walkeeper/src/control_file.rs rename walkeeper/src/{upgrade.rs => control_file_upgrade.rs} (100%) create mode 100644 walkeeper/src/wal_storage.rs diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs index 48f0e8a443..ea5d0cba14 100644 --- a/walkeeper/src/bin/safekeeper.rs +++ b/walkeeper/src/bin/safekeeper.rs @@ -10,7 +10,7 @@ use std::fs::File; use std::path::{Path, PathBuf}; use std::thread; use tracing::*; -use walkeeper::timeline::{CreateControlFile, FileStorage}; +use walkeeper::control_file::{self, CreateControlFile}; use zenith_utils::http::endpoint; use zenith_utils::{logging, tcp_listener, GIT_VERSION}; @@ -96,7 +96,10 @@ fn main() -> Result<()> { .get_matches(); if let Some(addr) = arg_matches.value_of("dump-control-file") { - let state = FileStorage::load_control_file(Path::new(addr), CreateControlFile::False)?; + let state = control_file::FileStorage::load_control_file( + 
Path::new(addr), + CreateControlFile::False, + )?; let json = serde_json::to_string(&state)?; print!("{}", json); return Ok(()); diff --git a/walkeeper/src/control_file.rs b/walkeeper/src/control_file.rs new file mode 100644 index 0000000000..9732cdca14 --- /dev/null +++ b/walkeeper/src/control_file.rs @@ -0,0 +1,297 @@ +//! Control file serialization, deserialization and persistence. + +use anyhow::{bail, ensure, Context, Result}; +use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use lazy_static::lazy_static; + +use std::fs::{self, File, OpenOptions}; +use std::io::{Read, Write}; +use std::path::{Path, PathBuf}; + +use tracing::*; +use zenith_metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS}; +use zenith_utils::bin_ser::LeSer; + +use zenith_utils::zid::ZTenantTimelineId; + +use crate::control_file_upgrade::upgrade_control_file; +use crate::safekeeper::{SafeKeeperState, SK_FORMAT_VERSION, SK_MAGIC}; + +use crate::SafeKeeperConf; + +use std::convert::TryInto; + +// contains persistent metadata for safekeeper +const CONTROL_FILE_NAME: &str = "safekeeper.control"; +// needed to atomically update the state using `rename` +const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial"; +pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>(); + +// A named boolean. +#[derive(Debug)] +pub enum CreateControlFile { + True, + False, +} + +lazy_static! { + static ref PERSIST_CONTROL_FILE_SECONDS: HistogramVec = register_histogram_vec!( + "safekeeper_persist_control_file_seconds", + "Seconds to persist and sync control file, grouped by timeline", + &["tenant_id", "timeline_id"], + DISK_WRITE_SECONDS_BUCKETS.to_vec() + ) + .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec"); +} + +pub trait Storage { + /// Persist safekeeper state on disk. 
+ fn persist(&mut self, s: &SafeKeeperState) -> Result<()>; +} + +#[derive(Debug)] +pub struct FileStorage { + // save timeline dir to avoid reconstructing it every time + timeline_dir: PathBuf, + conf: SafeKeeperConf, + persist_control_file_seconds: Histogram, +} + +impl FileStorage { + pub fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> FileStorage { + let timeline_dir = conf.timeline_dir(zttid); + let tenant_id = zttid.tenant_id.to_string(); + let timeline_id = zttid.timeline_id.to_string(); + FileStorage { + timeline_dir, + conf: conf.clone(), + persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS + .with_label_values(&[&tenant_id, &timeline_id]), + } + } + + // Check the magic/version in the on-disk data and deserialize it, if possible. + fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> { + // Read the version independent part + let magic = buf.read_u32::<LittleEndian>()?; + if magic != SK_MAGIC { + bail!( + "bad control file magic: {:X}, expected {:X}", + magic, + SK_MAGIC + ); + } + let version = buf.read_u32::<LittleEndian>()?; + if version == SK_FORMAT_VERSION { + let res = SafeKeeperState::des(buf)?; + return Ok(res); + } + // try to upgrade + upgrade_control_file(buf, version) + } + + // Load control file for given zttid at path specified by conf. + pub fn load_control_file_conf( + conf: &SafeKeeperConf, + zttid: &ZTenantTimelineId, + create: CreateControlFile, + ) -> Result<SafeKeeperState> { + let path = conf.timeline_dir(zttid).join(CONTROL_FILE_NAME); + Self::load_control_file(path, create) + } + + /// Read in the control file. + /// If create=false and file doesn't exist, bails out. 
+ pub fn load_control_file<P: AsRef<Path>>( + control_file_path: P, + create: CreateControlFile, + ) -> Result<SafeKeeperState> { + info!( + "loading control file {}, create={:?}", + control_file_path.as_ref().display(), + create, + ); + + let mut control_file = OpenOptions::new() + .read(true) + .write(true) + .create(matches!(create, CreateControlFile::True)) + .open(&control_file_path) + .with_context(|| { + format!( + "failed to open control file at {}", + control_file_path.as_ref().display(), + ) + })?; + + // Empty file is legit on 'create', don't try to deser from it. + let state = if control_file.metadata().unwrap().len() == 0 { + if let CreateControlFile::False = create { + bail!("control file is empty"); + } + SafeKeeperState::new() + } else { + let mut buf = Vec::new(); + control_file + .read_to_end(&mut buf) + .context("failed to read control file")?; + + let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]); + + let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] = + buf[buf.len() - CHECKSUM_SIZE..].try_into()?; + let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes); + + ensure!( + calculated_checksum == expected_checksum, + format!( + "safekeeper control file checksum mismatch: expected {} got {}", + expected_checksum, calculated_checksum + ) + ); + + FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE]).with_context( + || { + format!( + "while reading control file {}", + control_file_path.as_ref().display(), + ) + }, + )? 
+ }; + Ok(state) + } +} + +impl Storage for FileStorage { + // persists state durably to underlying storage + // for description see https://lwn.net/Articles/457667/ + fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { + let _timer = &self.persist_control_file_seconds.start_timer(); + + // write data to safekeeper.control.partial + let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL); + let mut control_partial = File::create(&control_partial_path).with_context(|| { + format!( + "failed to create partial control file at: {}", + &control_partial_path.display() + ) + })?; + let mut buf: Vec<u8> = Vec::new(); + buf.write_u32::<LittleEndian>(SK_MAGIC)?; + buf.write_u32::<LittleEndian>(SK_FORMAT_VERSION)?; + s.ser_into(&mut buf)?; + + // calculate checksum before resize + let checksum = crc32c::crc32c(&buf); + buf.extend_from_slice(&checksum.to_le_bytes()); + + control_partial.write_all(&buf).with_context(|| { + format!( + "failed to write safekeeper state into control file at: {}", + control_partial_path.display() + ) + })?; + + // fsync the file + control_partial.sync_all().with_context(|| { + format!( + "failed to sync partial control file at {}", + control_partial_path.display() + ) + })?; + + let control_path = self.timeline_dir.join(CONTROL_FILE_NAME); + + // rename should be atomic + fs::rename(&control_partial_path, &control_path)?; + // this sync is not required by any standard but postgres does this (see durable_rename) + File::open(&control_path) + .and_then(|f| f.sync_all()) + .with_context(|| { + format!( + "failed to sync control file at: {}", + &control_path.display() + ) + })?; + + // fsync the directory (linux specific) + File::open(&self.timeline_dir) + .and_then(|f| f.sync_all()) + .context("failed to sync control file directory")?; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::FileStorage; + use super::*; + use crate::{safekeeper::SafeKeeperState, SafeKeeperConf, ZTenantTimelineId}; + use anyhow::Result; + use std::fs; + use 
zenith_utils::lsn::Lsn; + + fn stub_conf() -> SafeKeeperConf { + let workdir = tempfile::tempdir().unwrap().into_path(); + SafeKeeperConf { + workdir, + ..Default::default() + } + } + + fn load_from_control_file( + conf: &SafeKeeperConf, + zttid: &ZTenantTimelineId, + create: CreateControlFile, + ) -> Result<(FileStorage, SafeKeeperState)> { + fs::create_dir_all(&conf.timeline_dir(zttid)).expect("failed to create timeline dir"); + Ok(( + FileStorage::new(zttid, conf), + FileStorage::load_control_file_conf(conf, zttid, create)?, + )) + } + + #[test] + fn test_read_write_safekeeper_state() { + let conf = stub_conf(); + let zttid = ZTenantTimelineId::generate(); + { + let (mut storage, mut state) = + load_from_control_file(&conf, &zttid, CreateControlFile::True) + .expect("failed to read state"); + // change something + state.wal_start_lsn = Lsn(42); + storage.persist(&state).expect("failed to persist state"); + } + + let (_, state) = load_from_control_file(&conf, &zttid, CreateControlFile::False) + .expect("failed to read state"); + assert_eq!(state.wal_start_lsn, Lsn(42)); + } + + #[test] + fn test_safekeeper_state_checksum_mismatch() { + let conf = stub_conf(); + let zttid = ZTenantTimelineId::generate(); + { + let (mut storage, mut state) = + load_from_control_file(&conf, &zttid, CreateControlFile::True) + .expect("failed to read state"); + // change something + state.wal_start_lsn = Lsn(42); + storage.persist(&state).expect("failed to persist state"); + } + let control_path = conf.timeline_dir(&zttid).join(CONTROL_FILE_NAME); + let mut data = fs::read(&control_path).unwrap(); + data[0] += 1; // change the first byte of the file to fail checksum validation + fs::write(&control_path, &data).expect("failed to write control file"); + + match load_from_control_file(&conf, &zttid, CreateControlFile::False) { + Err(err) => assert!(err + .to_string() + .contains("safekeeper control file checksum mismatch")), + Ok(_) => panic!("expected error"), + } + } +} diff --git 
a/walkeeper/src/upgrade.rs b/walkeeper/src/control_file_upgrade.rs similarity index 100% rename from walkeeper/src/upgrade.rs rename to walkeeper/src/control_file_upgrade.rs diff --git a/walkeeper/src/handler.rs b/walkeeper/src/handler.rs index 21890764db..5367954842 100644 --- a/walkeeper/src/handler.rs +++ b/walkeeper/src/handler.rs @@ -19,7 +19,7 @@ use zenith_utils::pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; use crate::callmemaybe::CallmeEvent; -use crate::timeline::CreateControlFile; +use crate::control_file::CreateControlFile; use tokio::sync::mpsc::UnboundedSender; /// Safekeeper handler of postgres commands diff --git a/walkeeper/src/http/routes.rs b/walkeeper/src/http/routes.rs index b8d9a4ae36..11a29ac6d3 100644 --- a/walkeeper/src/http/routes.rs +++ b/walkeeper/src/http/routes.rs @@ -7,9 +7,9 @@ use zenith_utils::http::{RequestExt, RouterBuilder}; use zenith_utils::lsn::Lsn; use zenith_utils::zid::ZTenantTimelineId; +use crate::control_file::CreateControlFile; use crate::safekeeper::Term; use crate::safekeeper::TermHistory; -use crate::timeline::CreateControlFile; use crate::timeline::GlobalTimelines; use crate::SafeKeeperConf; use zenith_utils::http::endpoint; diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index c29ef79e90..6c3e0b264e 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -5,6 +5,8 @@ use std::time::Duration; use zenith_utils::zid::ZTenantTimelineId; pub mod callmemaybe; +pub mod control_file; +pub mod control_file_upgrade; pub mod handler; pub mod http; pub mod json_ctrl; @@ -13,8 +15,8 @@ pub mod s3_offload; pub mod safekeeper; pub mod send_wal; pub mod timeline; -pub mod upgrade; pub mod wal_service; +pub mod wal_storage; pub mod defaults { use const_format::formatcp; diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index bb22672f98..981a0f4d57 100644 --- a/walkeeper/src/safekeeper.rs +++ 
b/walkeeper/src/safekeeper.rs @@ -3,7 +3,7 @@ use anyhow::{bail, Context, Result}; use byteorder::{LittleEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use postgres_ffi::waldecoder::WalStreamDecoder; + use postgres_ffi::xlog_utils::TimeLineID; use serde::{Deserialize, Serialize}; use std::cmp::min; @@ -13,12 +13,11 @@ use tracing::*; use lazy_static::lazy_static; +use crate::control_file; use crate::send_wal::HotStandbyFeedback; +use crate::wal_storage; use postgres_ffi::xlog_utils::MAX_SEND_SIZE; -use zenith_metrics::{ - register_gauge_vec, register_histogram_vec, Gauge, GaugeVec, Histogram, HistogramVec, - DISK_WRITE_SECONDS_BUCKETS, -}; +use zenith_metrics::{register_gauge_vec, Gauge, GaugeVec}; use zenith_utils::bin_ser::LeSer; use zenith_utils::lsn::Lsn; use zenith_utils::pq_proto::SystemId; @@ -407,133 +406,87 @@ impl AcceptorProposerMessage { } } -pub trait Storage { - /// Persist safekeeper state on disk. - fn persist(&mut self, s: &SafeKeeperState) -> Result<()>; - /// Write piece of wal in buf to disk and sync it. - fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()>; - // Truncate WAL at specified LSN - fn truncate_wal(&mut self, s: &ServerInfo, endpos: Lsn) -> Result<()>; -} - lazy_static! { // The prometheus crate does not support u64 yet, i64 only (see `IntGauge`). // i64 is faster than f64, so update to u64 when available. 
- static ref FLUSH_LSN_GAUGE: GaugeVec = register_gauge_vec!( - "safekeeper_flush_lsn", - "Current flush_lsn, grouped by timeline", - &["tenant_id", "timeline_id"] - ) - .expect("Failed to register safekeeper_flush_lsn gauge vec"); static ref COMMIT_LSN_GAUGE: GaugeVec = register_gauge_vec!( "safekeeper_commit_lsn", "Current commit_lsn (not necessarily persisted to disk), grouped by timeline", &["tenant_id", "timeline_id"] ) .expect("Failed to register safekeeper_commit_lsn gauge vec"); - static ref WRITE_WAL_BYTES: HistogramVec = register_histogram_vec!( - "safekeeper_write_wal_bytes", - "Bytes written to WAL in a single request, grouped by timeline", - &["tenant_id", "timeline_id"], - vec![1.0, 10.0, 100.0, 1024.0, 8192.0, 128.0 * 1024.0, 1024.0 * 1024.0, 10.0 * 1024.0 * 1024.0] - ) - .expect("Failed to register safekeeper_write_wal_bytes histogram vec"); - static ref WRITE_WAL_SECONDS: HistogramVec = register_histogram_vec!( - "safekeeper_write_wal_seconds", - "Seconds spent writing and syncing WAL to a disk in a single request, grouped by timeline", - &["tenant_id", "timeline_id"], - DISK_WRITE_SECONDS_BUCKETS.to_vec() - ) - .expect("Failed to register safekeeper_write_wal_seconds histogram vec"); } struct SafeKeeperMetrics { - flush_lsn: Gauge, commit_lsn: Gauge, - write_wal_bytes: Histogram, - write_wal_seconds: Histogram, } -struct SafeKeeperMetricsBuilder { - tenant_id: ZTenantId, - timeline_id: ZTimelineId, - flush_lsn: Lsn, - commit_lsn: Lsn, -} - -impl SafeKeeperMetricsBuilder { - fn build(self) -> SafeKeeperMetrics { - let tenant_id = self.tenant_id.to_string(); - let timeline_id = self.timeline_id.to_string(); - let m = SafeKeeperMetrics { - flush_lsn: FLUSH_LSN_GAUGE.with_label_values(&[&tenant_id, &timeline_id]), +impl SafeKeeperMetrics { + fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId, commit_lsn: Lsn) -> Self { + let tenant_id = tenant_id.to_string(); + let timeline_id = timeline_id.to_string(); + let m = Self { commit_lsn: 
COMMIT_LSN_GAUGE.with_label_values(&[&tenant_id, &timeline_id]), - write_wal_bytes: WRITE_WAL_BYTES.with_label_values(&[&tenant_id, &timeline_id]), - write_wal_seconds: WRITE_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]), }; - m.flush_lsn.set(u64::from(self.flush_lsn) as f64); - m.commit_lsn.set(u64::from(self.commit_lsn) as f64); + m.commit_lsn.set(u64::from(commit_lsn) as f64); m } } /// SafeKeeper which consumes events (messages from compute) and provides /// replies. -pub struct SafeKeeper { - /// Locally flushed part of WAL with full records (end_lsn of last record). - /// Established by reading wal. - pub flush_lsn: Lsn, +pub struct SafeKeeper { // Cached metrics so we don't have to recompute labels on each update. metrics: SafeKeeperMetrics, + /// not-yet-flushed pairs of same named fields in s.* pub commit_lsn: Lsn, pub truncate_lsn: Lsn, - pub storage: ST, pub s: SafeKeeperState, // persistent part - decoder: WalStreamDecoder, + + pub control_store: CTRL, + pub wal_store: WAL, } -impl SafeKeeper +impl SafeKeeper where - ST: Storage, + CTRL: control_file::Storage, + WAL: wal_storage::Storage, { // constructor pub fn new( ztli: ZTimelineId, - flush_lsn: Lsn, - storage: ST, + control_store: CTRL, + wal_store: WAL, state: SafeKeeperState, - ) -> SafeKeeper { + ) -> SafeKeeper { if state.server.timeline_id != ZTimelineId::from([0u8; 16]) && ztli != state.server.timeline_id { panic!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.server.timeline_id); } + SafeKeeper { - flush_lsn, - metrics: SafeKeeperMetricsBuilder { - tenant_id: state.server.tenant_id, - timeline_id: ztli, - flush_lsn, - commit_lsn: state.commit_lsn, - } - .build(), + metrics: SafeKeeperMetrics::new(state.server.tenant_id, ztli, state.commit_lsn), commit_lsn: state.commit_lsn, truncate_lsn: state.truncate_lsn, - storage, s: state, - decoder: WalStreamDecoder::new(Lsn(0)), + control_store, + wal_store, } } /// Get 
history of term switches for the available WAL fn get_term_history(&self) -> TermHistory { - self.s.acceptor_state.term_history.up_to(self.flush_lsn) + self.s + .acceptor_state + .term_history + .up_to(self.wal_store.flush_lsn()) } #[cfg(test)] fn get_epoch(&self) -> Term { - self.s.acceptor_state.get_epoch(self.flush_lsn) + self.s.acceptor_state.get_epoch(self.wal_store.flush_lsn()) } /// Process message from proposer and possibly form reply. Concurrent @@ -575,21 +528,20 @@ where } // set basic info about server, if not yet + // TODO: verify that it doesn't change afterwards self.s.server.system_id = msg.system_id; self.s.server.tenant_id = msg.tenant_id; self.s.server.timeline_id = msg.ztli; self.s.server.wal_seg_size = msg.wal_seg_size; - self.storage + self.control_store .persist(&self.s) .context("failed to persist shared state")?; - self.metrics = SafeKeeperMetricsBuilder { - tenant_id: self.s.server.tenant_id, - timeline_id: self.s.server.timeline_id, - flush_lsn: self.flush_lsn, - commit_lsn: self.commit_lsn, - } - .build(); + // pass wal_seg_size to read WAL and find flush_lsn + self.wal_store.init_storage(&self.s)?; + + // update tenant_id/timeline_id in metrics + self.metrics = SafeKeeperMetrics::new(msg.tenant_id, msg.ztli, self.commit_lsn); info!( "processed greeting from proposer {:?}, sending term {:?}", @@ -609,14 +561,14 @@ where let mut resp = VoteResponse { term: self.s.acceptor_state.term, vote_given: false as u64, - flush_lsn: self.flush_lsn, + flush_lsn: self.wal_store.flush_lsn(), truncate_lsn: self.s.truncate_lsn, term_history: self.get_term_history(), }; if self.s.acceptor_state.term < msg.term { self.s.acceptor_state.term = msg.term; // persist vote before sending it out - self.storage.persist(&self.s)?; + self.control_store.persist(&self.s)?; resp.term = self.s.acceptor_state.term; resp.vote_given = true as u64; } @@ -628,7 +580,7 @@ where fn bump_if_higher(&mut self, term: Term) -> Result<()> { if self.s.acceptor_state.term < term { 
self.s.acceptor_state.term = term; - self.storage.persist(&self.s)?; + self.control_store.persist(&self.s)?; } Ok(()) } @@ -637,7 +589,7 @@ where fn append_response(&self) -> AppendResponse { AppendResponse { term: self.s.acceptor_state.term, - flush_lsn: self.flush_lsn, + flush_lsn: self.wal_store.flush_lsn(), commit_lsn: self.s.commit_lsn, // will be filled by the upper code to avoid bothering safekeeper hs_feedback: HotStandbyFeedback::empty(), @@ -653,22 +605,12 @@ where return Ok(None); } - // TODO: cross check divergence point + // truncate wal, update the lsns + self.wal_store.truncate_wal(msg.start_streaming_at)?; - // streaming must not create a hole - assert!(self.flush_lsn == Lsn(0) || self.flush_lsn >= msg.start_streaming_at); - - // truncate obsolete part of WAL - if self.flush_lsn != Lsn(0) { - self.storage - .truncate_wal(&self.s.server, msg.start_streaming_at)?; - } - // update our end of WAL pointer - self.flush_lsn = msg.start_streaming_at; - self.metrics.flush_lsn.set(u64::from(self.flush_lsn) as f64); // and now adopt term history from proposer self.s.acceptor_state.term_history = msg.term_history.clone(); - self.storage.persist(&self.s)?; + self.control_store.persist(&self.s)?; info!("start receiving WAL since {:?}", msg.start_streaming_at); @@ -694,42 +636,14 @@ where // After ProposerElected, which performs truncation, we should get only // indeed append requests (but flush_lsn is advanced only on record // boundary, so might be less). 
- assert!(self.flush_lsn <= msg.h.begin_lsn); + assert!(self.wal_store.flush_lsn() <= msg.h.begin_lsn); self.s.proposer_uuid = msg.h.proposer_uuid; let mut sync_control_file = false; // do the job - let mut last_rec_lsn = Lsn(0); if !msg.wal_data.is_empty() { - self.metrics - .write_wal_bytes - .observe(msg.wal_data.len() as f64); - { - let _timer = self.metrics.write_wal_seconds.start_timer(); - self.storage - .write_wal(&self.s.server, msg.h.begin_lsn, &msg.wal_data)?; - } - - // figure out last record's end lsn for reporting (if we got the - // whole record) - if self.decoder.available() != msg.h.begin_lsn { - info!( - "restart decoder from {} to {}", - self.decoder.available(), - msg.h.begin_lsn, - ); - self.decoder = WalStreamDecoder::new(msg.h.begin_lsn); - } - self.decoder.feed_bytes(&msg.wal_data); - loop { - match self.decoder.poll_decode()? { - None => break, // no full record yet - Some((lsn, _rec)) => { - last_rec_lsn = lsn; - } - } - } + self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?; // If this was the first record we ever receieved, remember LSN to help // find_end_of_wal skip the hole in the beginning. @@ -739,16 +653,11 @@ where } } - if last_rec_lsn > self.flush_lsn { - self.flush_lsn = last_rec_lsn; - self.metrics.flush_lsn.set(u64::from(self.flush_lsn) as f64); - } - // Advance commit_lsn taking into account what we have locally. // commit_lsn can be 0, being unknown to new walproposer while he hasn't // collected majority of its epoch acks yet, ignore it in this case. if msg.h.commit_lsn != Lsn(0) { - let commit_lsn = min(msg.h.commit_lsn, self.flush_lsn); + let commit_lsn = min(msg.h.commit_lsn, self.wal_store.flush_lsn()); // If new commit_lsn reached epoch switch, force sync of control // file: walproposer in sync mode is very interested when this // happens. 
Note: this is for sync-safekeepers mode only, as @@ -774,7 +683,7 @@ where } if sync_control_file { - self.storage.persist(&self.s)?; + self.control_store.persist(&self.s)?; } let resp = self.append_response(); @@ -793,34 +702,52 @@ where #[cfg(test)] mod tests { use super::*; + use crate::wal_storage::Storage; // fake storage for tests - struct InMemoryStorage { + struct InMemoryState { persisted_state: SafeKeeperState, } - impl Storage for InMemoryStorage { + impl control_file::Storage for InMemoryState { fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { self.persisted_state = s.clone(); Ok(()) } + } - fn write_wal(&mut self, _server: &ServerInfo, _startpos: Lsn, _buf: &[u8]) -> Result<()> { + struct DummyWalStore { + lsn: Lsn, + } + + impl wal_storage::Storage for DummyWalStore { + fn flush_lsn(&self) -> Lsn { + self.lsn + } + + fn init_storage(&mut self, _state: &SafeKeeperState) -> Result<()> { Ok(()) } - fn truncate_wal(&mut self, _server: &ServerInfo, _end_pos: Lsn) -> Result<()> { + fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { + self.lsn = startpos + buf.len() as u64; + Ok(()) + } + + fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> { + self.lsn = end_pos; Ok(()) } } #[test] fn test_voting() { - let storage = InMemoryStorage { + let storage = InMemoryState { persisted_state: SafeKeeperState::new(), }; + let wal_store = DummyWalStore { lsn: Lsn(0) }; let ztli = ZTimelineId::from([0u8; 16]); - let mut sk = SafeKeeper::new(ztli, Lsn(0), storage, SafeKeeperState::new()); + let mut sk = SafeKeeper::new(ztli, storage, wal_store, SafeKeeperState::new()); // check voting for 1 is ok let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 }); @@ -831,11 +758,11 @@ mod tests { } // reboot... 
- let state = sk.storage.persisted_state.clone(); - let storage = InMemoryStorage { + let state = sk.control_store.persisted_state.clone(); + let storage = InMemoryState { persisted_state: state.clone(), }; - sk = SafeKeeper::new(ztli, Lsn(0), storage, state); + sk = SafeKeeper::new(ztli, storage, sk.wal_store, state); // and ensure voting second time for 1 is not ok vote_resp = sk.process_msg(&vote_request); @@ -847,11 +774,12 @@ mod tests { #[test] fn test_epoch_switch() { - let storage = InMemoryStorage { + let storage = InMemoryState { persisted_state: SafeKeeperState::new(), }; + let wal_store = DummyWalStore { lsn: Lsn(0) }; let ztli = ZTimelineId::from([0u8; 16]); - let mut sk = SafeKeeper::new(ztli, Lsn(0), storage, SafeKeeperState::new()); + let mut sk = SafeKeeper::new(ztli, storage, wal_store, SafeKeeperState::new()); let mut ar_hdr = AppendRequestHeader { term: 1, @@ -892,7 +820,7 @@ mod tests { }; let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); assert!(resp.is_ok()); - sk.flush_lsn = Lsn(3); // imitate the complete record at 3 %) + sk.wal_store.truncate_wal(Lsn(3)).unwrap(); // imitate the complete record at 3 %) assert_eq!(sk.get_epoch(), 1); } } diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index a8b4e33ad2..1febd71842 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -3,20 +3,16 @@ use crate::handler::SafekeeperPostgresHandler; use crate::timeline::{ReplicaState, Timeline, TimelineTools}; +use crate::wal_storage::WalReader; use anyhow::{bail, Context, Result}; -use postgres_ffi::xlog_utils::{ - get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE, PG_TLI, -}; +use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE}; use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey}; use bytes::Bytes; use serde::{Deserialize, Serialize}; use std::cmp::min; -use std::fs::File; -use std::io::{Read, Seek, SeekFrom}; use std::net::Shutdown; -use 
std::path::Path; use std::sync::Arc; use std::thread::sleep; use std::time::Duration; @@ -194,24 +190,6 @@ impl ReplicationConn { Ok(()) } - /// Helper function for opening a wal file. - fn open_wal_file(wal_file_path: &Path) -> Result { - // First try to open the .partial file. - let mut partial_path = wal_file_path.to_owned(); - partial_path.set_extension("partial"); - if let Ok(opened_file) = File::open(&partial_path) { - return Ok(opened_file); - } - - // If that failed, try it without the .partial extension. - File::open(&wal_file_path) - .with_context(|| format!("Failed to open WAL file {:?}", wal_file_path)) - .map_err(|e| { - error!("{}", e); - e - }) - } - /// /// Handle START_REPLICATION replication command /// @@ -311,7 +289,15 @@ impl ReplicationConn { pgb.write_message(&BeMessage::CopyBothResponse)?; let mut end_pos = Lsn(0); - let mut wal_file: Option = None; + + let mut wal_reader = WalReader::new( + spg.conf.timeline_dir(&spg.timeline.get().zttid), + wal_seg_size, + start_pos, + ); + + // buffer for wal sending, limited by MAX_SEND_SIZE + let mut send_buf = vec![0u8; MAX_SEND_SIZE]; loop { if let Some(stop_pos) = stop_pos { @@ -345,53 +331,26 @@ impl ReplicationConn { } } - // Take the `File` from `wal_file`, or open a new file. - let mut file = match wal_file.take() { - Some(file) => file, - None => { - // Open a new file. - let segno = start_pos.segment_number(wal_seg_size); - let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); - let wal_file_path = spg - .conf - .timeline_dir(&spg.timeline.get().zttid) - .join(wal_file_name); - Self::open_wal_file(&wal_file_path)? - } - }; - - let xlogoff = start_pos.segment_offset(wal_seg_size) as usize; - - // How much to read and send in message? We cannot cross the WAL file - // boundary, and we don't want send more than MAX_SEND_SIZE. 
let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize; - let send_size = min(send_size, wal_seg_size - xlogoff); - let send_size = min(send_size, MAX_SEND_SIZE); + let send_size = min(send_size, send_buf.len()); - // Read some data from the file. - let mut file_buf = vec![0u8; send_size]; - file.seek(SeekFrom::Start(xlogoff as u64)) - .and_then(|_| file.read_exact(&mut file_buf)) - .context("Failed to read data from WAL file")?; + let send_buf = &mut send_buf[..send_size]; + + // read wal into buffer + let send_size = wal_reader.read(send_buf)?; + let send_buf = &send_buf[..send_size]; // Write some data to the network socket. pgb.write_message(&BeMessage::XLogData(XLogDataBody { wal_start: start_pos.0, wal_end: end_pos.0, timestamp: get_current_timestamp(), - data: &file_buf, + data: send_buf, })) .context("Failed to send XLogData")?; start_pos += send_size as u64; - trace!("sent WAL up to {}", start_pos); - - // Decide whether to reuse this file. If we don't set wal_file here - // a new file will be opened next time. - if start_pos.segment_offset(wal_seg_size) != 0 { - wal_file = Some(file); - } } Ok(()) } diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index ff3e8f8183..c639e81b79 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -1,42 +1,35 @@ //! This module contains timeline id -> safekeeper state map with file-backed //! persistence and support for interaction between sending and receiving wal. 
-use anyhow::{bail, ensure, Context, Result}; -use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use anyhow::{Context, Result}; + use lazy_static::lazy_static; -use postgres_ffi::xlog_utils::{find_end_of_wal, XLogSegNo, PG_TLI}; + use std::cmp::{max, min}; use std::collections::HashMap; -use std::fs::{self, File, OpenOptions}; -use std::io::{Read, Seek, SeekFrom, Write}; -use std::path::{Path, PathBuf}; +use std::fs::{self}; + use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; use tokio::sync::mpsc::UnboundedSender; use tracing::*; -use zenith_metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS}; -use zenith_utils::bin_ser::LeSer; + use zenith_utils::lsn::Lsn; use zenith_utils::zid::ZTenantTimelineId; use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey}; +use crate::control_file::{self, CreateControlFile}; + use crate::safekeeper::{ - AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo, - Storage, SK_FORMAT_VERSION, SK_MAGIC, + AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, }; use crate::send_wal::HotStandbyFeedback; -use crate::upgrade::upgrade_control_file; +use crate::wal_storage::{self, Storage}; use crate::SafeKeeperConf; -use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ}; -use std::convert::TryInto; + use zenith_utils::pq_proto::ZenithFeedback; -// contains persistent metadata for safekeeper -const CONTROL_FILE_NAME: &str = "safekeeper.control"; -// needed to atomically update the state using `rename` -const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial"; const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1); -pub const CHECKSUM_SIZE: usize = std::mem::size_of::(); /// Replica status update + hot standby feedback #[derive(Debug, Clone, Copy)] @@ -75,7 +68,7 @@ impl ReplicaState { /// Shared state associated with database instance struct SharedState { /// Safekeeper object - sk: SafeKeeper, + sk: 
SafeKeeper, /// For receiving-sending wal cooperation /// quorum commit LSN we've notified walsenders about notified_commit_lsn: Lsn, @@ -93,23 +86,6 @@ struct SharedState { pageserver_connstr: Option, } -// A named boolean. -#[derive(Debug)] -pub enum CreateControlFile { - True, - False, -} - -lazy_static! { - static ref PERSIST_CONTROL_FILE_SECONDS: HistogramVec = register_histogram_vec!( - "safekeeper_persist_control_file_seconds", - "Seconds to persist and sync control file, grouped by timeline", - &["tenant_id", "timeline_id"], - DISK_WRITE_SECONDS_BUCKETS.to_vec() - ) - .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec"); -} - impl SharedState { /// Restore SharedState from control file. /// If create=false and file doesn't exist, bails out. @@ -118,32 +94,18 @@ impl SharedState { zttid: &ZTenantTimelineId, create: CreateControlFile, ) -> Result { - let state = FileStorage::load_control_file_conf(conf, zttid, create) + let state = control_file::FileStorage::load_control_file_conf(conf, zttid, create) .context("failed to load from control file")?; - let file_storage = FileStorage::new(zttid, conf); - let flush_lsn = if state.server.wal_seg_size != 0 { - let wal_dir = conf.timeline_dir(zttid); - Lsn(find_end_of_wal( - &wal_dir, - state.server.wal_seg_size as usize, - true, - state.wal_start_lsn, - )? 
- .0) - } else { - Lsn(0) - }; - info!( - "timeline {} created or restored: flush_lsn={}, commit_lsn={}, truncate_lsn={}", - zttid.timeline_id, flush_lsn, state.commit_lsn, state.truncate_lsn, - ); - if flush_lsn < state.commit_lsn || flush_lsn < state.truncate_lsn { - warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or truncate_lsn from control file", zttid.timeline_id); - } + + let control_store = control_file::FileStorage::new(zttid, conf); + + let wal_store = wal_storage::PhysicalStorage::new(zttid, conf); + + info!("timeline {} created or restored", zttid.timeline_id); Ok(Self { notified_commit_lsn: Lsn(0), - sk: SafeKeeper::new(zttid.timeline_id, flush_lsn, file_storage, state), + sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store, state), replicas: Vec::new(), active: false, num_computes: 0, @@ -450,7 +412,7 @@ impl Timeline { pub fn get_end_of_wal(&self) -> Lsn { let shared_state = self.mutex.lock().unwrap(); - shared_state.sk.flush_lsn + shared_state.sk.wal_store.flush_lsn() } } @@ -526,398 +488,3 @@ impl GlobalTimelines { } } } - -#[derive(Debug)] -pub struct FileStorage { - // save timeline dir to avoid reconstructing it every time - timeline_dir: PathBuf, - conf: SafeKeeperConf, - persist_control_file_seconds: Histogram, -} - -impl FileStorage { - fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> FileStorage { - let timeline_dir = conf.timeline_dir(zttid); - let tenant_id = zttid.tenant_id.to_string(); - let timeline_id = zttid.timeline_id.to_string(); - FileStorage { - timeline_dir, - conf: conf.clone(), - persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS - .with_label_values(&[&tenant_id, &timeline_id]), - } - } - - // Check the magic/version in the on-disk data and deserialize it, if possible. 
- fn deser_sk_state(buf: &mut &[u8]) -> Result { - // Read the version independent part - let magic = buf.read_u32::()?; - if magic != SK_MAGIC { - bail!( - "bad control file magic: {:X}, expected {:X}", - magic, - SK_MAGIC - ); - } - let version = buf.read_u32::()?; - if version == SK_FORMAT_VERSION { - let res = SafeKeeperState::des(buf)?; - return Ok(res); - } - // try to upgrade - upgrade_control_file(buf, version) - } - - // Load control file for given zttid at path specified by conf. - fn load_control_file_conf( - conf: &SafeKeeperConf, - zttid: &ZTenantTimelineId, - create: CreateControlFile, - ) -> Result { - let path = conf.timeline_dir(zttid).join(CONTROL_FILE_NAME); - Self::load_control_file(path, create) - } - - /// Read in the control file. - /// If create=false and file doesn't exist, bails out. - pub fn load_control_file>( - control_file_path: P, - create: CreateControlFile, - ) -> Result { - info!( - "loading control file {}, create={:?}", - control_file_path.as_ref().display(), - create, - ); - - let mut control_file = OpenOptions::new() - .read(true) - .write(true) - .create(matches!(create, CreateControlFile::True)) - .open(&control_file_path) - .with_context(|| { - format!( - "failed to open control file at {}", - control_file_path.as_ref().display(), - ) - })?; - - // Empty file is legit on 'create', don't try to deser from it. 
- let state = if control_file.metadata().unwrap().len() == 0 { - if let CreateControlFile::False = create { - bail!("control file is empty"); - } - SafeKeeperState::new() - } else { - let mut buf = Vec::new(); - control_file - .read_to_end(&mut buf) - .context("failed to read control file")?; - - let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]); - - let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] = - buf[buf.len() - CHECKSUM_SIZE..].try_into()?; - let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes); - - ensure!( - calculated_checksum == expected_checksum, - format!( - "safekeeper control file checksum mismatch: expected {} got {}", - expected_checksum, calculated_checksum - ) - ); - - FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE]).with_context( - || { - format!( - "while reading control file {}", - control_file_path.as_ref().display(), - ) - }, - )? - }; - Ok(state) - } - - /// Helper returning full path to WAL segment file and its .partial brother. 
- fn wal_file_paths(&self, segno: XLogSegNo, wal_seg_size: usize) -> (PathBuf, PathBuf) { - let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); - let wal_file_path = self.timeline_dir.join(wal_file_name.clone()); - let wal_file_partial_path = self.timeline_dir.join(wal_file_name + ".partial"); - (wal_file_path, wal_file_partial_path) - } -} - -impl Storage for FileStorage { - // persists state durably to underlying storage - // for description see https://lwn.net/Articles/457667/ - fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { - let _timer = &self.persist_control_file_seconds.start_timer(); - - // write data to safekeeper.control.partial - let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL); - let mut control_partial = File::create(&control_partial_path).with_context(|| { - format!( - "failed to create partial control file at: {}", - &control_partial_path.display() - ) - })?; - let mut buf: Vec = Vec::new(); - buf.write_u32::(SK_MAGIC)?; - buf.write_u32::(SK_FORMAT_VERSION)?; - s.ser_into(&mut buf)?; - - // calculate checksum before resize - let checksum = crc32c::crc32c(&buf); - buf.extend_from_slice(&checksum.to_le_bytes()); - - control_partial.write_all(&buf).with_context(|| { - format!( - "failed to write safekeeper state into control file at: {}", - control_partial_path.display() - ) - })?; - - // fsync the file - control_partial.sync_all().with_context(|| { - format!( - "failed to sync partial control file at {}", - control_partial_path.display() - ) - })?; - - let control_path = self.timeline_dir.join(CONTROL_FILE_NAME); - - // rename should be atomic - fs::rename(&control_partial_path, &control_path)?; - // this sync is not required by any standard but postgres does this (see durable_rename) - File::open(&control_path) - .and_then(|f| f.sync_all()) - .with_context(|| { - format!( - "failed to sync control file at: {}", - &control_path.display() - ) - })?; - - // fsync the directory (linux specific) - 
File::open(&self.timeline_dir) - .and_then(|f| f.sync_all()) - .context("failed to sync control file directory")?; - Ok(()) - } - - fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()> { - let mut bytes_left: usize = buf.len(); - let mut bytes_written: usize = 0; - let mut partial; - let mut start_pos = startpos; - const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; - let wal_seg_size = server.wal_seg_size as usize; - - /* Extract WAL location for this block */ - let mut xlogoff = start_pos.segment_offset(wal_seg_size) as usize; - - while bytes_left != 0 { - let bytes_to_write; - - /* - * If crossing a WAL boundary, only write up until we reach wal - * segment size. - */ - if xlogoff + bytes_left > wal_seg_size { - bytes_to_write = wal_seg_size - xlogoff; - } else { - bytes_to_write = bytes_left; - } - - /* Open file */ - let segno = start_pos.segment_number(wal_seg_size); - let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno, wal_seg_size); - { - let mut wal_file: File; - /* Try to open already completed segment */ - if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { - wal_file = file; - partial = false; - } else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) - { - /* Try to open existed partial file */ - wal_file = file; - partial = true; - } else { - /* Create and fill new partial file */ - partial = true; - match OpenOptions::new() - .create(true) - .write(true) - .open(&wal_file_partial_path) - { - Ok(mut file) => { - for _ in 0..(wal_seg_size / XLOG_BLCKSZ) { - file.write_all(ZERO_BLOCK)?; - } - wal_file = file; - } - Err(e) => { - error!("Failed to open log file {:?}: {}", &wal_file_path, e); - return Err(e.into()); - } - } - } - wal_file.seek(SeekFrom::Start(xlogoff as u64))?; - wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?; - - // Flush file, if not said otherwise - if !self.conf.no_sync { - wal_file.sync_all()?; - } - } - /* 
Write was successful, advance our position */ - bytes_written += bytes_to_write; - bytes_left -= bytes_to_write; - start_pos += bytes_to_write as u64; - xlogoff += bytes_to_write; - - /* Did we reach the end of a WAL segment? */ - if start_pos.segment_offset(wal_seg_size) == 0 { - xlogoff = 0; - if partial { - fs::rename(&wal_file_partial_path, &wal_file_path)?; - } - } - } - Ok(()) - } - - fn truncate_wal(&mut self, server: &ServerInfo, end_pos: Lsn) -> Result<()> { - let partial; - const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; - let wal_seg_size = server.wal_seg_size as usize; - - /* Extract WAL location for this block */ - let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize; - - /* Open file */ - let mut segno = end_pos.segment_number(wal_seg_size); - let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno, wal_seg_size); - { - let mut wal_file: File; - /* Try to open already completed segment */ - if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { - wal_file = file; - partial = false; - } else { - wal_file = OpenOptions::new() - .write(true) - .open(&wal_file_partial_path)?; - partial = true; - } - wal_file.seek(SeekFrom::Start(xlogoff as u64))?; - while xlogoff < wal_seg_size { - let bytes_to_write = min(XLOG_BLCKSZ, wal_seg_size - xlogoff); - wal_file.write_all(&ZERO_BLOCK[0..bytes_to_write])?; - xlogoff += bytes_to_write; - } - // Flush file, if not said otherwise - if !self.conf.no_sync { - wal_file.sync_all()?; - } - } - if !partial { - // Make segment partial once again - fs::rename(&wal_file_path, &wal_file_partial_path)?; - } - // Remove all subsequent segments - loop { - segno += 1; - let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno, wal_seg_size); - // TODO: better use fs::try_exists which is currenty avaialble only in nightly build - if wal_file_path.exists() { - fs::remove_file(&wal_file_path)?; - } else if wal_file_partial_path.exists() { - 
fs::remove_file(&wal_file_partial_path)?; - } else { - break; - } - } - Ok(()) - } -} - -#[cfg(test)] -mod test { - use super::FileStorage; - use crate::{ - safekeeper::{SafeKeeperState, Storage}, - timeline::{CreateControlFile, CONTROL_FILE_NAME}, - SafeKeeperConf, ZTenantTimelineId, - }; - use anyhow::Result; - use std::fs; - use zenith_utils::lsn::Lsn; - - fn stub_conf() -> SafeKeeperConf { - let workdir = tempfile::tempdir().unwrap().into_path(); - SafeKeeperConf { - workdir, - ..Default::default() - } - } - - fn load_from_control_file( - conf: &SafeKeeperConf, - zttid: &ZTenantTimelineId, - create: CreateControlFile, - ) -> Result<(FileStorage, SafeKeeperState)> { - fs::create_dir_all(&conf.timeline_dir(zttid)).expect("failed to create timeline dir"); - Ok(( - FileStorage::new(zttid, conf), - FileStorage::load_control_file_conf(conf, zttid, create)?, - )) - } - - #[test] - fn test_read_write_safekeeper_state() { - let conf = stub_conf(); - let zttid = ZTenantTimelineId::generate(); - { - let (mut storage, mut state) = - load_from_control_file(&conf, &zttid, CreateControlFile::True) - .expect("failed to read state"); - // change something - state.wal_start_lsn = Lsn(42); - storage.persist(&state).expect("failed to persist state"); - } - - let (_, state) = load_from_control_file(&conf, &zttid, CreateControlFile::False) - .expect("failed to read state"); - assert_eq!(state.wal_start_lsn, Lsn(42)); - } - - #[test] - fn test_safekeeper_state_checksum_mismatch() { - let conf = stub_conf(); - let zttid = ZTenantTimelineId::generate(); - { - let (mut storage, mut state) = - load_from_control_file(&conf, &zttid, CreateControlFile::True) - .expect("failed to read state"); - // change something - state.wal_start_lsn = Lsn(42); - storage.persist(&state).expect("failed to persist state"); - } - let control_path = conf.timeline_dir(&zttid).join(CONTROL_FILE_NAME); - let mut data = fs::read(&control_path).unwrap(); - data[0] += 1; // change the first byte of the file to fail 
checksum validation - fs::write(&control_path, &data).expect("failed to write control file"); - - match load_from_control_file(&conf, &zttid, CreateControlFile::False) { - Err(err) => assert!(err - .to_string() - .contains("safekeeper control file checksum mismatch")), - Ok(_) => panic!("expected error"), - } - } -} diff --git a/walkeeper/src/wal_storage.rs b/walkeeper/src/wal_storage.rs new file mode 100644 index 0000000000..f8abc26af9 --- /dev/null +++ b/walkeeper/src/wal_storage.rs @@ -0,0 +1,493 @@ +//! This module has everything to deal with WAL -- reading and writing to disk. +//! +//! Safekeeper WAL is stored in the timeline directory, in a format similar to pg_wal. +//! PG timeline is always 1, so WAL segments usually have names like this: +//! - 000000010000000000000001 +//! - 000000010000000000000002.partial +//! +//! Note that the last file has a `.partial` suffix, which is different from postgres. + +use anyhow::{anyhow, Context, Result}; +use std::io::{Read, Seek, SeekFrom}; + +use lazy_static::lazy_static; +use postgres_ffi::xlog_utils::{find_end_of_wal, XLogSegNo, PG_TLI}; +use std::cmp::min; + +use std::fs::{self, File, OpenOptions}; +use std::io::Write; +use std::path::{Path, PathBuf}; + +use tracing::*; + +use zenith_utils::lsn::Lsn; +use zenith_utils::zid::ZTenantTimelineId; + +use crate::safekeeper::SafeKeeperState; + +use crate::SafeKeeperConf; +use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ}; + +use postgres_ffi::waldecoder::WalStreamDecoder; + +use zenith_metrics::{ + register_gauge_vec, register_histogram_vec, Gauge, GaugeVec, Histogram, HistogramVec, + DISK_WRITE_SECONDS_BUCKETS, +}; + +lazy_static! { + // The prometheus crate does not support u64 yet, i64 only (see `IntGauge`). + // i64 is faster than f64, so update to u64 when available.
+ static ref FLUSH_LSN_GAUGE: GaugeVec = register_gauge_vec!( + "safekeeper_flush_lsn", + "Current flush_lsn, grouped by timeline", + &["tenant_id", "timeline_id"] + ) + .expect("Failed to register safekeeper_flush_lsn gauge vec"); + static ref WRITE_WAL_BYTES: HistogramVec = register_histogram_vec!( + "safekeeper_write_wal_bytes", + "Bytes written to WAL in a single request, grouped by timeline", + &["tenant_id", "timeline_id"], + vec![1.0, 10.0, 100.0, 1024.0, 8192.0, 128.0 * 1024.0, 1024.0 * 1024.0, 10.0 * 1024.0 * 1024.0] + ) + .expect("Failed to register safekeeper_write_wal_bytes histogram vec"); + static ref WRITE_WAL_SECONDS: HistogramVec = register_histogram_vec!( + "safekeeper_write_wal_seconds", + "Seconds spent writing and syncing WAL to a disk in a single request, grouped by timeline", + &["tenant_id", "timeline_id"], + DISK_WRITE_SECONDS_BUCKETS.to_vec() + ) + .expect("Failed to register safekeeper_write_wal_seconds histogram vec"); +} + +struct WalStorageMetrics { + flush_lsn: Gauge, + write_wal_bytes: Histogram, + write_wal_seconds: Histogram, +} + +impl WalStorageMetrics { + fn new(zttid: &ZTenantTimelineId) -> Self { + let tenant_id = zttid.tenant_id.to_string(); + let timeline_id = zttid.timeline_id.to_string(); + Self { + flush_lsn: FLUSH_LSN_GAUGE.with_label_values(&[&tenant_id, &timeline_id]), + write_wal_bytes: WRITE_WAL_BYTES.with_label_values(&[&tenant_id, &timeline_id]), + write_wal_seconds: WRITE_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]), + } + } +} + +pub trait Storage { + /// lsn of last durably stored WAL record. + fn flush_lsn(&self) -> Lsn; + + /// Init storage with wal_seg_size and read WAL from disk to get latest lsn. + fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()>; + + /// Write piece of wal in buf to disk and sync it. + fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>; + + // Truncate WAL at specified LSN. 
+ fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>; +} + +pub struct PhysicalStorage { + metrics: WalStorageMetrics, + zttid: ZTenantTimelineId, + timeline_dir: PathBuf, + conf: SafeKeeperConf, + + // fields below are filled upon initialization + + // None if uninitialized, Some(size) if storage is initialized + wal_seg_size: Option, + + // Relationship of lsns: + // `write_lsn` >= `write_record_lsn` >= `flush_record_lsn` + // + // All lsns are zeroes, if storage is just created, and there are no segments on disk. + + // Written to disk, but possibly still in the cache and not fully persisted. + // Also can be ahead of write_record_lsn, if we happen to be in the middle of a WAL record. + write_lsn: Lsn, + + // The LSN of the last WAL record written to disk. It may not be fully flushed yet. + write_record_lsn: Lsn, + + // The LSN of the last WAL record flushed to disk. + flush_record_lsn: Lsn, + + // Decoder is required for detecting boundaries of WAL records. + decoder: WalStreamDecoder, +} + +impl PhysicalStorage { + pub fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> PhysicalStorage { + let timeline_dir = conf.timeline_dir(zttid); + PhysicalStorage { + metrics: WalStorageMetrics::new(zttid), + zttid: *zttid, + timeline_dir, + conf: conf.clone(), + wal_seg_size: None, + write_lsn: Lsn(0), + write_record_lsn: Lsn(0), + flush_record_lsn: Lsn(0), + decoder: WalStreamDecoder::new(Lsn(0)), + } + } + + // wrapper for flush_lsn updates that also updates metrics + fn update_flush_lsn(&mut self) { + self.flush_record_lsn = self.write_record_lsn; + self.metrics.flush_lsn.set(self.flush_record_lsn.0 as f64); + } + + /// Helper returning full path to WAL segment file and its .partial brother.
+ fn wal_file_paths(&self, segno: XLogSegNo) -> Result<(PathBuf, PathBuf)> { + let wal_seg_size = self + .wal_seg_size + .ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?; + + let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); + let wal_file_path = self.timeline_dir.join(wal_file_name.clone()); + let wal_file_partial_path = self.timeline_dir.join(wal_file_name + ".partial"); + Ok((wal_file_path, wal_file_partial_path)) + } + + // TODO: this function is going to be refactored soon, what will change: + // - flush will be called separately from write_wal, this function + // will only write bytes to disk + // - File will be cached in PhysicalStorage, to remove extra syscalls, + // such as open(), seek(), close() + fn write_and_flush(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { + let wal_seg_size = self + .wal_seg_size + .ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?; + + let mut bytes_left: usize = buf.len(); + let mut bytes_written: usize = 0; + let mut partial; + let mut start_pos = startpos; + const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; + + /* Extract WAL location for this block */ + let mut xlogoff = start_pos.segment_offset(wal_seg_size) as usize; + + while bytes_left != 0 { + let bytes_to_write; + + /* + * If crossing a WAL boundary, only write up until we reach wal + * segment size. 
+ */ + if xlogoff + bytes_left > wal_seg_size { + bytes_to_write = wal_seg_size - xlogoff; + } else { + bytes_to_write = bytes_left; + } + + /* Open file */ + let segno = start_pos.segment_number(wal_seg_size); + let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno)?; + { + let mut wal_file: File; + /* Try to open already completed segment */ + if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { + wal_file = file; + partial = false; + } else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) + { + /* Try to open existed partial file */ + wal_file = file; + partial = true; + } else { + /* Create and fill new partial file */ + partial = true; + match OpenOptions::new() + .create(true) + .write(true) + .open(&wal_file_partial_path) + { + Ok(mut file) => { + for _ in 0..(wal_seg_size / XLOG_BLCKSZ) { + file.write_all(ZERO_BLOCK)?; + } + wal_file = file; + } + Err(e) => { + error!("Failed to open log file {:?}: {}", &wal_file_path, e); + return Err(e.into()); + } + } + } + wal_file.seek(SeekFrom::Start(xlogoff as u64))?; + wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?; + + // Flush file, if not said otherwise + if !self.conf.no_sync { + wal_file.sync_all()?; + } + } + /* Write was successful, advance our position */ + bytes_written += bytes_to_write; + bytes_left -= bytes_to_write; + start_pos += bytes_to_write as u64; + xlogoff += bytes_to_write; + + /* Did we reach the end of a WAL segment? */ + if start_pos.segment_offset(wal_seg_size) == 0 { + xlogoff = 0; + if partial { + fs::rename(&wal_file_partial_path, &wal_file_path)?; + } + } + } + Ok(()) + } +} + +impl Storage for PhysicalStorage { + // flush_lsn returns lsn of last durably stored WAL record. + fn flush_lsn(&self) -> Lsn { + self.flush_record_lsn + } + + // Storage needs to know wal_seg_size to know which segment to read/write, but + // wal_seg_size is not always known at the moment of storage creation. 
This method + // allows to postpone its initialization. + fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()> { + if state.server.wal_seg_size == 0 { + // wal_seg_size is still unknown + return Ok(()); + } + + if let Some(wal_seg_size) = self.wal_seg_size { + // physical storage is already initialized + assert_eq!(wal_seg_size, state.server.wal_seg_size as usize); + return Ok(()); + } + + // initialize physical storage + let wal_seg_size = state.server.wal_seg_size as usize; + self.wal_seg_size = Some(wal_seg_size); + + // we need to read WAL from disk to know which LSNs are stored on disk + self.write_lsn = + Lsn(find_end_of_wal(&self.timeline_dir, wal_seg_size, true, state.wal_start_lsn)?.0); + + self.write_record_lsn = self.write_lsn; + + // TODO: do we really know that write_lsn is fully flushed to disk? + // If not, maybe it's better to call fsync() here to be sure? + self.update_flush_lsn(); + + info!( + "initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, truncate_lsn={}", + self.zttid.timeline_id, self.flush_record_lsn, state.commit_lsn, state.truncate_lsn, + ); + if self.flush_record_lsn < state.commit_lsn || self.flush_record_lsn < state.truncate_lsn { + warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or truncate_lsn from control file", self.zttid.timeline_id); + } + + Ok(()) + } + + // Write and flush WAL to disk. 
+ fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { + if self.write_lsn > startpos { + warn!( + "write_wal rewrites WAL written before, write_lsn={}, startpos={}", + self.write_lsn, startpos + ); + } + if self.write_lsn < startpos { + warn!( + "write_wal creates gap in written WAL, write_lsn={}, startpos={}", + self.write_lsn, startpos + ); + // TODO: return error if write_lsn is not zero + } + + { + let _timer = self.metrics.write_wal_seconds.start_timer(); + self.write_and_flush(startpos, buf)?; + } + + // WAL is written and flushed, updating lsns + self.write_lsn = startpos + buf.len() as u64; + self.metrics.write_wal_bytes.observe(buf.len() as f64); + + // figure out last record's end lsn for reporting (if we got the + // whole record) + if self.decoder.available() != startpos { + info!( + "restart decoder from {} to {}", + self.decoder.available(), + startpos, + ); + self.decoder = WalStreamDecoder::new(startpos); + } + self.decoder.feed_bytes(buf); + loop { + match self.decoder.poll_decode()? { + None => break, // no full record yet + Some((lsn, _rec)) => { + self.write_record_lsn = lsn; + } + } + } + + self.update_flush_lsn(); + Ok(()) + } + + // Truncate written WAL by removing all WAL segments after the given LSN. + // end_pos must point to the end of the WAL record. 
+ fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> { + let wal_seg_size = self + .wal_seg_size + .ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?; + + // TODO: cross check divergence point + + // nothing to truncate + if self.write_lsn == Lsn(0) { + return Ok(()); + } + + // Streaming must not create a hole, so truncate cannot be called on non-written lsn + assert!(self.write_lsn >= end_pos); + + // open segment files and delete or fill end with zeroes + + let partial; + const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; + + /* Extract WAL location for this block */ + let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize; + + /* Open file */ + let mut segno = end_pos.segment_number(wal_seg_size); + let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno)?; + { + let mut wal_file: File; + /* Try to open already completed segment */ + if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { + wal_file = file; + partial = false; + } else { + wal_file = OpenOptions::new() + .write(true) + .open(&wal_file_partial_path)?; + partial = true; + } + wal_file.seek(SeekFrom::Start(xlogoff as u64))?; + while xlogoff < wal_seg_size { + let bytes_to_write = min(XLOG_BLCKSZ, wal_seg_size - xlogoff); + wal_file.write_all(&ZERO_BLOCK[0..bytes_to_write])?; + xlogoff += bytes_to_write; + } + // Flush file, if not said otherwise + if !self.conf.no_sync { + wal_file.sync_all()?; + } + } + if !partial { + // Make segment partial once again + fs::rename(&wal_file_path, &wal_file_partial_path)?; + } + // Remove all subsequent segments + loop { + segno += 1; + let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno)?; + // TODO: better use fs::try_exists which is currently available only in nightly build + if wal_file_path.exists() { + fs::remove_file(&wal_file_path)?; + } else if wal_file_partial_path.exists() { + fs::remove_file(&wal_file_partial_path)?; + } else { + break; + } + } + + // Update lsns + self.write_lsn =
end_pos; + self.write_record_lsn = end_pos; + self.update_flush_lsn(); + Ok(()) + } +} + +pub struct WalReader { + timeline_dir: PathBuf, + wal_seg_size: usize, + pos: Lsn, + file: Option, +} + +impl WalReader { + pub fn new(timeline_dir: PathBuf, wal_seg_size: usize, pos: Lsn) -> Self { + Self { + timeline_dir, + wal_seg_size, + pos, + file: None, + } + } + + pub fn read(&mut self, buf: &mut [u8]) -> Result { + // Take the `File` from `wal_file`, or open a new file. + let mut file = match self.file.take() { + Some(file) => file, + None => { + // Open a new file. + let segno = self.pos.segment_number(self.wal_seg_size); + let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size); + let wal_file_path = self.timeline_dir.join(wal_file_name); + Self::open_wal_file(&wal_file_path)? + } + }; + + let xlogoff = self.pos.segment_offset(self.wal_seg_size) as usize; + + // How much to read and send in message? We cannot cross the WAL file + // boundary, and we don't want send more than provided buffer. + let send_size = min(buf.len(), self.wal_seg_size - xlogoff); + + // Read some data from the file. + let buf = &mut buf[0..send_size]; + file.seek(SeekFrom::Start(xlogoff as u64)) + .and_then(|_| file.read_exact(buf)) + .context("Failed to read data from WAL file")?; + + self.pos += send_size as u64; + + // Decide whether to reuse this file. If we don't set wal_file here + // a new file will be opened next time. + if self.pos.segment_offset(self.wal_seg_size) != 0 { + self.file = Some(file); + } + + Ok(send_size) + } + + /// Helper function for opening a wal file. + fn open_wal_file(wal_file_path: &Path) -> Result { + // First try to open the .partial file. + let mut partial_path = wal_file_path.to_owned(); + partial_path.set_extension("partial"); + if let Ok(opened_file) = File::open(&partial_path) { + return Ok(opened_file); + } + + // If that failed, try it without the .partial extension. 
+ File::open(&wal_file_path) + .with_context(|| format!("Failed to open WAL file {:?}", wal_file_path)) + .map_err(|e| { + error!("{}", e); + e + }) + } +}