From 112909c5e41a072f06718c201eef3008864973b5 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Mon, 30 Aug 2021 17:32:40 +0300 Subject: [PATCH] Handle wal records larger than WAL segment size in find_end_of_wal --- postgres_ffi/src/xlog_utils.rs | 109 +++++++++++++++++++++++---------- 1 file changed, 77 insertions(+), 32 deletions(-) diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index 5989febb79..1780cbbee2 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -148,7 +148,16 @@ fn find_end_of_wal_segment( if check_contrec { let xl_tot_len = LittleEndian::read_u32(&rec_hdr[0..4]) as usize; contlen = xlp_rem_len as usize; - assert!(*rec_offs + contlen == xl_tot_len); + if *rec_offs + contlen < xl_tot_len + || (*rec_offs + contlen != xl_tot_len + && contlen != XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_LONG_PHD) + { + info!( + "Corrupted continuation record: offs={}, contlen={}, xl_tot_len={}", + *rec_offs, contlen, xl_tot_len + ); + return 0; + } } else { offs += ((xlp_rem_len + 7) & !7) as usize; } @@ -259,17 +268,21 @@ pub fn find_end_of_wal( let mut crc: u32 = 0; let mut rec_offs: usize = 0; let mut rec_hdr = [0u8; XLOG_SIZE_OF_XLOG_RECORD]; - let file_name = XLogFileName(high_tli, high_segno, wal_seg_size); + let wal_dir = data_dir.join("pg_wal"); /* * To be able to calculate CRC of records crossing segment boundary, - * we need to parse previous segment. - * FIXME: handle case when wal record is larger than WAL segment + * we need to parse previous segments. + * So first traverse segments in backward direction to locate record start + * and then traverse forward, accumulating CRC. */ - if high_segno > 2 { - let prev_offs = find_end_of_wal_segment( + let mut prev_segno = high_segno - 1; + let mut prev_offs: u32 = 0; + while prev_segno > 1 { + // TOFO: first segment constains dummy checkpoint record at the beginning + prev_offs = find_end_of_wal_segment( data_dir, - high_segno - 1, + prev_segno, high_tli, wal_seg_size, false, @@ -278,43 +291,75 @@ pub fn find_end_of_wal( &mut crc, false, ); - if prev_offs as usize <= XLOG_SIZE_OF_XLOG_LONG_PHD { - info!( - "Segment {} doesn't contain any valid record {}", - high_segno - 1, - prev_offs + if prev_offs != 0 { + break; + } + prev_segno -= 1; + } + if prev_offs != 0 { + // found start of WAL record + let first_segno = prev_segno; + let first_offs = prev_offs; + while prev_segno + 1 < high_segno { + // now traverse record in forward direction, accumulating CRC + prev_segno += 1; + prev_offs = find_end_of_wal_segment( + data_dir, + prev_segno, + high_tli, + wal_seg_size, + false, + &mut rec_offs, + &mut rec_hdr, + &mut crc, + true, + ); + if prev_offs == 0 { + info!("Segment {} is corrupted", prev_segno,); + break; + } + } + if prev_offs != 0 { + high_offs = find_end_of_wal_segment( + data_dir, + high_segno, + high_tli, + wal_seg_size, + high_ispartial, + &mut rec_offs, + &mut rec_hdr, + &mut crc, + true, ); } - assert!(prev_offs as usize > XLOG_SIZE_OF_XLOG_LONG_PHD); - high_offs = find_end_of_wal_segment( - data_dir, - high_segno, - high_tli, - wal_seg_size, - high_ispartial, - &mut rec_offs, - &mut rec_hdr, - &mut crc, - true, - ); - - if high_offs as usize <= XLOG_SIZE_OF_XLOG_LONG_PHD { - // If last segment contais no valid records, then return back to previous segment - let wal_dir = data_dir.join("pg_wal"); + if high_offs == 0 { + // If last segment contais no valid records, then return back + info!("Last WAL segment {} contains no valid record, truncate WAL till {} segment", + high_segno, first_segno); + // Remove last segments containing corrupted WAL record + for segno in first_segno + 1..high_segno { + let file_name = XLogFileName(high_tli, segno, wal_seg_size); + let file_path = wal_dir.join(file_name); + if let Err(e) = fs::remove_file(&file_path) { + info!("Failed to remove file {:?}: {}", &file_path, e); + } + } + let file_name = XLogFileName(high_tli, high_segno, wal_seg_size); let file_path = if high_ispartial { wal_dir.join(file_name.clone() + ".partial") } else { wal_dir.join(file_name.clone()) }; - info!("Remove empty WAL segment {:?}", &file_path); if let Err(e) = fs::remove_file(&file_path) { info!("Failed to remove file {:?}: {}", &file_path, e); } high_ispartial = false; // previous segment should not be partial - high_segno -= 1; - high_offs = prev_offs; + high_segno = first_segno; + high_offs = first_offs; } } else { + // failed to locate previous segment + assert!(prev_segno <= 1); high_offs = find_end_of_wal_segment( data_dir, high_segno, @@ -331,7 +376,7 @@ pub fn find_end_of_wal( // If last segment is not marked as partial, it means that next segment // was not written. Let's make this segment partial once again. if !high_ispartial { - let wal_dir = data_dir.join("pg_wal"); + let file_name = XLogFileName(high_tli, high_segno, wal_seg_size); if let Err(e) = fs::rename( wal_dir.join(file_name.clone()), wal_dir.join(file_name.clone() + ".partial"),