diff --git a/src/page_cache.rs b/src/page_cache.rs index f1f1d24521..883db6ddfb 100644 --- a/src/page_cache.rs +++ b/src/page_cache.rs @@ -7,6 +7,7 @@ // use core::ops::Bound::Included; +use std::convert::TryInto; use std::collections::{BTreeMap, HashMap}; use std::error::Error; use std::sync::Arc; @@ -185,7 +186,7 @@ pub struct BufferTag { #[derive(Clone)] pub struct WALRecord { - pub lsn: u64, + pub lsn: u64, // LSN at the *end* of the record pub will_init: bool, pub rec: Bytes } @@ -251,7 +252,7 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result } // Lock the cache entry and dig the page image out of it. - let page_img; + let page_img: Bytes; { let mut entry_content = entry_rc.content.lock().unwrap(); @@ -292,6 +293,12 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result } } + // FIXME: assumes little-endian. Only used for the debugging log though + let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); + let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); + trace!("Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}", page_lsn_hi, page_lsn_lo, + tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum); + return Ok(page_img); } diff --git a/src/waldecoder.rs b/src/waldecoder.rs index 6ae7e5a360..da57d4401c 100644 --- a/src/waldecoder.rs +++ b/src/waldecoder.rs @@ -84,8 +84,8 @@ impl WalStreamDecoder { } // Returns a tuple: - // (start LSN, end LSN + 1, record) - pub fn poll_decode(&mut self) -> Option<(u64, u64, Bytes)> { + // (end LSN, record) + pub fn poll_decode(&mut self) -> Option<(u64, Bytes)> { loop { // parse and verify page boundaries as we go @@ -95,11 +95,10 @@ impl WalStreamDecoder { if self.inputbuf.remaining() < SizeOfXLogLongPHD { return None; } - + self.decode_XLogLongPageHeaderData(); self.lsn += SizeOfXLogLongPHD as u64; - // TODO: verify the fields in the header continue; @@ -147,7 +146,7 @@ impl WalStreamDecoder { self.contlen = xl_tot_len - 4; continue; - } + } else { // we're continuing a record, possibly from previous page. @@ -171,9 +170,7 @@ impl WalStreamDecoder { self.padlen = 8 - (self.lsn % 8) as u32; } - // Note: the padding is included in the end-lsn that we report - let result = (self.startlsn, self.lsn + self.padlen as u64, recordbuf.freeze()); - + let result = (self.lsn, recordbuf.freeze()); return Some(result); } continue; @@ -192,9 +189,9 @@ impl WalStreamDecoder { fn decode_XLogPageHeaderData(&mut self) -> XLogPageHeaderData { let buf = &mut self.inputbuf; - + // FIXME: Assume little-endian - + let hdr : XLogPageHeaderData = XLogPageHeaderData { xlp_magic: buf.get_u16_le(), xlp_info: buf.get_u16_le(), @@ -285,7 +282,7 @@ pub struct DecodedBkpBlock { const SizeOfXLogRecord:u32 = 24; pub struct DecodedWALRecord { - pub lsn: u64, + pub lsn: u64, // LSN at the *end* of the record pub record: Bytes, // raw XLogRecord pub blocks: Vec @@ -296,7 +293,7 @@ pub struct DecodedWALRecord { // pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord { - trace!("decoding record at {:08X}/{:08X} ({} bytes)", lsn >> 32, lsn & 0xffff_ffff, rec.remaining()); + trace!("decoding record with LSN {:08X}/{:08X} ({} bytes)", lsn >> 32, lsn & 0xffff_ffff, rec.remaining()); let mut buf = rec.clone(); @@ -319,7 +316,7 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord { let mut rnode_dbnode: u32 = 0; let mut rnode_relnode: u32 = 0; let mut got_rnode = false; - + // Decode the headers let mut max_block_id = 0; @@ -537,7 +534,7 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord { blk.rnode_spcnode = rnode_spcnode; blk.rnode_dbnode = rnode_dbnode; blk.rnode_relnode = rnode_relnode; - + blk.blkno = buf.get_u32_le(); //println!("this record affects {}/{}/{} blk {}",rnode_spcnode, rnode_dbnode, rnode_relnode, blk.blkno); diff --git a/src/walreceiver.rs b/src/walreceiver.rs index 5e2b5eda6b..dbe67dbd7a 100644 --- a/src/walreceiver.rs +++ b/src/walreceiver.rs @@ -95,7 +95,7 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { waldecoder.feed_bytes(data); loop { - if let Some((startlsn, endlsn, recdata)) = waldecoder.poll_decode() { + if let Some((lsn, recdata)) = waldecoder.poll_decode() { let decoded = crate::waldecoder::decode_wal_record(startlsn, recdata.clone()); @@ -113,7 +113,7 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { }; let rec = page_cache::WALRecord { - lsn: startlsn, + lsn: lsn, will_init: blk.will_init || blk.apply_image, rec: recdata.clone() }; @@ -123,7 +123,7 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { // Now that this record has been handled, let the page cache know that // it is up-to-date to this LSN - page_cache::advance_last_valid_lsn(endlsn); + page_cache::advance_last_valid_lsn(lsn); } else { break; diff --git a/src/walredo.rs b/src/walredo.rs index c19347a5e5..4f5a6fbfee 100644 --- a/src/walredo.rs +++ b/src/walredo.rs @@ -278,14 +278,14 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes return buf.freeze(); } -fn build_apply_record_msg(lsn: u64, rec: Bytes) -> Bytes { +fn build_apply_record_msg(endlsn: u64, rec: Bytes) -> Bytes { let len = 4 + 8 + rec.len(); let mut buf = BytesMut::with_capacity(1 + len); buf.put_u8('A' as u8); buf.put_u32(len as u32); - buf.put_u64(lsn); + buf.put_u64(endlsn); buf.put(rec); assert!(buf.len() == 1 + len);