diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index 68ac52e30f..c4caa18b32 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -16,14 +16,17 @@ use crate::XLogPageHeaderData; use crate::XLogRecord; use crate::XLOG_PAGE_MAGIC; +use anyhow::{bail, Result}; use byteorder::{ByteOrder, LittleEndian}; use bytes::{Buf, Bytes}; use bytes::{BufMut, BytesMut}; use crc32c::*; use log::*; +use std::cmp::max; use std::cmp::min; use std::fs::{self, File}; use std::io::prelude::*; +use std::io::SeekFrom; use std::path::{Path, PathBuf}; use std::time::SystemTime; use zenith_utils::lsn::Lsn; @@ -125,8 +128,10 @@ fn find_end_of_wal_segment( segno: XLogSegNo, tli: TimeLineID, wal_seg_size: usize, -) -> u32 { - let mut offs: usize = 0; + start_offset: usize, // start reading at this point +) -> Result { + // step back to the beginning of the page to read it in... + let mut offs: usize = start_offset - start_offset % XLOG_BLCKSZ; let mut contlen: usize = 0; let mut wal_crc: u32 = 0; let mut crc: u32 = 0; @@ -135,24 +140,33 @@ fn find_end_of_wal_segment( let file_name = XLogFileName(tli, segno, wal_seg_size); let mut last_valid_rec_pos: usize = 0; let mut file = File::open(data_dir.join(file_name.clone() + ".partial")).unwrap(); + file.seek(SeekFrom::Start(offs as u64))?; let mut rec_hdr = [0u8; XLOG_RECORD_CRC_OFFS]; while offs < wal_seg_size { // we are at the beginning of the page; read it in if offs % XLOG_BLCKSZ == 0 { - if let Ok(bytes_read) = file.read(&mut buf) { - if bytes_read != buf.len() { - break; - } - } else { - break; + let bytes_read = file.read(&mut buf)?; + if bytes_read != buf.len() { + bail!( + "failed to read {} bytes from {} at {}", + XLOG_BLCKSZ, + file_name, + offs + ); } + let xlp_magic = LittleEndian::read_u16(&buf[0..2]); let xlp_info = LittleEndian::read_u16(&buf[2..4]); let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS + 4]); + // this is expected in current usage when valid WAL starts after page header if xlp_magic != XLOG_PAGE_MAGIC as u16 { - info!("Invalid WAL file {}.partial magic {}", file_name, xlp_magic); - break; + trace!( + "invalid WAL file {}.partial magic {} at {:?}", + file_name, + xlp_magic, + Lsn(XLogSegNoOffsetToRecPtr(segno, offs as u32, wal_seg_size)), + ); } if offs == 0 { offs = XLOG_SIZE_OF_XLOG_LONG_PHD; @@ -162,11 +176,18 @@ fn find_end_of_wal_segment( } else { offs += XLOG_SIZE_OF_XLOG_SHORT_PHD; } + // ... and step forward again if asked + offs = max(offs, start_offset); + // beginning of the next record } else if contlen == 0 { let page_offs = offs % XLOG_BLCKSZ; let xl_tot_len = LittleEndian::read_u32(&buf[page_offs..page_offs + 4]) as usize; if xl_tot_len == 0 { + info!( + "find_end_of_wal_segment reached zeros at {:?}", + Lsn(XLogSegNoOffsetToRecPtr(segno, offs as u32, wal_seg_size)) + ); break; // zeros, reached the end } last_valid_rec_pos = offs; @@ -217,7 +238,7 @@ fn find_end_of_wal_segment( } } } - last_valid_rec_pos as u32 + Ok(last_valid_rec_pos as u32) } /// @@ -230,7 +251,8 @@ pub fn find_end_of_wal( data_dir: &Path, wal_seg_size: usize, precise: bool, -) -> (XLogRecPtr, TimeLineID) { + start_lsn: Lsn, // start reading WAL at this point or later +) -> Result<(XLogRecPtr, TimeLineID)> { let mut high_segno: XLogSegNo = 0; let mut high_tli: TimeLineID = 0; let mut high_ispartial = false; @@ -272,19 +294,32 @@ pub fn find_end_of_wal( high_segno += 1; } else if precise { /* otherwise locate last record in last partial segment */ - high_offs = find_end_of_wal_segment(data_dir, high_segno, high_tli, wal_seg_size); + if start_lsn.segment_number(wal_seg_size) > high_segno { + bail!( + "provided start_lsn {:?} is beyond highest segno {:?} available", + start_lsn, + high_segno, + ); + } + high_offs = find_end_of_wal_segment( + data_dir, + high_segno, + high_tli, + wal_seg_size, + start_lsn.segment_offset(wal_seg_size), + )?; } let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size); - return (high_ptr, high_tli); + return Ok((high_ptr, high_tli)); } - (0, 0) + Ok((0, 0)) } pub fn main() { let mut data_dir = PathBuf::new(); data_dir.push("."); let wal_seg_size = 16 * 1024 * 1024; - let (wal_end, tli) = find_end_of_wal(&data_dir, wal_seg_size, true); + let (wal_end, tli) = find_end_of_wal(&data_dir, wal_seg_size, true, Lsn(0)).unwrap(); println!( "wal_end={:>08X}{:>08X}, tli={}", (wal_end >> 32) as u32, @@ -463,7 +498,7 @@ mod tests { let wal_seg_size = 16 * 1024 * 1024; // 3. Check end_of_wal on non-partial WAL segment (we treat it as fully populated) - let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true); + let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true, Lsn(0)).unwrap(); let wal_end = Lsn(wal_end); println!("wal_end={}, tli={}", wal_end, tli); assert_eq!(wal_end, "0/2000000".parse::().unwrap()); @@ -489,7 +524,7 @@ mod tests { wal_dir.join("000000010000000000000001.partial"), ) .unwrap(); - let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true); + let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true, Lsn(0)).unwrap(); let wal_end = Lsn(wal_end); println!("wal_end={}, tli={}", wal_end, tli); assert_eq!(wal_end, waldump_wal_end); diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 16301edf98..3f78ce3e2b 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -75,6 +75,9 @@ pub struct SafeKeeperState { /// minimal LSN which may be needed for recovery of some safekeeper (end_lsn /// of last record streamed to everyone) pub truncate_lsn: Lsn, + // Safekeeper starts receiving WAL from this LSN, zeros before it ought to + // be skipped during decoding. + pub wal_start_lsn: Lsn, } impl SafeKeeperState { @@ -94,6 +97,7 @@ impl SafeKeeperState { proposer_uuid: [0; 16], commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */ truncate_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */ + wal_start_lsn: Lsn(0), } } } @@ -413,6 +417,7 @@ where } self.s.proposer_uuid = msg.h.proposer_uuid; + let mut sync_control_file = false; // do the job let mut last_rec_lsn = Lsn(0); @@ -439,9 +444,15 @@ where } } } + + // If this was the first record we ever receieved, remember LSN to help + // find_end_of_wal skip the hole in the beginning. + if self.s.wal_start_lsn == Lsn(0) { + self.s.wal_start_lsn = msg.h.begin_lsn; + sync_control_file = true; + } } - let mut sync_control_file = false; /* * Epoch switch happen when written WAL record cross the boundary. * The boundary is maximum of last WAL position at this node (FlushLSN) and global diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index 197ed58f8c..6e6bc8f97c 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -61,7 +61,12 @@ impl SharedState { }; let (flush_lsn, tli) = if state.server.wal_seg_size != 0 { let wal_dir = conf.data_dir.join(format!("{}", timelineid)); - find_end_of_wal(&wal_dir, state.server.wal_seg_size as usize, true) + find_end_of_wal( + &wal_dir, + state.server.wal_seg_size as usize, + true, + state.wal_start_lsn, + )? } else { (0, 0) };