Handle wal records larger than WAL segment size in find_end_of_wal

This commit is contained in:
Konstantin Knizhnik
2021-08-30 17:32:40 +03:00
parent 07adc9dbda
commit 112909c5e4

View File

@@ -148,7 +148,16 @@ fn find_end_of_wal_segment(
if check_contrec {
let xl_tot_len = LittleEndian::read_u32(&rec_hdr[0..4]) as usize;
contlen = xlp_rem_len as usize;
assert!(*rec_offs + contlen == xl_tot_len);
if *rec_offs + contlen < xl_tot_len
|| (*rec_offs + contlen != xl_tot_len
&& contlen != XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_LONG_PHD)
{
info!(
"Corrupted continuation record: offs={}, contlen={}, xl_tot_len={}",
*rec_offs, contlen, xl_tot_len
);
return 0;
}
} else {
offs += ((xlp_rem_len + 7) & !7) as usize;
}
@@ -259,17 +268,21 @@ pub fn find_end_of_wal(
let mut crc: u32 = 0;
let mut rec_offs: usize = 0;
let mut rec_hdr = [0u8; XLOG_SIZE_OF_XLOG_RECORD];
let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
let wal_dir = data_dir.join("pg_wal");
/*
* To be able to calculate CRC of records crossing segment boundary,
* we need to parse previous segment.
* FIXME: handle case when wal record is larger than WAL segment
* we need to parse previous segments.
* So first traverse segments in backward direction to locate record start
* and then traverse forward, accumulating CRC.
*/
if high_segno > 2 {
let prev_offs = find_end_of_wal_segment(
let mut prev_segno = high_segno - 1;
let mut prev_offs: u32 = 0;
while prev_segno > 1 {
// TOFO: first segment constains dummy checkpoint record at the beginning
prev_offs = find_end_of_wal_segment(
data_dir,
high_segno - 1,
prev_segno,
high_tli,
wal_seg_size,
false,
@@ -278,43 +291,75 @@ pub fn find_end_of_wal(
&mut crc,
false,
);
if prev_offs as usize <= XLOG_SIZE_OF_XLOG_LONG_PHD {
info!(
"Segment {} doesn't contain any valid record {}",
high_segno - 1,
prev_offs
if prev_offs != 0 {
break;
}
prev_segno -= 1;
}
if prev_offs != 0 {
// found start of WAL record
let first_segno = prev_segno;
let first_offs = prev_offs;
while prev_segno + 1 < high_segno {
// now traverse record in forward direction, accumulating CRC
prev_segno += 1;
prev_offs = find_end_of_wal_segment(
data_dir,
prev_segno,
high_tli,
wal_seg_size,
false,
&mut rec_offs,
&mut rec_hdr,
&mut crc,
true,
);
if prev_offs == 0 {
info!("Segment {} is corrupted", prev_segno,);
break;
}
}
if prev_offs != 0 {
high_offs = find_end_of_wal_segment(
data_dir,
high_segno,
high_tli,
wal_seg_size,
high_ispartial,
&mut rec_offs,
&mut rec_hdr,
&mut crc,
true,
);
}
assert!(prev_offs as usize > XLOG_SIZE_OF_XLOG_LONG_PHD);
high_offs = find_end_of_wal_segment(
data_dir,
high_segno,
high_tli,
wal_seg_size,
high_ispartial,
&mut rec_offs,
&mut rec_hdr,
&mut crc,
true,
);
if high_offs as usize <= XLOG_SIZE_OF_XLOG_LONG_PHD {
// If last segment contais no valid records, then return back to previous segment
let wal_dir = data_dir.join("pg_wal");
if high_offs == 0 {
// If last segment contais no valid records, then return back
info!("Last WAL segment {} contains no valid record, truncate WAL till {} segment",
high_segno, first_segno);
// Remove last segments containing corrupted WAL record
for segno in first_segno + 1..high_segno {
let file_name = XLogFileName(high_tli, segno, wal_seg_size);
let file_path = wal_dir.join(file_name);
if let Err(e) = fs::remove_file(&file_path) {
info!("Failed to remove file {:?}: {}", &file_path, e);
}
}
let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
let file_path = if high_ispartial {
wal_dir.join(file_name.clone() + ".partial")
} else {
wal_dir.join(file_name.clone())
};
info!("Remove empty WAL segment {:?}", &file_path);
if let Err(e) = fs::remove_file(&file_path) {
info!("Failed to remove file {:?}: {}", &file_path, e);
}
high_ispartial = false; // previous segment should not be partial
high_segno -= 1;
high_offs = prev_offs;
high_segno = first_segno;
high_offs = first_offs;
}
} else {
// failed to locate previous segment
assert!(prev_segno <= 1);
high_offs = find_end_of_wal_segment(
data_dir,
high_segno,
@@ -331,7 +376,7 @@ pub fn find_end_of_wal(
// If last segment is not marked as partial, it means that next segment
// was not written. Let's make this segment partial once again.
if !high_ispartial {
let wal_dir = data_dir.join("pg_wal");
let file_name = XLogFileName(high_tli, high_segno, wal_seg_size);
if let Err(e) = fs::rename(
wal_dir.join(file_name.clone()),
wal_dir.join(file_name.clone() + ".partial"),