mirror of https://github.com/neondatabase/neon.git (synced 2026-01-17 18:32:56 +00:00)

Compare commits: conrad/pro ... file_page_ (10 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | 7f67f65d92 |  |
|  | 81527300ef |  |
|  | ba46de96eb |  |
|  | e8dec662e6 |  |
|  | ebf1972ea4 |  |
|  | 35890bb293 |  |
|  | 8769fef1a5 |  |
|  | 7452f91d5a |  |
|  | 6a50e1f76a |  |
|  | 0c78aa7589 |  |

```diff
@@ -14,7 +14,7 @@ use metrics::set_build_info_metric;
 use pageserver::{
     config::{defaults::*, PageServerConf},
-    http, page_cache, page_service, profiling, task_mgr,
+    http, page_cache, page_image_cache, page_service, profiling, task_mgr,
     task_mgr::TaskKind,
     task_mgr::{
         BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
@@ -101,6 +101,7 @@ fn main() -> anyhow::Result<()> {
     // Basic initialization of things that don't change after startup
     virtual_file::init(conf.max_file_descriptors);
     page_cache::init(conf.page_cache_size);
+    page_image_cache::init(64 * conf.page_cache_size); // temporary hack for benchmarking

     start_pageserver(conf, daemonize).context("Failed to start pageserver")?;
```

```diff
@@ -5,6 +5,7 @@ pub mod import_datadir;
 pub mod keyspace;
 pub mod metrics;
 pub mod page_cache;
+pub mod page_image_cache;
 pub mod page_service;
 pub mod pgdatadir_mapping;
 pub mod profiling;
```

```diff
@@ -108,10 +108,10 @@ enum CacheKey {
 }

 #[derive(Debug, PartialEq, Eq, Hash, Clone)]
-struct MaterializedPageHashKey {
-    tenant_id: TenantId,
-    timeline_id: TimelineId,
-    key: Key,
+pub struct MaterializedPageHashKey {
+    pub tenant_id: TenantId,
+    pub timeline_id: TimelineId,
+    pub key: Key,
 }

 #[derive(Clone)]
```

pageserver/src/page_image_cache.rs (new file, 345 lines)

```rust
//!
//! Global page image cache
//!
//! Unlike page_cache, it holds only the most recent version of each reconstructed
//! page image, and it uses an invalidation mechanism to avoid layer map lookups.
//!
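
// The cache below stores reconstructed page images in a local file
// ("page.cache"), one PAGE_SZ-sized slot per entry; a slot's index doubles as
// its offset in the file. All bookkeeping (hash chains, LRU list, free list)
// lives in a single Vec<CacheEntry> guarded by one global Mutex.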

use crate::page_cache::MaterializedPageHashKey;
use crate::pgdatadir_mapping::{rel_block_to_key, BlockNumber};
use crate::repository::Key;
use crate::tenant::Timeline;
use crate::virtual_file::VirtualFile;
use anyhow::{bail, Result};
use bytes::Bytes;
use once_cell::sync::OnceCell;
use pageserver_api::reltag::RelTag;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::os::unix::fs::FileExt;
use std::sync::{Arc, Condvar, Mutex};
use utils::{
    id::{TenantId, TimelineId},
    lsn::Lsn,
};

static PAGE_CACHE: OnceCell<Mutex<PageImageCache>> = OnceCell::new();
const TEST_PAGE_CACHE_SIZE: usize = 50;
pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;

enum PageImageState {
    Vacant,       // entry is not used
    Loaded(bool), // page is loaded (true) or loading has failed (false)
    // Page is in the process of loading; the Condvar is created on demand when
    // some thread needs to wait for load completion
    Loading(Option<Arc<Condvar>>),
}
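
// State transitions: Vacant -> Loading(None) -> Loading(Some(cv)) (once a
// second reader starts waiting) -> Loaded(success). Invalidation or eviction
// returns an entry to Vacant.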

struct CacheEntry {
    key: MaterializedPageHashKey,

    // next+prev form the doubly linked (L2) LRU list; next alone is also reused
    // for the singly linked (L1) free page list
    next: usize,
    prev: usize,

    collision: usize, // L1 hash collision chain

    access_count: u32,
    state: PageImageState,
}

pub struct PageImageCache {
    free_list: usize, // head of the L1 list of free entries
    pages: Vec<CacheEntry>,
    hash_table: Vec<usize>, // indexes into the pages array
    file: Arc<VirtualFile>,
}
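
// Entry 0 is a sentinel: it serves as the head of the LRU list, and index 0
// also acts as the "null" link for the free list and the hash collision
// chains, so real entries live at indexes 1..=size.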

///
/// Initialize the page cache. This must be called once at page server startup.
///
pub fn init(size: usize) {
    if PAGE_CACHE
        .set(Mutex::new(PageImageCache::new(size)))
        .is_err()
    {
        panic!("page cache already initialized");
    }
}

///
/// Get a handle to the page cache.
///
pub fn get() -> &'static Mutex<PageImageCache> {
    //
    // In unit tests, page server startup doesn't happen and no one calls
    // page_image_cache::init(). Initialize it here with a tiny cache, so that the
    // page cache is usable in unit tests.
    //
    if cfg!(test) {
        PAGE_CACHE.get_or_init(|| Mutex::new(PageImageCache::new(TEST_PAGE_CACHE_SIZE)))
    } else {
        PAGE_CACHE.get().expect("page cache not initialized")
    }
}

fn hash<T: Hash>(t: &T) -> usize {
    let mut s = DefaultHasher::new();
    t.hash(&mut s);
    s.finish() as usize
}

impl PageImageCache {
    fn new(size: usize) -> Self {
        let mut pages: Vec<CacheEntry> = Vec::with_capacity(size + 1);
        let hash_table = vec![0usize; size];
        let file = Arc::new(
            VirtualFile::open_with_options(
                &std::path::PathBuf::from("page.cache"),
                std::fs::OpenOptions::new()
                    .read(true)
                    .write(true)
                    .create(true)
                    .truncate(true),
            )
            .unwrap(),
        );
        // Dummy key
        let dummy_key = MaterializedPageHashKey {
            key: Key::MIN,
            tenant_id: TenantId::from([0u8; 16]),
            timeline_id: TimelineId::from([0u8; 16]),
        };

        // LRU list head
        pages.push(CacheEntry {
            key: dummy_key.clone(),
            next: 0,
            prev: 0,
            access_count: 0,
            collision: 0,
            state: PageImageState::Vacant,
        });

        // Construct L1 free page list
        for i in 0..size {
            pages.push(CacheEntry {
                key: dummy_key.clone(),
                next: i + 2, // build L1-list of free pages
                prev: 0,
                access_count: 0,
                collision: 0,
                state: PageImageState::Vacant,
            });
        }
        pages[size].next = 0; // end of free page list (the last entry is at index `size`)

        PageImageCache {
            free_list: 1,
            pages,
            hash_table,
            file,
        }
    }
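
    // Note: the backing file is extended implicitly by write_all_at as slots
    // are first written; it grows to at most (size + 1) * PAGE_SZ bytes.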

    // Unlink from L2-list
    fn unlink(&mut self, index: usize) {
        let next = self.pages[index].next;
        let prev = self.pages[index].prev;
        self.pages[next].prev = prev;
        self.pages[prev].next = next;
    }

    // Link in L2-list after specified element
    fn link_after(&mut self, after: usize, index: usize) {
        let next = self.pages[after].next;
        self.pages[index].prev = after;
        self.pages[index].next = next;
        self.pages[next].prev = index;
        self.pages[after].next = index;
    }

    // Self-link the entry (next == prev == index) so that is_empty(index) is true
    fn prune(&mut self, index: usize) {
        self.pages[index].prev = index;
        self.pages[index].next = index;
    }

    fn is_empty(&self, index: usize) -> bool {
        self.pages[index].next == index
    }
}

// Remove entry from cache: called on page invalidation or relation drop
pub fn remove(key: Key, tenant_id: TenantId, timeline_id: TimelineId) {
    let key = MaterializedPageHashKey {
        key,
        tenant_id,
        timeline_id,
    };
    let this = get();
    let mut cache = this.lock().unwrap();
    let h = hash(&key) % cache.hash_table.len();
    let mut index = cache.hash_table[h];
    let mut prev = 0usize;
    while index != 0 {
        if cache.pages[index].key == key {
            if !cache.is_empty(index) {
                cache.pages[index].state = PageImageState::Vacant;
                // Remove from LRU list
                cache.unlink(index);
                // Insert entry in free list
                cache.pages[index].next = cache.free_list;
                cache.free_list = index;
            } else {
                // Page is in the process of loading: we cannot remove it right now,
                // so just mark it for deletion
                cache.pages[index].next = 0; // make is_empty == false
            }
            // Remove from hash table
            if prev == 0 {
                cache.hash_table[h] = cache.pages[index].collision;
            } else {
                cache.pages[prev].collision = cache.pages[index].collision;
            }
            break;
        }
        prev = index;
        index = cache.pages[index].collision;
    }
    // It's OK if the image is not found
}

// Find or load page image in the cache
pub fn lookup(timeline: &Timeline, rel: RelTag, blkno: BlockNumber, lsn: Lsn) -> Result<Bytes> {
    let key = MaterializedPageHashKey {
        key: rel_block_to_key(rel, blkno),
        tenant_id: timeline.tenant_id,
        timeline_id: timeline.timeline_id,
    };
    let this = get();
    let mut cache = this.lock().unwrap();
    let h = hash(&key) % cache.hash_table.len();

    'lookup: loop {
        let mut index = cache.hash_table[h];
        while index != 0 {
            if cache.pages[index].key == key {
                // cache hit
                match &cache.pages[index].state {
                    PageImageState::Loaded(success) => {
                        if *success {
                            // Pin page
                            if cache.pages[index].access_count == 0 {
                                cache.unlink(index);
                            }
                            cache.pages[index].access_count += 1;
                            let file = cache.file.clone();
                            drop(cache);
                            let mut buf = [0u8; PAGE_SZ];
                            file.read_exact_at(&mut buf, index as u64 * PAGE_SZ as u64)?;
                            cache = this.lock().unwrap();
                            assert!(cache.pages[index].access_count > 0);
                            cache.pages[index].access_count -= 1;
                            if cache.pages[index].access_count == 0 {
                                // Move to the head of LRU list
                                cache.link_after(0, index);
                            }
                            return Ok(Bytes::from(buf.to_vec()));
                        } else {
                            return Err(anyhow::anyhow!("page loading failed earlier"));
                        }
                    }
                    PageImageState::Loading(event) => {
                        // Create the Condvar to sleep on, if not yet assigned
                        let cv = match event {
                            None => {
                                let cv = Arc::new(Condvar::new());
                                cache.pages[index].state =
                                    PageImageState::Loading(Some(cv.clone()));
                                cv
                            }
                            Some(cv) => cv.clone(),
                        };
                        cache = cv.wait(cache).unwrap();
                        // Retry lookup
                        continue 'lookup;
                    }
                    PageImageState::Vacant => bail!("Vacant entry is not expected here"),
                };
            }
            index = cache.pages[index].collision;
        }
        let file = cache.file.clone();
        // Cache miss
        index = cache.free_list;
        if index == 0 {
            // no free items
            let victim = cache.pages[0].prev; // take least recently used element from the tail of LRU list
            assert!(victim != 0);
            assert!(cache.pages[victim].access_count == 0);
            // Remove victim from hash table
            let h = hash(&cache.pages[victim].key) % cache.hash_table.len();
            index = cache.hash_table[h];
            let mut prev = 0usize;
            while index != victim {
                assert!(index != 0);
                prev = index;
                index = cache.pages[index].collision;
            }
            if prev == 0 {
                cache.hash_table[h] = cache.pages[victim].collision;
            } else {
                cache.pages[prev].collision = cache.pages[victim].collision;
            }
            // and from LRU list
            cache.unlink(victim);

            index = victim;
        } else {
            // Use next free item
            cache.free_list = cache.pages[index].next;
        }
        // Make is_empty(index) == true. If the entry is removed while the page
        // is being loaded, it will be updated so that !is_empty(index)
        cache.prune(index);

        // Insert in hash table
        cache.pages[index].collision = cache.hash_table[h];
        cache.hash_table[h] = index;

        cache.pages[index].key = key;
        cache.pages[index].state = PageImageState::Loading(None);
        drop(cache); // release lock

        // Load page
        let result = timeline.get_rel_page_at_lsn(rel, blkno, lsn, true);
        let mut success = false;
        if let Ok(page) = &result {
            success = true;
            file.write_all_at(page, index as u64 * PAGE_SZ as u64)?;
        }
        cache = this.lock().unwrap();
        if let PageImageState::Loading(event) = &cache.pages[index].state {
            // Are there any waiting threads?
            if let Some(cv) = event {
                // If so, wake them up
                cv.notify_all();
            }
        } else {
            bail!("Loading state is expected");
        }
        if cache.is_empty(index) {
            // Entry was not marked as deleted: the page is loaded
            cache.pages[index].state = PageImageState::Loaded(success);
            // Link the page to the head of LRU list
            cache.link_after(0, index);
        } else {
            cache.pages[index].state = PageImageState::Vacant;
            // Return page to free list
            cache.pages[index].next = cache.free_list;
            cache.free_list = index;
        }
        // Only the first loader gets the full error from `get_rel_page_at_lsn`
        return result;
    }
}
```
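
The slot bookkeeping in `new`, `unlink`, `link_after`, and the miss path is easier to see in isolation. Below is a minimal, runnable sketch of the same index-linked scheme (sentinel slot 0 heading the LRU list, `next`/`prev` as the LRU links, `next` reused as the free-list link); all names are illustrative and not part of the patch:

```rust
// Sketch of the index-linked list scheme used by PageImageCache: slot 0 is a
// sentinel heading the LRU list; free slots are chained through `next` only.
struct Slot {
    next: usize,
    prev: usize,
}

struct Lists {
    slots: Vec<Slot>, // slot 0 is the sentinel
    free: usize,      // head of the free list (0 = empty)
}

impl Lists {
    fn new(size: usize) -> Self {
        let mut slots: Vec<Slot> = (0..=size).map(|_| Slot { next: 0, prev: 0 }).collect();
        // Chain slots 1..=size into the free list.
        for i in 1..size {
            slots[i].next = i + 1;
        }
        slots[size].next = 0; // terminate the free list
        Lists { slots, free: 1 }
    }

    // Pop a slot from the free list, or evict the least recently used one.
    fn acquire(&mut self) -> usize {
        if self.free != 0 {
            let i = self.free;
            self.free = self.slots[i].next;
            i
        } else {
            let victim = self.slots[0].prev; // tail of the LRU list
            assert!(victim != 0, "cache must not be empty");
            self.unlink(victim);
            victim
        }
    }

    fn unlink(&mut self, i: usize) {
        let (next, prev) = (self.slots[i].next, self.slots[i].prev);
        self.slots[next].prev = prev;
        self.slots[prev].next = next;
    }

    // Insert right after the sentinel: the most recently used position.
    fn touch(&mut self, i: usize) {
        let next = self.slots[0].next;
        self.slots[i].prev = 0;
        self.slots[i].next = next;
        self.slots[next].prev = i;
        self.slots[0].next = i;
    }
}

fn main() {
    let mut l = Lists::new(2);
    let a = l.acquire(); // slot 1, from the free list
    l.touch(a);
    let b = l.acquire(); // slot 2, from the free list
    l.touch(b);
    let c = l.acquire(); // free list empty: evicts the LRU tail, which is `a`
    assert_eq!(c, a);
    l.touch(c);
    println!("evicted and reused slot {c}");
}
```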

```diff
@@ -18,6 +18,7 @@ use pageserver_api::models::{
     PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
     PagestreamNblocksRequest, PagestreamNblocksResponse,
 };
+
 use std::io;
 use std::net::TcpListener;
 use std::str;
@@ -40,6 +41,7 @@ use crate::basebackup;
 use crate::config::{PageServerConf, ProfilingConfig};
 use crate::import_datadir::import_wal_from_tar;
 use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
+use crate::page_image_cache;
 use crate::profiling::profpoint_start;
 use crate::task_mgr;
 use crate::task_mgr::TaskKind;
@@ -580,8 +582,12 @@ impl PageServerHandler {
         // current profiling is based on a thread-local variable, so it doesn't work
         // across awaits
         let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests);
-        let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)?;
+        let page = if req.latest {
+            page_image_cache::lookup(timeline, req.rel, req.blkno, lsn)
+        } else {
+            timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, false)
+        }?;

         Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
             page,
         }))
```

```diff
@@ -1179,7 +1179,7 @@ fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
     }
 }

-fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
+pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
     Key {
         field1: 0x00,
         field2: rel.spcnode,
```

```diff
@@ -34,6 +34,7 @@ use crate::tenant::{
 use crate::config::{PageServerConf, METADATA_FILE_NAME};
 use crate::keyspace::{KeyPartitioning, KeySpace};
 use crate::metrics::TimelineMetrics;
+use crate::page_image_cache;
 use crate::pgdatadir_mapping::BlockNumber;
 use crate::pgdatadir_mapping::LsnForTimestamp;
 use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
@@ -2315,6 +2316,7 @@ impl<'a> TimelineWriter<'a> {
     /// This will implicitly extend the relation, if the page is beyond the
     /// current end-of-file.
     pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
+        page_image_cache::remove(key, self.tenant_id, self.timeline_id);
         self.tl.put_value(key, lsn, value)
     }
```
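
The rule this hunk enforces is invalidate-before-write: the cached image for a key is dropped before the new WAL record for it is stored. A minimal sketch of the same rule against a plain HashMap, with all names illustrative and no Neon types:

```rust
use std::collections::HashMap;

// Toy model: a store with a derived-image cache that is invalidated before
// every write, mirroring what `TimelineWriter::put` does with
// page_image_cache::remove. Illustrative only.
struct Store {
    source: HashMap<u32, Vec<u8>>,      // authoritative data (the "layers")
    image_cache: HashMap<u32, Vec<u8>>, // cached reconstructed images
}

impl Store {
    fn put(&mut self, key: u32, value: Vec<u8>) {
        // Invalidate first, so no reader keeps being served a stale image
        // after the write lands.
        self.image_cache.remove(&key);
        self.source.insert(key, value);
    }

    fn get(&mut self, key: u32) -> Option<Vec<u8>> {
        if let Some(img) = self.image_cache.get(&key) {
            return Some(img.clone()); // cache hit
        }
        let img = self.source.get(&key)?.clone(); // "reconstruct" the image
        self.image_cache.insert(key, img.clone());
        Some(img)
    }
}

fn main() {
    let mut s = Store { source: HashMap::new(), image_cache: HashMap::new() };
    s.put(1, b"v1".to_vec());
    assert_eq!(s.get(1).unwrap(), b"v1");
    s.put(1, b"v2".to_vec()); // invalidates the cached image for key 1
    assert_eq!(s.get(1).unwrap(), b"v2");
    println!("cache stays coherent");
}
```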