diff --git a/integration_tests/tests/test_pageserver.rs b/integration_tests/tests/test_pageserver.rs index 0d0d589c56..57ad0c186f 100644 --- a/integration_tests/tests/test_pageserver.rs +++ b/integration_tests/tests/test_pageserver.rs @@ -3,6 +3,7 @@ use control_plane::compute::ComputeControlPlane; use control_plane::local_env; use control_plane::local_env::PointInTime; use control_plane::storage::TestStorageControlPlane; +use std::{thread, time}; // XXX: force all redo at the end // -- restart + seqscan won't read deleted stuff @@ -96,6 +97,9 @@ fn test_pageserver_two_timelines() { node1.start().unwrap(); node2.start().unwrap(); + //give walreceiver time to connect + thread::sleep(time::Duration::from_secs(3)); + // check node1 node1.safe_psql( "postgres", diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index c25e5cadb5..7064467067 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -106,6 +106,7 @@ struct PageCacheShared { first_valid_lsn: u64, last_valid_lsn: u64, last_record_lsn: u64, + walreceiver_works: bool, } lazy_static! { @@ -169,6 +170,7 @@ fn init_page_cache() -> PageCache { first_valid_lsn: 0, last_valid_lsn: 0, last_record_lsn: 0, + walreceiver_works: false, }), valid_lsn_condvar: Condvar::new(), @@ -246,7 +248,7 @@ pub struct RelTag { pub forknum: u8, } -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)] pub struct BufferTag { pub spcnode: u32, pub dbnode: u32, @@ -260,6 +262,10 @@ pub struct WALRecord { pub lsn: u64, // LSN at the *end* of the record pub will_init: bool, pub rec: Bytes, + // Remember the offset of main_data in rec, + // so that we don't have to parse the record again. + // If record has no main_data, this offset equals rec.len(). 
+ pub main_data_offset: usize, } // Public interface functions @@ -283,26 +289,40 @@ impl PageCache { let mut shared = self.shared.lock().unwrap(); let mut waited = false; - while lsn > shared.last_valid_lsn { - // TODO: Wait for the WAL receiver to catch up - waited = true; + // There is a race at postgres instance start + // when we request a page before walsender established connection + // and was able to stream the page. Just don't wait and return what we have. + // TODO is there any corner case when this is incorrect? + if !shared.walreceiver_works { trace!( - "not caught up yet: {}, requested {}", + " walreceiver doesn't work yet last_valid_lsn {}, requested {}", shared.last_valid_lsn, lsn ); - let wait_result = self - .valid_lsn_condvar - .wait_timeout(shared, TIMEOUT) - .unwrap(); + } - shared = wait_result.0; - if wait_result.1.timed_out() { - bail!( - "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", - lsn >> 32, - lsn & 0xffff_ffff + if shared.walreceiver_works { + while lsn > shared.last_valid_lsn { + // TODO: Wait for the WAL receiver to catch up + waited = true; + trace!( + "not caught up yet: {}, requested {}", + shared.last_valid_lsn, + lsn ); + let wait_result = self + .valid_lsn_condvar + .wait_timeout(shared, TIMEOUT) + .unwrap(); + + shared = wait_result.0; + if wait_result.1.timed_out() { + bail!( + "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", + lsn >> 32, + lsn & 0xffff_ffff + ); + } } } if waited { @@ -512,12 +532,17 @@ impl PageCache { } // - pub fn advance_last_valid_lsn(&self, lsn: u64) { + pub fn advance_last_valid_lsn(&self, lsn: u64, from_walreceiver: bool) { let mut shared = self.shared.lock().unwrap(); // Can't move backwards. 
let oldlsn = shared.last_valid_lsn; if lsn >= oldlsn { + // Now we receive entries from walreceiver and should wait + if from_walreceiver { + shared.walreceiver_works = true; + } + shared.last_valid_lsn = lsn; self.valid_lsn_condvar.notify_all(); diff --git a/pageserver/src/pg_constants.rs b/pageserver/src/pg_constants.rs index b59ddb5396..010a24a49a 100644 --- a/pageserver/src/pg_constants.rs +++ b/pageserver/src/pg_constants.rs @@ -9,3 +9,45 @@ pub const PG_FILENODEMAP_FORKNUM: u32 = 43; pub const PG_XACT_FORKNUM: u32 = 44; pub const PG_MXACT_OFFSETS_FORKNUM: u32 = 45; pub const PG_MXACT_MEMBERS_FORKNUM: u32 = 46; + +// +// constants from clog.h +// +pub const CLOG_XACTS_PER_BYTE: u32 = 4; +pub const CLOG_XACTS_PER_PAGE: u32 = 8192 * CLOG_XACTS_PER_BYTE; +pub const CLOG_BITS_PER_XACT: u8 = 2; +pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1; + +pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01; +pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02; +pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03; + +pub const CLOG_ZEROPAGE: u8 = 0x00; +pub const CLOG_TRUNCATE: u8 = 0x10; + +// From xact.h +pub const XLOG_XACT_COMMIT: u8 = 0x00; +pub const XLOG_XACT_ABORT: u8 = 0x20; + +/* mask for filtering opcodes out of xl_info */ +pub const XLOG_XACT_OPMASK: u8 = 0x70; +/* does this record have a 'xinfo' field or not */ +pub const XLOG_XACT_HAS_INFO: u8 = 0x80; + +/* + * The following flags, stored in xinfo, determine which information is + * contained in commit/abort records. 
+ */ +pub const XACT_XINFO_HAS_DBINFO: u32 = 1; +pub const XACT_XINFO_HAS_SUBXACTS: u32 = 2; +pub const XACT_XINFO_HAS_RELFILENODES: u32 = 4; + +// From pg_control.h and rmgrlist.h +pub const XLOG_SWITCH: u8 = 0x40; +pub const RM_XLOG_ID: u8 = 0; +pub const RM_XACT_ID: u8 = 1; +pub const RM_CLOG_ID: u8 = 3; +// pub const RM_MULTIXACT_ID:u8 = 6; + +// from xlogreader.h +pub const XLR_INFO_MASK: u8 = 0x0F; diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 262479a556..5c39d805f6 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -29,7 +29,7 @@ use bytes::Bytes; use crate::page_cache; use crate::page_cache::BufferTag; use crate::page_cache::PageCache; -use crate::waldecoder::WalStreamDecoder; +use crate::waldecoder::{decode_wal_record, WalStreamDecoder}; use crate::PageServerConf; use crate::ZTimelineId; @@ -300,8 +300,7 @@ fn restore_wal( break; } if let Some((lsn, recdata)) = rec.unwrap() { - let decoded = crate::waldecoder::decode_wal_record(recdata.clone()); - + let decoded = decode_wal_record(recdata.clone()); // Put the WAL record to the page cache. We make a separate copy of // it for every block it modifies. 
(The actual WAL record is kept in // a Bytes, which uses a reference counter for the underlying buffer, @@ -319,14 +318,14 @@ fn restore_wal( lsn: lsn, will_init: blk.will_init || blk.apply_image, rec: recdata.clone(), + main_data_offset: decoded.main_data_offset, }; pcache.put_wal_record(tag, rec); } - // Now that this record has been handled, let the page cache know that // it is up-to-date to this LSN - pcache.advance_last_valid_lsn(lsn); + pcache.advance_last_valid_lsn(lsn, false); last_lsn = lsn; } else { break; diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index b1daeaceae..61b140eda4 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -1,3 +1,4 @@ +use crate::pg_constants; use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; use std::cmp::min; @@ -248,6 +249,7 @@ const BLCKSZ: u16 = 8192; // // Constants from xlogrecord.h // + const XLR_MAX_BLOCK_ID: u8 = 32; const XLR_BLOCK_ID_DATA_SHORT: u8 = 255; @@ -276,6 +278,7 @@ pub struct DecodedBkpBlock { pub rnode_spcnode: u32, pub rnode_dbnode: u32, pub rnode_relnode: u32, + // Note that we have a few special forknum values for non-rel files. 
pub forknum: u8, pub blkno: u32, @@ -294,10 +297,33 @@ pub struct DecodedBkpBlock { /* Buffer holding the rmgr-specific data associated with this block */ has_data: bool, - //char *data; data_len: u16, } +impl DecodedBkpBlock { + pub fn new() -> DecodedBkpBlock { + DecodedBkpBlock { + rnode_spcnode: 0, + rnode_dbnode: 0, + rnode_relnode: 0, + forknum: 0, + blkno: 0, + + flags: 0, + has_image: false, + apply_image: false, + will_init: false, + hole_offset: 0, + hole_length: 0, + bimg_len: 0, + bimg_info: 0, + + has_data: false, + data_len: 0, + } + } +} + #[allow(non_upper_case_globals)] const SizeOfXLogRecord: u32 = 24; @@ -305,12 +331,9 @@ pub struct DecodedWALRecord { pub record: Bytes, // raw XLogRecord pub blocks: Vec, + pub main_data_offset: usize, } -// From pg_control.h and rmgrlist.h -const XLOG_SWITCH: u8 = 0x40; -const RM_XLOG_ID: u8 = 0; - // Is this record an XLOG_SWITCH record? They need some special processing, // so we need to check for that before the rest of the parsing. // @@ -327,55 +350,88 @@ fn is_xlog_switch_record(rec: &Bytes) -> bool { buf.advance(2); // 2 bytes of padding let _xl_crc = buf.get_u32_le(); - return xl_info == XLOG_SWITCH && xl_rmid == RM_XLOG_ID; + return xl_info == pg_constants::XLOG_SWITCH && xl_rmid == pg_constants::RM_XLOG_ID; +} + +#[derive(Clone, Copy)] +pub struct RelFileNode { + pub spcnode: u32, + pub dbnode: u32, + pub relnode: u32, } // // Routines to decode a WAL record and figure out which blocks are modified // +// See xlogrecord.h for details +// The overall layout of an XLOG record is: +// Fixed-size header (XLogRecord struct) +// XLogRecordBlockHeader struct +// If BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows +// If BKPIMAGE_HAS_HOLE and BKPIMAGE_IS_COMPRESSED, an +// XLogRecordBlockCompressHeader struct follows. +// If BKPBLOCK_SAME_REL is not set, a RelFileNode follows +// BlockNumber follows +// XLogRecordBlockHeader struct +// ... 
+// XLogRecordDataHeader[Short|Long] struct +// block data +// block data +// ... +// main data pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord { + let mut rnode_spcnode: u32 = 0; + let mut rnode_dbnode: u32 = 0; + let mut rnode_relnode: u32 = 0; + let mut got_rnode = false; + let mut buf = rec.clone(); + // 1. Parse XLogRecord struct + // FIXME: assume little-endian here let xl_tot_len = buf.get_u32_le(); - let _xl_xid = buf.get_u32_le(); - let _xl_prev = buf.get_u64_le(); - let _xl_info = buf.get_u8(); - let _xl_rmid = buf.get_u8(); + let xl_xid = buf.get_u32_le(); + let xl_prev = buf.get_u64_le(); + let xl_info = buf.get_u8(); + let xl_rmid = buf.get_u8(); buf.advance(2); // 2 bytes of padding let _xl_crc = buf.get_u32_le(); + trace!( + "decode_wal_record xl_rmid = {} xl_info = {}", + xl_rmid, + xl_info + ); + let remaining = xl_tot_len - SizeOfXLogRecord; if buf.remaining() != remaining as usize { //TODO error } - let mut rnode_spcnode: u32 = 0; - let mut rnode_dbnode: u32 = 0; - let mut rnode_relnode: u32 = 0; - let mut got_rnode = false; - - // Decode the headers - let mut max_block_id = 0; + let mut blocks_total_len: u32 = 0; + let mut main_data_len = 0; let mut datatotal: u32 = 0; let mut blocks: Vec = Vec::new(); + + // 2. Decode the headers. 
+ // XLogRecordBlockHeaders if any, + // XLogRecordDataHeader[Short|Long] while buf.remaining() > datatotal as usize { let block_id = buf.get_u8(); match block_id { XLR_BLOCK_ID_DATA_SHORT => { /* XLogRecordDataHeaderShort */ - let main_data_len = buf.get_u8() as u32; - + main_data_len = buf.get_u8() as u32; datatotal += main_data_len; } XLR_BLOCK_ID_DATA_LONG => { - /* XLogRecordDataHeaderShort */ - let main_data_len = buf.get_u32(); - + /* XLogRecordDataHeaderLong */ + main_data_len = buf.get_u32_le(); datatotal += main_data_len; } @@ -391,25 +447,7 @@ pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord { 0..=XLR_MAX_BLOCK_ID => { /* XLogRecordBlockHeader */ - let mut blk = DecodedBkpBlock { - rnode_spcnode: 0, - rnode_dbnode: 0, - rnode_relnode: 0, - forknum: 0, - blkno: 0, - - flags: 0, - has_image: false, - apply_image: false, - will_init: false, - hole_offset: 0, - hole_length: 0, - bimg_len: 0, - bimg_info: 0, - - has_data: false, - data_len: 0, - }; + let mut blk = DecodedBkpBlock::new(); let fork_flags: u8; if block_id <= max_block_id { @@ -429,28 +467,12 @@ pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord { blk.has_image = (fork_flags & BKPBLOCK_HAS_IMAGE) != 0; blk.has_data = (fork_flags & BKPBLOCK_HAS_DATA) != 0; blk.will_init = (fork_flags & BKPBLOCK_WILL_INIT) != 0; - blk.data_len = buf.get_u16_le(); - /* cross-check that the HAS_DATA flag is set iff data_length > 0 */ - // TODO - /* - if (blk->has_data && blk->data_len == 0) - { - report_invalid_record(state, - "BKPBLOCK_HAS_DATA set, but no data included at %X/%X", - (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); - goto err; - } - if (!blk->has_data && blk->data_len != 0) - { - report_invalid_record(state, - "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X", - (unsigned int) blk->data_len, - (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); - goto err; - } - */ + + /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */ + 
datatotal += blk.data_len as u32; + blocks_total_len += blk.data_len as u32; if blk.has_image { blk.bimg_len = buf.get_u16_le(); @@ -469,6 +491,7 @@ pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord { blk.hole_length = BLCKSZ - blk.bimg_len; } datatotal += blk.bimg_len as u32; + blocks_total_len += blk.bimg_len as u32; /* * cross-check that hole_offset > 0, hole_length > 0 and @@ -544,28 +567,28 @@ pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord { rnode_spcnode = buf.get_u32_le(); rnode_dbnode = buf.get_u32_le(); rnode_relnode = buf.get_u32_le(); - //rnode = &blk->rnode; got_rnode = true; - } else { - if !got_rnode { - // TODO - /* - report_invalid_record(state, - "BKPBLOCK_SAME_REL set but no previous rel at %X/%X", - (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); - goto err; - */ - } - - //blk->rnode = *rnode; + } else if !got_rnode { + // TODO + /* + report_invalid_record(state, + "BKPBLOCK_SAME_REL set but no previous rel at %X/%X", + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + goto err; */ } + blk.rnode_spcnode = rnode_spcnode; blk.rnode_dbnode = rnode_dbnode; blk.rnode_relnode = rnode_relnode; blk.blkno = buf.get_u32_le(); - - //println!("this record affects {}/{}/{} blk {}",rnode_spcnode, rnode_dbnode, rnode_relnode, blk.blkno); + trace!( + "this record affects {}/{}/{} blk {}", + rnode_spcnode, + rnode_dbnode, + rnode_relnode, + blk.blkno + ); blocks.push(blk); } @@ -576,20 +599,58 @@ pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord { } } - /* - * Ok, we've parsed the fragment headers, and verified that the total - * length of the payload in the fragments is equal to the amount of data - * left. Copy the data of each fragment to a separate buffer. - * - * We could just set up pointers into readRecordBuf, but we want to align - * the data for the convenience of the callers. Backup images are not - * copied, however; they don't need alignment. - */ + // 3. Decode blocks. 
+ // We don't need them, so just skip blocks_total_len bytes + buf.advance(blocks_total_len as usize); - // Since we don't care about the data payloads here, we're done. + let main_data_offset = (xl_tot_len - main_data_len) as usize; - return DecodedWALRecord { + // 4. Decode main_data + if main_data_len > 0 { + assert_eq!(buf.remaining(), main_data_len as usize); + } + + //5. Handle special CLOG and XACT records + if xl_rmid == pg_constants::RM_CLOG_ID { + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; + blk.blkno = buf.get_i32_le() as u32; + trace!("RM_CLOG_ID updates block {}", blk.blkno); + blocks.push(blk); + } else if xl_rmid == pg_constants::RM_XACT_ID { + let info = xl_info & pg_constants::XLOG_XACT_OPMASK; + if info == pg_constants::XLOG_XACT_COMMIT { + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; + blk.blkno = xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; + trace!( + "XLOG_XACT_COMMIT xl_prev {:X}/{:X} xid {} updates block {}", + (xl_prev >> 32), + xl_prev & 0xffffffff, + xl_xid, + blk.blkno + ); + blocks.push(blk); + //TODO parse commit record to extract subtrans entries + } else if info == pg_constants::XLOG_XACT_ABORT { + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; + blk.blkno = xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; + trace!( + "XLOG_XACT_ABORT xl_prev {:X}/{:X} xid {} updates block {}", + (xl_prev >> 32), + xl_prev & 0xffffffff, + xl_xid, + blk.blkno + ); + blocks.push(blk); + //TODO parse abort record to extract subtrans entries + } + } + + DecodedWALRecord { record: rec, blocks, - }; + main_data_offset: main_data_offset, + } } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 99c4142232..3e72b5e747 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -145,6 +145,7 @@ async fn walreceiver_main( // Start streaming the WAL, from where we left off 
previously. // let mut startpoint = pcache.get_last_valid_lsn(); + let last_valid_lsn = pcache.get_last_valid_lsn(); if startpoint == 0 { // If we start here with identify.xlogpos we will have race condition with // postgres start: insert into postgres may request page that was modified with lsn @@ -167,7 +168,9 @@ async fn walreceiver_main( } } debug!( - "starting replication from {:X}/{:X} for timeline {}, server is at {:X}/{:X}...", + "last_valid_lsn {:X}/{:X} starting replication from {:X}/{:X} for timeline {}, server is at {:X}/{:X}...", + (last_valid_lsn >> 32), + (last_valid_lsn & 0xffffffff), (startpoint >> 32), (startpoint & 0xffffffff), timelineid, @@ -213,7 +216,6 @@ async fn walreceiver_main( loop { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { let decoded = decode_wal_record(recdata.clone()); - // Put the WAL record to the page cache. We make a separate copy of // it for every block it modifies. (The actual WAL record is kept in // a Bytes, which uses a reference counter for the underlying buffer, @@ -231,11 +233,11 @@ async fn walreceiver_main( lsn, will_init: blk.will_init || blk.apply_image, rec: recdata.clone(), + main_data_offset: decoded.main_data_offset, }; pcache.put_wal_record(tag, rec); } - // Now that this record has been handled, let the page cache know that // it is up-to-date to this LSN pcache.advance_last_record_lsn(lsn); @@ -250,7 +252,7 @@ async fn walreceiver_main( // better reflect that, because GetPage@LSN requests might also point in the // middle of a record, if the request LSN was taken from the server's current // flush ptr. 
- pcache.advance_last_valid_lsn(endlsn); + pcache.advance_last_valid_lsn(endlsn, true); if !caught_up && endlsn >= end_of_wal { info!( diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 06ac25286b..d942029ee0 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -29,13 +29,13 @@ use tokio::process::{Child, ChildStdin, ChildStdout, Command}; use tokio::runtime::Runtime; use tokio::time::timeout; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use crate::page_cache; use crate::page_cache::CacheEntry; use crate::page_cache::WALRecord; use crate::ZTimelineId; -use crate::{page_cache::BufferTag, PageServerConf}; +use crate::{page_cache::BufferTag, pg_constants, PageServerConf}; static TIMEOUT: Duration = Duration::from_secs(20); @@ -89,6 +89,59 @@ pub fn wal_redo_main(conf: &PageServerConf, timelineid: ZTimelineId) { } } +fn transaction_id_set_status_bit( + xl_info: u8, + xl_rmid: u8, + xl_xid: u32, + record: WALRecord, + page: &mut BytesMut, +) { + let info = xl_info & pg_constants::XLOG_XACT_OPMASK; + let mut status = 0; + if info == pg_constants::XLOG_XACT_COMMIT { + status = pg_constants::TRANSACTION_STATUS_COMMITTED; + } else if info == pg_constants::XLOG_XACT_ABORT { + status = pg_constants::TRANSACTION_STATUS_ABORTED; + } else { + trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. 
lsn {:X}/{:X} main_data_offset {}, rec.len {}", + status, + record.lsn >> 32, + record.lsn & 0xffffffff, + record.main_data_offset, record.rec.len()); + return; + } + + trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {:X}/{:X} main_data_offset {}, rec.len {}", + status, + record.lsn >> 32, + record.lsn & 0xffffffff, + record.main_data_offset, record.rec.len()); + + let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32) + / pg_constants::CLOG_XACTS_PER_BYTE) as usize; + + let byteptr = &mut page[byteno..byteno + 1]; + let bshift: u8 = ((xl_xid % pg_constants::CLOG_XACTS_PER_BYTE) + * pg_constants::CLOG_BITS_PER_XACT as u32) as u8; + + let mut curval = byteptr[0]; + curval = (curval >> bshift) & pg_constants::CLOG_XACT_BITMASK; + + let mut byteval = [0]; + byteval[0] = curval; + byteval[0] &= !(((1 << pg_constants::CLOG_BITS_PER_XACT as u8) - 1) << bshift); + byteval[0] |= status << bshift; + + byteptr.copy_from_slice(&byteval); + trace!( + "xl_xid {} byteno {} curval {} byteval {}", + xl_xid, + byteno, + curval, + byteval[0] + ); +} + fn handle_apply_request( pcache: &page_cache::PageCache, process: &WalRedoProcess, @@ -105,7 +158,46 @@ fn handle_apply_request( let nrecords = records.len(); let start = Instant::now(); - let apply_result = process.apply_wal_records(runtime, tag, base_img, records); + + let apply_result: Result; + if tag.forknum == pg_constants::PG_XACT_FORKNUM as u8 { + //TODO use base image if any + static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; + let zero_page_bytes: &[u8] = &ZERO_PAGE; + let mut page = BytesMut::from(zero_page_bytes); + + for record in records { + let mut buf = record.rec.clone(); + + // 1. Parse XLogRecord struct + // FIXME: refactor to avoid code duplication. 
+ let _xl_tot_len = buf.get_u32_le(); + let xl_xid = buf.get_u32_le(); + let _xl_prev = buf.get_u64_le(); + let xl_info = buf.get_u8(); + let xl_rmid = buf.get_u8(); + buf.advance(2); // 2 bytes of padding + let _xl_crc = buf.get_u32_le(); + + if xl_rmid == pg_constants::RM_CLOG_ID { + let info = xl_info & !pg_constants::XLR_INFO_MASK; + if info == pg_constants::CLOG_ZEROPAGE { + page.clone_from_slice(zero_page_bytes); + trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {:X}/{:X} main_data_offset {}, rec.len {}", + record.lsn >> 32, + record.lsn & 0xffffffff, + record.main_data_offset, record.rec.len()); + } + } else if xl_rmid == pg_constants::RM_XACT_ID { + transaction_id_set_status_bit(xl_info, xl_rmid, xl_xid, record, &mut page); + } + } + + apply_result = Ok::(page.freeze()); + } else { + apply_result = process.apply_wal_records(runtime, tag, base_img, records); + } + let duration = start.elapsed(); let result; diff --git a/vendor/postgres b/vendor/postgres index b898ad7e3b..2a9f68e665 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit b898ad7e3b9acce72b64bf064257e392f979a659 +Subproject commit 2a9f68e665507420475f2a2fa3d1563dfc5502f3