diff --git a/src/page_cache.rs b/src/page_cache.rs
index 883db6ddfb..10db125e94 100644
--- a/src/page_cache.rs
+++ b/src/page_cache.rs
@@ -46,6 +46,7 @@ pub struct PageCache {
     // that they can be read without acquiring the mutex).
     pub first_valid_lsn: AtomicU64,
     pub last_valid_lsn: AtomicU64,
+    pub last_record_lsn: AtomicU64,
 }
 
 pub struct PageCacheStats {
@@ -55,6 +56,7 @@ pub struct PageCacheStats {
     pub num_getpage_requests: u64,
     pub first_valid_lsn: u64,
     pub last_valid_lsn: u64,
+    pub last_record_lsn: u64,
 }
 
 //
@@ -77,8 +79,18 @@ struct PageCacheShared {
     // page version. If we get a request > last_valid_lsn, we need to wait until
     // we receive all the WAL up to the request.
     //
+    // last_record_lsn points to the end of last processed WAL record.
+    // It can lag behind last_valid_lsn, if the WAL receiver has received some WAL
+    // after the end of last record, but not the whole next record yet. In the
+    // page cache, we care about last_valid_lsn, but if the WAL receiver needs to
+    // restart the streaming, it needs to restart at the end of last record, so
+    // we track them separately. last_record_lsn should perhaps be in
+    // walreceiver.rs instead of here, but it seems convenient to keep all three
+    // values together.
+    //
     first_valid_lsn: u64,
     last_valid_lsn: u64,
+    last_record_lsn: u64,
 }
 
 lazy_static! {
@@ -96,6 +108,7 @@ fn init_page_cache() -> PageCache
             relsize_cache: HashMap::new(),
             first_valid_lsn: 0,
             last_valid_lsn: 0,
+            last_record_lsn: 0,
         }),
 
         valid_lsn_condvar: Condvar::new(),
@@ -109,6 +122,7 @@ fn init_page_cache() -> PageCache
 
         first_valid_lsn: AtomicU64::new(0),
         last_valid_lsn: AtomicU64::new(0),
+        last_record_lsn: AtomicU64::new(0),
     }
 }
 
@@ -438,6 +452,25 @@ pub fn advance_last_valid_lsn(lsn: u64)
     PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed);
 }
 
+//
+// NOTE: this updates last_valid_lsn as well.
+//
+pub fn advance_last_record_lsn(lsn: u64)
+{
+    let mut shared = PAGECACHE.shared.lock().unwrap();
+
+    // Can't move backwards.
+    assert!(lsn >= shared.last_valid_lsn);
+    assert!(lsn >= shared.last_record_lsn);
+
+    shared.last_valid_lsn = lsn;
+    shared.last_record_lsn = lsn;
+    PAGECACHE.valid_lsn_condvar.notify_all();
+
+    PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed);
+    PAGECACHE.last_record_lsn.store(lsn, Ordering::Relaxed);
+}
+
 //
 pub fn _advance_first_valid_lsn(lsn: u64)
 {
@@ -460,18 +493,21 @@ pub fn init_valid_lsn(lsn: u64)
 
     assert!(shared.first_valid_lsn == 0);
     assert!(shared.last_valid_lsn == 0);
+    assert!(shared.last_record_lsn == 0);
 
     shared.first_valid_lsn = lsn;
     shared.last_valid_lsn = lsn;
+    shared.last_record_lsn = lsn;
 
     PAGECACHE.first_valid_lsn.store(lsn, Ordering::Relaxed);
     PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed);
+    PAGECACHE.last_record_lsn.store(lsn, Ordering::Relaxed);
 }
 
-pub fn get_last_valid_lsn() -> u64
+pub fn get_last_record_lsn() -> u64
 {
     let shared = PAGECACHE.shared.lock().unwrap();
 
-    return shared.last_valid_lsn;
+    return shared.last_record_lsn;
 }
 
 //
