Simplify construction of rocksdb keys and values.

I'm going nuts with the pattern:

    let k = iter.key().unwrap();
    buf.clear();
    buf.extend_from_slice(&k);
    let key = CacheKey::unpack(&mut buf);

Introduce helper functions to convert a CacheKey into BytesMut, and
from [u8] into CacheKey. Reduces the boilerplate code a lot.

The helper functions create a new BytesMut on each call, whereas the old
coding could reuse a single BytesMut, so this could be a bit slower. I
haven't tried measuring it, but at least it's not immediately noticeable,
and readability is much more imporatant at this point. We can optimize
later
This commit is contained in:
Heikki Linnakangas
2021-05-19 12:32:11 +03:00
parent 709b778904
commit e6a7241c3a
2 changed files with 71 additions and 136 deletions

View File

@@ -198,7 +198,7 @@ impl RelTag {
buf.put_u32(self.dbnode);
buf.put_u32(self.relnode);
}
pub fn unpack(buf: &mut BytesMut) -> RelTag {
pub fn unpack(buf: &mut Bytes) -> RelTag {
RelTag {
forknum: buf.get_u8(),
spcnode: buf.get_u32(),
@@ -237,7 +237,7 @@ impl BufferTag {
self.rel.pack(buf);
buf.put_u32(self.blknum);
}
pub fn unpack(buf: &mut BytesMut) -> BufferTag {
pub fn unpack(buf: &mut Bytes) -> BufferTag {
BufferTag {
rel: RelTag::unpack(buf),
blknum: buf.get_u32(),
@@ -264,7 +264,7 @@ impl WALRecord {
buf.put_u32(self.rec.len() as u32);
buf.put_slice(&self.rec[..]);
}
pub fn unpack(buf: &mut BytesMut) -> WALRecord {
pub fn unpack(buf: &mut Bytes) -> WALRecord {
let lsn = Lsn::from(buf.get_u64());
let will_init = buf.get_u8() != 0;
let main_data_offset = buf.get_u32();

View File

@@ -84,22 +84,33 @@ pub struct RocksTimeline {
// routine to generate the page image.
//
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct CacheKey {
struct CacheKey {
pub tag: BufferTag,
pub lsn: Lsn,
}
impl CacheKey {
pub fn pack(&self, buf: &mut BytesMut) {
fn pack(&self, buf: &mut BytesMut) {
self.tag.pack(buf);
buf.put_u64(self.lsn.0);
}
pub fn unpack(buf: &mut BytesMut) -> CacheKey {
fn unpack(buf: &mut Bytes) -> CacheKey {
CacheKey {
tag: BufferTag::unpack(buf),
lsn: Lsn::from(buf.get_u64()),
}
}
fn from_slice(slice: &[u8]) -> Self {
let mut buf = Bytes::copy_from_slice(slice);
Self::unpack(&mut buf)
}
fn to_bytes(&self) -> BytesMut {
let mut buf = BytesMut::new();
self.pack(&mut buf);
buf
}
}
enum CacheEntryContent {
@@ -138,7 +149,7 @@ impl CacheEntryContent {
}
}
}
pub fn unpack(buf: &mut BytesMut) -> CacheEntryContent {
pub fn unpack(buf: &mut Bytes) -> CacheEntryContent {
let kind = buf.get_u8() & CONTENT_KIND_MASK;
match kind {
@@ -153,6 +164,17 @@ impl CacheEntryContent {
_ => unreachable!(),
}
}
fn from_slice(slice: &[u8]) -> Self {
let mut buf = Bytes::copy_from_slice(slice);
Self::unpack(&mut buf)
}
fn to_bytes(&self) -> BytesMut {
let mut buf = BytesMut::new();
self.pack(&mut buf);
buf
}
}
impl RocksRepository {
@@ -273,30 +295,21 @@ impl RocksTimeline {
tag: BufferTag,
lsn: Lsn,
) -> (Option<Bytes>, Vec<WALRecord>) {
let mut buf = BytesMut::new();
let key = CacheKey { tag, lsn };
key.pack(&mut buf);
let mut base_img: Option<Bytes> = None;
let mut records: Vec<WALRecord> = Vec::new();
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(&buf[..]);
iter.seek_for_prev(key.to_bytes());
// Scan backwards, collecting the WAL records, until we hit an
// old page image.
while iter.valid() {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
let key = CacheKey::from_slice(iter.key().unwrap());
if key.tag != tag {
break;
}
let v = iter.value().unwrap();
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
let content = CacheEntryContent::from_slice(iter.value().unwrap());
if let CacheEntryContent::PageImage(img) = content {
// We have a base image. No need to dig deeper into the list of
// records
@@ -334,31 +347,21 @@ impl RocksTimeline {
},
lsn,
};
let mut buf = BytesMut::new();
let mut iter = self.db.raw_iterator();
loop {
buf.clear();
key.pack(&mut buf);
iter.seek_for_prev(&buf[..]);
iter.seek_for_prev(key.to_bytes());
if iter.valid() {
let k = iter.key().unwrap();
let v = iter.value().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
if tag.rel == rel {
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
let thiskey = CacheKey::from_slice(iter.key().unwrap());
if thiskey.tag.rel == rel {
let content = CacheEntryContent::from_slice(iter.value().unwrap());
if let CacheEntryContent::Truncation = content {
if tag.blknum > 0 {
key.tag.blknum = tag.blknum - 1;
if thiskey.tag.blknum > 0 {
key.tag.blknum = thiskey.tag.blknum - 1;
continue;
}
break;
}
let relsize = tag.blknum + 1;
let relsize = thiskey.tag.blknum + 1;
debug!("Size of relation {} at {} is {}", rel, lsn, relsize);
return Ok(relsize);
}
@@ -370,7 +373,6 @@ impl RocksTimeline {
}
fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<Bytes> {
let mut buf = BytesMut::new();
loop {
thread::sleep(conf.gc_period);
let last_lsn = self.get_last_valid_lsn();
@@ -395,20 +397,14 @@ impl RocksTimeline {
let mut inspected = 0u64;
let mut deleted = 0u64;
loop {
buf.clear();
maxkey.pack(&mut buf);
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(&buf[..]);
iter.seek_for_prev(maxkey.to_bytes());
if iter.valid() {
let k = iter.key().unwrap();
let key = CacheKey::from_slice(iter.key().unwrap());
let v = iter.value().unwrap();
inspected += 1;
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
// Construct boundaries for old records cleanup
maxkey.tag = key.tag;
let last_lsn = key.lsn;
@@ -424,7 +420,7 @@ impl RocksTimeline {
if (v[0] & UNUSED_VERSION_FLAG) == 0 {
let mut v = v.to_owned();
v[0] |= UNUSED_VERSION_FLAG;
self.db.put(k, &v[..])?;
self.db.put(key.to_bytes(), &v[..])?;
deleted += 1;
}
maxkey = minkey;
@@ -452,18 +448,12 @@ impl RocksTimeline {
reconstructed += 1;
}
buf.clear();
maxkey.pack(&mut buf);
iter.seek_for_prev(&buf[..]);
iter.seek_for_prev(maxkey.to_bytes());
if iter.valid() {
// do not remove last version
if last_lsn > horizon {
// locate most recent record before horizon
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
let key = CacheKey::from_slice(iter.key().unwrap());
if key.tag == maxkey.tag {
let v = iter.value().unwrap();
if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
@@ -501,10 +491,7 @@ impl RocksTimeline {
if !iter.valid() {
break;
}
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
let key = CacheKey::from_slice(iter.key().unwrap());
if key.tag != maxkey.tag {
break;
}
@@ -512,7 +499,7 @@ impl RocksTimeline {
if (v[0] & UNUSED_VERSION_FLAG) == 0 {
let mut v = v.to_owned();
v[0] |= UNUSED_VERSION_FLAG;
self.db.put(k, &v[..])?;
self.db.put(key.to_bytes(), &v[..])?;
deleted += 1;
trace!(
"deleted: {} blk {} at {}",
@@ -582,21 +569,13 @@ impl Timeline for RocksTimeline {
// ask the WAL redo service to reconstruct the page image from the WAL records.
let key = CacheKey { tag, lsn };
let mut buf = BytesMut::new();
key.pack(&mut buf);
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(&buf[..]);
iter.seek_for_prev(key.to_bytes());
if iter.valid() {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
let key = CacheKey::from_slice(iter.key().unwrap());
if key.tag == tag {
let v = iter.value().unwrap();
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
let content = CacheEntryContent::from_slice(iter.value().unwrap());
let page_img: Bytes;
if let CacheEntryContent::PageImage(img) = content {
page_img = img;
@@ -654,17 +633,12 @@ impl Timeline for RocksTimeline {
},
lsn: Lsn(0),
};
let mut buf = BytesMut::new();
key.pack(&mut buf);
let mut gxacts = Vec::new();
let mut iter = self.db.raw_iterator();
iter.seek(&buf[..]);
iter.seek(key.to_bytes());
while iter.valid() {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
let key = CacheKey::from_slice(iter.key().unwrap());
if key.tag.rel.forknum != pg_constants::PG_TWOPHASE_FORKNUM {
break; // we are done with this fork
}
@@ -691,18 +665,13 @@ impl Timeline for RocksTimeline {
},
lsn: Lsn(0),
};
let mut buf = BytesMut::new();
key.pack(&mut buf);
let mut dbs = Vec::new();
let mut iter = self.db.raw_iterator();
iter.seek(&buf[..]);
iter.seek(key.to_bytes());
let mut prev_tag = key.tag.rel;
while iter.valid() {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
let key = CacheKey::from_slice(iter.key().unwrap());
if key.tag.rel.forknum != pg_constants::PG_FILENODEMAP_FORKNUM {
break; // we are done with this fork
}
@@ -724,29 +693,20 @@ impl Timeline for RocksTimeline {
tag: BufferTag { rel, blknum: 0 },
lsn,
};
let mut buf = BytesMut::new();
key.pack(&mut buf);
let mut iter = self.db.raw_iterator();
iter.seek(&buf[..]); // locate first entry
iter.seek(key.to_bytes()); // locate first entry
if iter.valid() {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
let thiskey = CacheKey::from_slice(iter.key().unwrap());
let tag = thiskey.tag;
if tag.rel == rel {
// still trversing this relation
let first_blknum = tag.blknum;
key.tag.blknum = u32::MAX; // maximal key
buf.clear();
key.pack(&mut buf);
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(&buf[..]); // localte last entry
iter.seek_for_prev(key.to_bytes()); // localte last entry
if iter.valid() {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
let last_blknum = tag.blknum;
let thiskey = CacheKey::from_slice(iter.key().unwrap());
let last_blknum = thiskey.tag.blknum;
return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive
}
}
@@ -768,16 +728,11 @@ impl Timeline for RocksTimeline {
},
lsn,
};
let mut buf = BytesMut::new();
key.pack(&mut buf);
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(&buf[..]);
iter.seek_for_prev(key.to_bytes());
if iter.valid() {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
if tag.rel == rel {
let key = CacheKey::from_slice(iter.key().unwrap());
if key.tag.rel == rel {
debug!("Relation {} exists at {}", rel, lsn);
return Ok(true);
}
@@ -798,12 +753,7 @@ impl Timeline for RocksTimeline {
let content = CacheEntryContent::WALRecord(rec);
let mut key_buf = BytesMut::new();
key.pack(&mut key_buf);
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
let _res = self.db.put(key.to_bytes(), content.to_bytes());
trace!(
"put_wal_record rel {} blk {} at {}",
tag.rel,
@@ -828,19 +778,13 @@ impl Timeline for RocksTimeline {
// set new relation size
trace!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn);
let mut key_buf = BytesMut::new();
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
for blknum in nblocks..old_rel_size {
key_buf.clear();
let key = CacheKey {
tag: BufferTag { rel, blknum },
lsn,
};
key.pack(&mut key_buf);
trace!("put_wal_record lsn: {}", key.lsn);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
let _res = self.db.put(key.to_bytes(), content.to_bytes());
}
let n = (old_rel_size - nblocks) as u64;
self.num_entries.fetch_add(n, Ordering::Relaxed);
@@ -855,10 +799,8 @@ impl Timeline for RocksTimeline {
let img_len = img.len();
let key = CacheKey { tag, lsn };
let content = CacheEntryContent::PageImage(img);
let mut key_buf = BytesMut::new();
key.pack(&mut key_buf);
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
let mut val_buf = content.to_bytes();
// Zero size of page image indicates that page can be removed
if img_len == 0 {
@@ -872,7 +814,7 @@ impl Timeline for RocksTimeline {
}
trace!("put_wal_record lsn: {}", key.lsn);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
let _res = self.db.put(key.to_bytes(), content.to_bytes());
trace!(
"put_page_image rel {} blk {} at {}",
@@ -891,7 +833,6 @@ impl Timeline for RocksTimeline {
src_db_id: Oid,
src_tablespace_id: Oid,
) -> anyhow::Result<()> {
let mut buf = BytesMut::new();
let key = CacheKey {
tag: BufferTag {
rel: RelTag {
@@ -904,26 +845,20 @@ impl Timeline for RocksTimeline {
},
lsn: Lsn(0),
};
key.pack(&mut buf);
let mut iter = self.db.raw_iterator();
iter.seek(&buf[..]);
iter.seek(key.to_bytes());
let mut n = 0;
while iter.valid() {
let k = iter.key().unwrap();
let v = iter.value().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let mut key = CacheKey::unpack(&mut buf);
let mut key = CacheKey::from_slice(iter.key().unwrap());
if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
break;
}
key.tag.rel.spcnode = tablespace_id;
key.tag.rel.dbnode = db_id;
key.lsn = lsn;
buf.clear();
key.pack(&mut buf);
self.db.put(&buf[..], v)?;
let v = iter.value().unwrap();
self.db.put(key.to_bytes(), v)?;
n += 1;
iter.next();
}