page server: Track the range of valid LSNs that we hold

This commit is contained in:
Heikki Linnakangas
2021-03-18 10:35:00 +02:00
committed by Stas Kelvich
parent b16462268d
commit ab432dff01
2 changed files with 55 additions and 15 deletions

View File

@@ -1,5 +1,5 @@
//
// Page Cache holds all the different all the different page versions and WAL records
// Page Cache holds all the different page versions and WAL records
//
//
//
@@ -29,6 +29,32 @@ pub struct WALRecord {
pub rec: Bytes
}
//
// Shared data structure, holding page cache and related auxiliary information
//
struct PageCacheShared {
// The actual page cache
pagecache: BTreeMap<CacheKey, CacheEntry>,
// What page versions do we hold in the cache? If we get GetPage with
// LSN < first_valid_lsn, that's an error because we (no longer) hold that
// page version. If we get a request > last_valid_lsn, we need to wait until
// we receive all the WAL up to the request.
//
first_valid_lsn: u64,
last_valid_lsn: u64
}
lazy_static! {
static ref PAGECACHE: Mutex<PageCacheShared> = Mutex::new(
PageCacheShared {
pagecache: BTreeMap::new(),
first_valid_lsn: 0,
last_valid_lsn: 0,
});
}
//
// We store two kinds of entries in the page cache:
//
@@ -55,12 +81,6 @@ enum CacheEntry {
WALRecord(WALRecord)
}
lazy_static! {
static ref PAGECACHE: Mutex<BTreeMap<CacheKey, CacheEntry>> = Mutex::new(BTreeMap::new());
}
// Public interface functions
//
@@ -68,8 +88,6 @@ lazy_static! {
//
// Returns an 8k page image
//
#[allow(dead_code)]
#[allow(unused_variables)]
pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>>
{
// TODO:
@@ -88,15 +106,23 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>
lsn: lsn + 1
};
let pagecache = PAGECACHE.lock().unwrap();
let shared = PAGECACHE.lock().unwrap();
if lsn > shared.last_valid_lsn {
// TODO: Wait for the WAL receiver to catch up
}
if lsn < shared.first_valid_lsn {
return Err(format!("LSN {} has already been removed", lsn))?;
}
let pagecache = &shared.pagecache;
let entries = pagecache.range(&minkey .. &maxkey);
let mut records: Vec<WALRecord> = Vec::new();
let mut base_img: Option<Bytes> = None;
for (key, e) in entries.rev() {
for (_key, e) in entries.rev() {
match e {
CacheEntry::PageImage(img) => {
// We have a base image. No need to dig deeper into the list of
@@ -138,8 +164,6 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>
//
// Adds a WAL record to the page cache
//
#[allow(dead_code)]
#[allow(unused_variables)]
pub fn put_wal_record(tag: BufferTag, rec: WALRecord)
{
let key = CacheKey {
@@ -149,12 +173,23 @@ pub fn put_wal_record(tag: BufferTag, rec: WALRecord)
let entry = CacheEntry::WALRecord(rec);
let mut pagecache = PAGECACHE.lock().unwrap();
let mut shared = PAGECACHE.lock().unwrap();
let pagecache = &mut shared.pagecache;
let oldentry = pagecache.insert(key, entry);
assert!(oldentry.is_none());
}
//
pub fn advance_last_valid_lsn(lsn: u64)
{
let mut shared = PAGECACHE.lock().unwrap();
// Can't move backwards.
assert!(lsn >= shared.last_valid_lsn);
shared.last_valid_lsn = lsn;
}
//
@@ -174,7 +209,8 @@ pub fn test_get_page_at_lsn()
let mut tag: Option<BufferTag> = None;
{
let pagecache = PAGECACHE.lock().unwrap();
let shared = PAGECACHE.lock().unwrap();
let pagecache = &shared.pagecache;
if pagecache.is_empty() {
println!("page cache is empty");

View File

@@ -109,6 +109,10 @@ async fn walreceiver_main() -> Result<(), Error> {
page_cache::put_wal_record(tag, rec);
}
// Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN
page_cache::advance_last_valid_lsn(lsn);
} else {
break;
}