Remove some unwraps from waldecoder (#1539)

This commit is contained in:
bojanserafimov
2022-05-04 17:41:05 -04:00
committed by GitHub
parent 02e5083695
commit bc569dde51
9 changed files with 70 additions and 50 deletions

View File

@@ -89,7 +89,12 @@ impl WalStreamDecoder {
return Ok(None);
}
let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf);
let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
WalDecodeError {
msg: format!("long header deserialization failed {}", e),
lsn: self.lsn,
}
})?;
if hdr.std.xlp_pageaddr != self.lsn.0 {
return Err(WalDecodeError {
@@ -106,7 +111,12 @@ impl WalStreamDecoder {
return Ok(None);
}
let hdr = XLogPageHeaderData::from_bytes(&mut self.inputbuf);
let hdr = XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| {
WalDecodeError {
msg: format!("header deserialization failed {}", e),
lsn: self.lsn,
}
})?;
if hdr.xlp_pageaddr != self.lsn.0 {
return Err(WalDecodeError {
@@ -188,7 +198,13 @@ impl WalStreamDecoder {
}
// We now have a record in the 'recordbuf' local variable.
let xlogrec = XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]);
let xlogrec =
XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| {
WalDecodeError {
msg: format!("xlog record deserialization failed {}", e),
lsn: self.lsn,
}
})?;
let mut crc = 0;
crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]);

View File

