switch repository types to serde

Derive Serialize+Deserialize for RelTag, BufferTag, and CacheKey. Replace
the handwritten pack/unpack functions with ser and des from
zenith_utils::bin_ser (which uses the bincode crate).
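
To illustrate, a minimal sketch of the new pattern (RelTag's fields are
the real ones; the exact BeSer trait surface is assumed from how ser/des
are used in the diff below):

    use serde::{Deserialize, Serialize};
    use zenith_utils::bin_ser::BeSer;

    #[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
    struct RelTag {
        forknum: u8,
        spcnode: u32,
        dbnode: u32,
        relnode: u32,
    }

    fn roundtrip() -> anyhow::Result<()> {
        // arbitrary example values
        let tag = RelTag { forknum: 0, spcnode: 1663, dbnode: 12972, relnode: 16384 };
        // ser() encodes via bincode; des() decodes the same bytes back.
        let bytes = tag.ser()?;
        let decoded = RelTag::des(&bytes)?;
        assert_eq!(tag, decoded);
        Ok(())
    }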

There are some ugly hybrids in walredo.rs, but those functions are
already doing a lot of questionable manual byte-twiddling, so hopefully
the weirdness will go away when we get better postgres protocol
wrappers.
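
For example, build_begin_redo_for_block_msg (in the diff below) now
mixes hand-written BufMut calls with a serde writer. A condensed sketch
of that hybrid, where Tag is a stand-in for BufferTag and ser_into comes
from the same BeSer trait:

    use bytes::{BufMut, Bytes, BytesMut};
    use serde::{Deserialize, Serialize};
    use zenith_utils::bin_ser::BeSer;

    #[derive(Serialize, Deserialize)]
    struct Tag {
        spcnode: u32,
        blknum: u32,
    }

    fn build_msg(tag: &Tag) -> Bytes {
        let mut buf = BytesMut::new();
        buf.put_u8(b'B'); // hand-rolled protocol byte, as before
        // The Write adapter consumes the BytesMut...
        let mut writer = buf.writer();
        tag.ser_into(&mut writer)
            .expect("serialize should always succeed");
        // ...and into_inner() hands it back once serde is done.
        writer.into_inner().freeze()
    }
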
Eric Seppanen, 2021-05-24 12:08:34 -07:00
parent e8f0a9bb80
commit 7c73afc1af
3 changed files with 65 additions and 90 deletions


@@ -6,6 +6,7 @@ use anyhow::Result;
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use postgres_ffi::pg_constants;
 use postgres_ffi::relfile_utils::forknumber_to_name;
+use serde::{Deserialize, Serialize};
 use std::fmt;
 use std::sync::Arc;
 use zenith_utils::lsn::Lsn;
@@ -176,7 +177,7 @@ pub struct RepositoryStats {
     pub num_getpage_requests: Lsn,
 }

-#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)]
+#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy, Serialize, Deserialize)]
 pub struct RelTag {
     pub forknum: u8,
     pub spcnode: u32,
@@ -184,23 +185,6 @@ pub struct RelTag {
     pub relnode: u32,
 }

-impl RelTag {
-    pub fn pack(&self, buf: &mut BytesMut) {
-        buf.put_u8(self.forknum);
-        buf.put_u32(self.spcnode);
-        buf.put_u32(self.dbnode);
-        buf.put_u32(self.relnode);
-    }
-    pub fn unpack(buf: &mut Bytes) -> RelTag {
-        RelTag {
-            forknum: buf.get_u8(),
-            spcnode: buf.get_u32(),
-            dbnode: buf.get_u32(),
-            relnode: buf.get_u32(),
-        }
-    }
-}
-
 /// Display RelTag in the same format that's used in most PostgreSQL debug messages:
 ///
 /// <spcnode>/<dbnode>/<relnode>[_fsm|_vm|_init]
@@ -219,25 +203,12 @@ impl fmt::Display for RelTag {
     }
 }

-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize, Deserialize)]
 pub struct BufferTag {
     pub rel: RelTag,
     pub blknum: u32,
 }

-impl BufferTag {
-    pub fn pack(&self, buf: &mut BytesMut) {
-        self.rel.pack(buf);
-        buf.put_u32(self.blknum);
-    }
-    pub fn unpack(buf: &mut Bytes) -> BufferTag {
-        BufferTag {
-            rel: RelTag::unpack(buf),
-            blknum: buf.get_u32(),
-        }
-    }
-}
-
 #[derive(Debug, Clone)]
 pub struct WALRecord {
     pub lsn: Lsn, // LSN at the *end* of the record


@@ -15,6 +15,7 @@ use anyhow::{anyhow, bail, Context, Result};
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use log::*;
 use postgres_ffi::pg_constants;
+use serde::{Deserialize, Serialize};
 use std::cmp::min;
 use std::collections::HashMap;
 use std::convert::TryInto;
@@ -24,6 +25,7 @@ use std::sync::atomic::Ordering;
 use std::sync::{Arc, Mutex};
 use std::thread;
 use std::time::{Duration, Instant};
+use zenith_utils::bin_ser::BeSer;
 use zenith_utils::lsn::{AtomicLsn, Lsn};
 use zenith_utils::seqwait::SeqWait;
@@ -82,7 +84,7 @@ pub struct RocksTimeline {
 // stored directly in the cache entry in that you still need to run the WAL redo
 // routine to generate the page image.
 //
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
 struct CacheKey {
     pub tag: BufferTag,
     pub lsn: Lsn,
@@ -125,30 +127,6 @@ impl CacheKey {
 static LAST_VALID_LSN_KEY: CacheKey = CacheKey::special(0);
 static LAST_VALID_RECORD_LSN_KEY: CacheKey = CacheKey::special(1);

-impl CacheKey {
-    fn pack(&self, buf: &mut BytesMut) {
-        self.tag.pack(buf);
-        buf.put_u64(self.lsn.0);
-    }
-    fn unpack(buf: &mut Bytes) -> CacheKey {
-        CacheKey {
-            tag: BufferTag::unpack(buf),
-            lsn: Lsn::from(buf.get_u64()),
-        }
-    }
-    fn from_slice(slice: &[u8]) -> Self {
-        let mut buf = Bytes::copy_from_slice(slice);
-        Self::unpack(&mut buf)
-    }
-    fn to_bytes(&self) -> BytesMut {
-        let mut buf = BytesMut::new();
-        self.pack(&mut buf);
-        buf
-    }
-}
-
 enum CacheEntryContent {
     PageImage(Bytes),
     WALRecord(WALRecord),
@@ -300,7 +278,7 @@ impl Repository for RocksRepository {
         iter.seek_to_first();
         while iter.valid() {
             let k = iter.key().unwrap();
-            let key = CacheKey::from_slice(k);
+            let key = CacheKey::des(k)?;
             if !key.is_special() && key.lsn <= at_lsn {
                 let v = iter.value().unwrap();
@@ -339,12 +317,12 @@ impl RocksTimeline {
         // Load these into memory
         let lsnstr = db
-            .get(LAST_VALID_LSN_KEY.to_bytes())
+            .get(LAST_VALID_LSN_KEY.ser()?)
             .with_context(|| "last_valid_lsn not found in repository")?
             .ok_or(anyhow!("empty last_valid_lsn"))?;
         let last_valid_lsn = Lsn::from_str(std::str::from_utf8(&lsnstr)?)?;

         let lsnstr = db
-            .get(LAST_VALID_RECORD_LSN_KEY.to_bytes())
+            .get(LAST_VALID_RECORD_LSN_KEY.ser()?)
             .with_context(|| "last_record_lsn not found in repository")?
             .ok_or(anyhow!("empty last_record_lsn"))?;
         let last_record_lsn = Lsn::from_str(std::str::from_utf8(&lsnstr)?)?;
@@ -423,12 +401,13 @@ impl RocksTimeline {
         let mut records: Vec<WALRecord> = Vec::new();

         let mut iter = self.db.raw_iterator();
-        iter.seek_for_prev(key.to_bytes());
+        let serialized_key = key.ser().expect("serialize CacheKey should always succeed");
+        iter.seek_for_prev(serialized_key);

         // Scan backwards, collecting the WAL records, until we hit an
         // old page image.
         while iter.valid() {
-            let key = CacheKey::from_slice(iter.key().unwrap());
+            let key = CacheKey::des(iter.key().unwrap()).unwrap();
             if key.tag != tag {
                 break;
             }
@@ -472,9 +451,9 @@ impl RocksTimeline {
         };
         let mut iter = self.db.raw_iterator();
         loop {
-            iter.seek_for_prev(key.to_bytes());
+            iter.seek_for_prev(key.ser()?);
             if iter.valid() {
-                let thiskey = CacheKey::from_slice(iter.key().unwrap());
+                let thiskey = CacheKey::des(iter.key().unwrap())?;
                 if thiskey.tag.rel == rel {
                     // Ignore entries with later LSNs.
                     if thiskey.lsn > lsn {
@@ -527,9 +506,9 @@ impl RocksTimeline {
         let mut deleted = 0u64;
         loop {
             let mut iter = self.db.raw_iterator();
-            iter.seek_for_prev(maxkey.to_bytes());
+            iter.seek_for_prev(maxkey.ser()?);
             if iter.valid() {
-                let key = CacheKey::from_slice(iter.key().unwrap());
+                let key = CacheKey::des(iter.key().unwrap())?;
                 let v = iter.value().unwrap();

                 inspected += 1;
@@ -564,12 +543,12 @@ impl RocksTimeline {
                         reconstructed += 1;
                     }

                     iter.seek_for_prev(maxkey.ser()?);
-                    iter.seek_for_prev(maxkey.to_bytes());
+                    iter.seek_for_prev(maxkey.ser()?);
                     if iter.valid() {
                         // do not remove last version
                         if last_lsn > horizon {
                             // locate most recent record before horizon
-                            let key = CacheKey::from_slice(iter.key().unwrap());
+                            let key = CacheKey::des(iter.key().unwrap())?;
                             if key.tag == maxkey.tag {
                                 let v = iter.value().unwrap();
                                 if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
@@ -607,7 +586,7 @@ impl RocksTimeline {
                 if !iter.valid() {
                     break;
                 }
-                let key = CacheKey::from_slice(iter.key().unwrap());
+                let key = CacheKey::des(iter.key().unwrap())?;
                 if key.tag != maxkey.tag {
                     break;
                 }
@@ -615,7 +594,7 @@ impl RocksTimeline {
                 if (v[0] & UNUSED_VERSION_FLAG) == 0 {
                     let mut v = v.to_owned();
                     v[0] |= UNUSED_VERSION_FLAG;
-                    self.db.put(key.to_bytes(), &v[..])?;
+                    self.db.put(key.ser()?, &v[..])?;
                     deleted += 1;
                     trace!(
                         "deleted: {} blk {} at {}",
@@ -692,10 +671,10 @@ impl Timeline for RocksTimeline {
         let key = CacheKey { tag, lsn };

         let mut iter = self.db.raw_iterator();
-        iter.seek_for_prev(key.to_bytes());
+        iter.seek_for_prev(key.ser()?);

         if iter.valid() {
-            let key = CacheKey::from_slice(iter.key().unwrap());
+            let key = CacheKey::des(iter.key().unwrap())?;
             if key.tag == tag {
                 let content = CacheEntryContent::from_slice(iter.value().unwrap());
                 let page_img: Bytes;
@@ -755,9 +734,9 @@ impl Timeline for RocksTimeline {
             lsn,
         };
         let mut iter = self.db.raw_iterator();
-        iter.seek_for_prev(key.to_bytes());
+        iter.seek_for_prev(key.ser()?);
         if iter.valid() {
-            let key = CacheKey::from_slice(iter.key().unwrap());
+            let key = CacheKey::des(iter.key().unwrap())?;
             if key.tag.rel == rel {
                 debug!("Relation {} exists at {}", rel, lsn);
                 return Ok(true);
@@ -779,7 +758,8 @@ impl Timeline for RocksTimeline {
         let content = CacheEntryContent::WALRecord(rec);

-        let _res = self.db.put(key.to_bytes(), content.to_bytes());
+        let serialized_key = key.ser().expect("serialize CacheKey should always succeed");
+        let _res = self.db.put(serialized_key, content.to_bytes());
         trace!(
             "put_wal_record rel {} blk {} at {}",
             tag.rel,
@@ -810,7 +790,7 @@ impl Timeline for RocksTimeline {
                 lsn,
             };
             trace!("put_wal_record lsn: {}", key.lsn);
-            let _res = self.db.put(key.to_bytes(), content.to_bytes());
+            let _res = self.db.put(key.ser()?, content.to_bytes());
         }
         let n = (old_rel_size - nblocks) as u64;
         self.num_entries.fetch_add(n, Ordering::Relaxed);
@@ -840,7 +820,8 @@ impl Timeline for RocksTimeline {
         }
         trace!("put_wal_record lsn: {}", key.lsn);
-        let _res = self.db.put(key.to_bytes(), content.to_bytes());
+        let serialized_key = key.ser().expect("serialize CacheKey should always succeed");
+        let _res = self.db.put(serialized_key, content.to_bytes());

         trace!(
             "put_page_image rel {} blk {} at {}",
@@ -872,10 +853,10 @@ impl Timeline for RocksTimeline {
             lsn: Lsn(0),
         };
         let mut iter = self.db.raw_iterator();
-        iter.seek(key.to_bytes());
+        iter.seek(key.ser()?);
         let mut n = 0;
         while iter.valid() {
-            let mut key = CacheKey::from_slice(iter.key().unwrap());
+            let mut key = CacheKey::des(iter.key().unwrap())?;
             if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
                 break;
             }
@@ -885,7 +866,7 @@ impl Timeline for RocksTimeline {
             key.lsn = lsn;
             let v = iter.value().unwrap();
-            self.db.put(key.to_bytes(), v)?;
+            self.db.put(key.ser()?, v)?;
             n += 1;
             iter.next();
         }
@@ -951,9 +932,9 @@ impl Timeline for RocksTimeline {
     fn checkpoint(&self) -> Result<()> {
         let last_valid_lsn = self.last_valid_lsn.load();
         self.db
-            .put(LAST_VALID_LSN_KEY.to_bytes(), last_valid_lsn.to_string())?;
+            .put(LAST_VALID_LSN_KEY.ser()?, last_valid_lsn.to_string())?;
         self.db.put(
-            LAST_VALID_RECORD_LSN_KEY.to_bytes(),
+            LAST_VALID_RECORD_LSN_KEY.ser()?,
             self.last_record_lsn.load().to_string(),
         )?;


@@ -16,7 +16,6 @@
 //!
 use bytes::{BufMut, Bytes, BytesMut};
 use log::*;
-use std::assert;
 use std::cell::RefCell;
 use std::fs;
 use std::fs::OpenOptions;
@@ -32,6 +31,7 @@ use tokio::io::AsyncBufReadExt;
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio::process::{ChildStdin, ChildStdout, Command};
 use tokio::time::timeout;
+use zenith_utils::bin_ser::BeSer;
 use zenith_utils::lsn::Lsn;

 use crate::repository::BufferTag;
@@ -456,9 +456,26 @@ fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes {
     buf.put_u8(b'B');
     buf.put_u32(len as u32);
-    tag.pack(&mut buf);
-    assert!(buf.len() == 1 + len);
+
+    // FIXME: this is a temporary hack that should go away when we refactor
+    // the postgres protocol serialization + handlers.
+    //
+    // BytesMut is a dynamic growable buffer, used a lot in tokio code but
+    // not in the std library. To write to a BytesMut from a serde serializer,
+    // we need to either:
+    // - pre-allocate the required buffer space. This is annoying because we
+    //   shouldn't care what the exact serialized size is-- that's the
+    //   serializer's job.
+    // - Or, we need to create a temporary "writer" (which implements the
+    //   `Write` trait). It's a bit awkward, because the writer consumes the
+    //   underlying BytesMut, and we need to extract it later with
+    //   `into_inner`.
+    let mut writer = buf.writer();
+    tag.ser_into(&mut writer)
+        .expect("serialize BufferTag should always succeed");
+    let buf = writer.into_inner();
+
+    debug_assert!(buf.len() == 1 + len);

     buf.freeze()
 }
@@ -471,10 +488,13 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes {
     buf.put_u8(b'P');
     buf.put_u32(len as u32);
-    tag.pack(&mut buf);
+
+    let mut writer = buf.writer();
+    tag.ser_into(&mut writer)
+        .expect("serialize BufferTag should always succeed");
+    let mut buf = writer.into_inner();
+
     buf.put(base_img);
-    assert!(buf.len() == 1 + len);
+    debug_assert!(buf.len() == 1 + len);

     buf.freeze()
 }
@@ -488,7 +508,7 @@ fn build_apply_record_msg(endlsn: Lsn, rec: Bytes) -> Bytes {
     buf.put_u64(endlsn.0);
     buf.put(rec);
-    assert!(buf.len() == 1 + len);
+    debug_assert!(buf.len() == 1 + len);

     buf.freeze()
 }
@@ -499,9 +519,12 @@ fn build_get_page_msg(tag: BufferTag) -> Bytes {
     buf.put_u8(b'G');
     buf.put_u32(len as u32);
-    tag.pack(&mut buf);
-    assert!(buf.len() == 1 + len);
+
+    let mut writer = buf.writer();
+    tag.ser_into(&mut writer)
+        .expect("serialize BufferTag should always succeed");
+    let buf = writer.into_inner();
+
+    debug_assert!(buf.len() == 1 + len);

     buf.freeze()
 }