Compare commits

...

2 Commits

Author SHA1 Message Date
Stas Kelvich
3e7b3c21ac Somewhat working and whining about LSN addition owerflow 2022-06-06 16:50:00 +03:00
Bojan Serafimov
aa923b2fae WIP 2022-06-05 16:20:39 -04:00
15 changed files with 3350 additions and 96 deletions

43
Cargo.lock generated
View File

@@ -55,6 +55,15 @@ dependencies = [
"backtrace",
]
[[package]]
name = "arbitrary"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c38b6b6b79f671c25e1a3e785b7b82d7562ffc9cd3efdc98627e5668a2472490"
dependencies = [
"derive_arbitrary",
]
[[package]]
name = "arrayvec"
version = "0.4.12"
@@ -196,7 +205,7 @@ dependencies = [
"cexpr",
"clang-sys",
"clap 2.34.0",
"env_logger",
"env_logger 0.9.0",
"lazy_static",
"lazycell",
"log",
@@ -411,7 +420,7 @@ dependencies = [
"anyhow",
"chrono",
"clap 3.0.14",
"env_logger",
"env_logger 0.9.0",
"hyper",
"libc",
"log",
@@ -721,6 +730,17 @@ dependencies = [
"uuid",
]
[[package]]
name = "derive_arbitrary"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98e23c06c035dac87bd802d98f368df73a7f2cb05a66ffbd1f377e821fac4af9"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "digest"
version = "0.9.0"
@@ -777,6 +797,16 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "env_logger"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3"
dependencies = [
"log",
"regex",
]
[[package]]
name = "env_logger"
version = "0.9.0"
@@ -2059,7 +2089,7 @@ dependencies = [
"bytes",
"chrono",
"crc32c",
"env_logger",
"env_logger 0.9.0",
"hex",
"lazy_static",
"log",
@@ -2266,6 +2296,8 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "588f6378e4dd99458b60ec275b4477add41ce4fa9f64dcba6f15adccb19b50d6"
dependencies = [
"env_logger 0.8.4",
"log",
"rand",
]
@@ -2667,6 +2699,7 @@ name = "safekeeper"
version = "0.1.0"
dependencies = [
"anyhow",
"arbitrary",
"async-trait",
"byteorder",
"bytes",
@@ -3587,6 +3620,7 @@ name = "utils"
version = "0.1.0"
dependencies = [
"anyhow",
"arbitrary",
"bincode",
"byteorder",
"bytes",
@@ -3602,6 +3636,7 @@ dependencies = [
"pin-project-lite",
"postgres",
"postgres-protocol",
"quickcheck",
"rand",
"routerify",
"rustls",
@@ -3655,7 +3690,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"clap 3.0.14",
"env_logger",
"env_logger 0.9.0",
"log",
"postgres",
"tempfile",

View File

@@ -28,6 +28,8 @@ rustls = "0.20.2"
rustls-split = "0.3.0"
git-version = "0.3.5"
serde_with = "1.12.0"
arbitrary = { version = "1", features = ["derive"] }
quickcheck = "1"
metrics = { path = "../metrics" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -1,8 +1,9 @@
#![warn(missing_docs)]
use arbitrary::Arbitrary;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::ops::{Add, AddAssign};
use std::ops::{Add, AddAssign, Sub};
use std::path::Path;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
@@ -13,7 +14,7 @@ use crate::seqwait::MonotonicCounter;
pub const XLOG_BLCKSZ: u32 = 8192;
/// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, Arbitrary)]
#[serde(transparent)]
pub struct Lsn(pub u64);

View File

@@ -4,6 +4,7 @@
use crate::sync::{AsyncishRead, SyncFuture};
use anyhow::{bail, ensure, Context, Result};
use arbitrary::{Arbitrary, Unstructured};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_protocol::PG_EPOCH;
use serde::{Deserialize, Serialize};
@@ -11,7 +12,7 @@ use std::collections::HashMap;
use std::future::Future;
use std::io::{self, Cursor};
use std::str;
use std::time::{Duration, SystemTime};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::io::AsyncReadExt;
use tracing::{trace, warn};
@@ -22,6 +23,8 @@ pub const INT8_OID: Oid = 20;
pub const INT4_OID: Oid = 23;
pub const TEXT_OID: Oid = 25;
// static FUZZABLE_PG_EPOCH: FuzzableSystemTime = FuzzableSystemTime(*PG_EPOCH);
#[derive(Debug)]
pub enum FeMessage {
StartupPacket(FeStartupPacket),
@@ -921,7 +924,7 @@ impl<'a> BeMessage<'a> {
// Zenith extension of postgres replication protocol
// See ZENITH_STATUS_UPDATE_TAG_BYTE
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct ZenithFeedback {
// Last known size of the timeline. Used to enforce timeline size limit.
pub current_timeline_size: u64,
@@ -929,7 +932,31 @@ pub struct ZenithFeedback {
pub ps_writelsn: u64,
pub ps_applylsn: u64,
pub ps_flushlsn: u64,
pub ps_replytime: SystemTime,
pub ps_replytime: FuzzableSystemTime,
}
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct FuzzableSystemTime(pub SystemTime);
impl FuzzableSystemTime {
fn now() -> Self {
FuzzableSystemTime(SystemTime::now())
}
pub fn duration_since(&self, other: &Self) -> Result<Duration, std::time::SystemTimeError> {
self.0.duration_since(other.0)
}
}
impl<'a> Arbitrary<'a> for FuzzableSystemTime {
fn arbitrary(u: &mut Unstructured<'a>) -> Result<FuzzableSystemTime, arbitrary::Error> {
let after_epoch = bool::arbitrary(u)?;
let duration = Duration::arbitrary(u)?;
Ok(FuzzableSystemTime(if after_epoch {
UNIX_EPOCH + duration
} else {
UNIX_EPOCH - duration
}))
}
}
// NOTE: Do not forget to increment this number when adding new fields to ZenithFeedback.
@@ -943,7 +970,7 @@ impl ZenithFeedback {
ps_writelsn: 0,
ps_applylsn: 0,
ps_flushlsn: 0,
ps_replytime: SystemTime::now(),
ps_replytime: FuzzableSystemTime::now(),
}
}
@@ -975,7 +1002,7 @@ impl ZenithFeedback {
let timestamp = self
.ps_replytime
.duration_since(*PG_EPOCH)
.duration_since(&FuzzableSystemTime(*PG_EPOCH))
.expect("failed to serialize pg_replytime earlier than PG_EPOCH")
.as_micros() as i64;
@@ -1020,9 +1047,11 @@ impl ZenithFeedback {
assert_eq!(len, 8);
let raw_time = buf.get_i64();
if raw_time > 0 {
zf.ps_replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
zf.ps_replytime =
FuzzableSystemTime(*PG_EPOCH + Duration::from_micros(raw_time as u64));
} else {
zf.ps_replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
zf.ps_replytime =
FuzzableSystemTime(*PG_EPOCH - Duration::from_micros(-raw_time as u64));
}
}
_ => {
@@ -1051,7 +1080,7 @@ mod tests {
zf.current_timeline_size = 12345678;
// Set rounded time to be able to compare it with deserialized value,
// because it is rounded up to microseconds during serialization.
zf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
zf.ps_replytime = FuzzableSystemTime(*PG_EPOCH + Duration::from_secs(100_000_000));
let mut data = BytesMut::new();
zf.serialize(&mut data).unwrap();
@@ -1066,7 +1095,7 @@ mod tests {
zf.current_timeline_size = 12345678;
// Set rounded time to be able to compare it with deserialized value,
// because it is rounded up to microseconds during serialization.
zf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
zf.ps_replytime = FuzzableSystemTime(*PG_EPOCH + Duration::from_secs(100_000_000));
let mut data = BytesMut::new();
zf.serialize(&mut data).unwrap();

View File

@@ -1,3 +1,4 @@
use arbitrary::Arbitrary;
use std::{fmt, str::FromStr};
use hex::FromHex;
@@ -12,7 +13,7 @@ use serde::{Deserialize, Serialize};
///
/// Use `#[serde_as(as = "DisplayFromStr")]` to (de)serialize it as hex string instead: `ad50847381e248feaac9876cc71ae418`.
/// Check the `serde_with::serde_as` documentation for options for more complex types.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Arbitrary)]
struct ZId([u8; 16]);
impl ZId {
@@ -176,7 +177,7 @@ macro_rules! zid_newtype {
/// NOTE: It (de)serializes as an array of hex bytes, so the string representation would look
/// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`.
/// See [`ZId`] for alternative ways to serialize it.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize, Arbitrary)]
pub struct ZTimelineId(ZId);
zid_newtype!(ZTimelineId);
@@ -187,7 +188,7 @@ zid_newtype!(ZTimelineId);
/// NOTE: It (de)serializes as an array of hex bytes, so the string representation would look
/// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`.
/// See [`ZId`] for alternative ways to serialize it.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Arbitrary)]
pub struct ZTenantId(ZId);
zid_newtype!(ZTenantId);
@@ -224,7 +225,9 @@ impl fmt::Display for ZTenantTimelineId {
// Unique ID of a storage node (safekeeper or pageserver). Supposed to be issued
// by the console.
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Debug, Serialize, Deserialize)]
#[derive(
Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Debug, Serialize, Deserialize, Arbitrary,
)]
#[serde(transparent)]
pub struct NodeId(pub u64);

View File

@@ -34,6 +34,7 @@ async-trait = "0.1"
once_cell = "1.10.0"
futures = "0.3.13"
toml_edit = { version = "0.13", features = ["easy"] }
arbitrary = { version = "1", features = ["derive"] }
postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }

3
safekeeper/fuzz/.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
target
corpus
artifacts

3057
safekeeper/fuzz/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,32 @@
[package]
name = "safekeeper-fuzz"
version = "0.0.0"
authors = ["Automatically generated"]
publish = false
edition = "2018"
[package.metadata]
cargo-fuzz = true
[dependencies]
libfuzzer-sys = "0.4"
bytes = "1.0.1"
[dependencies.safekeeper]
path = ".."
# Prevent this from interfering with workspaces
[workspace]
members = ["."]
[[bin]]
name = "fuzz_target_1"
path = "fuzz_targets/fuzz_target_1.rs"
test = false
doc = false
[[bin]]
name = "sk"
path = "fuzz_targets/sk.rs"
test = false
doc = false

View File

@@ -0,0 +1,6 @@
#![no_main]
use libfuzzer_sys::fuzz_target;
fuzz_target!(|data: &[u8]| {
// fuzzed code goes here
});

View File

@@ -0,0 +1,10 @@
#![no_main]
use libfuzzer_sys::fuzz_target;
// fuzz_target!(|data: &[u8]| {
// safekeeper::safekeeper::fuzz(data)
// });
fuzz_target!(|messages: Vec<safekeeper::safekeeper::ProposerAcceptorMessage>| {
safekeeper::safekeeper::fuzz_messages(messages)
});

View File

@@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
use tracing::*;
use crate::handler::SafekeeperPostgresHandler;
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
use crate::safekeeper::{self, AcceptorProposerMessage, AppendResponse};
use crate::safekeeper::{
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, ProposerGreeting,
};
@@ -166,7 +166,7 @@ fn append_logical_message(
truncate_lsn: msg.truncate_lsn,
proposer_uuid: [0u8; 16],
},
wal_data: Bytes::from(wal_data),
wal_data: safekeeper::WalData(Bytes::from(wal_data)),
});
let response = spg.timeline.get().process_msg(&append_request)?;
@@ -202,7 +202,7 @@ impl XlLogicalMessage {
/// Create new WAL record for non-transactional logical message.
/// Used for creating artificial WAL for tests, as LogicalMessage
/// record is basically no-op.
fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
pub fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
let mut prefix_bytes = BytesMut::with_capacity(prefix.len() + 1);
prefix_bytes.put(prefix.as_bytes());
prefix_bytes.put_u8(0);

View File

@@ -8,7 +8,7 @@ use metrics::{
Gauge, IntGaugeVec,
};
use postgres_ffi::xlog_utils::XLogSegNo;
use utils::{lsn::Lsn, zid::ZTenantTimelineId};
use utils::{lsn::Lsn, pq_proto::FuzzableSystemTime, zid::ZTenantTimelineId};
use crate::{
safekeeper::{SafeKeeperState, SafekeeperMemState},
@@ -290,7 +290,9 @@ impl Collector for TimelineCollector {
self.feedback_ps_write_lsn
.with_label_values(labels)
.set(feedback.ps_writelsn);
if let Ok(unix_time) = feedback.ps_replytime.duration_since(SystemTime::UNIX_EPOCH)
if let Ok(unix_time) = feedback
.ps_replytime
.duration_since(&FuzzableSystemTime(SystemTime::UNIX_EPOCH))
{
self.feedback_last_time_seconds
.with_label_values(labels)

View File

@@ -1,6 +1,8 @@
//! Acceptor part of proposer-acceptor consensus algorithm.
use anyhow::{bail, Context, Result};
use arbitrary::Arbitrary;
use arbitrary::Unstructured;
use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
@@ -16,6 +18,7 @@ use std::io::Read;
use tracing::*;
use crate::control_file;
use crate::json_ctrl::encode_logical_message;
use crate::send_wal::HotStandbyFeedback;
use crate::wal_storage;
@@ -36,12 +39,12 @@ const UNKNOWN_SERVER_VERSION: u32 = 0;
pub type Term = u64;
const INVALID_TERM: Term = 0;
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Arbitrary)]
pub struct TermSwitchEntry {
pub term: Term,
pub lsn: Lsn,
}
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize, Arbitrary)]
pub struct TermHistory(pub Vec<TermSwitchEntry>);
impl TermHistory {
@@ -55,6 +58,9 @@ impl TermHistory {
bail!("TermHistory misses len");
}
let n_entries = bytes.get_u32_le();
if n_entries > 10 {
bail!("too long")
}
let mut res = Vec::with_capacity(n_entries as usize);
for _ in 0..n_entries {
if bytes.remaining() < 16 {
@@ -246,7 +252,6 @@ impl SafeKeeperState {
}
}
#[cfg(test)]
pub fn empty() -> Self {
SafeKeeperState::new(&ZTenantTimelineId::empty(), vec![])
}
@@ -255,7 +260,7 @@ impl SafeKeeperState {
// protocol messages
/// Initial Proposer -> Acceptor message
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Arbitrary)]
pub struct ProposerGreeting {
/// proposer-acceptor protocol version
pub protocol_version: u32,
@@ -272,20 +277,20 @@ pub struct ProposerGreeting {
/// Acceptor -> Proposer initial response: the highest term known to me
/// (acceptor voted for).
#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Arbitrary)]
pub struct AcceptorGreeting {
term: u64,
node_id: NodeId,
}
/// Vote request sent from proposer to safekeepers
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Arbitrary)]
pub struct VoteRequest {
term: Term,
}
/// Vote itself, sent from safekeeper to proposer
#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Arbitrary)]
pub struct VoteResponse {
term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
vote_given: u64, // fixme u64 due to padding
@@ -301,7 +306,7 @@ pub struct VoteResponse {
* Proposer -> Acceptor message announcing proposer is elected and communicating
* term history to it.
*/
#[derive(Debug)]
#[derive(Debug, Arbitrary)]
pub struct ProposerElected {
pub term: Term,
pub start_streaming_at: Lsn,
@@ -311,12 +316,23 @@ pub struct ProposerElected {
/// Request with WAL message sent from proposer to safekeeper. Along the way it
/// communicates commit_lsn.
#[derive(Debug)]
#[derive(Debug, Arbitrary)]
pub struct AppendRequest {
pub h: AppendRequestHeader,
pub wal_data: Bytes,
pub wal_data: WalData,
}
#[derive(Debug, Clone, Deserialize)]
#[derive(Debug)]
pub struct WalData(pub Bytes);
impl<'a> Arbitrary<'a> for WalData {
fn arbitrary(_u: &mut Unstructured<'a>) -> Result<WalData, arbitrary::Error> {
let data = encode_logical_message("prefix", "message");
Ok(WalData(bytes::Bytes::from(data)))
}
}
#[derive(Debug, Clone, Deserialize, Arbitrary)]
pub struct AppendRequestHeader {
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term,
@@ -335,7 +351,7 @@ pub struct AppendRequestHeader {
}
/// Report safekeeper state to proposer
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct AppendResponse {
// Current term of the safekeeper; if it is higher than proposer's, the
// compute is out of date.
@@ -364,7 +380,7 @@ impl AppendResponse {
}
/// Proposer -> Acceptor messages
#[derive(Debug)]
#[derive(Debug, Arbitrary)]
pub enum ProposerAcceptorMessage {
Greeting(ProposerGreeting),
VoteRequest(VoteRequest),
@@ -428,7 +444,10 @@ impl ProposerAcceptorMessage {
let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
stream.read_exact(&mut wal_data_vec)?;
let wal_data = Bytes::from(wal_data_vec);
let msg = AppendRequest { h: hdr, wal_data };
let msg = AppendRequest {
h: hdr,
wal_data: WalData(wal_data),
};
Ok(ProposerAcceptorMessage::AppendRequest(msg))
}
@@ -792,8 +811,8 @@ where
self.inmem.proposer_uuid = msg.h.proposer_uuid;
// do the job
if !msg.wal_data.is_empty() {
self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?;
if !msg.wal_data.0.is_empty() {
self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data.0)?;
}
// flush wal to the disk, if required
@@ -813,15 +832,22 @@ where
// Update truncate and commit LSN in control file.
// To avoid negative impact on performance of extra fsync, do it only
// when truncate_lsn delta exceeds WAL segment size.
if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
< self.inmem.peer_horizon_lsn
// using subtraction to avoid LSN overflow
if self
.inmem
.peer_horizon_lsn
.0
.checked_sub(self.state.peer_horizon_lsn.0)
.unwrap()
> self.state.server.wal_seg_size as u64
{
self.persist_control_file(self.state.clone())?;
}
trace!(
"processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
msg.wal_data.len(),
msg.wal_data.0.len(),
msg.h.end_lsn,
msg.h.commit_lsn,
msg.h.truncate_lsn,
@@ -906,64 +932,110 @@ where
}
}
use crate::wal_storage::Storage;
use std::ops::Deref;
// fake storage for tests
struct InMemoryState {
persisted_state: SafeKeeperState,
}
impl control_file::Storage for InMemoryState {
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
self.persisted_state = s.clone();
Ok(())
}
}
impl Deref for InMemoryState {
type Target = SafeKeeperState;
fn deref(&self) -> &Self::Target {
&self.persisted_state
}
}
struct DummyWalStore {
lsn: Lsn,
}
impl wal_storage::Storage for DummyWalStore {
fn flush_lsn(&self) -> Lsn {
self.lsn
}
fn init_storage(&mut self, _state: &SafeKeeperState) -> Result<()> {
Ok(())
}
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
self.lsn = startpos + buf.len() as u64;
Ok(())
}
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
self.lsn = end_pos;
Ok(())
}
fn flush_wal(&mut self) -> Result<()> {
Ok(())
}
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
Box::new(move |_segno_up_to: XLogSegNo| Ok(()))
}
}
fn try_fuzz(data: &[u8]) -> anyhow::Result<()> {
let storage = InMemoryState {
persisted_state: SafeKeeperState::empty(),
};
let wal_store = DummyWalStore { lsn: Lsn(0) };
let ztli = ZTimelineId::from([0u8; 16]);
let mut sk = SafeKeeper::new(ztli, storage, wal_store, NodeId(0)).unwrap();
let bytes = Bytes::copy_from_slice(data);
let mut reader = bytes.reader();
loop {
let len = reader.read_u8()? as usize;
let mut buf: Vec<u8> = vec![0; len];
reader.read_exact(&mut buf)?;
let msg = ProposerAcceptorMessage::parse(Bytes::copy_from_slice(&buf))?;
sk.process_msg(&msg)?;
}
}
fn try_fuzz_messages(messages: Vec<ProposerAcceptorMessage>) -> anyhow::Result<()> {
let storage = InMemoryState {
persisted_state: SafeKeeperState::empty(),
};
let wal_store = DummyWalStore { lsn: Lsn(0) };
let ztli = ZTimelineId::from([0u8; 16]);
let mut sk = SafeKeeper::new(ztli, storage, wal_store, NodeId(0)).unwrap();
for msg in messages {
sk.process_msg(&msg)?;
}
Ok(())
}
pub fn fuzz(data: &[u8]) {
try_fuzz(data);
}
pub fn fuzz_messages(messages: Vec<ProposerAcceptorMessage>) {
try_fuzz_messages(messages);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::wal_storage::Storage;
use std::ops::Deref;
// fake storage for tests
struct InMemoryState {
persisted_state: SafeKeeperState,
}
impl control_file::Storage for InMemoryState {
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
self.persisted_state = s.clone();
Ok(())
}
}
impl Deref for InMemoryState {
type Target = SafeKeeperState;
fn deref(&self) -> &Self::Target {
&self.persisted_state
}
}
struct DummyWalStore {
lsn: Lsn,
}
impl wal_storage::Storage for DummyWalStore {
fn flush_lsn(&self) -> Lsn {
self.lsn
}
fn init_storage(&mut self, _state: &SafeKeeperState) -> Result<()> {
Ok(())
}
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
self.lsn = startpos + buf.len() as u64;
Ok(())
}
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
self.lsn = end_pos;
Ok(())
}
fn flush_wal(&mut self) -> Result<()> {
Ok(())
}
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
Box::new(move |_segno_up_to: XLogSegNo| Ok(()))
}
}
#[test]
fn test_voting() {
let storage = InMemoryState {

View File

@@ -6,6 +6,7 @@ use crate::timeline::{ReplicaState, Timeline, TimelineTools};
use crate::wal_storage::WalReader;
use anyhow::{bail, Context, Result};
use arbitrary::Arbitrary;
use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE};
use bytes::Bytes;
@@ -34,7 +35,7 @@ const ZENITH_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
type FullTransactionId = u64;
/// Hot standby feedback received from replica
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct HotStandbyFeedback {
pub ts: TimestampTz,
pub xmin: FullTransactionId,