@@ -562,5 +598,6 @@ pub fn get_stats() -> PageCacheStats
         num_getpage_requests: PAGECACHE.num_getpage_requests.load(Ordering::Relaxed),
         first_valid_lsn: PAGECACHE.first_valid_lsn.load(Ordering::Relaxed),
         last_valid_lsn: PAGECACHE.last_valid_lsn.load(Ordering::Relaxed),
+        last_record_lsn: PAGECACHE.last_record_lsn.load(Ordering::Relaxed),
     }
 }
diff --git a/src/tui.rs b/src/tui.rs
index 3a6090aa52..f7f9686978 100644
--- a/src/tui.rs
+++ b/src/tui.rs
@@ -260,7 +260,10 @@ impl tui::widgets::Widget for MetricsWidget {
         let lsnrange = format!("{} - {}",
                                format_lsn(page_cache_stats.first_valid_lsn),
                                format_lsn(page_cache_stats.last_valid_lsn));
+        let last_valid_recordlsn_str =
+            format_lsn(page_cache_stats.last_record_lsn);
         lines.push(get_metric_str("Valid LSN range", &lsnrange));
+        lines.push(get_metric_str("Last record LSN", &last_valid_recordlsn_str));
         lines.push(get_metric_u64("# of cache entries", page_cache_stats.num_entries));
         lines.push(get_metric_u64("# of page images", page_cache_stats.num_page_images));
         lines.push(get_metric_u64("# of WAL records", page_cache_stats.num_wal_records));
diff --git a/src/waldecoder.rs b/src/waldecoder.rs
index 7f0975dd41..e44af1de7f 100644
--- a/src/waldecoder.rs
+++ b/src/waldecoder.rs
@@ -138,6 +138,11 @@ impl WalStreamDecoder {
                 // read xl_tot_len FIXME: assumes little-endian
                 self.startlsn = self.lsn;
                 let xl_tot_len = self.inputbuf.get_u32_le();
+                if xl_tot_len < SizeOfXLogRecord {
+                    error!("invalid xl_tot_len {} at {:X}/{:X}", xl_tot_len,
+                           self.lsn >> 32, self.lsn & 0xffffffff);
+                    panic!();
+                }
                 self.lsn += 4;
 
                 self.recordbuf.clear();
diff --git a/src/walreceiver.rs b/src/walreceiver.rs
index 9f2f2c36c4..d0c9c4b18b 100644
--- a/src/walreceiver.rs
+++ b/src/walreceiver.rs
@@ -66,18 +66,26 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
     //
     // Start streaming the WAL, from where we left off previously.
     //
-    let mut startpoint = page_cache::get_last_valid_lsn();
+    let mut startpoint = page_cache::get_last_record_lsn();
     if startpoint == 0 {
+        // FIXME: Or should we just error out?
         page_cache::init_valid_lsn(u64::from(_identify_system.xlogpos()));
         startpoint = u64::from(_identify_system.xlogpos());
+    } else {
+        // There might be some padding after the last full record, skip it.
+        //
+        // FIXME: It probably would be better to always start streaming from the beginning
+        // of the page, or the segment, so that we could check the page/segment headers
+        // too. Just for the sake of paranoia.
+        if startpoint % 8 != 0 {
+            startpoint += 8 - (startpoint % 8);
+        }
     }
     let startpoint = tokio_postgres::types::Lsn::from(startpoint);
-    debug!("starting replication from {:?}...", startpoint);
     let mut physical_stream = rclient
         .start_physical_replication(None, startpoint, None)
         .await?;
-
     let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint));
 
     while let Some(replication_message) = physical_stream.next().await {
@@ -123,7 +131,7 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
 
             // Now that this record has been handled, let the page cache know that
             // it is up-to-date to this LSN
-            page_cache::advance_last_valid_lsn(lsn);
+            page_cache::advance_last_record_lsn(lsn);
         } else {
             break;