Compare commits

...

10 Commits

Author SHA1 Message Date
Konstantin Knizhnik  7f67f65d92  Use access counter for file cache pages  2022-11-04 12:55:21 +02:00
Konstantin Knizhnik  81527300ef  Use access counter for giel cache pages  2022-11-04 12:22:47 +02:00
Konstantin Knizhnik  ba46de96eb  Cache reconstructed pages on disk  2022-11-03 19:57:27 +02:00
Konstantin Knizhnik  e8dec662e6  Cache reconstructed pages on disk  2022-11-03 19:01:03 +02:00
Konstantin Knizhnik  ebf1972ea4  Make clippy happy  2022-10-31 23:00:54 +03:00
Joonas Koivunen  35890bb293  suggested refactoring to err out awaiting  2022-10-31 22:32:37 +03:00
Konstantin Knizhnik  8769fef1a5  Fix style  2022-10-31 19:59:34 +03:00
Konstantin Knizhnik  7452f91d5a  Replace Arc<Bytes> with BYtes because Bytes maintains its own reference counter  2022-10-31 19:43:38 +03:00
Konstantin Knizhnik  6a50e1f76a  Add image layer cache implementation  2022-10-29 14:29:59 +03:00
Konstantin Knizhnik  0c78aa7589  Implement page image cache with invaidtion mechanism  2022-10-28 22:06:54 +03:00
7 changed files with 362 additions and 7 deletions

View File

@@ -14,7 +14,7 @@ use metrics::set_build_info_metric;
use pageserver::{
config::{defaults::*, PageServerConf},
- http, page_cache, page_service, profiling, task_mgr,
+ http, page_cache, page_image_cache, page_service, profiling, task_mgr,
task_mgr::TaskKind,
task_mgr::{
BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
@@ -101,6 +101,7 @@ fn main() -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors);
page_cache::init(conf.page_cache_size);
page_image_cache::init(64 * conf.page_cache_size); // temporary hack for benchmarking
start_pageserver(conf, daemonize).context("Failed to start pageserver")?;

View File

@@ -5,6 +5,7 @@ pub mod import_datadir;
pub mod keyspace;
pub mod metrics;
pub mod page_cache;
pub mod page_image_cache;
pub mod page_service;
pub mod pgdatadir_mapping;
pub mod profiling;

View File

@@ -108,10 +108,10 @@ enum CacheKey {
}
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
- struct MaterializedPageHashKey {
-     tenant_id: TenantId,
-     timeline_id: TimelineId,
-     key: Key,
+ pub struct MaterializedPageHashKey {
+     pub tenant_id: TenantId,
+     pub timeline_id: TimelineId,
+     pub key: Key,
}
#[derive(Clone)]

View File

@@ -0,0 +1,345 @@
//!
//! Global page image cache
//!
//! Unlike page_cache, it holds only the most recent version of each reconstructed page image,
//! and it uses an invalidation mechanism to avoid layer map lookups.
use crate::page_cache::MaterializedPageHashKey;
use crate::pgdatadir_mapping::{rel_block_to_key, BlockNumber};
use crate::repository::Key;
use crate::tenant::Timeline;
use crate::virtual_file::VirtualFile;
use anyhow::{bail, Result};
use bytes::Bytes;
use once_cell::sync::OnceCell;
use pageserver_api::reltag::RelTag;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::os::unix::fs::FileExt;
use std::sync::{Arc, Condvar, Mutex};
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
static PAGE_CACHE: OnceCell<Mutex<PageImageCache>> = OnceCell::new();
const TEST_PAGE_CACHE_SIZE: usize = 50;
pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
enum PageImageState {
Vacant, // entry is not used
Loaded(bool), // load finished: true if the page image was written to the cache file, false if loading failed
Loading(Option<Arc<Condvar>>), // page is being loaded; the Condvar is created on demand when some thread needs to wait for load completion
}
struct CacheEntry {
key: MaterializedPageHashKey,
// next+prev form the doubly linked (L2) LRU list; next alone also forms the singly linked (L1) free page list
next: usize,
prev: usize,
collision: usize, // singly linked (L1) hash collision chain
access_count: u32,
state: PageImageState,
}
pub struct PageImageCache {
free_list: usize, // L1 list of free entries
pages: Vec<CacheEntry>,
hash_table: Vec<usize>, // indexes in pages array
file: Arc<VirtualFile>,
}
///
/// Initialize the page cache. This must be called once at page server startup.
///
pub fn init(size: usize) {
if PAGE_CACHE
.set(Mutex::new(PageImageCache::new(size)))
.is_err()
{
panic!("page cache already initialized");
}
}
///
/// Get a handle to the page cache.
///
pub fn get() -> &'static Mutex<PageImageCache> {
//
// In unit tests, page server startup doesn't happen and no one calls
// page_image_cache::init(). Initialize it here with a tiny cache, so that the
// page cache is usable in unit tests.
//
if cfg!(test) {
PAGE_CACHE.get_or_init(|| Mutex::new(PageImageCache::new(TEST_PAGE_CACHE_SIZE)))
} else {
PAGE_CACHE.get().expect("page cache not initialized")
}
}
fn hash<T: Hash>(t: &T) -> usize {
let mut s = DefaultHasher::new();
t.hash(&mut s);
s.finish() as usize
}
impl PageImageCache {
fn new(size: usize) -> Self {
let mut pages: Vec<CacheEntry> = Vec::with_capacity(size + 1);
let hash_table = vec![0usize; size];
let file = Arc::new(
VirtualFile::open_with_options(
&std::path::PathBuf::from("page.cache"),
std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true),
)
.unwrap(),
);
// Dummy key
let dummy_key = MaterializedPageHashKey {
key: Key::MIN,
tenant_id: TenantId::from([0u8; 16]),
timeline_id: TimelineId::from([0u8; 16]),
};
// LRU list head
pages.push(CacheEntry {
key: dummy_key.clone(),
next: 0,
prev: 0,
access_count: 0,
collision: 0,
state: PageImageState::Vacant,
});
// Construct L1 free page list
for i in 0..size {
pages.push(CacheEntry {
key: dummy_key.clone(),
next: i + 2, // build L1-list of free pages
prev: 0,
access_count: 0,
collision: 0,
state: PageImageState::Vacant,
});
}
pages[size].next = 0; // end of free page list
PageImageCache {
free_list: 1,
pages,
hash_table,
file,
}
}
// Unlink from L2-list
fn unlink(&mut self, index: usize) {
let next = self.pages[index].next;
let prev = self.pages[index].prev;
self.pages[next].prev = prev;
self.pages[prev].next = next;
}
// Link in L2-list after specified element
fn link_after(&mut self, after: usize, index: usize) {
let next = self.pages[after].next;
self.pages[index].prev = after;
self.pages[index].next = next;
self.pages[next].prev = index;
self.pages[after].next = index;
}
fn prune(&mut self, index: usize) {
self.pages[index].prev = index;
self.pages[index].next = index;
}
fn is_empty(&self, index: usize) -> bool {
self.pages[index].next == index
}
}
// Remove entry from cache: on page invalidation or relation drop
pub fn remove(key: Key, tenant_id: TenantId, timeline_id: TimelineId) {
let key = MaterializedPageHashKey {
key,
tenant_id,
timeline_id,
};
let this = get();
let mut cache = this.lock().unwrap();
let h = hash(&key) % cache.hash_table.len();
let mut index = cache.hash_table[h];
let mut prev = 0usize;
while index != 0 {
if cache.pages[index].key == key {
if !cache.is_empty(index) {
cache.pages[index].state = PageImageState::Vacant;
// Remove from LRU list
cache.unlink(index);
// Insert entry in free list
cache.pages[index].next = cache.free_list;
cache.free_list = index;
} else {
// Page is in the process of loading: we cannot remove it right now,
// so just mark for deletion
cache.pages[index].next = 0; // make is_empty == false
}
// Remove from hash table
if prev == 0 {
cache.hash_table[h] = cache.pages[index].collision;
} else {
cache.pages[prev].collision = cache.pages[index].collision;
}
break;
}
prev = index;
index = cache.pages[index].collision;
}
// It's OK if the image is not found
}
// Find or load page image in the cache
pub fn lookup(timeline: &Timeline, rel: RelTag, blkno: BlockNumber, lsn: Lsn) -> Result<Bytes> {
let key = MaterializedPageHashKey {
key: rel_block_to_key(rel, blkno),
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
};
let this = get();
let mut cache = this.lock().unwrap();
let h = hash(&key) % cache.hash_table.len();
'lookup: loop {
let mut index = cache.hash_table[h];
while index != 0 {
if cache.pages[index].key == key {
// cache hit
match &cache.pages[index].state {
PageImageState::Loaded(success) => {
if *success {
// Pin page
if cache.pages[index].access_count == 0 {
cache.unlink(index);
}
cache.pages[index].access_count += 1;
let file = cache.file.clone();
drop(cache);
let mut buf = [0u8; PAGE_SZ];
file.read_exact_at(&mut buf, index as u64 * PAGE_SZ as u64)?;
cache = this.lock().unwrap();
assert!(cache.pages[index].access_count > 0);
cache.pages[index].access_count -= 1;
if cache.pages[index].access_count == 0 {
// Move to the head of LRU list
cache.link_after(0, index);
}
return Ok(Bytes::from(buf.to_vec()));
} else {
return Err(anyhow::anyhow!("page loading failed earlier"));
}
}
PageImageState::Loading(event) => {
// Create event on which to sleep if not yet assigned
let cv = match event {
None => {
let cv = Arc::new(Condvar::new());
cache.pages[index].state =
PageImageState::Loading(Some(cv.clone()));
cv
}
Some(cv) => cv.clone(),
};
cache = cv.wait(cache).unwrap();
// Retry lookup
continue 'lookup;
}
PageImageState::Vacant => bail!("Vacant entry is not expected here"),
};
}
index = cache.pages[index].collision;
}
let file = cache.file.clone();
// Cache miss
index = cache.free_list;
if index == 0 {
// no free items
let victim = cache.pages[0].prev; // take least recently used element from the tail of LRU list
assert!(victim != 0);
assert!(cache.pages[victim].access_count == 0);
// Remove victim from hash table
let h = hash(&cache.pages[victim].key) % cache.hash_table.len();
index = cache.hash_table[h];
let mut prev = 0usize;
while index != victim {
assert!(index != 0);
prev = index;
index = cache.pages[index].collision;
}
if prev == 0 {
cache.hash_table[h] = cache.pages[victim].collision;
} else {
cache.pages[prev].collision = cache.pages[victim].collision;
}
// and from LRU list
cache.unlink(victim);
index = victim;
} else {
// Use next free item
cache.free_list = cache.pages[index].next;
}
// Make is_empty(index) == true. If the entry is removed while it is still being loaded,
// it will be updated so that !is_empty(index)
cache.prune(index);
// Insert in hash table
cache.pages[index].collision = cache.hash_table[h];
cache.hash_table[h] = index;
cache.pages[index].key = key;
cache.pages[index].state = PageImageState::Loading(None);
drop(cache); //release lock
// Load page
let result = timeline.get_rel_page_at_lsn(rel, blkno, lsn, true);
let mut success = false;
if let Ok(page) = &result {
success = true;
file.write_all_at(&page, index as u64 * PAGE_SZ as u64)?;
}
cache = this.lock().unwrap();
if let PageImageState::Loading(event) = &cache.pages[index].state {
// Are there any waiting threads?
if let Some(cv) = event {
// If so, wake them up
cv.notify_all();
}
} else {
bail!("Loading state is expected");
}
if cache.is_empty(index) {
// entry was not marked as deleted
// Page is loaded
cache.pages[index].state = PageImageState::Loaded(success);
// Link the page to the head of LRU list
cache.link_after(0, index);
} else {
cache.pages[index].state = PageImageState::Vacant;
// Return page to free list
cache.pages[index].next = cache.free_list;
cache.free_list = index;
}
// Only the first loader gets the full error from `get_rel_page_at_lsn`; waiters get a generic error instead
return result;
}
}
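
For orientation: the cache above keeps all of its bookkeeping as indexes into the single pages vector. Slot 0 is a dummy head of a circular, doubly linked ("L2") LRU list built from next/prev, next alone doubles as a singly linked ("L1") free list, and collision chains entries within a hash bucket. Below is a minimal standalone sketch of just the LRU and free-list part of that technique; Entry, LruTable, and get_entry are illustrative names, not code from the patch.

// Index-based intrusive LRU bookkeeping: entry 0 is a dummy head, next/prev form a
// circular LRU list, and next alone chains free entries. Illustrative sketch only.
struct Entry {
    key: u64, // payload key (illustrative)
    next: usize,
    prev: usize,
}

struct LruTable {
    entries: Vec<Entry>,
    free_list: usize, // index of the first free entry, 0 = none
}

impl LruTable {
    fn new(size: usize) -> Self {
        let mut entries = Vec::with_capacity(size + 1);
        // Entry 0: circular LRU list head pointing at itself.
        entries.push(Entry { key: 0, next: 0, prev: 0 });
        for i in 0..size {
            // Free entries are chained through `next`; the last one ends with 0.
            let next = if i + 1 < size { i + 2 } else { 0 };
            entries.push(Entry { key: 0, next, prev: 0 });
        }
        LruTable { entries, free_list: 1 }
    }

    // Detach an entry from the circular LRU list.
    fn unlink(&mut self, index: usize) {
        let (next, prev) = (self.entries[index].next, self.entries[index].prev);
        self.entries[next].prev = prev;
        self.entries[prev].next = next;
    }

    // Insert an entry right after `after` (after == 0 puts it at the MRU end).
    fn link_after(&mut self, after: usize, index: usize) {
        let next = self.entries[after].next;
        self.entries[index].prev = after;
        self.entries[index].next = next;
        self.entries[next].prev = index;
        self.entries[after].next = index;
    }

    // Claim an entry for `key`: reuse a free entry or evict the LRU victim at the tail.
    fn get_entry(&mut self, key: u64) -> usize {
        let index = if self.free_list != 0 {
            let index = self.free_list;
            self.free_list = self.entries[index].next;
            index
        } else {
            let victim = self.entries[0].prev; // least recently used
            self.unlink(victim);
            victim
        };
        self.entries[index].key = key;
        self.link_after(0, index); // most recently used
        index
    }
}

fn main() {
    let mut lru = LruTable::new(2);
    let a = lru.get_entry(10);
    let b = lru.get_entry(20);
    let c = lru.get_entry(30); // no free entry left: evicts the least recently used (key 10)
    println!("a={a} b={b} c={c} key_in_c={}", lru.entries[c].key); // a=1 b=2 c=1 key_in_c=30
}

Working with indexes rather than references keeps all entries in one allocation and sidesteps self-referential borrows, which is presumably why the patch links entries by usize.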

View File

@@ -18,6 +18,7 @@ use pageserver_api::models::{
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamNblocksRequest, PagestreamNblocksResponse,
};
use std::io;
use std::net::TcpListener;
use std::str;
@@ -40,6 +41,7 @@ use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
use crate::import_datadir::import_wal_from_tar;
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::page_image_cache;
use crate::profiling::profpoint_start;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
@@ -580,8 +582,12 @@ impl PageServerHandler {
// current profiling is based on a thread-local variable, so it doesn't work
// across awaits
let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests);
- let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)?;
+ let page = if req.latest {
+     page_image_cache::lookup(timeline, req.rel, req.blkno, lsn)
+ } else {
+     timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, false)
+ }?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
}))

View File

@@ -1179,7 +1179,7 @@ fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
}
}
- fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
+ pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
Key {
field1: 0x00,
field2: rel.spcnode,

View File

@@ -34,6 +34,7 @@ use crate::tenant::{
use crate::config::{PageServerConf, METADATA_FILE_NAME};
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::TimelineMetrics;
use crate::page_image_cache;
use crate::pgdatadir_mapping::BlockNumber;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
@@ -2315,6 +2316,7 @@ impl<'a> TimelineWriter<'a> {
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
page_image_cache::remove(key, self.tenant_id, self.timeline_id);
self.tl.put_value(key, lsn, value)
}
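
Together with lookup() in page_image_cache.rs, this remove() call forms a load-once / invalidate-on-write protocol: the first reader becomes the loader and drops the lock while it reconstructs the page, later readers wait on a Condvar that is created on demand, and a concurrent put() marks the slot so the loader discards its result instead of publishing a stale image. Below is a condensed, hypothetical sketch of that protocol for a single slot; Cache, Slot, get_or_load, and invalidate are illustrative names, not code from the patch.

use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// One cache slot: empty, being loaded by exactly one thread, or loaded.
enum Slot {
    Vacant,
    Loading { waiters: Option<Arc<Condvar>>, invalidated: bool },
    Loaded(String),
}

struct Cache {
    slot: Mutex<Slot>,
}

impl Cache {
    fn get_or_load(&self, load: impl Fn() -> String) -> String {
        let mut slot = self.slot.lock().unwrap();
        loop {
            // Decide whether to return, wait, or become the loader.
            let wait_on = match &mut *slot {
                Slot::Loaded(v) => return v.clone(),
                Slot::Loading { waiters, .. } => {
                    // The Condvar is created on demand by the first waiter.
                    Some(waiters.get_or_insert_with(|| Arc::new(Condvar::new())).clone())
                }
                Slot::Vacant => None,
            };
            if let Some(cv) = wait_on {
                slot = cv.wait(slot).unwrap(); // re-check the state after waking up
                continue;
            }
            // Become the loader: mark the slot, then release the lock while loading.
            *slot = Slot::Loading { waiters: None, invalidated: false };
            drop(slot);
            let value = load();
            slot = self.slot.lock().unwrap();
            let old = std::mem::replace(&mut *slot, Slot::Vacant);
            if let Slot::Loading { waiters, invalidated } = old {
                if let Some(cv) = &waiters {
                    cv.notify_all(); // wake everyone who queued up behind us
                }
                if !invalidated {
                    // No writer touched the key while we loaded: publish the value.
                    *slot = Slot::Loaded(value.clone());
                    return value;
                }
            }
            // Invalidated while loading: leave the slot Vacant and retry.
        }
    }

    // Called by the write path (analogous to page_image_cache::remove in put()).
    fn invalidate(&self) {
        let mut slot = self.slot.lock().unwrap();
        match &mut *slot {
            Slot::Loaded(_) => *slot = Slot::Vacant,
            Slot::Loading { invalidated, .. } => *invalidated = true,
            Slot::Vacant => {}
        }
    }
}

fn main() {
    let cache = Arc::new(Cache { slot: Mutex::new(Slot::Vacant) });
    let loader = {
        let cache = cache.clone();
        thread::spawn(move || {
            cache.get_or_load(|| {
                thread::sleep(Duration::from_millis(50)); // simulate page reconstruction
                "page image".to_string()
            })
        })
    };
    thread::sleep(Duration::from_millis(10));
    // This call either waits on the Condvar or sees the already-loaded value.
    let v = cache.get_or_load(|| "page image".to_string());
    cache.invalidate(); // a write forces the next lookup to reload
    println!("{v} / {}", loader.join().unwrap());
}

In the patch the same idea is spread across PageImageState::Loading, the access counter, and the is_empty marker set by remove(); the sketch collapses the invalidation marker into an explicit flag for readability.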