diff --git a/src/page_cache.rs b/src/page_cache.rs index f36eb23216..f1f1d24521 100644 --- a/src/page_cache.rs +++ b/src/page_cache.rs @@ -210,10 +210,12 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result let entry_rc: Arc; { let mut shared = PAGECACHE.shared.lock().unwrap(); + let mut waited = false; while lsn > shared.last_valid_lsn { // TODO: Wait for the WAL receiver to catch up - debug!("not caught up yet: {}, requested {}", shared.last_valid_lsn, lsn); + waited = true; + trace!("not caught up yet: {}, requested {}", shared.last_valid_lsn, lsn); let wait_result = PAGECACHE.valid_lsn_condvar.wait_timeout(shared, TIMEOUT).unwrap(); shared = wait_result.0; @@ -221,6 +223,10 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result return Err(format!("Timed out while waiting for WAL record at LSN {} to arrive", lsn))?; } } + if waited { + trace!("caught up now, continuing"); + } + if lsn < shared.first_valid_lsn { return Err(format!("LSN {} has already been removed", lsn))?; } diff --git a/src/walreceiver.rs b/src/walreceiver.rs index 3015aac1c8..5e2b5eda6b 100644 --- a/src/walreceiver.rs +++ b/src/walreceiver.rs @@ -84,11 +84,15 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { match replication_message? { ReplicationMessage::XLogData(xlog_data) => { - trace!("received XLogData , lsn: {}", xlog_data.wal_start()); - // Pass the WAL data to the decoder, and see if we can decode // more records as a result. - waldecoder.feed_bytes(xlog_data.data()); + let data = xlog_data.data(); + let startlsn = xlog_data.wal_start(); + let endlsn = startlsn + data.len() as u64; + + trace!("received XLogData between {} and {}", startlsn, endlsn); + + waldecoder.feed_bytes(data); loop { if let Some((startlsn, endlsn, recdata)) = waldecoder.poll_decode() { @@ -125,7 +129,16 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { break; } } + + // Update the last_valid LSN value in the page cache one more time. We updated + // it in the loop above, between each WAL record, but we might have received + // a partial record after the last completed record. Our page cache's value + // better reflect that, because GetPage@LSN requests might also point in the + // middle of a record, if the request LSN was taken from the server's current + // flush ptr. + page_cache::advance_last_valid_lsn(endlsn); } + ReplicationMessage::PrimaryKeepAlive(_keepalive) => { trace!("received PrimaryKeepAlive"); // FIXME: Reply, or the connection will time out