From e7b112aacc65d4a3589886c9f3fca608a09d58f2 Mon Sep 17 00:00:00 2001 From: anastasia Date: Wed, 28 Apr 2021 13:28:56 +0300 Subject: [PATCH] Refactor pg_constants. Move them to postgres_ffi/ --- pageserver/src/lib.rs | 1 - pageserver/src/waldecoder.rs | 111 +++++++----------- pageserver/src/walreceiver.rs | 2 +- pageserver/src/walredo.rs | 5 +- postgres_ffi/src/lib.rs | 1 + .../src/pg_constants.rs | 26 ++++ postgres_ffi/src/xlog_utils.rs | 7 ++ 7 files changed, 82 insertions(+), 71 deletions(-) rename {pageserver => postgres_ffi}/src/pg_constants.rs (71%) diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 225906c943..84e80f3ecb 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -7,7 +7,6 @@ use std::time::Duration; pub mod basebackup; pub mod page_cache; pub mod page_service; -pub mod pg_constants; pub mod restore_local_repo; pub mod tui; pub mod tui_event; diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 4b9c8da62f..bbe9cd2fd8 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -1,4 +1,4 @@ -use crate::pg_constants; +use postgres_ffi::pg_constants; use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; use postgres_ffi::xlog_utils::XLogRecord; @@ -178,10 +178,12 @@ impl WalStreamDecoder { let recordbuf = std::mem::replace(&mut self.recordbuf, BytesMut::new()); let recordbuf = recordbuf.freeze(); + let mut buf = recordbuf.clone(); // XLOG_SWITCH records are special. If we see one, we need to skip // to the next WAL segment. - if is_xlog_switch_record(&recordbuf) { + let xlogrec = XLogRecord::from_bytes(&mut buf); + if xlogrec.is_xlog_switch_record() { trace!("saw xlog switch record at {}", self.lsn); self.padlen = self.lsn.calc_padding(WAL_SEGMENT_SIZE) as u32; } else { @@ -237,32 +239,6 @@ impl WalStreamDecoder { } } -// FIXME: -const BLCKSZ: u16 = 8192; - -// -// Constants from xlogrecord.h -// - -const XLR_MAX_BLOCK_ID: u8 = 32; - -const XLR_BLOCK_ID_DATA_SHORT: u8 = 255; -const XLR_BLOCK_ID_DATA_LONG: u8 = 254; -const XLR_BLOCK_ID_ORIGIN: u8 = 253; -const XLR_BLOCK_ID_TOPLEVEL_XID: u8 = 252; - -const BKPBLOCK_FORK_MASK: u8 = 0x0F; -const _BKPBLOCK_FLAG_MASK: u8 = 0xF0; -const BKPBLOCK_HAS_IMAGE: u8 = 0x10; /* block data is an XLogRecordBlockImage */ -const BKPBLOCK_HAS_DATA: u8 = 0x20; -const BKPBLOCK_WILL_INIT: u8 = 0x40; /* redo will re-init the page */ -const BKPBLOCK_SAME_REL: u8 = 0x80; /* RelFileNode omitted, same as previous */ - -/* Information stored in bimg_info */ -const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */ -const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */ -const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */ - #[allow(dead_code)] pub struct DecodedBkpBlock { /* Is this block ref in use? */ @@ -330,16 +306,17 @@ pub struct DecodedWALRecord { pub main_data_offset: usize, } -// Is this record an XLOG_SWITCH record? They need some special processing, -// so we need to check for that before the rest of the parsing. -// -// FIXME: refactor this and decode_wal_record() below to avoid the duplication. -fn is_xlog_switch_record(rec: &Bytes) -> bool { - let mut buf = rec.clone(); - - let xlogrec = XLogRecord::from_bytes(&mut buf); - xlogrec.xl_info == pg_constants::XLOG_SWITCH && xlogrec.xl_rmid == pg_constants::RM_XLOG_ID -} +// // Is this record an XLOG_SWITCH record? They need some special processing, +// // so we need to check for that before the rest of the parsing. +// // +// // FIXME: refactor this and decode_wal_record() below to avoid the duplication. +// fn is_xlog_switch_record(rec: &Bytes) -> bool { +// let mut buf = rec.clone(); + +// let xlogrec = XLogRecord::from_bytes(&mut buf); +// xlogrec.xl_info == pg_constants::XLOG_SWITCH && xlogrec.xl_rmid == pg_constants::RM_XLOG_ID +// } + pub type Oid = u32; pub type BlockNumber = u32; @@ -408,10 +385,10 @@ impl XlCreateDatabase { // The overall layout of an XLOG record is: // Fixed-size header (XLogRecord struct) // XLogRecordBlockHeader struct -// If BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows -// If BKPIMAGE_HAS_HOLE and BKPIMAGE_IS_COMPRESSED, an +// If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows +// If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an // XLogRecordBlockCompressHeader struct follows. -// If BKPBLOCK_SAME_REL is not set, a RelFileNode follows +// If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows // BlockNumber follows // XLogRecordBlockHeader struct // ... @@ -458,29 +435,29 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { let block_id = buf.get_u8(); match block_id { - XLR_BLOCK_ID_DATA_SHORT => { + pg_constants::XLR_BLOCK_ID_DATA_SHORT => { /* XLogRecordDataHeaderShort */ main_data_len = buf.get_u8() as u32; datatotal += main_data_len; } - XLR_BLOCK_ID_DATA_LONG => { + pg_constants::XLR_BLOCK_ID_DATA_LONG => { /* XLogRecordDataHeaderLong */ main_data_len = buf.get_u32_le(); datatotal += main_data_len; } - XLR_BLOCK_ID_ORIGIN => { + pg_constants::XLR_BLOCK_ID_ORIGIN => { // RepOriginId is uint16 buf.advance(2); } - XLR_BLOCK_ID_TOPLEVEL_XID => { + pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => { // TransactionId is uint32 buf.advance(4); } - 0..=XLR_MAX_BLOCK_ID => { + 0..=pg_constants::XLR_MAX_BLOCK_ID => { /* XLogRecordBlockHeader */ let mut blk = DecodedBkpBlock::new(); let fork_flags: u8; @@ -497,11 +474,11 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { max_block_id = block_id; fork_flags = buf.get_u8(); - blk.forknum = fork_flags & BKPBLOCK_FORK_MASK; + blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK; blk.flags = fork_flags; - blk.has_image = (fork_flags & BKPBLOCK_HAS_IMAGE) != 0; - blk.has_data = (fork_flags & BKPBLOCK_HAS_DATA) != 0; - blk.will_init = (fork_flags & BKPBLOCK_WILL_INIT) != 0; + blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0; + blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0; + blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0; blk.data_len = buf.get_u16_le(); /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */ @@ -514,16 +491,16 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { blk.hole_offset = buf.get_u16_le(); blk.bimg_info = buf.get_u8(); - blk.apply_image = (blk.bimg_info & BKPIMAGE_APPLY) != 0; + blk.apply_image = (blk.bimg_info & pg_constants::BKPIMAGE_APPLY) != 0; - if blk.bimg_info & BKPIMAGE_IS_COMPRESSED != 0 { - if blk.bimg_info & BKPIMAGE_HAS_HOLE != 0 { + if blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED != 0 { + if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 { blk.hole_length = buf.get_u16_le(); } else { blk.hole_length = 0; } } else { - blk.hole_length = BLCKSZ - blk.bimg_len; + blk.hole_length = pg_constants::BLCKSZ - blk.bimg_len; } datatotal += blk.bimg_len as u32; blocks_total_len += blk.bimg_len as u32; @@ -532,13 +509,13 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { * cross-check that hole_offset > 0, hole_length > 0 and * bimg_len < BLCKSZ if the HAS_HOLE flag is set. */ - if blk.bimg_info & BKPIMAGE_HAS_HOLE != 0 - && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ) + if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 + && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == pg_constants::BLCKSZ) { // TODO /* report_invalid_record(state, - "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X", + "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X", (unsigned int) blk->hole_offset, (unsigned int) blk->hole_length, (unsigned int) blk->bimg_len, @@ -551,13 +528,13 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { * cross-check that hole_offset == 0 and hole_length == 0 if * the HAS_HOLE flag is not set. */ - if blk.bimg_info & BKPIMAGE_HAS_HOLE == 0 + if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0 && (blk.hole_offset != 0 || blk.hole_length != 0) { // TODO /* report_invalid_record(state, - "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X", + "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X", (unsigned int) blk->hole_offset, (unsigned int) blk->hole_length, (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); @@ -569,11 +546,11 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED * flag is set. */ - if (blk.bimg_info & BKPIMAGE_IS_COMPRESSED == 0) && blk.bimg_len == BLCKSZ { + if (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0) && blk.bimg_len == pg_constants::BLCKSZ { // TODO /* report_invalid_record(state, - "BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X", + "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X", (unsigned int) blk->bimg_len, (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); goto err; @@ -584,21 +561,21 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor * IS_COMPRESSED flag is set. */ - if blk.bimg_info & BKPIMAGE_HAS_HOLE == 0 - && blk.bimg_info & BKPIMAGE_IS_COMPRESSED == 0 - && blk.bimg_len != BLCKSZ + if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0 + && blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0 + && blk.bimg_len != pg_constants::BLCKSZ { // TODO /* report_invalid_record(state, - "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X", + "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X", (unsigned int) blk->data_len, (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); goto err; */ } } - if fork_flags & BKPBLOCK_SAME_REL == 0 { + if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 { rnode_spcnode = buf.get_u32_le(); rnode_dbnode = buf.get_u32_le(); rnode_relnode = buf.get_u32_le(); @@ -607,7 +584,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { // TODO /* report_invalid_record(state, - "BKPBLOCK_SAME_REL set but no previous rel at %X/%X", + "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X", (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); goto err; */ } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index b11ca965de..e97f7c7c9b 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -8,7 +8,6 @@ use crate::page_cache; use crate::page_cache::{BufferTag, RelTag}; -use crate::pg_constants; use crate::waldecoder::*; use crate::PageServerConf; use crate::ZTimelineId; @@ -16,6 +15,7 @@ use anyhow::Error; use lazy_static::lazy_static; use log::*; use postgres_ffi::xlog_utils::*; +use postgres_ffi::pg_constants; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; use std::collections::HashMap; diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 229e1c9516..8f2eb33b3a 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -37,8 +37,9 @@ use zenith_utils::lsn::Lsn; use crate::page_cache::BufferTag; use crate::page_cache::WALRecord; use crate::ZTimelineId; -use crate::{pg_constants, PageServerConf}; -use postgres_ffi::xlog_utils::XLogRecord; +use crate::PageServerConf; +use postgres_ffi::xlog_utils::{XLogRecord}; +use postgres_ffi::pg_constants; static TIMEOUT: Duration = Duration::from_secs(20); diff --git a/postgres_ffi/src/lib.rs b/postgres_ffi/src/lib.rs index d1496da2ef..61bdd398be 100644 --- a/postgres_ffi/src/lib.rs +++ b/postgres_ffi/src/lib.rs @@ -4,6 +4,7 @@ include!(concat!(env!("OUT_DIR"), "/bindings.rs")); pub mod xlog_utils; +pub mod pg_constants; use bytes::{Buf, Bytes, BytesMut}; diff --git a/pageserver/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs similarity index 71% rename from pageserver/src/pg_constants.rs rename to postgres_ffi/src/pg_constants.rs index b2a54bc78b..a43aab1633 100644 --- a/pageserver/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -70,3 +70,29 @@ pub const XLOG_TBLSPC_CREATE: u8 = 0x00; pub const XLOG_TBLSPC_DROP: u8 = 0x10; pub const SIZEOF_XLOGRECORD: u32 = 24; + + +// FIXME: +pub const BLCKSZ: u16 = 8192; + +// +// from xlogrecord.h +// +pub const XLR_MAX_BLOCK_ID: u8 = 32; + +pub const XLR_BLOCK_ID_DATA_SHORT: u8 = 255; +pub const XLR_BLOCK_ID_DATA_LONG: u8 = 254; +pub const XLR_BLOCK_ID_ORIGIN: u8 = 253; +pub const XLR_BLOCK_ID_TOPLEVEL_XID: u8 = 252; + +pub const BKPBLOCK_FORK_MASK: u8 = 0x0F; +pub const _BKPBLOCK_FLAG_MASK: u8 = 0xF0; +pub const BKPBLOCK_HAS_IMAGE: u8 = 0x10; /* block data is an XLogRecordBlockImage */ +pub const BKPBLOCK_HAS_DATA: u8 = 0x20; +pub const BKPBLOCK_WILL_INIT: u8 = 0x40; /* redo will re-init the page */ +pub const BKPBLOCK_SAME_REL: u8 = 0x80; /* RelFileNode omitted, same as previous */ + +/* Information stored in bimg_info */ +pub const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */ +pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */ +pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */ diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index 5e38294b1f..85a0ae2f63 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -7,6 +7,7 @@ // have been named the same as the corresponding PostgreSQL functions instead. // +use crate::pg_constants; use byteorder::{ByteOrder, LittleEndian}; use bytes::{Buf, Bytes}; use crc32c::*; @@ -295,4 +296,10 @@ impl XLogRecord { }, } } + + // Is this record an XLOG_SWITCH record? They need some special processing, + pub fn is_xlog_switch_record(&self) -> bool { + + self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID + } }