From c77e30116ef16ee7c455f022325231e27c800190 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 10 Dec 2021 15:08:09 +0200 Subject: [PATCH] Split waldecoder.rs into two source files. Move the code for decoding a WAL stream into WAL records into 'postgres_ffi', and keep the code to parse the WAL records deeper in 'pageserver' crate, renamed to walrecord.rs. This tidies up the dependencies a bit. 'walkeeper' reuses the same waldecoder routines, and it used to depend on 'pageserver' because of that. Now it only depends on 'postgres_ffi'. (The comment in walkeeper/Cargo.toml that claimed that the dependency was needed for ZTimelineId was obsolete. ZTimelineId is defined in 'zenith_utils', the dependency was actually needed for the waldecoder.) --- Cargo.lock | 1 - .../src/layered_repository/delta_layer.rs | 4 +- pageserver/src/lib.rs | 2 +- pageserver/src/restore_local_repo.rs | 3 +- pageserver/src/walreceiver.rs | 3 +- .../src/{waldecoder.rs => walrecord.rs} | 215 +---------------- pageserver/src/walredo.rs | 4 +- postgres_ffi/src/lib.rs | 1 + postgres_ffi/src/waldecoder.rs | 219 ++++++++++++++++++ walkeeper/Cargo.toml | 2 - walkeeper/src/safekeeper.rs | 2 +- 11 files changed, 235 insertions(+), 221 deletions(-) rename pageserver/src/{waldecoder.rs => walrecord.rs} (78%) create mode 100644 postgres_ffi/src/waldecoder.rs diff --git a/Cargo.lock b/Cargo.lock index 9b14b70935..2b1fdb5141 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2330,7 +2330,6 @@ dependencies = [ "humantime", "hyper", "lazy_static", - "pageserver", "postgres", "postgres-protocol", "postgres_ffi", diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 7538d22571..11eec36824 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -44,7 +44,7 @@ use crate::layered_repository::storage_layer::{ Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag, 
RELISH_SEG_SIZE, }; use crate::virtual_file::VirtualFile; -use crate::waldecoder; +use crate::walrecord; use crate::PageServerConf; use crate::{ZTenantId, ZTimelineId}; use anyhow::{bail, ensure, Result}; @@ -363,7 +363,7 @@ impl Layer for DeltaLayer { write!(&mut desc, " img {} bytes", img.len())?; } PageVersion::Wal(rec) => { - let wal_desc = waldecoder::describe_wal_record(&rec.rec); + let wal_desc = walrecord::describe_wal_record(&rec.rec); write!( &mut desc, " rec {} bytes will_init: {} {}", diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 660b913b0e..f4bf4bafaf 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -22,8 +22,8 @@ pub mod restore_local_repo; pub mod tenant_mgr; pub mod tenant_threads; pub mod virtual_file; -pub mod waldecoder; pub mod walreceiver; +pub mod walrecord; pub mod walredo; pub mod defaults { diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 92429a20ad..02da19630d 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -16,9 +16,10 @@ use tracing::*; use crate::relish::*; use crate::repository::*; -use crate::waldecoder::*; +use crate::walrecord::*; use postgres_ffi::nonrelfile_utils::mx_offset_to_member_segment; use postgres_ffi::relfile_utils::*; +use postgres_ffi::waldecoder::*; use postgres_ffi::xlog_utils::*; use postgres_ffi::Oid; use postgres_ffi::{pg_constants, CheckPoint, ControlFileData}; diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index ba4f1aa1e5..944b69eda6 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -10,13 +10,14 @@ use crate::restore_local_repo; use crate::tenant_mgr; use crate::tenant_mgr::TenantState; use crate::tenant_threads; -use crate::waldecoder::*; +use crate::walrecord::*; use crate::PageServerConf; use anyhow::{bail, Error, Result}; use lazy_static::lazy_static; use postgres::fallible_iterator::FallibleIterator; use 
postgres::replication::ReplicationIter; use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow}; +use postgres_ffi::waldecoder::*; use postgres_ffi::*; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/walrecord.rs similarity index 78% rename from pageserver/src/waldecoder.rs rename to pageserver/src/walrecord.rs index 20e04bcad9..26dbbf04ed 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/walrecord.rs @@ -1,220 +1,15 @@ //! -//! WAL decoder. For each WAL record, it decodes the record to figure out which data blocks -//! the record affects, so that they can be stored in repository. +//! Functions for parsing WAL records. //! -use bytes::{Buf, BufMut, Bytes, BytesMut}; -use crc32c::*; -use log::*; +use bytes::{Buf, Bytes}; use postgres_ffi::pg_constants; -use postgres_ffi::xlog_utils::*; -use postgres_ffi::XLogLongPageHeaderData; -use postgres_ffi::XLogPageHeaderData; +use postgres_ffi::xlog_utils::{TimestampTz, XLOG_SIZE_OF_XLOG_RECORD}; use postgres_ffi::XLogRecord; use postgres_ffi::{BlockNumber, OffsetNumber}; use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId}; -use std::cmp::min; -use thiserror::Error; -use zenith_utils::lsn::Lsn; +use tracing::*; -#[allow(dead_code)] -pub struct WalStreamDecoder { - lsn: Lsn, - - startlsn: Lsn, // LSN where this record starts - contlen: u32, - padlen: u32, - - inputbuf: BytesMut, - - recordbuf: BytesMut, -} - -#[derive(Error, Debug, Clone)] -#[error("{msg} at {lsn}")] -pub struct WalDecodeError { - msg: String, - lsn: Lsn, -} - -// -// WalRecordStream is a Stream that returns a stream of WAL records -// FIXME: This isn't a proper rust stream -// -impl WalStreamDecoder { - pub fn new(lsn: Lsn) -> WalStreamDecoder { - WalStreamDecoder { - lsn, - - startlsn: Lsn(0), - contlen: 0, - padlen: 0, - - inputbuf: BytesMut::new(), - recordbuf: BytesMut::new(), - } - } - - // The 
latest LSN position fed to the decoder. - pub fn available(&self) -> Lsn { - self.lsn + self.inputbuf.remaining() as u64 - } - - pub fn feed_bytes(&mut self, buf: &[u8]) { - self.inputbuf.extend_from_slice(buf); - } - - /// Attempt to decode another WAL record from the input that has been fed to the - /// decoder so far. - /// - /// Returns one of the following: - /// Ok((Lsn, Bytes)): a tuple containing the LSN of next record, and the record itself - /// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function - /// Err(WalDecodeError): an error occured while decoding, meaning the input was invalid. - /// - pub fn poll_decode(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> { - let recordbuf; - - // Run state machine that validates page headers, and reassembles records - // that cross page boundaries. - loop { - // parse and verify page boundaries as we go - if self.lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE) == 0 { - // parse long header - - if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD { - return Ok(None); - } - - let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf); - - if hdr.std.xlp_pageaddr != self.lsn.0 { - return Err(WalDecodeError { - msg: "invalid xlog segment header".into(), - lsn: self.lsn, - }); - } - // TODO: verify the remaining fields in the header - - self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64; - continue; - } else if self.lsn.block_offset() == 0 { - if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD { - return Ok(None); - } - - let hdr = XLogPageHeaderData::from_bytes(&mut self.inputbuf); - - if hdr.xlp_pageaddr != self.lsn.0 { - return Err(WalDecodeError { - msg: "invalid xlog page header".into(), - lsn: self.lsn, - }); - } - // TODO: verify the remaining fields in the header - - self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64; - continue; - } else if self.padlen > 0 { - if self.inputbuf.remaining() < self.padlen as usize { - return Ok(None); - } - - // skip padding - 
self.inputbuf.advance(self.padlen as usize); - self.lsn += self.padlen as u64; - self.padlen = 0; - } else if self.contlen == 0 { - assert!(self.recordbuf.is_empty()); - - // need to have at least the xl_tot_len field - if self.inputbuf.remaining() < 4 { - return Ok(None); - } - - // peek xl_tot_len at the beginning of the record. - // FIXME: assumes little-endian - self.startlsn = self.lsn; - let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le(); - if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD { - return Err(WalDecodeError { - msg: format!("invalid xl_tot_len {}", xl_tot_len), - lsn: self.lsn, - }); - } - - // Fast path for the common case that the whole record fits on the page. - let pageleft = self.lsn.remaining_in_block() as u32; - if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft { - // Take the record from the 'inputbuf', and validate it. - recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize); - self.lsn += xl_tot_len as u64; - break; - } else { - // Need to assemble the record from pieces. Remember the size of the - // record, and loop back. On next iteration, we will reach the 'else' - // branch below, and copy the part of the record that was on this page - // to 'recordbuf'. Subsequent iterations will skip page headers, and - // append the continuations from the next pages to 'recordbuf'. - self.recordbuf.reserve(xl_tot_len as usize); - self.contlen = xl_tot_len; - continue; - } - } else { - // we're continuing a record, possibly from previous page. - let pageleft = self.lsn.remaining_in_block() as u32; - - // read the rest of the record, or as much as fits on this page. - let n = min(self.contlen, pageleft) as usize; - - if self.inputbuf.remaining() < n { - return Ok(None); - } - - self.recordbuf.put(self.inputbuf.split_to(n)); - self.lsn += n as u64; - self.contlen -= n as u32; - - if self.contlen == 0 { - // The record is now complete. 
- recordbuf = std::mem::replace(&mut self.recordbuf, BytesMut::new()).freeze(); - break; - } - continue; - } - } - - // We now have a record in the 'recordbuf' local variable. - let xlogrec = XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]); - - let mut crc = 0; - crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]); - crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]); - if crc != xlogrec.xl_crc { - return Err(WalDecodeError { - msg: "WAL record crc mismatch".into(), - lsn: self.lsn, - }); - } - - // XLOG_SWITCH records are special. If we see one, we need to skip - // to the next WAL segment. - if xlogrec.is_xlog_switch_record() { - trace!("saw xlog switch record at {}", self.lsn); - self.padlen = self.lsn.calc_padding(pg_constants::WAL_SEGMENT_SIZE as u64) as u32; - } else { - // Pad to an 8-byte boundary - self.padlen = self.lsn.calc_padding(8u32) as u32; - } - - // Always align resulting LSN on 0x8 boundary -- that is important for getPage() - // and WalReceiver integration. Since this code is used both for WalReceiver and - // initial WAL import let's force alignment right here. - let result = (self.lsn.align(), recordbuf); - Ok(Some(result)) - } -} - -#[allow(dead_code)] +/// DecodedBkpBlock represents per-page data contained in a WAL record. #[derive(Default)] pub struct DecodedBkpBlock { /* Is this block ref in use? 
*/ diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index dc0968250b..a49a6447c4 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -43,8 +43,8 @@ use zenith_utils::zid::ZTenantId; use crate::relish::*; use crate::repository::WALRecord; -use crate::waldecoder::XlMultiXactCreate; -use crate::waldecoder::XlXactParsedRecord; +use crate::walrecord::XlMultiXactCreate; +use crate::walrecord::XlXactParsedRecord; use crate::PageServerConf; use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_bitshift; use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_offset; diff --git a/postgres_ffi/src/lib.rs b/postgres_ffi/src/lib.rs index 707a63e4b9..d5a4468dc7 100644 --- a/postgres_ffi/src/lib.rs +++ b/postgres_ffi/src/lib.rs @@ -13,6 +13,7 @@ pub mod controlfile_utils; pub mod nonrelfile_utils; pub mod pg_constants; pub mod relfile_utils; +pub mod waldecoder; pub mod xlog_utils; // See TransactionIdIsNormal in transam.h diff --git a/postgres_ffi/src/waldecoder.rs b/postgres_ffi/src/waldecoder.rs new file mode 100644 index 0000000000..ac48b1b0f3 --- /dev/null +++ b/postgres_ffi/src/waldecoder.rs @@ -0,0 +1,219 @@ +//! +//! Basic WAL stream decoding. +//! +//! This understands the WAL page and record format, enough to figure out where the WAL record +//! boundaries are, and to reassemble WAL records that cross page boundaries. +//! +//! This functionality is needed by both the pageserver and the walkeepers. The pageserver needs +//! to look deeper into the WAL records to also understand which blocks they modify, the code +//! for that is in pageserver/src/walrecord.rs +//! 
+use super::pg_constants; +use super::xlog_utils::*; +use super::XLogLongPageHeaderData; +use super::XLogPageHeaderData; +use super::XLogRecord; +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use crc32c::*; +use log::*; +use std::cmp::min; +use thiserror::Error; +use zenith_utils::lsn::Lsn; + +pub struct WalStreamDecoder { + lsn: Lsn, + + startlsn: Lsn, // LSN where this record starts + contlen: u32, + padlen: u32, + + inputbuf: BytesMut, + + /// buffer used to reassemble records that cross page boundaries. + recordbuf: BytesMut, +} + +#[derive(Error, Debug, Clone)] +#[error("{msg} at {lsn}")] +pub struct WalDecodeError { + msg: String, + lsn: Lsn, +} + +// +// WalRecordStream is a Stream that returns a stream of WAL records +// FIXME: This isn't a proper rust stream +// +impl WalStreamDecoder { + pub fn new(lsn: Lsn) -> WalStreamDecoder { + WalStreamDecoder { + lsn, + + startlsn: Lsn(0), + contlen: 0, + padlen: 0, + + inputbuf: BytesMut::new(), + recordbuf: BytesMut::new(), + } + } + + // The latest LSN position fed to the decoder. + pub fn available(&self) -> Lsn { + self.lsn + self.inputbuf.remaining() as u64 + } + + pub fn feed_bytes(&mut self, buf: &[u8]) { + self.inputbuf.extend_from_slice(buf); + } + + /// Attempt to decode another WAL record from the input that has been fed to the + /// decoder so far. + /// + /// Returns one of the following: + /// Ok((Lsn, Bytes)): a tuple containing the LSN of next record, and the record itself + /// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function + /// Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid. + /// + pub fn poll_decode(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> { + let recordbuf; + + // Run state machine that validates page headers, and reassembles records + // that cross page boundaries. 
+ loop { + // parse and verify page boundaries as we go + if self.lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE) == 0 { + // parse long header + + if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_LONG_PHD { + return Ok(None); + } + + let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf); + + if hdr.std.xlp_pageaddr != self.lsn.0 { + return Err(WalDecodeError { + msg: "invalid xlog segment header".into(), + lsn: self.lsn, + }); + } + // TODO: verify the remaining fields in the header + + self.lsn += XLOG_SIZE_OF_XLOG_LONG_PHD as u64; + continue; + } else if self.lsn.block_offset() == 0 { + if self.inputbuf.remaining() < XLOG_SIZE_OF_XLOG_SHORT_PHD { + return Ok(None); + } + + let hdr = XLogPageHeaderData::from_bytes(&mut self.inputbuf); + + if hdr.xlp_pageaddr != self.lsn.0 { + return Err(WalDecodeError { + msg: "invalid xlog page header".into(), + lsn: self.lsn, + }); + } + // TODO: verify the remaining fields in the header + + self.lsn += XLOG_SIZE_OF_XLOG_SHORT_PHD as u64; + continue; + } else if self.padlen > 0 { + if self.inputbuf.remaining() < self.padlen as usize { + return Ok(None); + } + + // skip padding + self.inputbuf.advance(self.padlen as usize); + self.lsn += self.padlen as u64; + self.padlen = 0; + } else if self.contlen == 0 { + assert!(self.recordbuf.is_empty()); + + // need to have at least the xl_tot_len field + if self.inputbuf.remaining() < 4 { + return Ok(None); + } + + // peek xl_tot_len at the beginning of the record. + // FIXME: assumes little-endian + self.startlsn = self.lsn; + let xl_tot_len = (&self.inputbuf[0..4]).get_u32_le(); + if (xl_tot_len as usize) < XLOG_SIZE_OF_XLOG_RECORD { + return Err(WalDecodeError { + msg: format!("invalid xl_tot_len {}", xl_tot_len), + lsn: self.lsn, + }); + } + + // Fast path for the common case that the whole record fits on the page. 
+ let pageleft = self.lsn.remaining_in_block() as u32; + if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft { + // Take the record from the 'inputbuf', and validate it. + recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize); + self.lsn += xl_tot_len as u64; + break; + } else { + // Need to assemble the record from pieces. Remember the size of the + // record, and loop back. On next iteration, we will reach the 'else' + // branch below, and copy the part of the record that was on this page + // to 'recordbuf'. Subsequent iterations will skip page headers, and + // append the continuations from the next pages to 'recordbuf'. + self.recordbuf.reserve(xl_tot_len as usize); + self.contlen = xl_tot_len; + continue; + } + } else { + // we're continuing a record, possibly from previous page. + let pageleft = self.lsn.remaining_in_block() as u32; + + // read the rest of the record, or as much as fits on this page. + let n = min(self.contlen, pageleft) as usize; + + if self.inputbuf.remaining() < n { + return Ok(None); + } + + self.recordbuf.put(self.inputbuf.split_to(n)); + self.lsn += n as u64; + self.contlen -= n as u32; + + if self.contlen == 0 { + // The record is now complete. + recordbuf = std::mem::replace(&mut self.recordbuf, BytesMut::new()).freeze(); + break; + } + continue; + } + } + + // We now have a record in the 'recordbuf' local variable. + let xlogrec = XLogRecord::from_slice(&recordbuf[0..XLOG_SIZE_OF_XLOG_RECORD]); + + let mut crc = 0; + crc = crc32c_append(crc, &recordbuf[XLOG_RECORD_CRC_OFFS + 4..]); + crc = crc32c_append(crc, &recordbuf[0..XLOG_RECORD_CRC_OFFS]); + if crc != xlogrec.xl_crc { + return Err(WalDecodeError { + msg: "WAL record crc mismatch".into(), + lsn: self.lsn, + }); + } + + // XLOG_SWITCH records are special. If we see one, we need to skip + // to the next WAL segment. 
+ if xlogrec.is_xlog_switch_record() { + trace!("saw xlog switch record at {}", self.lsn); + self.padlen = self.lsn.calc_padding(pg_constants::WAL_SEGMENT_SIZE as u64) as u32; + } else { + // Pad to an 8-byte boundary + self.padlen = self.lsn.calc_padding(8u32) as u32; + } + + // Always align resulting LSN on 0x8 boundary -- that is important for getPage() + // and WalReceiver integration. Since this code is used both for WalReceiver and + // initial WAL import let's force alignment right here. + let result = (self.lsn.align(), recordbuf); + Ok(Some(result)) + } +} diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml index c6c6593160..4285291247 100644 --- a/walkeeper/Cargo.toml +++ b/walkeeper/Cargo.toml @@ -32,8 +32,6 @@ hex = "0.4.3" const_format = "0.2.21" tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } -# FIXME: 'pageserver' is needed for ZTimelineId. Refactor -pageserver = { path = "../pageserver" } postgres_ffi = { path = "../postgres_ffi" } workspace_hack = { path = "../workspace_hack" } zenith_metrics = { path = "../zenith_metrics" } diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 77bc74ec16..89b5a8cdea 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -8,7 +8,7 @@ use bytes::Buf; use bytes::BufMut; use bytes::Bytes; use bytes::BytesMut; -use pageserver::waldecoder::WalStreamDecoder; +use postgres_ffi::waldecoder::WalStreamDecoder; use postgres_ffi::xlog_utils::TimeLineID; use serde::{Deserialize, Serialize}; use std::cmp::min;