diff --git a/src/page_cache.rs b/src/page_cache.rs index e569720cdb..1790c7cffb 100644 --- a/src/page_cache.rs +++ b/src/page_cache.rs @@ -14,6 +14,7 @@ use std::sync::Condvar; use std::sync::Mutex; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; +use std::time::Duration; use bytes::Bytes; use lazy_static::lazy_static; use rand::Rng; @@ -22,6 +23,9 @@ use log::*; use crossbeam_channel::unbounded; use crossbeam_channel::{Sender, Receiver}; +// Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. +static TIMEOUT: Duration = Duration::from_secs(20); + pub struct PageCache { shared: Mutex, @@ -29,6 +33,8 @@ pub struct PageCache { pub walredo_sender: Sender>, pub walredo_receiver: Receiver>, + valid_lsn_condvar: Condvar, + // Counters, for metrics collection. pub num_entries: AtomicU64, pub num_page_images: AtomicU64, @@ -90,6 +96,7 @@ fn init_page_cache() -> PageCache first_valid_lsn: 0, last_valid_lsn: 0, }), + valid_lsn_condvar: Condvar::new(), walredo_sender: s, walredo_receiver: r, @@ -202,10 +209,17 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result let entry_rc: Arc; { - let shared = PAGECACHE.shared.lock().unwrap(); + let mut shared = PAGECACHE.shared.lock().unwrap(); - if lsn > shared.last_valid_lsn { + while lsn > shared.last_valid_lsn { // TODO: Wait for the WAL receiver to catch up + debug!("not caught up yet: {}, requested {}", shared.last_valid_lsn, lsn); + let wait_result = PAGECACHE.valid_lsn_condvar.wait_timeout(shared, TIMEOUT).unwrap(); + + shared = wait_result.0; + if wait_result.1.timed_out() { + return Err(format!("Timed out while waiting for WAL record at LSN {} to arrive", lsn))?; + } } if lsn < shared.first_valid_lsn { return Err(format!("LSN {} has already been removed", lsn))?; @@ -404,6 +418,8 @@ pub fn advance_last_valid_lsn(lsn: u64) assert!(lsn >= shared.last_valid_lsn); shared.last_valid_lsn = lsn; + PAGECACHE.valid_lsn_condvar.notify_all(); + PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed); } diff --git a/src/page_service.rs b/src/page_service.rs index e95bee07ab..0c9c42b28e 100644 --- a/src/page_service.rs +++ b/src/page_service.rs @@ -64,6 +64,7 @@ struct ZenithRequest { relnode: u32, forknum: u8, blkno: u32, + lsn: u64, } #[derive(Debug)] @@ -176,6 +177,7 @@ impl FeMessage { relnode: body.get_u32(), forknum: body.get_u8(), blkno: body.get_u32(), + lsn: body.get_u64(), }; // TODO: consider using protobuf or serde bincode for less error prone @@ -511,12 +513,11 @@ impl Connection { forknum: req.forknum, blknum: req.blkno }; - - let inf_lsn = 0xffff_ffff_ffff_eeee; + let lsn = req.lsn; let msg; { - let p = page_cache::get_page_at_lsn(buf_tag, inf_lsn); + let p = page_cache::get_page_at_lsn(buf_tag, lsn); if p.is_ok() { msg = ZenithReadResponse { ok: true, diff --git a/src/waldecoder.rs b/src/waldecoder.rs index 05ff4ba9eb..6ae7e5a360 100644 --- a/src/waldecoder.rs +++ b/src/waldecoder.rs @@ -51,7 +51,7 @@ pub struct WalStreamDecoder { lsn: u64, - reclsn: u64, + startlsn: u64, // LSN where this record starts contlen: u32, padlen: u32, @@ -70,7 +70,7 @@ impl WalStreamDecoder { WalStreamDecoder { lsn: lsn, - reclsn: 0, + startlsn: 0, contlen: 0, padlen: 0, @@ -83,7 +83,9 @@ impl WalStreamDecoder { self.inputbuf.extend_from_slice(buf); } - pub fn poll_decode(&mut self) -> Option<(u64, Bytes)> { + // Returns a tuple: + // (start LSN, end LSN + 1, record) + pub fn poll_decode(&mut self) -> Option<(u64, u64, Bytes)> { loop { // parse and verify page boundaries as we go @@ -135,7 +137,7 @@ impl WalStreamDecoder { } // read xl_tot_len FIXME: assumes little-endian - self.reclsn = self.lsn; + self.startlsn = self.lsn; let xl_tot_len = self.inputbuf.get_u32_le(); self.lsn += 4; @@ -165,12 +167,13 @@ impl WalStreamDecoder { if self.contlen == 0 { let recordbuf = std::mem::replace(&mut self.recordbuf, BytesMut::new()); - let result = (self.reclsn, recordbuf.freeze()); - if self.lsn % 8 != 0 { self.padlen = 8 - (self.lsn % 8) as u32; } + // Note: the padding is included in the end-lsn that we report + let result = (self.startlsn, self.lsn + self.padlen as u64, recordbuf.freeze()); + return Some(result); } continue; diff --git a/src/walreceiver.rs b/src/walreceiver.rs index 5f4cd5d93f..d65d058139 100644 --- a/src/walreceiver.rs +++ b/src/walreceiver.rs @@ -91,9 +91,9 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { waldecoder.feed_bytes(xlog_data.data()); loop { - if let Some((lsn, recdata)) = waldecoder.poll_decode() { + if let Some((startlsn, endlsn, recdata)) = waldecoder.poll_decode() { - let decoded = crate::waldecoder::decode_wal_record(lsn, recdata.clone()); + let decoded = crate::waldecoder::decode_wal_record(startlsn, recdata.clone()); // Put the WAL record to the page cache. We make a separate copy of // it for every block it modifes. (The actual WAL record is kept in @@ -109,7 +109,7 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { }; let rec = page_cache::WALRecord { - lsn: lsn, + lsn: startlsn, will_init: blk.will_init || blk.apply_image, rec: recdata.clone() }; @@ -119,7 +119,7 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { // Now that this record has been handled, let the page cache know that // it is up-to-date to this LSN - page_cache::advance_last_valid_lsn(lsn); + page_cache::advance_last_valid_lsn(endlsn); } else { break;