diff --git a/integration_tests/tests/test_pageserver.rs b/integration_tests/tests/test_pageserver.rs index fd4e8dac67..79d8593315 100644 --- a/integration_tests/tests/test_pageserver.rs +++ b/integration_tests/tests/test_pageserver.rs @@ -3,6 +3,7 @@ use control_plane::compute::ComputeControlPlane; use control_plane::local_env; use control_plane::local_env::PointInTime; use control_plane::storage::TestStorageControlPlane; +use std::{thread, time}; // XXX: force all redo at the end // -- restart + seqscan won't read deleted stuff @@ -112,6 +113,9 @@ fn test_pageserver_two_timelines() { node1.start().unwrap(); node2.start().unwrap(); + //give walreceiver time to connect + thread::sleep(time::Duration::from_secs(3)); + // check node1 node1.safe_psql( "postgres", diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 04bd65a319..e5e78446fb 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -99,6 +99,7 @@ struct PageCacheShared { first_valid_lsn: u64, last_valid_lsn: u64, last_record_lsn: u64, + walreceiver_works: bool, } lazy_static! { @@ -184,6 +185,7 @@ fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache first_valid_lsn: 0, last_valid_lsn: 0, last_record_lsn: 0, + walreceiver_works: false, }), valid_lsn_condvar: Condvar::new(), @@ -342,6 +344,10 @@ pub struct WALRecord { pub will_init: bool, pub truncate: bool, pub rec: Bytes, + // Remember the offset of main_data in rec, + // so that we don't have to parse the record again. + // If record has no main_data, this offset equals rec.len(). + pub main_data_offset: u32, } impl WALRecord { @@ -349,6 +355,7 @@ impl WALRecord { buf.put_u64(self.lsn); buf.put_u8(self.will_init as u8); buf.put_u8(self.truncate as u8); + buf.put_u32(self.main_data_offset); buf.put_u32(self.rec.len() as u32); buf.put_slice(&self.rec[..]); } @@ -356,6 +363,7 @@ impl WALRecord { let lsn = buf.get_u64(); let will_init = buf.get_u8() != 0; let truncate = buf.get_u8() != 0; + let main_data_offset = buf.get_u32(); let mut dst = vec![0u8; buf.get_u32() as usize]; buf.copy_to_slice(&mut dst); WALRecord { @@ -363,6 +371,7 @@ impl WALRecord { will_init, truncate, rec: Bytes::from(dst), + main_data_offset } } } @@ -478,26 +487,41 @@ impl PageCache { let mut shared = self.shared.lock().unwrap(); let mut waited = false; - while lsn > shared.last_valid_lsn { - // TODO: Wait for the WAL receiver to catch up - waited = true; - trace!( - "not caught up yet: {}, requested {}", - shared.last_valid_lsn, - lsn - ); - let wait_result = self - .valid_lsn_condvar - .wait_timeout(shared, TIMEOUT) - .unwrap(); + // There is a a race at postgres instance start + // when we request a page before walsender established connection + // and was able to stream the page. Just don't wait and return what we have. + // TODO is there any corner case when this is incorrect? + if !shared.walreceiver_works { + trace!( + " walreceiver doesn't work yet last_valid_lsn {}, requested {}", + shared.last_valid_lsn, + lsn + ); + } - shared = wait_result.0; - if wait_result.1.timed_out() { - bail!( - "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", - lsn >> 32, - lsn & 0xffff_ffff + if shared.walreceiver_works { + + while lsn > shared.last_valid_lsn { + // TODO: Wait for the WAL receiver to catch up + waited = true; + trace!( + "not caught up yet: {}, requested {}", + shared.last_valid_lsn, + lsn ); + let wait_result = self + .valid_lsn_condvar + .wait_timeout(shared, TIMEOUT) + .unwrap(); + + shared = wait_result.0; + if wait_result.1.timed_out() { + bail!( + "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", + lsn >> 32, + lsn & 0xffff_ffff + ); + } } } if waited { @@ -528,6 +552,7 @@ impl PageCache { // ask the WAL redo service to reconstruct the page image from the WAL records. let minkey = CacheKey { tag, lsn: 0 }; let maxkey = CacheKey { tag, lsn }; + let mut buf = BytesMut::new(); minkey.pack(&mut buf); @@ -718,12 +743,17 @@ impl PageCache { } // - pub fn advance_last_valid_lsn(&self, lsn: u64) { + pub fn advance_last_valid_lsn(&self, lsn: u64, from_walreceiver: bool) { let mut shared = self.shared.lock().unwrap(); // Can't move backwards. let oldlsn = shared.last_valid_lsn; if lsn >= oldlsn { + // Now we receive entries from walreceiver and should wait + if from_walreceiver { + shared.walreceiver_works = true; + } + shared.last_valid_lsn = lsn; self.valid_lsn_condvar.notify_all(); diff --git a/pageserver/src/pg_constants.rs b/pageserver/src/pg_constants.rs index b59ddb5396..cd23ef6b1c 100644 --- a/pageserver/src/pg_constants.rs +++ b/pageserver/src/pg_constants.rs @@ -9,3 +9,48 @@ pub const PG_FILENODEMAP_FORKNUM: u32 = 43; pub const PG_XACT_FORKNUM: u32 = 44; pub const PG_MXACT_OFFSETS_FORKNUM: u32 = 45; pub const PG_MXACT_MEMBERS_FORKNUM: u32 = 46; + +// +// constants from clog.h +// +pub const CLOG_XACTS_PER_BYTE: u32 = 4; +pub const CLOG_XACTS_PER_PAGE: u32 = 8192 * CLOG_XACTS_PER_BYTE; +pub const CLOG_BITS_PER_XACT: u8 = 2; +pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1; + +pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01; +pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02; +pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03; + +pub const CLOG_ZEROPAGE: u8 = 0x00; +pub const CLOG_TRUNCATE: u8 = 0x10; + +// From xact.h +pub const XLOG_XACT_COMMIT: u8 = 0x00; +pub const XLOG_XACT_ABORT: u8 = 0x20; + +/* mask for filtering opcodes out of xl_info */ +pub const XLOG_XACT_OPMASK: u8 = 0x70; +/* does this record have a 'xinfo' field or not */ +pub const XLOG_XACT_HAS_INFO: u8 = 0x80; + +/* + * The following flags, stored in xinfo, determine which information is + * contained in commit/abort records. + */ +pub const XACT_XINFO_HAS_DBINFO: u32 = 1; +pub const XACT_XINFO_HAS_SUBXACTS: u32 = 2; +pub const XACT_XINFO_HAS_RELFILENODES: u32 = 4; + +// From pg_control.h and rmgrlist.h +pub const XLOG_SWITCH: u8 = 0x40; +pub const XLOG_SMGR_TRUNCATE: u8 = 0x20; +pub const RM_XLOG_ID: u8 = 0; +pub const RM_XACT_ID: u8 = 1; +pub const RM_SMGR_ID: u8 = 2; +pub const RM_CLOG_ID: u8 = 3; +// pub const RM_MULTIXACT_ID:u8 = 6; + +// from xlogreader.h +pub const XLR_INFO_MASK: u8 = 0x0F; +pub const XLR_RMGR_INFO_MASK: u8 = 0xF0; diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 4308fd66a9..bf5e48a76e 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -30,7 +30,7 @@ use crate::page_cache; use crate::page_cache::RelTag; use crate::page_cache::BufferTag; use crate::page_cache::PageCache; -use crate::waldecoder::WalStreamDecoder; +use crate::waldecoder::{decode_wal_record, WalStreamDecoder}; use crate::PageServerConf; use crate::ZTimelineId; @@ -295,8 +295,7 @@ fn restore_wal( break; } if let Some((lsn, recdata)) = rec.unwrap() { - let decoded = crate::waldecoder::decode_wal_record(recdata.clone()); - + let decoded = decode_wal_record(recdata.clone()); // Put the WAL record to the page cache. We make a separate copy of // it for every block it modifies. (The actual WAL record is kept in // a Bytes, which uses a reference counter for the underlying buffer, @@ -317,14 +316,14 @@ fn restore_wal( will_init: blk.will_init || blk.apply_image, truncate: false, rec: recdata.clone(), + main_data_offset: decoded.main_data_offset as u32, }; pcache.put_wal_record(tag, rec); } - // Now that this record has been handled, let the page cache know that // it is up-to-date to this LSN - pcache.advance_last_valid_lsn(lsn); + pcache.advance_last_valid_lsn(lsn, false); last_lsn = lsn; } else { break; diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 40a2ac7a43..623b9b7189 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -1,3 +1,4 @@ +use crate::pg_constants; use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; use std::cmp::min; @@ -248,6 +249,7 @@ const BLCKSZ: u16 = 8192; // // Constants from xlogrecord.h // + const XLR_MAX_BLOCK_ID: u8 = 32; const XLR_BLOCK_ID_DATA_SHORT: u8 = 255; @@ -276,6 +278,7 @@ pub struct DecodedBkpBlock { pub rnode_spcnode: u32, pub rnode_dbnode: u32, pub rnode_relnode: u32, + // Note that we have a few special forknum values for non-rel files. pub forknum: u8, pub blkno: u32, @@ -294,10 +297,33 @@ pub struct DecodedBkpBlock { /* Buffer holding the rmgr-specific data associated with this block */ has_data: bool, - //char *data; data_len: u16, } +impl DecodedBkpBlock { + pub fn new() -> DecodedBkpBlock { + DecodedBkpBlock { + rnode_spcnode: 0, + rnode_dbnode: 0, + rnode_relnode: 0, + forknum: 0, + blkno: 0, + + flags: 0, + has_image: false, + apply_image: false, + will_init: false, + hole_offset: 0, + hole_length: 0, + bimg_len: 0, + bimg_info: 0, + + has_data: false, + data_len: 0, + } + } +} + #[allow(non_upper_case_globals)] const SizeOfXLogRecord: u32 = 24; @@ -307,17 +333,9 @@ pub struct DecodedWALRecord { pub record: Bytes, // raw XLogRecord pub blocks: Vec, + pub main_data_offset: usize, } -// From pg_control.h and rmgrlist.h -pub const XLOG_SWITCH: u8 = 0x40; -pub const XLOG_SMGR_TRUNCATE: u8 = 0x20; -pub const XLR_RMGR_INFO_MASK: u8 = 0xF0; - -pub const RM_XLOG_ID: u8 = 0; -pub const RM_XACT_ID: u8 = 1; -pub const RM_SMGR_ID: u8 = 2; - // Is this record an XLOG_SWITCH record? They need some special processing, // so we need to check for that before the rest of the parsing. // @@ -334,7 +352,7 @@ fn is_xlog_switch_record(rec: &Bytes) -> bool { buf.advance(2); // 2 bytes of padding let _xl_crc = buf.get_u32_le(); - return xl_info == XLOG_SWITCH && xl_rmid == RM_XLOG_ID; + return xl_info == pg_constants::XLOG_SWITCH && xl_rmid == pg_constants::RM_XLOG_ID; } pub type Oid = u32; @@ -344,7 +362,7 @@ pub const MAIN_FORKNUM: u8 = 0; pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001; #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub struct RelFileNode { pub spcnode: Oid, /* tablespace */ pub dbnode: Oid, /* database */ @@ -376,49 +394,75 @@ pub fn decode_truncate_record(decoded: &DecodedWALRecord) -> XlSmgrTruncate { // // Routines to decode a WAL record and figure out which blocks are modified // +// See xlogrecord.h for details +// The overall layout of an XLOG record is: +// Fixed-size header (XLogRecord struct) +// XLogRecordBlockHeader struct +// If BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows +// If BKPIMAGE_HAS_HOLE and BKPIMAGE_IS_COMPRESSED, an +// XLogRecordBlockCompressHeader struct follows. +// If BKPBLOCK_SAME_REL is not set, a RelFileNode follows +// BlockNumber follows +// XLogRecordBlockHeader struct +// ... +// XLogRecordDataHeader[Short|Long] struct +// block data +// block data +// ... +// main data pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { + let mut rnode_spcnode: u32 = 0; + let mut rnode_dbnode: u32 = 0; + let mut rnode_relnode: u32 = 0; + let mut got_rnode = false; + let mut buf = record.clone(); + // 1. Parse XLogRecord struct + // FIXME: assume little-endian here let xl_tot_len = buf.get_u32_le(); - let _xl_xid = buf.get_u32_le(); - let _xl_prev = buf.get_u64_le(); + let xl_xid = buf.get_u32_le(); + let xl_prev = buf.get_u64_le(); let xl_info = buf.get_u8(); let xl_rmid = buf.get_u8(); buf.advance(2); // 2 bytes of padding let _xl_crc = buf.get_u32_le(); + trace!( + "decode_wal_record xl_rmid = {} xl_info = {}", + xl_rmid, + xl_info + ); + let remaining = xl_tot_len - SizeOfXLogRecord; if buf.remaining() != remaining as usize { //TODO error } - let mut rnode_spcnode: u32 = 0; - let mut rnode_dbnode: u32 = 0; - let mut rnode_relnode: u32 = 0; - let mut got_rnode = false; - - // Decode the headers - let mut max_block_id = 0; + let mut blocks_total_len: u32 = 0; + let mut main_data_len = 0; let mut datatotal: u32 = 0; let mut blocks: Vec = Vec::new(); + + // 2. Decode the headers. + // XLogRecordBlockHeaders if any, + // XLogRecordDataHeader[Short|Long] while buf.remaining() > datatotal as usize { let block_id = buf.get_u8(); match block_id { XLR_BLOCK_ID_DATA_SHORT => { /* XLogRecordDataHeaderShort */ - let main_data_len = buf.get_u8() as u32; - + main_data_len = buf.get_u8() as u32; datatotal += main_data_len; } XLR_BLOCK_ID_DATA_LONG => { - /* XLogRecordDataHeaderShort */ - let main_data_len = buf.get_u32(); - + /* XLogRecordDataHeaderLong */ + main_data_len = buf.get_u32_le(); datatotal += main_data_len; } @@ -434,25 +478,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { 0..=XLR_MAX_BLOCK_ID => { /* XLogRecordBlockHeader */ - let mut blk = DecodedBkpBlock { - rnode_spcnode: 0, - rnode_dbnode: 0, - rnode_relnode: 0, - forknum: 0, - blkno: 0, - - flags: 0, - has_image: false, - apply_image: false, - will_init: false, - hole_offset: 0, - hole_length: 0, - bimg_len: 0, - bimg_info: 0, - - has_data: false, - data_len: 0, - }; + let mut blk = DecodedBkpBlock::new(); let fork_flags: u8; if block_id <= max_block_id { @@ -472,28 +498,12 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { blk.has_image = (fork_flags & BKPBLOCK_HAS_IMAGE) != 0; blk.has_data = (fork_flags & BKPBLOCK_HAS_DATA) != 0; blk.will_init = (fork_flags & BKPBLOCK_WILL_INIT) != 0; - blk.data_len = buf.get_u16_le(); - /* cross-check that the HAS_DATA flag is set iff data_length > 0 */ - // TODO - /* - if (blk->has_data && blk->data_len == 0) - { - report_invalid_record(state, - "BKPBLOCK_HAS_DATA set, but no data included at %X/%X", - (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); - goto err; - } - if (!blk->has_data && blk->data_len != 0) - { - report_invalid_record(state, - "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X", - (unsigned int) blk->data_len, - (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); - goto err; - } - */ + + /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */ + datatotal += blk.data_len as u32; + blocks_total_len += blk.data_len as u32; if blk.has_image { blk.bimg_len = buf.get_u16_le(); @@ -512,6 +522,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { blk.hole_length = BLCKSZ - blk.bimg_len; } datatotal += blk.bimg_len as u32; + blocks_total_len += blk.bimg_len as u32; /* * cross-check that hole_offset > 0, hole_length > 0 and @@ -587,28 +598,28 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { rnode_spcnode = buf.get_u32_le(); rnode_dbnode = buf.get_u32_le(); rnode_relnode = buf.get_u32_le(); - //rnode = &blk->rnode; got_rnode = true; - } else { - if !got_rnode { - // TODO - /* - report_invalid_record(state, - "BKPBLOCK_SAME_REL set but no previous rel at %X/%X", - (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); - goto err; - */ - } - - //blk->rnode = *rnode; + } else if !got_rnode { + // TODO + /* + report_invalid_record(state, + "BKPBLOCK_SAME_REL set but no previous rel at %X/%X", + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + goto err; */ } + blk.rnode_spcnode = rnode_spcnode; blk.rnode_dbnode = rnode_dbnode; blk.rnode_relnode = rnode_relnode; blk.blkno = buf.get_u32_le(); - - //println!("this record affects {}/{}/{} blk {}",rnode_spcnode, rnode_dbnode, rnode_relnode, blk.blkno); + trace!( + "this record affects {}/{}/{} blk {}", + rnode_spcnode, + rnode_dbnode, + rnode_relnode, + blk.blkno + ); blocks.push(blk); } @@ -619,22 +630,60 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { } } - /* - * Ok, we've parsed the fragment headers, and verified that the total - * length of the payload in the fragments is equal to the amount of data - * left. Copy the data of each fragment to a separate buffer. - * - * We could just set up pointers into readRecordBuf, but we want to align - * the data for the convenience of the callers. Backup images are not - * copied, however; they don't need alignment. - */ + // 3. Decode blocks. + // We don't need them, so just skip blocks_total_len bytes + buf.advance(blocks_total_len as usize); - // Since we don't care about the data payloads here, we're done. + let main_data_offset = (xl_tot_len - main_data_len) as usize; - return DecodedWALRecord { - xl_info, + // 4. Decode main_data + if main_data_len > 0 { + assert_eq!(buf.remaining(), main_data_len as usize); + } + + //5. Handle special CLOG and XACT records + if xl_rmid == pg_constants::RM_CLOG_ID { + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; + blk.blkno = buf.get_i32_le() as u32; + trace!("RM_CLOG_ID updates block {}", blk.blkno); + blocks.push(blk); + } else if xl_rmid == pg_constants::RM_XACT_ID { + let info = xl_info & pg_constants::XLOG_XACT_OPMASK; + if info == pg_constants::XLOG_XACT_COMMIT { + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; + blk.blkno = xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; + trace!( + "XLOG_XACT_COMMIT xl_prev {:X}/{:X} xid {} updates block {}", + (xl_prev >> 32), + xl_prev & 0xffffffff, + xl_xid, + blk.blkno + ); + blocks.push(blk); + //TODO parse commit record to extract subtrans entries + } else if info == pg_constants::XLOG_XACT_ABORT { + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; + blk.blkno = xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; + trace!( + "XLOG_XACT_ABORT xl_prev {:X}/{:X} xid {} updates block {}", + (xl_prev >> 32), + xl_prev & 0xffffffff, + xl_xid, + blk.blkno + ); + blocks.push(blk); + //TODO parse abort record to extract subtrans entries + } + } + + DecodedWALRecord { + xl_info, xl_rmid, record, blocks, - }; + main_data_offset, + } } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index e483b27005..8e8b61989e 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -8,6 +8,7 @@ use crate::page_cache; use crate::page_cache::{BufferTag, RelTag}; +use crate::pg_constants; use crate::waldecoder::*; use crate::PageServerConf; use crate::ZTimelineId; @@ -145,6 +146,7 @@ async fn walreceiver_main( // Start streaming the WAL, from where we left off previously. // let mut startpoint = pcache.get_last_valid_lsn(); + let last_valid_lsn = pcache.get_last_valid_lsn(); if startpoint == 0 { // If we start here with identify.xlogpos we will have race condition with // postgres start: insert into postgres may request page that was modified with lsn @@ -167,7 +169,9 @@ async fn walreceiver_main( } } debug!( - "starting replication from {:X}/{:X} for timeline {}, server is at {:X}/{:X}...", + "last_valid_lsn {:X}/{:X} starting replication from {:X}/{:X} for timeline {}, server is at {:X}/{:X}...", + (last_valid_lsn >> 32), + (last_valid_lsn & 0xffffffff), (startpoint >> 32), (startpoint & 0xffffffff), timelineid, @@ -213,7 +217,6 @@ async fn walreceiver_main( loop { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { let decoded = decode_wal_record(recdata.clone()); - // Put the WAL record to the page cache. We make a separate copy of // it for every block it modifies. (The actual WAL record is kept in // a Bytes, which uses a reference counter for the underlying buffer, @@ -234,13 +237,14 @@ async fn walreceiver_main( will_init: blk.will_init || blk.apply_image, truncate: false, rec: recdata.clone(), + main_data_offset: decoded.main_data_offset as u32, }; pcache.put_wal_record(tag, rec); } // include truncate wal record in all pages - if decoded.xl_rmid == RM_SMGR_ID - && (decoded.xl_info & XLR_RMGR_INFO_MASK) == XLOG_SMGR_TRUNCATE + if decoded.xl_rmid == pg_constants::RM_SMGR_ID + && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_SMGR_TRUNCATE { let truncate = decode_truncate_record(&decoded); if (truncate.flags & SMGR_TRUNCATE_HEAP) != 0 { @@ -258,6 +262,7 @@ async fn walreceiver_main( will_init: false, truncate: true, rec: recdata.clone(), + main_data_offset: decoded.main_data_offset as u32, }; pcache.put_rel_wal_record(tag, rec); } @@ -276,7 +281,7 @@ async fn walreceiver_main( // better reflect that, because GetPage@LSN requests might also point in the // middle of a record, if the request LSN was taken from the server's current // flush ptr. - pcache.advance_last_valid_lsn(endlsn); + pcache.advance_last_valid_lsn(endlsn, true); if !caught_up && endlsn >= end_of_wal { info!( diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index f20b7935c2..106c17ba22 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -32,13 +32,13 @@ use tokio::process::{Child, ChildStdin, ChildStdout, Command}; use tokio::runtime::Runtime; use tokio::time::timeout; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use crate::page_cache; use crate::page_cache::CacheEntry; use crate::page_cache::WALRecord; use crate::ZTimelineId; -use crate::{page_cache::BufferTag, PageServerConf}; +use crate::{page_cache::BufferTag, pg_constants, PageServerConf}; static TIMEOUT: Duration = Duration::from_secs(20); @@ -93,6 +93,59 @@ pub fn wal_redo_main(conf: &PageServerConf, timelineid: ZTimelineId) { } } +fn transaction_id_set_status_bit( + xl_info: u8, + xl_rmid: u8, + xl_xid: u32, + record: WALRecord, + page: &mut BytesMut, +) { + let info = xl_info & pg_constants::XLOG_XACT_OPMASK; + let mut status = 0; + if info == pg_constants::XLOG_XACT_COMMIT { + status = pg_constants::TRANSACTION_STATUS_COMMITTED; + } else if info == pg_constants::XLOG_XACT_ABORT { + status = pg_constants::TRANSACTION_STATUS_ABORTED; + } else { + trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {:X}/{:X} main_data_offset {}, rec.len {}", + status, + record.lsn >> 32, + record.lsn & 0xffffffff, + record.main_data_offset, record.rec.len()); + return; + } + + trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {:X}/{:X} main_data_offset {}, rec.len {}", + status, + record.lsn >> 32, + record.lsn & 0xffffffff, + record.main_data_offset, record.rec.len()); + + let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32) + / pg_constants::CLOG_XACTS_PER_BYTE) as usize; + + let byteptr = &mut page[byteno..byteno + 1]; + let bshift: u8 = ((xl_xid % pg_constants::CLOG_XACTS_PER_BYTE) + * pg_constants::CLOG_BITS_PER_XACT as u32) as u8; + + let mut curval = byteptr[0]; + curval = (curval >> bshift) & pg_constants::CLOG_XACT_BITMASK; + + let mut byteval = [0]; + byteval[0] = curval; + byteval[0] &= !(((1 << pg_constants::CLOG_BITS_PER_XACT as u8) - 1) << bshift); + byteval[0] |= status << bshift; + + byteptr.copy_from_slice(&byteval); + trace!( + "xl_xid {} byteno {} curval {} byteval {}", + xl_xid, + byteno, + curval, + byteval[0] + ); +} + fn handle_apply_request( pcache: &page_cache::PageCache, process: &WalRedoProcess, @@ -110,7 +163,46 @@ fn handle_apply_request( let nrecords = records.len(); let start = Instant::now(); - let apply_result = process.apply_wal_records(runtime, tag, base_img, records); + + let apply_result: Result; + if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 { + //TODO use base image if any + static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; + let zero_page_bytes: &[u8] = &ZERO_PAGE; + let mut page = BytesMut::from(zero_page_bytes); + + for record in records { + let mut buf = record.rec.clone(); + + // 1. Parse XLogRecord struct + // FIXME: refactor to avoid code duplication. + let _xl_tot_len = buf.get_u32_le(); + let xl_xid = buf.get_u32_le(); + let _xl_prev = buf.get_u64_le(); + let xl_info = buf.get_u8(); + let xl_rmid = buf.get_u8(); + buf.advance(2); // 2 bytes of padding + let _xl_crc = buf.get_u32_le(); + + if xl_rmid == pg_constants::RM_CLOG_ID { + let info = xl_info & !pg_constants::XLR_INFO_MASK; + if info == pg_constants::CLOG_ZEROPAGE { + page.clone_from_slice(zero_page_bytes); + trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {:X}/{:X} main_data_offset {}, rec.len {}", + record.lsn >> 32, + record.lsn & 0xffffffff, + record.main_data_offset, record.rec.len()); + } + } else if xl_rmid == pg_constants::RM_XACT_ID { + transaction_id_set_status_bit(xl_info, xl_rmid, xl_xid, record, &mut page); + } + } + + apply_result = Ok::(page.freeze()); + } else { + apply_result = process.apply_wal_records(runtime, tag, base_img, records); + } + let duration = start.elapsed(); let result;