mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 05:52:55 +00:00
Split waldecoder.rs into two source files.
Move the code for decoding a WAL stream into WAL records into 'postgres_ffi', and keep the code to parse the WAL records deeper in 'pageserver' crate, renamed to walrecord.rs. This tidies up the dependencies a bit. 'walkeeper' reuses the same waldecoder routines, and it used to depend on 'pageserver' because of that. Now it only depends on 'postgres_ffi'. (The comment in walkeeper/Cargo.toml that claimed that the dependency was needed for ZTimelineId was obsolete. ZTimelineId is defined in 'zenith_utils', the dependency was actually needed for the waldecoder.)
This commit is contained in:
@@ -13,6 +13,7 @@ pub mod controlfile_utils;
|
||||
pub mod nonrelfile_utils;
|
||||
pub mod pg_constants;
|
||||
pub mod relfile_utils;
|
||||
pub mod waldecoder;
|
||||
pub mod xlog_utils;
|
||||
|
||||
// See TransactionIdIsNormal in transam.h
|
||||
|
||||
219
postgres_ffi/src/waldecoder.rs
Normal file
219
postgres_ffi/src/waldecoder.rs
Normal file
@@ -0,0 +1,219 @@
|
||||
//!
|
||||
//! Basic WAL stream decoding.
|
||||
//!
|
||||
//! This understands the WAL page and record format, enough to figure out where the WAL record
|
||||
//! boundaries are, and to reassemble WAL records that cross page boundaries.
|
||||
//!
|
||||
//! This functionality is needed by both the pageserver and the walkeepers. The pageserver needs
|
||||
//! to look deeper into the WAL records to also understand which blocks they modify, the code
|
||||
//! for that is in pageserver/src/walrecord.rs
|
||||
//!
|
||||
use super::pg_constants;
|
||||
use super::xlog_utils::*;
|
||||
use super::XLogLongPageHeaderData;
|
||||
use super::XLogPageHeaderData;
|
||||
use super::XLogRecord;
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use crc32c::*;
|
||||
use log::*;
|
||||
use std::cmp::min;
|
||||
use thiserror::Error;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
pub struct WalStreamDecoder {
|
||||
lsn: Lsn,
|
||||
|
||||
startlsn: Lsn, // LSN where this record starts
|
||||
contlen: u32,
|
||||
padlen: u32,
|
||||
|
||||
inputbuf: BytesMut,
|
||||
|
||||
/// buffer used to reassemble records that cross page boundaries.
|
||||
recordbuf: BytesMut,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug, Clone)]
|
||||
#[error("{msg} at {lsn}")]
|
||||
pub struct WalDecodeError {
|
||||
msg: String,
|
||||
lsn: Lsn,
|
||||
}
|
||||
|
||||
//
|
||||
// WalRecordStream is a Stream that returns a stream of WAL records
|
||||
// FIXME: This isn't a proper rust stream
|
||||
//
|
||||
impl WalStreamDecoder {
|
||||
pub fn new(lsn: Lsn) -> WalStreamDecoder {
|
||||
WalStreamDecoder {
|
||||
lsn,
|
||||
|
||||
startlsn: Lsn(0),
|
||||
contlen: 0,
|
||||
padlen: 0,
|
||||
|
||||
inputbuf: BytesMut::new(),
|
||||
recordbuf: BytesMut::new(),
|
||||
}
|
||||
}
|
||||
|
||||
// The latest LSN position fed to the decoder.
|
||||
pub fn available(&self) -> Lsn {
|
||||
self.lsn + self.inputbuf.remaining() as u64
|
||||
}
|
||||
|
||||
pub fn feed_bytes(&mut self, buf: &[u8]) {
|
||||
self.inputbuf.extend_from_slice(buf);
|
||||
}
|
||||
|
||||
/// Attempt to decode another WAL record from the input that has been fed to the
|
||||
/// decoder so far.
|
||||
///
|
||||
/// Returns one of the following:
|
||||
/// Ok((Lsn, Bytes)): a tuple containing the LSN of next record, and the record itself
|
||||
/// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
|
||||
/// Err(WalDecodeError): an error occured while decoding, meaning the input was invalid.
|
||||
///
|
||||
pub fn poll_decode(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
|
||||
let recordbuf;
|
||||
|
||||
// Run state machine that validates page headers, and reassembles records
|
||||
// that cross page boundaries.
|
||||
loop {
|
||||
// parse and verify page boundaries as we go
|
||||
if self.lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE) == 0 {
|
||||
// parse long header
|
||||
|
||||
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf);
|
||||
|
||||
if hdr.std.xlp_pageaddr != self.lsn.0 {
|
||||
return Err(WalDecodeError {
|
||||
msg: "invalid xlog segment header".into(),
|
||||
lsn: self.lsn,
|
||||
});
|
||||
}
|
||||
// TODO: verify the remaining fields in the header
|
||||
|
||||
self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
|
||||
continue;
|
||||
} else if self.lsn.block_offset() == 0 {
|
||||
if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let hdr = XLogPageHeaderData::from_bytes(&mut self.inputbuf);
|
||||
|
||||
if hdr.xlp_pageaddr != self.lsn.0 {
|
||||
return Err(WalDecodeError {
|
||||
msg: "invalid xlog page header".into(),
|
||||
lsn: self.lsn,
|
||||
});
|
||||
}
|
||||
// TODO: verify the remaining fields in the header
|
||||
|
||||
self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
|
||||
continue;
|
||||
} else if self.padlen > 0 {
|
||||
if self.inputbuf.remaining() < self.padlen as usize {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// skip padding
|
||||
self.inputbuf.advance(self.padlen as usize);
|
||||
self.lsn += self.padlen as u64;
|
||||
self.padlen = 0;
|
||||
} else if self.contlen == 0 {
|
||||
assert!(self.recordbuf.is_empty());
|
||||
|
||||
// need to have at least the xl_tot_len field
|
||||
if self.inputbuf.remaining() < 4 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// peek xl_tot_len at the beginning of the record.
|
||||
// FIXME: assumes little-endian
|
||||
self.startlsn = self.lsn;
|
||||
let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le();
|
||||
if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD {
|
||||
return Err(WalDecodeError {
|
||||
msg: format!("invalid xl_tot_len {}", xl_tot_len),
|
||||
lsn: self.lsn,
|
||||
});
|
||||
}
|
||||
|
||||
// Fast path for the common case that the whole record fits on the page.
|
||||
let pageleft = self.lsn.remaining_in_block() as u32;
|
||||
if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
|
||||
// Take the record from the 'inputbuf', and validate it.
|
||||
recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
|
||||
self.lsn += xl_tot_len as u64;
|
||||
break;
|
||||
} else {
|
||||
// Need to assemble the record from pieces. Remember the size of the
|
||||
// record, and loop back. On next iteration, we will reach the 'else'
|
||||
// branch below, and copy the part of the record that was on this page
|
||||
// to 'recordbuf'. Subsequent iterations will skip page headers, and
|
||||
// append the continuations from the next pages to 'recordbuf'.
|
||||
self.recordbuf.reserve(xl_tot_len as usize);
|
||||
self.contlen = xl_tot_len;
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
// we're continuing a record, possibly from previous page.
|
||||
let pageleft = self.lsn.remaining_in_block() as u32;
|
||||
|
||||
// read the rest of the record, or as much as fits on this page.
|
||||
let n = min(self.contlen, pageleft) as usize;
|
||||
|
||||
if self.inputbuf.remaining() < n {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
self.recordbuf.put(self.inputbuf.split_to(n));
|
||||
self.lsn += n as u64;
|
||||
self.contlen -= n as u32;
|
||||
|
||||
if self.contlen == 0 {
|
||||
// The record is now complete.
|
||||
recordbuf = std::mem::replace(&mut self.recordbuf, BytesMut::new()).freeze();
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// We now have a record in the 'recordbuf' local variable.
|
||||
let xlogrec = XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]);
|
||||
|
||||
let mut crc = 0;
|
||||
crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);
|
||||
crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]);
|
||||
if crc != xlogrec.xl_crc {
|
||||
return Err(WalDecodeError {
|
||||
msg: "WAL record crc mismatch".into(),
|
||||
lsn: self.lsn,
|
||||
});
|
||||
}
|
||||
|
||||
// XLOG_SWITCH records are special. If we see one, we need to skip
|
||||
// to the next WAL segment.
|
||||
if xlogrec.is_xlog_switch_record() {
|
||||
trace!("saw xlog switch record at {}", self.lsn);
|
||||
self.padlen = self.lsn.calc_padding(pg_constants::WAL_SEGMENT_SIZE as u64) as u32;
|
||||
} else {
|
||||
// Pad to an 8-byte boundary
|
||||
self.padlen = self.lsn.calc_padding(8u32) as u32;
|
||||
}
|
||||
|
||||
// Always align resulting LSN on 0x8 boundary -- that is important for getPage()
|
||||
// and WalReceiver integration. Since this code is used both for WalReceiver and
|
||||
// initial WAL import let's force alignment right here.
|
||||
let result = (self.lsn.align(), recordbuf);
|
||||
Ok(Some(result))
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user