diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 88b3698d48..de253be46a 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -36,7 +36,7 @@ pub trait Timeline { // Functions used by WAL receiver fn put_wal_record(&self, tag: BufferTag, rec: WALRecord); - fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()>; + fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> anyhow::Result<()>; fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes); fn create_database( &self, @@ -109,7 +109,6 @@ impl BufferTag { pub struct WALRecord { pub lsn: Lsn, // LSN at the *end* of the record pub will_init: bool, - pub truncate: bool, pub rec: Bytes, // Remember the offset of main_data in rec, // so that we don't have to parse the record again. @@ -121,7 +120,6 @@ impl WALRecord { pub fn pack(&self, buf: &mut BytesMut) { buf.put_u64(self.lsn.0); buf.put_u8(self.will_init as u8); - buf.put_u8(self.truncate as u8); buf.put_u32(self.main_data_offset); buf.put_u32(self.rec.len() as u32); buf.put_slice(&self.rec[..]); @@ -129,14 +127,12 @@ impl WALRecord { pub fn unpack(buf: &mut BytesMut) -> WALRecord { let lsn = Lsn::from(buf.get_u64()); let will_init = buf.get_u8() != 0; - let truncate = buf.get_u8() != 0; let main_data_offset = buf.get_u32(); let mut dst = vec![0u8; buf.get_u32() as usize]; buf.copy_to_slice(&mut dst); WALRecord { lsn, will_init, - truncate, rec: Bytes::from(dst), main_data_offset, } diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs index af6a4c9f1f..01c63206e5 100644 --- a/pageserver/src/repository/rocksdb.rs +++ b/pageserver/src/repository/rocksdb.rs @@ -98,38 +98,55 @@ impl CacheKey { } } -pub struct CacheEntryContent { - pub page_image: Option, - pub wal_record: Option, +enum CacheEntryContent { + PageImage(Bytes), + WALRecord(WALRecord), + Truncation, } -const PAGE_IMAGE_FLAG: u8 = 1u8; -const UNUSED_VERSION_FLAG: u8 = 2u8; +// The serialized representation of a CacheEntryContent begins with +// single byte that indicates what kind of entry it is. There is also +// an UNUSED_VERSION_FLAG that is not represented in the CacheEntryContent +// at all, you must peek into the first byte of the serialized representation +// to read it. +const CONTENT_PAGE_IMAGE: u8 = 1u8; +const CONTENT_WAL_RECORD: u8 = 2u8; +const CONTENT_TRUNCATION: u8 = 3u8; + +const CONTENT_KIND_MASK: u8 = 3u8; // bitmask that covers the above + +const UNUSED_VERSION_FLAG: u8 = 4u8; impl CacheEntryContent { pub fn pack(&self, buf: &mut BytesMut) { - if let Some(image) = &self.page_image { - buf.put_u8(PAGE_IMAGE_FLAG); - buf.put_u16(image.len() as u16); - buf.put_slice(&image[..]); - } else if let Some(rec) = &self.wal_record { - buf.put_u8(0); - rec.pack(buf); + match self { + CacheEntryContent::PageImage(image) => { + buf.put_u8(CONTENT_PAGE_IMAGE); + buf.put_u16(image.len() as u16); + buf.put_slice(&image[..]); + } + CacheEntryContent::WALRecord(rec) => { + buf.put_u8(CONTENT_WAL_RECORD); + rec.pack(buf); + } + CacheEntryContent::Truncation => { + buf.put_u8(CONTENT_TRUNCATION); + } } } pub fn unpack(buf: &mut BytesMut) -> CacheEntryContent { - if (buf.get_u8() & PAGE_IMAGE_FLAG) != 0 { - let mut dst = vec![0u8; buf.get_u16() as usize]; - buf.copy_to_slice(&mut dst); - CacheEntryContent { - page_image: Some(Bytes::from(dst)), - wal_record: None, - } - } else { - CacheEntryContent { - page_image: None, - wal_record: Some(WALRecord::unpack(buf)), + let kind = buf.get_u8() & CONTENT_KIND_MASK; + + match kind { + CONTENT_PAGE_IMAGE => { + let len = buf.get_u16() as usize; + let mut dst = vec![0u8; len]; + buf.copy_to_slice(&mut dst); + CacheEntryContent::PageImage(Bytes::from(dst)) } + CONTENT_WAL_RECORD => CacheEntryContent::WALRecord(WALRecord::unpack(buf)), + CONTENT_TRUNCATION => CacheEntryContent::Truncation, + _ => unreachable!(), } } } @@ -257,12 +274,12 @@ impl RocksTimeline { buf.clear(); buf.extend_from_slice(&v); let content = CacheEntryContent::unpack(&mut buf); - if let Some(img) = &content.page_image { + if let CacheEntryContent::PageImage(img) = content { // We have a base image. No need to dig deeper into the list of // records - base_img = Some(img.clone()); + base_img = Some(img); break; - } else if let Some(rec) = &content.wal_record { + } else if let CacheEntryContent::WALRecord(rec) = content { records.push(rec.clone()); // If this WAL record initializes the page, no need to dig deeper. if rec.will_init { @@ -311,14 +328,12 @@ impl RocksTimeline { buf.clear(); buf.extend_from_slice(&v); let content = CacheEntryContent::unpack(&mut buf); - if let Some(rec) = &content.wal_record { - if rec.truncate { - if tag.blknum > 0 { - key.tag.blknum = tag.blknum - 1; - continue; - } - break; + if let CacheEntryContent::Truncation = content { + if tag.blknum > 0 { + key.tag.blknum = tag.blknum - 1; + continue; } + break; } let relsize = tag.blknum + 1; debug!("Size of relation {:?} at {} is {}", rel, lsn, relsize); @@ -380,7 +395,7 @@ impl RocksTimeline { minkey.lsn = Lsn(0); // first version // reconstruct most recent page version - if (v[0] & PAGE_IMAGE_FLAG) == 0 { + if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD { trace!("Reconstruct most recent page {:?}", key); // force reconstruction of most recent page version let (base_img, records) = @@ -408,7 +423,7 @@ impl RocksTimeline { let key = CacheKey::unpack(&mut buf); if key.tag == maxkey.tag { let v = iter.value().unwrap(); - if (v[0] & PAGE_IMAGE_FLAG) == 0 { + if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD { trace!("Reconstruct horizon page {:?}", key); let (base_img, records) = self.collect_records_for_apply(key.tag, key.lsn); @@ -518,9 +533,9 @@ impl Timeline for RocksTimeline { buf.extend_from_slice(&v); let content = CacheEntryContent::unpack(&mut buf); let page_img: Bytes; - if let Some(img) = &content.page_image { - page_img = img.clone(); - } else if content.wal_record.is_some() { + if let CacheEntryContent::PageImage(img) = content { + page_img = img; + } else if let CacheEntryContent::WALRecord(_rec) = content { // Request the WAL redo manager to apply the WAL records for us. let (base_img, records) = self.collect_records_for_apply(tag, lsn); page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?; @@ -603,10 +618,7 @@ impl Timeline for RocksTimeline { let lsn = rec.lsn; let key = CacheKey { tag, lsn }; - let content = CacheEntryContent { - page_image: None, - wal_record: Some(rec), - }; + let content = CacheEntryContent::WALRecord(rec); let mut key_buf = BytesMut::new(); key.pack(&mut key_buf); @@ -624,32 +636,33 @@ impl Timeline for RocksTimeline { /// Adds a relation-wide WAL record (like truncate) to the repository, /// associating it with all pages started with specified block number /// - fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> { - let mut key = CacheKey { tag, lsn: rec.lsn }; - + fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> anyhow::Result<()> { // What was the size of the relation before this record? let last_lsn = self.last_valid_lsn.load(); - let old_rel_size = self.relsize_get_nowait(tag.rel, last_lsn)?; + let old_rel_size = self.relsize_get_nowait(rel, last_lsn)?; - let content = CacheEntryContent { - page_image: None, - wal_record: Some(rec), - }; + let content = CacheEntryContent::Truncation; // set new relation size - trace!("Truncate relation {:?}", tag); + trace!("Truncate relation {:?} to {} blocks", rel, nblocks); let mut key_buf = BytesMut::new(); let mut val_buf = BytesMut::new(); content.pack(&mut val_buf); - for blknum in tag.blknum..old_rel_size { + for blknum in nblocks..old_rel_size { key_buf.clear(); - key.tag.blknum = blknum; + let key = CacheKey { + tag: BufferTag { + rel, + blknum, + }, + lsn, + }; key.pack(&mut key_buf); trace!("put_wal_record lsn: {}", key.lsn); let _res = self.db.put(&key_buf[..], &val_buf[..]); } - let n = (old_rel_size - tag.blknum) as u64; + let n = (old_rel_size - nblocks) as u64; self.num_entries.fetch_add(n, Ordering::Relaxed); self.num_wal_records.fetch_add(n, Ordering::Relaxed); Ok(()) @@ -660,10 +673,7 @@ impl Timeline for RocksTimeline { /// fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) { let key = CacheKey { tag, lsn }; - let content = CacheEntryContent { - page_image: Some(img), - wal_record: None, - }; + let content = CacheEntryContent::PageImage(img); let mut key_buf = BytesMut::new(); key.pack(&mut key_buf); diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 6b3cb2da15..f6c379d13d 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -370,7 +370,6 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn let rec = WALRecord { lsn, will_init: blk.will_init || blk.apply_image, - truncate: false, rec: recdata.clone(), main_data_offset: decoded.main_data_offset as u32, }; diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 801895509c..d6f7fa4fef 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -234,7 +234,6 @@ fn walreceiver_main( let rec = WALRecord { lsn, will_init: blk.will_init || blk.apply_image, - truncate: false, rec: recdata.clone(), main_data_offset: decoded.main_data_offset as u32, }; @@ -248,23 +247,13 @@ fn walreceiver_main( { let truncate = XlSmgrTruncate::decode(&decoded); if (truncate.flags & SMGR_TRUNCATE_HEAP) != 0 { - let tag = BufferTag { - rel: RelTag { - spcnode: truncate.rnode.spcnode, - dbnode: truncate.rnode.dbnode, - relnode: truncate.rnode.relnode, - forknum: MAIN_FORKNUM, - }, - blknum: truncate.blkno, + let rel = RelTag { + spcnode: truncate.rnode.spcnode, + dbnode: truncate.rnode.dbnode, + relnode: truncate.rnode.relnode, + forknum: MAIN_FORKNUM, }; - let rec = WALRecord { - lsn, - will_init: false, - truncate: true, - rec: recdata.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; - timeline.put_rel_wal_record(tag, rec)?; + timeline.put_truncation(rel, lsn, truncate.blkno)?; } } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)