wal_service: switch to Lsn type

Replace XLogRecPtr with Lsn in wal_service.rs .

This removes the last use of XLogSegmentOffset and XLByteToSeg, so
delete them. (replaced by Lsn::segment_offset and Lsn::segment_number.)
This commit is contained in:
Eric Seppanen
2021-05-07 15:48:52 -07:00
parent 60d66267a9
commit bace19ffbe
2 changed files with 66 additions and 86 deletions

View File

@@ -32,21 +32,11 @@ pub type TimeLineID = u32;
pub type TimestampTz = u64;
pub type XLogSegNo = u64;
#[allow(non_snake_case)]
pub fn XLogSegmentOffset(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> u32 {
(xlogptr as u32) & (wal_segsz_bytes as u32 - 1)
}
#[allow(non_snake_case)]
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
(0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
}
#[allow(non_snake_case)]
pub fn XLByteToSeg(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> XLogSegNo {
xlogptr / wal_segsz_bytes as u64
}
#[allow(non_snake_case)]
pub fn XLogSegNoOffsetToRecPtr(
segno: XLogSegNo,

View File

@@ -2,7 +2,7 @@
/// WAL service listens for client connections and
/// receive WAL from wal_proposer and send it to WAL receivers
///
use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use fs2::FileExt;
@@ -21,11 +21,14 @@ use std::str;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use zenith_utils::bin_ser::LeSer;
use zenith_utils::lsn::Lsn;
use crate::pq_protocol::*;
use crate::WalAcceptorConf;
use pageserver::ZTimelineId;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::xlog_utils::{
find_end_of_wal, get_current_timestamp, TimeLineID, TimestampTz, XLogFileName, XLOG_BLCKSZ,
};
type FullTransactionId = u64;
@@ -33,13 +36,13 @@ const SK_MAGIC: u32 = 0xCafeCeefu32;
const SK_FORMAT_VERSION: u32 = 1;
const SK_PROTOCOL_VERSION: u32 = 1;
const UNKNOWN_SERVER_VERSION: u32 = 0;
const END_REPLICATION_MARKER: u64 = u64::MAX;
const END_REPLICATION_MARKER: Lsn = Lsn::MAX;
const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
const XLOG_HDR_SIZE: usize = 1 + 8 * 3; /* 'w' + startPos + walEnd + timestamp */
const LIBPQ_HDR_SIZE: usize = 5; /* 1 byte with message type + 4 bytes length */
const LIBPQ_MSG_SIZE_OFFS: usize = 1;
const CONTROL_FILE_NAME: &str = "safekeeper.control";
const END_OF_STREAM: XLogRecPtr = 0;
const END_OF_STREAM: Lsn = Lsn(0);
/// Read some bytes from a type that implements [`Read`] into a [`BytesMut`]
///
@@ -91,7 +94,7 @@ struct ServerInfo {
system_id: SystemId,
/// Zenith timelineid
timeline_id: ZTimelineId,
wal_end: XLogRecPtr,
wal_end: Lsn,
timeline: TimeLineID,
wal_seg_size: u32,
}
@@ -101,7 +104,7 @@ struct ServerInfo {
struct RequestVote {
node_id: NodeId,
/// volume commit LSN
vcl: XLogRecPtr,
vcl: Lsn,
/// new epoch when safekeeper reaches vcl
epoch: u64,
}
@@ -118,11 +121,11 @@ struct SafeKeeperInfo {
/// information about server
server: ServerInfo,
/// part of WAL acknowledged by quorum
commit_lsn: XLogRecPtr,
commit_lsn: Lsn,
/// locally flushed part of WAL
flush_lsn: XLogRecPtr,
flush_lsn: Lsn,
/// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers
restart_lsn: XLogRecPtr,
restart_lsn: Lsn,
}
/// Hot standby feedback received from replica
@@ -139,20 +142,20 @@ struct SafeKeeperRequest {
/// Sender's node identifier (looks like we do not need it for TCP streaming connection)
sender_id: NodeId,
/// start position of message in WAL
begin_lsn: XLogRecPtr,
begin_lsn: Lsn,
/// end position of message in WAL
end_lsn: XLogRecPtr,
end_lsn: Lsn,
/// restart LSN position (minimal LSN which may be needed by proxy to perform recovery)
restart_lsn: XLogRecPtr,
restart_lsn: Lsn,
/// LSN committed by quorum of safekeepers
commit_lsn: XLogRecPtr,
commit_lsn: Lsn,
}
/// Report safekeeper state to proxy
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct SafeKeeperResponse {
epoch: u64,
flush_lsn: XLogRecPtr,
flush_lsn: Lsn,
hs_feedback: HotStandbyFeedback,
}
@@ -160,7 +163,7 @@ struct SafeKeeperResponse {
#[derive(Debug)]
struct SharedState {
/// quorum commit LSN
commit_lsn: XLogRecPtr,
commit_lsn: Lsn,
/// information about this safekeeper
info: SafeKeeperInfo,
/// opened file control file handle (needed to hold exlusive file lock
@@ -214,11 +217,6 @@ trait NewSerializer: Serialize + DeserializeOwned {
impl<T> NewSerializer for T where T: Serialize + DeserializeOwned {}
/// Safe hex string parser returning proper result
fn parse_hex_str(s: &str) -> Result<u64> {
Ok(u64::from_str_radix(s, 16)?)
}
impl SafeKeeperInfo {
fn new() -> SafeKeeperInfo {
SafeKeeperInfo {
@@ -234,13 +232,13 @@ impl SafeKeeperInfo {
},
system_id: 0, /* Postgres system identifier */
timeline_id: ZTimelineId::from([0u8; 16]),
wal_end: 0,
wal_end: Lsn(0),
timeline: 0,
wal_seg_size: 0,
},
commit_lsn: 0, /* part of WAL acknowledged by quorum */
flush_lsn: 0, /* locally flushed part of WAL */
restart_lsn: 0, /* minimal LSN which may be needed for recovery of some safekeeper */
commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */
flush_lsn: Lsn(0), /* locally flushed part of WAL */
restart_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */
}
}
}
@@ -287,7 +285,7 @@ fn main_loop(conf: &WalAcceptorConf) -> Result<()> {
impl Timeline {
pub fn new(timelineid: ZTimelineId) -> Timeline {
let shared_state = SharedState {
commit_lsn: 0,
commit_lsn: Lsn(0),
info: SafeKeeperInfo::new(),
control_file: None,
hs_feedback: HotStandbyFeedback {
@@ -304,7 +302,7 @@ impl Timeline {
}
// Notify caught-up WAL senders about new WAL data received
fn notify_wal_senders(&self, commit_lsn: XLogRecPtr) {
fn notify_wal_senders(&self, commit_lsn: Lsn) {
let mut shared_state = self.mutex.lock().unwrap();
if shared_state.commit_lsn < commit_lsn {
shared_state.commit_lsn = commit_lsn;
@@ -499,7 +497,7 @@ impl Connection {
Ok(())
}
// Receive WAL from wal_proposer
/// Receive WAL from wal_proposer
fn receive_wal(&mut self) -> Result<()> {
// Receive information about server
let server_info = self.read_req::<ServerInfo>()?;
@@ -566,7 +564,7 @@ impl Connection {
/* Need to persist our vote first */
self.timeline().save_control_file(true)?;
let mut flushed_restart_lsn: XLogRecPtr = 0;
let mut flushed_restart_lsn = Lsn(0);
let wal_seg_size = server_info.wal_seg_size as usize;
/* Acknowledge the proposed candidate by returning it to the proxy */
@@ -603,16 +601,12 @@ impl Connection {
}
let start_pos = req.begin_lsn;
let end_pos = req.end_lsn;
let rec_size = (end_pos - start_pos) as usize;
let rec_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
assert!(rec_size <= MAX_SEND_SIZE);
debug!(
"received for {} bytes between {:X}/{:X} and {:X}/{:X}",
rec_size,
start_pos >> 32,
start_pos & 0xffffffff,
end_pos >> 32,
end_pos & 0xffffffff
"received for {} bytes between {} and {}",
rec_size, start_pos, end_pos,
);
/* Receive message body */
@@ -771,7 +765,7 @@ impl Connection {
//
fn handle_identify_system(&mut self) -> Result<bool> {
let (start_pos, timeline) = self.find_end_of_wal(false);
let lsn = format!("{:X}/{:>08X}", (start_pos >> 32) as u32, start_pos as u32);
let lsn = start_pos.to_string();
let tli = timeline.to_string();
let sysid = self.timeline().get_info().server.system_id.to_string();
let lsn_bytes = lsn.as_bytes();
@@ -820,44 +814,43 @@ impl Connection {
// Handle START_REPLICATION replication command
//
fn handle_start_replication(&mut self, cmd: &Bytes) -> Result<bool> {
let re = Regex::new(r"([[:xdigit:]]*)/([[:xdigit:]]*)").unwrap();
let mut caps = re.captures_iter(str::from_utf8(&cmd[..]).unwrap());
let cap = caps.next().unwrap();
let mut start_pos: XLogRecPtr = (parse_hex_str(&cap[1])? << 32) | parse_hex_str(&cap[2])?;
let mut stop_pos: XLogRecPtr = if let Some(cap) = caps.next() {
(parse_hex_str(&cap[1])? << 32) | parse_hex_str(&cap[2])?
} else {
0
};
// helper function to encapsulate the regex -> Lsn magic
fn get_start_stop(cmd: &[u8]) -> Result<(Lsn, Lsn)> {
let re = Regex::new(r"([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
let caps = re.captures_iter(str::from_utf8(&cmd[..])?);
let mut lsns = caps.map(|cap| cap[1].parse::<Lsn>());
let start_pos = lsns
.next()
.ok_or_else(|| anyhow!("failed to find start LSN"))??;
let stop_pos = lsns.next().transpose()?.unwrap_or(Lsn(0));
Ok((start_pos, stop_pos))
}
let (mut start_pos, mut stop_pos) = get_start_stop(&cmd)?;
let wal_seg_size = self.timeline().get_info().server.wal_seg_size as usize;
if wal_seg_size == 0 {
bail!("Can not start replication before connecting to wal_proposer");
}
let (wal_end, timeline) = self.find_end_of_wal(false);
if start_pos == 0 {
if start_pos == Lsn(0) {
start_pos = wal_end;
}
if stop_pos == 0 && self.appname == Some("wal_proposer_recovery".to_string()) {
if stop_pos == Lsn(0) && self.appname == Some("wal_proposer_recovery".to_string()) {
stop_pos = wal_end;
}
info!(
"Start replication from {:X}/{:>08X} till {:X}/{:>08X}",
(start_pos >> 32) as u32,
start_pos as u32,
(stop_pos >> 32) as u32,
stop_pos as u32
);
info!("Start replication from {} till {}", start_pos, stop_pos);
BeMessage::write(&mut self.outbuf, &BeMessage::Copy);
self.send()?;
let mut end_pos: XLogRecPtr;
let mut commit_lsn: XLogRecPtr;
let mut end_pos: Lsn;
let mut commit_lsn: Lsn;
let mut wal_file: Option<File> = None;
self.outbuf
.resize(LIBPQ_HDR_SIZE + XLOG_HDR_SIZE + MAX_SEND_SIZE, 0u8);
loop {
/* Wait until we have some data to stream */
if stop_pos != 0 {
if stop_pos != Lsn(0) {
/* recovery mode: stream up to the specified LSN (VCL) */
if start_pos >= stop_pos {
/* recovery finished */
@@ -909,7 +902,7 @@ impl Connection {
if let Some(opened_file) = curr_file {
file = opened_file;
} else {
let segno = XLByteToSeg(start_pos, wal_seg_size);
let segno = start_pos.segment_number(wal_seg_size as u64);
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
let wal_file_path = self
.conf
@@ -933,11 +926,11 @@ impl Connection {
}
}
}
let xlogoff = XLogSegmentOffset(start_pos, wal_seg_size) as usize;
let xlogoff = start_pos.segment_offset(wal_seg_size as u64) as usize;
// How much to read and send in message? We cannot cross the WAL file
// boundary, and we don't want send more than MAX_SEND_SIZE.
let send_size = (end_pos - start_pos) as usize;
let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
let send_size = min(send_size, wal_seg_size - xlogoff);
let send_size = min(send_size, MAX_SEND_SIZE);
@@ -953,20 +946,16 @@ impl Connection {
(msg_size - LIBPQ_MSG_SIZE_OFFS) as u32,
);
self.outbuf[5] = b'w';
BigEndian::write_u64(&mut self.outbuf[6..14], start_pos);
BigEndian::write_u64(&mut self.outbuf[14..22], end_pos);
BigEndian::write_u64(&mut self.outbuf[6..14], start_pos.0);
BigEndian::write_u64(&mut self.outbuf[14..22], end_pos.0);
BigEndian::write_u64(&mut self.outbuf[22..30], get_current_timestamp());
self.stream.write_all(&self.outbuf[0..msg_size])?;
start_pos += send_size as u64;
debug!(
"Sent WAL to page server up to {:X}/{:>08X}",
(end_pos >> 32) as u32,
end_pos as u32
);
debug!("Sent WAL to page server up to {}", end_pos);
if XLogSegmentOffset(start_pos, wal_seg_size) != 0 {
if start_pos.segment_offset(wal_seg_size as u64) != 0 {
wal_file = Some(file);
}
}
@@ -987,7 +976,7 @@ impl Connection {
fn write_wal_file(
&self,
startpos: XLogRecPtr,
startpos: Lsn,
timeline: TimeLineID,
wal_seg_size: usize,
buf: &[u8],
@@ -999,7 +988,7 @@ impl Connection {
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
/* Extract WAL location for this block */
let mut xlogoff = XLogSegmentOffset(start_pos, wal_seg_size) as usize;
let mut xlogoff = start_pos.segment_offset(wal_seg_size as u64) as usize;
while bytes_left != 0 {
let bytes_to_write;
@@ -1015,7 +1004,7 @@ impl Connection {
}
/* Open file */
let segno = XLByteToSeg(start_pos, wal_seg_size);
let segno = start_pos.segment_number(wal_seg_size as u64);
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
let wal_file_path = self
.conf
@@ -1074,7 +1063,7 @@ impl Connection {
xlogoff += bytes_to_write;
/* Did we reach the end of a WAL segment? */
if XLogSegmentOffset(start_pos, wal_seg_size) == 0 {
if start_pos.segment_offset(wal_seg_size as u64) == 0 {
xlogoff = 0;
if partial {
fs::rename(&wal_file_partial_path, &wal_file_path)?;
@@ -1084,12 +1073,13 @@ impl Connection {
Ok(())
}
// Find last WAL record. If "precise" is false then just locatelast partial segment
fn find_end_of_wal(&self, precise: bool) -> (XLogRecPtr, TimeLineID) {
find_end_of_wal(
/// Find last WAL record. If "precise" is false then just locate last partial segment
fn find_end_of_wal(&self, precise: bool) -> (Lsn, TimeLineID) {
let (lsn, timeline) = find_end_of_wal(
&self.conf.data_dir,
self.timeline().get_info().server.wal_seg_size as usize,
precise,
)
);
(Lsn(lsn), timeline)
}
}