try using serde to do all the serialization in wal_service

This version validates on every call that our result is exactly the same
as the previous result.

NodeId is a strange corner case: one field is serialized little-endian
and one field is serialized big-endian. Hopefully we can fix that in the
future.
This commit is contained in:
Eric Seppanen
2021-05-07 01:03:40 -07:00
parent 36c12247b9
commit 0cbb3798da
5 changed files with 245 additions and 97 deletions

3
Cargo.lock generated
View File

@@ -1181,6 +1181,7 @@ dependencies = [
"regex",
"rocksdb",
"rust-s3",
"serde",
"slog",
"slog-async",
"slog-scope",
@@ -2246,6 +2247,7 @@ dependencies = [
"postgres_ffi",
"regex",
"rust-s3",
"serde",
"slog",
"slog-async",
"slog-scope",
@@ -2255,6 +2257,7 @@ dependencies = [
"tokio-stream",
"walkdir",
"workspace_hack",
"zenith_utils",
]
[[package]]

View File

@@ -38,6 +38,8 @@ thiserror = "1.0"
hex = "0.4.3"
tar = "0.4.33"
parse_duration = "2.1.1"
serde = { version = "1.0", features = ["derive"] }
postgres_ffi = { path = "../postgres_ffi" }
zenith_utils = { path = "../zenith_utils" }

View File

@@ -1,3 +1,4 @@
use serde::{Deserialize, Serialize};
use std::fmt;
use std::net::SocketAddr;
use std::path::PathBuf;
@@ -49,7 +50,7 @@ pub struct PageServerConf {
/// is separate from PostgreSQL timelines, and doesn't have those
/// limitations. A zenith timeline is identified by a 128-bit ID, which
/// is usually printed out as a hex string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ZTimelineId([u8; 16]);
impl FromStr for ZTimelineId {

View File

@@ -29,8 +29,10 @@ anyhow = "1.0"
crc32c = "0.6.0"
parse_duration = "2.1.1"
walkdir = "2"
serde = { version = "1.0", features = ["derive"] }
# FIXME: 'pageserver' is needed for ZTimelineId. Refactor
pageserver = { path = "../pageserver" }
postgres_ffi = { path = "../postgres_ffi" }
workspace_hack = { path = "../workspace_hack" }
zenith_utils = { path = "../zenith_utils" }

View File

@@ -10,6 +10,7 @@ use lazy_static::lazy_static;
use log::*;
use postgres::{Client, NoTls};
use regex::Regex;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
@@ -19,6 +20,7 @@ use std::net::{TcpListener, TcpStream};
use std::str;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use zenith_utils::bin_ser::LeSer;
use crate::pq_protocol::*;
use crate::WalAcceptorConf;
@@ -72,99 +74,108 @@ fn read_into(r: &mut impl Read, buf: &mut BytesMut) -> io::Result<usize> {
Ok(num_bytes)
}
/*
* Unique node identifier used by Paxos
*/
/// Unique node identifier used by Paxos
#[repr(C)]
#[derive(Debug, Clone, Copy, Ord, PartialOrd, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, Ord, PartialOrd, PartialEq, Eq, Serialize, Deserialize)]
struct NodeId {
term: u64,
uuid: u128,
#[serde(serialize_with = "serialize_u64_flip_endian")]
#[serde(deserialize_with = "deserialize_u64_flip_endian")]
term: u64,
}
#[repr(C)]
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
struct ServerInfo {
protocol_version: u32, /* proxy-safekeeper protocol version */
pg_version: u32, /* Postgres server version */
/// proxy-safekeeper protocol version
protocol_version: u32,
/// Postgres server version
pg_version: u32,
node_id: NodeId,
system_id: SystemId,
timeline_id: ZTimelineId, /* Zenith timelineid */
/// Zenith timelineid
timeline_id: ZTimelineId,
wal_end: XLogRecPtr,
timeline: TimeLineID,
wal_seg_size: u32,
}
/*
* Vote request sent from proxy to safekeepers
*/
/// Vote request sent from proxy to safekeepers
#[repr(C)]
#[derive(Debug)]
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct RequestVote {
node_id: NodeId,
vcl: XLogRecPtr, /* volume commit LSN */
epoch: u64, /* new epoch when safekeeper reaches vcl */
/// volume commit LSN
vcl: XLogRecPtr,
/// new epoch when safekeeper reaches vcl
epoch: u64,
}
/*
* Information of about storage node
*/
/// Information about storage node
#[repr(C)]
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
struct SafeKeeperInfo {
magic: u32, /* magic for verifying content the control file */
format_version: u32, /* safekeeper format version */
epoch: u64, /* safekeeper's epoch */
server: ServerInfo, /* information about server */
commit_lsn: XLogRecPtr, /* part of WAL acknowledged by quorum */
flush_lsn: XLogRecPtr, /* locally flushed part of WAL */
restart_lsn: XLogRecPtr, /* minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers */
/// magic for verifying content of the control file
magic: u32,
/// safekeeper format version
format_version: u32,
/// safekeeper's epoch
epoch: u64,
/// information about server
server: ServerInfo,
/// part of WAL acknowledged by quorum
commit_lsn: XLogRecPtr,
/// locally flushed part of WAL
flush_lsn: XLogRecPtr,
/// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers
restart_lsn: XLogRecPtr,
}
/*
* Hot standby feedback received from replica
*/
/// Hot standby feedback received from replica
#[repr(C)]
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
struct HotStandbyFeedback {
ts: TimestampTz,
xmin: FullTransactionId,
catalog_xmin: FullTransactionId,
}
/*
* Request with WAL message sent from proxy to safekeeper.
*/
/// Request with WAL message sent from proxy to safekeeper.
#[repr(C)]
#[derive(Debug)]
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct SafeKeeperRequest {
sender_id: NodeId, /* Sender's node identifier (looks like we do not need it for TCP streaming connection) */
begin_lsn: XLogRecPtr, /* start position of message in WAL */
end_lsn: XLogRecPtr, /* end position of message in WAL */
restart_lsn: XLogRecPtr, /* restart LSN position (minimal LSN which may be needed by proxy to perform recovery) */
commit_lsn: XLogRecPtr, /* LSN committed by quorum of safekeepers */
/// Sender's node identifier (looks like we do not need it for TCP streaming connection)
sender_id: NodeId,
/// start position of message in WAL
begin_lsn: XLogRecPtr,
/// end position of message in WAL
end_lsn: XLogRecPtr,
/// restart LSN position (minimal LSN which may be needed by proxy to perform recovery)
restart_lsn: XLogRecPtr,
/// LSN committed by quorum of safekeepers
commit_lsn: XLogRecPtr,
}
/*
* Report safekeeper state to proxy
*/
/// Report safekeeper state to proxy
#[repr(C)]
#[derive(Debug)]
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct SafeKeeperResponse {
epoch: u64,
flush_lsn: XLogRecPtr,
hs_feedback: HotStandbyFeedback,
}
/*
* Shared state associated with database instance (tenant)
*/
/// Shared state associated with database instance (tenant)
#[derive(Debug)]
struct SharedState {
commit_lsn: XLogRecPtr, /* quorum commit LSN */
info: SafeKeeperInfo, /* information about this safekeeper */
control_file: Option<File>, /* opened file control file handle (needed to hold exlusive file lock */
hs_feedback: HotStandbyFeedback, /* combined hot standby feedback from all replicas */
/// quorum commit LSN
commit_lsn: XLogRecPtr,
/// information about this safekeeper
info: SafeKeeperInfo,
/// open control file handle (needed to hold the exclusive file lock)
control_file: Option<File>,
/// combined hot standby feedback from all replicas
hs_feedback: HotStandbyFeedback,
}
/// Database instance (tenant)
@@ -176,38 +187,68 @@ pub struct Timeline {
cond: Condvar,
}
/*
* Private data
*/
/// Private data
#[derive(Debug)]
struct Connection {
timeline: Option<Arc<Timeline>>,
stream: TcpStream, /* Postgres connection */
inbuf: BytesMut, /* input buffer */
outbuf: BytesMut, /* output buffer */
init_done: bool, /* startup packet proceeded */
appname: Option<String>, /* assigned application name */
conf: WalAcceptorConf, /* wal acceptor configuration */
/// Postgres connection
stream: TcpStream,
/// input buffer
inbuf: BytesMut,
/// output buffer
outbuf: BytesMut,
/// startup packet processed
init_done: bool,
/// assigned application name
appname: Option<String>,
/// wal acceptor configuration
conf: WalAcceptorConf,
}
/*
* Customer serializer API (TODO: use protobuf?)
*/
trait Serializer {
fn pack(&self, buf: &mut BytesMut);
fn unpack(buf: &mut BytesMut) -> Self;
/// Custom serializer API (TODO: use protobuf?)
///
/// Transitional shim: every call runs both the hand-written
/// [`OldSerializer`] code and the serde-based code, and asserts that the two
/// agree before returning.
trait NewSerializer:
    OldSerializer + Serialize + DeserializeOwned + PartialEq + std::fmt::Debug
{
    /// Append the encoded form of `self` to `buf`.
    ///
    /// # Panics
    ///
    /// Panics if serde serialization fails, or if its output differs from
    /// what `pack2` produces.
    fn pack(&self, buf: &mut BytesMut) {
        let capacity = buf.capacity();

        // Reference encoding from the hand-written serializer.
        let mut legacy = BytesMut::with_capacity(capacity);
        self.pack2(&mut legacy);

        // Candidate encoding from serde, written into a scratch buffer.
        let mut serde_writer = BytesMut::with_capacity(capacity).writer();
        self.ser_into(&mut serde_writer).unwrap();
        let via_serde = serde_writer.into_inner();

        assert_eq!(legacy, via_serde);

        // The caller's buffer is still filled by the hand-written path.
        self.pack2(buf);
    }

    /// Decode a value from `buf`.
    ///
    /// # Panics
    ///
    /// Panics if serde deserialization fails, or if it yields a value that
    /// differs from the one produced by `unpack2`.
    fn unpack(buf: &mut BytesMut) -> Self {
        // serde decodes from a shared view, so `buf` itself is untouched and
        // `unpack2` below sees the same bytes.
        let via_serde = {
            let view: &[u8] = buf;
            Self::des_from(view).unwrap()
        };
        let legacy = Self::unpack2(buf);
        assert_eq!(via_serde, legacy);
        legacy
    }
}
/// Blanket implementation: every type that has a hand-written
/// `OldSerializer` impl and the serde derives (plus `PartialEq` and `Debug`,
/// which the comparison asserts need) gets the checked `pack`/`unpack`.
impl<T: OldSerializer + Serialize + DeserializeOwned + PartialEq + std::fmt::Debug> NewSerializer
    for T
{
}
/// Hand-written binary serializer that predates the serde-based one.
trait OldSerializer {
/// Append the encoded form of `self` to `buf`.
fn pack2(&self, buf: &mut BytesMut);
/// Decode a value by reading its fields from `buf`.
fn unpack2(buf: &mut BytesMut) -> Self;
}
//
// Implementations
//
//Report and return IO error */
/// Report and return IO error
///
/// Logs the formatted message with `error!`, then returns from the enclosing
/// function with an `Err(io::Error)` of kind `Other` carrying the same
/// message. The format arguments are expanded twice: once for the log call
/// and once for `format!`.
macro_rules! io_error {
($($arg:tt)*) => (error!($($arg)*); return Err(io::Error::new(io::ErrorKind::Other,format!($($arg)*))))
}
// Safe hex string parser returning proper result
/// Safe hex string parser returning proper result
fn parse_hex_str(s: &str) -> Result<u64> {
if let Ok(val) = u32::from_str_radix(s, 16) {
Ok(val as u64)
@@ -216,13 +257,13 @@ fn parse_hex_str(s: &str) -> Result<u64> {
}
}
impl Serializer for NodeId {
fn pack(&self, buf: &mut BytesMut) {
impl OldSerializer for NodeId {
fn pack2(&self, buf: &mut BytesMut) {
buf.put_u128_le(self.uuid);
buf.put_u64(self.term); // use big endian to provide compatibility with memcmp
}
fn unpack(buf: &mut BytesMut) -> NodeId {
fn unpack2(buf: &mut BytesMut) -> NodeId {
NodeId {
uuid: buf.get_u128_le(),
term: buf.get_u64(), // use big endian to provide compatibility with memcmp
@@ -230,24 +271,71 @@ impl Serializer for NodeId {
}
}
impl Serializer for ServerInfo {
fn pack(&self, buf: &mut BytesMut) {
/// Serialize a `u64` with its byte order reversed.
///
/// For use with serde's `serialize_with` attribute. Paired with
/// [`deserialize_u64_flip_endian`]; the two must apply the same transform.
fn serialize_u64_flip_endian<S>(x: &u64, s: S) -> std::result::Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    // `swap_bytes` reverses the bytes unconditionally. `u64::from_be` is a
    // no-op on big-endian targets, so it would not flip anything there.
    s.serialize_u64(x.swap_bytes())
}

/// Deserialize a `u64` and reverse its byte order.
///
/// For use with serde's `deserialize_with` attribute. Inverse of
/// [`serialize_u64_flip_endian`].
fn deserialize_u64_flip_endian<'de, D>(deserializer: D) -> std::result::Result<u64, D::Error>
where
    D: serde::de::Deserializer<'de>,
{
    let x: u64 = serde::de::Deserialize::deserialize(deserializer)?;
    // Unconditional swap, for the same reason as in the serializer.
    Ok(x.swap_bytes())
}
// impl Serialize for NodeId {
// fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
// where
// S: serde::Serializer,
// {
// use serde::ser::SerializeStruct;
// // 2 is the number of fields in the struct.
// let mut state = serializer.serialize_struct("NodeId", 2)?;
// state.serialize_field("uuid", &self.uuid)?;
// // Flip endian-ness of `term`
// state.serialize_field("term", &u64::from_be(self.term))?;
// state.end()
// }
// }
// // FIXME: this is a bit quick and dirty; it won't work for
// // any serde data formats besides `bincode`.
// impl<'de> Deserialize<'de> for NodeId {
// fn deserialize<D>(deserializer: D) -> std::result::Result<NodeId, D::Error>
// where
// D: serde::Deserializer<'de>,
// {
// let uuid: u128 = Deserialize::deserialize(deserializer)?;
// let term: u64 = Deserialize::deserialize(deserializer)?;
// let term = u64::from_be(term);
// Ok(NodeId { uuid, term })
// }
// }
impl OldSerializer for ServerInfo {
fn pack2(&self, buf: &mut BytesMut) {
buf.put_u32_le(self.protocol_version);
buf.put_u32_le(self.pg_version);
self.node_id.pack(buf);
self.node_id.pack2(buf);
buf.put_u64_le(self.system_id);
buf.put_slice(&self.timeline_id.as_arr());
buf.put_slice(&self.timeline_id.as_arr()); // FIXME make timeline use serde
buf.put_u64_le(self.wal_end);
buf.put_u32_le(self.timeline);
buf.put_u32_le(self.wal_seg_size);
}
fn unpack(buf: &mut BytesMut) -> ServerInfo {
fn unpack2(buf: &mut BytesMut) -> ServerInfo {
ServerInfo {
protocol_version: buf.get_u32_le(),
pg_version: buf.get_u32_le(),
node_id: NodeId::unpack(buf),
system_id: buf.get_u64_le(),
timeline_id: ZTimelineId::get_from_buf(buf),
timeline_id: ZTimelineId::get_from_buf(buf), // FIXME make timeline use serde
wal_end: buf.get_u64_le(),
timeline: buf.get_u32_le(),
wal_seg_size: buf.get_u32_le(),
@@ -255,14 +343,14 @@ impl Serializer for ServerInfo {
}
}
impl Serializer for RequestVote {
fn pack(&self, buf: &mut BytesMut) {
self.node_id.pack(buf);
impl OldSerializer for RequestVote {
fn pack2(&self, buf: &mut BytesMut) {
self.node_id.pack2(buf);
buf.put_u64_le(self.vcl);
buf.put_u64_le(self.epoch);
}
fn unpack(buf: &mut BytesMut) -> RequestVote {
fn unpack2(buf: &mut BytesMut) -> RequestVote {
RequestVote {
node_id: NodeId::unpack(buf),
vcl: buf.get_u64_le(),
@@ -271,17 +359,17 @@ impl Serializer for RequestVote {
}
}
impl Serializer for SafeKeeperInfo {
fn pack(&self, buf: &mut BytesMut) {
impl OldSerializer for SafeKeeperInfo {
fn pack2(&self, buf: &mut BytesMut) {
buf.put_u32_le(self.magic);
buf.put_u32_le(self.format_version);
buf.put_u64_le(self.epoch);
self.server.pack(buf);
self.server.pack2(buf);
buf.put_u64_le(self.commit_lsn);
buf.put_u64_le(self.flush_lsn);
buf.put_u64_le(self.restart_lsn);
}
fn unpack(buf: &mut BytesMut) -> SafeKeeperInfo {
fn unpack2(buf: &mut BytesMut) -> SafeKeeperInfo {
SafeKeeperInfo {
magic: buf.get_u32_le(),
format_version: buf.get_u32_le(),
@@ -317,13 +405,13 @@ impl SafeKeeperInfo {
}
}
impl Serializer for HotStandbyFeedback {
fn pack(&self, buf: &mut BytesMut) {
impl OldSerializer for HotStandbyFeedback {
fn pack2(&self, buf: &mut BytesMut) {
buf.put_u64_le(self.ts);
buf.put_u64_le(self.xmin);
buf.put_u64_le(self.catalog_xmin);
}
fn unpack(buf: &mut BytesMut) -> HotStandbyFeedback {
fn unpack2(buf: &mut BytesMut) -> HotStandbyFeedback {
HotStandbyFeedback {
ts: buf.get_u64_le(),
xmin: buf.get_u64_le(),
@@ -342,15 +430,15 @@ impl HotStandbyFeedback {
}
}
impl Serializer for SafeKeeperRequest {
fn pack(&self, buf: &mut BytesMut) {
self.sender_id.pack(buf);
impl OldSerializer for SafeKeeperRequest {
fn pack2(&self, buf: &mut BytesMut) {
self.sender_id.pack2(buf);
buf.put_u64_le(self.begin_lsn);
buf.put_u64_le(self.end_lsn);
buf.put_u64_le(self.restart_lsn);
buf.put_u64_le(self.commit_lsn);
}
fn unpack(buf: &mut BytesMut) -> SafeKeeperRequest {
fn unpack2(buf: &mut BytesMut) -> SafeKeeperRequest {
SafeKeeperRequest {
sender_id: NodeId::unpack(buf),
begin_lsn: buf.get_u64_le(),
@@ -361,13 +449,13 @@ impl Serializer for SafeKeeperRequest {
}
}
impl Serializer for SafeKeeperResponse {
fn pack(&self, buf: &mut BytesMut) {
impl OldSerializer for SafeKeeperResponse {
fn pack2(&self, buf: &mut BytesMut) {
buf.put_u64_le(self.epoch);
buf.put_u64_le(self.flush_lsn);
self.hs_feedback.pack(buf);
self.hs_feedback.pack2(buf);
}
fn unpack(buf: &mut BytesMut) -> SafeKeeperResponse {
fn unpack2(buf: &mut BytesMut) -> SafeKeeperResponse {
SafeKeeperResponse {
epoch: buf.get_u64_le(),
flush_lsn: buf.get_u64_le(),
@@ -576,7 +664,7 @@ impl Connection {
Ok(())
}
fn read_req<T: Serializer>(&mut self) -> Result<T> {
fn read_req<T: NewSerializer>(&mut self) -> Result<T> {
let size = mem::size_of::<T>();
self.inbuf.resize(size, 0u8);
self.stream.read_exact(&mut self.inbuf[0..size])?;
@@ -1213,3 +1301,55 @@ impl Connection {
)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Decode a `T` from a fixed byte pattern, then encode it again.
    ///
    /// There are no explicit assertions here: `unpack` and `pack` each
    /// compare the hand-written and serde code paths internally and panic on
    /// a mismatch.
    fn test_unpack<T: NewSerializer>() {
        // 292 bytes counting up from 7; values past 255 wrap in the `u8` cast.
        let mut input = BytesMut::new();
        (7..299_i32).for_each(|ii| input.put_u8(ii as u8));

        let decoded = T::unpack(&mut input);

        let mut output = BytesMut::new();
        decoded.pack(&mut output);
    }

    /// Pack one hand-built `ServerInfo`; `pack` panics if the two serializers
    /// disagree on its encoding.
    #[test]
    fn test_x() {
        let node_id = NodeId {
            uuid: 40027986537151223483757821949182545935,
            term: 2242829044932683046,
        };
        let timeline_id = ZTimelineId::from([
            47u8, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62,
        ]);
        let si = ServerInfo {
            protocol_version: 168364039,
            pg_version: 235736075,
            node_id,
            system_id: 3327364263599220775,
            timeline_id,
            wal_end: 5063528411713060927,
            timeline: 1246316615,
            wal_seg_size: 1313688651,
        };

        let mut packed = BytesMut::new();
        si.pack(&mut packed);
    }

    /// Run the unpack-then-pack check over every serializable type.
    #[test]
    fn test_pack_unpack() {
        test_unpack::<ServerInfo>();
        test_unpack::<RequestVote>();
        test_unpack::<SafeKeeperInfo>();
        test_unpack::<HotStandbyFeedback>();
        test_unpack::<SafeKeeperRequest>();
        test_unpack::<SafeKeeperResponse>();
        test_unpack::<NodeId>();
    }
}