From ab432dff014b1ca305bd4d6b393e7f3eb908d359 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 18 Mar 2021 10:35:00 +0200 Subject: [PATCH] page server: Track the range of valid LSNs that we hold --- src/page_cache.rs | 66 +++++++++++++++++++++++++++++++++++----------- src/walreceiver.rs | 4 +++ 2 files changed, 55 insertions(+), 15 deletions(-) diff --git a/src/page_cache.rs b/src/page_cache.rs index daebade530..3468f3ed4d 100644 --- a/src/page_cache.rs +++ b/src/page_cache.rs @@ -1,5 +1,5 @@ // -// Page Cache holds all the different all the different page versions and WAL records +// Page Cache holds all the different page versions and WAL records // // // @@ -29,6 +29,32 @@ pub struct WALRecord { pub rec: Bytes } +// +// Shared data structure, holding page cache and related auxiliary information +// +struct PageCacheShared { + + // The actual page cache + pagecache: BTreeMap, + + // What page versions do we hold in the cache? If we get GetPage with + // LSN < first_valid_lsn, that's an error because we (no longer) hold that + // page version. If we get a request > last_valid_lsn, we need to wait until + // we receive all the WAL up to the request. + // + first_valid_lsn: u64, + last_valid_lsn: u64 +} + +lazy_static! { + static ref PAGECACHE: Mutex = Mutex::new( + PageCacheShared { + pagecache: BTreeMap::new(), + first_valid_lsn: 0, + last_valid_lsn: 0, + }); +} + // // We store two kinds of entries in the page cache: // @@ -55,12 +81,6 @@ enum CacheEntry { WALRecord(WALRecord) } -lazy_static! { - static ref PAGECACHE: Mutex> = Mutex::new(BTreeMap::new()); -} - - - // Public interface functions // @@ -68,8 +88,6 @@ lazy_static! { // // Returns an 8k page image // -#[allow(dead_code)] -#[allow(unused_variables)] pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result> { // TODO: @@ -88,15 +106,23 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result lsn: lsn + 1 }; - let pagecache = PAGECACHE.lock().unwrap(); + let shared = PAGECACHE.lock().unwrap(); + if lsn > shared.last_valid_lsn { + // TODO: Wait for the WAL receiver to catch up + } + if lsn < shared.first_valid_lsn { + return Err(format!("LSN {} has already been removed", lsn))?; + } + + let pagecache = &shared.pagecache; let entries = pagecache.range(&minkey .. &maxkey); let mut records: Vec = Vec::new(); let mut base_img: Option = None; - for (key, e) in entries.rev() { + for (_key, e) in entries.rev() { match e { CacheEntry::PageImage(img) => { // We have a base image. No need to dig deeper into the list of @@ -138,8 +164,6 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result // // Adds a WAL record to the page cache // -#[allow(dead_code)] -#[allow(unused_variables)] pub fn put_wal_record(tag: BufferTag, rec: WALRecord) { let key = CacheKey { @@ -149,12 +173,23 @@ pub fn put_wal_record(tag: BufferTag, rec: WALRecord) let entry = CacheEntry::WALRecord(rec); - let mut pagecache = PAGECACHE.lock().unwrap(); + let mut shared = PAGECACHE.lock().unwrap(); + let pagecache = &mut shared.pagecache; let oldentry = pagecache.insert(key, entry); assert!(oldentry.is_none()); } +// +pub fn advance_last_valid_lsn(lsn: u64) +{ + let mut shared = PAGECACHE.lock().unwrap(); + + // Can't move backwards. + assert!(lsn >= shared.last_valid_lsn); + + shared.last_valid_lsn = lsn; +} // @@ -174,7 +209,8 @@ pub fn test_get_page_at_lsn() let mut tag: Option = None; { - let pagecache = PAGECACHE.lock().unwrap(); + let shared = PAGECACHE.lock().unwrap(); + let pagecache = &shared.pagecache; if pagecache.is_empty() { println!("page cache is empty"); diff --git a/src/walreceiver.rs b/src/walreceiver.rs index 08cbd7f2a7..cb6c4e373d 100644 --- a/src/walreceiver.rs +++ b/src/walreceiver.rs @@ -109,6 +109,10 @@ async fn walreceiver_main() -> Result<(), Error> { page_cache::put_wal_record(tag, rec); } + // Now that this record has been handled, let the page cache know that + // it is up-to-date to this LSN + page_cache::advance_last_valid_lsn(lsn); + } else { break; }