From bc569dde51639073cf241369f3fc872121d0c811 Mon Sep 17 00:00:00 2001 From: bojanserafimov Date: Wed, 4 May 2022 17:41:05 -0400 Subject: [PATCH] Remove some unwraps from waldecoder (#1539) --- libs/postgres_ffi/src/waldecoder.rs | 22 +++++++-- libs/postgres_ffi/src/xlog_utils.rs | 46 ++++++++++--------- pageserver/src/basebackup.rs | 5 +- pageserver/src/import_datadir.rs | 2 +- .../src/layered_repository/delta_layer.rs | 2 +- .../src/layered_repository/inmemory_layer.rs | 2 +- pageserver/src/walingest.rs | 5 +- pageserver/src/walrecord.rs | 32 ++++++------- safekeeper/src/json_ctrl.rs | 4 +- 9 files changed, 70 insertions(+), 50 deletions(-) diff --git a/libs/postgres_ffi/src/waldecoder.rs b/libs/postgres_ffi/src/waldecoder.rs index 9d1089ed46..95ea9660e8 100644 --- a/libs/postgres_ffi/src/waldecoder.rs +++ b/libs/postgres_ffi/src/waldecoder.rs @@ -89,7 +89,12 @@ impl WalStreamDecoder { return Ok(None); } - let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf); + let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| { + WalDecodeError { + msg: format!("long header deserialization failed {}", e), + lsn: self.lsn, + } + })?; if hdr.std.xlp_pageaddr != self.lsn.0 { return Err(WalDecodeError { @@ -106,7 +111,12 @@ impl WalStreamDecoder { return Ok(None); } - let hdr = XLogPageHeaderData::from_bytes(&mut self.inputbuf); + let hdr = XLogPageHeaderData::from_bytes(&mut self.inputbuf).map_err(|e| { + WalDecodeError { + msg: format!("header deserialization failed {}", e), + lsn: self.lsn, + } + })?; if hdr.xlp_pageaddr != self.lsn.0 { return Err(WalDecodeError { @@ -188,7 +198,13 @@ impl WalStreamDecoder { } // We now have a record in the 'recordbuf' local variable. - let xlogrec = XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]); + let xlogrec = + XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]).map_err(|e| { + WalDecodeError { + msg: format!("xlog record deserialization failed {}", e), + lsn: self.lsn, + } + })?; let mut crc = 0; crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]); diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index bd4b7df690..7882058868 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -15,7 +15,7 @@ use crate::XLogPageHeaderData; use crate::XLogRecord; use crate::XLOG_PAGE_MAGIC; -use anyhow::{bail, Result}; +use anyhow::bail; use byteorder::{ByteOrder, LittleEndian}; use bytes::BytesMut; use bytes::{Buf, Bytes}; @@ -28,6 +28,8 @@ use std::io::prelude::*; use std::io::SeekFrom; use std::path::{Path, PathBuf}; use std::time::SystemTime; +use utils::bin_ser::DeserializeError; +use utils::bin_ser::SerializeError; use utils::lsn::Lsn; pub const XLOG_FNAME_LEN: usize = 24; @@ -144,7 +146,7 @@ fn find_end_of_wal_segment( tli: TimeLineID, wal_seg_size: usize, start_offset: usize, // start reading at this point -) -> Result { +) -> anyhow::Result { // step back to the beginning of the page to read it in... let mut offs: usize = start_offset - start_offset % XLOG_BLCKSZ; let mut contlen: usize = 0; @@ -272,7 +274,7 @@ pub fn find_end_of_wal( wal_seg_size: usize, precise: bool, start_lsn: Lsn, // start reading WAL at this point or later -) -> Result<(XLogRecPtr, TimeLineID)> { +) -> anyhow::Result<(XLogRecPtr, TimeLineID)> { let mut high_segno: XLogSegNo = 0; let mut high_tli: TimeLineID = 0; let mut high_ispartial = false; @@ -354,19 +356,19 @@ pub fn main() { } impl XLogRecord { - pub fn from_slice(buf: &[u8]) -> XLogRecord { + pub fn from_slice(buf: &[u8]) -> Result { use utils::bin_ser::LeSer; - XLogRecord::des(buf).unwrap() + XLogRecord::des(buf) } - pub fn from_bytes(buf: &mut B) -> XLogRecord { + pub fn from_bytes(buf: &mut B) -> Result { use utils::bin_ser::LeSer; - XLogRecord::des_from(&mut buf.reader()).unwrap() + XLogRecord::des_from(&mut buf.reader()) } - pub fn encode(&self) -> Bytes { + pub fn encode(&self) -> Result { use utils::bin_ser::LeSer; - self.ser().unwrap().into() + Ok(self.ser()?.into()) } // Is this record an XLOG_SWITCH record? They need some special processing, @@ -376,35 +378,35 @@ impl XLogRecord { } impl XLogPageHeaderData { - pub fn from_bytes(buf: &mut B) -> XLogPageHeaderData { + pub fn from_bytes(buf: &mut B) -> Result { use utils::bin_ser::LeSer; - XLogPageHeaderData::des_from(&mut buf.reader()).unwrap() + XLogPageHeaderData::des_from(&mut buf.reader()) } } impl XLogLongPageHeaderData { - pub fn from_bytes(buf: &mut B) -> XLogLongPageHeaderData { + pub fn from_bytes(buf: &mut B) -> Result { use utils::bin_ser::LeSer; - XLogLongPageHeaderData::des_from(&mut buf.reader()).unwrap() + XLogLongPageHeaderData::des_from(&mut buf.reader()) } - pub fn encode(&self) -> Bytes { + pub fn encode(&self) -> Result { use utils::bin_ser::LeSer; - self.ser().unwrap().into() + self.ser().map(|b| b.into()) } } pub const SIZEOF_CHECKPOINT: usize = std::mem::size_of::(); impl CheckPoint { - pub fn encode(&self) -> Bytes { + pub fn encode(&self) -> Result { use utils::bin_ser::LeSer; - self.ser().unwrap().into() + Ok(self.ser()?.into()) } - pub fn decode(buf: &[u8]) -> Result { + pub fn decode(buf: &[u8]) -> Result { use utils::bin_ser::LeSer; - Ok(CheckPoint::des(buf)?) + CheckPoint::des(buf) } /// Update next XID based on provided new_xid and stored epoch. @@ -442,7 +444,7 @@ impl CheckPoint { // Generate new, empty WAL segment. // We need this segment to start compute node. // -pub fn generate_wal_segment(segno: u64, system_id: u64) -> Bytes { +pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result { let mut seg_buf = BytesMut::with_capacity(pg_constants::WAL_SEGMENT_SIZE as usize); let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, pg_constants::WAL_SEGMENT_SIZE); @@ -462,12 +464,12 @@ pub fn generate_wal_segment(segno: u64, system_id: u64) -> Bytes { xlp_xlog_blcksz: XLOG_BLCKSZ as u32, }; - let hdr_bytes = hdr.encode(); + let hdr_bytes = hdr.encode()?; seg_buf.extend_from_slice(&hdr_bytes); //zero out the rest of the file seg_buf.resize(pg_constants::WAL_SEGMENT_SIZE, 0); - seg_buf.freeze() + Ok(seg_buf.freeze()) } #[cfg(test)] diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 14e6d40759..92d35130d8 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -10,7 +10,7 @@ //! This module is responsible for creation of such tarball //! from data stored in object storage. //! -use anyhow::{ensure, Context, Result}; +use anyhow::{anyhow, ensure, Context, Result}; use bytes::{BufMut, BytesMut}; use std::fmt::Write as FmtWrite; use std::io; @@ -323,7 +323,8 @@ impl<'a> Basebackup<'a> { let wal_file_name = XLogFileName(PG_TLI, segno, pg_constants::WAL_SEGMENT_SIZE); let wal_file_path = format!("pg_wal/{}", wal_file_name); let header = new_tar_header(&wal_file_path, pg_constants::WAL_SEGMENT_SIZE as u64)?; - let wal_seg = generate_wal_segment(segno, pg_control.system_identifier); + let wal_seg = generate_wal_segment(segno, pg_control.system_identifier) + .map_err(|e| anyhow!(e).context("Failed generating wal segment"))?; ensure!(wal_seg.len() == pg_constants::WAL_SEGMENT_SIZE); self.ar.append(&header, &wal_seg[..])?; Ok(()) diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 8f49903e6c..703ee8f1b1 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -274,7 +274,7 @@ fn import_control_file( // Extract the checkpoint record and import it separately. let pg_control = ControlFileData::decode(&buffer)?; - let checkpoint_bytes = pg_control.checkPointCopy.encode(); + let checkpoint_bytes = pg_control.checkPointCopy.encode()?; modification.put_checkpoint(checkpoint_bytes)?; Ok(pg_control) diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 4952f64ccd..1e1ec716a6 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -375,7 +375,7 @@ impl Layer for DeltaLayer { write!(&mut desc, " img {} bytes", img.len()).unwrap(); } Ok(Value::WalRecord(rec)) => { - let wal_desc = walrecord::describe_wal_record(&rec); + let wal_desc = walrecord::describe_wal_record(&rec).unwrap(); write!( &mut desc, " rec {} bytes will_init: {} {}", diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 714a0bc579..856baa2e8a 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -207,7 +207,7 @@ impl Layer for InMemoryLayer { write!(&mut desc, " img {} bytes", img.len())?; } Ok(Value::WalRecord(rec)) => { - let wal_desc = walrecord::describe_wal_record(&rec); + let wal_desc = walrecord::describe_wal_record(&rec).unwrap(); write!( &mut desc, " rec {} bytes will_init: {} {}", diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index a929e290ad..fbdb328d2c 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -21,6 +21,7 @@ //! redo Postgres process, but some records it can handle directly with //! bespoken Rust code. +use anyhow::Context; use postgres_ffi::nonrelfile_utils::clogpage_precedes; use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment; @@ -82,7 +83,7 @@ impl<'a, R: Repository> WalIngest<'a, R> { ) -> Result<()> { let mut modification = timeline.begin_modification(lsn); - let mut decoded = decode_wal_record(recdata); + let mut decoded = decode_wal_record(recdata).context("failed decoding wal record")?; let mut buf = decoded.record.clone(); buf.advance(decoded.main_data_offset); @@ -251,7 +252,7 @@ impl<'a, R: Repository> WalIngest<'a, R> { // If checkpoint data was updated, store the new version in the repository if self.checkpoint_modified { - let new_checkpoint_bytes = self.checkpoint.encode(); + let new_checkpoint_bytes = self.checkpoint.encode()?; modification.put_checkpoint(new_checkpoint_bytes)?; self.checkpoint_modified = false; diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs index e8699cfa22..5a384360e2 100644 --- a/pageserver/src/walrecord.rs +++ b/pageserver/src/walrecord.rs @@ -1,6 +1,7 @@ //! //! Functions for parsing WAL records. //! +use anyhow::Result; use bytes::{Buf, Bytes}; use postgres_ffi::pg_constants; use postgres_ffi::xlog_utils::{TimestampTz, XLOG_SIZE_OF_XLOG_RECORD}; @@ -9,6 +10,7 @@ use postgres_ffi::{BlockNumber, OffsetNumber}; use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId}; use serde::{Deserialize, Serialize}; use tracing::*; +use utils::bin_ser::DeserializeError; /// Each update to a page is represented by a ZenithWalRecord. It can be a wrapper /// around a PostgreSQL WAL record, or a custom zenith-specific "record". @@ -503,7 +505,7 @@ impl XlMultiXactTruncate { // block data // ... // main data -pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { +pub fn decode_wal_record(record: Bytes) -> Result { let mut rnode_spcnode: u32 = 0; let mut rnode_dbnode: u32 = 0; let mut rnode_relnode: u32 = 0; @@ -514,7 +516,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { // 1. Parse XLogRecord struct // FIXME: assume little-endian here - let xlogrec = XLogRecord::from_bytes(&mut buf); + let xlogrec = XLogRecord::from_bytes(&mut buf)?; trace!( "decode_wal_record xl_rmid = {} xl_info = {}", @@ -742,34 +744,32 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { assert_eq!(buf.remaining(), main_data_len as usize); } - DecodedWALRecord { + Ok(DecodedWALRecord { xl_xid: xlogrec.xl_xid, xl_info: xlogrec.xl_info, xl_rmid: xlogrec.xl_rmid, record, blocks, main_data_offset, - } + }) } /// /// Build a human-readable string to describe a WAL record /// /// For debugging purposes -pub fn describe_wal_record(rec: &ZenithWalRecord) -> String { +pub fn describe_wal_record(rec: &ZenithWalRecord) -> Result { match rec { - ZenithWalRecord::Postgres { will_init, rec } => { - format!( - "will_init: {}, {}", - will_init, - describe_postgres_wal_record(rec) - ) - } - _ => format!("{:?}", rec), + ZenithWalRecord::Postgres { will_init, rec } => Ok(format!( + "will_init: {}, {}", + will_init, + describe_postgres_wal_record(rec)? + )), + _ => Ok(format!("{:?}", rec)), } } -fn describe_postgres_wal_record(record: &Bytes) -> String { +fn describe_postgres_wal_record(record: &Bytes) -> Result { // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this. // Maybe use the postgres wal redo process, the same used for replaying WAL records? // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly, @@ -782,7 +782,7 @@ fn describe_postgres_wal_record(record: &Bytes) -> String { // 1. Parse XLogRecord struct // FIXME: assume little-endian here - let xlogrec = XLogRecord::from_bytes(&mut buf); + let xlogrec = XLogRecord::from_bytes(&mut buf)?; let unknown_str: String; @@ -830,5 +830,5 @@ fn describe_postgres_wal_record(record: &Bytes) -> String { } }; - String::from(result) + Ok(String::from(result)) } diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index d21d5ad73b..43514997d4 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -239,13 +239,13 @@ fn encode_logical_message(prefix: &str, message: &str) -> Vec { xl_crc: 0, // crc will be calculated later }; - let header_bytes = header.encode(); + let header_bytes = header.encode().expect("failed to encode header"); let crc = crc32c_append(0, &data); let crc = crc32c_append(crc, &header_bytes[0..xlog_utils::XLOG_RECORD_CRC_OFFS]); header.xl_crc = crc; let mut wal: Vec = Vec::new(); - wal.extend_from_slice(&header.encode()); + wal.extend_from_slice(&header.encode().expect("failed to encode header")); wal.extend_from_slice(&data); // WAL start position must be aligned at 8 bytes,