Add @LSN argument to GetPage requests

This commit is contained in:
Heikki Linnakangas
2021-03-30 14:10:32 +03:00
committed by Stas Kelvich
parent 9a8bda2938
commit cd98818a22
4 changed files with 35 additions and 15 deletions

View File

@@ -14,6 +14,7 @@ use std::sync::Condvar;
use std::sync::Mutex;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use bytes::Bytes;
use lazy_static::lazy_static;
use rand::Rng;
@@ -22,6 +23,9 @@ use log::*;
use crossbeam_channel::unbounded;
use crossbeam_channel::{Sender, Receiver};
// Timeout when waiting for the WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(20);
pub struct PageCache {
shared: Mutex<PageCacheShared>,
@@ -29,6 +33,8 @@ pub struct PageCache {
pub walredo_sender: Sender<Arc<CacheEntry>>,
pub walredo_receiver: Receiver<Arc<CacheEntry>>,
valid_lsn_condvar: Condvar,
// Counters, for metrics collection.
pub num_entries: AtomicU64,
pub num_page_images: AtomicU64,
@@ -90,6 +96,7 @@ fn init_page_cache() -> PageCache
first_valid_lsn: 0,
last_valid_lsn: 0,
}),
valid_lsn_condvar: Condvar::new(),
walredo_sender: s,
walredo_receiver: r,
@@ -202,10 +209,17 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>
let entry_rc: Arc<CacheEntry>;
{
let shared = PAGECACHE.shared.lock().unwrap();
let mut shared = PAGECACHE.shared.lock().unwrap();
if lsn > shared.last_valid_lsn {
while lsn > shared.last_valid_lsn {
// TODO: Wait for the WAL receiver to catch up
debug!("not caught up yet: {}, requested {}", shared.last_valid_lsn, lsn);
let wait_result = PAGECACHE.valid_lsn_condvar.wait_timeout(shared, TIMEOUT).unwrap();
shared = wait_result.0;
if wait_result.1.timed_out() {
return Err(format!("Timed out while waiting for WAL record at LSN {} to arrive", lsn))?;
}
}
if lsn < shared.first_valid_lsn {
return Err(format!("LSN {} has already been removed", lsn))?;
@@ -404,6 +418,8 @@ pub fn advance_last_valid_lsn(lsn: u64)
assert!(lsn >= shared.last_valid_lsn);
shared.last_valid_lsn = lsn;
PAGECACHE.valid_lsn_condvar.notify_all();
PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed);
}

View File

@@ -64,6 +64,7 @@ struct ZenithRequest {
relnode: u32,
forknum: u8,
blkno: u32,
lsn: u64,
}
#[derive(Debug)]
@@ -176,6 +177,7 @@ impl FeMessage {
relnode: body.get_u32(),
forknum: body.get_u8(),
blkno: body.get_u32(),
lsn: body.get_u64(),
};
// TODO: consider using protobuf or serde bincode for less error prone
@@ -511,12 +513,11 @@ impl Connection {
forknum: req.forknum,
blknum: req.blkno
};
let inf_lsn = 0xffff_ffff_ffff_eeee;
let lsn = req.lsn;
let msg;
{
let p = page_cache::get_page_at_lsn(buf_tag, inf_lsn);
let p = page_cache::get_page_at_lsn(buf_tag, lsn);
if p.is_ok() {
msg = ZenithReadResponse {
ok: true,

View File

@@ -51,7 +51,7 @@ pub struct WalStreamDecoder {
lsn: u64,
reclsn: u64,
startlsn: u64, // LSN where this record starts
contlen: u32,
padlen: u32,
@@ -70,7 +70,7 @@ impl WalStreamDecoder {
WalStreamDecoder {
lsn: lsn,
reclsn: 0,
startlsn: 0,
contlen: 0,
padlen: 0,
@@ -83,7 +83,9 @@ impl WalStreamDecoder {
self.inputbuf.extend_from_slice(buf);
}
pub fn poll_decode(&mut self) -> Option<(u64, Bytes)> {
// Returns a tuple:
// (start LSN, end LSN + 1, record)
pub fn poll_decode(&mut self) -> Option<(u64, u64, Bytes)> {
loop {
// parse and verify page boundaries as we go
@@ -135,7 +137,7 @@ impl WalStreamDecoder {
}
// read xl_tot_len FIXME: assumes little-endian
self.reclsn = self.lsn;
self.startlsn = self.lsn;
let xl_tot_len = self.inputbuf.get_u32_le();
self.lsn += 4;
@@ -165,12 +167,13 @@ impl WalStreamDecoder {
if self.contlen == 0 {
let recordbuf = std::mem::replace(&mut self.recordbuf, BytesMut::new());
let result = (self.reclsn, recordbuf.freeze());
if self.lsn % 8 != 0 {
self.padlen = 8 - (self.lsn % 8) as u32;
}
// Note: the padding is included in the end-lsn that we report
let result = (self.startlsn, self.lsn + self.padlen as u64, recordbuf.freeze());
return Some(result);
}
continue;

View File

@@ -91,9 +91,9 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
waldecoder.feed_bytes(xlog_data.data());
loop {
if let Some((lsn, recdata)) = waldecoder.poll_decode() {
if let Some((startlsn, endlsn, recdata)) = waldecoder.poll_decode() {
let decoded = crate::waldecoder::decode_wal_record(lsn, recdata.clone());
let decoded = crate::waldecoder::decode_wal_record(startlsn, recdata.clone());
// Put the WAL record to the page cache. We make a separate copy of
// it for every block it modifies. (The actual WAL record is kept in
@@ -109,7 +109,7 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
};
let rec = page_cache::WALRecord {
lsn: lsn,
lsn: startlsn,
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone()
};
@@ -119,7 +119,7 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
// Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN
page_cache::advance_last_valid_lsn(lsn);
page_cache::advance_last_valid_lsn(endlsn);
} else {
break;