Compare commits


4 Commits

Author               SHA1        Message                                    Date
Konstantin Knizhnik  7f67f65d92  Use access counter for file cache pages    2022-11-04 12:55:21 +02:00
Konstantin Knizhnik  81527300ef  Use access counter for giel cache pages    2022-11-04 12:22:47 +02:00
Konstantin Knizhnik  ba46de96eb  Cache reconstructed pages on disk          2022-11-03 19:57:27 +02:00
Konstantin Knizhnik  e8dec662e6  Cache reconstructed pages on disk          2022-11-03 19:01:03 +02:00


@@ -8,12 +8,14 @@ use crate::page_cache::MaterializedPageHashKey;
 use crate::pgdatadir_mapping::{rel_block_to_key, BlockNumber};
 use crate::repository::Key;
 use crate::tenant::Timeline;
+use crate::virtual_file::VirtualFile;
 use anyhow::{bail, Result};
 use bytes::Bytes;
 use once_cell::sync::OnceCell;
 use pageserver_api::reltag::RelTag;
 use std::collections::hash_map::DefaultHasher;
 use std::hash::{Hash, Hasher};
+use std::os::unix::fs::FileExt;
 use std::sync::{Arc, Condvar, Mutex};
 use utils::{
     id::{TenantId, TimelineId},
@@ -22,10 +24,11 @@ use utils::{
 static PAGE_CACHE: OnceCell<Mutex<PageImageCache>> = OnceCell::new();
 const TEST_PAGE_CACHE_SIZE: usize = 50;
+pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
 enum PageImageState {
     Vacant, // entry is not used
-    Loaded(Option<Bytes>), // page is loaded or has failed
+    Loaded(bool), // page is loaded or has failed
     Loading(Option<Arc<Condvar>>), // page in process of loading, Condvar is created on demand when some thread need to wait load completion
 }
@@ -38,6 +41,7 @@ struct CacheEntry {
     collision: usize, // L1 hash collision chain
+    access_count: u32,
     state: PageImageState,
 }
@@ -45,6 +49,7 @@ pub struct PageImageCache {
     free_list: usize, // L1 list of free entries
     pages: Vec<CacheEntry>,
     hash_table: Vec<usize>, // indexes in pages array
+    file: Arc<VirtualFile>,
 }
 ///
@@ -85,7 +90,17 @@ impl PageImageCache {
     fn new(size: usize) -> Self {
         let mut pages: Vec<CacheEntry> = Vec::with_capacity(size + 1);
         let hash_table = vec![0usize; size];
+        let file = Arc::new(
+            VirtualFile::open_with_options(
+                &std::path::PathBuf::from("page.cache"),
+                std::fs::OpenOptions::new()
+                    .read(true)
+                    .write(true)
+                    .create(true)
+                    .truncate(true),
+            )
+            .unwrap(),
+        );
         // Dummy key
         let dummy_key = MaterializedPageHashKey {
             key: Key::MIN,
@@ -98,6 +113,7 @@ impl PageImageCache {
             key: dummy_key.clone(),
             next: 0,
             prev: 0,
+            access_count: 0,
             collision: 0,
             state: PageImageState::Vacant,
         });
@@ -108,6 +124,7 @@ impl PageImageCache {
                 key: dummy_key.clone(),
                 next: i + 2, // build L1-list of free pages
                 prev: 0,
+                access_count: 0,
                 collision: 0,
                 state: PageImageState::Vacant,
             });
@@ -118,6 +135,7 @@ impl PageImageCache {
             free_list: 1,
             pages,
             hash_table,
+            file,
         }
     }
@@ -205,12 +223,28 @@ pub fn lookup(timeline: &Timeline, rel: RelTag, blkno: BlockNumber, lsn: Lsn) ->
             if cache.pages[index].key == key {
                 // cache hit
                 match &cache.pages[index].state {
-                    PageImageState::Loaded(cached_page) => {
-                        // Move to the head of LRU list
-                        let page = cached_page.clone();
-                        cache.unlink(index);
-                        cache.link_after(0, index);
-                        return page.ok_or_else(|| anyhow::anyhow!("page loading failed earlier"));
+                    PageImageState::Loaded(success) => {
+                        if *success {
+                            // Pin page
+                            if cache.pages[index].access_count == 0 {
+                                cache.unlink(index);
+                            }
+                            cache.pages[index].access_count += 1;
+                            let file = cache.file.clone();
+                            drop(cache);
+                            let mut buf = [0u8; PAGE_SZ];
+                            file.read_exact_at(&mut buf, index as u64 * PAGE_SZ as u64)?;
+                            cache = this.lock().unwrap();
+                            assert!(cache.pages[index].access_count > 0);
+                            cache.pages[index].access_count -= 1;
+                            if cache.pages[index].access_count == 0 {
+                                // Move to the head of LRU list
+                                cache.link_after(0, index);
+                            }
+                            return Ok(Bytes::from(buf.to_vec()));
+                        } else {
+                            return Err(anyhow::anyhow!("page loading failed earlier"));
+                        }
                     }
                     PageImageState::Loading(event) => {
                         // Create event on which to sleep if not yet assigned
@@ -232,12 +266,14 @@ pub fn lookup(timeline: &Timeline, rel: RelTag, blkno: BlockNumber, lsn: Lsn) ->
             }
             index = cache.pages[index].collision;
         }
+        let file = cache.file.clone();
         // Cache miss
         index = cache.free_list;
         if index == 0 {
             // no free items
             let victim = cache.pages[0].prev; // take least recently used element from the tail of LRU list
             assert!(victim != 0);
+            assert!(cache.pages[victim].access_count == 0);
             // Remove victim from hash table
             let h = hash(&cache.pages[victim].key) % cache.hash_table.len();
             index = cache.hash_table[h];
@@ -273,11 +309,15 @@ pub fn lookup(timeline: &Timeline, rel: RelTag, blkno: BlockNumber, lsn: Lsn) ->
         drop(cache); //release lock
         // Load page
-        let res = timeline.get_rel_page_at_lsn(rel, blkno, lsn, true);
+        let result = timeline.get_rel_page_at_lsn(rel, blkno, lsn, true);
+        let mut success = false;
+        if let Ok(page) = &result {
+            success = true;
+            file.write_all_at(&page, index as u64 * PAGE_SZ as u64)?;
+        }
         cache = this.lock().unwrap();
         if let PageImageState::Loading(event) = &cache.pages[index].state {
-            // Are there soMe waiting threads?
+            // Are there some waiting threads?
             if let Some(cv) = event {
                 // If so, then wakeup them
                 cv.notify_all();
@@ -290,10 +330,7 @@ pub fn lookup(timeline: &Timeline, rel: RelTag, blkno: BlockNumber, lsn: Lsn) ->
             // Page is loaded
-            // match &res { ... } is same as `res.as_ref().ok().cloned()`
-            cache.pages[index].state = PageImageState::Loaded(match &res {
-                Ok(page) => Some(page.clone()),
-                Err(_) => None,
-            });
+            cache.pages[index].state = PageImageState::Loaded(success);
             // Link the page to the head of LRU list
             cache.link_after(0, index);
         } else {
@@ -303,6 +340,6 @@ pub fn lookup(timeline: &Timeline, rel: RelTag, blkno: BlockNumber, lsn: Lsn) ->
             cache.free_list = index;
         }
         // only the first one gets the full error from `get_rel_page_at_lsn`
-        return res;
+        return result;
     }
 }
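
The core idea of this change set: `Loaded` no longer carries the page image in memory (`Option<Bytes>` becomes `bool`); page images live in the on-disk `page.cache` file at offset `index * PAGE_SZ`, and a reader pins the slot by bumping `access_count` so the mutex can be released during file I/O without the slot becoming an eviction victim (the miss path asserts `access_count == 0` on its victim). Below is a minimal, self-contained sketch of that pin / read / unpin pattern using only the standard library plus `anyhow`; the names `Slot`, `FileCache` and `read_page` are illustrative, not the pageserver's actual API, and the LRU unlink/link_after bookkeeping is only indicated in comments.

// Sketch only: mirrors the locking discipline of the Loaded(bool) fast path above,
// not the real pageserver code (which uses VirtualFile and an intrusive LRU list).
use std::fs::File;
use std::os::unix::fs::FileExt;
use std::sync::{Arc, Mutex};

const PAGE_SZ: usize = 8192;

struct Slot {
    loaded_ok: bool,   // analogue of PageImageState::Loaded(bool)
    access_count: u32, // > 0 means the slot is pinned and must not be evicted
}

struct FileCache {
    slots: Vec<Slot>,
    // LRU unlink/link_after bookkeeping omitted in this sketch.
}

fn read_page(cache: &Arc<Mutex<FileCache>>, file: &File, index: usize) -> anyhow::Result<Vec<u8>> {
    // Pin the slot under the lock: eviction only picks victims with
    // access_count == 0, so a pinned slot cannot be reused while we read the file.
    {
        let mut guard = cache.lock().unwrap();
        let slot = &mut guard.slots[index];
        if !slot.loaded_ok {
            anyhow::bail!("page loading failed earlier");
        }
        slot.access_count += 1;
        // The real code also unlinks the slot from the LRU list when it was unpinned.
    } // guard dropped here: file I/O happens without holding the cache mutex

    let mut buf = vec![0u8; PAGE_SZ];
    let read_result = file.read_exact_at(&mut buf, (index * PAGE_SZ) as u64);

    // Unpin under the lock, whether or not the read succeeded.
    let mut guard = cache.lock().unwrap();
    let slot = &mut guard.slots[index];
    assert!(slot.access_count > 0);
    slot.access_count -= 1;
    // The real code relinks the slot to the head of the LRU list when the count hits 0.
    read_result?;
    Ok(buf)
}

Dropping the mutex around `read_exact_at` keeps other lookups concurrent during disk reads; the pin is what makes that safe, since the eviction path in the diff never selects a slot whose `access_count` is non-zero.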