diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 8cdfd92fc1..29b00c8d36 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -16,22 +16,22 @@ use crate::XLogRecord; use crate::XLOG_PAGE_MAGIC; use crate::pg_constants::WAL_SEGMENT_SIZE; -use anyhow::{anyhow, bail, ensure}; -use byteorder::{ByteOrder, LittleEndian}; +use crate::waldecoder::WalStreamDecoder; + use bytes::BytesMut; use bytes::{Buf, Bytes}; -use crc32c::*; + use log::*; -use std::cmp::max; -use std::cmp::min; -use std::fs::{self, File}; + +use std::fs::File; use std::io::prelude::*; +use std::io::ErrorKind; use std::io::SeekFrom; use std::path::{Path, PathBuf}; use std::time::SystemTime; use utils::bin_ser::DeserializeError; use utils::bin_ser::SerializeError; -use utils::const_assert; + use utils::lsn::Lsn; pub const XLOG_FNAME_LEN: usize = 24; @@ -140,338 +140,93 @@ pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz { } } -/// Return offset of the last valid record in the segment segno, starting -/// looking at start_offset. Returns start_offset if no records found. -fn find_end_of_wal_segment( - data_dir: &Path, - segno: XLogSegNo, - tli: TimeLineID, - wal_seg_size: usize, - start_offset: usize, // start reading at this point -) -> anyhow::Result { - // step back to the beginning of the page to read it in... - let mut offs: usize = start_offset - start_offset % XLOG_BLCKSZ; - let mut skipping_first_contrecord: bool = false; - let mut contlen: usize = 0; - let mut xl_crc: u32 = 0; - let mut crc: u32 = 0; - let mut rec_offs: usize = 0; - let mut buf = [0u8; XLOG_BLCKSZ]; - let file_name = XLogFileName(tli, segno, wal_seg_size); - let mut last_valid_rec_pos: usize = start_offset; // assume at given start_offset begins new record - let mut file = File::open(data_dir.join(file_name.clone() + ".partial"))?; - file.seek(SeekFrom::Start(offs as u64))?; - // xl_crc is the last field in XLogRecord, will not be read into rec_hdr - const_assert!(XLOG_RECORD_CRC_OFFS + 4 == XLOG_SIZE_OF_XLOG_RECORD); - let mut rec_hdr = [0u8; XLOG_RECORD_CRC_OFFS]; - - trace!("find_end_of_wal_segment(data_dir={}, segno={}, tli={}, wal_seg_size={}, start_offset=0x{:x})", data_dir.display(), segno, tli, wal_seg_size, start_offset); - while offs < wal_seg_size { - // we are at the beginning of the page; read it in - if offs % XLOG_BLCKSZ == 0 { - trace!("offs=0x{:x}: new page", offs); - let bytes_read = file.read(&mut buf)?; - if bytes_read != buf.len() { - bail!( - "failed to read {} bytes from {} at {}", - XLOG_BLCKSZ, - file_name, - offs - ); - } - - let xlp_magic = LittleEndian::read_u16(&buf[0..2]); - let xlp_info = LittleEndian::read_u16(&buf[2..4]); - let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS + 4]); - trace!( - " xlp_magic=0x{:x}, xlp_info=0x{:x}, xlp_rem_len={}", - xlp_magic, - xlp_info, - xlp_rem_len - ); - // this is expected in current usage when valid WAL starts after page header - if xlp_magic != XLOG_PAGE_MAGIC as u16 { - trace!( - " invalid WAL file {}.partial magic {} at {:?}", - file_name, - xlp_magic, - Lsn(XLogSegNoOffsetToRecPtr(segno, offs as u32, wal_seg_size)), - ); - } - if offs == 0 { - offs += XLOG_SIZE_OF_XLOG_LONG_PHD; - if (xlp_info & XLP_FIRST_IS_CONTRECORD) != 0 { - trace!(" first record is contrecord"); - skipping_first_contrecord = true; - contlen = xlp_rem_len as usize; - if offs < start_offset { - // Pre-condition failed: the beginning of the segment is unexpectedly corrupted. - ensure!(start_offset - offs >= contlen, - "start_offset is in the middle of the first record (which happens to be a contrecord), \ - expected to be on a record boundary. Is beginning of the segment corrupted?"); - contlen = 0; - // keep skipping_first_contrecord to avoid counting the contrecord as valid, we did not check it. - } - } else { - trace!(" first record is not contrecord"); - } - } else { - offs += XLOG_SIZE_OF_XLOG_SHORT_PHD; - } - // ... and step forward again if asked - trace!(" skipped header to 0x{:x}", offs); - offs = max(offs, start_offset); - // beginning of the next record - } else if contlen == 0 { - let page_offs = offs % XLOG_BLCKSZ; - let xl_tot_len = LittleEndian::read_u32(&buf[page_offs..page_offs + 4]) as usize; - trace!("offs=0x{:x}: new record, xl_tot_len={}", offs, xl_tot_len); - if xl_tot_len == 0 { - info!( - "find_end_of_wal_segment reached zeros at {:?}, last records ends at {:?}", - Lsn(XLogSegNoOffsetToRecPtr(segno, offs as u32, wal_seg_size)), - Lsn(XLogSegNoOffsetToRecPtr( - segno, - last_valid_rec_pos as u32, - wal_seg_size - )) - ); - break; // zeros, reached the end - } - if skipping_first_contrecord { - skipping_first_contrecord = false; - trace!(" first contrecord has been just completed"); - } else { - trace!( - " updating last_valid_rec_pos: 0x{:x} --> 0x{:x}", - last_valid_rec_pos, - offs - ); - last_valid_rec_pos = offs; - } - offs += 4; - rec_offs = 4; - contlen = xl_tot_len - 4; - trace!( - " reading rec_hdr[0..4] <-- [0x{:x}; 0x{:x})", - page_offs, - page_offs + 4 - ); - rec_hdr[0..4].copy_from_slice(&buf[page_offs..page_offs + 4]); - } else { - // we're continuing a record, possibly from previous page. - let page_offs = offs % XLOG_BLCKSZ; - let pageleft = XLOG_BLCKSZ - page_offs; - - // read the rest of the record, or as much as fits on this page. - let n = min(contlen, pageleft); - trace!( - "offs=0x{:x}, record continuation, pageleft={}, contlen={}", - offs, - pageleft, - contlen - ); - // fill rec_hdr header up to (but not including) xl_crc field - trace!( - " rec_offs={}, XLOG_RECORD_CRC_OFFS={}, XLOG_SIZE_OF_XLOG_RECORD={}", - rec_offs, - XLOG_RECORD_CRC_OFFS, - XLOG_SIZE_OF_XLOG_RECORD - ); - if rec_offs < XLOG_RECORD_CRC_OFFS { - let len = min(XLOG_RECORD_CRC_OFFS - rec_offs, n); - trace!( - " reading rec_hdr[{}..{}] <-- [0x{:x}; 0x{:x})", - rec_offs, - rec_offs + len, - page_offs, - page_offs + len - ); - rec_hdr[rec_offs..rec_offs + len].copy_from_slice(&buf[page_offs..page_offs + len]); - } - if rec_offs <= XLOG_RECORD_CRC_OFFS && rec_offs + n >= XLOG_SIZE_OF_XLOG_RECORD { - let crc_offs = page_offs - rec_offs + XLOG_RECORD_CRC_OFFS; - // All records are aligned on 8-byte boundary, so their 8-byte frames - // cannot be split between pages. As xl_crc is the last field, - // its content is always on the same page. - const_assert!(XLOG_RECORD_CRC_OFFS % 8 == 4); - // We should always start reading aligned records even in incorrect WALs so if - // the condition is false it is likely a bug. However, it is localized somewhere - // in this function, hence we do not crash and just report failure instead. - ensure!(crc_offs % 8 == 4, "Record is not aligned properly (bug?)"); - xl_crc = LittleEndian::read_u32(&buf[crc_offs..crc_offs + 4]); - trace!( - " reading xl_crc: [0x{:x}; 0x{:x}) = 0x{:x}", - crc_offs, - crc_offs + 4, - xl_crc - ); - crc = crc32c_append(0, &buf[crc_offs + 4..page_offs + n]); - trace!( - " initializing crc: [0x{:x}; 0x{:x}); crc = 0x{:x}", - crc_offs + 4, - page_offs + n, - crc - ); - } else if rec_offs > XLOG_RECORD_CRC_OFFS { - // As all records are 8-byte aligned, the header is already fully read and `crc` is initialized in the branch above. - ensure!(rec_offs >= XLOG_SIZE_OF_XLOG_RECORD); - let old_crc = crc; - crc = crc32c_append(crc, &buf[page_offs..page_offs + n]); - trace!( - " appending to crc: [0x{:x}; 0x{:x}); 0x{:x} --> 0x{:x}", - page_offs, - page_offs + n, - old_crc, - crc - ); - } else { - // Correct because of the way conditions are written above. - assert!(rec_offs + n < XLOG_SIZE_OF_XLOG_RECORD); - // If `skipping_first_contrecord == true`, we may be reading from a middle of a record - // which started in the previous segment. Hence there is no point in validating the header. - if !skipping_first_contrecord && rec_offs + n > XLOG_RECORD_CRC_OFFS { - info!( - "Curiously corrupted WAL: a record stops inside the header; \ - offs=0x{:x}, record continuation, pageleft={}, contlen={}", - offs, pageleft, contlen - ); - break; - } - // Do nothing: we are still reading the header. It's accounted in CRC in the end of the record. - } - rec_offs += n; - offs += n; - contlen -= n; - - if contlen == 0 { - trace!(" record completed at 0x{:x}", offs); - crc = crc32c_append(crc, &rec_hdr); - offs = (offs + 7) & !7; // pad on 8 bytes boundary */ - trace!( - " padded offs to 0x{:x}, crc is {:x}, expected crc is {:x}", - offs, - crc, - xl_crc - ); - if skipping_first_contrecord { - // do nothing, the flag will go down on next iteration when we're reading new record - trace!(" first conrecord has been just completed"); - } else if crc == xl_crc { - // record is valid, advance the result to its end (with - // alignment to the next record taken into account) - trace!( - " updating last_valid_rec_pos: 0x{:x} --> 0x{:x}", - last_valid_rec_pos, - offs - ); - last_valid_rec_pos = offs; - } else { - info!( - "CRC mismatch {} vs {} at {}", - crc, xl_crc, last_valid_rec_pos - ); - break; - } - } - } - } - trace!("last_valid_rec_pos=0x{:x}", last_valid_rec_pos); - Ok(last_valid_rec_pos as u32) -} - -/// -/// Scan a directory that contains PostgreSQL WAL files, for the end of WAL. -/// If precise, returns end LSN (next insertion point, basically); -/// otherwise, start of the last segment. -/// Returns (0, 0) if there is no WAL. -/// +// Returns (aligned) end_lsn of the last record in data_dir with WAL segments. +// start_lsn must point to some previously known record boundary (beginning of +// the next record). If no valid record after is found, start_lsn is returned +// back. pub fn find_end_of_wal( data_dir: &Path, wal_seg_size: usize, - precise: bool, - start_lsn: Lsn, // start reading WAL at this point or later -) -> anyhow::Result<(XLogRecPtr, TimeLineID)> { - let mut high_segno: XLogSegNo = 0; - let mut high_tli: TimeLineID = 0; - let mut high_ispartial = false; + start_lsn: Lsn, // start reading WAL at this point; must point at record start_lsn. +) -> anyhow::Result { + let mut result = start_lsn; + let mut curr_lsn = start_lsn; + let mut buf = [0u8; XLOG_BLCKSZ]; + let mut decoder = WalStreamDecoder::new(start_lsn); - for entry in fs::read_dir(data_dir)?.flatten() { - let ispartial: bool; - let entry_name = entry.file_name(); - let fname = entry_name - .to_str() - .ok_or_else(|| anyhow!("Invalid file name"))?; - - /* - * Check if the filename looks like an xlog file, or a .partial file. - */ - if IsXLogFileName(fname) { - ispartial = false; - } else if IsPartialXLogFileName(fname) { - ispartial = true; - } else { - continue; - } - let (segno, tli) = XLogFromFileName(fname, wal_seg_size); - if !ispartial && entry.metadata()?.len() != wal_seg_size as u64 { - continue; - } - if segno > high_segno - || (segno == high_segno && tli > high_tli) - || (segno == high_segno && tli == high_tli && high_ispartial && !ispartial) - { - high_segno = segno; - high_tli = tli; - high_ispartial = ispartial; - } - } - if high_segno > 0 { - let mut high_offs = 0; - /* - * Move the starting pointer to the start of the next segment, if the - * highest one we saw was completed. - */ - if !high_ispartial { - high_segno += 1; - } else if precise { - /* otherwise locate last record in last partial segment */ - if start_lsn.segment_number(wal_seg_size) > high_segno { - bail!( - "provided start_lsn {:?} is beyond highest segno {:?} available", - start_lsn, - high_segno, + // loop over segments + loop { + let segno = curr_lsn.segment_number(wal_seg_size); + let seg_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); + let seg_file_path = data_dir.join(seg_file_name); + match open_wal_segment(&seg_file_path)? { + None => { + // no more segments + info!( + "find_end_of_wal reached end at {:?}, segment {:?} doesn't exist", + result, seg_file_path ); + return Ok(result); + } + Some(mut segment) => { + let seg_offs = curr_lsn.segment_offset(wal_seg_size); + segment.seek(SeekFrom::Start(seg_offs as u64))?; + // loop inside segment + loop { + let bytes_read = segment.read(&mut buf)?; + if bytes_read == 0 { + break; // EOF + } + curr_lsn += bytes_read as u64; + decoder.feed_bytes(&buf[0..bytes_read]); + + // advance result past all completely read records + loop { + match decoder.poll_decode() { + Ok(Some(record)) => result = record.0, + Err(e) => { + info!( + "find_end_of_wal reached end at {:?}, decode error: {:?}", + result, e + ); + return Ok(result); + } + Ok(None) => break, // need more data + } + } + } } - let start_offset = if start_lsn.segment_number(wal_seg_size) == high_segno { - start_lsn.segment_offset(wal_seg_size) - } else { - 0 - }; - high_offs = find_end_of_wal_segment( - data_dir, - high_segno, - high_tli, - wal_seg_size, - start_offset, - )?; } - let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size); - return Ok((high_ptr, high_tli)); } - Ok((0, 0)) +} + +// Open .partial or full WAL segment file, if present. +fn open_wal_segment(seg_file_path: &Path) -> anyhow::Result> { + let mut partial_path = seg_file_path.to_owned(); + partial_path.set_extension("partial"); + match File::open(partial_path) { + Ok(file) => Ok(Some(file)), + Err(e) => match e.kind() { + ErrorKind::NotFound => { + // .partial not found, try full + match File::open(seg_file_path) { + Ok(file) => Ok(Some(file)), + Err(e) => match e.kind() { + ErrorKind::NotFound => Ok(None), + _ => Err(e.into()), + }, + } + } + _ => Err(e.into()), + }, + } } pub fn main() { let mut data_dir = PathBuf::new(); data_dir.push("."); - let (wal_end, tli) = find_end_of_wal(&data_dir, WAL_SEGMENT_SIZE, true, Lsn(0)).unwrap(); - println!( - "wal_end={:>08X}{:>08X}, tli={}", - (wal_end >> 32) as u32, - wal_end as u32, - tli - ); + let wal_end = find_end_of_wal(&data_dir, WAL_SEGMENT_SIZE, Lsn(0)).unwrap(); + println!("wal_end={:?}", wal_end); } impl XLogRecord { @@ -595,7 +350,10 @@ pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result( - test_name: &str, - expected_end_of_wal_non_partial: Lsn, - ) { + fn test_end_of_wal(test_name: &str) { use wal_craft::*; // Craft some WAL let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) @@ -630,7 +385,7 @@ mod tests { .iter() .map(|&lsn| u64::from(lsn).into()) .collect(); - let expected_end_of_wal_partial: Lsn = u64::from(expected_end_of_wal_partial).into(); + let expected_end_of_wal: Lsn = u64::from(expected_end_of_wal_partial).into(); srv.kill(); // Check find_end_of_wal on the initial WAL @@ -642,10 +397,10 @@ mod tests { .filter(|fname| IsXLogFileName(fname)) .max() .unwrap(); - check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal_partial); - for start_lsn in std::iter::once(Lsn(0)) - .chain(intermediate_lsns) - .chain(std::iter::once(expected_end_of_wal_partial)) + check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal); + for start_lsn in intermediate_lsns + .iter() + .chain(std::iter::once(&expected_end_of_wal)) { // Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`. // We assume that `start_lsn` is non-decreasing. @@ -660,7 +415,7 @@ mod tests { } let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE); let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE); - if seg_start_lsn > u64::from(start_lsn) { + if seg_start_lsn > u64::from(*start_lsn) { continue; } let mut f = File::options().write(true).open(file.path()).unwrap(); @@ -668,18 +423,12 @@ mod tests { f.write_all( &ZEROS[0..min( WAL_SEGMENT_SIZE, - (u64::from(start_lsn) - seg_start_lsn) as usize, + (u64::from(*start_lsn) - seg_start_lsn) as usize, )], ) .unwrap(); } - check_end_of_wal( - &cfg, - &last_segment, - start_lsn, - expected_end_of_wal_non_partial, - expected_end_of_wal_partial, - ); + check_end_of_wal(&cfg, &last_segment, *start_lsn, expected_end_of_wal); } } @@ -716,18 +465,15 @@ mod tests { cfg: &wal_craft::Conf, last_segment: &str, start_lsn: Lsn, - expected_end_of_wal_non_partial: Lsn, - expected_end_of_wal_partial: Lsn, + expected_end_of_wal: Lsn, ) { // Check end_of_wal on non-partial WAL segment (we treat it as fully populated) - let (wal_end, tli) = - find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, true, start_lsn).unwrap(); - let wal_end = Lsn(wal_end); - info!( - "find_end_of_wal returned (wal_end={}, tli={}) with non-partial WAL segment", - wal_end, tli - ); - assert_eq!(wal_end, expected_end_of_wal_non_partial); + // let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap(); + // info!( + // "find_end_of_wal returned wal_end={} with non-partial WAL segment", + // wal_end + // ); + // assert_eq!(wal_end, expected_end_of_wal_non_partial); // Rename file to partial to actually find last valid lsn, then rename it back. fs::rename( @@ -735,14 +481,12 @@ mod tests { cfg.wal_dir().join(format!("{}.partial", last_segment)), ) .unwrap(); - let (wal_end, tli) = - find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, true, start_lsn).unwrap(); - let wal_end = Lsn(wal_end); + let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap(); info!( - "find_end_of_wal returned (wal_end={}, tli={}) with partial WAL segment", - wal_end, tli + "find_end_of_wal returned wal_end={} with partial WAL segment", + wal_end ); - assert_eq!(wal_end, expected_end_of_wal_partial); + assert_eq!(wal_end, expected_end_of_wal); fs::rename( cfg.wal_dir().join(format!("{}.partial", last_segment)), cfg.wal_dir().join(last_segment), @@ -755,10 +499,7 @@ mod tests { #[test] pub fn test_find_end_of_wal_simple() { init_logging(); - test_end_of_wal::( - "test_find_end_of_wal_simple", - "0/2000000".parse::().unwrap(), - ); + test_end_of_wal::("test_find_end_of_wal_simple"); } #[test] @@ -766,17 +507,14 @@ mod tests { init_logging(); test_end_of_wal::( "test_find_end_of_wal_crossing_segment_followed_by_small_one", - "0/3000000".parse::().unwrap(), ); } #[test] - #[ignore = "not yet fixed, needs correct parsing of pre-last segments"] // TODO pub fn test_find_end_of_wal_last_crossing_segment() { init_logging(); test_end_of_wal::( "test_find_end_of_wal_last_crossing_segment", - "0/3000000".parse::().unwrap(), ); } diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 2a36d5c04c..5f4bf588c7 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -332,7 +332,7 @@ impl Storage for PhysicalStorage { self.write_lsn = if state.commit_lsn == Lsn(0) { Lsn(0) } else { - Lsn(find_end_of_wal(&self.timeline_dir, wal_seg_size, true, state.commit_lsn)?.0) + find_end_of_wal(&self.timeline_dir, wal_seg_size, state.commit_lsn)? }; self.write_record_lsn = self.write_lsn;