From bace19ffbe85a4be0948f4134292ddae7c1a0aa1 Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Fri, 7 May 2021 15:48:52 -0700 Subject: [PATCH] wal_service: switch to Lsn type Replace XLogRecPtr with Lsn in wal_service.rs . This removes the last use of XLogSegmentOffset and XLByteToSeg, so delete them. (replaced by Lsn::segment_offset and Lsn::segment_number.) --- postgres_ffi/src/xlog_utils.rs | 10 --- walkeeper/src/wal_service.rs | 142 +++++++++++++++------------------ 2 files changed, 66 insertions(+), 86 deletions(-) diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index bbc71be7ac..b099887940 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -32,21 +32,11 @@ pub type TimeLineID = u32; pub type TimestampTz = u64; pub type XLogSegNo = u64; -#[allow(non_snake_case)] -pub fn XLogSegmentOffset(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> u32 { - (xlogptr as u32) & (wal_segsz_bytes as u32 - 1) -} - #[allow(non_snake_case)] pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo { (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo } -#[allow(non_snake_case)] -pub fn XLByteToSeg(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> XLogSegNo { - xlogptr / wal_segsz_bytes as u64 -} - #[allow(non_snake_case)] pub fn XLogSegNoOffsetToRecPtr( segno: XLogSegNo, diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index e91a1ede40..83f722fcc6 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -2,7 +2,7 @@ /// WAL service listens for client connections and /// receive WAL from wal_proposer and send it to WAL receivers /// -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result}; use byteorder::{BigEndian, ByteOrder}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use fs2::FileExt; @@ -21,11 +21,14 @@ use std::str; use std::sync::{Arc, Condvar, Mutex}; use std::thread; use zenith_utils::bin_ser::LeSer; +use zenith_utils::lsn::Lsn; use crate::pq_protocol::*; use crate::WalAcceptorConf; use pageserver::ZTimelineId; -use postgres_ffi::xlog_utils::*; +use postgres_ffi::xlog_utils::{ + find_end_of_wal, get_current_timestamp, TimeLineID, TimestampTz, XLogFileName, XLOG_BLCKSZ, +}; type FullTransactionId = u64; @@ -33,13 +36,13 @@ const SK_MAGIC: u32 = 0xCafeCeefu32; const SK_FORMAT_VERSION: u32 = 1; const SK_PROTOCOL_VERSION: u32 = 1; const UNKNOWN_SERVER_VERSION: u32 = 0; -const END_REPLICATION_MARKER: u64 = u64::MAX; +const END_REPLICATION_MARKER: Lsn = Lsn::MAX; const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16; const XLOG_HDR_SIZE: usize = 1 + 8 * 3; /* 'w' + startPos + walEnd + timestamp */ const LIBPQ_HDR_SIZE: usize = 5; /* 1 byte with message type + 4 bytes length */ const LIBPQ_MSG_SIZE_OFFS: usize = 1; const CONTROL_FILE_NAME: &str = "safekeeper.control"; -const END_OF_STREAM: XLogRecPtr = 0; +const END_OF_STREAM: Lsn = Lsn(0); /// Read some bytes from a type that implements [`Read`] into a [`BytesMut`] /// @@ -91,7 +94,7 @@ struct ServerInfo { system_id: SystemId, /// Zenith timelineid timeline_id: ZTimelineId, - wal_end: XLogRecPtr, + wal_end: Lsn, timeline: TimeLineID, wal_seg_size: u32, } @@ -101,7 +104,7 @@ struct ServerInfo { struct RequestVote { node_id: NodeId, /// volume commit LSN - vcl: XLogRecPtr, + vcl: Lsn, /// new epoch when safekeeper reaches vcl epoch: u64, } @@ -118,11 +121,11 @@ struct SafeKeeperInfo { /// information about server server: ServerInfo, /// part of WAL acknowledged by quorum - commit_lsn: XLogRecPtr, + commit_lsn: Lsn, /// locally flushed part of WAL - flush_lsn: XLogRecPtr, + flush_lsn: Lsn, /// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers - restart_lsn: XLogRecPtr, + restart_lsn: Lsn, } /// Hot standby feedback received from replica @@ -139,20 +142,20 @@ struct SafeKeeperRequest { /// Sender's node identifier (looks like we do not need it for TCP streaming connection) sender_id: NodeId, /// start position of message in WAL - begin_lsn: XLogRecPtr, + begin_lsn: Lsn, /// end position of message in WAL - end_lsn: XLogRecPtr, + end_lsn: Lsn, /// restart LSN position (minimal LSN which may be needed by proxy to perform recovery) - restart_lsn: XLogRecPtr, + restart_lsn: Lsn, /// LSN committed by quorum of safekeepers - commit_lsn: XLogRecPtr, + commit_lsn: Lsn, } /// Report safekeeper state to proxy #[derive(Debug, PartialEq, Serialize, Deserialize)] struct SafeKeeperResponse { epoch: u64, - flush_lsn: XLogRecPtr, + flush_lsn: Lsn, hs_feedback: HotStandbyFeedback, } @@ -160,7 +163,7 @@ struct SafeKeeperResponse { #[derive(Debug)] struct SharedState { /// quorum commit LSN - commit_lsn: XLogRecPtr, + commit_lsn: Lsn, /// information about this safekeeper info: SafeKeeperInfo, /// opened file control file handle (needed to hold exlusive file lock @@ -214,11 +217,6 @@ trait NewSerializer: Serialize + DeserializeOwned { impl NewSerializer for T where T: Serialize + DeserializeOwned {} -/// Safe hex string parser returning proper result -fn parse_hex_str(s: &str) -> Result { - Ok(u64::from_str_radix(s, 16)?) -} - impl SafeKeeperInfo { fn new() -> SafeKeeperInfo { SafeKeeperInfo { @@ -234,13 +232,13 @@ impl SafeKeeperInfo { }, system_id: 0, /* Postgres system identifier */ timeline_id: ZTimelineId::from([0u8; 16]), - wal_end: 0, + wal_end: Lsn(0), timeline: 0, wal_seg_size: 0, }, - commit_lsn: 0, /* part of WAL acknowledged by quorum */ - flush_lsn: 0, /* locally flushed part of WAL */ - restart_lsn: 0, /* minimal LSN which may be needed for recovery of some safekeeper */ + commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */ + flush_lsn: Lsn(0), /* locally flushed part of WAL */ + restart_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */ } } } @@ -287,7 +285,7 @@ fn main_loop(conf: &WalAcceptorConf) -> Result<()> { impl Timeline { pub fn new(timelineid: ZTimelineId) -> Timeline { let shared_state = SharedState { - commit_lsn: 0, + commit_lsn: Lsn(0), info: SafeKeeperInfo::new(), control_file: None, hs_feedback: HotStandbyFeedback { @@ -304,7 +302,7 @@ impl Timeline { } // Notify caught-up WAL senders about new WAL data received - fn notify_wal_senders(&self, commit_lsn: XLogRecPtr) { + fn notify_wal_senders(&self, commit_lsn: Lsn) { let mut shared_state = self.mutex.lock().unwrap(); if shared_state.commit_lsn < commit_lsn { shared_state.commit_lsn = commit_lsn; @@ -499,7 +497,7 @@ impl Connection { Ok(()) } - // Receive WAL from wal_proposer + /// Receive WAL from wal_proposer fn receive_wal(&mut self) -> Result<()> { // Receive information about server let server_info = self.read_req::()?; @@ -566,7 +564,7 @@ impl Connection { /* Need to persist our vote first */ self.timeline().save_control_file(true)?; - let mut flushed_restart_lsn: XLogRecPtr = 0; + let mut flushed_restart_lsn = Lsn(0); let wal_seg_size = server_info.wal_seg_size as usize; /* Acknowledge the proposed candidate by returning it to the proxy */ @@ -603,16 +601,12 @@ impl Connection { } let start_pos = req.begin_lsn; let end_pos = req.end_lsn; - let rec_size = (end_pos - start_pos) as usize; + let rec_size = end_pos.checked_sub(start_pos).unwrap().0 as usize; assert!(rec_size <= MAX_SEND_SIZE); debug!( - "received for {} bytes between {:X}/{:X} and {:X}/{:X}", - rec_size, - start_pos >> 32, - start_pos & 0xffffffff, - end_pos >> 32, - end_pos & 0xffffffff + "received for {} bytes between {} and {}", + rec_size, start_pos, end_pos, ); /* Receive message body */ @@ -771,7 +765,7 @@ impl Connection { // fn handle_identify_system(&mut self) -> Result { let (start_pos, timeline) = self.find_end_of_wal(false); - let lsn = format!("{:X}/{:>08X}", (start_pos >> 32) as u32, start_pos as u32); + let lsn = start_pos.to_string(); let tli = timeline.to_string(); let sysid = self.timeline().get_info().server.system_id.to_string(); let lsn_bytes = lsn.as_bytes(); @@ -820,44 +814,43 @@ impl Connection { // Handle START_REPLICATION replication command // fn handle_start_replication(&mut self, cmd: &Bytes) -> Result { - let re = Regex::new(r"([[:xdigit:]]*)/([[:xdigit:]]*)").unwrap(); - let mut caps = re.captures_iter(str::from_utf8(&cmd[..]).unwrap()); - let cap = caps.next().unwrap(); - let mut start_pos: XLogRecPtr = (parse_hex_str(&cap[1])? << 32) | parse_hex_str(&cap[2])?; - let mut stop_pos: XLogRecPtr = if let Some(cap) = caps.next() { - (parse_hex_str(&cap[1])? << 32) | parse_hex_str(&cap[2])? - } else { - 0 - }; + // helper function to encapsulate the regex -> Lsn magic + fn get_start_stop(cmd: &[u8]) -> Result<(Lsn, Lsn)> { + let re = Regex::new(r"([[:xdigit:]]+/[[:xdigit:]]+)").unwrap(); + let caps = re.captures_iter(str::from_utf8(&cmd[..])?); + let mut lsns = caps.map(|cap| cap[1].parse::()); + let start_pos = lsns + .next() + .ok_or_else(|| anyhow!("failed to find start LSN"))??; + let stop_pos = lsns.next().transpose()?.unwrap_or(Lsn(0)); + Ok((start_pos, stop_pos)) + } + + let (mut start_pos, mut stop_pos) = get_start_stop(&cmd)?; + let wal_seg_size = self.timeline().get_info().server.wal_seg_size as usize; if wal_seg_size == 0 { bail!("Can not start replication before connecting to wal_proposer"); } let (wal_end, timeline) = self.find_end_of_wal(false); - if start_pos == 0 { + if start_pos == Lsn(0) { start_pos = wal_end; } - if stop_pos == 0 && self.appname == Some("wal_proposer_recovery".to_string()) { + if stop_pos == Lsn(0) && self.appname == Some("wal_proposer_recovery".to_string()) { stop_pos = wal_end; } - info!( - "Start replication from {:X}/{:>08X} till {:X}/{:>08X}", - (start_pos >> 32) as u32, - start_pos as u32, - (stop_pos >> 32) as u32, - stop_pos as u32 - ); + info!("Start replication from {} till {}", start_pos, stop_pos); BeMessage::write(&mut self.outbuf, &BeMessage::Copy); self.send()?; - let mut end_pos: XLogRecPtr; - let mut commit_lsn: XLogRecPtr; + let mut end_pos: Lsn; + let mut commit_lsn: Lsn; let mut wal_file: Option = None; self.outbuf .resize(LIBPQ_HDR_SIZE + XLOG_HDR_SIZE + MAX_SEND_SIZE, 0u8); loop { /* Wait until we have some data to stream */ - if stop_pos != 0 { + if stop_pos != Lsn(0) { /* recovery mode: stream up to the specified LSN (VCL) */ if start_pos >= stop_pos { /* recovery finished */ @@ -909,7 +902,7 @@ impl Connection { if let Some(opened_file) = curr_file { file = opened_file; } else { - let segno = XLByteToSeg(start_pos, wal_seg_size); + let segno = start_pos.segment_number(wal_seg_size as u64); let wal_file_name = XLogFileName(timeline, segno, wal_seg_size); let wal_file_path = self .conf @@ -933,11 +926,11 @@ impl Connection { } } } - let xlogoff = XLogSegmentOffset(start_pos, wal_seg_size) as usize; + let xlogoff = start_pos.segment_offset(wal_seg_size as u64) as usize; // How much to read and send in message? We cannot cross the WAL file // boundary, and we don't want send more than MAX_SEND_SIZE. - let send_size = (end_pos - start_pos) as usize; + let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize; let send_size = min(send_size, wal_seg_size - xlogoff); let send_size = min(send_size, MAX_SEND_SIZE); @@ -953,20 +946,16 @@ impl Connection { (msg_size - LIBPQ_MSG_SIZE_OFFS) as u32, ); self.outbuf[5] = b'w'; - BigEndian::write_u64(&mut self.outbuf[6..14], start_pos); - BigEndian::write_u64(&mut self.outbuf[14..22], end_pos); + BigEndian::write_u64(&mut self.outbuf[6..14], start_pos.0); + BigEndian::write_u64(&mut self.outbuf[14..22], end_pos.0); BigEndian::write_u64(&mut self.outbuf[22..30], get_current_timestamp()); self.stream.write_all(&self.outbuf[0..msg_size])?; start_pos += send_size as u64; - debug!( - "Sent WAL to page server up to {:X}/{:>08X}", - (end_pos >> 32) as u32, - end_pos as u32 - ); + debug!("Sent WAL to page server up to {}", end_pos); - if XLogSegmentOffset(start_pos, wal_seg_size) != 0 { + if start_pos.segment_offset(wal_seg_size as u64) != 0 { wal_file = Some(file); } } @@ -987,7 +976,7 @@ impl Connection { fn write_wal_file( &self, - startpos: XLogRecPtr, + startpos: Lsn, timeline: TimeLineID, wal_seg_size: usize, buf: &[u8], @@ -999,7 +988,7 @@ impl Connection { const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; /* Extract WAL location for this block */ - let mut xlogoff = XLogSegmentOffset(start_pos, wal_seg_size) as usize; + let mut xlogoff = start_pos.segment_offset(wal_seg_size as u64) as usize; while bytes_left != 0 { let bytes_to_write; @@ -1015,7 +1004,7 @@ impl Connection { } /* Open file */ - let segno = XLByteToSeg(start_pos, wal_seg_size); + let segno = start_pos.segment_number(wal_seg_size as u64); let wal_file_name = XLogFileName(timeline, segno, wal_seg_size); let wal_file_path = self .conf @@ -1074,7 +1063,7 @@ impl Connection { xlogoff += bytes_to_write; /* Did we reach the end of a WAL segment? */ - if XLogSegmentOffset(start_pos, wal_seg_size) == 0 { + if start_pos.segment_offset(wal_seg_size as u64) == 0 { xlogoff = 0; if partial { fs::rename(&wal_file_partial_path, &wal_file_path)?; @@ -1084,12 +1073,13 @@ impl Connection { Ok(()) } - // Find last WAL record. If "precise" is false then just locatelast partial segment - fn find_end_of_wal(&self, precise: bool) -> (XLogRecPtr, TimeLineID) { - find_end_of_wal( + /// Find last WAL record. If "precise" is false then just locate last partial segment + fn find_end_of_wal(&self, precise: bool) -> (Lsn, TimeLineID) { + let (lsn, timeline) = find_end_of_wal( &self.conf.data_dir, self.timeline().get_info().server.wal_seg_size as usize, precise, - ) + ); + (Lsn(lsn), timeline) } }