mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
Decode main_data in decode_wal_record().
Replay XLOG_XACT_COMMIT and XLOG_XACT_ABORT records in walredo. Don't wait for lsn catchup before walreceiver connected. Use 'request_nonrel' branch of vendor/postgres
This commit is contained in:
@@ -106,6 +106,7 @@ struct PageCacheShared {
|
||||
first_valid_lsn: u64,
|
||||
last_valid_lsn: u64,
|
||||
last_record_lsn: u64,
|
||||
walreceiver_works: bool,
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
@@ -169,6 +170,7 @@ fn init_page_cache() -> PageCache {
|
||||
first_valid_lsn: 0,
|
||||
last_valid_lsn: 0,
|
||||
last_record_lsn: 0,
|
||||
walreceiver_works: false,
|
||||
}),
|
||||
valid_lsn_condvar: Condvar::new(),
|
||||
|
||||
@@ -246,7 +248,7 @@ pub struct RelTag {
|
||||
pub forknum: u8,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
|
||||
pub struct BufferTag {
|
||||
pub spcnode: u32,
|
||||
pub dbnode: u32,
|
||||
@@ -260,6 +262,10 @@ pub struct WALRecord {
|
||||
pub lsn: u64, // LSN at the *end* of the record
|
||||
pub will_init: bool,
|
||||
pub rec: Bytes,
|
||||
// Remember the offset of main_data in rec,
|
||||
// so that we don't have to parse the record again.
|
||||
// If record has no main_data, this offset equals rec.len().
|
||||
pub main_data_offset: usize,
|
||||
}
|
||||
|
||||
// Public interface functions
|
||||
@@ -283,26 +289,40 @@ impl PageCache {
|
||||
let mut shared = self.shared.lock().unwrap();
|
||||
let mut waited = false;
|
||||
|
||||
while lsn > shared.last_valid_lsn {
|
||||
// TODO: Wait for the WAL receiver to catch up
|
||||
waited = true;
|
||||
// There is a a race at postgres instance start
|
||||
// when we request a page before walsender established connection
|
||||
// and was able to stream the page. Just don't wait and return what we have.
|
||||
// TODO is there any corner case when this is incorrect?
|
||||
if !shared.walreceiver_works {
|
||||
trace!(
|
||||
"not caught up yet: {}, requested {}",
|
||||
" walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||
shared.last_valid_lsn,
|
||||
lsn
|
||||
);
|
||||
let wait_result = self
|
||||
.valid_lsn_condvar
|
||||
.wait_timeout(shared, TIMEOUT)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
shared = wait_result.0;
|
||||
if wait_result.1.timed_out() {
|
||||
bail!(
|
||||
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
|
||||
lsn >> 32,
|
||||
lsn & 0xffff_ffff
|
||||
if shared.walreceiver_works {
|
||||
while lsn > shared.last_valid_lsn {
|
||||
// TODO: Wait for the WAL receiver to catch up
|
||||
waited = true;
|
||||
trace!(
|
||||
"not caught up yet: {}, requested {}",
|
||||
shared.last_valid_lsn,
|
||||
lsn
|
||||
);
|
||||
let wait_result = self
|
||||
.valid_lsn_condvar
|
||||
.wait_timeout(shared, TIMEOUT)
|
||||
.unwrap();
|
||||
|
||||
shared = wait_result.0;
|
||||
if wait_result.1.timed_out() {
|
||||
bail!(
|
||||
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
|
||||
lsn >> 32,
|
||||
lsn & 0xffff_ffff
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
if waited {
|
||||
@@ -512,12 +532,17 @@ impl PageCache {
|
||||
}
|
||||
|
||||
//
|
||||
pub fn advance_last_valid_lsn(&self, lsn: u64) {
|
||||
pub fn advance_last_valid_lsn(&self, lsn: u64, from_walreceiver: bool) {
|
||||
let mut shared = self.shared.lock().unwrap();
|
||||
|
||||
// Can't move backwards.
|
||||
let oldlsn = shared.last_valid_lsn;
|
||||
if lsn >= oldlsn {
|
||||
// Now we receive entries from walreceiver and should wait
|
||||
if from_walreceiver {
|
||||
shared.walreceiver_works = true;
|
||||
}
|
||||
|
||||
shared.last_valid_lsn = lsn;
|
||||
self.valid_lsn_condvar.notify_all();
|
||||
|
||||
|
||||
@@ -9,3 +9,45 @@ pub const PG_FILENODEMAP_FORKNUM: u32 = 43;
|
||||
pub const PG_XACT_FORKNUM: u32 = 44;
|
||||
pub const PG_MXACT_OFFSETS_FORKNUM: u32 = 45;
|
||||
pub const PG_MXACT_MEMBERS_FORKNUM: u32 = 46;
|
||||
|
||||
//
|
||||
// constants from clog.h
|
||||
//
|
||||
pub const CLOG_XACTS_PER_BYTE: u32 = 4;
|
||||
pub const CLOG_XACTS_PER_PAGE: u32 = 8192 * CLOG_XACTS_PER_BYTE;
|
||||
pub const CLOG_BITS_PER_XACT: u8 = 2;
|
||||
pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1;
|
||||
|
||||
pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01;
|
||||
pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02;
|
||||
pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03;
|
||||
|
||||
pub const CLOG_ZEROPAGE: u8 = 0x00;
|
||||
pub const CLOG_TRUNCATE: u8 = 0x10;
|
||||
|
||||
// From xact.h
|
||||
pub const XLOG_XACT_COMMIT: u8 = 0x00;
|
||||
pub const XLOG_XACT_ABORT: u8 = 0x20;
|
||||
|
||||
/* mask for filtering opcodes out of xl_info */
|
||||
pub const XLOG_XACT_OPMASK: u8 = 0x70;
|
||||
/* does this record have a 'xinfo' field or not */
|
||||
pub const XLOG_XACT_HAS_INFO: u8 = 0x80;
|
||||
|
||||
/*
|
||||
* The following flags, stored in xinfo, determine which information is
|
||||
* contained in commit/abort records.
|
||||
*/
|
||||
pub const XACT_XINFO_HAS_DBINFO: u32 = 1;
|
||||
pub const XACT_XINFO_HAS_SUBXACTS: u32 = 2;
|
||||
pub const XACT_XINFO_HAS_RELFILENODES: u32 = 4;
|
||||
|
||||
// From pg_control.h and rmgrlist.h
|
||||
pub const XLOG_SWITCH: u8 = 0x40;
|
||||
pub const RM_XLOG_ID: u8 = 0;
|
||||
pub const RM_XACT_ID: u8 = 1;
|
||||
pub const RM_CLOG_ID: u8 = 3;
|
||||
// pub const RM_MULTIXACT_ID:u8 = 6;
|
||||
|
||||
// from xlogreader.h
|
||||
pub const XLR_INFO_MASK: u8 = 0x0F;
|
||||
|
||||
@@ -29,7 +29,7 @@ use bytes::Bytes;
|
||||
use crate::page_cache;
|
||||
use crate::page_cache::BufferTag;
|
||||
use crate::page_cache::PageCache;
|
||||
use crate::waldecoder::WalStreamDecoder;
|
||||
use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
|
||||
use crate::PageServerConf;
|
||||
use crate::ZTimelineId;
|
||||
|
||||
@@ -300,8 +300,7 @@ fn restore_wal(
|
||||
break;
|
||||
}
|
||||
if let Some((lsn, recdata)) = rec.unwrap() {
|
||||
let decoded = crate::waldecoder::decode_wal_record(recdata.clone());
|
||||
|
||||
let decoded = decode_wal_record(recdata.clone());
|
||||
// Put the WAL record to the page cache. We make a separate copy of
|
||||
// it for every block it modifies. (The actual WAL record is kept in
|
||||
// a Bytes, which uses a reference counter for the underlying buffer,
|
||||
@@ -319,14 +318,14 @@ fn restore_wal(
|
||||
lsn: lsn,
|
||||
will_init: blk.will_init || blk.apply_image,
|
||||
rec: recdata.clone(),
|
||||
main_data_offset: decoded.main_data_offset,
|
||||
};
|
||||
|
||||
pcache.put_wal_record(tag, rec);
|
||||
}
|
||||
|
||||
// Now that this record has been handled, let the page cache know that
|
||||
// it is up-to-date to this LSN
|
||||
pcache.advance_last_valid_lsn(lsn);
|
||||
pcache.advance_last_valid_lsn(lsn, false);
|
||||
last_lsn = lsn;
|
||||
} else {
|
||||
break;
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use crate::pg_constants;
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use log::*;
|
||||
use std::cmp::min;
|
||||
@@ -248,6 +249,7 @@ const BLCKSZ: u16 = 8192;
|
||||
//
|
||||
// Constants from xlogrecord.h
|
||||
//
|
||||
|
||||
const XLR_MAX_BLOCK_ID: u8 = 32;
|
||||
|
||||
const XLR_BLOCK_ID_DATA_SHORT: u8 = 255;
|
||||
@@ -276,6 +278,7 @@ pub struct DecodedBkpBlock {
|
||||
pub rnode_spcnode: u32,
|
||||
pub rnode_dbnode: u32,
|
||||
pub rnode_relnode: u32,
|
||||
// Note that we have a few special forknum values for non-rel files.
|
||||
pub forknum: u8,
|
||||
pub blkno: u32,
|
||||
|
||||
@@ -294,10 +297,33 @@ pub struct DecodedBkpBlock {
|
||||
|
||||
/* Buffer holding the rmgr-specific data associated with this block */
|
||||
has_data: bool,
|
||||
//char *data;
|
||||
data_len: u16,
|
||||
}
|
||||
|
||||
impl DecodedBkpBlock {
|
||||
pub fn new() -> DecodedBkpBlock {
|
||||
DecodedBkpBlock {
|
||||
rnode_spcnode: 0,
|
||||
rnode_dbnode: 0,
|
||||
rnode_relnode: 0,
|
||||
forknum: 0,
|
||||
blkno: 0,
|
||||
|
||||
flags: 0,
|
||||
has_image: false,
|
||||
apply_image: false,
|
||||
will_init: false,
|
||||
hole_offset: 0,
|
||||
hole_length: 0,
|
||||
bimg_len: 0,
|
||||
bimg_info: 0,
|
||||
|
||||
has_data: false,
|
||||
data_len: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(non_upper_case_globals)]
|
||||
const SizeOfXLogRecord: u32 = 24;
|
||||
|
||||
@@ -305,12 +331,9 @@ pub struct DecodedWALRecord {
|
||||
pub record: Bytes, // raw XLogRecord
|
||||
|
||||
pub blocks: Vec<DecodedBkpBlock>,
|
||||
pub main_data_offset: usize,
|
||||
}
|
||||
|
||||
// From pg_control.h and rmgrlist.h
|
||||
const XLOG_SWITCH: u8 = 0x40;
|
||||
const RM_XLOG_ID: u8 = 0;
|
||||
|
||||
// Is this record an XLOG_SWITCH record? They need some special processing,
|
||||
// so we need to check for that before the rest of the parsing.
|
||||
//
|
||||
@@ -327,55 +350,88 @@ fn is_xlog_switch_record(rec: &Bytes) -> bool {
|
||||
buf.advance(2); // 2 bytes of padding
|
||||
let _xl_crc = buf.get_u32_le();
|
||||
|
||||
return xl_info == XLOG_SWITCH && xl_rmid == RM_XLOG_ID;
|
||||
return xl_info == pg_constants::XLOG_SWITCH && xl_rmid == pg_constants::RM_XLOG_ID;
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct RelFileNode {
|
||||
pub spcnode: u32,
|
||||
pub dbnode: u32,
|
||||
pub relnode: u32,
|
||||
}
|
||||
|
||||
//
|
||||
// Routines to decode a WAL record and figure out which blocks are modified
|
||||
//
|
||||
// See xlogrecord.h for details
|
||||
// The overall layout of an XLOG record is:
|
||||
// Fixed-size header (XLogRecord struct)
|
||||
// XLogRecordBlockHeader struct
|
||||
// If BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
|
||||
// If BKPIMAGE_HAS_HOLE and BKPIMAGE_IS_COMPRESSED, an
|
||||
// XLogRecordBlockCompressHeader struct follows.
|
||||
// If BKPBLOCK_SAME_REL is not set, a RelFileNode follows
|
||||
// BlockNumber follows
|
||||
// XLogRecordBlockHeader struct
|
||||
// ...
|
||||
// XLogRecordDataHeader[Short|Long] struct
|
||||
// block data
|
||||
// block data
|
||||
// ...
|
||||
// main data
|
||||
pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord {
|
||||
let mut rnode_spcnode: u32 = 0;
|
||||
let mut rnode_dbnode: u32 = 0;
|
||||
let mut rnode_relnode: u32 = 0;
|
||||
let mut got_rnode = false;
|
||||
|
||||
let mut buf = rec.clone();
|
||||
|
||||
// 1. Parse XLogRecord struct
|
||||
|
||||
// FIXME: assume little-endian here
|
||||
let xl_tot_len = buf.get_u32_le();
|
||||
let _xl_xid = buf.get_u32_le();
|
||||
let _xl_prev = buf.get_u64_le();
|
||||
let _xl_info = buf.get_u8();
|
||||
let _xl_rmid = buf.get_u8();
|
||||
let xl_xid = buf.get_u32_le();
|
||||
let xl_prev = buf.get_u64_le();
|
||||
let xl_info = buf.get_u8();
|
||||
let xl_rmid = buf.get_u8();
|
||||
buf.advance(2); // 2 bytes of padding
|
||||
let _xl_crc = buf.get_u32_le();
|
||||
|
||||
trace!(
|
||||
"decode_wal_record xl_rmid = {} xl_info = {}",
|
||||
xl_rmid,
|
||||
xl_info
|
||||
);
|
||||
|
||||
let remaining = xl_tot_len - SizeOfXLogRecord;
|
||||
|
||||
if buf.remaining() != remaining as usize {
|
||||
//TODO error
|
||||
}
|
||||
|
||||
let mut rnode_spcnode: u32 = 0;
|
||||
let mut rnode_dbnode: u32 = 0;
|
||||
let mut rnode_relnode: u32 = 0;
|
||||
let mut got_rnode = false;
|
||||
|
||||
// Decode the headers
|
||||
|
||||
let mut max_block_id = 0;
|
||||
let mut blocks_total_len: u32 = 0;
|
||||
let mut main_data_len = 0;
|
||||
let mut datatotal: u32 = 0;
|
||||
let mut blocks: Vec<DecodedBkpBlock> = Vec::new();
|
||||
|
||||
// 2. Decode the headers.
|
||||
// XLogRecordBlockHeaders if any,
|
||||
// XLogRecordDataHeader[Short|Long]
|
||||
while buf.remaining() > datatotal as usize {
|
||||
let block_id = buf.get_u8();
|
||||
|
||||
match block_id {
|
||||
XLR_BLOCK_ID_DATA_SHORT => {
|
||||
/* XLogRecordDataHeaderShort */
|
||||
let main_data_len = buf.get_u8() as u32;
|
||||
|
||||
main_data_len = buf.get_u8() as u32;
|
||||
datatotal += main_data_len;
|
||||
}
|
||||
|
||||
XLR_BLOCK_ID_DATA_LONG => {
|
||||
/* XLogRecordDataHeaderShort */
|
||||
let main_data_len = buf.get_u32();
|
||||
|
||||
/* XLogRecordDataHeaderLong */
|
||||
main_data_len = buf.get_u32_le();
|
||||
datatotal += main_data_len;
|
||||
}
|
||||
|
||||
@@ -391,25 +447,7 @@ pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord {
|
||||
|
||||
0..=XLR_MAX_BLOCK_ID => {
|
||||
/* XLogRecordBlockHeader */
|
||||
let mut blk = DecodedBkpBlock {
|
||||
rnode_spcnode: 0,
|
||||
rnode_dbnode: 0,
|
||||
rnode_relnode: 0,
|
||||
forknum: 0,
|
||||
blkno: 0,
|
||||
|
||||
flags: 0,
|
||||
has_image: false,
|
||||
apply_image: false,
|
||||
will_init: false,
|
||||
hole_offset: 0,
|
||||
hole_length: 0,
|
||||
bimg_len: 0,
|
||||
bimg_info: 0,
|
||||
|
||||
has_data: false,
|
||||
data_len: 0,
|
||||
};
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
let fork_flags: u8;
|
||||
|
||||
if block_id <= max_block_id {
|
||||
@@ -429,28 +467,12 @@ pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord {
|
||||
blk.has_image = (fork_flags & BKPBLOCK_HAS_IMAGE) != 0;
|
||||
blk.has_data = (fork_flags & BKPBLOCK_HAS_DATA) != 0;
|
||||
blk.will_init = (fork_flags & BKPBLOCK_WILL_INIT) != 0;
|
||||
|
||||
blk.data_len = buf.get_u16_le();
|
||||
/* cross-check that the HAS_DATA flag is set iff data_length > 0 */
|
||||
// TODO
|
||||
/*
|
||||
if (blk->has_data && blk->data_len == 0)
|
||||
{
|
||||
report_invalid_record(state,
|
||||
"BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
|
||||
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
|
||||
goto err;
|
||||
}
|
||||
if (!blk->has_data && blk->data_len != 0)
|
||||
{
|
||||
report_invalid_record(state,
|
||||
"BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
|
||||
(unsigned int) blk->data_len,
|
||||
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
|
||||
goto err;
|
||||
}
|
||||
*/
|
||||
|
||||
/* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */
|
||||
|
||||
datatotal += blk.data_len as u32;
|
||||
blocks_total_len += blk.data_len as u32;
|
||||
|
||||
if blk.has_image {
|
||||
blk.bimg_len = buf.get_u16_le();
|
||||
@@ -469,6 +491,7 @@ pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord {
|
||||
blk.hole_length = BLCKSZ - blk.bimg_len;
|
||||
}
|
||||
datatotal += blk.bimg_len as u32;
|
||||
blocks_total_len += blk.bimg_len as u32;
|
||||
|
||||
/*
|
||||
* cross-check that hole_offset > 0, hole_length > 0 and
|
||||
@@ -544,28 +567,28 @@ pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord {
|
||||
rnode_spcnode = buf.get_u32_le();
|
||||
rnode_dbnode = buf.get_u32_le();
|
||||
rnode_relnode = buf.get_u32_le();
|
||||
//rnode = &blk->rnode;
|
||||
got_rnode = true;
|
||||
} else {
|
||||
if !got_rnode {
|
||||
// TODO
|
||||
/*
|
||||
report_invalid_record(state,
|
||||
"BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
|
||||
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
|
||||
goto err;
|
||||
*/
|
||||
}
|
||||
|
||||
//blk->rnode = *rnode;
|
||||
} else if !got_rnode {
|
||||
// TODO
|
||||
/*
|
||||
report_invalid_record(state,
|
||||
"BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
|
||||
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
|
||||
goto err; */
|
||||
}
|
||||
|
||||
blk.rnode_spcnode = rnode_spcnode;
|
||||
blk.rnode_dbnode = rnode_dbnode;
|
||||
blk.rnode_relnode = rnode_relnode;
|
||||
|
||||
blk.blkno = buf.get_u32_le();
|
||||
|
||||
//println!("this record affects {}/{}/{} blk {}",rnode_spcnode, rnode_dbnode, rnode_relnode, blk.blkno);
|
||||
trace!(
|
||||
"this record affects {}/{}/{} blk {}",
|
||||
rnode_spcnode,
|
||||
rnode_dbnode,
|
||||
rnode_relnode,
|
||||
blk.blkno
|
||||
);
|
||||
|
||||
blocks.push(blk);
|
||||
}
|
||||
@@ -576,20 +599,58 @@ pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Ok, we've parsed the fragment headers, and verified that the total
|
||||
* length of the payload in the fragments is equal to the amount of data
|
||||
* left. Copy the data of each fragment to a separate buffer.
|
||||
*
|
||||
* We could just set up pointers into readRecordBuf, but we want to align
|
||||
* the data for the convenience of the callers. Backup images are not
|
||||
* copied, however; they don't need alignment.
|
||||
*/
|
||||
// 3. Decode blocks.
|
||||
// We don't need them, so just skip blocks_total_len bytes
|
||||
buf.advance(blocks_total_len as usize);
|
||||
|
||||
// Since we don't care about the data payloads here, we're done.
|
||||
let main_data_offset = (xl_tot_len - main_data_len) as usize;
|
||||
|
||||
return DecodedWALRecord {
|
||||
// 4. Decode main_data
|
||||
if main_data_len > 0 {
|
||||
assert_eq!(buf.remaining(), main_data_len as usize);
|
||||
}
|
||||
|
||||
//5. Handle special CLOG and XACT records
|
||||
if xl_rmid == pg_constants::RM_CLOG_ID {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
|
||||
blk.blkno = buf.get_i32_le() as u32;
|
||||
trace!("RM_CLOG_ID updates block {}", blk.blkno);
|
||||
blocks.push(blk);
|
||||
} else if xl_rmid == pg_constants::RM_XACT_ID {
|
||||
let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
if info == pg_constants::XLOG_XACT_COMMIT {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
|
||||
blk.blkno = xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
trace!(
|
||||
"XLOG_XACT_COMMIT xl_prev {:X}/{:X} xid {} updates block {}",
|
||||
(xl_prev >> 32),
|
||||
xl_prev & 0xffffffff,
|
||||
xl_xid,
|
||||
blk.blkno
|
||||
);
|
||||
blocks.push(blk);
|
||||
//TODO parse commit record to extract subtrans entries
|
||||
} else if info == pg_constants::XLOG_XACT_ABORT {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
|
||||
blk.blkno = xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
trace!(
|
||||
"XLOG_XACT_ABORT xl_prev {:X}/{:X} xid {} updates block {}",
|
||||
(xl_prev >> 32),
|
||||
xl_prev & 0xffffffff,
|
||||
xl_xid,
|
||||
blk.blkno
|
||||
);
|
||||
blocks.push(blk);
|
||||
//TODO parse abort record to extract subtrans entries
|
||||
}
|
||||
}
|
||||
|
||||
DecodedWALRecord {
|
||||
record: rec,
|
||||
blocks,
|
||||
};
|
||||
main_data_offset: main_data_offset,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -145,6 +145,7 @@ async fn walreceiver_main(
|
||||
// Start streaming the WAL, from where we left off previously.
|
||||
//
|
||||
let mut startpoint = pcache.get_last_valid_lsn();
|
||||
let last_valid_lsn = pcache.get_last_valid_lsn();
|
||||
if startpoint == 0 {
|
||||
// If we start here with identify.xlogpos we will have race condition with
|
||||
// postgres start: insert into postgres may request page that was modified with lsn
|
||||
@@ -167,7 +168,9 @@ async fn walreceiver_main(
|
||||
}
|
||||
}
|
||||
debug!(
|
||||
"starting replication from {:X}/{:X} for timeline {}, server is at {:X}/{:X}...",
|
||||
"last_valid_lsn {:X}/{:X} starting replication from {:X}/{:X} for timeline {}, server is at {:X}/{:X}...",
|
||||
(last_valid_lsn >> 32),
|
||||
(last_valid_lsn & 0xffffffff),
|
||||
(startpoint >> 32),
|
||||
(startpoint & 0xffffffff),
|
||||
timelineid,
|
||||
@@ -213,7 +216,6 @@ async fn walreceiver_main(
|
||||
loop {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
let decoded = decode_wal_record(recdata.clone());
|
||||
|
||||
// Put the WAL record to the page cache. We make a separate copy of
|
||||
// it for every block it modifies. (The actual WAL record is kept in
|
||||
// a Bytes, which uses a reference counter for the underlying buffer,
|
||||
@@ -231,11 +233,11 @@ async fn walreceiver_main(
|
||||
lsn,
|
||||
will_init: blk.will_init || blk.apply_image,
|
||||
rec: recdata.clone(),
|
||||
main_data_offset: decoded.main_data_offset,
|
||||
};
|
||||
|
||||
pcache.put_wal_record(tag, rec);
|
||||
}
|
||||
|
||||
// Now that this record has been handled, let the page cache know that
|
||||
// it is up-to-date to this LSN
|
||||
pcache.advance_last_record_lsn(lsn);
|
||||
@@ -250,7 +252,7 @@ async fn walreceiver_main(
|
||||
// better reflect that, because GetPage@LSN requests might also point in the
|
||||
// middle of a record, if the request LSN was taken from the server's current
|
||||
// flush ptr.
|
||||
pcache.advance_last_valid_lsn(endlsn);
|
||||
pcache.advance_last_valid_lsn(endlsn, true);
|
||||
|
||||
if !caught_up && endlsn >= end_of_wal {
|
||||
info!(
|
||||
|
||||
@@ -29,13 +29,13 @@ use tokio::process::{Child, ChildStdin, ChildStdout, Command};
|
||||
use tokio::runtime::Runtime;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
|
||||
use crate::page_cache;
|
||||
use crate::page_cache::CacheEntry;
|
||||
use crate::page_cache::WALRecord;
|
||||
use crate::ZTimelineId;
|
||||
use crate::{page_cache::BufferTag, PageServerConf};
|
||||
use crate::{page_cache::BufferTag, pg_constants, PageServerConf};
|
||||
|
||||
static TIMEOUT: Duration = Duration::from_secs(20);
|
||||
|
||||
@@ -89,6 +89,59 @@ pub fn wal_redo_main(conf: &PageServerConf, timelineid: ZTimelineId) {
|
||||
}
|
||||
}
|
||||
|
||||
fn transaction_id_set_status_bit(
|
||||
xl_info: u8,
|
||||
xl_rmid: u8,
|
||||
xl_xid: u32,
|
||||
record: WALRecord,
|
||||
page: &mut BytesMut,
|
||||
) {
|
||||
let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
let mut status = 0;
|
||||
if info == pg_constants::XLOG_XACT_COMMIT {
|
||||
status = pg_constants::TRANSACTION_STATUS_COMMITTED;
|
||||
} else if info == pg_constants::XLOG_XACT_ABORT {
|
||||
status = pg_constants::TRANSACTION_STATUS_ABORTED;
|
||||
} else {
|
||||
trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {:X}/{:X} main_data_offset {}, rec.len {}",
|
||||
status,
|
||||
record.lsn >> 32,
|
||||
record.lsn & 0xffffffff,
|
||||
record.main_data_offset, record.rec.len());
|
||||
return;
|
||||
}
|
||||
|
||||
trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {:X}/{:X} main_data_offset {}, rec.len {}",
|
||||
status,
|
||||
record.lsn >> 32,
|
||||
record.lsn & 0xffffffff,
|
||||
record.main_data_offset, record.rec.len());
|
||||
|
||||
let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32)
|
||||
/ pg_constants::CLOG_XACTS_PER_BYTE) as usize;
|
||||
|
||||
let byteptr = &mut page[byteno..byteno + 1];
|
||||
let bshift: u8 = ((xl_xid % pg_constants::CLOG_XACTS_PER_BYTE)
|
||||
* pg_constants::CLOG_BITS_PER_XACT as u32) as u8;
|
||||
|
||||
let mut curval = byteptr[0];
|
||||
curval = (curval >> bshift) & pg_constants::CLOG_XACT_BITMASK;
|
||||
|
||||
let mut byteval = [0];
|
||||
byteval[0] = curval;
|
||||
byteval[0] &= !(((1 << pg_constants::CLOG_BITS_PER_XACT as u8) - 1) << bshift);
|
||||
byteval[0] |= status << bshift;
|
||||
|
||||
byteptr.copy_from_slice(&byteval);
|
||||
trace!(
|
||||
"xl_xid {} byteno {} curval {} byteval {}",
|
||||
xl_xid,
|
||||
byteno,
|
||||
curval,
|
||||
byteval[0]
|
||||
);
|
||||
}
|
||||
|
||||
fn handle_apply_request(
|
||||
pcache: &page_cache::PageCache,
|
||||
process: &WalRedoProcess,
|
||||
@@ -105,7 +158,46 @@ fn handle_apply_request(
|
||||
let nrecords = records.len();
|
||||
|
||||
let start = Instant::now();
|
||||
let apply_result = process.apply_wal_records(runtime, tag, base_img, records);
|
||||
|
||||
let apply_result: Result<Bytes, Error>;
|
||||
if tag.forknum == pg_constants::PG_XACT_FORKNUM as u8 {
|
||||
//TODO use base image if any
|
||||
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
|
||||
let zero_page_bytes: &[u8] = &ZERO_PAGE;
|
||||
let mut page = BytesMut::from(zero_page_bytes);
|
||||
|
||||
for record in records {
|
||||
let mut buf = record.rec.clone();
|
||||
|
||||
// 1. Parse XLogRecord struct
|
||||
// FIXME: refactor to avoid code duplication.
|
||||
let _xl_tot_len = buf.get_u32_le();
|
||||
let xl_xid = buf.get_u32_le();
|
||||
let _xl_prev = buf.get_u64_le();
|
||||
let xl_info = buf.get_u8();
|
||||
let xl_rmid = buf.get_u8();
|
||||
buf.advance(2); // 2 bytes of padding
|
||||
let _xl_crc = buf.get_u32_le();
|
||||
|
||||
if xl_rmid == pg_constants::RM_CLOG_ID {
|
||||
let info = xl_info & !pg_constants::XLR_INFO_MASK;
|
||||
if info == pg_constants::CLOG_ZEROPAGE {
|
||||
page.clone_from_slice(zero_page_bytes);
|
||||
trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {:X}/{:X} main_data_offset {}, rec.len {}",
|
||||
record.lsn >> 32,
|
||||
record.lsn & 0xffffffff,
|
||||
record.main_data_offset, record.rec.len());
|
||||
}
|
||||
} else if xl_rmid == pg_constants::RM_XACT_ID {
|
||||
transaction_id_set_status_bit(xl_info, xl_rmid, xl_xid, record, &mut page);
|
||||
}
|
||||
}
|
||||
|
||||
apply_result = Ok::<Bytes, Error>(page.freeze());
|
||||
} else {
|
||||
apply_result = process.apply_wal_records(runtime, tag, base_img, records);
|
||||
}
|
||||
|
||||
let duration = start.elapsed();
|
||||
|
||||
let result;
|
||||
|
||||
Reference in New Issue
Block a user