@@ -15,7 +15,7 @@ use crate::XLogPageHeaderData;
use crate::XLogRecord;
use crate::XLOG_PAGE_MAGIC;
use anyhow::{bail, Result};
use anyhow::bail;
use byteorder::{ByteOrder, LittleEndian};
use bytes::BytesMut;
use bytes::{Buf, Bytes};
@@ -28,6 +28,8 @@ use std::io::prelude::*;
use std::io::SeekFrom;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use utils::bin_ser::DeserializeError;
use utils::bin_ser::SerializeError;
use utils::lsn::Lsn;
pub const XLOG_FNAME_LEN: usize = 24;
@@ -144,7 +146,7 @@ fn find_end_of_wal_segment(
tli: TimeLineID,
wal_seg_size: usize,
start_offset: usize, // start reading at this point
) -> Result<u32> {
) -> anyhow::Result<u32> {
// step back to the beginning of the page to read it in...
let mut offs: usize = start_offset - start_offset % XLOG_BLCKSZ;
let mut contlen: usize = 0;
@@ -272,7 +274,7 @@ pub fn find_end_of_wal(
wal_seg_size: usize,
precise: bool,
start_lsn: Lsn, // start reading WAL at this point or later
) -> Result<(XLogRecPtr, TimeLineID)> {
) -> anyhow::Result<(XLogRecPtr, TimeLineID)> {
let mut high_segno: XLogSegNo = 0;
let mut high_tli: TimeLineID = 0;
let mut high_ispartial = false;
@@ -354,19 +356,19 @@ pub fn main() {
}
impl XLogRecord {
pub fn from_slice(buf: &[u8]) -> XLogRecord {
pub fn from_slice(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
use utils::bin_ser::LeSer;
XLogRecord::des(buf).unwrap()
XLogRecord::des(buf)
}
pub fn from_bytes<B: Buf>(buf: &mut B) -> XLogRecord {
pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogRecord, DeserializeError> {
use utils::bin_ser::LeSer;
XLogRecord::des_from(&mut buf.reader()).unwrap()
XLogRecord::des_from(&mut buf.reader())
}
pub fn encode(&self) -> Bytes {
pub fn encode(&self) -> Result<Bytes, SerializeError> {
use utils::bin_ser::LeSer;
self.ser().unwrap().into()
Ok(self.ser()?.into())
}
// Is this record an XLOG_SWITCH record? They need some special processing,
@@ -376,35 +378,35 @@ impl XLogRecord {
}
impl XLogPageHeaderData {
pub fn from_bytes<B: Buf>(buf: &mut B) -> XLogPageHeaderData {
pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogPageHeaderData, DeserializeError> {
use utils::bin_ser::LeSer;
XLogPageHeaderData::des_from(&mut buf.reader()).unwrap()
XLogPageHeaderData::des_from(&mut buf.reader())
}
}
impl XLogLongPageHeaderData {
pub fn from_bytes<B: Buf>(buf: &mut B) -> XLogLongPageHeaderData {
pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogLongPageHeaderData, DeserializeError> {
use utils::bin_ser::LeSer;
XLogLongPageHeaderData::des_from(&mut buf.reader()).unwrap()
XLogLongPageHeaderData::des_from(&mut buf.reader())
}
pub fn encode(&self) -> Bytes {
pub fn encode(&self) -> Result<Bytes, SerializeError> {
use utils::bin_ser::LeSer;
self.ser().unwrap().into()
self.ser().map(|b| b.into())
}
}
pub const SIZEOF_CHECKPOINT: usize = std::mem::size_of::<CheckPoint>();
impl CheckPoint {
pub fn encode(&self) -> Bytes {
pub fn encode(&self) -> Result<Bytes, SerializeError> {
use utils::bin_ser::LeSer;
self.ser().unwrap().into()
Ok(self.ser()?.into())
}
pub fn decode(buf: &[u8]) -> Result<CheckPoint, anyhow::Error> {
pub fn decode(buf: &[u8]) -> Result<CheckPoint, DeserializeError> {
use utils::bin_ser::LeSer;
Ok(CheckPoint::des(buf)?)
CheckPoint::des(buf)
}
/// Update next XID based on provided new_xid and stored epoch.
@@ -442,7 +444,7 @@ impl CheckPoint {
// Generate new, empty WAL segment.
// We need this segment to start compute node.
//
pub fn generate_wal_segment(segno: u64, system_id: u64) -> Bytes {
pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result<Bytes, SerializeError> {
let mut seg_buf = BytesMut::with_capacity(pg_constants::WAL_SEGMENT_SIZE as usize);
let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, pg_constants::WAL_SEGMENT_SIZE);
@@ -462,12 +464,12 @@ pub fn generate_wal_segment(segno: u64, system_id: u64) -> Bytes {
xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
};
let hdr_bytes = hdr.encode();
let hdr_bytes = hdr.encode()?;
seg_buf.extend_from_slice(&hdr_bytes);
//zero out the rest of the file
seg_buf.resize(pg_constants::WAL_SEGMENT_SIZE, 0);
seg_buf.freeze()
Ok(seg_buf.freeze())
}
#[cfg(test)]

View File

@@ -10,7 +10,7 @@
//! This module is responsible for creation of such tarball
//! from data stored in object storage.
//!
use anyhow::{ensure, Context, Result};
use anyhow::{anyhow, ensure, Context, Result};
use bytes::{BufMut, BytesMut};
use std::fmt::Write as FmtWrite;
use std::io;
@@ -323,7 +323,8 @@ impl<'a> Basebackup<'a> {
let wal_file_name = XLogFileName(PG_TLI, segno, pg_constants::WAL_SEGMENT_SIZE);
let wal_file_path = format!("pg_wal/{}", wal_file_name);
let header = new_tar_header(&wal_file_path, pg_constants::WAL_SEGMENT_SIZE as u64)?;
let wal_seg = generate_wal_segment(segno, pg_control.system_identifier);
let wal_seg = generate_wal_segment(segno, pg_control.system_identifier)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
ensure!(wal_seg.len() == pg_constants::WAL_SEGMENT_SIZE);
self.ar.append(&header, &wal_seg[..])?;
Ok(())

View File

@@ -274,7 +274,7 @@ fn import_control_file<R: Repository>(
// Extract the checkpoint record and import it separately.
let pg_control = ControlFileData::decode(&buffer)?;
let checkpoint_bytes = pg_control.checkPointCopy.encode();
let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
modification.put_checkpoint(checkpoint_bytes)?;
Ok(pg_control)

View File

@@ -375,7 +375,7 @@ impl Layer for DeltaLayer {
write!(&mut desc, " img {} bytes", img.len()).unwrap();
}
Ok(Value::WalRecord(rec)) => {
let wal_desc = walrecord::describe_wal_record(&rec);
let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
write!(
&mut desc,
" rec {} bytes will_init: {} {}",

View File

@@ -207,7 +207,7 @@ impl Layer for InMemoryLayer {
write!(&mut desc, " img {} bytes", img.len())?;
}
Ok(Value::WalRecord(rec)) => {
let wal_desc = walrecord::describe_wal_record(&rec);
let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
write!(
&mut desc,
" rec {} bytes will_init: {} {}",

View File

@@ -21,6 +21,7 @@
//! redo Postgres process, but some records it can handle directly with
//! bespoken Rust code.
use anyhow::Context;
use postgres_ffi::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment;
@@ -82,7 +83,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
) -> Result<()> {
let mut modification = timeline.begin_modification(lsn);
let mut decoded = decode_wal_record(recdata);
let mut decoded = decode_wal_record(recdata).context("failed decoding wal record")?;
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
@@ -251,7 +252,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
// If checkpoint data was updated, store the new version in the repository
if self.checkpoint_modified {
let new_checkpoint_bytes = self.checkpoint.encode();
let new_checkpoint_bytes = self.checkpoint.encode()?;
modification.put_checkpoint(new_checkpoint_bytes)?;
self.checkpoint_modified = false;

View File

@@ -1,6 +1,7 @@
//!
//! Functions for parsing WAL records.
//!
use anyhow::Result;
use bytes::{Buf, Bytes};
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::{TimestampTz, XLOG_SIZE_OF_XLOG_RECORD};
@@ -9,6 +10,7 @@ use postgres_ffi::{BlockNumber, OffsetNumber};
use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
use serde::{Deserialize, Serialize};
use tracing::*;
use utils::bin_ser::DeserializeError;
/// Each update to a page is represented by a ZenithWalRecord. It can be a wrapper
/// around a PostgreSQL WAL record, or a custom zenith-specific "record".
@@ -503,7 +505,7 @@ impl XlMultiXactTruncate {
// block data
// ...
// main data
pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeError> {
let mut rnode_spcnode: u32 = 0;
let mut rnode_dbnode: u32 = 0;
let mut rnode_relnode: u32 = 0;
@@ -514,7 +516,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
// 1. Parse XLogRecord struct
// FIXME: assume little-endian here
let xlogrec = XLogRecord::from_bytes(&mut buf);
let xlogrec = XLogRecord::from_bytes(&mut buf)?;
trace!(
"decode_wal_record xl_rmid = {} xl_info = {}",
@@ -742,34 +744,32 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
assert_eq!(buf.remaining(), main_data_len as usize);
}
DecodedWALRecord {
Ok(DecodedWALRecord {
xl_xid: xlogrec.xl_xid,
xl_info: xlogrec.xl_info,
xl_rmid: xlogrec.xl_rmid,
record,
blocks,
main_data_offset,
}
})
}
///
/// Build a human-readable string to describe a WAL record
///
/// For debugging purposes
pub fn describe_wal_record(rec: &ZenithWalRecord) -> String {
pub fn describe_wal_record(rec: &ZenithWalRecord) -> Result<String, DeserializeError> {
match rec {
ZenithWalRecord::Postgres { will_init, rec } => {
format!(
"will_init: {}, {}",
will_init,
describe_postgres_wal_record(rec)
)
}
_ => format!("{:?}", rec),
ZenithWalRecord::Postgres { will_init, rec } => Ok(format!(
"will_init: {}, {}",
will_init,
describe_postgres_wal_record(rec)?
)),
_ => Ok(format!("{:?}", rec)),
}
}
fn describe_postgres_wal_record(record: &Bytes) -> String {
fn describe_postgres_wal_record(record: &Bytes) -> Result<String, DeserializeError> {
// TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this.
// Maybe use the postgres wal redo process, the same used for replaying WAL records?
// Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly,
@@ -782,7 +782,7 @@ fn describe_postgres_wal_record(record: &Bytes) -> String {
// 1. Parse XLogRecord struct
// FIXME: assume little-endian here
let xlogrec = XLogRecord::from_bytes(&mut buf);
let xlogrec = XLogRecord::from_bytes(&mut buf)?;
let unknown_str: String;
@@ -830,5 +830,5 @@ fn describe_postgres_wal_record(record: &Bytes) -> String {
}
};
String::from(result)
Ok(String::from(result))
}

View File

@@ -239,13 +239,13 @@ fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
xl_crc: 0, // crc will be calculated later
};
let header_bytes = header.encode();
let header_bytes = header.encode().expect("failed to encode header");
let crc = crc32c_append(0, &data);
let crc = crc32c_append(crc, &header_bytes[0..xlog_utils::XLOG_RECORD_CRC_OFFS]);
header.xl_crc = crc;
let mut wal: Vec<u8> = Vec::new();
wal.extend_from_slice(&header.encode());
wal.extend_from_slice(&header.encode().expect("failed to encode header"));
wal.extend_from_slice(&data);
// WAL start position must be aligned at 8 bytes,