From eeec1a3dcb0b23dbf46e5e6da1af851244be99c9 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 5 May 2021 09:28:40 +0300 Subject: [PATCH] Refactor the way truncations are handled. Currently, truncation is implemented in the RocksDB repository by storing a special sentinel entry for each page that was truncated away. Hide that implementation detail better in the abstract Repository interface, so that caller doesn't need to construct the special sentinel WAL record. While we're at it, refactor the CacheEntryContent struct to an enum. --- pageserver/src/repository.rs | 6 +- pageserver/src/repository/rocksdb.rs | 126 +++++++++++++++------------ pageserver/src/restore_local_repo.rs | 1 - pageserver/src/walreceiver.rs | 23 ++--- 4 files changed, 75 insertions(+), 81 deletions(-) diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 88b3698d48..de253be46a 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -36,7 +36,7 @@ pub trait Timeline { // Functions used by WAL receiver fn put_wal_record(&self, tag: BufferTag, rec: WALRecord); - fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()>; + fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> anyhow::Result<()>; fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes); fn create_database( &self, @@ -109,7 +109,6 @@ impl BufferTag { pub struct WALRecord { pub lsn: Lsn, // LSN at the *end* of the record pub will_init: bool, - pub truncate: bool, pub rec: Bytes, // Remember the offset of main_data in rec, // so that we don't have to parse the record again. @@ -121,7 +120,6 @@ impl WALRecord { pub fn pack(&self, buf: &mut BytesMut) { buf.put_u64(self.lsn.0); buf.put_u8(self.will_init as u8); - buf.put_u8(self.truncate as u8); buf.put_u32(self.main_data_offset); buf.put_u32(self.rec.len() as u32); buf.put_slice(&self.rec[..]); @@ -129,14 +127,12 @@ impl WALRecord { pub fn unpack(buf: &mut BytesMut) -> WALRecord { let lsn = Lsn::from(buf.get_u64()); let will_init = buf.get_u8() != 0; - let truncate = buf.get_u8() != 0; let main_data_offset = buf.get_u32(); let mut dst = vec![0u8; buf.get_u32() as usize]; buf.copy_to_slice(&mut dst); WALRecord { lsn, will_init, - truncate, rec: Bytes::from(dst), main_data_offset, } diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs index af6a4c9f1f..01c63206e5 100644 --- a/pageserver/src/repository/rocksdb.rs +++ b/pageserver/src/repository/rocksdb.rs @@ -98,38 +98,55 @@ impl CacheKey { } } -pub struct CacheEntryContent { - pub page_image: Option, - pub wal_record: Option, +enum CacheEntryContent { + PageImage(Bytes), + WALRecord(WALRecord), + Truncation, } -const PAGE_IMAGE_FLAG: u8 = 1u8; -const UNUSED_VERSION_FLAG: u8 = 2u8; +// The serialized representation of a CacheEntryContent begins with +// single byte that indicates what kind of entry it is. There is also +// an UNUSED_VERSION_FLAG that is not represented in the CacheEntryContent +// at all, you must peek into the first byte of the serialized representation +// to read it. +const CONTENT_PAGE_IMAGE: u8 = 1u8; +const CONTENT_WAL_RECORD: u8 = 2u8; +const CONTENT_TRUNCATION: u8 = 3u8; + +const CONTENT_KIND_MASK: u8 = 3u8; // bitmask that covers the above + +const UNUSED_VERSION_FLAG: u8 = 4u8; impl CacheEntryContent { pub fn pack(&self, buf: &mut BytesMut) { - if let Some(image) = &self.page_image { - buf.put_u8(PAGE_IMAGE_FLAG); - buf.put_u16(image.len() as u16); - buf.put_slice(&image[..]); - } else if let Some(rec) = &self.wal_record { - buf.put_u8(0); - rec.pack(buf); + match self { + CacheEntryContent::PageImage(image) => { + buf.put_u8(CONTENT_PAGE_IMAGE); + buf.put_u16(image.len() as u16); + buf.put_slice(&image[..]); + } + CacheEntryContent::WALRecord(rec) => { + buf.put_u8(CONTENT_WAL_RECORD); + rec.pack(buf); + } + CacheEntryContent::Truncation => { + buf.put_u8(CONTENT_TRUNCATION); + } } } pub fn unpack(buf: &mut BytesMut) -> CacheEntryContent { - if (buf.get_u8() & PAGE_IMAGE_FLAG) != 0 { - let mut dst = vec![0u8; buf.get_u16() as usize]; - buf.copy_to_slice(&mut dst); - CacheEntryContent { - page_image: Some(Bytes::from(dst)), - wal_record: None, - } - } else { - CacheEntryContent { - page_image: None, - wal_record: Some(WALRecord::unpack(buf)), + let kind = buf.get_u8() & CONTENT_KIND_MASK; + + match kind { + CONTENT_PAGE_IMAGE => { + let len = buf.get_u16() as usize; + let mut dst = vec![0u8; len]; + buf.copy_to_slice(&mut dst); + CacheEntryContent::PageImage(Bytes::from(dst)) } + CONTENT_WAL_RECORD => CacheEntryContent::WALRecord(WALRecord::unpack(buf)), + CONTENT_TRUNCATION => CacheEntryContent::Truncation, + _ => unreachable!(), } } } @@ -257,12 +274,12 @@ impl RocksTimeline { buf.clear(); buf.extend_from_slice(&v); let content = CacheEntryContent::unpack(&mut buf); - if let Some(img) = &content.page_image { + if let CacheEntryContent::PageImage(img) = content { // We have a base image. No need to dig deeper into the list of // records - base_img = Some(img.clone()); + base_img = Some(img); break; - } else if let Some(rec) = &content.wal_record { + } else if let CacheEntryContent::WALRecord(rec) = content { records.push(rec.clone()); // If this WAL record initializes the page, no need to dig deeper. if rec.will_init { @@ -311,14 +328,12 @@ impl RocksTimeline { buf.clear(); buf.extend_from_slice(&v); let content = CacheEntryContent::unpack(&mut buf); - if let Some(rec) = &content.wal_record { - if rec.truncate { - if tag.blknum > 0 { - key.tag.blknum = tag.blknum - 1; - continue; - } - break; + if let CacheEntryContent::Truncation = content { + if tag.blknum > 0 { + key.tag.blknum = tag.blknum - 1; + continue; } + break; } let relsize = tag.blknum + 1; debug!("Size of relation {:?} at {} is {}", rel, lsn, relsize); @@ -380,7 +395,7 @@ impl RocksTimeline { minkey.lsn = Lsn(0); // first version // reconstruct most recent page version - if (v[0] & PAGE_IMAGE_FLAG) == 0 { + if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD { trace!("Reconstruct most recent page {:?}", key); // force reconstruction of most recent page version let (base_img, records) = @@ -408,7 +423,7 @@ impl RocksTimeline { let key = CacheKey::unpack(&mut buf); if key.tag == maxkey.tag { let v = iter.value().unwrap(); - if (v[0] & PAGE_IMAGE_FLAG) == 0 { + if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD { trace!("Reconstruct horizon page {:?}", key); let (base_img, records) = self.collect_records_for_apply(key.tag, key.lsn); @@ -518,9 +533,9 @@ impl Timeline for RocksTimeline { buf.extend_from_slice(&v); let content = CacheEntryContent::unpack(&mut buf); let page_img: Bytes; - if let Some(img) = &content.page_image { - page_img = img.clone(); - } else if content.wal_record.is_some() { + if let CacheEntryContent::PageImage(img) = content { + page_img = img; + } else if let CacheEntryContent::WALRecord(_rec) = content { // Request the WAL redo manager to apply the WAL records for us. let (base_img, records) = self.collect_records_for_apply(tag, lsn); page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?; @@ -603,10 +618,7 @@ impl Timeline for RocksTimeline { let lsn = rec.lsn; let key = CacheKey { tag, lsn }; - let content = CacheEntryContent { - page_image: None, - wal_record: Some(rec), - }; + let content = CacheEntryContent::WALRecord(rec); let mut key_buf = BytesMut::new(); key.pack(&mut key_buf); @@ -624,32 +636,33 @@ impl Timeline for RocksTimeline { /// Adds a relation-wide WAL record (like truncate) to the repository, /// associating it with all pages started with specified block number /// - fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> { - let mut key = CacheKey { tag, lsn: rec.lsn }; - + fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> anyhow::Result<()> { // What was the size of the relation before this record? let last_lsn = self.last_valid_lsn.load(); - let old_rel_size = self.relsize_get_nowait(tag.rel, last_lsn)?; + let old_rel_size = self.relsize_get_nowait(rel, last_lsn)?; - let content = CacheEntryContent { - page_image: None, - wal_record: Some(rec), - }; + let content = CacheEntryContent::Truncation; // set new relation size - trace!("Truncate relation {:?}", tag); + trace!("Truncate relation {:?} to {} blocks", rel, nblocks); let mut key_buf = BytesMut::new(); let mut val_buf = BytesMut::new(); content.pack(&mut val_buf); - for blknum in tag.blknum..old_rel_size { + for blknum in nblocks..old_rel_size { key_buf.clear(); - key.tag.blknum = blknum; + let key = CacheKey { + tag: BufferTag { + rel, + blknum, + }, + lsn, + }; key.pack(&mut key_buf); trace!("put_wal_record lsn: {}", key.lsn); let _res = self.db.put(&key_buf[..], &val_buf[..]); } - let n = (old_rel_size - tag.blknum) as u64; + let n = (old_rel_size - nblocks) as u64; self.num_entries.fetch_add(n, Ordering::Relaxed); self.num_wal_records.fetch_add(n, Ordering::Relaxed); Ok(()) @@ -660,10 +673,7 @@ impl Timeline for RocksTimeline { /// fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) { let key = CacheKey { tag, lsn }; - let content = CacheEntryContent { - page_image: Some(img), - wal_record: None, - }; + let content = CacheEntryContent::PageImage(img); let mut key_buf = BytesMut::new(); key.pack(&mut key_buf); diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 6b3cb2da15..f6c379d13d 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -370,7 +370,6 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn let rec = WALRecord { lsn, will_init: blk.will_init || blk.apply_image, - truncate: false, rec: recdata.clone(), main_data_offset: decoded.main_data_offset as u32, }; diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 801895509c..d6f7fa4fef 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -234,7 +234,6 @@ fn walreceiver_main( let rec = WALRecord { lsn, will_init: blk.will_init || blk.apply_image, - truncate: false, rec: recdata.clone(), main_data_offset: decoded.main_data_offset as u32, }; @@ -248,23 +247,13 @@ fn walreceiver_main( { let truncate = XlSmgrTruncate::decode(&decoded); if (truncate.flags & SMGR_TRUNCATE_HEAP) != 0 { - let tag = BufferTag { - rel: RelTag { - spcnode: truncate.rnode.spcnode, - dbnode: truncate.rnode.dbnode, - relnode: truncate.rnode.relnode, - forknum: MAIN_FORKNUM, - }, - blknum: truncate.blkno, + let rel = RelTag { + spcnode: truncate.rnode.spcnode, + dbnode: truncate.rnode.dbnode, + relnode: truncate.rnode.relnode, + forknum: MAIN_FORKNUM, }; - let rec = WALRecord { - lsn, - will_init: false, - truncate: true, - rec: recdata.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; - timeline.put_rel_wal_record(tag, rec)?; + timeline.put_truncation(rel, lsn, truncate.blkno)?; } } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)