From 7c73afc1af38c5bba02b45ee7cbef900ca7df5b6 Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Mon, 24 May 2021 12:08:34 -0700 Subject: [PATCH] switch repository types to serde Derive Serialize+Deserialize for RelTag, BufferTag, CacheKey. Replace handwritten pack/unpack functions with ser, des from zenith_utils::bin_ser (which uses the bincode crate). There are some ugly hybrids in walredo.rs, but those functions are already doing a lot of questionable manual byte-twiddling, so hopefully the weirdness will go away when we get better postgres protocol wrappers. --- pageserver/src/repository.rs | 35 ++---------- pageserver/src/repository/rocksdb.rs | 81 +++++++++++----------------- pageserver/src/walredo.rs | 39 +++++++++++--- 3 files changed, 65 insertions(+), 90 deletions(-) diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 8d4350255f..6b18657983 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -6,6 +6,7 @@ use anyhow::Result; use bytes::{Buf, BufMut, Bytes, BytesMut}; use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::forknumber_to_name; +use serde::{Deserialize, Serialize}; use std::fmt; use std::sync::Arc; use zenith_utils::lsn::Lsn; @@ -176,7 +177,7 @@ pub struct RepositoryStats { pub num_getpage_requests: Lsn, } -#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy, Serialize, Deserialize)] pub struct RelTag { pub forknum: u8, pub spcnode: u32, @@ -184,23 +185,6 @@ pub struct RelTag { pub relnode: u32, } -impl RelTag { - pub fn pack(&self, buf: &mut BytesMut) { - buf.put_u8(self.forknum); - buf.put_u32(self.spcnode); - buf.put_u32(self.dbnode); - buf.put_u32(self.relnode); - } - pub fn unpack(buf: &mut Bytes) -> RelTag { - RelTag { - forknum: buf.get_u8(), - spcnode: buf.get_u32(), - dbnode: buf.get_u32(), - relnode: buf.get_u32(), - } - } -} - /// Display RelTag in the same format that's used in most PostgreSQL debug messages: /// /// //[_fsm|_vm|_init] @@ -219,25 +203,12 @@ impl fmt::Display for RelTag { } } -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize, Deserialize)] pub struct BufferTag { pub rel: RelTag, pub blknum: u32, } -impl BufferTag { - pub fn pack(&self, buf: &mut BytesMut) { - self.rel.pack(buf); - buf.put_u32(self.blknum); - } - pub fn unpack(buf: &mut Bytes) -> BufferTag { - BufferTag { - rel: RelTag::unpack(buf), - blknum: buf.get_u32(), - } - } -} - #[derive(Debug, Clone)] pub struct WALRecord { pub lsn: Lsn, // LSN at the *end* of the record diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs index 212ca4ab15..6f51fefcc0 100644 --- a/pageserver/src/repository/rocksdb.rs +++ b/pageserver/src/repository/rocksdb.rs @@ -15,6 +15,7 @@ use anyhow::{anyhow, bail, Context, Result}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; use postgres_ffi::pg_constants; +use serde::{Deserialize, Serialize}; use std::cmp::min; use std::collections::HashMap; use std::convert::TryInto; @@ -24,6 +25,7 @@ use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; +use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::{AtomicLsn, Lsn}; use zenith_utils::seqwait::SeqWait; @@ -82,7 +84,7 @@ pub struct RocksTimeline { // stored directly in the cache entry in that you still need to run the WAL redo // routine to generate the page image. 
// -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)] struct CacheKey { pub tag: BufferTag, pub lsn: Lsn, @@ -125,30 +127,6 @@ impl CacheKey { static LAST_VALID_LSN_KEY: CacheKey = CacheKey::special(0); static LAST_VALID_RECORD_LSN_KEY: CacheKey = CacheKey::special(1); -impl CacheKey { - fn pack(&self, buf: &mut BytesMut) { - self.tag.pack(buf); - buf.put_u64(self.lsn.0); - } - fn unpack(buf: &mut Bytes) -> CacheKey { - CacheKey { - tag: BufferTag::unpack(buf), - lsn: Lsn::from(buf.get_u64()), - } - } - - fn from_slice(slice: &[u8]) -> Self { - let mut buf = Bytes::copy_from_slice(slice); - Self::unpack(&mut buf) - } - - fn to_bytes(&self) -> BytesMut { - let mut buf = BytesMut::new(); - self.pack(&mut buf); - buf - } -} - enum CacheEntryContent { PageImage(Bytes), WALRecord(WALRecord), @@ -300,7 +278,7 @@ impl Repository for RocksRepository { iter.seek_to_first(); while iter.valid() { let k = iter.key().unwrap(); - let key = CacheKey::from_slice(k); + let key = CacheKey::des(k)?; if !key.is_special() && key.lsn <= at_lsn { let v = iter.value().unwrap(); @@ -339,12 +317,12 @@ impl RocksTimeline { // Load these into memory let lsnstr = db - .get(LAST_VALID_LSN_KEY.to_bytes()) + .get(LAST_VALID_LSN_KEY.ser()?) .with_context(|| "last_valid_lsn not found in repository")? .ok_or(anyhow!("empty last_valid_lsn"))?; let last_valid_lsn = Lsn::from_str(std::str::from_utf8(&lsnstr)?)?; let lsnstr = db - .get(LAST_VALID_RECORD_LSN_KEY.to_bytes()) + .get(LAST_VALID_RECORD_LSN_KEY.ser()?) .with_context(|| "last_record_lsn not found in repository")? .ok_or(anyhow!("empty last_record_lsn"))?; let last_record_lsn = Lsn::from_str(std::str::from_utf8(&lsnstr)?)?; @@ -423,12 +401,13 @@ impl RocksTimeline { let mut records: Vec = Vec::new(); let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(key.to_bytes()); + let serialized_key = key.ser().expect("serialize CacheKey should always succeed"); + iter.seek_for_prev(serialized_key); // Scan backwards, collecting the WAL records, until we hit an // old page image. while iter.valid() { - let key = CacheKey::from_slice(iter.key().unwrap()); + let key = CacheKey::des(iter.key().unwrap()).unwrap(); if key.tag != tag { break; } @@ -472,9 +451,9 @@ impl RocksTimeline { }; let mut iter = self.db.raw_iterator(); loop { - iter.seek_for_prev(key.to_bytes()); + iter.seek_for_prev(key.ser()?); if iter.valid() { - let thiskey = CacheKey::from_slice(iter.key().unwrap()); + let thiskey = CacheKey::des(iter.key().unwrap())?; if thiskey.tag.rel == rel { // Ignore entries with later LSNs. 
if thiskey.lsn > lsn { @@ -527,9 +506,9 @@ impl RocksTimeline { let mut deleted = 0u64; loop { let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(maxkey.to_bytes()); + iter.seek_for_prev(maxkey.ser()?); if iter.valid() { - let key = CacheKey::from_slice(iter.key().unwrap()); + let key = CacheKey::des(iter.key().unwrap())?; let v = iter.value().unwrap(); inspected += 1; @@ -564,12 +543,12 @@ impl RocksTimeline { reconstructed += 1; } - iter.seek_for_prev(maxkey.to_bytes()); + iter.seek_for_prev(maxkey.ser()?); if iter.valid() { // do not remove last version if last_lsn > horizon { // locate most recent record before horizon - let key = CacheKey::from_slice(iter.key().unwrap()); + let key = CacheKey::des(iter.key().unwrap())?; if key.tag == maxkey.tag { let v = iter.value().unwrap(); if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD { @@ -607,7 +586,7 @@ impl RocksTimeline { if !iter.valid() { break; } - let key = CacheKey::from_slice(iter.key().unwrap()); + let key = CacheKey::des(iter.key().unwrap())?; if key.tag != maxkey.tag { break; } @@ -615,7 +594,7 @@ impl RocksTimeline { if (v[0] & UNUSED_VERSION_FLAG) == 0 { let mut v = v.to_owned(); v[0] |= UNUSED_VERSION_FLAG; - self.db.put(key.to_bytes(), &v[..])?; + self.db.put(key.ser()?, &v[..])?; deleted += 1; trace!( "deleted: {} blk {} at {}", @@ -692,10 +671,10 @@ impl Timeline for RocksTimeline { let key = CacheKey { tag, lsn }; let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(key.to_bytes()); + iter.seek_for_prev(key.ser()?); if iter.valid() { - let key = CacheKey::from_slice(iter.key().unwrap()); + let key = CacheKey::des(iter.key().unwrap())?; if key.tag == tag { let content = CacheEntryContent::from_slice(iter.value().unwrap()); let page_img: Bytes; @@ -755,9 +734,9 @@ impl Timeline for RocksTimeline { lsn, }; let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(key.to_bytes()); + iter.seek_for_prev(key.ser()?); if iter.valid() { - let key = CacheKey::from_slice(iter.key().unwrap()); + let key = CacheKey::des(iter.key().unwrap())?; if key.tag.rel == rel { debug!("Relation {} exists at {}", rel, lsn); return Ok(true); @@ -779,7 +758,8 @@ impl Timeline for RocksTimeline { let content = CacheEntryContent::WALRecord(rec); - let _res = self.db.put(key.to_bytes(), content.to_bytes()); + let serialized_key = key.ser().expect("serialize CacheKey should always succeed"); + let _res = self.db.put(serialized_key, content.to_bytes()); trace!( "put_wal_record rel {} blk {} at {}", tag.rel, @@ -810,7 +790,7 @@ impl Timeline for RocksTimeline { lsn, }; trace!("put_wal_record lsn: {}", key.lsn); - let _res = self.db.put(key.to_bytes(), content.to_bytes()); + let _res = self.db.put(key.ser()?, content.to_bytes()); } let n = (old_rel_size - nblocks) as u64; self.num_entries.fetch_add(n, Ordering::Relaxed); @@ -840,7 +820,8 @@ impl Timeline for RocksTimeline { } trace!("put_wal_record lsn: {}", key.lsn); - let _res = self.db.put(key.to_bytes(), content.to_bytes()); + let serialized_key = key.ser().expect("serialize CacheKey should always succeed"); + let _res = self.db.put(serialized_key, content.to_bytes()); trace!( "put_page_image rel {} blk {} at {}", @@ -872,10 +853,10 @@ impl Timeline for RocksTimeline { lsn: Lsn(0), }; let mut iter = self.db.raw_iterator(); - iter.seek(key.to_bytes()); + iter.seek(key.ser()?); let mut n = 0; while iter.valid() { - let mut key = CacheKey::from_slice(iter.key().unwrap()); + let mut key = CacheKey::des(iter.key().unwrap())?; if key.tag.rel.spcnode != src_tablespace_id || 
key.tag.rel.dbnode != src_db_id { break; } @@ -885,7 +866,7 @@ impl Timeline for RocksTimeline { key.lsn = lsn; let v = iter.value().unwrap(); - self.db.put(key.to_bytes(), v)?; + self.db.put(key.ser()?, v)?; n += 1; iter.next(); } @@ -951,9 +932,9 @@ impl Timeline for RocksTimeline { fn checkpoint(&self) -> Result<()> { let last_valid_lsn = self.last_valid_lsn.load(); self.db - .put(LAST_VALID_LSN_KEY.to_bytes(), last_valid_lsn.to_string())?; + .put(LAST_VALID_LSN_KEY.ser()?, last_valid_lsn.to_string())?; self.db.put( - LAST_VALID_RECORD_LSN_KEY.to_bytes(), + LAST_VALID_RECORD_LSN_KEY.ser()?, self.last_record_lsn.load().to_string(), )?; diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 9ba4e6f515..4ade6d786e 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -16,7 +16,6 @@ //! use bytes::{BufMut, Bytes, BytesMut}; use log::*; -use std::assert; use std::cell::RefCell; use std::fs; use std::fs::OpenOptions; @@ -32,6 +31,7 @@ use tokio::io::AsyncBufReadExt; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::process::{ChildStdin, ChildStdout, Command}; use tokio::time::timeout; +use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::Lsn; use crate::repository::BufferTag; @@ -456,9 +456,26 @@ fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes { buf.put_u8(b'B'); buf.put_u32(len as u32); - tag.pack(&mut buf); - assert!(buf.len() == 1 + len); + // FIXME: this is a temporary hack that should go away when we refactor + // the postgres protocol serialization + handlers. + // + // BytesMut is a dynamic growable buffer, used a lot in tokio code but + // not in the std library. To write to a BytesMut from a serde serializer, + // we need to either: + // - pre-allocate the required buffer space. This is annoying because we + // shouldn't care what the exact serialized size is-- that's the + // serializer's job. + // - Or, we need to create a temporary "writer" (which implements the + // `Write` trait). It's a bit awkward, because the writer consumes the + // underlying BytesMut, and we need to extract it later with + // `into_inner`. + let mut writer = buf.writer(); + tag.ser_into(&mut writer) + .expect("serialize BufferTag should always succeed"); + let buf = writer.into_inner(); + + debug_assert!(buf.len() == 1 + len); buf.freeze() } @@ -471,10 +488,13 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes { buf.put_u8(b'P'); buf.put_u32(len as u32); - tag.pack(&mut buf); + let mut writer = buf.writer(); + tag.ser_into(&mut writer) + .expect("serialize BufferTag should always succeed"); + let mut buf = writer.into_inner(); buf.put(base_img); - assert!(buf.len() == 1 + len); + debug_assert!(buf.len() == 1 + len); buf.freeze() } @@ -488,7 +508,7 @@ fn build_apply_record_msg(endlsn: Lsn, rec: Bytes) -> Bytes { buf.put_u64(endlsn.0); buf.put(rec); - assert!(buf.len() == 1 + len); + debug_assert!(buf.len() == 1 + len); buf.freeze() } @@ -499,9 +519,12 @@ fn build_get_page_msg(tag: BufferTag) -> Bytes { buf.put_u8(b'G'); buf.put_u32(len as u32); - tag.pack(&mut buf); + let mut writer = buf.writer(); + tag.ser_into(&mut writer) + .expect("serialize BufferTag should always succeed"); + let buf = writer.into_inner(); - assert!(buf.len() == 1 + len); + debug_assert!(buf.len() == 1 + len); buf.freeze() }
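Note for reviewers (not part of the patch): the change assumes `zenith_utils::bin_ser::BeSer` is a thin wrapper over bincode configured for big-endian, fixed-width integers, so the derived encoding stays byte-compatible with the old handwritten `pack`/`unpack` (for `RelTag`: one `u8` followed by three big-endian `u32`s). Here is a minimal round-trip sketch using bincode directly under that assumption; the `be_coder` helper is illustrative only, not part of the patch:

```rust
use bincode::Options; // bincode 1.x
use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct RelTag {
    forknum: u8,
    spcnode: u32,
    dbnode: u32,
    relnode: u32,
}

// Big-endian + fixed-width integers reproduce the old put_u8/put_u32
// wire layout byte for byte.
fn be_coder() -> impl bincode::Options {
    bincode::options().with_big_endian().with_fixint_encoding()
}

fn main() -> Result<(), bincode::Error> {
    let tag = RelTag {
        forknum: 0,
        spcnode: 1663,
        dbnode: 13008,
        relnode: 16384,
    };

    // ser: struct -> bytes; 1 + 4 + 4 + 4 = 13 bytes, same as pack() wrote.
    let bytes = be_coder().serialize(&tag)?;
    assert_eq!(bytes.len(), 13);

    // des: bytes -> struct; round-trips exactly.
    let back: RelTag = be_coder().deserialize(&bytes)?;
    assert_eq!(back, tag);
    Ok(())
}
```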
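The walredo.rs FIXME is easier to see in isolation. Below is a sketch of the writer dance under the same bincode assumption; `build_msg` is a hypothetical helper, not something the patch adds (the real messages also carry a `u32` length after the kind byte, which is awkward precisely because the caller must know the serialized size up front):

```rust
use bincode::Options;
use bytes::{BufMut, Bytes, BytesMut};
use serde::Serialize;

/// Hypothetical helper: frame any serde-serializable body behind a
/// one-byte message kind, serializing straight into a growable BytesMut.
fn build_msg<T: Serialize>(kind: u8, body: &T) -> Result<Bytes, bincode::Error> {
    let mut buf = BytesMut::new();
    buf.put_u8(kind);

    // BytesMut is not a `std::io::Write`; `BufMut::writer()` wraps it in an
    // adapter that is. The adapter consumes the buffer, so we take it back
    // with `into_inner()` before continuing to append.
    let mut writer = buf.writer();
    bincode::options()
        .with_big_endian()
        .with_fixint_encoding()
        .serialize_into(&mut writer, body)?;
    let buf = writer.into_inner();

    Ok(buf.freeze())
}
```

Under these assumptions, `build_msg(b'G', &tag)` would produce roughly the same bytes as `build_get_page_msg` above, minus the length word.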