From 74b78608d9b3cd26cd1cf9b90020d6e3b91308f9 Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Wed, 12 May 2021 23:35:14 -0700 Subject: [PATCH] split timeline code into a separate file --- walkeeper/src/lib.rs | 1 + walkeeper/src/replication.rs | 14 +- walkeeper/src/send_wal.rs | 15 +- walkeeper/src/timeline.rs | 150 +++++++++++++++ walkeeper/src/wal_service.rs | 341 ++++++++++++----------------------- 5 files changed, 278 insertions(+), 243 deletions(-) create mode 100644 walkeeper/src/timeline.rs diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index 5f6287f1f8..8adcefcd6f 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -7,6 +7,7 @@ pub mod pq_protocol; pub mod replication; pub mod s3_offload; pub mod send_wal; +pub mod timeline; pub mod wal_service; use crate::pq_protocol::SystemId; diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 26aef50279..cc69bf33c5 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -2,10 +2,9 @@ //! "START REPLICATION" message. use crate::pq_protocol::{BeMessage, FeMessage}; -use crate::send_wal::SendWal; -use crate::wal_service::{ - HotStandbyFeedback, Timeline, TimelineTools, END_REPLICATION_MARKER, MAX_SEND_SIZE, -}; +use crate::send_wal::SendWalConn; +use crate::timeline::{Timeline, TimelineTools}; +use crate::wal_service::{HotStandbyFeedback, END_REPLICATION_MARKER, MAX_SEND_SIZE}; use crate::WalAcceptorConf; use anyhow::{anyhow, bail, Result}; use byteorder::{BigEndian, ByteOrder}; @@ -40,7 +39,8 @@ impl HotStandbyFeedback { } } -pub struct ReplicationHandler { +/// A network connection that's speaking the replication protocol. +pub struct ReplicationConn { timeline: Option>, /// Postgres connection, buffered input /// @@ -55,9 +55,9 @@ pub struct ReplicationHandler { appname: Option, } -impl ReplicationHandler { +impl ReplicationConn { /// Create a new `SendWal`, consuming the `Connection`. - pub fn new(conn: SendWal) -> Self { + pub fn new(conn: SendWalConn) -> Self { Self { timeline: conn.timeline, stream_in: Some(conn.stream_in), diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index 59df68e320..0ee6a7829c 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -1,11 +1,13 @@ -//! This implements the libpq replication protocol between wal_acceptor and replicas/pagers +//! This implements the libpq replication protocol between wal_acceptor +//! and replicas/pagers //! use crate::pq_protocol::{ BeMessage, FeMessage, FeStartupMessage, RowDescriptor, StartupRequestCode, }; -use crate::replication::ReplicationHandler; -use crate::wal_service::{Connection, Timeline, TimelineTools}; +use crate::replication::ReplicationConn; +use crate::timeline::{Timeline, TimelineTools}; +use crate::wal_service::Connection; use crate::WalAcceptorConf; use anyhow::{bail, Result}; use bytes::BytesMut; @@ -14,7 +16,8 @@ use std::io::{BufReader, Write}; use std::net::{SocketAddr, TcpStream}; use std::sync::Arc; -pub struct SendWal { +/// A network connection that's speaking the libpq replication protocol. +pub struct SendWalConn { pub timeline: Option>, /// Postgres connection, buffered input pub stream_in: BufReader, @@ -28,7 +31,7 @@ pub struct SendWal { appname: Option, } -impl SendWal { +impl SendWalConn { /// Create a new `SendWal`, consuming the `Connection`. pub fn new(conn: Connection) -> Self { Self { @@ -80,7 +83,7 @@ impl SendWal { self.handle_identify_system()?; } else if q.body.starts_with(b"START_REPLICATION") { // Create a new replication object, consuming `self`. - ReplicationHandler::new(self).run(&q.body)?; + ReplicationConn::new(self).run(&q.body)?; break; } else { bail!("Unexpected command {:?}", q.body); diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs new file mode 100644 index 0000000000..1642aaa787 --- /dev/null +++ b/walkeeper/src/timeline.rs @@ -0,0 +1,150 @@ +//! This module contains tools for managing timelines. +//! + +use crate::wal_service::{HotStandbyFeedback, SafeKeeperInfo, SharedState, END_REPLICATION_MARKER}; +use crate::WalAcceptorConf; +use anyhow::Result; +use lazy_static::lazy_static; +use log::*; +use pageserver::ZTimelineId; +use postgres_ffi::xlog_utils::{find_end_of_wal, TimeLineID}; +use std::collections::HashMap; +use std::fs; +use std::path::Path; +use std::sync::{Arc, Condvar, Mutex}; +use zenith_utils::lsn::Lsn; + +/// Database instance (tenant) +#[derive(Debug)] +pub struct Timeline { + pub timelineid: ZTimelineId, + mutex: Mutex, + /// conditional variable used to notify wal senders + cond: Condvar, +} + +impl Timeline { + pub fn new(timelineid: ZTimelineId, shared_state: SharedState) -> Timeline { + Timeline { + timelineid, + mutex: Mutex::new(shared_state), + cond: Condvar::new(), + } + } + + /// Wait for an LSN to be committed. + /// + /// Returns the last committed LSN, which will be at least + /// as high as the LSN waited for. + /// + pub fn wait_for_lsn(&self, lsn: Lsn) -> Lsn { + let mut shared_state = self.mutex.lock().unwrap(); + loop { + let commit_lsn = shared_state.commit_lsn; + // This must be `>`, not `>=`. + if commit_lsn > lsn { + return commit_lsn; + } + shared_state = self.cond.wait(shared_state).unwrap(); + } + } + + // Notify caught-up WAL senders about new WAL data received + pub fn notify_wal_senders(&self, commit_lsn: Lsn) { + let mut shared_state = self.mutex.lock().unwrap(); + if shared_state.commit_lsn < commit_lsn { + shared_state.commit_lsn = commit_lsn; + self.cond.notify_all(); + } + } + + fn _stop_wal_senders(&self) { + self.notify_wal_senders(END_REPLICATION_MARKER); + } + + pub fn get_info(&self) -> SafeKeeperInfo { + return self.mutex.lock().unwrap().info.clone(); + } + + pub fn set_info(&self, info: &SafeKeeperInfo) { + self.mutex.lock().unwrap().info = info.clone(); + } + + // Accumulate hot standby feedbacks from replicas + pub fn add_hs_feedback(&self, feedback: HotStandbyFeedback) { + let mut shared_state = self.mutex.lock().unwrap(); + shared_state.add_hs_feedback(feedback); + } + + pub fn get_hs_feedback(&self) -> HotStandbyFeedback { + let shared_state = self.mutex.lock().unwrap(); + shared_state.hs_feedback.clone() + } + + pub fn load_control_file(&self, conf: &WalAcceptorConf) -> Result<()> { + let mut shared_state = self.mutex.lock().unwrap(); + shared_state.load_control_file(conf, self.timelineid) + } + + pub fn save_control_file(&self, sync: bool) -> Result<()> { + let mut shared_state = self.mutex.lock().unwrap(); + shared_state.save_control_file(sync) + } +} + +// Utilities needed by various Connection-like objects +pub trait TimelineTools { + fn set(&mut self, timeline_id: ZTimelineId) -> Result<()>; + fn get(&self) -> &Arc; + fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID); +} + +impl TimelineTools for Option> { + fn set(&mut self, timeline_id: ZTimelineId) -> Result<()> { + // We will only set the timeline once. If it were to ever change, + // anyone who cloned the Arc would be out of date. + assert!(self.is_none()); + *self = Some(GlobalTimelines::store(timeline_id)?); + Ok(()) + } + + fn get(&self) -> &Arc { + self.as_ref().unwrap() + } + + /// Find last WAL record. If "precise" is false then just locate last partial segment + fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID) { + let seg_size = self.get().get_info().server.wal_seg_size as usize; + let (lsn, timeline) = find_end_of_wal(data_dir, seg_size, precise); + (Lsn(lsn), timeline) + } +} + +lazy_static! { + pub static ref TIMELINES: Mutex>> = + Mutex::new(HashMap::new()); +} + +/// A zero-sized struct used to manage access to the global timelines map. +struct GlobalTimelines; + +impl GlobalTimelines { + /// Store a new timeline into the global TIMELINES map. + fn store(timeline_id: ZTimelineId) -> Result> { + let mut timelines = TIMELINES.lock().unwrap(); + + match timelines.get(&timeline_id) { + Some(result) => Ok(Arc::clone(result)), + None => { + info!("creating timeline dir {}", timeline_id); + fs::create_dir_all(timeline_id.to_string())?; + + let shared_state = SharedState::new(); + + let new_tid = Arc::new(Timeline::new(timeline_id, shared_state)); + timelines.insert(timeline_id, Arc::clone(&new_tid)); + Ok(new_tid) + } + } + } +} diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index 992baca7f4..65b09d0949 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -6,30 +6,26 @@ use anyhow::{bail, Result}; use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; use bytes::{Buf, BufMut, BytesMut}; use fs2::FileExt; -use lazy_static::lazy_static; use log::*; use postgres::{Client, NoTls}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::cmp::{max, min}; -use std::collections::HashMap; use std::fs::{self, File, OpenOptions}; use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write}; use std::mem; use std::net::{SocketAddr, TcpListener, TcpStream}; -use std::path::Path; use std::str; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::Arc; use std::thread; use zenith_utils::bin_ser::LeSer; use zenith_utils::lsn::Lsn; use crate::pq_protocol::*; -use crate::send_wal::SendWal; +use crate::send_wal::SendWalConn; +use crate::timeline::{Timeline, TimelineTools}; use crate::WalAcceptorConf; use pageserver::ZTimelineId; -use postgres_ffi::xlog_utils::{ - find_end_of_wal, TimeLineID, TimestampTz, XLogFileName, XLOG_BLCKSZ, -}; +use postgres_ffi::xlog_utils::{TimeLineID, TimestampTz, XLogFileName, XLOG_BLCKSZ}; type FullTransactionId = u64; @@ -39,8 +35,8 @@ const SK_PROTOCOL_VERSION: u32 = 1; const UNKNOWN_SERVER_VERSION: u32 = 0; pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX; pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16; -const CONTROL_FILE_NAME: &str = "safekeeper.control"; const END_OF_STREAM: Lsn = Lsn(0); +const CONTROL_FILE_NAME: &str = "safekeeper.control"; /// Unique node identifier used by Paxos #[derive(Debug, Clone, Copy, Ord, PartialOrd, PartialEq, Eq, Serialize, Deserialize)] @@ -75,7 +71,7 @@ struct RequestVote { } /// Information of about storage node -#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct SafeKeeperInfo { /// magic for verifying content the control file pub magic: u32, @@ -94,7 +90,7 @@ pub struct SafeKeeperInfo { } /// Hot standby feedback received from replica -#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct HotStandbyFeedback { pub ts: TimestampTz, pub xmin: FullTransactionId, @@ -126,51 +122,121 @@ struct SafeKeeperResponse { /// Shared state associated with database instance (tenant) #[derive(Debug)] -struct SharedState { +pub struct SharedState { /// quorum commit LSN - commit_lsn: Lsn, + pub commit_lsn: Lsn, /// information about this safekeeper - info: SafeKeeperInfo, + pub info: SafeKeeperInfo, /// opened file control file handle (needed to hold exlusive file lock - control_file: Option, + pub control_file: Option, /// combined hot standby feedback from all replicas - hs_feedback: HotStandbyFeedback, + pub hs_feedback: HotStandbyFeedback, } -/// Database instance (tenant) -#[derive(Debug)] -pub struct Timeline { - pub timelineid: ZTimelineId, - mutex: Mutex, - /// conditional variable used to notify wal senders - cond: Condvar, -} +impl SharedState { + pub fn new() -> Self { + Self { + commit_lsn: Lsn(0), + info: SafeKeeperInfo::new(), + control_file: None, + hs_feedback: HotStandbyFeedback { + ts: 0, + xmin: u64::MAX, + catalog_xmin: u64::MAX, + }, + } + } -// Useful utilities needed by various Connection-like objects -pub trait TimelineTools { - fn set(&mut self, timeline_id: ZTimelineId) -> Result<()>; - fn get(&self) -> &Arc; - fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID); -} + /// Accumulate hot standby feedbacks from replicas + pub fn add_hs_feedback(&mut self, feedback: HotStandbyFeedback) { + self.hs_feedback.xmin = min(self.hs_feedback.xmin, feedback.xmin); + self.hs_feedback.catalog_xmin = min(self.hs_feedback.catalog_xmin, feedback.catalog_xmin); + self.hs_feedback.ts = max(self.hs_feedback.ts, feedback.ts); + } -impl TimelineTools for Option> { - fn set(&mut self, timeline_id: ZTimelineId) -> Result<()> { - // We will only set the timeline once. If it were to ever change, - // anyone who cloned the Arc would be out of date. - assert!(self.is_none()); - *self = Some(GlobalTimelines::store(timeline_id)?); + /// Load and lock control file (prevent running more than one instance of safekeeper) + pub fn load_control_file( + &mut self, + conf: &WalAcceptorConf, + timelineid: ZTimelineId, + ) -> Result<()> { + if self.control_file.is_some() { + info!("control file for timeline {} is already open", timelineid); + return Ok(()); + } + + let control_file_path = conf + .data_dir + .join(timelineid.to_string()) + .join(CONTROL_FILE_NAME); + info!("loading control file {}", control_file_path.display()); + match OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&control_file_path) + { + Ok(file) => { + // Lock file to prevent two or more active wal_acceptors + match file.try_lock_exclusive() { + Ok(()) => {} + Err(e) => { + bail!( + "Control file {:?} is locked by some other process: {}", + &control_file_path, + e + ); + } + } + self.control_file = Some(file); + + const SIZE: usize = mem::size_of::(); + let mut buf = [0u8; SIZE]; + if self + .control_file + .as_mut() + .unwrap() + .read_exact(&mut buf) + .is_ok() + { + let mut input = BytesMut::new(); + input.extend_from_slice(&buf); + let my_info = SafeKeeperInfo::unpack(&mut input); + + if my_info.magic != SK_MAGIC { + bail!("Invalid control file magic: {}", my_info.magic); + } + if my_info.format_version != SK_FORMAT_VERSION { + bail!( + "Incompatible format version: {} vs. {}", + my_info.format_version, + SK_FORMAT_VERSION + ); + } + self.info = my_info; + } + } + Err(e) => { + panic!( + "Failed to open control file {:?}: {}", + &control_file_path, e + ); + } + } Ok(()) } - fn get(&self) -> &Arc { - self.as_ref().unwrap() - } + pub fn save_control_file(&mut self, sync: bool) -> Result<()> { + let mut buf = BytesMut::new(); + self.info.pack(&mut buf); - /// Find last WAL record. If "precise" is false then just locate last partial segment - fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID) { - let seg_size = self.get().get_info().server.wal_seg_size as usize; - let (lsn, timeline) = find_end_of_wal(data_dir, seg_size, precise); - (Lsn(lsn), timeline) + let file = self.control_file.as_mut().unwrap(); + file.seek(SeekFrom::Start(0))?; + file.write_all(&buf[..])?; + if sync { + file.sync_all()?; + } + Ok(()) } } @@ -233,32 +299,6 @@ impl SafeKeeperInfo { } } -lazy_static! { - pub static ref TIMELINES: Mutex>> = - Mutex::new(HashMap::new()); -} - -/// A zero-sized struct used to manage access to the global timelines map. -struct GlobalTimelines; - -impl GlobalTimelines { - /// Store a new timeline into the global TIMELINES map. - fn store(timeline_id: ZTimelineId) -> Result> { - let mut timelines = TIMELINES.lock().unwrap(); - - match timelines.get(&timeline_id) { - Some(result) => Ok(Arc::clone(result)), - None => { - info!("creating timeline dir {}", timeline_id); - fs::create_dir_all(timeline_id.to_string())?; - let new_tid = Arc::new(Timeline::new(timeline_id)); - timelines.insert(timeline_id, Arc::clone(&new_tid)); - Ok(new_tid) - } - } - } -} - /// Accept incoming TCP connections and spawn them into a background thread. pub fn thread_main(conf: WalAcceptorConf) -> Result<()> { info!("Starting wal acceptor on {}", conf.listen_addr); @@ -294,165 +334,6 @@ fn handle_socket(socket: TcpStream, conf: WalAcceptorConf) -> Result<()> { Ok(()) } -impl Timeline { - pub fn new(timelineid: ZTimelineId) -> Timeline { - let shared_state = SharedState { - commit_lsn: Lsn(0), - info: SafeKeeperInfo::new(), - control_file: None, - hs_feedback: HotStandbyFeedback { - ts: 0, - xmin: u64::MAX, - catalog_xmin: u64::MAX, - }, - }; - Timeline { - timelineid, - mutex: Mutex::new(shared_state), - cond: Condvar::new(), - } - } - - /// Wait for an LSN to be committed. - /// - /// Returns the last committed LSN, which will be at least - /// as high as the LSN waited for. - /// - pub fn wait_for_lsn(&self, lsn: Lsn) -> Lsn { - let mut shared_state = self.mutex.lock().unwrap(); - loop { - let commit_lsn = shared_state.commit_lsn; - // This must be `>`, not `>=`. - if commit_lsn > lsn { - return commit_lsn; - } - shared_state = self.cond.wait(shared_state).unwrap(); - } - } - - // Notify caught-up WAL senders about new WAL data received - fn notify_wal_senders(&self, commit_lsn: Lsn) { - let mut shared_state = self.mutex.lock().unwrap(); - if shared_state.commit_lsn < commit_lsn { - shared_state.commit_lsn = commit_lsn; - self.cond.notify_all(); - } - } - - fn _stop_wal_senders(&self) { - self.notify_wal_senders(END_REPLICATION_MARKER); - } - - pub fn get_info(&self) -> SafeKeeperInfo { - return self.mutex.lock().unwrap().info; - } - - fn set_info(&self, info: &SafeKeeperInfo) { - self.mutex.lock().unwrap().info = *info; - } - - // Accumulate hot standby feedbacks from replicas - pub fn add_hs_feedback(&self, feedback: HotStandbyFeedback) { - let mut shared_state = self.mutex.lock().unwrap(); - shared_state.hs_feedback.xmin = min(shared_state.hs_feedback.xmin, feedback.xmin); - shared_state.hs_feedback.catalog_xmin = - min(shared_state.hs_feedback.catalog_xmin, feedback.catalog_xmin); - shared_state.hs_feedback.ts = max(shared_state.hs_feedback.ts, feedback.ts); - } - - fn get_hs_feedback(&self) -> HotStandbyFeedback { - let shared_state = self.mutex.lock().unwrap(); - shared_state.hs_feedback - } - - // Load and lock control file (prevent running more than one instance of safekeeper) - fn load_control_file(&self, conf: &WalAcceptorConf) -> Result<()> { - let mut shared_state = self.mutex.lock().unwrap(); - - if shared_state.control_file.is_some() { - info!( - "control file for timeline {} is already open", - self.timelineid - ); - return Ok(()); - } - - let control_file_path = conf - .data_dir - .join(self.timelineid.to_string()) - .join(CONTROL_FILE_NAME); - info!("loading control file {}", control_file_path.display()); - match OpenOptions::new() - .read(true) - .write(true) - .create(true) - .open(&control_file_path) - { - Ok(file) => { - // Lock file to prevent two or more active wal_acceptors - match file.try_lock_exclusive() { - Ok(()) => {} - Err(e) => { - bail!( - "Control file {:?} is locked by some other process: {}", - &control_file_path, - e - ); - } - } - shared_state.control_file = Some(file); - - const SIZE: usize = mem::size_of::(); - let mut buf = [0u8; SIZE]; - if shared_state - .control_file - .as_mut() - .unwrap() - .read_exact(&mut buf) - .is_ok() - { - let mut input = BytesMut::new(); - input.extend_from_slice(&buf); - let my_info = SafeKeeperInfo::unpack(&mut input); - - if my_info.magic != SK_MAGIC { - bail!("Invalid control file magic: {}", my_info.magic); - } - if my_info.format_version != SK_FORMAT_VERSION { - bail!( - "Incompatible format version: {} vs. {}", - my_info.format_version, - SK_FORMAT_VERSION - ); - } - shared_state.info = my_info; - } - } - Err(e) => { - panic!( - "Failed to open control file {:?}: {}", - &control_file_path, e - ); - } - } - Ok(()) - } - - fn save_control_file(&self, sync: bool) -> Result<()> { - let mut buf = BytesMut::new(); - let mut shared_state = self.mutex.lock().unwrap(); - shared_state.info.pack(&mut buf); - - let file = shared_state.control_file.as_mut().unwrap(); - file.seek(SeekFrom::Start(0))?; - file.write_all(&buf[..])?; - if sync { - file.sync_all()?; - } - Ok(()) - } -} - impl Connection { pub fn new(socket: TcpStream, conf: WalAcceptorConf) -> Result { let peer_addr = socket.peer_addr()?; @@ -485,7 +366,7 @@ impl Connection { self.stream_in.read_u32::()?; self.receive_wal()?; // internal protocol between wal_proposer and wal_acceptor } else { - SendWal::new(self).run()?; // libpq replication protocol between wal_acceptor and replicas/pagers + SendWalConn::new(self).run()?; // libpq replication protocol between wal_acceptor and replicas/pagers } Ok(()) }