mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-23 05:12:56 +00:00
Compare commits
2 Commits
readonly-n
...
kelvich/st
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3e7b3c21ac | ||
|
|
aa923b2fae |
43
Cargo.lock
generated
43
Cargo.lock
generated
@@ -55,6 +55,15 @@ dependencies = [
|
||||
"backtrace",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "arbitrary"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c38b6b6b79f671c25e1a3e785b7b82d7562ffc9cd3efdc98627e5668a2472490"
|
||||
dependencies = [
|
||||
"derive_arbitrary",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "arrayvec"
|
||||
version = "0.4.12"
|
||||
@@ -196,7 +205,7 @@ dependencies = [
|
||||
"cexpr",
|
||||
"clang-sys",
|
||||
"clap 2.34.0",
|
||||
"env_logger",
|
||||
"env_logger 0.9.0",
|
||||
"lazy_static",
|
||||
"lazycell",
|
||||
"log",
|
||||
@@ -411,7 +420,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
"clap 3.0.14",
|
||||
"env_logger",
|
||||
"env_logger 0.9.0",
|
||||
"hyper",
|
||||
"libc",
|
||||
"log",
|
||||
@@ -721,6 +730,17 @@ dependencies = [
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "derive_arbitrary"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "98e23c06c035dac87bd802d98f368df73a7f2cb05a66ffbd1f377e821fac4af9"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "digest"
|
||||
version = "0.9.0"
|
||||
@@ -777,6 +797,16 @@ dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "env_logger"
|
||||
version = "0.8.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3"
|
||||
dependencies = [
|
||||
"log",
|
||||
"regex",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "env_logger"
|
||||
version = "0.9.0"
|
||||
@@ -2059,7 +2089,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"chrono",
|
||||
"crc32c",
|
||||
"env_logger",
|
||||
"env_logger 0.9.0",
|
||||
"hex",
|
||||
"lazy_static",
|
||||
"log",
|
||||
@@ -2266,6 +2296,8 @@ version = "1.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "588f6378e4dd99458b60ec275b4477add41ce4fa9f64dcba6f15adccb19b50d6"
|
||||
dependencies = [
|
||||
"env_logger 0.8.4",
|
||||
"log",
|
||||
"rand",
|
||||
]
|
||||
|
||||
@@ -2667,6 +2699,7 @@ name = "safekeeper"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arbitrary",
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
@@ -3587,6 +3620,7 @@ name = "utils"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arbitrary",
|
||||
"bincode",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
@@ -3602,6 +3636,7 @@ dependencies = [
|
||||
"pin-project-lite",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
"quickcheck",
|
||||
"rand",
|
||||
"routerify",
|
||||
"rustls",
|
||||
@@ -3655,7 +3690,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"clap 3.0.14",
|
||||
"env_logger",
|
||||
"env_logger 0.9.0",
|
||||
"log",
|
||||
"postgres",
|
||||
"tempfile",
|
||||
|
||||
@@ -28,6 +28,8 @@ rustls = "0.20.2"
|
||||
rustls-split = "0.3.0"
|
||||
git-version = "0.3.5"
|
||||
serde_with = "1.12.0"
|
||||
arbitrary = { version = "1", features = ["derive"] }
|
||||
quickcheck = "1"
|
||||
|
||||
metrics = { path = "../metrics" }
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
#![warn(missing_docs)]
|
||||
|
||||
use arbitrary::Arbitrary;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
use std::ops::{Add, AddAssign};
|
||||
use std::ops::{Add, AddAssign, Sub};
|
||||
use std::path::Path;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
@@ -13,7 +14,7 @@ use crate::seqwait::MonotonicCounter;
|
||||
pub const XLOG_BLCKSZ: u32 = 8192;
|
||||
|
||||
/// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr
|
||||
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
|
||||
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, Arbitrary)]
|
||||
#[serde(transparent)]
|
||||
pub struct Lsn(pub u64);
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
use crate::sync::{AsyncishRead, SyncFuture};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use arbitrary::{Arbitrary, Unstructured};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use postgres_protocol::PG_EPOCH;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -11,7 +12,7 @@ use std::collections::HashMap;
|
||||
use std::future::Future;
|
||||
use std::io::{self, Cursor};
|
||||
use std::str;
|
||||
use std::time::{Duration, SystemTime};
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tracing::{trace, warn};
|
||||
|
||||
@@ -22,6 +23,8 @@ pub const INT8_OID: Oid = 20;
|
||||
pub const INT4_OID: Oid = 23;
|
||||
pub const TEXT_OID: Oid = 25;
|
||||
|
||||
// static FUZZABLE_PG_EPOCH: FuzzableSystemTime = FuzzableSystemTime(*PG_EPOCH);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum FeMessage {
|
||||
StartupPacket(FeStartupPacket),
|
||||
@@ -921,7 +924,7 @@ impl<'a> BeMessage<'a> {
|
||||
|
||||
// Zenith extension of postgres replication protocol
|
||||
// See ZENITH_STATUS_UPDATE_TAG_BYTE
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize, Arbitrary)]
|
||||
pub struct ZenithFeedback {
|
||||
// Last known size of the timeline. Used to enforce timeline size limit.
|
||||
pub current_timeline_size: u64,
|
||||
@@ -929,7 +932,31 @@ pub struct ZenithFeedback {
|
||||
pub ps_writelsn: u64,
|
||||
pub ps_applylsn: u64,
|
||||
pub ps_flushlsn: u64,
|
||||
pub ps_replytime: SystemTime,
|
||||
pub ps_replytime: FuzzableSystemTime,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
|
||||
pub struct FuzzableSystemTime(pub SystemTime);
|
||||
|
||||
impl FuzzableSystemTime {
|
||||
fn now() -> Self {
|
||||
FuzzableSystemTime(SystemTime::now())
|
||||
}
|
||||
pub fn duration_since(&self, other: &Self) -> Result<Duration, std::time::SystemTimeError> {
|
||||
self.0.duration_since(other.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Arbitrary<'a> for FuzzableSystemTime {
|
||||
fn arbitrary(u: &mut Unstructured<'a>) -> Result<FuzzableSystemTime, arbitrary::Error> {
|
||||
let after_epoch = bool::arbitrary(u)?;
|
||||
let duration = Duration::arbitrary(u)?;
|
||||
Ok(FuzzableSystemTime(if after_epoch {
|
||||
UNIX_EPOCH + duration
|
||||
} else {
|
||||
UNIX_EPOCH - duration
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: Do not forget to increment this number when adding new fields to ZenithFeedback.
|
||||
@@ -943,7 +970,7 @@ impl ZenithFeedback {
|
||||
ps_writelsn: 0,
|
||||
ps_applylsn: 0,
|
||||
ps_flushlsn: 0,
|
||||
ps_replytime: SystemTime::now(),
|
||||
ps_replytime: FuzzableSystemTime::now(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -975,7 +1002,7 @@ impl ZenithFeedback {
|
||||
|
||||
let timestamp = self
|
||||
.ps_replytime
|
||||
.duration_since(*PG_EPOCH)
|
||||
.duration_since(&FuzzableSystemTime(*PG_EPOCH))
|
||||
.expect("failed to serialize pg_replytime earlier than PG_EPOCH")
|
||||
.as_micros() as i64;
|
||||
|
||||
@@ -1020,9 +1047,11 @@ impl ZenithFeedback {
|
||||
assert_eq!(len, 8);
|
||||
let raw_time = buf.get_i64();
|
||||
if raw_time > 0 {
|
||||
zf.ps_replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
|
||||
zf.ps_replytime =
|
||||
FuzzableSystemTime(*PG_EPOCH + Duration::from_micros(raw_time as u64));
|
||||
} else {
|
||||
zf.ps_replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
|
||||
zf.ps_replytime =
|
||||
FuzzableSystemTime(*PG_EPOCH - Duration::from_micros(-raw_time as u64));
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
@@ -1051,7 +1080,7 @@ mod tests {
|
||||
zf.current_timeline_size = 12345678;
|
||||
// Set rounded time to be able to compare it with deserialized value,
|
||||
// because it is rounded up to microseconds during serialization.
|
||||
zf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
|
||||
zf.ps_replytime = FuzzableSystemTime(*PG_EPOCH + Duration::from_secs(100_000_000));
|
||||
let mut data = BytesMut::new();
|
||||
zf.serialize(&mut data).unwrap();
|
||||
|
||||
@@ -1066,7 +1095,7 @@ mod tests {
|
||||
zf.current_timeline_size = 12345678;
|
||||
// Set rounded time to be able to compare it with deserialized value,
|
||||
// because it is rounded up to microseconds during serialization.
|
||||
zf.ps_replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
|
||||
zf.ps_replytime = FuzzableSystemTime(*PG_EPOCH + Duration::from_secs(100_000_000));
|
||||
let mut data = BytesMut::new();
|
||||
zf.serialize(&mut data).unwrap();
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use arbitrary::Arbitrary;
|
||||
use std::{fmt, str::FromStr};
|
||||
|
||||
use hex::FromHex;
|
||||
@@ -12,7 +13,7 @@ use serde::{Deserialize, Serialize};
|
||||
///
|
||||
/// Use `#[serde_as(as = "DisplayFromStr")]` to (de)serialize it as hex string instead: `ad50847381e248feaac9876cc71ae418`.
|
||||
/// Check the `serde_with::serde_as` documentation for options for more complex types.
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Arbitrary)]
|
||||
struct ZId([u8; 16]);
|
||||
|
||||
impl ZId {
|
||||
@@ -176,7 +177,7 @@ macro_rules! zid_newtype {
|
||||
/// NOTE: It (de)serializes as an array of hex bytes, so the string representation would look
|
||||
/// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`.
|
||||
/// See [`ZId`] for alternative ways to serialize it.
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize, Arbitrary)]
|
||||
pub struct ZTimelineId(ZId);
|
||||
|
||||
zid_newtype!(ZTimelineId);
|
||||
@@ -187,7 +188,7 @@ zid_newtype!(ZTimelineId);
|
||||
/// NOTE: It (de)serializes as an array of hex bytes, so the string representation would look
|
||||
/// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`.
|
||||
/// See [`ZId`] for alternative ways to serialize it.
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Arbitrary)]
|
||||
pub struct ZTenantId(ZId);
|
||||
|
||||
zid_newtype!(ZTenantId);
|
||||
@@ -224,7 +225,9 @@ impl fmt::Display for ZTenantTimelineId {
|
||||
|
||||
// Unique ID of a storage node (safekeeper or pageserver). Supposed to be issued
|
||||
// by the console.
|
||||
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Debug, Serialize, Deserialize)]
|
||||
#[derive(
|
||||
Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Debug, Serialize, Deserialize, Arbitrary,
|
||||
)]
|
||||
#[serde(transparent)]
|
||||
pub struct NodeId(pub u64);
|
||||
|
||||
|
||||
@@ -34,6 +34,7 @@ async-trait = "0.1"
|
||||
once_cell = "1.10.0"
|
||||
futures = "0.3.13"
|
||||
toml_edit = { version = "0.13", features = ["easy"] }
|
||||
arbitrary = { version = "1", features = ["derive"] }
|
||||
|
||||
postgres_ffi = { path = "../libs/postgres_ffi" }
|
||||
metrics = { path = "../libs/metrics" }
|
||||
|
||||
3
safekeeper/fuzz/.gitignore
vendored
Normal file
3
safekeeper/fuzz/.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
target
|
||||
corpus
|
||||
artifacts
|
||||
3057
safekeeper/fuzz/Cargo.lock
generated
Normal file
3057
safekeeper/fuzz/Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
32
safekeeper/fuzz/Cargo.toml
Normal file
32
safekeeper/fuzz/Cargo.toml
Normal file
@@ -0,0 +1,32 @@
|
||||
[package]
|
||||
name = "safekeeper-fuzz"
|
||||
version = "0.0.0"
|
||||
authors = ["Automatically generated"]
|
||||
publish = false
|
||||
edition = "2018"
|
||||
|
||||
[package.metadata]
|
||||
cargo-fuzz = true
|
||||
|
||||
[dependencies]
|
||||
libfuzzer-sys = "0.4"
|
||||
bytes = "1.0.1"
|
||||
|
||||
[dependencies.safekeeper]
|
||||
path = ".."
|
||||
|
||||
# Prevent this from interfering with workspaces
|
||||
[workspace]
|
||||
members = ["."]
|
||||
|
||||
[[bin]]
|
||||
name = "fuzz_target_1"
|
||||
path = "fuzz_targets/fuzz_target_1.rs"
|
||||
test = false
|
||||
doc = false
|
||||
|
||||
[[bin]]
|
||||
name = "sk"
|
||||
path = "fuzz_targets/sk.rs"
|
||||
test = false
|
||||
doc = false
|
||||
6
safekeeper/fuzz/fuzz_targets/fuzz_target_1.rs
Normal file
6
safekeeper/fuzz/fuzz_targets/fuzz_target_1.rs
Normal file
@@ -0,0 +1,6 @@
|
||||
#![no_main]
|
||||
use libfuzzer_sys::fuzz_target;
|
||||
|
||||
fuzz_target!(|data: &[u8]| {
|
||||
// fuzzed code goes here
|
||||
});
|
||||
10
safekeeper/fuzz/fuzz_targets/sk.rs
Normal file
10
safekeeper/fuzz/fuzz_targets/sk.rs
Normal file
@@ -0,0 +1,10 @@
|
||||
#![no_main]
|
||||
use libfuzzer_sys::fuzz_target;
|
||||
|
||||
// fuzz_target!(|data: &[u8]| {
|
||||
// safekeeper::safekeeper::fuzz(data)
|
||||
// });
|
||||
|
||||
fuzz_target!(|messages: Vec<safekeeper::safekeeper::ProposerAcceptorMessage>| {
|
||||
safekeeper::safekeeper::fuzz_messages(messages)
|
||||
});
|
||||
@@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
|
||||
use tracing::*;
|
||||
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
|
||||
use crate::safekeeper::{self, AcceptorProposerMessage, AppendResponse};
|
||||
use crate::safekeeper::{
|
||||
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, ProposerGreeting,
|
||||
};
|
||||
@@ -166,7 +166,7 @@ fn append_logical_message(
|
||||
truncate_lsn: msg.truncate_lsn,
|
||||
proposer_uuid: [0u8; 16],
|
||||
},
|
||||
wal_data: Bytes::from(wal_data),
|
||||
wal_data: safekeeper::WalData(Bytes::from(wal_data)),
|
||||
});
|
||||
|
||||
let response = spg.timeline.get().process_msg(&append_request)?;
|
||||
@@ -202,7 +202,7 @@ impl XlLogicalMessage {
|
||||
/// Create new WAL record for non-transactional logical message.
|
||||
/// Used for creating artificial WAL for tests, as LogicalMessage
|
||||
/// record is basically no-op.
|
||||
fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
|
||||
pub fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
|
||||
let mut prefix_bytes = BytesMut::with_capacity(prefix.len() + 1);
|
||||
prefix_bytes.put(prefix.as_bytes());
|
||||
prefix_bytes.put_u8(0);
|
||||
|
||||
@@ -8,7 +8,7 @@ use metrics::{
|
||||
Gauge, IntGaugeVec,
|
||||
};
|
||||
use postgres_ffi::xlog_utils::XLogSegNo;
|
||||
use utils::{lsn::Lsn, zid::ZTenantTimelineId};
|
||||
use utils::{lsn::Lsn, pq_proto::FuzzableSystemTime, zid::ZTenantTimelineId};
|
||||
|
||||
use crate::{
|
||||
safekeeper::{SafeKeeperState, SafekeeperMemState},
|
||||
@@ -290,7 +290,9 @@ impl Collector for TimelineCollector {
|
||||
self.feedback_ps_write_lsn
|
||||
.with_label_values(labels)
|
||||
.set(feedback.ps_writelsn);
|
||||
if let Ok(unix_time) = feedback.ps_replytime.duration_since(SystemTime::UNIX_EPOCH)
|
||||
if let Ok(unix_time) = feedback
|
||||
.ps_replytime
|
||||
.duration_since(&FuzzableSystemTime(SystemTime::UNIX_EPOCH))
|
||||
{
|
||||
self.feedback_last_time_seconds
|
||||
.with_label_values(labels)
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
//! Acceptor part of proposer-acceptor consensus algorithm.
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use arbitrary::Arbitrary;
|
||||
use arbitrary::Unstructured;
|
||||
use byteorder::{LittleEndian, ReadBytesExt};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
|
||||
@@ -16,6 +18,7 @@ use std::io::Read;
|
||||
use tracing::*;
|
||||
|
||||
use crate::control_file;
|
||||
use crate::json_ctrl::encode_logical_message;
|
||||
use crate::send_wal::HotStandbyFeedback;
|
||||
|
||||
use crate::wal_storage;
|
||||
@@ -36,12 +39,12 @@ const UNKNOWN_SERVER_VERSION: u32 = 0;
|
||||
pub type Term = u64;
|
||||
const INVALID_TERM: Term = 0;
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Arbitrary)]
|
||||
pub struct TermSwitchEntry {
|
||||
pub term: Term,
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
#[derive(Clone, Serialize, Deserialize, Arbitrary)]
|
||||
pub struct TermHistory(pub Vec<TermSwitchEntry>);
|
||||
|
||||
impl TermHistory {
|
||||
@@ -55,6 +58,9 @@ impl TermHistory {
|
||||
bail!("TermHistory misses len");
|
||||
}
|
||||
let n_entries = bytes.get_u32_le();
|
||||
if n_entries > 10 {
|
||||
bail!("too long")
|
||||
}
|
||||
let mut res = Vec::with_capacity(n_entries as usize);
|
||||
for _ in 0..n_entries {
|
||||
if bytes.remaining() < 16 {
|
||||
@@ -246,7 +252,6 @@ impl SafeKeeperState {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn empty() -> Self {
|
||||
SafeKeeperState::new(&ZTenantTimelineId::empty(), vec![])
|
||||
}
|
||||
@@ -255,7 +260,7 @@ impl SafeKeeperState {
|
||||
// protocol messages
|
||||
|
||||
/// Initial Proposer -> Acceptor message
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[derive(Debug, Deserialize, Arbitrary)]
|
||||
pub struct ProposerGreeting {
|
||||
/// proposer-acceptor protocol version
|
||||
pub protocol_version: u32,
|
||||
@@ -272,20 +277,20 @@ pub struct ProposerGreeting {
|
||||
|
||||
/// Acceptor -> Proposer initial response: the highest term known to me
|
||||
/// (acceptor voted for).
|
||||
#[derive(Debug, Serialize)]
|
||||
#[derive(Debug, Serialize, Arbitrary)]
|
||||
pub struct AcceptorGreeting {
|
||||
term: u64,
|
||||
node_id: NodeId,
|
||||
}
|
||||
|
||||
/// Vote request sent from proposer to safekeepers
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[derive(Debug, Deserialize, Arbitrary)]
|
||||
pub struct VoteRequest {
|
||||
term: Term,
|
||||
}
|
||||
|
||||
/// Vote itself, sent from safekeeper to proposer
|
||||
#[derive(Debug, Serialize)]
|
||||
#[derive(Debug, Serialize, Arbitrary)]
|
||||
pub struct VoteResponse {
|
||||
term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
|
||||
vote_given: u64, // fixme u64 due to padding
|
||||
@@ -301,7 +306,7 @@ pub struct VoteResponse {
|
||||
* Proposer -> Acceptor message announcing proposer is elected and communicating
|
||||
* term history to it.
|
||||
*/
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Arbitrary)]
|
||||
pub struct ProposerElected {
|
||||
pub term: Term,
|
||||
pub start_streaming_at: Lsn,
|
||||
@@ -311,12 +316,23 @@ pub struct ProposerElected {
|
||||
|
||||
/// Request with WAL message sent from proposer to safekeeper. Along the way it
|
||||
/// communicates commit_lsn.
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Arbitrary)]
|
||||
pub struct AppendRequest {
|
||||
pub h: AppendRequestHeader,
|
||||
pub wal_data: Bytes,
|
||||
pub wal_data: WalData,
|
||||
}
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct WalData(pub Bytes);
|
||||
|
||||
impl<'a> Arbitrary<'a> for WalData {
|
||||
fn arbitrary(_u: &mut Unstructured<'a>) -> Result<WalData, arbitrary::Error> {
|
||||
let data = encode_logical_message("prefix", "message");
|
||||
Ok(WalData(bytes::Bytes::from(data)))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Arbitrary)]
|
||||
pub struct AppendRequestHeader {
|
||||
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
|
||||
pub term: Term,
|
||||
@@ -335,7 +351,7 @@ pub struct AppendRequestHeader {
|
||||
}
|
||||
|
||||
/// Report safekeeper state to proposer
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize, Arbitrary)]
|
||||
pub struct AppendResponse {
|
||||
// Current term of the safekeeper; if it is higher than proposer's, the
|
||||
// compute is out of date.
|
||||
@@ -364,7 +380,7 @@ impl AppendResponse {
|
||||
}
|
||||
|
||||
/// Proposer -> Acceptor messages
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Arbitrary)]
|
||||
pub enum ProposerAcceptorMessage {
|
||||
Greeting(ProposerGreeting),
|
||||
VoteRequest(VoteRequest),
|
||||
@@ -428,7 +444,10 @@ impl ProposerAcceptorMessage {
|
||||
let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
|
||||
stream.read_exact(&mut wal_data_vec)?;
|
||||
let wal_data = Bytes::from(wal_data_vec);
|
||||
let msg = AppendRequest { h: hdr, wal_data };
|
||||
let msg = AppendRequest {
|
||||
h: hdr,
|
||||
wal_data: WalData(wal_data),
|
||||
};
|
||||
|
||||
Ok(ProposerAcceptorMessage::AppendRequest(msg))
|
||||
}
|
||||
@@ -792,8 +811,8 @@ where
|
||||
self.inmem.proposer_uuid = msg.h.proposer_uuid;
|
||||
|
||||
// do the job
|
||||
if !msg.wal_data.is_empty() {
|
||||
self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?;
|
||||
if !msg.wal_data.0.is_empty() {
|
||||
self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data.0)?;
|
||||
}
|
||||
|
||||
// flush wal to the disk, if required
|
||||
@@ -813,15 +832,22 @@ where
|
||||
// Update truncate and commit LSN in control file.
|
||||
// To avoid negative impact on performance of extra fsync, do it only
|
||||
// when truncate_lsn delta exceeds WAL segment size.
|
||||
if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
|
||||
< self.inmem.peer_horizon_lsn
|
||||
|
||||
// using subtraction to avoid LSN overflow
|
||||
if self
|
||||
.inmem
|
||||
.peer_horizon_lsn
|
||||
.0
|
||||
.checked_sub(self.state.peer_horizon_lsn.0)
|
||||
.unwrap()
|
||||
> self.state.server.wal_seg_size as u64
|
||||
{
|
||||
self.persist_control_file(self.state.clone())?;
|
||||
}
|
||||
|
||||
trace!(
|
||||
"processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
|
||||
msg.wal_data.len(),
|
||||
msg.wal_data.0.len(),
|
||||
msg.h.end_lsn,
|
||||
msg.h.commit_lsn,
|
||||
msg.h.truncate_lsn,
|
||||
@@ -906,64 +932,110 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
use crate::wal_storage::Storage;
|
||||
use std::ops::Deref;
|
||||
|
||||
// fake storage for tests
|
||||
struct InMemoryState {
|
||||
persisted_state: SafeKeeperState,
|
||||
}
|
||||
|
||||
impl control_file::Storage for InMemoryState {
|
||||
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
|
||||
self.persisted_state = s.clone();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for InMemoryState {
|
||||
type Target = SafeKeeperState;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.persisted_state
|
||||
}
|
||||
}
|
||||
|
||||
struct DummyWalStore {
|
||||
lsn: Lsn,
|
||||
}
|
||||
|
||||
impl wal_storage::Storage for DummyWalStore {
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
self.lsn
|
||||
}
|
||||
|
||||
fn init_storage(&mut self, _state: &SafeKeeperState) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
|
||||
self.lsn = startpos + buf.len() as u64;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
|
||||
self.lsn = end_pos;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn flush_wal(&mut self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
|
||||
Box::new(move |_segno_up_to: XLogSegNo| Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
fn try_fuzz(data: &[u8]) -> anyhow::Result<()> {
|
||||
let storage = InMemoryState {
|
||||
persisted_state: SafeKeeperState::empty(),
|
||||
};
|
||||
let wal_store = DummyWalStore { lsn: Lsn(0) };
|
||||
let ztli = ZTimelineId::from([0u8; 16]);
|
||||
let mut sk = SafeKeeper::new(ztli, storage, wal_store, NodeId(0)).unwrap();
|
||||
|
||||
let bytes = Bytes::copy_from_slice(data);
|
||||
let mut reader = bytes.reader();
|
||||
loop {
|
||||
let len = reader.read_u8()? as usize;
|
||||
let mut buf: Vec<u8> = vec![0; len];
|
||||
reader.read_exact(&mut buf)?;
|
||||
|
||||
let msg = ProposerAcceptorMessage::parse(Bytes::copy_from_slice(&buf))?;
|
||||
sk.process_msg(&msg)?;
|
||||
}
|
||||
}
|
||||
|
||||
fn try_fuzz_messages(messages: Vec<ProposerAcceptorMessage>) -> anyhow::Result<()> {
|
||||
let storage = InMemoryState {
|
||||
persisted_state: SafeKeeperState::empty(),
|
||||
};
|
||||
let wal_store = DummyWalStore { lsn: Lsn(0) };
|
||||
let ztli = ZTimelineId::from([0u8; 16]);
|
||||
let mut sk = SafeKeeper::new(ztli, storage, wal_store, NodeId(0)).unwrap();
|
||||
|
||||
for msg in messages {
|
||||
sk.process_msg(&msg)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn fuzz(data: &[u8]) {
|
||||
try_fuzz(data);
|
||||
}
|
||||
|
||||
pub fn fuzz_messages(messages: Vec<ProposerAcceptorMessage>) {
|
||||
try_fuzz_messages(messages);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::wal_storage::Storage;
|
||||
use std::ops::Deref;
|
||||
|
||||
// fake storage for tests
|
||||
struct InMemoryState {
|
||||
persisted_state: SafeKeeperState,
|
||||
}
|
||||
|
||||
impl control_file::Storage for InMemoryState {
|
||||
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
|
||||
self.persisted_state = s.clone();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for InMemoryState {
|
||||
type Target = SafeKeeperState;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.persisted_state
|
||||
}
|
||||
}
|
||||
|
||||
struct DummyWalStore {
|
||||
lsn: Lsn,
|
||||
}
|
||||
|
||||
impl wal_storage::Storage for DummyWalStore {
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
self.lsn
|
||||
}
|
||||
|
||||
fn init_storage(&mut self, _state: &SafeKeeperState) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
|
||||
self.lsn = startpos + buf.len() as u64;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
|
||||
self.lsn = end_pos;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn flush_wal(&mut self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
|
||||
Box::new(move |_segno_up_to: XLogSegNo| Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_voting() {
|
||||
let storage = InMemoryState {
|
||||
|
||||
@@ -6,6 +6,7 @@ use crate::timeline::{ReplicaState, Timeline, TimelineTools};
|
||||
use crate::wal_storage::WalReader;
|
||||
use anyhow::{bail, Context, Result};
|
||||
|
||||
use arbitrary::Arbitrary;
|
||||
use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE};
|
||||
|
||||
use bytes::Bytes;
|
||||
@@ -34,7 +35,7 @@ const ZENITH_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
|
||||
type FullTransactionId = u64;
|
||||
|
||||
/// Hot standby feedback received from replica
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize, Arbitrary)]
|
||||
pub struct HotStandbyFeedback {
|
||||
pub ts: TimestampTz,
|
||||
pub xmin: FullTransactionId,
|
||||
|
||||
Reference in New Issue
Block a user