Refactor pg_constants. Move them to postgres_ffi/

This commit is contained in:
anastasia
2021-04-28 13:28:56 +03:00
committed by lubennikovaav
parent f491a22d85
commit e7b112aacc
7 changed files with 82 additions and 71 deletions

View File

@@ -7,7 +7,6 @@ use std::time::Duration;
pub mod basebackup;
pub mod page_cache;
pub mod page_service;
pub mod pg_constants;
pub mod restore_local_repo;
pub mod tui;
pub mod tui_event;

View File

@@ -1,4 +1,4 @@
use crate::pg_constants;
use postgres_ffi::pg_constants;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use postgres_ffi::xlog_utils::XLogRecord;
@@ -178,10 +178,12 @@ impl WalStreamDecoder {
let recordbuf = std::mem::replace(&mut self.recordbuf, BytesMut::new());
let recordbuf = recordbuf.freeze();
let mut buf = recordbuf.clone();
// XLOG_SWITCH records are special. If we see one, we need to skip
// to the next WAL segment.
if is_xlog_switch_record(&recordbuf) {
let xlogrec = XLogRecord::from_bytes(&mut buf);
if xlogrec.is_xlog_switch_record() {
trace!("saw xlog switch record at {}", self.lsn);
self.padlen = self.lsn.calc_padding(WAL_SEGMENT_SIZE) as u32;
} else {
@@ -237,32 +239,6 @@ impl WalStreamDecoder {
}
}
// FIXME:
const BLCKSZ: u16 = 8192;
//
// Constants from xlogrecord.h
//
const XLR_MAX_BLOCK_ID: u8 = 32;
const XLR_BLOCK_ID_DATA_SHORT: u8 = 255;
const XLR_BLOCK_ID_DATA_LONG: u8 = 254;
const XLR_BLOCK_ID_ORIGIN: u8 = 253;
const XLR_BLOCK_ID_TOPLEVEL_XID: u8 = 252;
const BKPBLOCK_FORK_MASK: u8 = 0x0F;
const _BKPBLOCK_FLAG_MASK: u8 = 0xF0;
const BKPBLOCK_HAS_IMAGE: u8 = 0x10; /* block data is an XLogRecordBlockImage */
const BKPBLOCK_HAS_DATA: u8 = 0x20;
const BKPBLOCK_WILL_INIT: u8 = 0x40; /* redo will re-init the page */
const BKPBLOCK_SAME_REL: u8 = 0x80; /* RelFileNode omitted, same as previous */
/* Information stored in bimg_info */
const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */
const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */
#[allow(dead_code)]
pub struct DecodedBkpBlock {
/* Is this block ref in use? */
@@ -330,16 +306,17 @@ pub struct DecodedWALRecord {
pub main_data_offset: usize,
}
// Is this record an XLOG_SWITCH record? They need some special processing,
// so we need to check for that before the rest of the parsing.
//
// FIXME: refactor this and decode_wal_record() below to avoid the duplication.
fn is_xlog_switch_record(rec: &Bytes) -> bool {
let mut buf = rec.clone();
let xlogrec = XLogRecord::from_bytes(&mut buf);
xlogrec.xl_info == pg_constants::XLOG_SWITCH && xlogrec.xl_rmid == pg_constants::RM_XLOG_ID
}
// // Is this record an XLOG_SWITCH record? They need some special processing,
// // so we need to check for that before the rest of the parsing.
// //
// // FIXME: refactor this and decode_wal_record() below to avoid the duplication.
// fn is_xlog_switch_record(rec: &Bytes) -> bool {
// let mut buf = rec.clone();
// let xlogrec = XLogRecord::from_bytes(&mut buf);
// xlogrec.xl_info == pg_constants::XLOG_SWITCH && xlogrec.xl_rmid == pg_constants::RM_XLOG_ID
// }
pub type Oid = u32;
pub type BlockNumber = u32;
@@ -408,10 +385,10 @@ impl XlCreateDatabase {
// The overall layout of an XLOG record is:
// Fixed-size header (XLogRecord struct)
// XLogRecordBlockHeader struct
// If BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
// If BKPIMAGE_HAS_HOLE and BKPIMAGE_IS_COMPRESSED, an
// If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
// If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an
// XLogRecordBlockCompressHeader struct follows.
// If BKPBLOCK_SAME_REL is not set, a RelFileNode follows
// If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows
// BlockNumber follows
// XLogRecordBlockHeader struct
// ...
@@ -458,29 +435,29 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
let block_id = buf.get_u8();
match block_id {
XLR_BLOCK_ID_DATA_SHORT => {
pg_constants::XLR_BLOCK_ID_DATA_SHORT => {
/* XLogRecordDataHeaderShort */
main_data_len = buf.get_u8() as u32;
datatotal += main_data_len;
}
XLR_BLOCK_ID_DATA_LONG => {
pg_constants::XLR_BLOCK_ID_DATA_LONG => {
/* XLogRecordDataHeaderLong */
main_data_len = buf.get_u32_le();
datatotal += main_data_len;
}
XLR_BLOCK_ID_ORIGIN => {
pg_constants::XLR_BLOCK_ID_ORIGIN => {
// RepOriginId is uint16
buf.advance(2);
}
XLR_BLOCK_ID_TOPLEVEL_XID => {
pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => {
// TransactionId is uint32
buf.advance(4);
}
0..=XLR_MAX_BLOCK_ID => {
0..=pg_constants::XLR_MAX_BLOCK_ID => {
/* XLogRecordBlockHeader */
let mut blk = DecodedBkpBlock::new();
let fork_flags: u8;
@@ -497,11 +474,11 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
max_block_id = block_id;
fork_flags = buf.get_u8();
blk.forknum = fork_flags & BKPBLOCK_FORK_MASK;
blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
blk.flags = fork_flags;
blk.has_image = (fork_flags & BKPBLOCK_HAS_IMAGE) != 0;
blk.has_data = (fork_flags & BKPBLOCK_HAS_DATA) != 0;
blk.will_init = (fork_flags & BKPBLOCK_WILL_INIT) != 0;
blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0;
blk.data_len = buf.get_u16_le();
/* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */
@@ -514,16 +491,16 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
blk.hole_offset = buf.get_u16_le();
blk.bimg_info = buf.get_u8();
blk.apply_image = (blk.bimg_info & BKPIMAGE_APPLY) != 0;
blk.apply_image = (blk.bimg_info & pg_constants::BKPIMAGE_APPLY) != 0;
if blk.bimg_info & BKPIMAGE_IS_COMPRESSED != 0 {
if blk.bimg_info & BKPIMAGE_HAS_HOLE != 0 {
if blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED != 0 {
if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
blk.hole_length = buf.get_u16_le();
} else {
blk.hole_length = 0;
}
} else {
blk.hole_length = BLCKSZ - blk.bimg_len;
blk.hole_length = pg_constants::BLCKSZ - blk.bimg_len;
}
datatotal += blk.bimg_len as u32;
blocks_total_len += blk.bimg_len as u32;
@@ -532,13 +509,13 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
* cross-check that hole_offset > 0, hole_length > 0 and
* bimg_len < BLCKSZ if the HAS_HOLE flag is set.
*/
if blk.bimg_info & BKPIMAGE_HAS_HOLE != 0
&& (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ)
if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0
&& (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == pg_constants::BLCKSZ)
{
// TODO
/*
report_invalid_record(state,
"BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
"pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
(unsigned int) blk->hole_offset,
(unsigned int) blk->hole_length,
(unsigned int) blk->bimg_len,
@@ -551,13 +528,13 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
* cross-check that hole_offset == 0 and hole_length == 0 if
* the HAS_HOLE flag is not set.
*/
if blk.bimg_info & BKPIMAGE_HAS_HOLE == 0
if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
&& (blk.hole_offset != 0 || blk.hole_length != 0)
{
// TODO
/*
report_invalid_record(state,
"BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
"pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
(unsigned int) blk->hole_offset,
(unsigned int) blk->hole_length,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
@@ -569,11 +546,11 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
* cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
* flag is set.
*/
if (blk.bimg_info & BKPIMAGE_IS_COMPRESSED == 0) && blk.bimg_len == BLCKSZ {
if (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0) && blk.bimg_len == pg_constants::BLCKSZ {
// TODO
/*
report_invalid_record(state,
"BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
"pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
(unsigned int) blk->bimg_len,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
@@ -584,21 +561,21 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
* cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
* IS_COMPRESSED flag is set.
*/
if blk.bimg_info & BKPIMAGE_HAS_HOLE == 0
&& blk.bimg_info & BKPIMAGE_IS_COMPRESSED == 0
&& blk.bimg_len != BLCKSZ
if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
&& blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0
&& blk.bimg_len != pg_constants::BLCKSZ
{
// TODO
/*
report_invalid_record(state,
"neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
"neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
(unsigned int) blk->data_len,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
*/
}
}
if fork_flags & BKPBLOCK_SAME_REL == 0 {
if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 {
rnode_spcnode = buf.get_u32_le();
rnode_dbnode = buf.get_u32_le();
rnode_relnode = buf.get_u32_le();
@@ -607,7 +584,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
// TODO
/*
report_invalid_record(state,
"BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
"pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err; */
}

View File

@@ -8,7 +8,6 @@
use crate::page_cache;
use crate::page_cache::{BufferTag, RelTag};
use crate::pg_constants;
use crate::waldecoder::*;
use crate::PageServerConf;
use crate::ZTimelineId;
@@ -16,6 +15,7 @@ use anyhow::Error;
use lazy_static::lazy_static;
use log::*;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::pg_constants;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use std::collections::HashMap;

View File

@@ -37,8 +37,9 @@ use zenith_utils::lsn::Lsn;
use crate::page_cache::BufferTag;
use crate::page_cache::WALRecord;
use crate::ZTimelineId;
use crate::{pg_constants, PageServerConf};
use postgres_ffi::xlog_utils::XLogRecord;
use crate::PageServerConf;
use postgres_ffi::xlog_utils::{XLogRecord};
use postgres_ffi::pg_constants;
static TIMEOUT: Duration = Duration::from_secs(20);

View File

@@ -4,6 +4,7 @@
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
pub mod xlog_utils;
pub mod pg_constants;
use bytes::{Buf, Bytes, BytesMut};

View File

@@ -70,3 +70,29 @@ pub const XLOG_TBLSPC_CREATE: u8 = 0x00;
pub const XLOG_TBLSPC_DROP: u8 = 0x10;
pub const SIZEOF_XLOGRECORD: u32 = 24;
// FIXME:
pub const BLCKSZ: u16 = 8192;
//
// from xlogrecord.h
//
pub const XLR_MAX_BLOCK_ID: u8 = 32;
pub const XLR_BLOCK_ID_DATA_SHORT: u8 = 255;
pub const XLR_BLOCK_ID_DATA_LONG: u8 = 254;
pub const XLR_BLOCK_ID_ORIGIN: u8 = 253;
pub const XLR_BLOCK_ID_TOPLEVEL_XID: u8 = 252;
pub const BKPBLOCK_FORK_MASK: u8 = 0x0F;
pub const _BKPBLOCK_FLAG_MASK: u8 = 0xF0;
pub const BKPBLOCK_HAS_IMAGE: u8 = 0x10; /* block data is an XLogRecordBlockImage */
pub const BKPBLOCK_HAS_DATA: u8 = 0x20;
pub const BKPBLOCK_WILL_INIT: u8 = 0x40; /* redo will re-init the page */
pub const BKPBLOCK_SAME_REL: u8 = 0x80; /* RelFileNode omitted, same as previous */
/* Information stored in bimg_info */
pub const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */
pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */

View File

@@ -7,6 +7,7 @@
// have been named the same as the corresponding PostgreSQL functions instead.
//
use crate::pg_constants;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, Bytes};
use crc32c::*;
@@ -295,4 +296,10 @@ impl XLogRecord {
},
}
}
// Is this record an XLOG_SWITCH record? They need some special processing,
pub fn is_xlog_switch_record(&self) -> bool {
self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID
}
}