Cache reconstructed pages on disk

This commit is contained in:
Konstantin Knizhnik
2022-11-03 19:01:03 +02:00
parent ebf1972ea4
commit e8dec662e6

View File

@@ -22,10 +22,11 @@ use utils::{
static PAGE_CACHE: OnceCell<Mutex<PageImageCache>> = OnceCell::new();
const TEST_PAGE_CACHE_SIZE: usize = 50;
pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
enum PageImageState {
Vacant, // entry is not used
Loaded(Option<Bytes>), // page is loaded or has failed
Loaded(bool), // page is loaded or has failed
Loading(Option<Arc<Condvar>>), // page in process of loading, Condvar is created on demand when some thread need to wait load completion
}
@@ -45,6 +46,7 @@ pub struct PageImageCache {
free_list: usize, // L1 list of free entries
pages: Vec<CacheEntry>,
hash_table: Vec<usize>, // indexes in pages array
file: Arc<VirtulFile>,
}
///
@@ -85,7 +87,9 @@ impl PageImageCache {
fn new(size: usize) -> Self {
let mut pages: Vec<CacheEntry> = Vec::with_capacity(size + 1);
let hash_table = vec![0usize; size];
let file = arc::new(VirtualFile::open_with_options(
"page.cache",
OpenOptions::new().write(true).create(true).truncate(true)).unwrap());
// Dummy key
let dummy_key = MaterializedPageHashKey {
key: Key::MIN,
@@ -118,6 +122,7 @@ impl PageImageCache {
free_list: 1,
pages,
hash_table,
file,
}
}
@@ -205,12 +210,21 @@ pub fn lookup(timeline: &Timeline, rel: RelTag, blkno: BlockNumber, lsn: Lsn) ->
if cache.pages[index].key == key {
// cache hit
match &cache.pages[index].state {
PageImageState::Loaded(cached_page) => {
// Move to the head of LRU list
let page = cached_page.clone();
cache.unlink(index);
cache.link_after(0, index);
return page.ok_or_else(|| anyhow::anyhow!("page loading failed earlier"));
PageImageState::Loaded(success) => {
if success {
// Pin page
cache.unlink(index);
let file = cache.file.clone();
drop(cache);
let mut buf = [0u8; PAGE_SZ];
file.read_exact_at(&mut buf, index as u64 * PAGE_SZ us u64)?;
cache = this.lock().unwrap();
// Move to the head of LRU list
cache.link_after(0, index);
return Ok(Bytes::from(buf));
} else {
return Err(anyhow::anyhow!("page loading failed earlier"));
}
}
PageImageState::Loading(event) => {
// Create event on which to sleep if not yet assigned
@@ -232,6 +246,7 @@ pub fn lookup(timeline: &Timeline, rel: RelTag, blkno: BlockNumber, lsn: Lsn) ->
}
index = cache.pages[index].collision;
}
let file = cache.file.clone();
// Cache miss
index = cache.free_list;
if index == 0 {
@@ -273,11 +288,15 @@ pub fn lookup(timeline: &Timeline, rel: RelTag, blkno: BlockNumber, lsn: Lsn) ->
drop(cache); //release lock
// Load page
let res = timeline.get_rel_page_at_lsn(rel, blkno, lsn, true);
let result = timeline.get_rel_page_at_lsn(rel, blkno, lsn, true);
let mut success = false;
if let Ok(page) = &result {
success = true;
file.write_exact_at(&page, i as u64 * PAGE_SZ as u64)?;
}
cache = this.lock().unwrap();
if let PageImageState::Loading(event) = &cache.pages[index].state {
// Are there soMe waiting threads?
// Are there some waiting threads?
if let Some(cv) = event {
// If so, then wakeup them
cv.notify_all();
@@ -290,10 +309,7 @@ pub fn lookup(timeline: &Timeline, rel: RelTag, blkno: BlockNumber, lsn: Lsn) ->
// Page is loaded
// match &res { ... } is same as `res.as_ref().ok().cloned()`
cache.pages[index].state = PageImageState::Loaded(match &res {
Ok(page) => Some(page.clone()),
Err(_) => None,
});
cache.pages[index].state = PageImageState::Loaded(success);
// Link the page to the head of LRU list
cache.link_after(0, index);
} else {