From 3e007b0eb9e5e1bfff1b39a199ca4bf29a16bb1d Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sat, 24 Apr 2021 22:32:22 +0300 Subject: [PATCH] Do not delete versions in GC --- pageserver/src/bin/pageserver.rs | 6 +- pageserver/src/page_cache.rs | 293 +++++++++++++++++-------------- vendor/postgres | 2 +- 3 files changed, 162 insertions(+), 139 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 3d9cc9b891..b0e626f2d5 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -19,8 +19,10 @@ use slog::Drain; use pageserver::{page_service, tui, zenith_repo_dir, PageServerConf}; -const DEFAULT_GC_HORIZON: u64 = 1024 * 1024 * 1024; -const DEFAULT_GC_PERIOD_SEC: u64 = 600; +const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; +const DEFAULT_GC_PERIOD_SEC: u64 = 10; +//const DEFAULT_GC_HORIZON: u64 = 1024 * 1024 * 1024; +//const DEFAULT_GC_PERIOD_SEC: u64 = 600; fn main() -> Result<()> { let arg_matches = App::new("Zenith page server") diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 8060b385f1..6a6640e452 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -171,8 +171,14 @@ fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB opts.create_if_missing(true); opts.set_use_fsync(true); opts.set_compression_type(rocksdb::DBCompressionType::Lz4); - opts.create_missing_column_families(true); - rocksdb::DB::open_cf(&opts, &path, &[rocksdb::DEFAULT_COLUMN_FAMILY_NAME]).unwrap() + opts.set_compaction_filter("ttl", move |_level: u32, _key: &[u8], val: &[u8]| { + if (val[0] & UNUSED_VERSION_FLAG) != 0 { + rocksdb::compaction_filter::Decision::Remove + } else { + rocksdb::compaction_filter::Decision::Keep + } + }); + rocksdb::DB::open(&opts, &path).unwrap() } fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache { @@ -247,16 +253,20 @@ pub struct CacheEntry { pub walredo_condvar: Condvar, } +#[derive(Debug, Clone)] pub struct CacheEntryContent { pub page_image: Option<Bytes>, pub wal_record: Option<WALRecord>, pub apply_pending: bool, } +const PAGE_IMAGE_FLAG: u8 = 1u8; +const UNUSED_VERSION_FLAG: u8 = 2u8; + impl CacheEntryContent { pub fn pack(&self, buf: &mut BytesMut) { if let Some(image) = &self.page_image { - buf.put_u8(1); + buf.put_u8(PAGE_IMAGE_FLAG); buf.put_u16(image.len() as u16); buf.put_slice(&image[..]); } else if let Some(rec) = &self.wal_record { @@ -265,7 +275,7 @@ impl CacheEntryContent { } } pub fn unpack(buf: &mut BytesMut) -> CacheEntryContent { - if buf.get_u8() == 1 { + if (buf.get_u8() & PAGE_IMAGE_FLAG) != 0 { let mut dst = vec![0u8; buf.get_u16() as usize]; buf.copy_to_slice(&mut dst); CacheEntryContent { @@ -337,7 +347,7 @@ impl BufferTag { } } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct WALRecord { pub lsn: u64, // LSN at the *end* of the record pub will_init: bool, @@ -379,12 +389,7 @@ impl WALRecord { impl PageCache { fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result { - let mut minbuf = BytesMut::new(); - let mut maxbuf = BytesMut::new(); - let cf = self - .db - .cf_handle(rocksdb::DEFAULT_COLUMN_FAMILY_NAME) - .unwrap(); + let mut buf = BytesMut::new(); loop { thread::sleep(conf.gc_period); let last_lsn = self.get_last_valid_lsn(); @@ -405,20 +410,26 @@ impl PageCache { let now = Instant::now(); let mut reconstructed = 0u64; let mut truncated = 0u64; + let mut inspected = 0u64; + let mut deleted = 0u64; loop { - maxbuf.clear(); - maxkey.pack(&mut maxbuf); - let mut iter = 
self.db.iterator(rocksdb::IteratorMode::From( - &maxbuf[..], - rocksdb::Direction::Reverse, - )); - if let Some((k, v)) = iter.next() { - minbuf.clear(); - minbuf.extend_from_slice(&v); - let content = CacheEntryContent::unpack(&mut minbuf); - minbuf.clear(); - minbuf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut minbuf); + buf.clear(); + maxkey.pack(&mut buf); + let mut iter = self.db.raw_iterator(); + iter.seek_for_prev(&buf[..]); + if iter.valid() { + let k = iter.key().unwrap(); + let v = iter.value().unwrap(); + + inspected += 1; + + buf.clear(); + buf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut buf); + + buf.clear(); + buf.extend_from_slice(&k); + let key = CacheKey::unpack(&mut buf); // Construct boundaries for old records cleanup maxkey.tag = key.tag; @@ -426,7 +437,7 @@ impl PageCache { maxkey.lsn = min(horizon, last_lsn); // do not remove last version let mut minkey = maxkey.clone(); - minkey.lsn = 0; + minkey.lsn = 0; // first version // reconstruct most recent page version if content.wal_record.is_some() { @@ -434,43 +445,67 @@ impl PageCache { // force reconstruction of most recent page version self.reconstruct_page(key, content)?; reconstructed += 1; + } else { + assert!(content.page_image.is_some()); } - maxbuf.clear(); - maxkey.pack(&mut maxbuf); + buf.clear(); + maxkey.pack(&mut buf); - if last_lsn > horizon { - // locate most recent record before horizon - let mut iter = self.db.iterator(rocksdb::IteratorMode::From( - &maxbuf[..], - rocksdb::Direction::Reverse, - )); - if let Some((k, v)) = iter.next() { - minbuf.clear(); - minbuf.extend_from_slice(&v); - let content = CacheEntryContent::unpack(&mut minbuf); - if content.wal_record.is_some() { - minbuf.clear(); - minbuf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut minbuf); - trace!("Reconstruct horizon page {:?}", key); - self.reconstruct_page(key, content)?; - truncated += 1; + iter.seek_for_prev(&buf[..]); + if iter.valid() { + // do not remove last version + if last_lsn > horizon { + // locate most recent record before horizon + let k = iter.key().unwrap(); + buf.clear(); + buf.extend_from_slice(&k); + let key = CacheKey::unpack(&mut buf); + if key.tag == maxkey.tag { + let v = iter.value().unwrap(); + buf.clear(); + buf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut buf); + if content.wal_record.is_some() { + trace!("Reconstruct horizon page {:?}", key); + self.reconstruct_page(key, content)?; + truncated += 1; + } else { + assert!(content.page_image.is_some()); + } + } + } + // remove records prior to horizon + loop { + iter.prev(); + if !iter.valid() { + break; + } + let k = iter.key().unwrap(); + buf.clear(); + buf.extend_from_slice(&k); + let key = CacheKey::unpack(&mut buf); + if key.tag != maxkey.tag { + break; + } + let v = iter.value().unwrap(); + if (v[0] & UNUSED_VERSION_FLAG) == 0 { + let mut v = v.to_owned(); + v[0] |= UNUSED_VERSION_FLAG; + self.db.put(k, &v[..])?; + deleted += 1; + } else { + break; } } } - // remove records prior to horizon - minbuf.clear(); - minkey.pack(&mut minbuf); - trace!("Delete records in range {:?}..{:?}", minkey, maxkey); - self.db.delete_range_cf(cf, &minbuf[..], &maxbuf[..])?; - maxkey = minkey; } else { break; } } - info!("Garbage collection completed in {:?}: {} pages reconstructed, {} version histories truncated", now.elapsed(), reconstructed, truncated); + info!("Garbage collection completed in {:?}:\n{} version chains inspected, {} pages reconstructed, {} version histories truncated, {} versions 
deleted", + now.elapsed(), inspected, reconstructed, truncated, deleted); } } } @@ -492,8 +527,8 @@ impl PageCache { let page_img = match &entry_content.page_image { Some(p) => p.clone(), None => { - error!("could not apply WAL to reconstruct page image for GetPage@LSN request"); - bail!("could not apply WAL to reconstruct page image"); + error!("could not apply WAL to reconstruct page {:?} image", &key); + bail!("could not apply WAL to reconstruct page {:?} image", &key); } }; self.put_page_image(key.tag, key.lsn, page_img.clone()); @@ -538,61 +573,54 @@ impl PageCache { // Look up cache entry. If it's a page image, return that. If it's a WAL record, // ask the WAL redo service to reconstruct the page image from the WAL records. - let minkey = CacheKey { tag, lsn: 0 }; - let maxkey = CacheKey { tag, lsn }; + let key = CacheKey { tag, lsn }; let mut buf = BytesMut::new(); - minkey.pack(&mut buf); + key.pack(&mut buf); + let mut iter = self.db.raw_iterator(); + iter.seek_for_prev(&buf[..]); - let mut readopts = rocksdb::ReadOptions::default(); - readopts.set_iterate_lower_bound(buf.to_vec()); - - buf.clear(); - maxkey.pack(&mut buf); - let mut iter = self.db.iterator_opt( - rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), - readopts, - ); - let entry_opt = iter.next(); - - if entry_opt.is_none() { - static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; - debug!("Page {:?} at {}({}) not found", tag, req_lsn, lsn); - return Ok(Bytes::from_static(&ZERO_PAGE)); - /* return Err("could not find page image")?; */ - } - let (k, v) = entry_opt.unwrap(); - buf.clear(); - buf.extend_from_slice(&v); - let content = CacheEntryContent::unpack(&mut buf); - let page_img: Bytes; - if let Some(img) = &content.page_image { - page_img = img.clone(); - } else if content.wal_record.is_some() { + if iter.valid() { + let k = iter.key().unwrap(); buf.clear(); buf.extend_from_slice(&k); let key = CacheKey::unpack(&mut buf); - page_img = self.reconstruct_page(key, content)?; - } else { - // No base image, and no WAL record. Huh? - bail!("no page image or WAL record for requested page"); + if key.tag == tag { + let v = iter.value().unwrap(); + buf.clear(); + buf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut buf); + let page_img: Bytes; + if let Some(img) = &content.page_image { + page_img = img.clone(); + } else if content.wal_record.is_some() { + page_img = self.reconstruct_page(key, content)?; + } else { + // No base image, and no WAL record. Huh? + bail!("no page image or WAL record for requested page"); + } + // FIXME: assumes little-endian. Only used for the debugging log though + let page_lsn_hi = + u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); + let page_lsn_lo = + u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); + debug!( + "Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}", + page_lsn_hi, + page_lsn_lo, + tag.rel.spcnode, + tag.rel.dbnode, + tag.rel.relnode, + tag.rel.forknum, + tag.blknum + ); + return Ok(page_img); + } } - - // FIXME: assumes little-endian. 
Only used for the debugging log though - let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); - let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); - debug!( - "Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}", - page_lsn_hi, - page_lsn_lo, - tag.rel.spcnode, - tag.rel.dbnode, - tag.rel.relnode, - tag.rel.forknum, - tag.blknum - ); - - Ok(page_img) + static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; + debug!("Page {:?} at {}({}) not found", tag, req_lsn, lsn); + Ok(Bytes::from_static(&ZERO_PAGE)) + /* return Err("could not find page image")?; */ } // @@ -603,33 +631,26 @@ impl PageCache { // over it. // pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option<Bytes>, Vec<WALRecord>) { - let minkey = CacheKey { - tag: BufferTag { - rel: entry.key.tag.rel, - blknum: 0, - }, - lsn: 0, - }; - let mut buf = BytesMut::new(); - minkey.pack(&mut buf); - - let mut readopts = rocksdb::ReadOptions::default(); - readopts.set_iterate_lower_bound(buf.to_vec()); - - buf.clear(); + let mut buf = BytesMut::new(); entry.key.pack(&mut buf); - let iter = self.db.iterator_opt( - rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), - readopts, - ); let mut base_img: Option<Bytes> = None; let mut records: Vec<WALRecord> = Vec::new(); + let mut iter = self.db.raw_iterator(); + iter.seek_for_prev(&buf[..]); + // Scan backwards, collecting the WAL records, until we hit an // old page image. - for (_k, v) in iter { + while iter.valid() { + let k = iter.key().unwrap(); + buf.clear(); + buf.extend_from_slice(&k); + let key = CacheKey::unpack(&mut buf); + if key.tag != entry.key.tag { + break; + } + let v = iter.value().unwrap(); buf.clear(); buf.extend_from_slice(&v); let content = CacheEntryContent::unpack(&mut buf); @@ -640,7 +661,6 @@ impl PageCache { break; } else if let Some(rec) = &content.wal_record { records.push(rec.clone()); - // If this WAL record initializes the page, no need to dig deeper. 
if rec.will_init { break; @@ -648,6 +668,7 @@ impl PageCache { } else { panic!("no base image and no WAL record on cache entry"); } + iter.prev(); } records.reverse(); @@ -825,15 +846,15 @@ impl PageCache { lsn, }; let mut buf = BytesMut::new(); + let mut iter = self.db.raw_iterator(); loop { buf.clear(); key.pack(&mut buf); - let mut iter = self.db.iterator(rocksdb::IteratorMode::From( - &buf[..], - rocksdb::Direction::Reverse, - )); - if let Some((k, v)) = iter.next() { + iter.seek_for_prev(&buf[..]); + if iter.valid() { + let k = iter.key().unwrap(); + let v = iter.value().unwrap(); buf.clear(); buf.extend_from_slice(&k); let tag = BufferTag::unpack(&mut buf); @@ -873,11 +894,10 @@ impl PageCache { }; let mut buf = BytesMut::new(); key.pack(&mut buf); - let mut iter = self.db.iterator(rocksdb::IteratorMode::From( - &buf[..], - rocksdb::Direction::Reverse, - )); - if let Some((k, _v)) = iter.next() { + let mut iter = self.db.raw_iterator(); + iter.seek_for_prev(&buf[..]); + if iter.valid() { + let k = iter.key().unwrap(); buf.clear(); buf.extend_from_slice(&k); let tag = BufferTag::unpack(&mut buf); @@ -924,12 +944,12 @@ impl PageCache { lsn: 0, }; key.pack(&mut buf); - let iter = self.db.iterator(rocksdb::IteratorMode::From( - &buf[..], - rocksdb::Direction::Forward, - )); + let mut iter = self.db.raw_iterator(); + iter.seek(&buf[..]); let mut n = 0; - for (k, v) in iter { + while iter.valid() { + let k = iter.key().unwrap(); + let v = iter.value().unwrap(); buf.clear(); buf.extend_from_slice(&k); let mut key = CacheKey::unpack(&mut buf); @@ -944,6 +964,7 @@ impl PageCache { self.db.put(&buf[..], v)?; n += 1; + iter.next(); } info!( "Create database {}/{}, copy {} entries", diff --git a/vendor/postgres b/vendor/postgres index cf2e6d1904..9d498dcbaa 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit cf2e6d1904f9bd4c9fd23d9162744a1e7f3a963c +Subproject commit 9d498dcbaa5c206eef74b11e13aa81a0c189d8fc
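Note on the mechanism: instead of delete_range_cf, GC now only sets UNUSED_VERSION_FLAG in the first byte of an old version's value, and the compaction filter registered in open_rocksdb physically drops flagged entries during compaction. Below is a minimal standalone sketch of this mark-and-compact pattern, assuming the rocksdb crate (the rust-rocksdb bindings); the database path, keys, and values are invented for illustration, and only the flag convention mirrors the patch.

use rocksdb::{compaction_filter::Decision, Options, DB};

// Same convention as the patch: the second bit of the first value byte marks a
// version that GC has retired but not yet physically removed.
const UNUSED_VERSION_FLAG: u8 = 2u8;

fn open_with_gc_filter(path: &str) -> Result<DB, rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    // The filter runs during compaction and decides, per key/value pair, whether
    // the entry survives. Flagged versions are dropped here instead of being
    // deleted synchronously by the GC thread.
    opts.set_compaction_filter("unused-version", |_level, _key, value: &[u8]| {
        if !value.is_empty() && (value[0] & UNUSED_VERSION_FLAG) != 0 {
            Decision::Remove
        } else {
            Decision::Keep
        }
    });
    DB::open(&opts, path)
}

fn main() -> Result<(), rocksdb::Error> {
    // Hypothetical demo database and keys, not taken from the pageserver.
    let db = open_with_gc_filter("/tmp/unused-version-demo")?;
    db.put(b"page@0100", &[0u8, 0xAA])?; // newest version, kept
    db.put(b"page@0090", &[0u8, 0xBB])?; // older version, to be retired

    // GC marks the old version instead of deleting it outright.
    let mut old = db.get(b"page@0090")?.expect("value exists").to_vec();
    old[0] |= UNUSED_VERSION_FLAG;
    db.put(b"page@0090", &old)?;

    // Flush and run a full manual compaction so the filter gets a chance to
    // drop the flagged entry.
    db.flush()?;
    db.compact_range(None::<&[u8]>, None::<&[u8]>);
    assert!(db.get(b"page@0090")?.is_none());
    assert!(db.get(b"page@0100")?.is_some());
    Ok(())
}

The trade-off is that space is reclaimed lazily: flagged versions linger until compaction reaches their files, which is also why the GC loop above counts a flagged entry as "deleted" at the moment it is marked.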
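The other recurring change is the lookup pattern: every reverse iterator() / iterator_opt() call is replaced by a raw_iterator() positioned with seek_for_prev(), which lands on the last key less than or equal to the search key, i.e. the newest version at or below the requested LSN. The following sketch shows that idea in isolation; the key layout (a fixed-size page tag followed by a big-endian LSN so byte order matches LSN order) is an assumption for illustration, not the actual CacheKey::pack format.

use rocksdb::{Options, DB};

// Assumed key layout: an 8-byte page tag followed by a big-endian LSN, so that
// byte-wise ordering sorts all versions of one page by LSN.
fn version_key(tag: &[u8; 8], lsn: u64) -> Vec<u8> {
    let mut key = Vec::with_capacity(16);
    key.extend_from_slice(tag);
    key.extend_from_slice(&lsn.to_be_bytes());
    key
}

// Returns the newest stored version of `tag` at or below `lsn`, if any.
fn get_version_at_or_below(db: &DB, tag: &[u8; 8], lsn: u64) -> Option<(u64, Vec<u8>)> {
    let mut iter = db.raw_iterator();
    // seek_for_prev positions the iterator on the last key <= the given key.
    iter.seek_for_prev(version_key(tag, lsn));
    if !iter.valid() {
        return None;
    }
    let key = iter.key()?;
    // The hit may belong to a different page entirely; check the tag prefix.
    if key.len() < 16 || key[..8] != tag[..] {
        return None;
    }
    let found_lsn = u64::from_be_bytes(key[8..16].try_into().ok()?);
    Some((found_lsn, iter.value()?.to_vec()))
}

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = DB::open(&opts, "/tmp/seek-for-prev-demo")?;

    let tag = *b"rel1blk0";
    db.put(version_key(&tag, 100), b"v@100")?;
    db.put(version_key(&tag, 200), b"v@200")?;

    // Request LSN 150: the newest version at or below it is the one at LSN 100.
    let (lsn, val) = get_version_at_or_below(&db, &tag, 150).expect("version found");
    assert_eq!(lsn, 100);
    assert_eq!(val, b"v@100");
    Ok(())
}

The tag comparison after the seek plays the same role as the new "if key.tag != maxkey.tag / entry.key.tag" checks in the patch: it stops the backward scan from sliding into a neighbouring page's version chain, which the old code prevented with ReadOptions::set_iterate_lower_bound instead.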