Somewhat working and whining about LSN addition owerflow

This commit is contained in:
Stas Kelvich
2022-06-06 16:50:00 +03:00
parent aa923b2fae
commit 3e7b3c21ac
12 changed files with 204 additions and 48 deletions

43
Cargo.lock generated
View File

@@ -55,6 +55,15 @@ dependencies = [
"backtrace",
]
[[package]]
name = "arbitrary"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c38b6b6b79f671c25e1a3e785b7b82d7562ffc9cd3efdc98627e5668a2472490"
dependencies = [
"derive_arbitrary",
]
[[package]]
name = "arrayvec"
version = "0.4.12"
@@ -196,7 +205,7 @@ dependencies = [
"cexpr",
"clang-sys",
"clap 2.34.0",
"env_logger",
"env_logger 0.9.0",
"lazy_static",
"lazycell",
"log",
@@ -411,7 +420,7 @@ dependencies = [
"anyhow",
"chrono",
"clap 3.0.14",
"env_logger",
"env_logger 0.9.0",
"hyper",
"libc",
"log",
@@ -721,6 +730,17 @@ dependencies = [
"uuid",
]
[[package]]
name = "derive_arbitrary"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98e23c06c035dac87bd802d98f368df73a7f2cb05a66ffbd1f377e821fac4af9"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "digest"
version = "0.9.0"
@@ -777,6 +797,16 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "env_logger"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3"
dependencies = [
"log",
"regex",
]
[[package]]
name = "env_logger"
version = "0.9.0"
@@ -2059,7 +2089,7 @@ dependencies = [
"bytes",
"chrono",
"crc32c",
"env_logger",
"env_logger 0.9.0",
"hex",
"lazy_static",
"log",
@@ -2266,6 +2296,8 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "588f6378e4dd99458b60ec275b4477add41ce4fa9f64dcba6f15adccb19b50d6"
dependencies = [
"env_logger 0.8.4",
"log",
"rand",
]
@@ -2667,6 +2699,7 @@ name = "safekeeper"
version = "0.1.0"
dependencies = [
"anyhow",
"arbitrary",
"async-trait",
"byteorder",
"bytes",
@@ -3587,6 +3620,7 @@ name = "utils"
version = "0.1.0"
dependencies = [
"anyhow",
"arbitrary",
"bincode",
"byteorder",
"bytes",
@@ -3602,6 +3636,7 @@ dependencies = [
"pin-project-lite",
"postgres",
"postgres-protocol",
"quickcheck",
"rand",
"routerify",
"rustls",
@@ -3655,7 +3690,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"clap 3.0.14",
"env_logger",
"env_logger 0.9.0",
"log",
"postgres",
"tempfile",

View File

@@ -28,6 +28,8 @@ rustls = "0.20.2"
rustls-split = "0.3.0"
git-version = "0.3.5"
serde_with = "1.12.0"
arbitrary = { version = "1", features = ["derive"] }
quickcheck = "1"
metrics = { path = "../metrics" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -1,8 +1,9 @@
#![warn(missing_docs)]
use arbitrary::Arbitrary;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::ops::{Add, AddAssign};
use std::ops::{Add, AddAssign, Sub};
use std::path::Path;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
@@ -13,7 +14,7 @@ use crate::seqwait::MonotonicCounter;
pub const XLOG_BLCKSZ: u32 = 8192;
/// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, Arbitrary)]
#[serde(transparent)]
pub struct Lsn(pub u64);

View File

@@ -4,6 +4,7 @@
use crate::sync::{AsyncishRead, SyncFuture};
use anyhow::{bail, ensure, Context, Result};
use arbitrary::{Arbitrary, Unstructured};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_protocol::PG_EPOCH;
use serde::{Deserialize, Serialize};
@@ -11,7 +12,7 @@ use std::collections::HashMap;
use std::future::Future;
use std::io::{self, Cursor};
use std::str;
use std::time::{Duration, SystemTime};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::io::AsyncReadExt;
use tracing::{trace, warn};
@@ -22,6 +23,8 @@ pub const INT8_OID: Oid = 20;
pub const INT4_OID: Oid = 23;
pub const TEXT_OID: Oid = 25;
// static FUZZABLE_PG_EPOCH: FuzzableSystemTime = FuzzableSystemTime(*PG_EPOCH);
#[derive(Debug)]
pub enum FeMessage {
StartupPacket(FeStartupPacket),
@@ -921,7 +924,7 @@ impl<'a> BeMessage<'a> {
// Zenith extension of postgres replication protocol
// See ZENITH_STATUS_UPDATE_TAG_BYTE
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct ZenithFeedback {
// Last known size of the timeline. Used to enforce timeline size limit.
pub current_timeline_size: u64,
@@ -929,7 +932,31 @@ pub struct ZenithFeedback {
pub ps_writelsn: u64,
pub ps_applylsn: u64,
pub ps_flushlsn: u64,
pub ps_replytime: SystemTime,
pub ps_replytime: FuzzableSystemTime,
}
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct FuzzableSystemTime(pub SystemTime);
impl FuzzableSystemTime {
fn now() -> Self {
FuzzableSystemTime(SystemTime::now())
}
pub fn duration_since(&self, other: &Self) -> Result<Duration, std::time::SystemTimeError> {
self.0.duration_since(other.0)
}
}
impl<'a> Arbitrary<'a> for FuzzableSystemTime {
fn arbitrary(u: &mut Unstructured<'a>) -> Result<FuzzableSystemTime, arbitrary::Error> {
let after_epoch = bool::arbitrary(u)?;
let duration = Duration::arbitrary(u)?;
Ok(FuzzableSystemTime(if after_epoch {
UNIX_EPOCH + duration
} else {
UNIX_EPOCH - duration
}))
}
}
// NOTE: Do not forget to increment this number when adding new fields to ZenithFeedback.
@@ -943,7 +970,7 @@ impl ZenithFeedback {
ps_writelsn: 0,
ps_applylsn: 0,
ps_flushlsn: 0,
ps_replytime: SystemTime::now(),
ps_replytime: FuzzableSystemTime::now(),
}
}
@@ -975,7 +1002,7 @@ impl ZenithFeedback {
let timestamp = self
.ps_replytime
.duration_since(*PG_EPOCH)
.duration_since(&FuzzableSystemTime(*PG_EPOCH))
.expect("failed to serialize pg_replytime earlier than PG_EPOCH")
.as_micros() as i64;
@@ -1020,9 +1047,11 @@ impl ZenithFeedback {
assert_eq!(len, 8);
let raw_time = buf.get_i64();
if raw_time > 0 {
zf.ps_replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
zf.ps_replytime =
FuzzableSystemTime(*PG_EPOCH + Duration::from_micros(raw_time as u64));
} else {
zf.ps_replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
zf.ps_replytime =
FuzzableSystemTime(*PG_EPOCH - Duration::from_micros(-raw_time as u64));
}
}
_ => {
@@ -1051,7 +1080,7 @@ mod tests {
zf.current_timeline_size = 12345678;
// Set rounded time to be able to compare it with deserialized value,
// because it is rounded up to microseconds during serialization.
zf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
zf.ps_replytime = FuzzableSystemTime(*PG_EPOCH + Duration::from_secs(100_000_000));
let mut data = BytesMut::new();
zf.serialize(&mut data).unwrap();
@@ -1066,7 +1095,7 @@ mod tests {
zf.current_timeline_size = 12345678;
// Set rounded time to be able to compare it with deserialized value,
// because it is rounded up to microseconds during serialization.
zf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
zf.ps_replytime = FuzzableSystemTime(*PG_EPOCH + Duration::from_secs(100_000_000));
let mut data = BytesMut::new();
zf.serialize(&mut data).unwrap();

View File

@@ -1,3 +1,4 @@
use arbitrary::Arbitrary;
use std::{fmt, str::FromStr};
use hex::FromHex;
@@ -12,7 +13,7 @@ use serde::{Deserialize, Serialize};
///
/// Use `#[serde_as(as = "DisplayFromStr")]` to (de)serialize it as hex string instead: `ad50847381e248feaac9876cc71ae418`.
/// Check the `serde_with::serde_as` documentation for options for more complex types.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Arbitrary)]
struct ZId([u8; 16]);
impl ZId {
@@ -176,7 +177,7 @@ macro_rules! zid_newtype {
/// NOTE: It (de)serializes as an array of hex bytes, so the string representation would look
/// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`.
/// See [`ZId`] for alternative ways to serialize it.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize, Arbitrary)]
pub struct ZTimelineId(ZId);
zid_newtype!(ZTimelineId);
@@ -187,7 +188,7 @@ zid_newtype!(ZTimelineId);
/// NOTE: It (de)serializes as an array of hex bytes, so the string representation would look
/// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`.
/// See [`ZId`] for alternative ways to serialize it.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Arbitrary)]
pub struct ZTenantId(ZId);
zid_newtype!(ZTenantId);
@@ -224,7 +225,9 @@ impl fmt::Display for ZTenantTimelineId {
// Unique ID of a storage node (safekeeper or pageserver). Supposed to be issued
// by the console.
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Debug, Serialize, Deserialize)]
#[derive(
Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Debug, Serialize, Deserialize, Arbitrary,
)]
#[serde(transparent)]
pub struct NodeId(pub u64);

View File

@@ -34,6 +34,7 @@ async-trait = "0.1"
once_cell = "1.10.0"
futures = "0.3.13"
toml_edit = { version = "0.13", features = ["easy"] }
arbitrary = { version = "1", features = ["derive"] }
postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }

View File

@@ -60,6 +60,9 @@ name = "arbitrary"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c38b6b6b79f671c25e1a3e785b7b82d7562ffc9cd3efdc98627e5668a2472490"
dependencies = [
"derive_arbitrary",
]
[[package]]
name = "async-stream"
@@ -193,7 +196,7 @@ dependencies = [
"cexpr",
"clang-sys",
"clap 2.34.0",
"env_logger",
"env_logger 0.9.0",
"lazy_static",
"lazycell",
"log",
@@ -489,6 +492,17 @@ dependencies = [
"syn",
]
[[package]]
name = "derive_arbitrary"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98e23c06c035dac87bd802d98f368df73a7f2cb05a66ffbd1f377e821fac4af9"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "digest"
version = "0.9.0"
@@ -536,6 +550,16 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "env_logger"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3"
dependencies = [
"log",
"regex",
]
[[package]]
name = "env_logger"
version = "0.9.0"
@@ -1669,6 +1693,17 @@ dependencies = [
"prost",
]
[[package]]
name = "quickcheck"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "588f6378e4dd99458b60ec275b4477add41ce4fa9f64dcba6f15adccb19b50d6"
dependencies = [
"env_logger 0.8.4",
"log",
"rand",
]
[[package]]
name = "quote"
version = "1.0.18"
@@ -1955,6 +1990,7 @@ name = "safekeeper"
version = "0.1.0"
dependencies = [
"anyhow",
"arbitrary",
"async-trait",
"byteorder",
"bytes",
@@ -2725,6 +2761,7 @@ name = "utils"
version = "0.1.0"
dependencies = [
"anyhow",
"arbitrary",
"bincode",
"bytes",
"git-version",
@@ -2737,6 +2774,7 @@ dependencies = [
"pin-project-lite",
"postgres",
"postgres-protocol",
"quickcheck",
"rand",
"routerify",
"rustls",

View File

@@ -1,6 +1,10 @@
#![no_main]
use libfuzzer_sys::fuzz_target;
fuzz_target!(|data: &[u8]| {
safekeeper::safekeeper::fuzz(data)
// fuzz_target!(|data: &[u8]| {
// safekeeper::safekeeper::fuzz(data)
// });
fuzz_target!(|messages: Vec<safekeeper::safekeeper::ProposerAcceptorMessage>| {
safekeeper::safekeeper::fuzz_messages(messages)
});

View File

@@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
use tracing::*;
use crate::handler::SafekeeperPostgresHandler;
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
use crate::safekeeper::{self, AcceptorProposerMessage, AppendResponse};
use crate::safekeeper::{
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, ProposerGreeting,
};
@@ -166,7 +166,7 @@ fn append_logical_message(
truncate_lsn: msg.truncate_lsn,
proposer_uuid: [0u8; 16],
},
wal_data: Bytes::from(wal_data),
wal_data: safekeeper::WalData(Bytes::from(wal_data)),
});
let response = spg.timeline.get().process_msg(&append_request)?;
@@ -202,7 +202,7 @@ impl XlLogicalMessage {
/// Create new WAL record for non-transactional logical message.
/// Used for creating artificial WAL for tests, as LogicalMessage
/// record is basically no-op.
fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
pub fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
let mut prefix_bytes = BytesMut::with_capacity(prefix.len() + 1);
prefix_bytes.put(prefix.as_bytes());
prefix_bytes.put_u8(0);

View File

@@ -8,7 +8,7 @@ use metrics::{
Gauge, IntGaugeVec,
};
use postgres_ffi::xlog_utils::XLogSegNo;
use utils::{lsn::Lsn, zid::ZTenantTimelineId};
use utils::{lsn::Lsn, pq_proto::FuzzableSystemTime, zid::ZTenantTimelineId};
use crate::{
safekeeper::{SafeKeeperState, SafekeeperMemState},
@@ -290,7 +290,9 @@ impl Collector for TimelineCollector {
self.feedback_ps_write_lsn
.with_label_values(labels)
.set(feedback.ps_writelsn);
if let Ok(unix_time) = feedback.ps_replytime.duration_since(SystemTime::UNIX_EPOCH)
if let Ok(unix_time) = feedback
.ps_replytime
.duration_since(&FuzzableSystemTime(SystemTime::UNIX_EPOCH))
{
self.feedback_last_time_seconds
.with_label_values(labels)

View File

@@ -1,6 +1,8 @@
//! Acceptor part of proposer-acceptor consensus algorithm.
use anyhow::{bail, Context, Result};
use arbitrary::Arbitrary;
use arbitrary::Unstructured;
use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
@@ -16,6 +18,7 @@ use std::io::Read;
use tracing::*;
use crate::control_file;
use crate::json_ctrl::encode_logical_message;
use crate::send_wal::HotStandbyFeedback;
use crate::wal_storage;
@@ -36,12 +39,12 @@ const UNKNOWN_SERVER_VERSION: u32 = 0;
pub type Term = u64;
const INVALID_TERM: Term = 0;
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Arbitrary)]
pub struct TermSwitchEntry {
pub term: Term,
pub lsn: Lsn,
}
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize, Arbitrary)]
pub struct TermHistory(pub Vec<TermSwitchEntry>);
impl TermHistory {
@@ -257,7 +260,7 @@ impl SafeKeeperState {
// protocol messages
/// Initial Proposer -> Acceptor message
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Arbitrary)]
pub struct ProposerGreeting {
/// proposer-acceptor protocol version
pub protocol_version: u32,
@@ -274,20 +277,20 @@ pub struct ProposerGreeting {
/// Acceptor -> Proposer initial response: the highest term known to me
/// (acceptor voted for).
#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Arbitrary)]
pub struct AcceptorGreeting {
term: u64,
node_id: NodeId,
}
/// Vote request sent from proposer to safekeepers
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Arbitrary)]
pub struct VoteRequest {
term: Term,
}
/// Vote itself, sent from safekeeper to proposer
#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Arbitrary)]
pub struct VoteResponse {
term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
vote_given: u64, // fixme u64 due to padding
@@ -303,7 +306,7 @@ pub struct VoteResponse {
* Proposer -> Acceptor message announcing proposer is elected and communicating
* term history to it.
*/
#[derive(Debug)]
#[derive(Debug, Arbitrary)]
pub struct ProposerElected {
pub term: Term,
pub start_streaming_at: Lsn,
@@ -313,12 +316,23 @@ pub struct ProposerElected {
/// Request with WAL message sent from proposer to safekeeper. Along the way it
/// communicates commit_lsn.
#[derive(Debug)]
#[derive(Debug, Arbitrary)]
pub struct AppendRequest {
pub h: AppendRequestHeader,
pub wal_data: Bytes,
pub wal_data: WalData,
}
#[derive(Debug, Clone, Deserialize)]
#[derive(Debug)]
pub struct WalData(pub Bytes);
impl<'a> Arbitrary<'a> for WalData {
fn arbitrary(_u: &mut Unstructured<'a>) -> Result<WalData, arbitrary::Error> {
let data = encode_logical_message("prefix", "message");
Ok(WalData(bytes::Bytes::from(data)))
}
}
#[derive(Debug, Clone, Deserialize, Arbitrary)]
pub struct AppendRequestHeader {
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term,
@@ -337,7 +351,7 @@ pub struct AppendRequestHeader {
}
/// Report safekeeper state to proposer
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct AppendResponse {
// Current term of the safekeeper; if it is higher than proposer's, the
// compute is out of date.
@@ -366,7 +380,7 @@ impl AppendResponse {
}
/// Proposer -> Acceptor messages
#[derive(Debug)]
#[derive(Debug, Arbitrary)]
pub enum ProposerAcceptorMessage {
Greeting(ProposerGreeting),
VoteRequest(VoteRequest),
@@ -430,7 +444,10 @@ impl ProposerAcceptorMessage {
let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
stream.read_exact(&mut wal_data_vec)?;
let wal_data = Bytes::from(wal_data_vec);
let msg = AppendRequest { h: hdr, wal_data };
let msg = AppendRequest {
h: hdr,
wal_data: WalData(wal_data),
};
Ok(ProposerAcceptorMessage::AppendRequest(msg))
}
@@ -794,8 +811,8 @@ where
self.inmem.proposer_uuid = msg.h.proposer_uuid;
// do the job
if !msg.wal_data.is_empty() {
self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?;
if !msg.wal_data.0.is_empty() {
self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data.0)?;
}
// flush wal to the disk, if required
@@ -815,15 +832,22 @@ where
// Update truncate and commit LSN in control file.
// To avoid negative impact on performance of extra fsync, do it only
// when truncate_lsn delta exceeds WAL segment size.
if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
< self.inmem.peer_horizon_lsn
// using subtraction to avoid LSN overflow
if self
.inmem
.peer_horizon_lsn
.0
.checked_sub(self.state.peer_horizon_lsn.0)
.unwrap()
> self.state.server.wal_seg_size as u64
{
self.persist_control_file(self.state.clone())?;
}
trace!(
"processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
msg.wal_data.len(),
msg.wal_data.0.len(),
msg.h.end_lsn,
msg.h.commit_lsn,
msg.h.truncate_lsn,
@@ -908,7 +932,6 @@ where
}
}
use crate::wal_storage::Storage;
use std::ops::Deref;
@@ -932,7 +955,6 @@ impl Deref for InMemoryState {
}
}
struct DummyWalStore {
lsn: Lsn,
}
@@ -985,10 +1007,28 @@ fn try_fuzz(data: &[u8]) -> anyhow::Result<()> {
}
}
fn try_fuzz_messages(messages: Vec<ProposerAcceptorMessage>) -> anyhow::Result<()> {
let storage = InMemoryState {
persisted_state: SafeKeeperState::empty(),
};
let wal_store = DummyWalStore { lsn: Lsn(0) };
let ztli = ZTimelineId::from([0u8; 16]);
let mut sk = SafeKeeper::new(ztli, storage, wal_store, NodeId(0)).unwrap();
for msg in messages {
sk.process_msg(&msg)?;
}
Ok(())
}
pub fn fuzz(data: &[u8]) {
try_fuzz(data);
}
pub fn fuzz_messages(messages: Vec<ProposerAcceptorMessage>) {
try_fuzz_messages(messages);
}
#[cfg(test)]
mod tests {

View File

@@ -6,6 +6,7 @@ use crate::timeline::{ReplicaState, Timeline, TimelineTools};
use crate::wal_storage::WalReader;
use anyhow::{bail, Context, Result};
use arbitrary::Arbitrary;
use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE};
use bytes::Bytes;
@@ -34,7 +35,7 @@ const ZENITH_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
type FullTransactionId = u64;
/// Hot standby feedback received from replica
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct HotStandbyFeedback {
pub ts: TimestampTz,
pub xmin: FullTransactionId,