From 3e7b3c21acd471e23f5778ef383b393ffd1656ec Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Mon, 6 Jun 2022 16:50:00 +0300 Subject: [PATCH] Somewhat working and whining about LSN addition owerflow --- Cargo.lock | 43 ++++++++++++++-- libs/utils/Cargo.toml | 2 + libs/utils/src/lsn.rs | 5 +- libs/utils/src/pq_proto.rs | 47 ++++++++++++++---- libs/utils/src/zid.rs | 11 ++-- safekeeper/Cargo.toml | 1 + safekeeper/fuzz/Cargo.lock | 40 ++++++++++++++- safekeeper/fuzz/fuzz_targets/sk.rs | 8 ++- safekeeper/src/json_ctrl.rs | 6 +-- safekeeper/src/metrics.rs | 6 ++- safekeeper/src/safekeeper.rs | 80 ++++++++++++++++++++++-------- safekeeper/src/send_wal.rs | 3 +- 12 files changed, 204 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c615766eb8..0b670bace6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -55,6 +55,15 @@ dependencies = [ "backtrace", ] +[[package]] +name = "arbitrary" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c38b6b6b79f671c25e1a3e785b7b82d7562ffc9cd3efdc98627e5668a2472490" +dependencies = [ + "derive_arbitrary", +] + [[package]] name = "arrayvec" version = "0.4.12" @@ -196,7 +205,7 @@ dependencies = [ "cexpr", "clang-sys", "clap 2.34.0", - "env_logger", + "env_logger 0.9.0", "lazy_static", "lazycell", "log", @@ -411,7 +420,7 @@ dependencies = [ "anyhow", "chrono", "clap 3.0.14", - "env_logger", + "env_logger 0.9.0", "hyper", "libc", "log", @@ -721,6 +730,17 @@ dependencies = [ "uuid", ] +[[package]] +name = "derive_arbitrary" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98e23c06c035dac87bd802d98f368df73a7f2cb05a66ffbd1f377e821fac4af9" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "digest" version = "0.9.0" @@ -777,6 +797,16 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "env_logger" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3" +dependencies = [ + "log", + "regex", +] + [[package]] name = "env_logger" version = "0.9.0" @@ -2059,7 +2089,7 @@ dependencies = [ "bytes", "chrono", "crc32c", - "env_logger", + "env_logger 0.9.0", "hex", "lazy_static", "log", @@ -2266,6 +2296,8 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "588f6378e4dd99458b60ec275b4477add41ce4fa9f64dcba6f15adccb19b50d6" dependencies = [ + "env_logger 0.8.4", + "log", "rand", ] @@ -2667,6 +2699,7 @@ name = "safekeeper" version = "0.1.0" dependencies = [ "anyhow", + "arbitrary", "async-trait", "byteorder", "bytes", @@ -3587,6 +3620,7 @@ name = "utils" version = "0.1.0" dependencies = [ "anyhow", + "arbitrary", "bincode", "byteorder", "bytes", @@ -3602,6 +3636,7 @@ dependencies = [ "pin-project-lite", "postgres", "postgres-protocol", + "quickcheck", "rand", "routerify", "rustls", @@ -3655,7 +3690,7 @@ version = "0.1.0" dependencies = [ "anyhow", "clap 3.0.14", - "env_logger", + "env_logger 0.9.0", "log", "postgres", "tempfile", diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index d83b02d7ae..e7e3d7a6ae 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -28,6 +28,8 @@ rustls = "0.20.2" rustls-split = "0.3.0" git-version = "0.3.5" serde_with = "1.12.0" +arbitrary = { version = "1", features = ["derive"] } +quickcheck = "1" metrics = { path = "../metrics" } workspace_hack = { version = "0.1", path = "../../workspace_hack" } diff --git a/libs/utils/src/lsn.rs b/libs/utils/src/lsn.rs index 3dab2a625c..5703d00bee 100644 --- a/libs/utils/src/lsn.rs +++ b/libs/utils/src/lsn.rs @@ -1,8 +1,9 @@ #![warn(missing_docs)] +use arbitrary::Arbitrary; use serde::{Deserialize, Serialize}; use std::fmt; -use std::ops::{Add, AddAssign}; +use std::ops::{Add, AddAssign, Sub}; use std::path::Path; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; @@ -13,7 +14,7 @@ use crate::seqwait::MonotonicCounter; pub const XLOG_BLCKSZ: u32 = 8192; /// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr -#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)] +#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, Arbitrary)] #[serde(transparent)] pub struct Lsn(pub u64); diff --git a/libs/utils/src/pq_proto.rs b/libs/utils/src/pq_proto.rs index a36e8342b0..cba7fb9094 100644 --- a/libs/utils/src/pq_proto.rs +++ b/libs/utils/src/pq_proto.rs @@ -4,6 +4,7 @@ use crate::sync::{AsyncishRead, SyncFuture}; use anyhow::{bail, ensure, Context, Result}; +use arbitrary::{Arbitrary, Unstructured}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use postgres_protocol::PG_EPOCH; use serde::{Deserialize, Serialize}; @@ -11,7 +12,7 @@ use std::collections::HashMap; use std::future::Future; use std::io::{self, Cursor}; use std::str; -use std::time::{Duration, SystemTime}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::io::AsyncReadExt; use tracing::{trace, warn}; @@ -22,6 +23,8 @@ pub const INT8_OID: Oid = 20; pub const INT4_OID: Oid = 23; pub const TEXT_OID: Oid = 25; +// static FUZZABLE_PG_EPOCH: FuzzableSystemTime = FuzzableSystemTime(*PG_EPOCH); + #[derive(Debug)] pub enum FeMessage { StartupPacket(FeStartupPacket), @@ -921,7 +924,7 @@ impl<'a> BeMessage<'a> { // Zenith extension of postgres replication protocol // See ZENITH_STATUS_UPDATE_TAG_BYTE -#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize, Arbitrary)] pub struct ZenithFeedback { // Last known size of the timeline. Used to enforce timeline size limit. pub current_timeline_size: u64, @@ -929,7 +932,31 @@ pub struct ZenithFeedback { pub ps_writelsn: u64, pub ps_applylsn: u64, pub ps_flushlsn: u64, - pub ps_replytime: SystemTime, + pub ps_replytime: FuzzableSystemTime, +} + +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +pub struct FuzzableSystemTime(pub SystemTime); + +impl FuzzableSystemTime { + fn now() -> Self { + FuzzableSystemTime(SystemTime::now()) + } + pub fn duration_since(&self, other: &Self) -> Result { + self.0.duration_since(other.0) + } +} + +impl<'a> Arbitrary<'a> for FuzzableSystemTime { + fn arbitrary(u: &mut Unstructured<'a>) -> Result { + let after_epoch = bool::arbitrary(u)?; + let duration = Duration::arbitrary(u)?; + Ok(FuzzableSystemTime(if after_epoch { + UNIX_EPOCH + duration + } else { + UNIX_EPOCH - duration + })) + } } // NOTE: Do not forget to increment this number when adding new fields to ZenithFeedback. @@ -943,7 +970,7 @@ impl ZenithFeedback { ps_writelsn: 0, ps_applylsn: 0, ps_flushlsn: 0, - ps_replytime: SystemTime::now(), + ps_replytime: FuzzableSystemTime::now(), } } @@ -975,7 +1002,7 @@ impl ZenithFeedback { let timestamp = self .ps_replytime - .duration_since(*PG_EPOCH) + .duration_since(&FuzzableSystemTime(*PG_EPOCH)) .expect("failed to serialize pg_replytime earlier than PG_EPOCH") .as_micros() as i64; @@ -1020,9 +1047,11 @@ impl ZenithFeedback { assert_eq!(len, 8); let raw_time = buf.get_i64(); if raw_time > 0 { - zf.ps_replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64); + zf.ps_replytime = + FuzzableSystemTime(*PG_EPOCH + Duration::from_micros(raw_time as u64)); } else { - zf.ps_replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64); + zf.ps_replytime = + FuzzableSystemTime(*PG_EPOCH - Duration::from_micros(-raw_time as u64)); } } _ => { @@ -1051,7 +1080,7 @@ mod tests { zf.current_timeline_size = 12345678; // Set rounded time to be able to compare it with deserialized value, // because it is rounded up to microseconds during serialization. - zf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000); + zf.ps_replytime = FuzzableSystemTime(*PG_EPOCH + Duration::from_secs(100_000_000)); let mut data = BytesMut::new(); zf.serialize(&mut data).unwrap(); @@ -1066,7 +1095,7 @@ mod tests { zf.current_timeline_size = 12345678; // Set rounded time to be able to compare it with deserialized value, // because it is rounded up to microseconds during serialization. - zf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000); + zf.ps_replytime = FuzzableSystemTime(*PG_EPOCH + Duration::from_secs(100_000_000)); let mut data = BytesMut::new(); zf.serialize(&mut data).unwrap(); diff --git a/libs/utils/src/zid.rs b/libs/utils/src/zid.rs index 6da5355f61..cffa7186d0 100644 --- a/libs/utils/src/zid.rs +++ b/libs/utils/src/zid.rs @@ -1,3 +1,4 @@ +use arbitrary::Arbitrary; use std::{fmt, str::FromStr}; use hex::FromHex; @@ -12,7 +13,7 @@ use serde::{Deserialize, Serialize}; /// /// Use `#[serde_as(as = "DisplayFromStr")]` to (de)serialize it as hex string instead: `ad50847381e248feaac9876cc71ae418`. /// Check the `serde_with::serde_as` documentation for options for more complex types. -#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Arbitrary)] struct ZId([u8; 16]); impl ZId { @@ -176,7 +177,7 @@ macro_rules! zid_newtype { /// NOTE: It (de)serializes as an array of hex bytes, so the string representation would look /// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`. /// See [`ZId`] for alternative ways to serialize it. -#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize, Arbitrary)] pub struct ZTimelineId(ZId); zid_newtype!(ZTimelineId); @@ -187,7 +188,7 @@ zid_newtype!(ZTimelineId); /// NOTE: It (de)serializes as an array of hex bytes, so the string representation would look /// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`. /// See [`ZId`] for alternative ways to serialize it. -#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Arbitrary)] pub struct ZTenantId(ZId); zid_newtype!(ZTenantId); @@ -224,7 +225,9 @@ impl fmt::Display for ZTenantTimelineId { // Unique ID of a storage node (safekeeper or pageserver). Supposed to be issued // by the console. -#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Debug, Serialize, Deserialize)] +#[derive( + Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Debug, Serialize, Deserialize, Arbitrary, +)] #[serde(transparent)] pub struct NodeId(pub u64); diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 373108c61b..6aba78ae8a 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -34,6 +34,7 @@ async-trait = "0.1" once_cell = "1.10.0" futures = "0.3.13" toml_edit = { version = "0.13", features = ["easy"] } +arbitrary = { version = "1", features = ["derive"] } postgres_ffi = { path = "../libs/postgres_ffi" } metrics = { path = "../libs/metrics" } diff --git a/safekeeper/fuzz/Cargo.lock b/safekeeper/fuzz/Cargo.lock index a040bea2f8..5683783805 100644 --- a/safekeeper/fuzz/Cargo.lock +++ b/safekeeper/fuzz/Cargo.lock @@ -60,6 +60,9 @@ name = "arbitrary" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c38b6b6b79f671c25e1a3e785b7b82d7562ffc9cd3efdc98627e5668a2472490" +dependencies = [ + "derive_arbitrary", +] [[package]] name = "async-stream" @@ -193,7 +196,7 @@ dependencies = [ "cexpr", "clang-sys", "clap 2.34.0", - "env_logger", + "env_logger 0.9.0", "lazy_static", "lazycell", "log", @@ -489,6 +492,17 @@ dependencies = [ "syn", ] +[[package]] +name = "derive_arbitrary" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98e23c06c035dac87bd802d98f368df73a7f2cb05a66ffbd1f377e821fac4af9" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "digest" version = "0.9.0" @@ -536,6 +550,16 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "env_logger" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3" +dependencies = [ + "log", + "regex", +] + [[package]] name = "env_logger" version = "0.9.0" @@ -1669,6 +1693,17 @@ dependencies = [ "prost", ] +[[package]] +name = "quickcheck" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "588f6378e4dd99458b60ec275b4477add41ce4fa9f64dcba6f15adccb19b50d6" +dependencies = [ + "env_logger 0.8.4", + "log", + "rand", +] + [[package]] name = "quote" version = "1.0.18" @@ -1955,6 +1990,7 @@ name = "safekeeper" version = "0.1.0" dependencies = [ "anyhow", + "arbitrary", "async-trait", "byteorder", "bytes", @@ -2725,6 +2761,7 @@ name = "utils" version = "0.1.0" dependencies = [ "anyhow", + "arbitrary", "bincode", "bytes", "git-version", @@ -2737,6 +2774,7 @@ dependencies = [ "pin-project-lite", "postgres", "postgres-protocol", + "quickcheck", "rand", "routerify", "rustls", diff --git a/safekeeper/fuzz/fuzz_targets/sk.rs b/safekeeper/fuzz/fuzz_targets/sk.rs index 273b706d00..63f1cbdd79 100644 --- a/safekeeper/fuzz/fuzz_targets/sk.rs +++ b/safekeeper/fuzz/fuzz_targets/sk.rs @@ -1,6 +1,10 @@ #![no_main] use libfuzzer_sys::fuzz_target; -fuzz_target!(|data: &[u8]| { - safekeeper::safekeeper::fuzz(data) +// fuzz_target!(|data: &[u8]| { +// safekeeper::safekeeper::fuzz(data) +// }); + +fuzz_target!(|messages: Vec| { + safekeeper::safekeeper::fuzz_messages(messages) }); diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index 43514997d4..39c00ff931 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use tracing::*; use crate::handler::SafekeeperPostgresHandler; -use crate::safekeeper::{AcceptorProposerMessage, AppendResponse}; +use crate::safekeeper::{self, AcceptorProposerMessage, AppendResponse}; use crate::safekeeper::{ AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, ProposerGreeting, }; @@ -166,7 +166,7 @@ fn append_logical_message( truncate_lsn: msg.truncate_lsn, proposer_uuid: [0u8; 16], }, - wal_data: Bytes::from(wal_data), + wal_data: safekeeper::WalData(Bytes::from(wal_data)), }); let response = spg.timeline.get().process_msg(&append_request)?; @@ -202,7 +202,7 @@ impl XlLogicalMessage { /// Create new WAL record for non-transactional logical message. /// Used for creating artificial WAL for tests, as LogicalMessage /// record is basically no-op. -fn encode_logical_message(prefix: &str, message: &str) -> Vec { +pub fn encode_logical_message(prefix: &str, message: &str) -> Vec { let mut prefix_bytes = BytesMut::with_capacity(prefix.len() + 1); prefix_bytes.put(prefix.as_bytes()); prefix_bytes.put_u8(0); diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index 5a2e5f125f..3d9443f76b 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -8,7 +8,7 @@ use metrics::{ Gauge, IntGaugeVec, }; use postgres_ffi::xlog_utils::XLogSegNo; -use utils::{lsn::Lsn, zid::ZTenantTimelineId}; +use utils::{lsn::Lsn, pq_proto::FuzzableSystemTime, zid::ZTenantTimelineId}; use crate::{ safekeeper::{SafeKeeperState, SafekeeperMemState}, @@ -290,7 +290,9 @@ impl Collector for TimelineCollector { self.feedback_ps_write_lsn .with_label_values(labels) .set(feedback.ps_writelsn); - if let Ok(unix_time) = feedback.ps_replytime.duration_since(SystemTime::UNIX_EPOCH) + if let Ok(unix_time) = feedback + .ps_replytime + .duration_since(&FuzzableSystemTime(SystemTime::UNIX_EPOCH)) { self.feedback_last_time_seconds .with_label_values(labels) diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 4925b221bf..f5abfb696a 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -1,6 +1,8 @@ //! Acceptor part of proposer-acceptor consensus algorithm. use anyhow::{bail, Context, Result}; +use arbitrary::Arbitrary; +use arbitrary::Unstructured; use byteorder::{LittleEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -16,6 +18,7 @@ use std::io::Read; use tracing::*; use crate::control_file; +use crate::json_ctrl::encode_logical_message; use crate::send_wal::HotStandbyFeedback; use crate::wal_storage; @@ -36,12 +39,12 @@ const UNKNOWN_SERVER_VERSION: u32 = 0; pub type Term = u64; const INVALID_TERM: Term = 0; -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Arbitrary)] pub struct TermSwitchEntry { pub term: Term, pub lsn: Lsn, } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize, Arbitrary)] pub struct TermHistory(pub Vec); impl TermHistory { @@ -257,7 +260,7 @@ impl SafeKeeperState { // protocol messages /// Initial Proposer -> Acceptor message -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Arbitrary)] pub struct ProposerGreeting { /// proposer-acceptor protocol version pub protocol_version: u32, @@ -274,20 +277,20 @@ pub struct ProposerGreeting { /// Acceptor -> Proposer initial response: the highest term known to me /// (acceptor voted for). -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Arbitrary)] pub struct AcceptorGreeting { term: u64, node_id: NodeId, } /// Vote request sent from proposer to safekeepers -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Arbitrary)] pub struct VoteRequest { term: Term, } /// Vote itself, sent from safekeeper to proposer -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Arbitrary)] pub struct VoteResponse { term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date. vote_given: u64, // fixme u64 due to padding @@ -303,7 +306,7 @@ pub struct VoteResponse { * Proposer -> Acceptor message announcing proposer is elected and communicating * term history to it. */ -#[derive(Debug)] +#[derive(Debug, Arbitrary)] pub struct ProposerElected { pub term: Term, pub start_streaming_at: Lsn, @@ -313,12 +316,23 @@ pub struct ProposerElected { /// Request with WAL message sent from proposer to safekeeper. Along the way it /// communicates commit_lsn. -#[derive(Debug)] +#[derive(Debug, Arbitrary)] pub struct AppendRequest { pub h: AppendRequestHeader, - pub wal_data: Bytes, + pub wal_data: WalData, } -#[derive(Debug, Clone, Deserialize)] + +#[derive(Debug)] +pub struct WalData(pub Bytes); + +impl<'a> Arbitrary<'a> for WalData { + fn arbitrary(_u: &mut Unstructured<'a>) -> Result { + let data = encode_logical_message("prefix", "message"); + Ok(WalData(bytes::Bytes::from(data))) + } +} + +#[derive(Debug, Clone, Deserialize, Arbitrary)] pub struct AppendRequestHeader { // safekeeper's current term; if it is higher than proposer's, the compute is out of date. pub term: Term, @@ -337,7 +351,7 @@ pub struct AppendRequestHeader { } /// Report safekeeper state to proposer -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Serialize, Deserialize, Arbitrary)] pub struct AppendResponse { // Current term of the safekeeper; if it is higher than proposer's, the // compute is out of date. @@ -366,7 +380,7 @@ impl AppendResponse { } /// Proposer -> Acceptor messages -#[derive(Debug)] +#[derive(Debug, Arbitrary)] pub enum ProposerAcceptorMessage { Greeting(ProposerGreeting), VoteRequest(VoteRequest), @@ -430,7 +444,10 @@ impl ProposerAcceptorMessage { let mut wal_data_vec: Vec = vec![0; rec_size]; stream.read_exact(&mut wal_data_vec)?; let wal_data = Bytes::from(wal_data_vec); - let msg = AppendRequest { h: hdr, wal_data }; + let msg = AppendRequest { + h: hdr, + wal_data: WalData(wal_data), + }; Ok(ProposerAcceptorMessage::AppendRequest(msg)) } @@ -794,8 +811,8 @@ where self.inmem.proposer_uuid = msg.h.proposer_uuid; // do the job - if !msg.wal_data.is_empty() { - self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?; + if !msg.wal_data.0.is_empty() { + self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data.0)?; } // flush wal to the disk, if required @@ -815,15 +832,22 @@ where // Update truncate and commit LSN in control file. // To avoid negative impact on performance of extra fsync, do it only // when truncate_lsn delta exceeds WAL segment size. - if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64) - < self.inmem.peer_horizon_lsn + + // using subtraction to avoid LSN overflow + if self + .inmem + .peer_horizon_lsn + .0 + .checked_sub(self.state.peer_horizon_lsn.0) + .unwrap() + > self.state.server.wal_seg_size as u64 { self.persist_control_file(self.state.clone())?; } trace!( "processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}", - msg.wal_data.len(), + msg.wal_data.0.len(), msg.h.end_lsn, msg.h.commit_lsn, msg.h.truncate_lsn, @@ -908,7 +932,6 @@ where } } - use crate::wal_storage::Storage; use std::ops::Deref; @@ -932,7 +955,6 @@ impl Deref for InMemoryState { } } - struct DummyWalStore { lsn: Lsn, } @@ -985,10 +1007,28 @@ fn try_fuzz(data: &[u8]) -> anyhow::Result<()> { } } +fn try_fuzz_messages(messages: Vec) -> anyhow::Result<()> { + let storage = InMemoryState { + persisted_state: SafeKeeperState::empty(), + }; + let wal_store = DummyWalStore { lsn: Lsn(0) }; + let ztli = ZTimelineId::from([0u8; 16]); + let mut sk = SafeKeeper::new(ztli, storage, wal_store, NodeId(0)).unwrap(); + + for msg in messages { + sk.process_msg(&msg)?; + } + + Ok(()) +} + pub fn fuzz(data: &[u8]) { try_fuzz(data); } +pub fn fuzz_messages(messages: Vec) { + try_fuzz_messages(messages); +} #[cfg(test)] mod tests { diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 7a6a8ca9b9..f465cc4393 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -6,6 +6,7 @@ use crate::timeline::{ReplicaState, Timeline, TimelineTools}; use crate::wal_storage::WalReader; use anyhow::{bail, Context, Result}; +use arbitrary::Arbitrary; use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE}; use bytes::Bytes; @@ -34,7 +35,7 @@ const ZENITH_STATUS_UPDATE_TAG_BYTE: u8 = b'z'; type FullTransactionId = u64; /// Hot standby feedback received from replica -#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize, Arbitrary)] pub struct HotStandbyFeedback { pub ts: TimestampTz, pub xmin: FullTransactionId,