Merge with main branch

This commit is contained in:
Konstantin Knizhnik
2021-04-21 20:19:34 +03:00
7 changed files with 349 additions and 125 deletions

View File

@@ -3,6 +3,7 @@ use control_plane::compute::ComputeControlPlane;
use control_plane::local_env;
use control_plane::local_env::PointInTime;
use control_plane::storage::TestStorageControlPlane;
use std::{thread, time};
// XXX: force all redo at the end
// -- restart + seqscan won't read deleted stuff
@@ -112,6 +113,9 @@ fn test_pageserver_two_timelines() {
node1.start().unwrap();
node2.start().unwrap();
//give walreceiver time to connect
thread::sleep(time::Duration::from_secs(3));
// check node1
node1.safe_psql(
"postgres",

View File

@@ -99,6 +99,7 @@ struct PageCacheShared {
first_valid_lsn: u64,
last_valid_lsn: u64,
last_record_lsn: u64,
walreceiver_works: bool,
}
lazy_static! {
@@ -184,6 +185,7 @@ fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache
first_valid_lsn: 0,
last_valid_lsn: 0,
last_record_lsn: 0,
walreceiver_works: false,
}),
valid_lsn_condvar: Condvar::new(),
@@ -342,6 +344,10 @@ pub struct WALRecord {
pub will_init: bool,
pub truncate: bool,
pub rec: Bytes,
// Remember the offset of main_data in rec,
// so that we don't have to parse the record again.
// If record has no main_data, this offset equals rec.len().
pub main_data_offset: u32,
}
impl WALRecord {
@@ -349,6 +355,7 @@ impl WALRecord {
buf.put_u64(self.lsn);
buf.put_u8(self.will_init as u8);
buf.put_u8(self.truncate as u8);
buf.put_u32(self.main_data_offset);
buf.put_u32(self.rec.len() as u32);
buf.put_slice(&self.rec[..]);
}
@@ -356,6 +363,7 @@ impl WALRecord {
let lsn = buf.get_u64();
let will_init = buf.get_u8() != 0;
let truncate = buf.get_u8() != 0;
let main_data_offset = buf.get_u32();
let mut dst = vec![0u8; buf.get_u32() as usize];
buf.copy_to_slice(&mut dst);
WALRecord {
@@ -363,6 +371,7 @@ impl WALRecord {
will_init,
truncate,
rec: Bytes::from(dst),
main_data_offset
}
}
}
@@ -478,26 +487,41 @@ impl PageCache {
let mut shared = self.shared.lock().unwrap();
let mut waited = false;
while lsn > shared.last_valid_lsn {
// TODO: Wait for the WAL receiver to catch up
waited = true;
trace!(
"not caught up yet: {}, requested {}",
shared.last_valid_lsn,
lsn
);
let wait_result = self
.valid_lsn_condvar
.wait_timeout(shared, TIMEOUT)
.unwrap();
// There is a a race at postgres instance start
// when we request a page before walsender established connection
// and was able to stream the page. Just don't wait and return what we have.
// TODO is there any corner case when this is incorrect?
if !shared.walreceiver_works {
trace!(
" walreceiver doesn't work yet last_valid_lsn {}, requested {}",
shared.last_valid_lsn,
lsn
);
}
shared = wait_result.0;
if wait_result.1.timed_out() {
bail!(
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
lsn >> 32,
lsn & 0xffff_ffff
if shared.walreceiver_works {
while lsn > shared.last_valid_lsn {
// TODO: Wait for the WAL receiver to catch up
waited = true;
trace!(
"not caught up yet: {}, requested {}",
shared.last_valid_lsn,
lsn
);
let wait_result = self
.valid_lsn_condvar
.wait_timeout(shared, TIMEOUT)
.unwrap();
shared = wait_result.0;
if wait_result.1.timed_out() {
bail!(
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
lsn >> 32,
lsn & 0xffff_ffff
);
}
}
}
if waited {
@@ -528,6 +552,7 @@ impl PageCache {
// ask the WAL redo service to reconstruct the page image from the WAL records.
let minkey = CacheKey { tag, lsn: 0 };
let maxkey = CacheKey { tag, lsn };
let mut buf = BytesMut::new();
minkey.pack(&mut buf);
@@ -718,12 +743,17 @@ impl PageCache {
}
//
pub fn advance_last_valid_lsn(&self, lsn: u64) {
pub fn advance_last_valid_lsn(&self, lsn: u64, from_walreceiver: bool) {
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
let oldlsn = shared.last_valid_lsn;
if lsn >= oldlsn {
// Now we receive entries from walreceiver and should wait
if from_walreceiver {
shared.walreceiver_works = true;
}
shared.last_valid_lsn = lsn;
self.valid_lsn_condvar.notify_all();

View File

@@ -9,3 +9,48 @@ pub const PG_FILENODEMAP_FORKNUM: u32 = 43;
pub const PG_XACT_FORKNUM: u32 = 44;
pub const PG_MXACT_OFFSETS_FORKNUM: u32 = 45;
pub const PG_MXACT_MEMBERS_FORKNUM: u32 = 46;
//
// constants from clog.h
//
pub const CLOG_XACTS_PER_BYTE: u32 = 4;
pub const CLOG_XACTS_PER_PAGE: u32 = 8192 * CLOG_XACTS_PER_BYTE;
pub const CLOG_BITS_PER_XACT: u8 = 2;
pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1;
pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01;
pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02;
pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03;
pub const CLOG_ZEROPAGE: u8 = 0x00;
pub const CLOG_TRUNCATE: u8 = 0x10;
// From xact.h
pub const XLOG_XACT_COMMIT: u8 = 0x00;
pub const XLOG_XACT_ABORT: u8 = 0x20;
/* mask for filtering opcodes out of xl_info */
pub const XLOG_XACT_OPMASK: u8 = 0x70;
/* does this record have a 'xinfo' field or not */
pub const XLOG_XACT_HAS_INFO: u8 = 0x80;
/*
* The following flags, stored in xinfo, determine which information is
* contained in commit/abort records.
*/
pub const XACT_XINFO_HAS_DBINFO: u32 = 1;
pub const XACT_XINFO_HAS_SUBXACTS: u32 = 2;
pub const XACT_XINFO_HAS_RELFILENODES: u32 = 4;
// From pg_control.h and rmgrlist.h
pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
pub const RM_XLOG_ID: u8 = 0;
pub const RM_XACT_ID: u8 = 1;
pub const RM_SMGR_ID: u8 = 2;
pub const RM_CLOG_ID: u8 = 3;
// pub const RM_MULTIXACT_ID:u8 = 6;
// from xlogreader.h
pub const XLR_INFO_MASK: u8 = 0x0F;
pub const XLR_RMGR_INFO_MASK: u8 = 0xF0;

View File

@@ -30,7 +30,7 @@ use crate::page_cache;
use crate::page_cache::RelTag;
use crate::page_cache::BufferTag;
use crate::page_cache::PageCache;
use crate::waldecoder::WalStreamDecoder;
use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
use crate::PageServerConf;
use crate::ZTimelineId;
@@ -295,8 +295,7 @@ fn restore_wal(
break;
}
if let Some((lsn, recdata)) = rec.unwrap() {
let decoded = crate::waldecoder::decode_wal_record(recdata.clone());
let decoded = decode_wal_record(recdata.clone());
// Put the WAL record to the page cache. We make a separate copy of
// it for every block it modifies. (The actual WAL record is kept in
// a Bytes, which uses a reference counter for the underlying buffer,
@@ -317,14 +316,14 @@ fn restore_wal(
will_init: blk.will_init || blk.apply_image,
truncate: false,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
pcache.put_wal_record(tag, rec);
}
// Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN
pcache.advance_last_valid_lsn(lsn);
pcache.advance_last_valid_lsn(lsn, false);
last_lsn = lsn;
} else {
break;

View File

@@ -1,3 +1,4 @@
use crate::pg_constants;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use std::cmp::min;
@@ -248,6 +249,7 @@ const BLCKSZ: u16 = 8192;
//
// Constants from xlogrecord.h
//
const XLR_MAX_BLOCK_ID: u8 = 32;
const XLR_BLOCK_ID_DATA_SHORT: u8 = 255;
@@ -276,6 +278,7 @@ pub struct DecodedBkpBlock {
pub rnode_spcnode: u32,
pub rnode_dbnode: u32,
pub rnode_relnode: u32,
// Note that we have a few special forknum values for non-rel files.
pub forknum: u8,
pub blkno: u32,
@@ -294,10 +297,33 @@ pub struct DecodedBkpBlock {
/* Buffer holding the rmgr-specific data associated with this block */
has_data: bool,
//char *data;
data_len: u16,
}
impl DecodedBkpBlock {
pub fn new() -> DecodedBkpBlock {
DecodedBkpBlock {
rnode_spcnode: 0,
rnode_dbnode: 0,
rnode_relnode: 0,
forknum: 0,
blkno: 0,
flags: 0,
has_image: false,
apply_image: false,
will_init: false,
hole_offset: 0,
hole_length: 0,
bimg_len: 0,
bimg_info: 0,
has_data: false,
data_len: 0,
}
}
}
#[allow(non_upper_case_globals)]
const SizeOfXLogRecord: u32 = 24;
@@ -307,17 +333,9 @@ pub struct DecodedWALRecord {
pub record: Bytes, // raw XLogRecord
pub blocks: Vec<DecodedBkpBlock>,
pub main_data_offset: usize,
}
// From pg_control.h and rmgrlist.h
pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
pub const XLR_RMGR_INFO_MASK: u8 = 0xF0;
pub const RM_XLOG_ID: u8 = 0;
pub const RM_XACT_ID: u8 = 1;
pub const RM_SMGR_ID: u8 = 2;
// Is this record an XLOG_SWITCH record? They need some special processing,
// so we need to check for that before the rest of the parsing.
//
@@ -334,7 +352,7 @@ fn is_xlog_switch_record(rec: &Bytes) -> bool {
buf.advance(2); // 2 bytes of padding
let _xl_crc = buf.get_u32_le();
return xl_info == XLOG_SWITCH && xl_rmid == RM_XLOG_ID;
return xl_info == pg_constants::XLOG_SWITCH && xl_rmid == pg_constants::RM_XLOG_ID;
}
pub type Oid = u32;
@@ -344,7 +362,7 @@ pub const MAIN_FORKNUM: u8 = 0;
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
#[repr(C)]
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub struct RelFileNode {
pub spcnode: Oid, /* tablespace */
pub dbnode: Oid, /* database */
@@ -376,49 +394,75 @@ pub fn decode_truncate_record(decoded: &DecodedWALRecord) -> XlSmgrTruncate {
//
// Routines to decode a WAL record and figure out which blocks are modified
//
// See xlogrecord.h for details
// The overall layout of an XLOG record is:
// Fixed-size header (XLogRecord struct)
// XLogRecordBlockHeader struct
// If BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows
// If BKPIMAGE_HAS_HOLE and BKPIMAGE_IS_COMPRESSED, an
// XLogRecordBlockCompressHeader struct follows.
// If BKPBLOCK_SAME_REL is not set, a RelFileNode follows
// BlockNumber follows
// XLogRecordBlockHeader struct
// ...
// XLogRecordDataHeader[Short|Long] struct
// block data
// block data
// ...
// main data
pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
let mut rnode_spcnode: u32 = 0;
let mut rnode_dbnode: u32 = 0;
let mut rnode_relnode: u32 = 0;
let mut got_rnode = false;
let mut buf = record.clone();
// 1. Parse XLogRecord struct
// FIXME: assume little-endian here
let xl_tot_len = buf.get_u32_le();
let _xl_xid = buf.get_u32_le();
let _xl_prev = buf.get_u64_le();
let xl_xid = buf.get_u32_le();
let xl_prev = buf.get_u64_le();
let xl_info = buf.get_u8();
let xl_rmid = buf.get_u8();
buf.advance(2); // 2 bytes of padding
let _xl_crc = buf.get_u32_le();
trace!(
"decode_wal_record xl_rmid = {} xl_info = {}",
xl_rmid,
xl_info
);
let remaining = xl_tot_len - SizeOfXLogRecord;
if buf.remaining() != remaining as usize {
//TODO error
}
let mut rnode_spcnode: u32 = 0;
let mut rnode_dbnode: u32 = 0;
let mut rnode_relnode: u32 = 0;
let mut got_rnode = false;
// Decode the headers
let mut max_block_id = 0;
let mut blocks_total_len: u32 = 0;
let mut main_data_len = 0;
let mut datatotal: u32 = 0;
let mut blocks: Vec<DecodedBkpBlock> = Vec::new();
// 2. Decode the headers.
// XLogRecordBlockHeaders if any,
// XLogRecordDataHeader[Short|Long]
while buf.remaining() > datatotal as usize {
let block_id = buf.get_u8();
match block_id {
XLR_BLOCK_ID_DATA_SHORT => {
/* XLogRecordDataHeaderShort */
let main_data_len = buf.get_u8() as u32;
main_data_len = buf.get_u8() as u32;
datatotal += main_data_len;
}
XLR_BLOCK_ID_DATA_LONG => {
/* XLogRecordDataHeaderShort */
let main_data_len = buf.get_u32();
/* XLogRecordDataHeaderLong */
main_data_len = buf.get_u32_le();
datatotal += main_data_len;
}
@@ -434,25 +478,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
0..=XLR_MAX_BLOCK_ID => {
/* XLogRecordBlockHeader */
let mut blk = DecodedBkpBlock {
rnode_spcnode: 0,
rnode_dbnode: 0,
rnode_relnode: 0,
forknum: 0,
blkno: 0,
flags: 0,
has_image: false,
apply_image: false,
will_init: false,
hole_offset: 0,
hole_length: 0,
bimg_len: 0,
bimg_info: 0,
has_data: false,
data_len: 0,
};
let mut blk = DecodedBkpBlock::new();
let fork_flags: u8;
if block_id <= max_block_id {
@@ -472,28 +498,12 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
blk.has_image = (fork_flags & BKPBLOCK_HAS_IMAGE) != 0;
blk.has_data = (fork_flags & BKPBLOCK_HAS_DATA) != 0;
blk.will_init = (fork_flags & BKPBLOCK_WILL_INIT) != 0;
blk.data_len = buf.get_u16_le();
/* cross-check that the HAS_DATA flag is set iff data_length > 0 */
// TODO
/*
if (blk->has_data && blk->data_len == 0)
{
report_invalid_record(state,
"BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
}
if (!blk->has_data && blk->data_len != 0)
{
report_invalid_record(state,
"BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
(unsigned int) blk->data_len,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
}
*/
/* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */
datatotal += blk.data_len as u32;
blocks_total_len += blk.data_len as u32;
if blk.has_image {
blk.bimg_len = buf.get_u16_le();
@@ -512,6 +522,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
blk.hole_length = BLCKSZ - blk.bimg_len;
}
datatotal += blk.bimg_len as u32;
blocks_total_len += blk.bimg_len as u32;
/*
* cross-check that hole_offset > 0, hole_length > 0 and
@@ -587,28 +598,28 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
rnode_spcnode = buf.get_u32_le();
rnode_dbnode = buf.get_u32_le();
rnode_relnode = buf.get_u32_le();
//rnode = &blk->rnode;
got_rnode = true;
} else {
if !got_rnode {
// TODO
/*
report_invalid_record(state,
"BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
*/
}
//blk->rnode = *rnode;
} else if !got_rnode {
// TODO
/*
report_invalid_record(state,
"BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err; */
}
blk.rnode_spcnode = rnode_spcnode;
blk.rnode_dbnode = rnode_dbnode;
blk.rnode_relnode = rnode_relnode;
blk.blkno = buf.get_u32_le();
//println!("this record affects {}/{}/{} blk {}",rnode_spcnode, rnode_dbnode, rnode_relnode, blk.blkno);
trace!(
"this record affects {}/{}/{} blk {}",
rnode_spcnode,
rnode_dbnode,
rnode_relnode,
blk.blkno
);
blocks.push(blk);
}
@@ -619,22 +630,60 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
}
}
/*
* Ok, we've parsed the fragment headers, and verified that the total
* length of the payload in the fragments is equal to the amount of data
* left. Copy the data of each fragment to a separate buffer.
*
* We could just set up pointers into readRecordBuf, but we want to align
* the data for the convenience of the callers. Backup images are not
* copied, however; they don't need alignment.
*/
// 3. Decode blocks.
// We don't need them, so just skip blocks_total_len bytes
buf.advance(blocks_total_len as usize);
// Since we don't care about the data payloads here, we're done.
let main_data_offset = (xl_tot_len - main_data_len) as usize;
return DecodedWALRecord {
xl_info,
// 4. Decode main_data
if main_data_len > 0 {
assert_eq!(buf.remaining(), main_data_len as usize);
}
//5. Handle special CLOG and XACT records
if xl_rmid == pg_constants::RM_CLOG_ID {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
blk.blkno = buf.get_i32_le() as u32;
trace!("RM_CLOG_ID updates block {}", blk.blkno);
blocks.push(blk);
} else if xl_rmid == pg_constants::RM_XACT_ID {
let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
if info == pg_constants::XLOG_XACT_COMMIT {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
blk.blkno = xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
trace!(
"XLOG_XACT_COMMIT xl_prev {:X}/{:X} xid {} updates block {}",
(xl_prev >> 32),
xl_prev & 0xffffffff,
xl_xid,
blk.blkno
);
blocks.push(blk);
//TODO parse commit record to extract subtrans entries
} else if info == pg_constants::XLOG_XACT_ABORT {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
blk.blkno = xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
trace!(
"XLOG_XACT_ABORT xl_prev {:X}/{:X} xid {} updates block {}",
(xl_prev >> 32),
xl_prev & 0xffffffff,
xl_xid,
blk.blkno
);
blocks.push(blk);
//TODO parse abort record to extract subtrans entries
}
}
DecodedWALRecord {
xl_info,
xl_rmid,
record,
blocks,
};
main_data_offset,
}
}

View File

@@ -8,6 +8,7 @@
use crate::page_cache;
use crate::page_cache::{BufferTag, RelTag};
use crate::pg_constants;
use crate::waldecoder::*;
use crate::PageServerConf;
use crate::ZTimelineId;
@@ -145,6 +146,7 @@ async fn walreceiver_main(
// Start streaming the WAL, from where we left off previously.
//
let mut startpoint = pcache.get_last_valid_lsn();
let last_valid_lsn = pcache.get_last_valid_lsn();
if startpoint == 0 {
// If we start here with identify.xlogpos we will have race condition with
// postgres start: insert into postgres may request page that was modified with lsn
@@ -167,7 +169,9 @@ async fn walreceiver_main(
}
}
debug!(
"starting replication from {:X}/{:X} for timeline {}, server is at {:X}/{:X}...",
"last_valid_lsn {:X}/{:X} starting replication from {:X}/{:X} for timeline {}, server is at {:X}/{:X}...",
(last_valid_lsn >> 32),
(last_valid_lsn & 0xffffffff),
(startpoint >> 32),
(startpoint & 0xffffffff),
timelineid,
@@ -213,7 +217,6 @@ async fn walreceiver_main(
loop {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let decoded = decode_wal_record(recdata.clone());
// Put the WAL record to the page cache. We make a separate copy of
// it for every block it modifies. (The actual WAL record is kept in
// a Bytes, which uses a reference counter for the underlying buffer,
@@ -234,13 +237,14 @@ async fn walreceiver_main(
will_init: blk.will_init || blk.apply_image,
truncate: false,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
pcache.put_wal_record(tag, rec);
}
// include truncate wal record in all pages
if decoded.xl_rmid == RM_SMGR_ID
&& (decoded.xl_info & XLR_RMGR_INFO_MASK) == XLOG_SMGR_TRUNCATE
if decoded.xl_rmid == pg_constants::RM_SMGR_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_SMGR_TRUNCATE
{
let truncate = decode_truncate_record(&decoded);
if (truncate.flags & SMGR_TRUNCATE_HEAP) != 0 {
@@ -258,6 +262,7 @@ async fn walreceiver_main(
will_init: false,
truncate: true,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
pcache.put_rel_wal_record(tag, rec);
}
@@ -276,7 +281,7 @@ async fn walreceiver_main(
// better reflect that, because GetPage@LSN requests might also point in the
// middle of a record, if the request LSN was taken from the server's current
// flush ptr.
pcache.advance_last_valid_lsn(endlsn);
pcache.advance_last_valid_lsn(endlsn, true);
if !caught_up && endlsn >= end_of_wal {
info!(

View File

@@ -32,13 +32,13 @@ use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::runtime::Runtime;
use tokio::time::timeout;
use bytes::{BufMut, Bytes, BytesMut};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crate::page_cache;
use crate::page_cache::CacheEntry;
use crate::page_cache::WALRecord;
use crate::ZTimelineId;
use crate::{page_cache::BufferTag, PageServerConf};
use crate::{page_cache::BufferTag, pg_constants, PageServerConf};
static TIMEOUT: Duration = Duration::from_secs(20);
@@ -93,6 +93,59 @@ pub fn wal_redo_main(conf: &PageServerConf, timelineid: ZTimelineId) {
}
}
fn transaction_id_set_status_bit(
xl_info: u8,
xl_rmid: u8,
xl_xid: u32,
record: WALRecord,
page: &mut BytesMut,
) {
let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
let mut status = 0;
if info == pg_constants::XLOG_XACT_COMMIT {
status = pg_constants::TRANSACTION_STATUS_COMMITTED;
} else if info == pg_constants::XLOG_XACT_ABORT {
status = pg_constants::TRANSACTION_STATUS_ABORTED;
} else {
trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {:X}/{:X} main_data_offset {}, rec.len {}",
status,
record.lsn >> 32,
record.lsn & 0xffffffff,
record.main_data_offset, record.rec.len());
return;
}
trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {:X}/{:X} main_data_offset {}, rec.len {}",
status,
record.lsn >> 32,
record.lsn & 0xffffffff,
record.main_data_offset, record.rec.len());
let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32)
/ pg_constants::CLOG_XACTS_PER_BYTE) as usize;
let byteptr = &mut page[byteno..byteno + 1];
let bshift: u8 = ((xl_xid % pg_constants::CLOG_XACTS_PER_BYTE)
* pg_constants::CLOG_BITS_PER_XACT as u32) as u8;
let mut curval = byteptr[0];
curval = (curval >> bshift) & pg_constants::CLOG_XACT_BITMASK;
let mut byteval = [0];
byteval[0] = curval;
byteval[0] &= !(((1 << pg_constants::CLOG_BITS_PER_XACT as u8) - 1) << bshift);
byteval[0] |= status << bshift;
byteptr.copy_from_slice(&byteval);
trace!(
"xl_xid {} byteno {} curval {} byteval {}",
xl_xid,
byteno,
curval,
byteval[0]
);
}
fn handle_apply_request(
pcache: &page_cache::PageCache,
process: &WalRedoProcess,
@@ -110,7 +163,46 @@ fn handle_apply_request(
let nrecords = records.len();
let start = Instant::now();
let apply_result = process.apply_wal_records(runtime, tag, base_img, records);
let apply_result: Result<Bytes, Error>;
if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 {
//TODO use base image if any
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
let zero_page_bytes: &[u8] = &ZERO_PAGE;
let mut page = BytesMut::from(zero_page_bytes);
for record in records {
let mut buf = record.rec.clone();
// 1. Parse XLogRecord struct
// FIXME: refactor to avoid code duplication.
let _xl_tot_len = buf.get_u32_le();
let xl_xid = buf.get_u32_le();
let _xl_prev = buf.get_u64_le();
let xl_info = buf.get_u8();
let xl_rmid = buf.get_u8();
buf.advance(2); // 2 bytes of padding
let _xl_crc = buf.get_u32_le();
if xl_rmid == pg_constants::RM_CLOG_ID {
let info = xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::CLOG_ZEROPAGE {
page.clone_from_slice(zero_page_bytes);
trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {:X}/{:X} main_data_offset {}, rec.len {}",
record.lsn >> 32,
record.lsn & 0xffffffff,
record.main_data_offset, record.rec.len());
}
} else if xl_rmid == pg_constants::RM_XACT_ID {
transaction_id_set_status_bit(xl_info, xl_rmid, xl_xid, record, &mut page);
}
}
apply_result = Ok::<Bytes, Error>(page.freeze());
} else {
apply_result = process.apply_wal_records(runtime, tag, base_img, records);
}
let duration = start.elapsed();
let result;