diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 3e6846def6..2db389448f 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -198,7 +198,7 @@ impl RelTag { buf.put_u32(self.dbnode); buf.put_u32(self.relnode); } - pub fn unpack(buf: &mut BytesMut) -> RelTag { + pub fn unpack(buf: &mut Bytes) -> RelTag { RelTag { forknum: buf.get_u8(), spcnode: buf.get_u32(), @@ -237,7 +237,7 @@ impl BufferTag { self.rel.pack(buf); buf.put_u32(self.blknum); } - pub fn unpack(buf: &mut BytesMut) -> BufferTag { + pub fn unpack(buf: &mut Bytes) -> BufferTag { BufferTag { rel: RelTag::unpack(buf), blknum: buf.get_u32(), @@ -264,7 +264,7 @@ impl WALRecord { buf.put_u32(self.rec.len() as u32); buf.put_slice(&self.rec[..]); } - pub fn unpack(buf: &mut BytesMut) -> WALRecord { + pub fn unpack(buf: &mut Bytes) -> WALRecord { let lsn = Lsn::from(buf.get_u64()); let will_init = buf.get_u8() != 0; let main_data_offset = buf.get_u32(); diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs index 665803829a..23b9df3a11 100644 --- a/pageserver/src/repository/rocksdb.rs +++ b/pageserver/src/repository/rocksdb.rs @@ -84,22 +84,33 @@ pub struct RocksTimeline { // routine to generate the page image. // #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] -pub struct CacheKey { +struct CacheKey { pub tag: BufferTag, pub lsn: Lsn, } impl CacheKey { - pub fn pack(&self, buf: &mut BytesMut) { + fn pack(&self, buf: &mut BytesMut) { self.tag.pack(buf); buf.put_u64(self.lsn.0); } - pub fn unpack(buf: &mut BytesMut) -> CacheKey { + fn unpack(buf: &mut Bytes) -> CacheKey { CacheKey { tag: BufferTag::unpack(buf), lsn: Lsn::from(buf.get_u64()), } } + + fn from_slice(slice: &[u8]) -> Self { + let mut buf = Bytes::copy_from_slice(slice); + Self::unpack(&mut buf) + } + + fn to_bytes(&self) -> BytesMut { + let mut buf = BytesMut::new(); + self.pack(&mut buf); + buf + } } enum CacheEntryContent { @@ -138,7 +149,7 @@ impl CacheEntryContent { } } } - pub fn unpack(buf: &mut BytesMut) -> CacheEntryContent { + pub fn unpack(buf: &mut Bytes) -> CacheEntryContent { let kind = buf.get_u8() & CONTENT_KIND_MASK; match kind { @@ -153,6 +164,17 @@ impl CacheEntryContent { _ => unreachable!(), } } + + fn from_slice(slice: &[u8]) -> Self { + let mut buf = Bytes::copy_from_slice(slice); + Self::unpack(&mut buf) + } + + fn to_bytes(&self) -> BytesMut { + let mut buf = BytesMut::new(); + self.pack(&mut buf); + buf + } } impl RocksRepository { @@ -273,30 +295,21 @@ impl RocksTimeline { tag: BufferTag, lsn: Lsn, ) -> (Option, Vec) { - let mut buf = BytesMut::new(); let key = CacheKey { tag, lsn }; - key.pack(&mut buf); - let mut base_img: Option = None; let mut records: Vec = Vec::new(); let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(&buf[..]); + iter.seek_for_prev(key.to_bytes()); // Scan backwards, collecting the WAL records, until we hit an // old page image. while iter.valid() { - let k = iter.key().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut buf); + let key = CacheKey::from_slice(iter.key().unwrap()); if key.tag != tag { break; } - let v = iter.value().unwrap(); - buf.clear(); - buf.extend_from_slice(&v); - let content = CacheEntryContent::unpack(&mut buf); + let content = CacheEntryContent::from_slice(iter.value().unwrap()); if let CacheEntryContent::PageImage(img) = content { // We have a base image. No need to dig deeper into the list of // records @@ -334,31 +347,21 @@ impl RocksTimeline { }, lsn, }; - let mut buf = BytesMut::new(); let mut iter = self.db.raw_iterator(); - loop { - buf.clear(); - key.pack(&mut buf); - iter.seek_for_prev(&buf[..]); + iter.seek_for_prev(key.to_bytes()); if iter.valid() { - let k = iter.key().unwrap(); - let v = iter.value().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let tag = BufferTag::unpack(&mut buf); - if tag.rel == rel { - buf.clear(); - buf.extend_from_slice(&v); - let content = CacheEntryContent::unpack(&mut buf); + let thiskey = CacheKey::from_slice(iter.key().unwrap()); + if thiskey.tag.rel == rel { + let content = CacheEntryContent::from_slice(iter.value().unwrap()); if let CacheEntryContent::Truncation = content { - if tag.blknum > 0 { - key.tag.blknum = tag.blknum - 1; + if thiskey.tag.blknum > 0 { + key.tag.blknum = thiskey.tag.blknum - 1; continue; } break; } - let relsize = tag.blknum + 1; + let relsize = thiskey.tag.blknum + 1; debug!("Size of relation {} at {} is {}", rel, lsn, relsize); return Ok(relsize); } @@ -370,7 +373,6 @@ impl RocksTimeline { } fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result { - let mut buf = BytesMut::new(); loop { thread::sleep(conf.gc_period); let last_lsn = self.get_last_valid_lsn(); @@ -395,20 +397,14 @@ impl RocksTimeline { let mut inspected = 0u64; let mut deleted = 0u64; loop { - buf.clear(); - maxkey.pack(&mut buf); let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(&buf[..]); + iter.seek_for_prev(maxkey.to_bytes()); if iter.valid() { - let k = iter.key().unwrap(); + let key = CacheKey::from_slice(iter.key().unwrap()); let v = iter.value().unwrap(); inspected += 1; - buf.clear(); - buf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut buf); - // Construct boundaries for old records cleanup maxkey.tag = key.tag; let last_lsn = key.lsn; @@ -424,7 +420,7 @@ impl RocksTimeline { if (v[0] & UNUSED_VERSION_FLAG) == 0 { let mut v = v.to_owned(); v[0] |= UNUSED_VERSION_FLAG; - self.db.put(k, &v[..])?; + self.db.put(key.to_bytes(), &v[..])?; deleted += 1; } maxkey = minkey; @@ -452,18 +448,12 @@ impl RocksTimeline { reconstructed += 1; } - buf.clear(); - maxkey.pack(&mut buf); - - iter.seek_for_prev(&buf[..]); + iter.seek_for_prev(maxkey.to_bytes()); if iter.valid() { // do not remove last version if last_lsn > horizon { // locate most recent record before horizon - let k = iter.key().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut buf); + let key = CacheKey::from_slice(iter.key().unwrap()); if key.tag == maxkey.tag { let v = iter.value().unwrap(); if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD { @@ -501,10 +491,7 @@ impl RocksTimeline { if !iter.valid() { break; } - let k = iter.key().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut buf); + let key = CacheKey::from_slice(iter.key().unwrap()); if key.tag != maxkey.tag { break; } @@ -512,7 +499,7 @@ impl RocksTimeline { if (v[0] & UNUSED_VERSION_FLAG) == 0 { let mut v = v.to_owned(); v[0] |= UNUSED_VERSION_FLAG; - self.db.put(k, &v[..])?; + self.db.put(key.to_bytes(), &v[..])?; deleted += 1; trace!( "deleted: {} blk {} at {}", @@ -582,21 +569,13 @@ impl Timeline for RocksTimeline { // ask the WAL redo service to reconstruct the page image from the WAL records. let key = CacheKey { tag, lsn }; - let mut buf = BytesMut::new(); - key.pack(&mut buf); let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(&buf[..]); + iter.seek_for_prev(key.to_bytes()); if iter.valid() { - let k = iter.key().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut buf); + let key = CacheKey::from_slice(iter.key().unwrap()); if key.tag == tag { - let v = iter.value().unwrap(); - buf.clear(); - buf.extend_from_slice(&v); - let content = CacheEntryContent::unpack(&mut buf); + let content = CacheEntryContent::from_slice(iter.value().unwrap()); let page_img: Bytes; if let CacheEntryContent::PageImage(img) = content { page_img = img; @@ -654,17 +633,12 @@ impl Timeline for RocksTimeline { }, lsn: Lsn(0), }; - let mut buf = BytesMut::new(); - key.pack(&mut buf); let mut gxacts = Vec::new(); let mut iter = self.db.raw_iterator(); - iter.seek(&buf[..]); + iter.seek(key.to_bytes()); while iter.valid() { - let k = iter.key().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut buf); + let key = CacheKey::from_slice(iter.key().unwrap()); if key.tag.rel.forknum != pg_constants::PG_TWOPHASE_FORKNUM { break; // we are done with this fork } @@ -691,18 +665,13 @@ impl Timeline for RocksTimeline { }, lsn: Lsn(0), }; - let mut buf = BytesMut::new(); - key.pack(&mut buf); let mut dbs = Vec::new(); let mut iter = self.db.raw_iterator(); - iter.seek(&buf[..]); + iter.seek(key.to_bytes()); let mut prev_tag = key.tag.rel; while iter.valid() { - let k = iter.key().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut buf); + let key = CacheKey::from_slice(iter.key().unwrap()); if key.tag.rel.forknum != pg_constants::PG_FILENODEMAP_FORKNUM { break; // we are done with this fork } @@ -724,29 +693,20 @@ impl Timeline for RocksTimeline { tag: BufferTag { rel, blknum: 0 }, lsn, }; - let mut buf = BytesMut::new(); - key.pack(&mut buf); let mut iter = self.db.raw_iterator(); - iter.seek(&buf[..]); // locate first entry + iter.seek(key.to_bytes()); // locate first entry if iter.valid() { - let k = iter.key().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let tag = BufferTag::unpack(&mut buf); + let thiskey = CacheKey::from_slice(iter.key().unwrap()); + let tag = thiskey.tag; if tag.rel == rel { // still trversing this relation let first_blknum = tag.blknum; key.tag.blknum = u32::MAX; // maximal key - buf.clear(); - key.pack(&mut buf); let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(&buf[..]); // localte last entry + iter.seek_for_prev(key.to_bytes()); // localte last entry if iter.valid() { - let k = iter.key().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let tag = BufferTag::unpack(&mut buf); - let last_blknum = tag.blknum; + let thiskey = CacheKey::from_slice(iter.key().unwrap()); + let last_blknum = thiskey.tag.blknum; return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive } } @@ -768,16 +728,11 @@ impl Timeline for RocksTimeline { }, lsn, }; - let mut buf = BytesMut::new(); - key.pack(&mut buf); let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(&buf[..]); + iter.seek_for_prev(key.to_bytes()); if iter.valid() { - let k = iter.key().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let tag = BufferTag::unpack(&mut buf); - if tag.rel == rel { + let key = CacheKey::from_slice(iter.key().unwrap()); + if key.tag.rel == rel { debug!("Relation {} exists at {}", rel, lsn); return Ok(true); } @@ -798,12 +753,7 @@ impl Timeline for RocksTimeline { let content = CacheEntryContent::WALRecord(rec); - let mut key_buf = BytesMut::new(); - key.pack(&mut key_buf); - let mut val_buf = BytesMut::new(); - content.pack(&mut val_buf); - - let _res = self.db.put(&key_buf[..], &val_buf[..]); + let _res = self.db.put(key.to_bytes(), content.to_bytes()); trace!( "put_wal_record rel {} blk {} at {}", tag.rel, @@ -828,19 +778,13 @@ impl Timeline for RocksTimeline { // set new relation size trace!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn); - let mut key_buf = BytesMut::new(); - let mut val_buf = BytesMut::new(); - content.pack(&mut val_buf); - for blknum in nblocks..old_rel_size { - key_buf.clear(); let key = CacheKey { tag: BufferTag { rel, blknum }, lsn, }; - key.pack(&mut key_buf); trace!("put_wal_record lsn: {}", key.lsn); - let _res = self.db.put(&key_buf[..], &val_buf[..]); + let _res = self.db.put(key.to_bytes(), content.to_bytes()); } let n = (old_rel_size - nblocks) as u64; self.num_entries.fetch_add(n, Ordering::Relaxed); @@ -855,10 +799,8 @@ impl Timeline for RocksTimeline { let img_len = img.len(); let key = CacheKey { tag, lsn }; let content = CacheEntryContent::PageImage(img); - let mut key_buf = BytesMut::new(); - key.pack(&mut key_buf); - let mut val_buf = BytesMut::new(); - content.pack(&mut val_buf); + + let mut val_buf = content.to_bytes(); // Zero size of page image indicates that page can be removed if img_len == 0 { @@ -872,7 +814,7 @@ impl Timeline for RocksTimeline { } trace!("put_wal_record lsn: {}", key.lsn); - let _res = self.db.put(&key_buf[..], &val_buf[..]); + let _res = self.db.put(key.to_bytes(), content.to_bytes()); trace!( "put_page_image rel {} blk {} at {}", @@ -891,7 +833,6 @@ impl Timeline for RocksTimeline { src_db_id: Oid, src_tablespace_id: Oid, ) -> anyhow::Result<()> { - let mut buf = BytesMut::new(); let key = CacheKey { tag: BufferTag { rel: RelTag { @@ -904,26 +845,20 @@ impl Timeline for RocksTimeline { }, lsn: Lsn(0), }; - key.pack(&mut buf); let mut iter = self.db.raw_iterator(); - iter.seek(&buf[..]); + iter.seek(key.to_bytes()); let mut n = 0; while iter.valid() { - let k = iter.key().unwrap(); - let v = iter.value().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let mut key = CacheKey::unpack(&mut buf); + let mut key = CacheKey::from_slice(iter.key().unwrap()); if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id { break; } key.tag.rel.spcnode = tablespace_id; key.tag.rel.dbnode = db_id; key.lsn = lsn; - buf.clear(); - key.pack(&mut buf); - self.db.put(&buf[..], v)?; + let v = iter.value().unwrap(); + self.db.put(key.to_bytes(), v)?; n += 1; iter.next(); }