//
// Page Cache holds all the different page versions and WAL records
//
// Currently, the page cache uses RocksDB to store WAL records and
// full page images, keyed by the RelFileNode, block number, and the
// LSN.
use crate::restore_local_repo::restore_timeline;
use crate::waldecoder::Oid;
use crate::walredo::WalRedoManager;
use crate::ZTimelineId;
use crate::{zenith_repo_dir, PageServerConf};
use anyhow::{bail, Context};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use log::*;
use std::cmp::min;
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use std::{convert::TryInto, ops::AddAssign};
use zenith_utils::lsn::{AtomicLsn, Lsn};
use zenith_utils::seqwait::SeqWait;
// Timeout when waiting for the WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);
pub struct PageCache {
// RocksDB handle
db: rocksdb::DB,
// WAL redo manager
walredo_mgr: WalRedoManager,
// What page versions do we hold in the cache? If we get GetPage with
// LSN < first_valid_lsn, that's an error because we (no longer) hold that
// page version. If we get a request > last_valid_lsn, we need to wait until
// we receive all the WAL up to the request. The SeqWait provides functions
// for that.
//
// last_record_lsn points to the end of the last processed WAL record.
// It can lag behind last_valid_lsn if the WAL receiver has received some WAL
// after the end of the last record, but not the whole next record yet. In the
// page cache, we care about last_valid_lsn, but if the WAL receiver needs to
// restart the streaming, it must restart at the end of the last record, so
// we track them separately. last_record_lsn should perhaps live in
// walreceiver.rs instead of here, but it seems convenient to keep all three
// values together.
//
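// For example (illustrative values only): first_valid_lsn = 0x100,
// last_record_lsn = 0x500, last_valid_lsn = 0x580 would mean that page
// versions older than 0x100 have been garbage-collected away, the last
// complete WAL record ended at 0x500, and WAL up to 0x580 has been
// received, but the record starting at 0x500 is not yet complete.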
first_valid_lsn: AtomicLsn,
last_valid_lsn: SeqWait<Lsn>,
last_record_lsn: AtomicLsn,
// Counters, for metrics collection.
pub num_entries: AtomicU64,
pub num_page_images: AtomicU64,
pub num_wal_records: AtomicU64,
pub num_getpage_requests: AtomicU64,
}
#[derive(Clone)]
pub struct PageCacheStats {
pub num_entries: u64,
pub num_page_images: u64,
pub num_wal_records: u64,
pub num_getpage_requests: u64,
}
impl AddAssign for PageCacheStats {
fn add_assign(&mut self, other: Self) {
self.num_entries += other.num_entries;
self.num_page_images += other.num_page_images;
self.num_wal_records += other.num_wal_records;
self.num_getpage_requests += other.num_getpage_requests;
}
}
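// A quick sanity check of the AddAssign impl above; the counter values
// are arbitrary:
#[cfg(test)]
mod page_cache_stats_tests {
    use super::*;

    #[test]
    fn stats_accumulate() {
        let mut total = PageCacheStats {
            num_entries: 0,
            num_page_images: 0,
            num_wal_records: 0,
            num_getpage_requests: 0,
        };
        total += PageCacheStats {
            num_entries: 1,
            num_page_images: 2,
            num_wal_records: 3,
            num_getpage_requests: 4,
        };
        assert_eq!(total.num_entries, 1);
        assert_eq!(total.num_getpage_requests, 4);
    }
}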
lazy_static! {
pub static ref PAGECACHES: Mutex<HashMap<ZTimelineId, Arc<PageCache>>> =
Mutex::new(HashMap::new());
}
// Get the page cache for the given timeline. It is assumed to already exist.
pub fn get_pagecache(_conf: &PageServerConf, timelineid: ZTimelineId) -> Option<Arc<PageCache>> {
let pcaches = PAGECACHES.lock().unwrap();
pcaches.get(&timelineid).cloned()
}
pub fn get_or_restore_pagecache(
conf: &PageServerConf,
timelineid: ZTimelineId,
) -> anyhow::Result<Arc<PageCache>> {
let mut pcaches = PAGECACHES.lock().unwrap();
match pcaches.get(&timelineid) {
Some(pcache) => Ok(pcache.clone()),
None => {
let pcache = init_page_cache(conf, timelineid);
restore_timeline(conf, &pcache, timelineid)?;
let result = Arc::new(pcache);
pcaches.insert(timelineid, result.clone());
if conf.gc_horizon != 0 {
let conf_copy = conf.clone();
let _gc_thread = thread::Builder::new()
.name("Garbage collection thread".into())
.spawn(move || {
gc_thread_main(&conf_copy, timelineid);
})
.unwrap();
}
Ok(result)
}
}
}
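// Typical usage from a page service handler might look like this (a sketch
// only; `conf`, `timelineid`, `tag` and `req_lsn` would come from the
// connection context):
//
//     let pcache = get_or_restore_pagecache(&conf, timelineid)?;
//     let page = pcache.get_page_at_lsn(tag, req_lsn)?;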
fn gc_thread_main(conf: &PageServerConf, timelineid: ZTimelineId) {
info!("Garbage collection thread started {}", timelineid);
let pcache = get_pagecache(conf, timelineid).unwrap();
pcache.do_gc(conf).unwrap();
}
fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB {
let path = zenith_repo_dir().join(timelineid.to_string());
let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
opts.set_use_fsync(true);
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
opts.set_compaction_filter("ttl", move |_level: u32, _key: &[u8], val: &[u8]| {
if (val[0] & UNUSED_VERSION_FLAG) != 0 {
rocksdb::compaction_filter::Decision::Remove
} else {
rocksdb::compaction_filter::Decision::Keep
}
});
rocksdb::DB::open(&opts, &path).unwrap()
}
fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache {
PageCache {
db: open_rocksdb(&conf, timelineid),
walredo_mgr: WalRedoManager::new(conf, timelineid),
first_valid_lsn: AtomicLsn::new(0),
last_valid_lsn: SeqWait::new(Lsn(0)),
last_record_lsn: AtomicLsn::new(0),
num_entries: AtomicU64::new(0),
num_page_images: AtomicU64::new(0),
num_wal_records: AtomicU64::new(0),
num_getpage_requests: AtomicU64::new(0),
}
}
//
// We store two kinds of entries in the page cache:
//
// 1. Ready-made images of the block
// 2. WAL records, to be applied on top of the "previous" entry
//
// Some WAL records will initialize the page from scratch. For such records,
// the 'will_init' flag is set. They don't need the previous page image before
// applying. The 'will_init' flag is set for records containing a full-page image,
// and for records with the BKPBLOCK_WILL_INIT flag. These differ from page images
// stored directly in the cache entry in that you still need to run the WAL redo
// routine to generate the page image.
//
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct CacheKey {
pub tag: BufferTag,
pub lsn: Lsn,
}
impl CacheKey {
pub fn pack(&self, buf: &mut BytesMut) {
self.tag.pack(buf);
buf.put_u64(self.lsn.0);
}
pub fn unpack(buf: &mut BytesMut) -> CacheKey {
CacheKey {
tag: BufferTag::unpack(buf),
lsn: Lsn::from(buf.get_u64()),
}
}
}
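// A quick round-trip sanity check for the CacheKey encoding; the exact
// field values are arbitrary:
#[cfg(test)]
mod cache_key_tests {
    use super::*;
    use bytes::BytesMut;

    #[test]
    fn pack_unpack_roundtrip() {
        let key = CacheKey {
            tag: BufferTag {
                rel: RelTag {
                    spcnode: 1663,
                    dbnode: 12345,
                    relnode: 16384,
                    forknum: 0,
                },
                blknum: 42,
            },
            lsn: Lsn(0x0123_4567_89AB_CDEF),
        };
        let mut buf = BytesMut::new();
        key.pack(&mut buf);
        assert_eq!(CacheKey::unpack(&mut buf), key);
    }
}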
pub struct CacheEntryContent {
pub page_image: Option<Bytes>,
pub wal_record: Option<WALRecord>,
}
const PAGE_IMAGE_FLAG: u8 = 1u8;
const UNUSED_VERSION_FLAG: u8 = 2u8;
impl CacheEntryContent {
pub fn pack(&self, buf: &mut BytesMut) {
if let Some(image) = &self.page_image {
buf.put_u8(PAGE_IMAGE_FLAG);
buf.put_u16(image.len() as u16);
buf.put_slice(&image[..]);
} else if let Some(rec) = &self.wal_record {
buf.put_u8(0);
rec.pack(buf);
}
}
pub fn unpack(buf: &mut BytesMut) -> CacheEntryContent {
if (buf.get_u8() & PAGE_IMAGE_FLAG) != 0 {
let mut dst = vec![0u8; buf.get_u16() as usize];
buf.copy_to_slice(&mut dst);
CacheEntryContent {
page_image: Some(Bytes::from(dst)),
wal_record: None,
}
} else {
CacheEntryContent {
page_image: None,
wal_record: Some(WALRecord::unpack(buf)),
}
}
}
}
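// Round-trip check for the page-image variant of CacheEntryContent; a
// WAL-record entry would round-trip through WALRecord::pack/unpack instead:
#[cfg(test)]
mod cache_entry_content_tests {
    use super::*;
    use bytes::{Bytes, BytesMut};

    #[test]
    fn page_image_roundtrip() {
        let content = CacheEntryContent {
            page_image: Some(Bytes::from_static(b"\x00\x01\x02\x03")),
            wal_record: None,
        };
        let mut buf = BytesMut::new();
        content.pack(&mut buf);
        let decoded = CacheEntryContent::unpack(&mut buf);
        assert_eq!(decoded.page_image, content.page_image);
        assert!(decoded.wal_record.is_none());
    }
}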
#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)]
pub struct RelTag {
pub spcnode: u32,
pub dbnode: u32,
pub relnode: u32,
pub forknum: u8,
}
impl RelTag {
pub fn pack(&self, buf: &mut BytesMut) {
buf.put_u32(self.spcnode);
buf.put_u32(self.dbnode);
buf.put_u32(self.relnode);
buf.put_u32(self.forknum as u32); // encode forknum as u32 to provide compatibility with wal_redo_postgres
}
pub fn unpack(buf: &mut BytesMut) -> RelTag {
RelTag {
spcnode: buf.get_u32(),
dbnode: buf.get_u32(),
relnode: buf.get_u32(),
forknum: buf.get_u32() as u8,
}
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub struct BufferTag {
pub rel: RelTag,
pub blknum: u32,
}
impl BufferTag {
pub fn pack(&self, buf: &mut BytesMut) {
self.rel.pack(buf);
buf.put_u32(self.blknum);
}
pub fn unpack(buf: &mut BytesMut) -> BufferTag {
BufferTag {
rel: RelTag::unpack(buf),
blknum: buf.get_u32(),
}
}
}
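// The GetPage@LSN lookups below rely on RocksDB's byte-wise key ordering
// matching the logical (tag, lsn) ordering, which holds because pack()
// writes all fields big-endian. A small check of that invariant:
#[cfg(test)]
mod key_ordering_tests {
    use super::*;
    use bytes::BytesMut;

    #[test]
    fn packed_keys_sort_like_logical_keys() {
        let tag = BufferTag {
            rel: RelTag {
                spcnode: 1,
                dbnode: 1,
                relnode: 1,
                forknum: 0,
            },
            blknum: 7,
        };
        let older = CacheKey { tag, lsn: Lsn(0x100) };
        let newer = CacheKey { tag, lsn: Lsn(0x200) };
        let (mut a, mut b) = (BytesMut::new(), BytesMut::new());
        older.pack(&mut a);
        newer.pack(&mut b);
        // Byte-wise comparison must agree with the LSN ordering.
        assert!(&a[..] < &b[..]);
    }
}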
#[derive(Debug, Clone)]
pub struct WALRecord {
pub lsn: Lsn, // LSN at the *end* of the record
pub will_init: bool,
pub truncate: bool,
pub rec: Bytes,
// Remember the offset of main_data in rec,
// so that we don't have to parse the record again.
// If record has no main_data, this offset equals rec.len().
pub main_data_offset: u32,
}
impl WALRecord {
pub fn pack(&self, buf: &mut BytesMut) {
buf.put_u64(self.lsn.0);
buf.put_u8(self.will_init as u8);
buf.put_u8(self.truncate as u8);
buf.put_u32(self.main_data_offset);
buf.put_u32(self.rec.len() as u32);
buf.put_slice(&self.rec[..]);
}
pub fn unpack(buf: &mut BytesMut) -> WALRecord {
let lsn = Lsn::from(buf.get_u64());
let will_init = buf.get_u8() != 0;
let truncate = buf.get_u8() != 0;
let main_data_offset = buf.get_u32();
let mut dst = vec![0u8; buf.get_u32() as usize];
buf.copy_to_slice(&mut dst);
WALRecord {
lsn,
will_init,
truncate,
rec: Bytes::from(dst),
main_data_offset,
}
}
}
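// Round-trip check for the WALRecord encoding; here main_data_offset equals
// rec.len(), i.e. a record with no main_data:
#[cfg(test)]
mod wal_record_tests {
    use super::*;
    use bytes::{Bytes, BytesMut};

    #[test]
    fn pack_unpack_roundtrip() {
        let rec = WALRecord {
            lsn: Lsn(0x1000),
            will_init: true,
            truncate: false,
            rec: Bytes::from_static(b"payload"),
            main_data_offset: 7,
        };
        let mut buf = BytesMut::new();
        rec.pack(&mut buf);
        let decoded = WALRecord::unpack(&mut buf);
        assert_eq!(decoded.lsn, rec.lsn);
        assert_eq!(decoded.will_init, rec.will_init);
        assert_eq!(decoded.truncate, rec.truncate);
        assert_eq!(decoded.rec, rec.rec);
        assert_eq!(decoded.main_data_offset, rec.main_data_offset);
    }
}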
impl PageCache {
// Public GET interface functions
///
/// GetPage@LSN
///
/// Returns an 8k page image
///
pub fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> anyhow::Result<Bytes> {
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
let lsn = self.wait_lsn(req_lsn)?;
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let key = CacheKey { tag, lsn };
let mut buf = BytesMut::new();
key.pack(&mut buf);
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(&buf[..]);
if iter.valid() {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
if key.tag == tag {
let v = iter.value().unwrap();
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
let page_img: Bytes;
if let Some(img) = &content.page_image {
page_img = img.clone();
} else if content.wal_record.is_some() {
// Request the WAL redo manager to apply the WAL records for us.
let (base_img, records) = self.collect_records_for_apply(tag, lsn);
page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
self.put_page_image(tag, lsn, page_img.clone());
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
}
// FIXME: assumes little-endian. Only used for the debugging log though
let page_lsn_hi =
u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap());
let page_lsn_lo =
u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap());
debug!(
"Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}",
page_lsn_hi,
page_lsn_lo,
tag.rel.spcnode,
tag.rel.dbnode,
tag.rel.relnode,
tag.rel.forknum,
tag.blknum
);
return Ok(page_img);
}
}
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
debug!("Page {:?} at {}({}) not found", tag, req_lsn, lsn);
Ok(Bytes::from_static(&ZERO_PAGE))
/* return Err("could not find page image")?; */
}
///
/// Get size of relation at given LSN.
///
pub fn relsize_get(&self, rel: &RelTag, lsn: Lsn) -> anyhow::Result<u32> {
self.wait_lsn(lsn)?;
self.relsize_get_nowait(rel, lsn)
}
///
/// Does relation exist at given LSN?
///
pub fn relsize_exist(&self, rel: &RelTag, req_lsn: Lsn) -> anyhow::Result<bool> {
let lsn = self.wait_lsn(req_lsn)?;
let key = CacheKey {
tag: BufferTag {
rel: *rel,
blknum: u32::MAX,
},
lsn,
};
let mut buf = BytesMut::new();
key.pack(&mut buf);
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(&buf[..]);
if iter.valid() {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
if tag.rel == *rel {
debug!("Relation {:?} exists at {}", rel, lsn);
return Ok(true);
}
}
debug!("Relation {:?} doesn't exist at {}", rel, lsn);
Ok(false)
}
// Other public functions, for updating the page cache.
// These are used by the WAL receiver and WAL redo.
///
/// Collect all the WAL records that are needed to reconstruct a page
/// image for the given cache entry.
///
/// Returns an old page image (if any), and a vector of WAL records to apply
/// over it.
///
pub fn collect_records_for_apply(
&self,
tag: BufferTag,
lsn: Lsn,
) -> (Option<Bytes>, Vec<WALRecord>) {
let mut buf = BytesMut::new();
let key = CacheKey { tag, lsn };
key.pack(&mut buf);
let mut base_img: Option<Bytes> = None;
let mut records: Vec<WALRecord> = Vec::new();
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(&buf[..]);
// Scan backwards, collecting the WAL records, until we hit an
// old page image.
while iter.valid() {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
if key.tag != tag {
break;
}
let v = iter.value().unwrap();
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
if let Some(img) = &content.page_image {
// We have a base image. No need to dig deeper into the list of
// records
base_img = Some(img.clone());
break;
} else if let Some(rec) = &content.wal_record {
records.push(rec.clone());
// If this WAL record initializes the page, no need to dig deeper.
if rec.will_init {
break;
}
} else {
panic!("no base image and no WAL record on cache entry");
}
iter.prev();
}
records.reverse();
(base_img, records)
}
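// For example, given a hypothetical version chain for one page
//
//     img@0x10, rec@0x20, rec@0x30, img@0x40, rec@0x50
//
// collect_records_for_apply(tag, Lsn(0x30)) scans backwards from 0x30
// until it hits the page image at 0x10, and returns
// (Some(img@0x10), vec![rec@0x20, rec@0x30]).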
///
/// Adds a WAL record to the page cache
///
pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) {
let lsn = rec.lsn;
let key = CacheKey { tag, lsn };
let content = CacheEntryContent {
page_image: None,
wal_record: Some(rec),
};
let mut key_buf = BytesMut::new();
key.pack(&mut key_buf);
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
//trace!("put_wal_record lsn: {}", lsn);
self.num_entries.fetch_add(1, Ordering::Relaxed);
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
}
///
/// Adds a relation-wide WAL record (like truncate) to the page cache,
/// associating it with all pages starting at the specified block number.
///
pub fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> {
let mut key = CacheKey { tag, lsn: rec.lsn };
// What was the size of the relation before this record?
let last_lsn = self.last_valid_lsn.load();
let old_rel_size = self.relsize_get_nowait(&tag.rel, last_lsn)?;
let content = CacheEntryContent {
page_image: None,
wal_record: Some(rec),
};
// set new relation size
trace!("Truncate relation {:?}", tag);
let mut key_buf = BytesMut::new();
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
for blknum in tag.blknum..old_rel_size {
key_buf.clear();
key.tag.blknum = blknum;
key.pack(&mut key_buf);
trace!("put_wal_record lsn: {}", key.lsn);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
}
let n = (old_rel_size - tag.blknum) as u64;
self.num_entries.fetch_add(n, Ordering::Relaxed);
self.num_wal_records.fetch_add(n, Ordering::Relaxed);
Ok(())
}
///
/// Memorize a full image of a page version
///
pub fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) {
let key = CacheKey { tag, lsn };
let content = CacheEntryContent {
page_image: Some(img),
wal_record: None,
};
let mut key_buf = BytesMut::new();
key.pack(&mut key_buf);
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
trace!("put_wal_record lsn: {}", key.lsn);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
//debug!("inserted page image for {}/{}/{}_{} blk {} at {}",
// tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn);
self.num_page_images.fetch_add(1, Ordering::Relaxed);
}
pub fn create_database(
&self,
lsn: Lsn,
db_id: Oid,
tablespace_id: Oid,
src_db_id: Oid,
src_tablespace_id: Oid,
) -> anyhow::Result<()> {
let mut buf = BytesMut::new();
let key = CacheKey {
tag: BufferTag {
rel: RelTag {
spcnode: src_tablespace_id,
dbnode: src_db_id,
relnode: 0,
forknum: 0u8,
},
blknum: 0,
},
lsn: Lsn(0),
};
key.pack(&mut buf);
let mut iter = self.db.raw_iterator();
iter.seek(&buf[..]);
let mut n = 0;
while iter.valid() {
let k = iter.key().unwrap();
let v = iter.value().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let mut key = CacheKey::unpack(&mut buf);
if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
break;
}
key.tag.rel.spcnode = tablespace_id;
key.tag.rel.dbnode = db_id;
key.lsn = lsn;
buf.clear();
key.pack(&mut buf);
self.db.put(&buf[..], v)?;
n += 1;
iter.next();
}
info!(
"Create database {}/{}, copy {} entries",
tablespace_id, db_id, n
);
Ok(())
}
/// Remember that WAL has been received and added to the page cache up to the given LSN
pub fn advance_last_valid_lsn(&self, lsn: Lsn) {
let old = self.last_valid_lsn.advance(lsn);
// Can't move backwards.
if lsn < old {
warn!(
"attempted to move last valid LSN backwards (was {}, new {})",
old, lsn
);
}
}
///
/// Remember the (end of the) last WAL record processed into the page cache.
///
/// NOTE: this updates last_valid_lsn as well.
///
pub fn advance_last_record_lsn(&self, lsn: Lsn) {
// Can't move backwards.
let old = self.last_record_lsn.fetch_max(lsn);
assert!(old <= lsn);
// Also advance last_valid_lsn
let old = self.last_valid_lsn.advance(lsn);
// Can't move backwards.
if lsn < old {
warn!(
"attempted to move last record LSN backwards (was {}, new {})",
old, lsn
);
}
}
///
/// Remember the beginning of valid WAL.
///
/// TODO: This should be called by garbage collection, so that if an older
/// page is requested, we will return an error to the requestor.
pub fn _advance_first_valid_lsn(&self, lsn: Lsn) {
// Can't overtake last_valid_lsn (except when we're
// initializing the system and last_valid_lsn hasn't been set yet).
let last_valid_lsn = self.last_valid_lsn.load();
assert!(last_valid_lsn == Lsn(0) || lsn < last_valid_lsn);
let old = self.first_valid_lsn.fetch_max(lsn);
// Can't move backwards.
assert!(lsn >= old);
}
pub fn init_valid_lsn(&self, lsn: Lsn) {
let old = self.last_valid_lsn.advance(lsn);
assert!(old == Lsn(0));
let old = self.last_record_lsn.fetch_max(lsn);
assert!(old == Lsn(0));
let old = self.first_valid_lsn.fetch_max(lsn);
assert!(old == Lsn(0));
}
pub fn get_last_valid_lsn(&self) -> Lsn {
self.last_valid_lsn.load()
}
//
// Get statistics to be displayed in the user interface.
//
pub fn get_stats(&self) -> PageCacheStats {
PageCacheStats {
num_entries: self.num_entries.load(Ordering::Relaxed),
num_page_images: self.num_page_images.load(Ordering::Relaxed),
num_wal_records: self.num_wal_records.load(Ordering::Relaxed),
num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed),
}
}
// Internal functions
//
// Internal function to get relation size at given LSN.
//
// The caller must ensure that WAL has been received up to 'lsn'.
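//
// For example, if the newest entry found at or before 'lsn' belongs to
// block 41 and is a page image or ordinary WAL record, the relation size
// is 42 (blknum + 1). If that entry is a truncate record, the scan
// re-seeks just below the truncated block.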
//
fn relsize_get_nowait(&self, rel: &RelTag, lsn: Lsn) -> anyhow::Result<u32> {
assert!(lsn <= self.last_valid_lsn.load());
let mut key = CacheKey {
tag: BufferTag {
rel: *rel,
blknum: u32::MAX,
},
lsn,
};
let mut buf = BytesMut::new();
let mut iter = self.db.raw_iterator();
loop {
buf.clear();
key.pack(&mut buf);
iter.seek_for_prev(&buf[..]);
if iter.valid() {
let k = iter.key().unwrap();
let v = iter.value().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
if tag.rel == *rel {
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
if let Some(rec) = &content.wal_record {
if rec.truncate {
if tag.blknum > 0 {
key.tag.blknum = tag.blknum - 1;
continue;
}
break;
}
}
let relsize = tag.blknum + 1;
debug!("Size of relation {:?} at {} is {}", rel, lsn, relsize);
return Ok(relsize);
}
}
break;
}
debug!("Size of relation {:?} at {} is zero", rel, lsn);
Ok(0)
}
fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<Bytes> {
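// A sketch of each GC pass: walk the keyspace backwards from the largest
// possible key, one page version chain at a time. For every chain:
//   1. make sure the most recent version is materialized as a page image
//      (run WAL redo if it is still a WAL record),
//   2. likewise materialize the newest version at or below the horizon,
//   3. mark all older versions with UNUSED_VERSION_FLAG so that the
//      RocksDB compaction filter ("ttl") can drop them.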
let mut buf = BytesMut::new();
loop {
thread::sleep(conf.gc_period);
let last_lsn = self.get_last_valid_lsn();
// checked_sub() returns None on overflow.
if let Some(horizon) = last_lsn.checked_sub(conf.gc_horizon) {
let mut maxkey = CacheKey {
tag: BufferTag {
rel: RelTag {
spcnode: u32::MAX,
dbnode: u32::MAX,
relnode: u32::MAX,
forknum: u8::MAX,
},
blknum: u32::MAX,
},
lsn: Lsn::MAX,
};
let now = Instant::now();
let mut reconstructed = 0u64;
let mut truncated = 0u64;
let mut inspected = 0u64;
let mut deleted = 0u64;
loop {
buf.clear();
maxkey.pack(&mut buf);
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(&buf[..]);
if iter.valid() {
let k = iter.key().unwrap();
let v = iter.value().unwrap();
inspected += 1;
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
// Construct boundaries for old records cleanup
maxkey.tag = key.tag;
let last_lsn = key.lsn;
maxkey.lsn = min(horizon, last_lsn); // do not remove last version
let mut minkey = maxkey.clone();
minkey.lsn = Lsn(0); // first version
// reconstruct most recent page version
if (v[0] & PAGE_IMAGE_FLAG) == 0 {
trace!("Reconstruct most recent page {:?}", key);
// force reconstruction of most recent page version
let (base_img, records) =
self.collect_records_for_apply(key.tag, key.lsn);
let new_img = self
.walredo_mgr
.request_redo(key.tag, key.lsn, base_img, records)?;
self.put_page_image(key.tag, key.lsn, new_img.clone());
reconstructed += 1;
}
buf.clear();
maxkey.pack(&mut buf);
iter.seek_for_prev(&buf[..]);
if iter.valid() {
// do not remove last version
if last_lsn > horizon {
// locate most recent record before horizon
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
if key.tag == maxkey.tag {
let v = iter.value().unwrap();
if (v[0] & PAGE_IMAGE_FLAG) == 0 {
trace!("Reconstruct horizon page {:?}", key);
let (base_img, records) =
self.collect_records_for_apply(key.tag, key.lsn);
let new_img = self
.walredo_mgr
.request_redo(key.tag, key.lsn, base_img, records)?;
self.put_page_image(key.tag, key.lsn, new_img.clone());
truncated += 1;
}
}
}
// remove records prior to horizon
loop {
iter.prev();
if !iter.valid() {
break;
}
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
if key.tag != maxkey.tag {
break;
}
let v = iter.value().unwrap();
if (v[0] & UNUSED_VERSION_FLAG) == 0 {
let mut v = v.to_owned();
v[0] |= UNUSED_VERSION_FLAG;
self.db.put(k, &v[..])?;
deleted += 1;
} else {
break;
}
}
}
maxkey = minkey;
} else {
break;
}
}
info!("Garbage collection completed in {:?}:\n{} version chains inspected, {} pages reconstructed, {} version histories truncated, {} versions deleted",
now.elapsed(), inspected, reconstructed, truncated, deleted);
}
}
}
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result<Lsn> {
// When an invalid LSN (0) is requested, it means "don't wait; just return
// the latest version of the page". This is necessary for bootstrap.
if lsn == Lsn(0) {
let last_valid_lsn = self.last_valid_lsn.load();
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
last_valid_lsn,
lsn
);
lsn = last_valid_lsn;
}
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive",
lsn
)
})?;
Ok(lsn)
}
}
//
// Get statistics to be displayed in the user interface.
//
// This combines the stats from all PageCache instances
//
pub fn get_stats() -> PageCacheStats {
let pcaches = PAGECACHES.lock().unwrap();
let mut stats = PageCacheStats {
num_entries: 0,
num_page_images: 0,
num_wal_records: 0,
num_getpage_requests: 0,
};
pcaches.iter().for_each(|(_sys_id, pcache)| {
stats += pcache.get_stats();
});
stats
}