Implement GC for new object_store API (#229)

* Implement GC for new object_store API

* Add comments for GC

* Revert postgres module version reference
This commit is contained in:
Konstantin Knizhnik
2021-06-04 20:11:56 +03:00
committed by GitHub
parent 445e88f50b
commit 063429aade
4 changed files with 143 additions and 147 deletions

View File

@@ -23,12 +23,13 @@ use bytes::Bytes;
use log::*;
use postgres_ffi::pg_constants;
use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use std::time::{Duration, Instant};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::{AtomicLsn, Lsn};
use zenith_utils::seqwait::SeqWait;
@@ -622,141 +623,70 @@ impl ObjectTimeline {
fn do_gc(&self, conf: &'static PageServerConf) -> Result<()> {
loop {
thread::sleep(conf.gc_period);
// FIXME: broken
/*
let last_lsn = self.get_last_valid_lsn();
// checked_sub() returns None on overflow.
if let Some(horizon) = last_lsn.checked_sub(conf.gc_horizon) {
let mut maxkey = StorageKey {
tag: BufferTag {
rel: RelTag {
spcnode: u32::MAX,
dbnode: u32::MAX,
relnode: u32::MAX,
forknum: u8::MAX,
},
blknum: u32::MAX,
},
lsn: Lsn::MAX,
};
// WAL is large enough to perform GC
let now = Instant::now();
let mut reconstructed = 0u64;
let mut truncated = 0u64;
let mut inspected = 0u64;
let mut deleted = 0u64;
loop {
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(maxkey.to_bytes());
if iter.valid() {
let key = StorageKey::des(iter.key().unwrap());
let val = StorageValue::des(iter.value().unwrap());
inspected += 1;
// Iterate through all relations
for rels in &self.obj_store.list_rels(self.timelineid, 0, 0, last_lsn)? {
let mut last_version = true;
let mut key = relation_size_key(self.timelineid, *rels);
let mut max_size = 0u32;
let mut relation_dropped = false;
// Construct boundaries for old records cleanup
maxkey.tag = key.tag;
let last_lsn = key.lsn;
maxkey.lsn = min(horizon, last_lsn); // do not remove last version
let mut minkey = maxkey.clone();
minkey.lsn = Lsn(0); // first version
// reconstruct most recent page version
if let StorageValueContent::Image(_) = val.content {
// force reconstruction of most recent page version
let (base_img, records) =
self.collect_records_for_apply(key.tag, key.lsn);
trace!(
"Reconstruct most recent page {} blk {} at {} from {} records",
key.tag.rel,
key.tag.blknum,
key.lsn,
records.len()
);
let new_img = self
.walredo_mgr
.request_redo(key.tag, key.lsn, base_img, records)?;
self.put_page_image(key.tag, key.lsn, new_img.clone());
reconstructed += 1;
}
iter.seek_for_prev(maxkey.to_bytes());
if iter.valid() {
// do not remove last version
if last_lsn > horizon {
// locate most recent record before horizon
let key = StorageKey::des(iter.key().unwrap());
if key.tag == maxkey.tag {
let val = StorageValue::des(iter.value().unwrap());
if let StorageValueContent::Image(_) = val.content {
let (base_img, records) =
self.collect_records_for_apply(key.tag, key.lsn);
trace!("Reconstruct horizon page {} blk {} at {} from {} records",
key.tag.rel, key.tag.blknum, key.lsn, records.len());
let new_img = self
.walredo_mgr
.request_redo(key.tag, key.lsn, base_img, records)?;
self.put_page_image(key.tag, key.lsn, new_img.clone());
truncated += 1;
} else {
trace!(
"Keeping horizon page {} blk {} at {}",
key.tag.rel,
key.tag.blknum,
key.lsn
);
}
}
} else {
trace!(
"Last page {} blk {} at {}, horizon {}",
key.tag.rel,
key.tag.blknum,
key.lsn,
horizon
);
}
// remove records prior to horizon
loop {
iter.prev();
if !iter.valid() {
break;
}
let key = StorageKey::des(iter.key().unwrap());
if key.tag != maxkey.tag {
break;
}
let mut val = StorageValue::des(iter.value().unwrap());
if val.alive {
val.alive = false;
self.storage.put(key, val)?;
deleted += 1;
trace!(
"deleted: {} blk {} at {}",
key.tag.rel,
key.tag.blknum,
key.lsn
);
} else {
break;
// Process relation metadata versions
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
let rel_meta = RelationSizeEntry::des(&vers.1)?;
// If relation is dropped at the horizon,
// we can remove all its versions including last (Unlink)
match rel_meta {
RelationSizeEntry::Size(size) => max_size = max(max_size, size),
RelationSizeEntry::Unlink => {
if last_version {
relation_dropped = true;
info!("Relation {:?} dropped", rels);
}
}
}
maxkey = minkey;
} else {
break;
if last_version {
last_version = false;
if !relation_dropped {
// preserve last version
continue;
}
}
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
// Now process all relation blocks
for blknum in 0..max_size {
key.buf_tag.blknum = blknum;
last_version = true;
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
last_version = false;
truncated += 1;
if !relation_dropped {
// preserve and materialize last version before deleting all preceeding
self.get_page_at_lsn_nowait(key.buf_tag, lsn)?;
continue;
}
}
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
}
}
info!("Garbage collection completed in {:?}:\n{} version chains inspected, {} pages reconstructed, {} version histories truncated, {} versions deleted",
now.elapsed(), inspected, reconstructed, truncated, deleted);
info!("Garbage collection completed in {:?}: {} version histories truncated, {} versions deleted",
now.elapsed(), truncated, deleted);
}
*/
}
}

View File

@@ -61,6 +61,7 @@ pub trait ObjectStore: Send + Sync {
) -> Result<Box<dyn Iterator<Item = Result<(BufferTag, Lsn, Vec<u8>)>> + 'a>>;
/// Iterate through all keys with given tablespace and database ID, and LSN <= 'lsn'.
/// Both dbnode and spcnode can be InvalidId (0) which means get all relations in tablespace/cluster
///
/// This is used to implement 'create database'
fn list_rels(
@@ -70,4 +71,7 @@ pub trait ObjectStore: Send + Sync {
dbnode: u32,
lsn: Lsn,
) -> Result<HashSet<RelTag>>;
/// Unlink object (used by GC). This mehod may actually delete object or just mark it for deletion.
fn unlink(&self, key: &ObjectKey, lsn: Lsn) -> Result<()>;
}

View File

@@ -8,6 +8,7 @@ use crate::ZTimelineId;
use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
@@ -30,11 +31,53 @@ impl StorageKey {
}
}
///
/// RocksDB very inefficiently delete random record. Instead of it we have to use merge
/// filter, which allows to throw away records at LSM merge phase.
/// Unfortunately, it is hard (if ever possible) to determine whether version can be removed
/// at merge time. Version ca be removed if:
/// 1. It is above PITR horizon (we need to get current LSN and gc_horizon from config)
/// 2. Page is reconstructed at horizon (all WAL records above horizon are applied and can be removed)
///
/// So we have GC process which reconstructs pages at horizon and mark deteriorated WAL record
/// for deletion. To mark object for deletion we can either set some flag in object itself.
/// But it is complicated with new object value format, because RocksDB storage knows nothing about
/// this format. Also updating whole record just to set one bit seems to be inefficient in any case.
/// This is why we keep keys of marked for deletion versions in HashSet in memory.
/// When LSM merge filter found key in this map, it removes it from the set preventing memory overflow.
///
struct GarbageCollector {
garbage: Mutex<HashSet<Vec<u8>>>,
}
impl GarbageCollector {
fn new() -> GarbageCollector {
GarbageCollector {
garbage: Mutex::new(HashSet::new()),
}
}
/// Called by GC to mark version as delete
fn mark_for_deletion(&self, key: &[u8]) {
let mut garbage = self.garbage.lock().unwrap();
garbage.insert(key.to_vec());
}
/// Called by LSM merge filter. If it finds key in the set, then
/// it doesn't merge it and removes from this set.
fn was_deleted(&self, key: &[u8]) -> bool {
let key = key.to_vec();
let mut garbage = self.garbage.lock().unwrap();
return garbage.remove(&key);
}
}
pub struct RocksObjectStore {
_conf: &'static PageServerConf,
// RocksDB handle
db: rocksdb::DB,
gc: Arc<GarbageCollector>,
}
impl ObjectStore for RocksObjectStore {
@@ -61,6 +104,14 @@ impl ObjectStore for RocksObjectStore {
Ok(())
}
fn unlink(&self, key: &ObjectKey, lsn: Lsn) -> Result<()> {
self.gc.mark_for_deletion(&StorageKey::ser(&StorageKey {
obj_key: key.clone(),
lsn,
})?);
Ok(())
}
/// Iterate through page versions of given page, starting from the given LSN.
/// The versions are walked in descending LSN order.
fn object_versions<'a>(
@@ -89,7 +140,7 @@ impl ObjectStore for RocksObjectStore {
let mut rels: HashSet<RelTag> = HashSet::new();
let searchkey = StorageKey {
let mut search_key = StorageKey {
obj_key: ObjectKey {
timeline: timelineid,
buf_tag: BufferTag {
@@ -105,19 +156,26 @@ impl ObjectStore for RocksObjectStore {
lsn: Lsn(0),
};
let mut iter = self.db.raw_iterator();
iter.seek(searchkey.ser()?);
while iter.valid() {
loop {
iter.seek(search_key.ser()?);
if !iter.valid() {
break;
}
let key = StorageKey::des(iter.key().unwrap())?;
if key.obj_key.buf_tag.rel.spcnode != spcnode
|| key.obj_key.buf_tag.rel.dbnode != dbnode
if (spcnode != 0 && key.obj_key.buf_tag.rel.spcnode != spcnode)
|| (dbnode != 0 && key.obj_key.buf_tag.rel.dbnode != dbnode)
{
break;
}
if key.lsn < lsn {
if key.obj_key.buf_tag.rel.relnode != 0 // skip non-relational records (like timeline metadata)
&& key.lsn < lsn
// visible in this snapshot
{
rels.insert(key.obj_key.buf_tag.rel);
}
iter.next();
search_key = key.clone();
search_key.obj_key.buf_tag.rel.relnode += 1; // skip to next relation
}
Ok(rels)
@@ -150,11 +208,9 @@ impl ObjectStore for RocksObjectStore {
impl RocksObjectStore {
/// Open a RocksDB database.
pub fn open(conf: &'static PageServerConf) -> Result<RocksObjectStore> {
let path = conf.workdir.join("rocksdb-storage");
let db = rocksdb::DB::open(&Self::get_rocksdb_opts(), path)?;
let storage = RocksObjectStore { _conf: conf, db };
Ok(storage)
let opts = Self::get_rocksdb_opts();
let obj_store = Self::new(conf, opts)?;
Ok(obj_store)
}
/// Create a new, empty RocksDB database.
@@ -165,9 +221,27 @@ impl RocksObjectStore {
let mut opts = Self::get_rocksdb_opts();
opts.create_if_missing(true);
opts.set_error_if_exists(true);
let db = rocksdb::DB::open(&opts, &path)?;
let obj_store = Self::new(conf, opts)?;
Ok(obj_store)
}
let obj_store = RocksObjectStore { _conf: conf, db };
fn new(conf: &'static PageServerConf, mut opts: rocksdb::Options) -> Result<RocksObjectStore> {
let path = conf.workdir.join("rocksdb-storage");
let gc = Arc::new(GarbageCollector::new());
let gc_ref = gc.clone();
opts.set_compaction_filter("ttl", move |_level: u32, key: &[u8], _val: &[u8]| {
if gc_ref.was_deleted(key) {
rocksdb::compaction_filter::Decision::Remove
} else {
rocksdb::compaction_filter::Decision::Keep
}
});
let db = rocksdb::DB::open(&opts, &path)?;
let obj_store = RocksObjectStore {
_conf: conf,
db,
gc,
};
Ok(obj_store)
}
@@ -176,17 +250,6 @@ impl RocksObjectStore {
let mut opts = rocksdb::Options::default();
opts.set_use_fsync(true);
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
// FIXME
/*
opts.set_compaction_filter("ttl", move |_level: u32, _key: &[u8], val: &[u8]| {
if (val[0] & UNUSED_VERSION_FLAG) != 0 {
rocksdb::compaction_filter::Decision::Remove
} else {
rocksdb::compaction_filter::Decision::Keep
}
});
*/
opts
}
}

View File

@@ -532,7 +532,6 @@ impl XlMultiXactTruncate {
}
}
/// Main routine to decode a WAL record and figure out which blocks are modified
//
// See xlogrecord.h for details