From 063429aade12d768e0a496585fdee05e524c0c68 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 4 Jun 2021 20:11:56 +0300 Subject: [PATCH] Implement GC for new object_store API (#229) * Implement GC for new object_store API * Add comments for GC * Revert postgres module version reference --- pageserver/src/object_repository.rs | 172 +++++++++------------------- pageserver/src/object_store.rs | 4 + pageserver/src/rocksdb_storage.rs | 113 ++++++++++++++---- pageserver/src/waldecoder.rs | 1 - 4 files changed, 143 insertions(+), 147 deletions(-) diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index 7d64904d85..d74817cd11 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -23,12 +23,13 @@ use bytes::Bytes; use log::*; use postgres_ffi::pg_constants; use serde::{Deserialize, Serialize}; +use std::cmp::max; use std::collections::HashMap; use std::collections::HashSet; use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::thread; -use std::time::Duration; +use std::time::{Duration, Instant}; use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::{AtomicLsn, Lsn}; use zenith_utils::seqwait::SeqWait; @@ -622,141 +623,70 @@ impl ObjectTimeline { fn do_gc(&self, conf: &'static PageServerConf) -> Result<()> { loop { thread::sleep(conf.gc_period); - - // FIXME: broken - /* let last_lsn = self.get_last_valid_lsn(); // checked_sub() returns None on overflow. if let Some(horizon) = last_lsn.checked_sub(conf.gc_horizon) { - let mut maxkey = StorageKey { - tag: BufferTag { - rel: RelTag { - spcnode: u32::MAX, - dbnode: u32::MAX, - relnode: u32::MAX, - forknum: u8::MAX, - }, - blknum: u32::MAX, - }, - lsn: Lsn::MAX, - }; + // WAL is large enough to perform GC let now = Instant::now(); - let mut reconstructed = 0u64; let mut truncated = 0u64; - let mut inspected = 0u64; let mut deleted = 0u64; - loop { - let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(maxkey.to_bytes()); - if iter.valid() { - let key = StorageKey::des(iter.key().unwrap()); - let val = StorageValue::des(iter.value().unwrap()); - inspected += 1; + // Iterate through all relations + for rels in &self.obj_store.list_rels(self.timelineid, 0, 0, last_lsn)? { + let mut last_version = true; + let mut key = relation_size_key(self.timelineid, *rels); + let mut max_size = 0u32; + let mut relation_dropped = false; - // Construct boundaries for old records cleanup - maxkey.tag = key.tag; - let last_lsn = key.lsn; - maxkey.lsn = min(horizon, last_lsn); // do not remove last version - - let mut minkey = maxkey.clone(); - minkey.lsn = Lsn(0); // first version - - // reconstruct most recent page version - if let StorageValueContent::Image(_) = val.content { - // force reconstruction of most recent page version - let (base_img, records) = - self.collect_records_for_apply(key.tag, key.lsn); - - trace!( - "Reconstruct most recent page {} blk {} at {} from {} records", - key.tag.rel, - key.tag.blknum, - key.lsn, - records.len() - ); - - let new_img = self - .walredo_mgr - .request_redo(key.tag, key.lsn, base_img, records)?; - self.put_page_image(key.tag, key.lsn, new_img.clone()); - - reconstructed += 1; - } - - iter.seek_for_prev(maxkey.to_bytes()); - if iter.valid() { - // do not remove last version - if last_lsn > horizon { - // locate most recent record before horizon - let key = StorageKey::des(iter.key().unwrap()); - if key.tag == maxkey.tag { - let val = StorageValue::des(iter.value().unwrap()); - if let StorageValueContent::Image(_) = val.content { - let (base_img, records) = - self.collect_records_for_apply(key.tag, key.lsn); - trace!("Reconstruct horizon page {} blk {} at {} from {} records", - key.tag.rel, key.tag.blknum, key.lsn, records.len()); - let new_img = self - .walredo_mgr - .request_redo(key.tag, key.lsn, base_img, records)?; - self.put_page_image(key.tag, key.lsn, new_img.clone()); - - truncated += 1; - } else { - trace!( - "Keeping horizon page {} blk {} at {}", - key.tag.rel, - key.tag.blknum, - key.lsn - ); - } - } - } else { - trace!( - "Last page {} blk {} at {}, horizon {}", - key.tag.rel, - key.tag.blknum, - key.lsn, - horizon - ); - } - // remove records prior to horizon - loop { - iter.prev(); - if !iter.valid() { - break; - } - let key = StorageKey::des(iter.key().unwrap()); - if key.tag != maxkey.tag { - break; - } - let mut val = StorageValue::des(iter.value().unwrap()); - if val.alive { - val.alive = false; - self.storage.put(key, val)?; - deleted += 1; - trace!( - "deleted: {} blk {} at {}", - key.tag.rel, - key.tag.blknum, - key.lsn - ); - } else { - break; + // Process relation metadata versions + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + let rel_meta = RelationSizeEntry::des(&vers.1)?; + // If relation is dropped at the horizon, + // we can remove all its versions including last (Unlink) + match rel_meta { + RelationSizeEntry::Size(size) => max_size = max(max_size, size), + RelationSizeEntry::Unlink => { + if last_version { + relation_dropped = true; + info!("Relation {:?} dropped", rels); } } } - maxkey = minkey; - } else { - break; + if last_version { + last_version = false; + if !relation_dropped { + // preserve last version + continue; + } + } + self.obj_store.unlink(&key, lsn)?; + deleted += 1; + } + // Now process all relation blocks + for blknum in 0..max_size { + key.buf_tag.blknum = blknum; + last_version = true; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + if last_version { + last_version = false; + truncated += 1; + if !relation_dropped { + // preserve and materialize last version before deleting all preceeding + self.get_page_at_lsn_nowait(key.buf_tag, lsn)?; + continue; + } + } + self.obj_store.unlink(&key, lsn)?; + deleted += 1; + } } } - info!("Garbage collection completed in {:?}:\n{} version chains inspected, {} pages reconstructed, {} version histories truncated, {} versions deleted", - now.elapsed(), inspected, reconstructed, truncated, deleted); + info!("Garbage collection completed in {:?}: {} version histories truncated, {} versions deleted", + now.elapsed(), truncated, deleted); } - */ } } diff --git a/pageserver/src/object_store.rs b/pageserver/src/object_store.rs index 3ef1d9d4f7..6bc6c86a24 100644 --- a/pageserver/src/object_store.rs +++ b/pageserver/src/object_store.rs @@ -61,6 +61,7 @@ pub trait ObjectStore: Send + Sync { ) -> Result)>> + 'a>>; /// Iterate through all keys with given tablespace and database ID, and LSN <= 'lsn'. + /// Both dbnode and spcnode can be InvalidId (0) which means get all relations in tablespace/cluster /// /// This is used to implement 'create database' fn list_rels( @@ -70,4 +71,7 @@ pub trait ObjectStore: Send + Sync { dbnode: u32, lsn: Lsn, ) -> Result>; + + /// Unlink object (used by GC). This mehod may actually delete object or just mark it for deletion. + fn unlink(&self, key: &ObjectKey, lsn: Lsn) -> Result<()>; } diff --git a/pageserver/src/rocksdb_storage.rs b/pageserver/src/rocksdb_storage.rs index 9da9b04d7f..a8903fa4c8 100644 --- a/pageserver/src/rocksdb_storage.rs +++ b/pageserver/src/rocksdb_storage.rs @@ -8,6 +8,7 @@ use crate::ZTimelineId; use anyhow::{bail, Result}; use serde::{Deserialize, Serialize}; use std::collections::HashSet; +use std::sync::{Arc, Mutex}; use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::Lsn; @@ -30,11 +31,53 @@ impl StorageKey { } } +/// +/// RocksDB very inefficiently delete random record. Instead of it we have to use merge +/// filter, which allows to throw away records at LSM merge phase. +/// Unfortunately, it is hard (if ever possible) to determine whether version can be removed +/// at merge time. Version ca be removed if: +/// 1. It is above PITR horizon (we need to get current LSN and gc_horizon from config) +/// 2. Page is reconstructed at horizon (all WAL records above horizon are applied and can be removed) +/// +/// So we have GC process which reconstructs pages at horizon and mark deteriorated WAL record +/// for deletion. To mark object for deletion we can either set some flag in object itself. +/// But it is complicated with new object value format, because RocksDB storage knows nothing about +/// this format. Also updating whole record just to set one bit seems to be inefficient in any case. +/// This is why we keep keys of marked for deletion versions in HashSet in memory. +/// When LSM merge filter found key in this map, it removes it from the set preventing memory overflow. +/// +struct GarbageCollector { + garbage: Mutex>>, +} + +impl GarbageCollector { + fn new() -> GarbageCollector { + GarbageCollector { + garbage: Mutex::new(HashSet::new()), + } + } + + /// Called by GC to mark version as delete + fn mark_for_deletion(&self, key: &[u8]) { + let mut garbage = self.garbage.lock().unwrap(); + garbage.insert(key.to_vec()); + } + + /// Called by LSM merge filter. If it finds key in the set, then + /// it doesn't merge it and removes from this set. + fn was_deleted(&self, key: &[u8]) -> bool { + let key = key.to_vec(); + let mut garbage = self.garbage.lock().unwrap(); + return garbage.remove(&key); + } +} + pub struct RocksObjectStore { _conf: &'static PageServerConf, // RocksDB handle db: rocksdb::DB, + gc: Arc, } impl ObjectStore for RocksObjectStore { @@ -61,6 +104,14 @@ impl ObjectStore for RocksObjectStore { Ok(()) } + fn unlink(&self, key: &ObjectKey, lsn: Lsn) -> Result<()> { + self.gc.mark_for_deletion(&StorageKey::ser(&StorageKey { + obj_key: key.clone(), + lsn, + })?); + Ok(()) + } + /// Iterate through page versions of given page, starting from the given LSN. /// The versions are walked in descending LSN order. fn object_versions<'a>( @@ -89,7 +140,7 @@ impl ObjectStore for RocksObjectStore { let mut rels: HashSet = HashSet::new(); - let searchkey = StorageKey { + let mut search_key = StorageKey { obj_key: ObjectKey { timeline: timelineid, buf_tag: BufferTag { @@ -105,19 +156,26 @@ impl ObjectStore for RocksObjectStore { lsn: Lsn(0), }; let mut iter = self.db.raw_iterator(); - iter.seek(searchkey.ser()?); - while iter.valid() { + loop { + iter.seek(search_key.ser()?); + if !iter.valid() { + break; + } let key = StorageKey::des(iter.key().unwrap())?; - if key.obj_key.buf_tag.rel.spcnode != spcnode - || key.obj_key.buf_tag.rel.dbnode != dbnode + if (spcnode != 0 && key.obj_key.buf_tag.rel.spcnode != spcnode) + || (dbnode != 0 && key.obj_key.buf_tag.rel.dbnode != dbnode) { break; } - if key.lsn < lsn { + if key.obj_key.buf_tag.rel.relnode != 0 // skip non-relational records (like timeline metadata) + && key.lsn < lsn + // visible in this snapshot + { rels.insert(key.obj_key.buf_tag.rel); } - iter.next(); + search_key = key.clone(); + search_key.obj_key.buf_tag.rel.relnode += 1; // skip to next relation } Ok(rels) @@ -150,11 +208,9 @@ impl ObjectStore for RocksObjectStore { impl RocksObjectStore { /// Open a RocksDB database. pub fn open(conf: &'static PageServerConf) -> Result { - let path = conf.workdir.join("rocksdb-storage"); - let db = rocksdb::DB::open(&Self::get_rocksdb_opts(), path)?; - - let storage = RocksObjectStore { _conf: conf, db }; - Ok(storage) + let opts = Self::get_rocksdb_opts(); + let obj_store = Self::new(conf, opts)?; + Ok(obj_store) } /// Create a new, empty RocksDB database. @@ -165,9 +221,27 @@ impl RocksObjectStore { let mut opts = Self::get_rocksdb_opts(); opts.create_if_missing(true); opts.set_error_if_exists(true); - let db = rocksdb::DB::open(&opts, &path)?; + let obj_store = Self::new(conf, opts)?; + Ok(obj_store) + } - let obj_store = RocksObjectStore { _conf: conf, db }; + fn new(conf: &'static PageServerConf, mut opts: rocksdb::Options) -> Result { + let path = conf.workdir.join("rocksdb-storage"); + let gc = Arc::new(GarbageCollector::new()); + let gc_ref = gc.clone(); + opts.set_compaction_filter("ttl", move |_level: u32, key: &[u8], _val: &[u8]| { + if gc_ref.was_deleted(key) { + rocksdb::compaction_filter::Decision::Remove + } else { + rocksdb::compaction_filter::Decision::Keep + } + }); + let db = rocksdb::DB::open(&opts, &path)?; + let obj_store = RocksObjectStore { + _conf: conf, + db, + gc, + }; Ok(obj_store) } @@ -176,17 +250,6 @@ impl RocksObjectStore { let mut opts = rocksdb::Options::default(); opts.set_use_fsync(true); opts.set_compression_type(rocksdb::DBCompressionType::Lz4); - - // FIXME - /* - opts.set_compaction_filter("ttl", move |_level: u32, _key: &[u8], val: &[u8]| { - if (val[0] & UNUSED_VERSION_FLAG) != 0 { - rocksdb::compaction_filter::Decision::Remove - } else { - rocksdb::compaction_filter::Decision::Keep - } - }); - */ opts } } diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 9def20104c..c93ecb0180 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -532,7 +532,6 @@ impl XlMultiXactTruncate { } } - /// Main routine to decode a WAL record and figure out which blocks are modified // // See xlogrecord.h for details