From 01e239afa3ef333878dfd216a57a78e6516bd37b Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Fri, 23 Apr 2021 13:58:03 -0700 Subject: [PATCH] apply Lsn type everywhere Use the `Lsn` type everywhere that I can find u64 being used to represent an LSN. --- pageserver/src/page_cache.rs | 105 +++++++++++++-------------- pageserver/src/page_service.rs | 5 +- pageserver/src/restore_local_repo.rs | 33 ++++----- pageserver/src/waldecoder.rs | 38 +++++----- pageserver/src/walreceiver.rs | 53 +++++--------- pageserver/src/walredo.rs | 32 ++++---- 6 files changed, 122 insertions(+), 144 deletions(-) diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 9d478d3a61..0f18639354 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -47,6 +47,7 @@ use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; use std::{convert::TryInto, ops::AddAssign}; +use zenith_utils::lsn::Lsn; use zenith_utils::seqwait::SeqWait; // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. @@ -62,7 +63,7 @@ pub struct PageCache { walredo_mgr: WalRedoManager, // Allows .await on the arrival of a particular LSN. - seqwait_lsn: SeqWait, + seqwait_lsn: SeqWait, // Counters, for metrics collection. pub num_entries: AtomicU64, @@ -120,9 +121,9 @@ struct PageCacheShared { // walreceiver.rs instead of here, but it seems convenient to keep all three // values together. // - first_valid_lsn: u64, - last_valid_lsn: u64, - last_record_lsn: u64, + first_valid_lsn: Lsn, + last_valid_lsn: Lsn, + last_record_lsn: Lsn, } lazy_static! { @@ -204,16 +205,16 @@ fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache { PageCache { shared: Mutex::new(PageCacheShared { - first_valid_lsn: 0, - last_valid_lsn: 0, - last_record_lsn: 0, + first_valid_lsn: Lsn(0), + last_valid_lsn: Lsn(0), + last_record_lsn: Lsn(0), }), db: open_rocksdb(&conf, timelineid), walredo_mgr: WalRedoManager::new(conf, timelineid), - seqwait_lsn: SeqWait::new(0), + seqwait_lsn: SeqWait::new(Lsn(0)), num_entries: AtomicU64::new(0), num_page_images: AtomicU64::new(0), @@ -242,18 +243,18 @@ fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] pub struct CacheKey { pub tag: BufferTag, - pub lsn: u64, + pub lsn: Lsn, } impl CacheKey { pub fn pack(&self, buf: &mut BytesMut) { self.tag.pack(buf); - buf.put_u64(self.lsn); + buf.put_u64(self.lsn.0); } pub fn unpack(buf: &mut BytesMut) -> CacheKey { CacheKey { tag: BufferTag::unpack(buf), - lsn: buf.get_u64(), + lsn: Lsn::from(buf.get_u64()), } } } @@ -343,7 +344,7 @@ impl BufferTag { #[derive(Debug, Clone)] pub struct WALRecord { - pub lsn: u64, // LSN at the *end* of the record + pub lsn: Lsn, // LSN at the *end* of the record pub will_init: bool, pub truncate: bool, pub rec: Bytes, @@ -355,7 +356,7 @@ pub struct WALRecord { impl WALRecord { pub fn pack(&self, buf: &mut BytesMut) { - buf.put_u64(self.lsn); + buf.put_u64(self.lsn.0); buf.put_u8(self.will_init as u8); buf.put_u8(self.truncate as u8); buf.put_u32(self.main_data_offset); @@ -363,7 +364,7 @@ impl WALRecord { buf.put_slice(&self.rec[..]); } pub fn unpack(buf: &mut BytesMut) -> WALRecord { - let lsn = buf.get_u64(); + let lsn = Lsn::from(buf.get_u64()); let will_init = buf.get_u8() != 0; let truncate = buf.get_u8() != 0; let main_data_offset = buf.get_u32(); @@ -387,7 +388,7 @@ impl PageCache { /// /// Returns an 8k page image /// - pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result { + pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> anyhow::Result { self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); let lsn = self.wait_lsn(req_lsn).await?; @@ -448,7 +449,7 @@ impl PageCache { /// /// Get size of relation at given LSN. /// - pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { + pub async fn relsize_get(&self, rel: &RelTag, lsn: Lsn) -> anyhow::Result { self.wait_lsn(lsn).await?; return self.relsize_get_nowait(rel, lsn); } @@ -456,7 +457,7 @@ impl PageCache { /// /// Does relation exist at given LSN? /// - pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result { + pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: Lsn) -> anyhow::Result { let lsn = self.wait_lsn(req_lsn).await?; let key = CacheKey { @@ -497,7 +498,7 @@ impl PageCache { pub fn collect_records_for_apply( &self, tag: BufferTag, - lsn: u64, + lsn: Lsn, ) -> (Option, Vec) { let mut buf = BytesMut::new(); let key = CacheKey { tag, lsn }; @@ -576,7 +577,7 @@ impl PageCache { let mut key = CacheKey { tag, lsn: rec.lsn }; // What was the size of the relation before this record? - let last_lsn = self.last_valid_lsn.load(Ordering::Acquire); + let last_lsn = Lsn::from(self.last_valid_lsn.load(Ordering::Acquire)); let old_rel_size = self.relsize_get_nowait(&tag.rel, last_lsn)?; let content = CacheEntryContent { @@ -606,7 +607,7 @@ impl PageCache { /// /// Memorize a full image of a page version /// - pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) { + pub fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) { let key = CacheKey { tag, lsn }; let content = CacheEntryContent { page_image: Some(img), @@ -629,7 +630,7 @@ impl PageCache { pub fn create_database( &self, - lsn: u64, + lsn: Lsn, db_id: Oid, tablespace_id: Oid, src_db_id: Oid, @@ -646,7 +647,7 @@ impl PageCache { }, blknum: 0, }, - lsn: 0, + lsn: Lsn(0), }; key.pack(&mut buf); let mut iter = self.db.raw_iterator(); @@ -679,22 +680,19 @@ impl PageCache { } /// Remember that WAL has been received and added to the page cache up to the given LSN - pub fn advance_last_valid_lsn(&self, lsn: u64) { + pub fn advance_last_valid_lsn(&self, lsn: Lsn) { let mut shared = self.shared.lock().unwrap(); // Can't move backwards. let oldlsn = shared.last_valid_lsn; if lsn >= oldlsn { shared.last_valid_lsn = lsn; - self.last_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_valid_lsn.store(lsn.0, Ordering::Relaxed); self.seqwait_lsn.advance(lsn); } else { warn!( - "attempted to move last valid LSN backwards (was {:X}/{:X}, new {:X}/{:X})", - oldlsn >> 32, - oldlsn & 0xffffffff, - lsn >> 32, - lsn & 0xffffffff + "attempted to move last valid LSN backwards (was {}, new {})", + oldlsn, lsn ); } } @@ -704,7 +702,7 @@ impl PageCache { /// /// NOTE: this updates last_valid_lsn as well. /// - pub fn advance_last_record_lsn(&self, lsn: u64) { + pub fn advance_last_record_lsn(&self, lsn: Lsn) { let mut shared = self.shared.lock().unwrap(); // Can't move backwards. @@ -713,8 +711,8 @@ impl PageCache { shared.last_valid_lsn = lsn; shared.last_record_lsn = lsn; - self.last_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_record_lsn.store(lsn, Ordering::Relaxed); + self.last_valid_lsn.store(lsn.0, Ordering::Relaxed); + self.last_record_lsn.store(lsn.0, Ordering::Relaxed); self.seqwait_lsn.advance(lsn); } @@ -723,7 +721,7 @@ impl PageCache { /// /// TODO: This should be called by garbage collection, so that if an older /// page is requested, we will return an error to the requestor. - pub fn _advance_first_valid_lsn(&self, lsn: u64) { + pub fn _advance_first_valid_lsn(&self, lsn: Lsn) { let mut shared = self.shared.lock().unwrap(); // Can't move backwards. @@ -731,29 +729,29 @@ impl PageCache { // Can't overtake last_valid_lsn (except when we're // initializing the system and last_valid_lsn hasn't been set yet. - assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn); + assert!(shared.last_valid_lsn == Lsn(0) || lsn < shared.last_valid_lsn); shared.first_valid_lsn = lsn; - self.first_valid_lsn.store(lsn, Ordering::Relaxed); + self.first_valid_lsn.store(lsn.0, Ordering::Relaxed); } - pub fn init_valid_lsn(&self, lsn: u64) { + pub fn init_valid_lsn(&self, lsn: Lsn) { let mut shared = self.shared.lock().unwrap(); - assert!(shared.first_valid_lsn == 0); - assert!(shared.last_valid_lsn == 0); - assert!(shared.last_record_lsn == 0); + assert!(shared.first_valid_lsn == Lsn(0)); + assert!(shared.last_valid_lsn == Lsn(0)); + assert!(shared.last_record_lsn == Lsn(0)); shared.first_valid_lsn = lsn; shared.last_valid_lsn = lsn; shared.last_record_lsn = lsn; - self.first_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_record_lsn.store(lsn, Ordering::Relaxed); + self.first_valid_lsn.store(lsn.0, Ordering::Relaxed); + self.last_valid_lsn.store(lsn.0, Ordering::Relaxed); + self.last_record_lsn.store(lsn.0, Ordering::Relaxed); } - pub fn get_last_valid_lsn(&self) -> u64 { + pub fn get_last_valid_lsn(&self) -> Lsn { let shared = self.shared.lock().unwrap(); shared.last_record_lsn @@ -781,8 +779,8 @@ impl PageCache { // // The caller must ensure that WAL has been received up to 'lsn'. // - fn relsize_get_nowait(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { - assert!(lsn <= self.last_valid_lsn.load(Ordering::Acquire)); + fn relsize_get_nowait(&self, rel: &RelTag, lsn: Lsn) -> anyhow::Result { + assert!(lsn.0 <= self.last_valid_lsn.load(Ordering::Acquire)); let mut key = CacheKey { tag: BufferTag { @@ -833,7 +831,7 @@ impl PageCache { loop { thread::sleep(conf.gc_period); let last_lsn = self.get_last_valid_lsn(); - if last_lsn > conf.gc_horizon { + if last_lsn.0 > conf.gc_horizon { let horizon = last_lsn - conf.gc_horizon; let mut maxkey = CacheKey { tag: BufferTag { @@ -845,7 +843,7 @@ impl PageCache { }, blknum: u32::MAX, }, - lsn: u64::MAX, + lsn: Lsn::MAX, }; let now = Instant::now(); let mut reconstructed = 0u64; @@ -873,7 +871,7 @@ impl PageCache { maxkey.lsn = min(horizon, last_lsn); // do not remove last version let mut minkey = maxkey.clone(); - minkey.lsn = 0; // first version + minkey.lsn = Lsn(0); // first version // reconstruct most recent page version if (v[0] & PAGE_IMAGE_FLAG) == 0 { @@ -942,12 +940,12 @@ impl PageCache { // // Wait until WAL has been received up to the given LSN. // - async fn wait_lsn(&self, req_lsn: u64) -> anyhow::Result { + async fn wait_lsn(&self, req_lsn: Lsn) -> anyhow::Result { let mut lsn = req_lsn; //When invalid LSN is requested, it means "don't wait, return latest version of the page" //This is necessary for bootstrap. - if lsn == 0 { - lsn = self.last_valid_lsn.load(Ordering::Acquire); + if lsn == Lsn(0) { + lsn = Lsn::from(self.last_valid_lsn.load(Ordering::Acquire)); trace!( "walreceiver doesn't work yet last_valid_lsn {}, requested {}", self.last_valid_lsn.load(Ordering::Acquire), @@ -960,9 +958,8 @@ impl PageCache { .await .with_context(|| { format!( - "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", - lsn >> 32, - lsn & 0xffff_ffff + "Timed out while waiting for WAL record at LSN {} to arrive", + lsn ) })?; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 37790b5561..72f97aaaa7 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -25,6 +25,7 @@ use tokio::runtime; use tokio::runtime::Runtime; use tokio::sync::mpsc; use tokio::task; +use zenith_utils::lsn::Lsn; use crate::basebackup; use crate::page_cache; @@ -84,7 +85,7 @@ struct ZenithRequest { relnode: u32, forknum: u8, blkno: u32, - lsn: u64, + lsn: Lsn, } #[derive(Debug)] @@ -373,7 +374,7 @@ impl FeMessage { relnode: body.get_u32(), forknum: body.get_u8(), blkno: body.get_u32(), - lsn: body.get_u64(), + lsn: Lsn::from(body.get_u64()), }; // TODO: consider using protobuf or serde bincode for less error prone diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index d090419b0f..c38d7adaa9 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -34,6 +34,7 @@ use crate::waldecoder::{decode_wal_record, WalStreamDecoder}; use crate::PageServerConf; use crate::ZTimelineId; use postgres_ffi::xlog_utils::*; +use zenith_utils::lsn::Lsn; // From pg_tablespace_d.h // @@ -60,20 +61,21 @@ pub fn restore_timeline( .join(timeline.to_string()) .join("snapshots"); - let mut last_snapshot_lsn: u64 = 0; + let mut last_snapshot_lsn: Lsn = Lsn(0); for direntry in fs::read_dir(&snapshotspath).unwrap() { let direntry = direntry?; - let filename = direntry.file_name().to_str().unwrap().to_owned(); - - let lsn = u64::from_str_radix(&filename, 16)?; + let filename = direntry.file_name(); + let lsn = Lsn::from_filename(&filename)?; last_snapshot_lsn = max(lsn, last_snapshot_lsn); - restore_snapshot(conf, pcache, timeline, &filename)?; - info!("restored snapshot at {}", filename); + // FIXME: pass filename as Path instead of str? + let filename_str = filename.into_string().unwrap(); + restore_snapshot(conf, pcache, timeline, &filename_str)?; + info!("restored snapshot at {:?}", filename_str); } - if last_snapshot_lsn == 0 { + if last_snapshot_lsn == Lsn(0) { error!( "could not find valid snapshot in {}", snapshotspath.display() @@ -183,7 +185,7 @@ fn restore_relfile( dboid: u32, path: &Path, ) -> Result<()> { - let lsn = u64::from_str_radix(snapshot, 16)?; + let lsn = Lsn::from_hex(snapshot)?; // Does it look like a relation file? @@ -245,15 +247,16 @@ fn restore_wal( _conf: &PageServerConf, pcache: &PageCache, timeline: ZTimelineId, - startpoint: u64, + startpoint: Lsn, ) -> Result<()> { let walpath = format!("timelines/{}/wal", timeline); let mut waldecoder = WalStreamDecoder::new(startpoint); - let mut segno = XLByteToSeg(startpoint, 16 * 1024 * 1024); - let mut offset = XLogSegmentOffset(startpoint, 16 * 1024 * 1024); - let mut last_lsn = 0; + const SEG_SIZE: u64 = 16 * 1024 * 1024; + let mut segno = startpoint.segment_number(SEG_SIZE); + let mut offset = startpoint.segment_offset(SEG_SIZE); + let mut last_lsn = Lsn(0); loop { // FIXME: assume postgresql tli 1 for now let filename = XLogFileName(1, segno, 16 * 1024 * 1024); @@ -336,11 +339,7 @@ fn restore_wal( segno += 1; offset = 0; } - info!( - "reached end of WAL at {:X}/{:X}", - last_lsn >> 32, - last_lsn & 0xffffffff - ); + info!("reached end of WAL at {}", last_lsn); Ok(()) } diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 6cfd446a7f..29cd8109de 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -4,6 +4,7 @@ use log::*; use std::cmp::min; use std::str; use thiserror::Error; +use zenith_utils::lsn::Lsn; const XLOG_BLCKSZ: u32 = 8192; @@ -41,9 +42,9 @@ const SizeOfXLogLongPHD: usize = (2 + 2 + 4 + 8 + 4) + 4 + 8 + 4 + 4; #[allow(dead_code)] pub struct WalStreamDecoder { - lsn: u64, + lsn: Lsn, - startlsn: u64, // LSN where this record starts + startlsn: Lsn, // LSN where this record starts contlen: u32, padlen: u32, @@ -56,7 +57,7 @@ pub struct WalStreamDecoder { #[error("{msg} at {lsn}")] pub struct WalDecodeError { msg: String, - lsn: u64, + lsn: Lsn, } // @@ -64,11 +65,11 @@ pub struct WalDecodeError { // FIXME: This isn't a proper rust stream // impl WalStreamDecoder { - pub fn new(lsn: u64) -> WalStreamDecoder { + pub fn new(lsn: Lsn) -> WalStreamDecoder { WalStreamDecoder { lsn, - startlsn: 0, + startlsn: Lsn(0), contlen: 0, padlen: 0, @@ -89,10 +90,10 @@ impl WalStreamDecoder { /// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function /// Err(WalDecodeError): an error occured while decoding, meaning the input was invalid. /// - pub fn poll_decode(&mut self) -> Result, WalDecodeError> { + pub fn poll_decode(&mut self) -> Result, WalDecodeError> { loop { // parse and verify page boundaries as we go - if self.lsn % WAL_SEGMENT_SIZE == 0 { + if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 { // parse long header if self.inputbuf.remaining() < SizeOfXLogLongPHD { @@ -100,7 +101,7 @@ impl WalStreamDecoder { } let hdr = self.decode_XLogLongPageHeaderData(); - if hdr.std.xlp_pageaddr != self.lsn { + if hdr.std.xlp_pageaddr != self.lsn.0 { return Err(WalDecodeError { msg: "invalid xlog segment header".into(), lsn: self.lsn, @@ -110,7 +111,8 @@ impl WalStreamDecoder { self.lsn += SizeOfXLogLongPHD as u64; continue; - } else if self.lsn % (XLOG_BLCKSZ as u64) == 0 { + } else if self.lsn.0 % (XLOG_BLCKSZ as u64) == 0 { + // FIXME: make this a member of Lsn, but what should it be called? // parse page header if self.inputbuf.remaining() < SizeOfXLogShortPHD { @@ -118,7 +120,7 @@ impl WalStreamDecoder { } let hdr = self.decode_XLogPageHeaderData(); - if hdr.xlp_pageaddr != self.lsn { + if hdr.xlp_pageaddr != self.lsn.0 { return Err(WalDecodeError { msg: "invalid xlog page header".into(), lsn: self.lsn, @@ -163,7 +165,8 @@ impl WalStreamDecoder { continue; } else { // we're continuing a record, possibly from previous page. - let pageleft: u32 = XLOG_BLCKSZ - (self.lsn % (XLOG_BLCKSZ as u64)) as u32; + // FIXME: Should any of this math be captured into Lsn or a related type? + let pageleft: u32 = XLOG_BLCKSZ - (self.lsn.0 % (XLOG_BLCKSZ as u64)) as u32; // read the rest of the record, or as much as fits on this page. let n = min(self.contlen, pageleft) as usize; @@ -184,16 +187,13 @@ impl WalStreamDecoder { // XLOG_SWITCH records are special. If we see one, we need to skip // to the next WAL segment. if is_xlog_switch_record(&recordbuf) { - trace!( - "saw xlog switch record at {:X}/{:X}", - (self.lsn >> 32), - self.lsn & 0xffffffff - ); - self.padlen = (WAL_SEGMENT_SIZE - (self.lsn % WAL_SEGMENT_SIZE)) as u32; + trace!("saw xlog switch record at {}", self.lsn); + self.padlen = (WAL_SEGMENT_SIZE - (self.lsn.0 % WAL_SEGMENT_SIZE)) as u32; } - if self.lsn % 8 != 0 { - self.padlen = 8 - (self.lsn % 8) as u32; + // FIXME: what does this code do? + if self.lsn.0 % 8 != 0 { + self.padlen = 8 - (self.lsn.0 % 8) as u32; } let result = (self.lsn, recordbuf); diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 6671c13d25..18d7aa842e 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -31,6 +31,7 @@ use tokio::time::{sleep, Duration}; use tokio_postgres::replication::{PgTimestamp, ReplicationStream}; use tokio_postgres::{NoTls, SimpleQueryMessage, SimpleQueryRow}; use tokio_stream::StreamExt; +use zenith_utils::lsn::Lsn; // // We keep one WAL Receiver active per timeline. @@ -138,7 +139,7 @@ async fn walreceiver_main( let identify = identify_system(&rclient).await?; info!("{:?}", identify); - let end_of_wal = u64::from(identify.xlogpos); + let end_of_wal = Lsn::from(u64::from(identify.xlogpos)); let mut caught_up = false; let pcache = page_cache::get_pagecache(&conf, timelineid).unwrap(); @@ -148,7 +149,7 @@ async fn walreceiver_main( // let mut startpoint = pcache.get_last_valid_lsn(); let last_valid_lsn = pcache.get_last_valid_lsn(); - if startpoint == 0 { + if startpoint == Lsn(0) { // If we start here with identify.xlogpos we will have race condition with // postgres start: insert into postgres may request page that was modified with lsn // smaller than identify.xlogpos. @@ -157,37 +158,31 @@ async fn walreceiver_main( // different like having 'initdb' method on a pageserver (or importing some shared // empty database snapshot), so for now I just put start of first segment which // seems to be a valid record. - pcache.init_valid_lsn(0x_1_000_000_u64); - startpoint = 0x_1_000_000_u64; + pcache.init_valid_lsn(Lsn(0x_1_000_000)); + startpoint = Lsn(0x_1_000_000); } else { // There might be some padding after the last full record, skip it. // // FIXME: It probably would be better to always start streaming from the beginning // of the page, or the segment, so that we could check the page/segment headers // too. Just for the sake of paranoia. - if startpoint % 8 != 0 { - startpoint += 8 - (startpoint % 8); + // FIXME: should any of this logic move inside the Lsn type? + if startpoint.0 % 8 != 0 { + startpoint += 8 - (startpoint.0 % 8); } } debug!( - "last_valid_lsn {:X}/{:X} starting replication from {:X}/{:X} for timeline {}, server is at {:X}/{:X}...", - (last_valid_lsn >> 32), - (last_valid_lsn & 0xffffffff), - (startpoint >> 32), - (startpoint & 0xffffffff), - timelineid, - (end_of_wal >> 32), - (end_of_wal & 0xffffffff) + "last_valid_lsn {} starting replication from {} for timeline {}, server is at {}...", + last_valid_lsn, startpoint, timelineid, end_of_wal ); - let startpoint = PgLsn::from(startpoint); let query = format!("START_REPLICATION PHYSICAL {}", startpoint); let copy_stream = rclient.copy_both_simple::(&query).await?; let physical_stream = ReplicationStream::new(copy_stream); tokio::pin!(physical_stream); - let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint)); + let mut waldecoder = WalStreamDecoder::new(startpoint); while let Some(replication_message) = physical_stream.next().await { match replication_message? { @@ -195,7 +190,7 @@ async fn walreceiver_main( // Pass the WAL data to the decoder, and see if we can decode // more records as a result. let data = xlog_data.data(); - let startlsn = xlog_data.wal_start(); + let startlsn = Lsn::from(xlog_data.wal_start()); let endlsn = startlsn + data.len() as u64; write_wal_file( @@ -205,13 +200,7 @@ async fn walreceiver_main( data, )?; - trace!( - "received XLogData between {:X}/{:X} and {:X}/{:X}", - (startlsn >> 32), - (startlsn & 0xffffffff), - (endlsn >> 32), - (endlsn & 0xffffffff) - ); + trace!("received XLogData between {} and {}", startlsn, endlsn); waldecoder.feed_bytes(data); @@ -298,11 +287,7 @@ async fn walreceiver_main( pcache.advance_last_valid_lsn(endlsn); if !caught_up && endlsn >= end_of_wal { - info!( - "caught up at LSN {:X}/{:X}", - (endlsn >> 32), - (endlsn & 0xffffffff) - ); + info!("caught up at LSN {}", endlsn); caught_up = true; } } @@ -320,7 +305,7 @@ async fn walreceiver_main( ); if reply_requested { // TODO: More thought should go into what values are sent here. - let last_lsn = PgLsn::from(pcache.get_last_valid_lsn()); + let last_lsn = PgLsn::from(u64::from(pcache.get_last_valid_lsn())); let write_lsn = last_lsn; let flush_lsn = last_lsn; let apply_lsn = PgLsn::INVALID; @@ -387,7 +372,7 @@ pub async fn identify_system(client: &tokio_postgres::Client) -> Result>, } @@ -138,7 +138,7 @@ impl WalRedoManager { /// Request the WAL redo manager to apply WAL records, to reconstruct the page image /// of the given page version. /// - pub async fn request_redo(&self, tag: BufferTag, lsn: u64) -> Result { + pub async fn request_redo(&self, tag: BufferTag, lsn: Lsn) -> Result { // Create a channel where to receive the response let (tx, rx) = oneshot::channel::>(); @@ -225,18 +225,16 @@ impl WalRedoManagerInternal { } else if info == pg_constants::XLOG_XACT_ABORT { status = pg_constants::TRANSACTION_STATUS_ABORTED; } else { - trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {:X}/{:X} main_data_offset {}, rec.len {}", + trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}", status, - record.lsn >> 32, - record.lsn & 0xffffffff, + record.lsn, record.main_data_offset, record.rec.len()); return; } - trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {:X}/{:X} main_data_offset {}, rec.len {}", + trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {} main_data_offset {}, rec.len {}", status, - record.lsn >> 32, - record.lsn & 0xffffffff, + record.lsn, record.main_data_offset, record.rec.len()); let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32) @@ -305,9 +303,8 @@ impl WalRedoManagerInternal { let info = xl_info & !pg_constants::XLR_INFO_MASK; if info == pg_constants::CLOG_ZEROPAGE { page.clone_from_slice(zero_page_bytes); - trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {:X}/{:X} main_data_offset {}, rec.len {}", - record.lsn >> 32, - record.lsn & 0xffffffff, + trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {} main_data_offset {}, rec.len {}", + record.lsn, record.main_data_offset, record.rec.len()); } } else if xl_rmid == pg_constants::RM_XACT_ID { @@ -325,11 +322,10 @@ impl WalRedoManagerInternal { let result: Result; trace!( - "applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}", + "applied {} WAL records in {} ms to reconstruct page image at LSN {}", nrecords, duration.as_millis(), - lsn >> 32, - lsn & 0xffff_ffff + lsn ); if let Err(e) = apply_result { @@ -536,13 +532,13 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes { buf.freeze() } -fn build_apply_record_msg(endlsn: u64, rec: Bytes) -> Bytes { +fn build_apply_record_msg(endlsn: Lsn, rec: Bytes) -> Bytes { let len = 4 + 8 + rec.len(); let mut buf = BytesMut::with_capacity(1 + len); buf.put_u8(b'A'); buf.put_u32(len as u32); - buf.put_u64(endlsn); + buf.put_u64(endlsn.0); buf.put(rec); assert!(buf.len() == 1 + len);