From a29fa4b8be285d3676f6541aacde653bba02aaff Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 26 May 2021 20:27:43 +0300 Subject: [PATCH] [refer #190] Fix bugs in handling realtion drop and truncation --- pageserver/src/repository.rs | 72 +++++++++++----- pageserver/src/repository/rocksdb.rs | 118 +++++++++++++++++++++------ pageserver/src/restore_local_repo.rs | 2 +- pageserver/src/waldecoder.rs | 20 ++++- postgres_ffi/src/pg_constants.rs | 2 + 5 files changed, 162 insertions(+), 52 deletions(-) diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index f923b3a27a..e275695816 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize}; use std::fmt; use std::sync::Arc; use zenith_utils::lsn::Lsn; +use log::*; /// /// A repository corresponds to one .zenith directory. One repository holds multiple @@ -55,14 +56,17 @@ pub trait Timeline { /// /// This will implicitly extend the relation, if the page is beyond the /// current end-of-file. - fn put_wal_record(&self, tag: BufferTag, rec: WALRecord); + fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()>; /// Like put_wal_record, but with ready-made image of the page. - fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes); + fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()>; /// Truncate relation fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>; + /// Drop relation or file segment + fn put_drop(&self, tag: BufferTag, lsn: Lsn) -> Result<()>; + /// Create a new database from a template database /// /// In PostgreSQL, CREATE DATABASE works by scanning the data directory and @@ -100,14 +104,18 @@ pub trait Timeline { blknum: blk.blkno, }; - let rec = WALRecord { - lsn, - will_init: blk.will_init || blk.apply_image, - rec: recdata.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; + if blk.will_drop { + self.put_drop(tag, lsn)?; + } else { + let rec = WALRecord { + lsn, + will_init: blk.will_init || blk.apply_image, + rec: recdata.clone(), + main_data_offset: decoded.main_data_offset as u32, + }; - self.put_wal_record(tag, rec); + self.put_wal_record(tag, rec)?; + } } // Handle a few special record types @@ -116,15 +124,37 @@ pub trait Timeline { == pg_constants::XLOG_SMGR_TRUNCATE { let truncate = XlSmgrTruncate::decode(&decoded); + let mut rel = RelTag { + spcnode: truncate.rnode.spcnode, + dbnode: truncate.rnode.dbnode, + relnode: truncate.rnode.relnode, + forknum: pg_constants::MAIN_FORKNUM, + }; if (truncate.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 { - let rel = RelTag { - spcnode: truncate.rnode.spcnode, - dbnode: truncate.rnode.dbnode, - relnode: truncate.rnode.relnode, - forknum: pg_constants::MAIN_FORKNUM, - }; self.put_truncation(rel, lsn, truncate.blkno)?; } + if (truncate.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 { + if truncate.blkno == 0 { + rel.forknum = pg_constants::FSM_FORKNUM; + self.put_truncation(rel, lsn, truncate.blkno)?; + } else { + // TODO: handle partial truncation of FSM: + // need to map heap block number to FSM block number + // and clear bits in the tail block + info!("Partial truncation of FSM is not supported"); + } + } + if (truncate.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 { + if truncate.blkno == 0 { + rel.forknum = pg_constants::VISIBILITYMAP_FORKNUM; + self.put_truncation(rel, lsn, truncate.blkno)?; + } else { + // TODO: handle partial truncation of VM: + // need to map heap block number to VM block number + // and clear bits in the tail block + info!("Partial truncation of VM is not supported"); + } + } } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_DBASE_CREATE @@ -348,11 +378,11 @@ mod tests { let tline = repo.create_empty_timeline(timelineid, Lsn(0))?; tline.init_valid_lsn(Lsn(1)); - tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2")); - tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2")); - tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3")); - tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4")); - tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5")); + tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?; + tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?; + tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"))?; + tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4"))?; + tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5"))?; tline.advance_last_valid_lsn(Lsn(5)); @@ -442,7 +472,7 @@ mod tests { for i in 0..pg_constants::RELSEG_SIZE + 1 { let img = TEST_IMG(&format!("foo blk {} at {}", i, Lsn(lsn))); lsn += 1; - tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img); + tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img)?; } tline.advance_last_valid_lsn(Lsn(lsn)); diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs index 6f51fefcc0..ebcadfc654 100644 --- a/pageserver/src/repository/rocksdb.rs +++ b/pageserver/src/repository/rocksdb.rs @@ -127,10 +127,12 @@ impl CacheKey { static LAST_VALID_LSN_KEY: CacheKey = CacheKey::special(0); static LAST_VALID_RECORD_LSN_KEY: CacheKey = CacheKey::special(1); +#[derive(Debug)] enum CacheEntryContent { PageImage(Bytes), WALRecord(WALRecord), Truncation, + Drop, } // The serialized representation of a CacheEntryContent begins with @@ -138,9 +140,10 @@ enum CacheEntryContent { // an UNUSED_VERSION_FLAG that is not represented in the CacheEntryContent // at all, you must peek into the first byte of the serialized representation // to read it. -const CONTENT_PAGE_IMAGE: u8 = 1u8; -const CONTENT_WAL_RECORD: u8 = 2u8; -const CONTENT_TRUNCATION: u8 = 3u8; +const CONTENT_PAGE_IMAGE: u8 = 0u8; +const CONTENT_WAL_RECORD: u8 = 1u8; +const CONTENT_TRUNCATION: u8 = 2u8; +const CONTENT_DROP: u8 = 3u8; const CONTENT_KIND_MASK: u8 = 3u8; // bitmask that covers the above @@ -161,6 +164,9 @@ impl CacheEntryContent { CacheEntryContent::Truncation => { buf.put_u8(CONTENT_TRUNCATION); } + CacheEntryContent::Drop => { + buf.put_u8(CONTENT_DROP); + } } } pub fn unpack(buf: &mut Bytes) -> CacheEntryContent { @@ -175,6 +181,7 @@ impl CacheEntryContent { } CONTENT_WAL_RECORD => CacheEntryContent::WALRecord(WALRecord::unpack(buf)), CONTENT_TRUNCATION => CacheEntryContent::Truncation, + CONTENT_DROP => CacheEntryContent::Drop, _ => unreachable!(), } } @@ -296,8 +303,9 @@ impl RocksTimeline { let mut opts = rocksdb::Options::default(); opts.set_use_fsync(true); opts.set_compression_type(rocksdb::DBCompressionType::Lz4); - opts.set_compaction_filter("ttl", move |_level: u32, _key: &[u8], val: &[u8]| { + opts.set_compaction_filter("ttl", move |_level: u32, key: &[u8], val: &[u8]| { if (val[0] & UNUSED_VERSION_FLAG) != 0 { + trace!("Delete unused version {:?}", CacheKey::des(key)); rocksdb::compaction_filter::Decision::Remove } else { rocksdb::compaction_filter::Decision::Keep @@ -480,9 +488,53 @@ impl RocksTimeline { Ok(0) } + /// + /// Drop relations with all its forks or non-relational file + /// + fn drop(&self, tag: BufferTag) -> Result<()> { + let mut iter = self.db.raw_iterator(); + let mut key = CacheKey { + tag, + lsn: Lsn(u64::MAX), + }; + if tag.rel.forknum == pg_constants::MAIN_FORKNUM { + // if it is relation then remove all its blocks in all forks + key.tag.blknum = u32::MAX; + key.tag.rel.forknum = pg_constants::INIT_FORKNUM; + } else { + assert!(tag.rel.forknum > pg_constants::INIT_FORKNUM); + } + debug!("Drop relation {:?}", tag); + iter.seek_for_prev(key.ser()?); + while iter.valid() { + let key = CacheKey::des(iter.key().unwrap())?; + if key.tag.rel.relnode != tag.rel.relnode + || key.tag.rel.spcnode != tag.rel.spcnode + || key.tag.rel.dbnode != tag.rel.dbnode + || (key.tag.rel.forknum != tag.rel.forknum + && tag.rel.forknum != pg_constants::MAIN_FORKNUM) + { + // no more entries belonging to this relation or file + break; + } + let v = iter.value().unwrap(); + if (v[0] & UNUSED_VERSION_FLAG) == 0 { + let mut v = v.to_owned(); + v[0] |= UNUSED_VERSION_FLAG; + self.db.put(key.ser()?, &v[..])?; + } else { + // already marked for deletion + break; + } + iter.prev(); + } + Ok(()) + } + fn do_gc(&self, conf: &'static PageServerConf) -> Result { loop { thread::sleep(conf.gc_period); + trace!("Start GC iteration"); let last_lsn = self.get_last_valid_lsn(); // checked_sub() returns None on overflow. @@ -510,17 +562,27 @@ impl RocksTimeline { if iter.valid() { let key = CacheKey::des(iter.key().unwrap())?; let v = iter.value().unwrap(); + let flag = v[0]; + let last_lsn = key.lsn; inspected += 1; // Construct boundaries for old records cleanup maxkey.tag = key.tag; - let last_lsn = key.lsn; maxkey.lsn = min(horizon, last_lsn); // do not remove last version let mut minkey = maxkey.clone(); minkey.lsn = Lsn(0); // first version + if (flag & CONTENT_KIND_MASK) == CONTENT_DROP { + // If drop record in over the horizon then delete all entries from repository + if last_lsn < horizon { + self.drop(key.tag)?; + } + maxkey = minkey; + continue; + } + // reconstruct most recent page version if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD { // force reconstruction of most recent page version @@ -538,7 +600,7 @@ impl RocksTimeline { let new_img = self .walredo_mgr .request_redo(key.tag, key.lsn, base_img, records)?; - self.put_page_image(key.tag, key.lsn, new_img.clone()); + self.put_page_image(key.tag, key.lsn, new_img.clone())?; reconstructed += 1; } @@ -559,7 +621,7 @@ impl RocksTimeline { let new_img = self .walredo_mgr .request_redo(key.tag, key.lsn, base_img, records)?; - self.put_page_image(key.tag, key.lsn, new_img.clone()); + self.put_page_image(key.tag, key.lsn, new_img.clone())?; truncated += 1; } else { @@ -663,6 +725,7 @@ impl Timeline for RocksTimeline { /// fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> Result { self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); + const ZERO_PAGE: [u8; 8192] = [0u8; 8192]; let lsn = self.wait_lsn(req_lsn)?; @@ -685,7 +748,10 @@ impl Timeline for RocksTimeline { let (base_img, records) = self.collect_records_for_apply(tag, lsn); page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?; - self.put_page_image(tag, lsn, page_img.clone()); + self.put_page_image(tag, lsn, page_img.clone())?; + } else if let CacheEntryContent::Truncation = content { + // Because FSM pages may be not up-to-date we can request truncated page + return Ok(Bytes::from_static(&ZERO_PAGE)); } else { // No base image, and no WAL record. Huh? bail!("no page image or WAL record for requested page"); @@ -702,7 +768,6 @@ impl Timeline for RocksTimeline { return Ok(page_img); } } - static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; debug!( "Page {} blk {} at {}({}) not found", tag.rel, tag.blknum, req_lsn, lsn @@ -752,14 +817,14 @@ impl Timeline for RocksTimeline { /// /// Adds a WAL record to the repository /// - fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) { + fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()> { let lsn = rec.lsn; let key = CacheKey { tag, lsn }; let content = CacheEntryContent::WALRecord(rec); let serialized_key = key.ser().expect("serialize CacheKey should always succeed"); - let _res = self.db.put(serialized_key, content.to_bytes()); + let _res = self.db.put(serialized_key, content.to_bytes())?; trace!( "put_wal_record rel {} blk {} at {}", tag.rel, @@ -769,6 +834,18 @@ impl Timeline for RocksTimeline { self.num_entries.fetch_add(1, Ordering::Relaxed); self.num_wal_records.fetch_add(1, Ordering::Relaxed); + Ok(()) + } + + /// + /// Put drop record which should completely delete relation with all its forks + /// or non-relational file from repository + /// + fn put_drop(&self, tag: BufferTag, lsn: Lsn) -> Result<()> { + let key = CacheKey { tag, lsn }; + let content = CacheEntryContent::Drop; + let _res = self.db.put(key.ser()?, content.to_bytes())?; + Ok(()) } /// @@ -801,27 +878,13 @@ impl Timeline for RocksTimeline { /// /// Memorize a full image of a page version /// - fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) { - let img_len = img.len(); + fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()> { let key = CacheKey { tag, lsn }; let content = CacheEntryContent::PageImage(img); - let mut val_buf = content.to_bytes(); - - // Zero size of page image indicates that page can be removed - if img_len == 0 { - if (val_buf[0] & UNUSED_VERSION_FLAG) != 0 { - // records already marked for deletion - return; - } else { - // delete truncated multixact page - val_buf[0] |= UNUSED_VERSION_FLAG; - } - } - trace!("put_wal_record lsn: {}", key.lsn); let serialized_key = key.ser().expect("serialize CacheKey should always succeed"); - let _res = self.db.put(serialized_key, content.to_bytes()); + let _res = self.db.put(serialized_key, content.to_bytes())?; trace!( "put_page_image rel {} blk {} at {}", @@ -830,6 +893,7 @@ impl Timeline for RocksTimeline { lsn ); self.num_page_images.fetch_add(1, Ordering::Relaxed); + Ok(()) } fn put_create_database( diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 9610c1b2cb..c9b41e7c9d 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -144,7 +144,7 @@ fn import_relfile( }, blknum, }; - timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf)); + timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf))?; /* if oldest_lsn == 0 || p.lsn < oldest_lsn { oldest_lsn = p.lsn; diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index b726bf5a0d..05ebbd5f8c 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -264,7 +264,8 @@ pub struct DecodedBkpBlock { /* Information on full-page image, if any */ has_image: bool, /* has image, even for consistency checking */ pub apply_image: bool, /* has image that should be restored */ - pub will_init: bool, + pub will_init: bool, /* record intialize page content */ + pub will_drop: bool, /* record drops relation */ //char *bkp_image; hole_offset: u16, hole_length: u16, @@ -289,6 +290,7 @@ impl DecodedBkpBlock { has_image: false, apply_image: false, will_init: false, + will_drop: false, hole_offset: 0, hole_length: 0, bimg_len: 0, @@ -804,7 +806,13 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { let spcnode = buf.get_u32_le(); let dbnode = buf.get_u32_le(); let relnode = buf.get_u32_le(); - //TODO handle this too? + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::MAIN_FORKNUM; + blk.rnode_spcnode = spcnode; + blk.rnode_dbnode = dbnode; + blk.rnode_relnode = relnode; + blk.will_drop = true; + blocks.push(blk); trace!( "XLOG_XACT_COMMIT relfilenode {}/{}/{}", spcnode, @@ -850,7 +858,13 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { let spcnode = buf.get_u32_le(); let dbnode = buf.get_u32_le(); let relnode = buf.get_u32_le(); - //TODO save these too + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::MAIN_FORKNUM; + blk.rnode_spcnode = spcnode; + blk.rnode_dbnode = dbnode; + blk.rnode_relnode = relnode; + blk.will_drop = true; + blocks.push(blk); trace!( "XLOG_XACT_ABORT relfilenode {}/{}/{}", spcnode, diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index 16f603acca..bff7683055 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -20,6 +20,8 @@ pub const ROCKSDB_SPECIAL_FORKNUM: u8 = 50; // From storage_xlog.h pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001; +pub const SMGR_TRUNCATE_VM: u32 = 0x0002; +pub const SMGR_TRUNCATE_FSM: u32 = 0x0004; // // Constants from visbilitymap.h