//
// This file contains common utilities for dealing with PostgreSQL WAL files
// and LSNs.
//
// Many of these functions have been copied from PostgreSQL, and rewritten in
// Rust. That's why they don't follow the usual Rust naming conventions: they
// have been named the same as the corresponding PostgreSQL functions instead.
//

use crate::pg_constants;
use crate::CheckPoint;
use crate::FullTransactionId;
use crate::XLogLongPageHeaderData;
use crate::XLogPageHeaderData;
use crate::XLogRecord;
use crate::XLOG_PAGE_MAGIC;

use anyhow::{bail, ensure};
use byteorder::{ByteOrder, LittleEndian};
use bytes::BytesMut;
use bytes::{Buf, Bytes};
use crc32c::*;
use log::*;

use std::cmp::max;
use std::cmp::min;
use std::fs::{self, File};
use std::io::prelude::*;
use std::io::SeekFrom;
use std::path::{Path, PathBuf};
use std::time::SystemTime;

use utils::bin_ser::DeserializeError;
use utils::bin_ser::SerializeError;
use utils::const_assert;
use utils::lsn::Lsn;

pub const XLOG_FNAME_LEN: usize = 24;
pub const XLOG_BLCKSZ: usize = 8192;
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8;
pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2;

pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;

pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = std::mem::size_of::<XLogPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = std::mem::size_of::<XLogLongPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::<XLogRecord>();
#[allow(clippy::identity_op)]
pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;

// PG timeline is always 1, changing it doesn't have useful meaning in Zenith.
pub const PG_TLI: u32 = 1;

pub type XLogRecPtr = u64;
pub type TimeLineID = u32;
pub type TimestampTz = i64;
pub type XLogSegNo = u64;

/// Interval of checkpointing the metadata file. We should store the metadata
/// file to enforce the predicate that checkpoint.nextXid is larger than any
/// XID in the WAL. But flushing the checkpoint file for each transaction would
/// be too expensive, so XID_CHECKPOINT_INTERVAL is used to forward-align
/// nextXid and thus perform a metadata checkpoint only once per
/// XID_CHECKPOINT_INTERVAL transactions. XID_CHECKPOINT_INTERVAL must not be
/// larger than BLCKSZ*CLOG_XACTS_PER_BYTE in order to let the CLOG_TRUNCATE
/// mechanism correctly extend the CLOG.
const XID_CHECKPOINT_INTERVAL: u32 = 1024;
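
// A small illustration (added as a sketch, not part of the upstream file) of
// the forward alignment described above: XIDs are rounded up to the next
// multiple of XID_CHECKPOINT_INTERVAL with the usual power-of-two mask trick,
// mirroring the formula used in CheckPoint::update_next_xid below.
#[cfg(test)]
mod xid_alignment_example {
    use super::XID_CHECKPOINT_INTERVAL;

    #[test]
    fn round_up_to_checkpoint_interval() {
        let align =
            |xid: u32| xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
        assert_eq!(align(1), 1024); // anything in (0, 1024] rounds up to 1024
        assert_eq!(align(1024), 1024); // already-aligned values are unchanged
        assert_eq!(align(1025), 2048);
    }
}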

#[allow(non_snake_case)]
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
    (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
}

#[allow(non_snake_case)]
pub fn XLogSegNoOffsetToRecPtr(
    segno: XLogSegNo,
    offset: u32,
    wal_segsz_bytes: usize,
) -> XLogRecPtr {
    segno * (wal_segsz_bytes as u64) + (offset as u64)
}

#[allow(non_snake_case)]
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
    format!(
        "{:>08X}{:>08X}{:>08X}",
        tli,
        logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes),
        logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes)
    )
}

#[allow(non_snake_case)]
pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
    let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
    let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
    let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
    (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
}

#[allow(non_snake_case)]
pub fn IsXLogFileName(fname: &str) -> bool {
    fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit())
}

#[allow(non_snake_case)]
pub fn IsPartialXLogFileName(fname: &str) -> bool {
    fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
}

/// If the LSN points to the beginning of a page, shift it to the first record;
/// otherwise align it on an 8-byte boundary (required for WAL records).
pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
    if lsn.0 % XLOG_BLCKSZ as u64 == 0 {
        let hdr_size = if lsn.0 % seg_sz as u64 == 0 {
            XLOG_SIZE_OF_XLOG_LONG_PHD
        } else {
            XLOG_SIZE_OF_XLOG_SHORT_PHD
        };
        lsn + hdr_size as u64
    } else {
        lsn.align()
    }
}

pub fn get_current_timestamp() -> TimestampTz {
    to_pg_timestamp(SystemTime::now())
}

pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz {
    const UNIX_EPOCH_JDATE: u64 = 2440588; /* == date2j(1970, 1, 1) */
    const POSTGRES_EPOCH_JDATE: u64 = 2451545; /* == date2j(2000, 1, 1) */
    const SECS_PER_DAY: u64 = 86400;
    const USECS_PER_SEC: u64 = 1000000;
    match time.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(n) => {
            ((n.as_secs() - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY))
                * USECS_PER_SEC
                + n.subsec_micros() as u64) as i64
        }
        Err(_) => panic!("SystemTime before UNIX EPOCH!"),
    }
}
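
// A quick sketch (added for illustration) of how the naming and LSN helpers
// above fit together, assuming the default 16 MiB segment size and that
// Lsn::align() rounds up to the next 8-byte boundary.
#[cfg(test)]
mod wal_name_examples {
    use super::*;

    #[test]
    fn segment_names_round_trip() {
        let seg_size = 16 * 1024 * 1024;
        // Segment 5 on timeline 1: "tli / logical xlog id / segment within id".
        let name = XLogFileName(1, 5, seg_size);
        assert_eq!(name, "000000010000000000000005");
        assert_eq!(XLogFromFileName(&name, seg_size), (5, 1));
    }

    #[test]
    fn normalize_lsn_skips_headers() {
        let seg_size = 16 * 1024 * 1024;
        // At a segment boundary, the long page header is skipped...
        assert_eq!(
            normalize_lsn(Lsn(seg_size as u64), seg_size),
            Lsn(seg_size as u64 + XLOG_SIZE_OF_XLOG_LONG_PHD as u64)
        );
        // ...while mid-page LSNs are merely aligned to 8 bytes.
        assert_eq!(normalize_lsn(Lsn(0x1000001), seg_size), Lsn(0x1000008));
    }
}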

/// Return the offset of the last valid record in segment `segno`, starting
/// the search at `start_offset`. Returns `start_offset` if no records are found.
fn find_end_of_wal_segment(
    data_dir: &Path,
    segno: XLogSegNo,
    tli: TimeLineID,
    wal_seg_size: usize,
    start_offset: usize, // start reading at this point
) -> anyhow::Result<u32> {
    // step back to the beginning of the page to read it in...
    let mut offs: usize = start_offset - start_offset % XLOG_BLCKSZ;
    let mut skipping_first_contrecord: bool = false;
    let mut contlen: usize = 0;
    let mut xl_crc: u32 = 0;
    let mut crc: u32 = 0;
    let mut rec_offs: usize = 0;
    let mut buf = [0u8; XLOG_BLCKSZ];
    let file_name = XLogFileName(tli, segno, wal_seg_size);
    // assume a new record begins at the given start_offset
    let mut last_valid_rec_pos: usize = start_offset;
    let mut file = File::open(data_dir.join(file_name.clone() + ".partial")).unwrap();
    file.seek(SeekFrom::Start(offs as u64))?;

    // xl_crc is the last field in XLogRecord and will not be read into rec_hdr
    const_assert!(XLOG_RECORD_CRC_OFFS + 4 == XLOG_SIZE_OF_XLOG_RECORD);
    let mut rec_hdr = [0u8; XLOG_RECORD_CRC_OFFS];

    trace!("find_end_of_wal_segment(data_dir={}, segno={}, tli={}, wal_seg_size={}, start_offset=0x{:x})",
           data_dir.display(), segno, tli, wal_seg_size, start_offset);
    while offs < wal_seg_size {
        // we are at the beginning of the page; read it in
        if offs % XLOG_BLCKSZ == 0 {
            trace!("offs=0x{:x}: new page", offs);
            let bytes_read = file.read(&mut buf)?;
            if bytes_read != buf.len() {
                bail!(
                    "failed to read {} bytes from {} at {}",
                    XLOG_BLCKSZ,
                    file_name,
                    offs
                );
            }
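            // NOTE: every WAL page begins with a page header: the first page
            // of a segment carries the long header (which additionally stores
            // the system identifier, segment size and page size), all later
            // pages carry the short one. A header may be followed by the tail
            // of a record that spilled over from the previous page (a
            // "contrecord"), as indicated by XLP_FIRST_IS_CONTRECORD.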
            let xlp_magic = LittleEndian::read_u16(&buf[0..2]);
            let xlp_info = LittleEndian::read_u16(&buf[2..4]);
            let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS + 4]);
            trace!(
                "  xlp_magic=0x{:x}, xlp_info=0x{:x}, xlp_rem_len={}",
                xlp_magic,
                xlp_info,
                xlp_rem_len
            );
            // this is expected in current usage when valid WAL starts after the page header
            if xlp_magic != XLOG_PAGE_MAGIC as u16 {
                trace!(
                    "  invalid WAL file {}.partial magic {} at {:?}",
                    file_name,
                    xlp_magic,
                    Lsn(XLogSegNoOffsetToRecPtr(segno, offs as u32, wal_seg_size)),
                );
            }
            if offs == 0 {
                offs += XLOG_SIZE_OF_XLOG_LONG_PHD;
                if (xlp_info & XLP_FIRST_IS_CONTRECORD) != 0 {
                    trace!("  first record is contrecord");
                    skipping_first_contrecord = true;
                    contlen = xlp_rem_len as usize;
                    if offs < start_offset {
                        // Pre-condition failed: the beginning of the segment is unexpectedly corrupted.
                        ensure!(start_offset - offs >= contlen,
                                "start_offset is in the middle of the first record (which happens to be a contrecord), \
                                 expected to be on a record boundary. Is the beginning of the segment corrupted?");
                        contlen = 0;
                        // keep skipping_first_contrecord to avoid counting the contrecord as valid, we did not check it.
                    }
                } else {
                    trace!("  first record is not contrecord");
                }
            } else {
                offs += XLOG_SIZE_OF_XLOG_SHORT_PHD;
            }
            // ...and step forward again if asked
            trace!("  skipped header to 0x{:x}", offs);
            offs = max(offs, start_offset);
        } else if contlen == 0 {
            // beginning of the next record
            let page_offs = offs % XLOG_BLCKSZ;
            let xl_tot_len = LittleEndian::read_u32(&buf[page_offs..page_offs + 4]) as usize;
            trace!("offs=0x{:x}: new record, xl_tot_len={}", offs, xl_tot_len);
            if xl_tot_len == 0 {
                info!(
                    "find_end_of_wal_segment reached zeros at {:?}, last record ends at {:?}",
                    Lsn(XLogSegNoOffsetToRecPtr(segno, offs as u32, wal_seg_size)),
                    Lsn(XLogSegNoOffsetToRecPtr(
                        segno,
                        last_valid_rec_pos as u32,
                        wal_seg_size
                    ))
                );
                break; // zeros, reached the end
            }
            if skipping_first_contrecord {
                skipping_first_contrecord = false;
                trace!("  the first contrecord has just been completed");
            } else {
                trace!(
                    "  updating last_valid_rec_pos: 0x{:x} --> 0x{:x}",
                    last_valid_rec_pos,
                    offs
                );
                last_valid_rec_pos = offs;
            }
            offs += 4;
            rec_offs = 4;
            contlen = xl_tot_len - 4;
            trace!(
                "  reading rec_hdr[0..4] <-- [0x{:x}; 0x{:x})",
                page_offs,
                page_offs + 4
            );
            rec_hdr[0..4].copy_from_slice(&buf[page_offs..page_offs + 4]);
        } else {
            // we're continuing a record, possibly from a previous page.
            let page_offs = offs % XLOG_BLCKSZ;
            let pageleft = XLOG_BLCKSZ - page_offs;

            // read the rest of the record, or as much as fits on this page.
            let n = min(contlen, pageleft);
            trace!(
                "offs=0x{:x}, record continuation, pageleft={}, contlen={}",
                offs,
                pageleft,
                contlen
            );
            // fill the rec_hdr header up to (but not including) the xl_crc field
            trace!(
                "  rec_offs={}, XLOG_RECORD_CRC_OFFS={}, XLOG_SIZE_OF_XLOG_RECORD={}",
                rec_offs,
                XLOG_RECORD_CRC_OFFS,
                XLOG_SIZE_OF_XLOG_RECORD
            );
            if rec_offs < XLOG_RECORD_CRC_OFFS {
                let len = min(XLOG_RECORD_CRC_OFFS - rec_offs, n);
                trace!(
                    "  reading rec_hdr[{}..{}] <-- [0x{:x}; 0x{:x})",
                    rec_offs,
                    rec_offs + len,
                    page_offs,
                    page_offs + len
                );
                rec_hdr[rec_offs..rec_offs + len]
                    .copy_from_slice(&buf[page_offs..page_offs + len]);
            }
            if rec_offs <= XLOG_RECORD_CRC_OFFS && rec_offs + n >= XLOG_SIZE_OF_XLOG_RECORD {
                let crc_offs = page_offs - rec_offs + XLOG_RECORD_CRC_OFFS;
                // All records are aligned on an 8-byte boundary, so their 8-byte frames
                // cannot be split between pages. As xl_crc is the last field,
                // its content is always on the same page.
                const_assert!(XLOG_RECORD_CRC_OFFS % 8 == 4);
                // We should always start reading aligned records even in incorrect WALs, so if
                // the condition is false it is likely a bug. However, it is localized somewhere
                // in this function, hence we do not crash and just report failure instead.
                ensure!(crc_offs % 8 == 4, "Record is not aligned properly (bug?)");
                xl_crc = LittleEndian::read_u32(&buf[crc_offs..crc_offs + 4]);
                trace!(
                    "  reading xl_crc: [0x{:x}; 0x{:x}) = 0x{:x}",
                    crc_offs,
                    crc_offs + 4,
                    xl_crc
                );
                crc = crc32c_append(0, &buf[crc_offs + 4..page_offs + n]);
                trace!(
                    "  initializing crc: [0x{:x}; 0x{:x}); crc = 0x{:x}",
                    crc_offs + 4,
                    page_offs + n,
                    crc
                );
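                // NOTE: PostgreSQL computes xl_crc over the record payload
                // first and over the header (up to, but not including, xl_crc
                // itself) last. That is why the running `crc` is seeded here
                // from the bytes right after xl_crc, while the buffered
                // `rec_hdr` bytes are appended only once the whole record has
                // been read.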
            } else if rec_offs > XLOG_RECORD_CRC_OFFS {
                // As all records are 8-byte aligned, the header has already been fully
                // read and `crc` was initialized in the branch above.
                ensure!(rec_offs >= XLOG_SIZE_OF_XLOG_RECORD);
                let old_crc = crc;
                crc = crc32c_append(crc, &buf[page_offs..page_offs + n]);
                trace!(
                    "  appending to crc: [0x{:x}; 0x{:x}); 0x{:x} --> 0x{:x}",
                    page_offs,
                    page_offs + n,
                    old_crc,
                    crc
                );
            } else {
                // Correct because of the way the conditions are written above.
                assert!(rec_offs + n < XLOG_SIZE_OF_XLOG_RECORD);
                // If `skipping_first_contrecord == true`, we may be reading from the middle
                // of a record which started in the previous segment, hence there is no
                // point in validating the header.
                if !skipping_first_contrecord && rec_offs + n > XLOG_RECORD_CRC_OFFS {
                    info!(
                        "Curiously corrupted WAL: a record stops inside the header; \
                         offs=0x{:x}, record continuation, pageleft={}, contlen={}",
                        offs, pageleft, contlen
                    );
                    break;
                }
                // Do nothing: we are still reading the header. It is accounted for
                // in the CRC at the end of the record.
            }
            rec_offs += n;
            offs += n;
            contlen -= n;

            if contlen == 0 {
                trace!("  record completed at 0x{:x}", offs);
                crc = crc32c_append(crc, &rec_hdr);
                offs = (offs + 7) & !7; // pad to an 8-byte boundary
                trace!(
                    "  padded offs to 0x{:x}, crc is {:x}, expected crc is {:x}",
                    offs,
                    crc,
                    xl_crc
                );
                if skipping_first_contrecord {
                    // do nothing, the flag will be reset on the next iteration
                    // when we start reading a new record
                    trace!("  the first contrecord has just been completed");
                } else if crc == xl_crc {
                    // the record is valid, advance the result to its end (with
                    // alignment to the next record taken into account)
                    trace!(
                        "  updating last_valid_rec_pos: 0x{:x} --> 0x{:x}",
                        last_valid_rec_pos,
                        offs
                    );
                    last_valid_rec_pos = offs;
                } else {
                    info!(
                        "CRC mismatch {} vs {} at {}",
                        crc, xl_crc, last_valid_rec_pos
                    );
                    break;
                }
            }
        }
    }
    trace!("last_valid_rec_pos=0x{:x}", last_valid_rec_pos);
    Ok(last_valid_rec_pos as u32)
}
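
// A small sketch (added for illustration) of the crc32c_append contract the
// function above relies on: appending in chunks yields the same checksum as
// hashing the concatenated bytes in one go, which is what allows the record
// payload and the buffered header to be folded in separately.
#[cfg(test)]
mod crc32c_append_example {
    use crc32c::{crc32c, crc32c_append};

    #[test]
    fn incremental_crc_matches_one_shot() {
        let payload = b"payload bytes of a WAL record";
        let header = b"header up to xl_crc";
        let mut concatenated = payload.to_vec();
        concatenated.extend_from_slice(header);

        let incremental = crc32c_append(crc32c_append(0, payload), header);
        assert_eq!(incremental, crc32c(&concatenated));
    }
}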

///
/// Scan a directory that contains PostgreSQL WAL files, for the end of WAL.
/// If precise, returns the end LSN (the next insertion point, basically);
/// otherwise, the start of the last segment.
/// Returns (0, 0) if there is no WAL.
///
pub fn find_end_of_wal(
    data_dir: &Path,
    wal_seg_size: usize,
    precise: bool,
    start_lsn: Lsn, // start reading WAL at this point or later
) -> anyhow::Result<(XLogRecPtr, TimeLineID)> {
    let mut high_segno: XLogSegNo = 0;
    let mut high_tli: TimeLineID = 0;
    let mut high_ispartial = false;

    for entry in fs::read_dir(data_dir).unwrap().flatten() {
        let ispartial: bool;
        let entry_name = entry.file_name();
        let fname = entry_name.to_str().unwrap();
        /*
         * Check if the filename looks like an xlog file, or a .partial file.
         */
        if IsXLogFileName(fname) {
            ispartial = false;
        } else if IsPartialXLogFileName(fname) {
            ispartial = true;
        } else {
            continue;
        }
        let (segno, tli) = XLogFromFileName(fname, wal_seg_size);
        if !ispartial && entry.metadata().unwrap().len() != wal_seg_size as u64 {
            continue;
        }
        if segno > high_segno
            || (segno == high_segno && tli > high_tli)
            || (segno == high_segno && tli == high_tli && high_ispartial && !ispartial)
        {
            high_segno = segno;
            high_tli = tli;
            high_ispartial = ispartial;
        }
    }
    if high_segno > 0 {
        let mut high_offs = 0;
        /*
         * Move the starting pointer to the start of the next segment, if the
         * highest one we saw was completed.
         */
        if !high_ispartial {
            high_segno += 1;
        } else if precise {
            /* otherwise locate the last record in the last partial segment */
            if start_lsn.segment_number(wal_seg_size) > high_segno {
                bail!(
                    "provided start_lsn {:?} is beyond the highest available segno {:?}",
                    start_lsn,
                    high_segno,
                );
            }
            let start_offset = if start_lsn.segment_number(wal_seg_size) == high_segno {
                start_lsn.segment_offset(wal_seg_size)
            } else {
                0
            };
            high_offs = find_end_of_wal_segment(
                data_dir,
                high_segno,
                high_tli,
                wal_seg_size,
                start_offset,
            )?;
        }
        let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size);
        return Ok((high_ptr, high_tli));
    }
    Ok((0, 0))
}

pub fn main() {
    let mut data_dir = PathBuf::new();
    data_dir.push(".");
    let wal_seg_size = 16 * 1024 * 1024;
    let (wal_end, tli) = find_end_of_wal(&data_dir, wal_seg_size, true, Lsn(0)).unwrap();
    println!(
        "wal_end={:>08X}{:>08X}, tli={}",
        (wal_end >> 32) as u32,
        wal_end as u32,
        tli
    );
}

impl XLogRecord {
    pub fn from_slice(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
        use utils::bin_ser::LeSer;
        XLogRecord::des(buf)
    }

    pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogRecord, DeserializeError> {
        use utils::bin_ser::LeSer;
        XLogRecord::des_from(&mut buf.reader())
    }

    pub fn encode(&self) -> Result<Bytes, SerializeError> {
        use utils::bin_ser::LeSer;
        Ok(self.ser()?.into())
    }

    // Is this record an XLOG_SWITCH record? They need some special processing.
    pub fn is_xlog_switch_record(&self) -> bool {
        self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID
    }
}

impl XLogPageHeaderData {
    pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogPageHeaderData, DeserializeError> {
        use utils::bin_ser::LeSer;
        XLogPageHeaderData::des_from(&mut buf.reader())
    }
}

impl XLogLongPageHeaderData {
    pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogLongPageHeaderData, DeserializeError> {
        use utils::bin_ser::LeSer;
        XLogLongPageHeaderData::des_from(&mut buf.reader())
    }

    pub fn encode(&self) -> Result<Bytes, SerializeError> {
        use utils::bin_ser::LeSer;
        self.ser().map(|b| b.into())
    }
}

pub const SIZEOF_CHECKPOINT: usize = std::mem::size_of::<CheckPoint>();

impl CheckPoint {
    pub fn encode(&self) -> Result<Bytes, SerializeError> {
        use utils::bin_ser::LeSer;
        Ok(self.ser()?.into())
    }

    pub fn decode(buf: &[u8]) -> Result<CheckPoint, DeserializeError> {
        use utils::bin_ser::LeSer;
        CheckPoint::des(buf)
    }

    /// Update next XID based on the provided new_xid and the stored epoch.
    /// The next XID should be greater than new_xid. This handles 32-bit
    /// XID wraparound correctly.
    ///
    /// Returns 'true' if the XID was updated.
    pub fn update_next_xid(&mut self, xid: u32) -> bool {
        // nextXid must be greater than any XID in the WAL, so increment the
        // provided XID and check for wraparound.
        let mut new_xid = std::cmp::max(xid + 1, pg_constants::FIRST_NORMAL_TRANSACTION_ID);
        // To reduce the number of metadata checkpoints, we forward-align the XID
        // on XID_CHECKPOINT_INTERVAL. XID_CHECKPOINT_INTERVAL must not be larger
        // than BLCKSZ*CLOG_XACTS_PER_BYTE.
        new_xid =
            new_xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
        let full_xid = self.nextXid.value;
        let old_xid = full_xid as u32;
        if new_xid.wrapping_sub(old_xid) as i32 > 0 {
            let mut epoch = full_xid >> 32;
            if new_xid < old_xid {
                // wrap-around
                epoch += 1;
            }
            let nextXid = (epoch << 32) | new_xid as u64;
            if nextXid != self.nextXid.value {
                self.nextXid = FullTransactionId { value: nextXid };
                return true;
            }
        }
        false
    }
}
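
// A brief sketch (added for illustration) of the modular-comparison trick used
// in update_next_xid above: interpreting the wrapping 32-bit difference as a
// signed integer tells us whether new_xid is "ahead of" old_xid even across a
// 32-bit wraparound.
#[cfg(test)]
mod xid_wraparound_example {
    #[test]
    fn signed_difference_orders_wrapping_xids() {
        // Plain case: 2048 is ahead of 1024.
        assert!(2048u32.wrapping_sub(1024) as i32 > 0);
        // Wraparound case: a small XID is "ahead of" one close to u32::MAX.
        let old_xid = u32::MAX - 100;
        let new_xid = 1024u32;
        assert!(new_xid.wrapping_sub(old_xid) as i32 > 0);
        // And the reverse comparison is negative, as expected.
        assert!((old_xid.wrapping_sub(new_xid) as i32) < 0);
    }
}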

//
// Generate a new, empty WAL segment.
// We need this segment to start a compute node.
//
pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result<Bytes, SerializeError> {
    let mut seg_buf = BytesMut::with_capacity(pg_constants::WAL_SEGMENT_SIZE as usize);

    let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, pg_constants::WAL_SEGMENT_SIZE);
    let hdr = XLogLongPageHeaderData {
        std: {
            XLogPageHeaderData {
                xlp_magic: XLOG_PAGE_MAGIC as u16,
                xlp_info: pg_constants::XLP_LONG_HEADER,
                xlp_tli: PG_TLI,
                xlp_pageaddr: pageaddr,
                xlp_rem_len: 0,
                ..Default::default() // Put 0 in padding fields.
            }
        },
        xlp_sysid: system_id,
        xlp_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32,
        xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
    };

    let hdr_bytes = hdr.encode()?;
    seg_buf.extend_from_slice(&hdr_bytes);

    // zero out the rest of the file
    seg_buf.resize(pg_constants::WAL_SEGMENT_SIZE, 0);
    Ok(seg_buf.freeze())
}
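
// A minimal usage sketch (added for illustration, not an upstream test): the
// generated segment is exactly WAL_SEGMENT_SIZE bytes, begins with a long page
// header carrying the page magic, and is zero-filled after the header.
#[cfg(test)]
mod generate_wal_segment_example {
    use super::*;

    #[test]
    fn empty_segment_has_expected_shape() {
        let seg = generate_wal_segment(1, 42).unwrap();
        assert_eq!(seg.len(), pg_constants::WAL_SEGMENT_SIZE);
        // xlp_magic is the first field of the header, serialized little-endian.
        assert_eq!(LittleEndian::read_u16(&seg[0..2]), XLOG_PAGE_MAGIC as u16);
        // The tail of the segment (well past the header) is zero-filled.
        assert!(seg[pg_constants::WAL_SEGMENT_SIZE - XLOG_BLCKSZ..]
            .iter()
            .all(|&b| b == 0));
    }
}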

#[cfg(test)]
mod tests {
    use super::*;
    use regex::Regex;
    use std::{env, str::FromStr};

    fn init_logging() {
        let _ = env_logger::Builder::from_env(
            env_logger::Env::default()
                .default_filter_or("wal_generate=info,postgres_ffi::xlog_utils=trace"),
        )
        .is_test(true)
        .try_init();
    }

    fn test_end_of_wal(
        test_name: &str,
        generate_wal: impl Fn(&mut postgres::Client) -> anyhow::Result<postgres::types::PgLsn>,
        expected_end_of_wal_non_partial: Lsn,
        last_segment: &str,
    ) {
        use wal_generate::*;

        // 1. Generate some WAL
        let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("..")
            .join("..");
        let cfg = Conf {
            pg_distrib_dir: top_path.join("tmp_install"),
            datadir: top_path.join(format!("test_output/{}", test_name)),
        };
        if cfg.datadir.exists() {
            fs::remove_dir_all(&cfg.datadir).unwrap();
        }
        cfg.initdb().unwrap();
        let mut srv = cfg.start_server().unwrap();
        let expected_wal_end: Lsn =
            u64::from(generate_wal(&mut srv.connect_with_timeout().unwrap()).unwrap()).into();
        srv.kill();

        // 2. Pick up the WAL generated by initdb
        let wal_dir = cfg.datadir.join("pg_wal");
        let wal_seg_size = 16 * 1024 * 1024;

        // 3. Check end_of_wal on the non-partial WAL segment (we treat it as fully populated)
        let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true, Lsn(0)).unwrap();
        let wal_end = Lsn(wal_end);
        info!(
            "find_end_of_wal returned (wal_end={}, tli={})",
            wal_end, tli
        );
        assert_eq!(wal_end, expected_end_of_wal_non_partial);

        // 4. Get the actual end of WAL from pg_waldump
        let waldump_output = cfg
            .pg_waldump("000000010000000000000001", last_segment)
            .unwrap()
            .stderr;
        let waldump_output = std::str::from_utf8(&waldump_output).unwrap();
        let caps = match Regex::new(r"invalid record length at (.+):")
            .unwrap()
            .captures(waldump_output)
        {
            Some(caps) => caps,
            None => {
                error!("Unable to parse pg_waldump's stderr:\n{}", waldump_output);
                panic!();
            }
        };
        let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap();
        info!(
            "waldump erred on {}, expected wal end at {}",
            waldump_wal_end, expected_wal_end
        );
        assert_eq!(waldump_wal_end, expected_wal_end);

        // 5. Rename the file to .partial to actually find the last valid LSN
        fs::rename(
            wal_dir.join(last_segment),
            wal_dir.join(format!("{}.partial", last_segment)),
        )
        .unwrap();
        let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true, Lsn(0)).unwrap();
        let wal_end = Lsn(wal_end);
        info!(
            "find_end_of_wal returned (wal_end={}, tli={})",
            wal_end, tli
        );
        assert_eq!(wal_end, waldump_wal_end);
    }

    #[test]
    pub fn test_find_end_of_wal_simple() {
        init_logging();
        test_end_of_wal(
            "test_find_end_of_wal_simple",
            wal_generate::generate_simple,
            "0/2000000".parse::<Lsn>().unwrap(),
            "000000010000000000000001",
        );
    }

    #[test]
    pub fn test_find_end_of_wal_crossing_segment_followed_by_small_one() {
        init_logging();
        test_end_of_wal(
            "test_find_end_of_wal_crossing_segment_followed_by_small_one",
            wal_generate::generate_wal_record_crossing_segment_followed_by_small_one,
            "0/3000000".parse::<Lsn>().unwrap(),
            "000000010000000000000002",
        );
    }

    #[test]
    #[ignore = "not yet fixed, needs correct parsing of pre-last segments"] // TODO
    pub fn test_find_end_of_wal_last_crossing_segment() {
        init_logging();
        test_end_of_wal(
            "test_find_end_of_wal_last_crossing_segment",
            wal_generate::generate_last_wal_record_crossing_segment,
            "0/3000000".parse::<Lsn>().unwrap(),
            "000000010000000000000002",
        );
    }

    /// Check the math in update_next_xid
    ///
    /// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL,
    /// currently 1024.
    #[test]
    pub fn test_update_next_xid() {
        let checkpoint_buf = [0u8; std::mem::size_of::<CheckPoint>()];
        let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();

        checkpoint.nextXid = FullTransactionId { value: 10 };
        assert_eq!(checkpoint.nextXid.value, 10);

        // The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL
        // boundary
        checkpoint.update_next_xid(100);
        assert_eq!(checkpoint.nextXid.value, 1024);

        // No change
        checkpoint.update_next_xid(500);
        assert_eq!(checkpoint.nextXid.value, 1024);
        checkpoint.update_next_xid(1023);
        assert_eq!(checkpoint.nextXid.value, 1024);

        // The function returns the *next* XID, given the highest XID seen so
        // far. So when we pass 1024, nextXid gets bumped up to the next
        // XID_CHECKPOINT_INTERVAL boundary.
        checkpoint.update_next_xid(1024);
        assert_eq!(checkpoint.nextXid.value, 2048);
    }
}