Extend replication protocol with ZentihFeedback message

to pass current_timeline_size to compute node

Put standby_status_update fields into ZenithFeedback and send them as one message.
Pass values sizes together with keys in ZenithFeedback message.
This commit is contained in:
anastasia
2021-12-22 20:06:23 +03:00
committed by lubennikovaav
parent 63dd7bce7e
commit 5abe2129c6
13 changed files with 373 additions and 66 deletions

View File

@@ -6,11 +6,14 @@ use anyhow::{bail, ensure, Context, Result};
use byteorder::{BigEndian, ByteOrder};
use byteorder::{ReadBytesExt, BE};
use bytes::{Buf, BufMut, Bytes, BytesMut};
// use postgres_ffi::xlog_utils::TimestampTz;
use postgres_protocol::PG_EPOCH;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::Read;
use std::io::{self, Cursor};
use std::str;
use std::time::{Duration, SystemTime};
use tracing::info;
pub type Oid = u32;
pub type SystemId = u64;
@@ -504,7 +507,7 @@ where
}
/// Safe write of s into buf as cstring (String in the protocol).
fn write_cstr(s: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> {
pub fn write_cstr(s: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> {
if s.contains(&0) {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
@@ -516,6 +519,17 @@ fn write_cstr(s: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> {
Ok(())
}
// Truncate 0 from C string in Bytes and stringify it (returns slice, no allocations)
// PG protocol strings are always C strings.
fn cstr_to_str(b: &Bytes) -> Result<&str> {
let without_null = if b.last() == Some(&0) {
&b[..b.len() - 1]
} else {
&b[..]
};
std::str::from_utf8(without_null).map_err(|e| e.into())
}
impl<'a> BeMessage<'a> {
/// Write message to the given buf.
// Unlike the reading side, we use BytesMut
@@ -798,3 +812,169 @@ impl<'a> BeMessage<'a> {
Ok(())
}
}
// Zenith extension of postgres replication protocol
// See ZENITH_STATUS_UPDATE_TAG_BYTE
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct ZenithFeedback {
// Last known size of the timeline. Used to enforce timeline size limit.
pub current_timeline_size: u64,
// Parts of StandbyStatusUpdate we resend to compute via safekeeper
pub ps_writelsn: u64,
pub ps_applylsn: u64,
pub ps_flushlsn: u64,
pub ps_replytime: SystemTime,
}
// NOTE: Do not forget to increment this number when adding new fields to ZenithFeedback.
// Do not remove previously available fields because this might be backwards incompatible.
pub const ZENITH_FEEDBACK_FIELDS_NUMBER: u8 = 5;
impl ZenithFeedback {
pub fn empty() -> ZenithFeedback {
ZenithFeedback {
current_timeline_size: 0,
ps_writelsn: 0,
ps_applylsn: 0,
ps_flushlsn: 0,
ps_replytime: SystemTime::now(),
}
}
// Serialize ZenithFeedback using custom format
// to support protocol extensibility.
//
// Following layout is used:
// char - number of key-value pairs that follow.
//
// key-value pairs:
// null-terminated string - key,
// uint32 - value length in bytes
// value itself
pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
buf.put_u8(ZENITH_FEEDBACK_FIELDS_NUMBER); // # of keys
write_cstr(&Bytes::from("current_timeline_size"), buf)?;
buf.put_i32(8);
buf.put_u64(self.current_timeline_size);
write_cstr(&Bytes::from("ps_writelsn"), buf)?;
buf.put_i32(8);
buf.put_u64(self.ps_writelsn);
write_cstr(&Bytes::from("ps_flushlsn"), buf)?;
buf.put_i32(8);
buf.put_u64(self.ps_flushlsn);
write_cstr(&Bytes::from("ps_applylsn"), buf)?;
buf.put_i32(8);
buf.put_u64(self.ps_applylsn);
let timestamp = self
.ps_replytime
.duration_since(*PG_EPOCH)
.expect("failed to serialize pg_replytime earlier than PG_EPOCH")
.as_micros() as i64;
write_cstr(&Bytes::from("ps_replytime"), buf)?;
buf.put_i32(8);
buf.put_i64(timestamp);
Ok(())
}
// Deserialize ZenithFeedback message
pub fn parse(mut buf: Bytes) -> ZenithFeedback {
let mut zf = ZenithFeedback::empty();
let nfields = buf.get_u8();
let mut i = 0;
while i < nfields {
i += 1;
let key_cstr = read_null_terminated(&mut buf).unwrap();
let key = cstr_to_str(&key_cstr).unwrap();
match key {
"current_timeline_size" => {
let len = buf.get_i32();
assert_eq!(len, 8);
zf.current_timeline_size = buf.get_u64();
}
"ps_writelsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
zf.ps_writelsn = buf.get_u64();
}
"ps_flushlsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
zf.ps_flushlsn = buf.get_u64();
}
"ps_applylsn" => {
let len = buf.get_i32();
assert_eq!(len, 8);
zf.ps_applylsn = buf.get_u64();
}
"ps_replytime" => {
let len = buf.get_i32();
assert_eq!(len, 8);
let raw_time = buf.get_i64();
if raw_time > 0 {
zf.ps_replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
} else {
zf.ps_replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
}
}
_ => {
let len = buf.get_i32();
info!(
"ZenithFeedback parse. unknown key {} of len {}. Skip it.",
key, len
);
buf.advance(len as usize);
}
}
}
info!("ZenithFeedback parsed is {:?}", zf);
zf
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_zenithfeedback_serialization() {
let mut zf = ZenithFeedback::empty();
// Fill zf wih some values
zf.current_timeline_size = 12345678;
// Set rounded time to be able to compare it with deserialized value,
// because it is rounded up to microseconds during serialization.
zf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
let mut data = BytesMut::new();
zf.serialize(&mut data).unwrap();
let zf_parsed = ZenithFeedback::parse(data.freeze());
assert_eq!(zf, zf_parsed);
}
#[test]
fn test_zenithfeedback_unknown_key() {
let mut zf = ZenithFeedback::empty();
// Fill zf wih some values
zf.current_timeline_size = 12345678;
// Set rounded time to be able to compare it with deserialized value,
// because it is rounded up to microseconds during serialization.
zf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
let mut data = BytesMut::new();
zf.serialize(&mut data).unwrap();
// Add an extra field to the buffer and adjust number of keys
if let Some(first) = data.first_mut() {
*first = ZENITH_FEEDBACK_FIELDS_NUMBER + 1;
}
write_cstr(&Bytes::from("new_field_one"), &mut data).unwrap();
data.put_i32(8);
data.put_u64(42);
// Parse serialized data and check that new field is not parsed
let zf_parsed = ZenithFeedback::parse(data.freeze());
assert_eq!(zf, zf_parsed);
}
}