Do not delete versions in GC

This commit is contained in:
Konstantin Knizhnik
2021-04-24 22:32:22 +03:00
parent 499b4f7eba
commit 3e007b0eb9
3 changed files with 162 additions and 139 deletions

View File

@@ -19,8 +19,10 @@ use slog::Drain;
use pageserver::{page_service, tui, zenith_repo_dir, PageServerConf};
const DEFAULT_GC_HORIZON: u64 = 1024 * 1024 * 1024;
const DEFAULT_GC_PERIOD_SEC: u64 = 600;
const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
const DEFAULT_GC_PERIOD_SEC: u64 = 10;
//const DEFAULT_GC_HORIZON: u64 = 1024 * 1024 * 1024;
//const DEFAULT_GC_PERIOD_SEC: u64 = 600;
fn main() -> Result<()> {
let arg_matches = App::new("Zenith page server")

View File

@@ -171,8 +171,14 @@ fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB
opts.create_if_missing(true);
opts.set_use_fsync(true);
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
opts.create_missing_column_families(true);
rocksdb::DB::open_cf(&opts, &path, &[rocksdb::DEFAULT_COLUMN_FAMILY_NAME]).unwrap()
opts.set_compaction_filter("ttl", move |_level: u32, _key: &[u8], val: &[u8]| {
if (val[0] & UNUSED_VERSION_FLAG) != 0 {
rocksdb::compaction_filter::Decision::Remove
} else {
rocksdb::compaction_filter::Decision::Keep
}
});
rocksdb::DB::open(&opts, &path).unwrap()
}
fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache {
@@ -247,16 +253,20 @@ pub struct CacheEntry {
pub walredo_condvar: Condvar,
}
#[derive(Debug, Clone)]
pub struct CacheEntryContent {
pub page_image: Option<Bytes>,
pub wal_record: Option<WALRecord>,
pub apply_pending: bool,
}
const PAGE_IMAGE_FLAG: u8 = 1u8;
const UNUSED_VERSION_FLAG: u8 = 2u8;
impl CacheEntryContent {
pub fn pack(&self, buf: &mut BytesMut) {
if let Some(image) = &self.page_image {
buf.put_u8(1);
buf.put_u8(PAGE_IMAGE_FLAG);
buf.put_u16(image.len() as u16);
buf.put_slice(&image[..]);
} else if let Some(rec) = &self.wal_record {
@@ -265,7 +275,7 @@ impl CacheEntryContent {
}
}
pub fn unpack(buf: &mut BytesMut) -> CacheEntryContent {
if buf.get_u8() == 1 {
if (buf.get_u8() & PAGE_IMAGE_FLAG) != 0 {
let mut dst = vec![0u8; buf.get_u16() as usize];
buf.copy_to_slice(&mut dst);
CacheEntryContent {
@@ -337,7 +347,7 @@ impl BufferTag {
}
}
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct WALRecord {
pub lsn: u64, // LSN at the *end* of the record
pub will_init: bool,
@@ -379,12 +389,7 @@ impl WALRecord {
impl PageCache {
fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<Bytes> {
let mut minbuf = BytesMut::new();
let mut maxbuf = BytesMut::new();
let cf = self
.db
.cf_handle(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
.unwrap();
let mut buf = BytesMut::new();
loop {
thread::sleep(conf.gc_period);
let last_lsn = self.get_last_valid_lsn();
@@ -405,20 +410,26 @@ impl PageCache {
let now = Instant::now();
let mut reconstructed = 0u64;
let mut truncated = 0u64;
let mut inspected = 0u64;
let mut deleted = 0u64;
loop {
maxbuf.clear();
maxkey.pack(&mut maxbuf);
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
&maxbuf[..],
rocksdb::Direction::Reverse,
));
if let Some((k, v)) = iter.next() {
minbuf.clear();
minbuf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut minbuf);
minbuf.clear();
minbuf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut minbuf);
buf.clear();
maxkey.pack(&mut buf);
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(&buf[..]);
if iter.valid() {
let k = iter.key().unwrap();
let v = iter.value().unwrap();
inspected += 1;
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
// Construct boundaries for old records cleanup
maxkey.tag = key.tag;
@@ -426,7 +437,7 @@ impl PageCache {
maxkey.lsn = min(horizon, last_lsn); // do not remove last version
let mut minkey = maxkey.clone();
minkey.lsn = 0;
minkey.lsn = 0; // first version
// reconstruct most recent page version
if content.wal_record.is_some() {
@@ -434,43 +445,67 @@ impl PageCache {
// force reconstruction of most recent page version
self.reconstruct_page(key, content)?;
reconstructed += 1;
} else {
assert!(content.page_image.is_some());
}
maxbuf.clear();
maxkey.pack(&mut maxbuf);
buf.clear();
maxkey.pack(&mut buf);
if last_lsn > horizon {
// locate most recent record before horizon
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
&maxbuf[..],
rocksdb::Direction::Reverse,
));
if let Some((k, v)) = iter.next() {
minbuf.clear();
minbuf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut minbuf);
if content.wal_record.is_some() {
minbuf.clear();
minbuf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut minbuf);
trace!("Reconstruct horizon page {:?}", key);
self.reconstruct_page(key, content)?;
truncated += 1;
iter.seek_for_prev(&buf[..]);
if iter.valid() {
// do not remove last version
if last_lsn > horizon {
// locate most recent record before horizon
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
if key.tag == maxkey.tag {
let v = iter.value().unwrap();
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
if content.wal_record.is_some() {
trace!("Reconstruct horizon page {:?}", key);
self.reconstruct_page(key, content)?;
truncated += 1;
} else {
assert!(content.page_image.is_some());
}
}
}
// remove records prior to horizon
loop {
iter.prev();
if !iter.valid() {
break;
}
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
if key.tag != maxkey.tag {
break;
}
let v = iter.value().unwrap();
if (v[0] & UNUSED_VERSION_FLAG) == 0 {
let mut v = v.to_owned();
v[0] |= UNUSED_VERSION_FLAG;
self.db.put(k, &v[..])?;
deleted += 1;
} else {
break;
}
}
}
// remove records prior to horizon
minbuf.clear();
minkey.pack(&mut minbuf);
trace!("Delete records in range {:?}..{:?}", minkey, maxkey);
self.db.delete_range_cf(cf, &minbuf[..], &maxbuf[..])?;
maxkey = minkey;
} else {
break;
}
}
info!("Garbage collection completed in {:?}: {} pages reconstructed, {} version histories truncated", now.elapsed(), reconstructed, truncated);
info!("Garbage collection completed in {:?}:\n{} version chains inspected, {} pages reconstructed, {} version histories truncated, {} versions deleted",
now.elapsed(), inspected, reconstructed, truncated, deleted);
}
}
}
@@ -492,8 +527,8 @@ impl PageCache {
let page_img = match &entry_content.page_image {
Some(p) => p.clone(),
None => {
error!("could not apply WAL to reconstruct page image for GetPage@LSN request");
bail!("could not apply WAL to reconstruct page image");
error!("could not apply WAL to reconstruct page {:?} image", &key);
bail!("could not apply WAL to reconstruct page {:?} image", &key);
}
};
self.put_page_image(key.tag, key.lsn, page_img.clone());
@@ -538,61 +573,54 @@ impl PageCache {
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let minkey = CacheKey { tag, lsn: 0 };
let maxkey = CacheKey { tag, lsn };
let key = CacheKey { tag, lsn };
let mut buf = BytesMut::new();
minkey.pack(&mut buf);
key.pack(&mut buf);
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(&buf[..]);
let mut readopts = rocksdb::ReadOptions::default();
readopts.set_iterate_lower_bound(buf.to_vec());
buf.clear();
maxkey.pack(&mut buf);
let mut iter = self.db.iterator_opt(
rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse),
readopts,
);
let entry_opt = iter.next();
if entry_opt.is_none() {
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
debug!("Page {:?} at {}({}) not found", tag, req_lsn, lsn);
return Ok(Bytes::from_static(&ZERO_PAGE));
/* return Err("could not find page image")?; */
}
let (k, v) = entry_opt.unwrap();
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
let page_img: Bytes;
if let Some(img) = &content.page_image {
page_img = img.clone();
} else if content.wal_record.is_some() {
if iter.valid() {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
page_img = self.reconstruct_page(key, content)?;
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
if key.tag == tag {
let v = iter.value().unwrap();
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
let page_img: Bytes;
if let Some(img) = &content.page_image {
page_img = img.clone();
} else if content.wal_record.is_some() {
page_img = self.reconstruct_page(key, content)?;
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
}
// FIXME: assumes little-endian. Only used for the debugging log though
let page_lsn_hi =
u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap());
let page_lsn_lo =
u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap());
debug!(
"Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}",
page_lsn_hi,
page_lsn_lo,
tag.rel.spcnode,
tag.rel.dbnode,
tag.rel.relnode,
tag.rel.forknum,
tag.blknum
);
return Ok(page_img);
}
}
// FIXME: assumes little-endian. Only used for the debugging log though
let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap());
let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap());
debug!(
"Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}",
page_lsn_hi,
page_lsn_lo,
tag.rel.spcnode,
tag.rel.dbnode,
tag.rel.relnode,
tag.rel.forknum,
tag.blknum
);
Ok(page_img)
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
debug!("Page {:?} at {}({}) not found", tag, req_lsn, lsn);
Ok(Bytes::from_static(&ZERO_PAGE))
/* return Err("could not find page image")?; */
}
//
@@ -603,33 +631,26 @@ impl PageCache {
// over it.
//
pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option<Bytes>, Vec<WALRecord>) {
let minkey = CacheKey {
tag: BufferTag {
rel: entry.key.tag.rel,
blknum: 0,
},
lsn: 0,
};
let mut buf = BytesMut::new();
minkey.pack(&mut buf);
let mut readopts = rocksdb::ReadOptions::default();
readopts.set_iterate_lower_bound(buf.to_vec());
buf.clear();
entry.key.pack(&mut buf);
let iter = self.db.iterator_opt(
rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse),
readopts,
);
let mut base_img: Option<Bytes> = None;
let mut records: Vec<WALRecord> = Vec::new();
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(&buf[..]);
// Scan backwards, collecting the WAL records, until we hit an
// old page image.
for (_k, v) in iter {
while iter.valid() {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
if key.tag != entry.key.tag {
break;
}
let v = iter.value().unwrap();
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
@@ -640,7 +661,6 @@ impl PageCache {
break;
} else if let Some(rec) = &content.wal_record {
records.push(rec.clone());
// If this WAL record initializes the page, no need to dig deeper.
if rec.will_init {
break;
@@ -648,6 +668,7 @@ impl PageCache {
} else {
panic!("no base image and no WAL record on cache entry");
}
iter.prev();
}
records.reverse();
@@ -825,15 +846,15 @@ impl PageCache {
lsn,
};
let mut buf = BytesMut::new();
let mut iter = self.db.raw_iterator();
loop {
buf.clear();
key.pack(&mut buf);
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
&buf[..],
rocksdb::Direction::Reverse,
));
if let Some((k, v)) = iter.next() {
iter.seek_for_prev(&buf[..]);
if iter.valid() {
let k = iter.key().unwrap();
let v = iter.value().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
@@ -873,11 +894,10 @@ impl PageCache {
};
let mut buf = BytesMut::new();
key.pack(&mut buf);
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
&buf[..],
rocksdb::Direction::Reverse,
));
if let Some((k, _v)) = iter.next() {
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(&buf[..]);
if iter.valid() {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
@@ -924,12 +944,12 @@ impl PageCache {
lsn: 0,
};
key.pack(&mut buf);
let iter = self.db.iterator(rocksdb::IteratorMode::From(
&buf[..],
rocksdb::Direction::Forward,
));
let mut iter = self.db.raw_iterator();
iter.seek(&buf[..]);
let mut n = 0;
for (k, v) in iter {
while iter.valid() {
let k = iter.key().unwrap();
let v = iter.value().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let mut key = CacheKey::unpack(&mut buf);
@@ -944,6 +964,7 @@ impl PageCache {
self.db.put(&buf[..], v)?;
n += 1;
iter.next();
}
info!(
"Create database {}/{}, copy {} entries",