mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-18 10:00:37 +00:00
Compare commits
3 Commits
statement_
...
fuzz-test-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a74ca297e1 | ||
|
|
c8c76a8b99 | ||
|
|
36673ff709 |
81
Cargo.lock
generated
81
Cargo.lock
generated
@@ -306,17 +306,41 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "clap"
|
name = "clap"
|
||||||
version = "3.0.14"
|
version = "3.1.10"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b63edc3f163b3c71ec8aa23f9bd6070f77edbf3d1d198b164afa90ff00e4ec62"
|
checksum = "3124f3f75ce09e22d1410043e1e24f2ecc44fad3afe4f08408f1f7663d68da2b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"atty",
|
"atty",
|
||||||
"bitflags",
|
"bitflags",
|
||||||
|
"clap_derive",
|
||||||
|
"clap_lex",
|
||||||
"indexmap",
|
"indexmap",
|
||||||
"os_str_bytes",
|
"lazy_static",
|
||||||
"strsim 0.10.0",
|
"strsim 0.10.0",
|
||||||
"termcolor",
|
"termcolor",
|
||||||
"textwrap 0.14.2",
|
"textwrap 0.15.0",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "clap_derive"
|
||||||
|
version = "3.1.7"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a3aab4734e083b809aaf5794e14e756d1c798d2c69c7f7de7a09a2f5214993c1"
|
||||||
|
dependencies = [
|
||||||
|
"heck 0.4.0",
|
||||||
|
"proc-macro-error",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "clap_lex"
|
||||||
|
version = "0.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "189ddd3b5d32a70b35e7686054371742a937b0d99128e76dde6340210e966669"
|
||||||
|
dependencies = [
|
||||||
|
"os_str_bytes",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -335,7 +359,7 @@ version = "0.1.0"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"chrono",
|
"chrono",
|
||||||
"clap 3.0.14",
|
"clap 3.1.10",
|
||||||
"env_logger",
|
"env_logger",
|
||||||
"hyper",
|
"hyper",
|
||||||
"libc",
|
"libc",
|
||||||
@@ -948,6 +972,12 @@ dependencies = [
|
|||||||
"unicode-segmentation",
|
"unicode-segmentation",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "heck"
|
||||||
|
version = "0.4.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hermit-abi"
|
name = "hermit-abi"
|
||||||
version = "0.1.19"
|
version = "0.1.19"
|
||||||
@@ -1499,9 +1529,6 @@ name = "os_str_bytes"
|
|||||||
version = "6.0.0"
|
version = "6.0.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64"
|
checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64"
|
||||||
dependencies = [
|
|
||||||
"memchr",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pageserver"
|
name = "pageserver"
|
||||||
@@ -1513,7 +1540,7 @@ dependencies = [
|
|||||||
"byteorder",
|
"byteorder",
|
||||||
"bytes",
|
"bytes",
|
||||||
"chrono",
|
"chrono",
|
||||||
"clap 3.0.14",
|
"clap 3.1.10",
|
||||||
"const_format",
|
"const_format",
|
||||||
"crc32c",
|
"crc32c",
|
||||||
"crossbeam-utils",
|
"crossbeam-utils",
|
||||||
@@ -1777,6 +1804,30 @@ version = "0.2.16"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
|
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "proc-macro-error"
|
||||||
|
version = "1.0.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro-error-attr",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
"version_check",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "proc-macro-error-attr"
|
||||||
|
version = "1.0.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"version_check",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "proc-macro-hack"
|
name = "proc-macro-hack"
|
||||||
version = "0.5.19"
|
version = "0.5.19"
|
||||||
@@ -1823,7 +1874,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
|
checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"heck",
|
"heck 0.3.3",
|
||||||
"itertools",
|
"itertools",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"log",
|
"log",
|
||||||
@@ -1867,7 +1918,7 @@ dependencies = [
|
|||||||
"async-trait",
|
"async-trait",
|
||||||
"base64 0.13.0",
|
"base64 0.13.0",
|
||||||
"bytes",
|
"bytes",
|
||||||
"clap 3.0.14",
|
"clap 3.1.10",
|
||||||
"fail",
|
"fail",
|
||||||
"futures",
|
"futures",
|
||||||
"hashbrown",
|
"hashbrown",
|
||||||
@@ -2297,7 +2348,7 @@ dependencies = [
|
|||||||
"anyhow",
|
"anyhow",
|
||||||
"byteorder",
|
"byteorder",
|
||||||
"bytes",
|
"bytes",
|
||||||
"clap 3.0.14",
|
"clap 3.1.10",
|
||||||
"const_format",
|
"const_format",
|
||||||
"crc32c",
|
"crc32c",
|
||||||
"daemonize",
|
"daemonize",
|
||||||
@@ -2655,9 +2706,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "textwrap"
|
name = "textwrap"
|
||||||
version = "0.14.2"
|
version = "0.15.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0066c8d12af8b5acd21e00547c3797fde4e8677254a7ee429176ccebbe93dd80"
|
checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "thiserror"
|
name = "thiserror"
|
||||||
@@ -3364,7 +3415,7 @@ name = "zenith"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"clap 3.0.14",
|
"clap 3.1.10",
|
||||||
"control_plane",
|
"control_plane",
|
||||||
"pageserver",
|
"pageserver",
|
||||||
"postgres",
|
"postgres",
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ fs2 = "0.4.3"
|
|||||||
lazy_static = "1.4.0"
|
lazy_static = "1.4.0"
|
||||||
serde_json = "1"
|
serde_json = "1"
|
||||||
tracing = "0.1.27"
|
tracing = "0.1.27"
|
||||||
clap = "3.0"
|
clap = { version = "3.1.8", features = ["derive"] }
|
||||||
daemonize = "0.4.1"
|
daemonize = "0.4.1"
|
||||||
tokio = { version = "1.17", features = ["macros", "fs"] }
|
tokio = { version = "1.17", features = ["macros", "fs"] }
|
||||||
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" }
|
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" }
|
||||||
|
|||||||
107
safekeeper/src/bin/adversarial_proposer.rs
Normal file
107
safekeeper/src/bin/adversarial_proposer.rs
Normal file
@@ -0,0 +1,107 @@
|
|||||||
|
use safekeeper::safekeeper::{AcceptorGreeting, AcceptorProposerMessage, ProposerAcceptorMessage, ProposerGreeting, SafeKeeper};
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
use bytes::{BufMut, BytesMut};
|
||||||
|
use clap::{Parser, Subcommand};
|
||||||
|
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||||
|
use std::fs::File;
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::time::Instant;
|
||||||
|
use std::{
|
||||||
|
collections::HashSet,
|
||||||
|
io::{BufRead, BufReader, Cursor},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
use zenith_utils::{
|
||||||
|
lsn::Lsn,
|
||||||
|
pq_proto::{BeMessage, FeMessage},
|
||||||
|
};
|
||||||
|
use anyhow::Result;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
struct SafekeeperApi {
|
||||||
|
stream: TcpStream,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SafekeeperApi {
|
||||||
|
async fn connect() -> Result<SafekeeperApi> {
|
||||||
|
// XXX that's not the right port
|
||||||
|
let mut stream = TcpStream::connect("localhost:15000").await?;
|
||||||
|
|
||||||
|
// Connect to safekeeper
|
||||||
|
// TODO connect to all nodes
|
||||||
|
// TODO read host, port, dbname, user from command line
|
||||||
|
let (client, conn) = tokio_postgres::Config::new()
|
||||||
|
.host("127.0.0.1")
|
||||||
|
.port(15000)
|
||||||
|
.dbname("postgres")
|
||||||
|
.user("zenith_admin")
|
||||||
|
.connect_raw(&mut stream, tokio_postgres::NoTls)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(SafekeeperApi { stream })
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn propose(&mut self, msg: &ProposerAcceptorMessage) -> Result<()> {
|
||||||
|
let mut buf = BytesMut::new();
|
||||||
|
BeMessage::write(&mut buf, &BeMessage::CopyData(&msg.serialize()?));
|
||||||
|
self.stream.write(&buf).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_response(&mut self) -> Result<AcceptorProposerMessage> {
|
||||||
|
// See process_msg in safekeeper.rs to find out which msgs have replies
|
||||||
|
let response = match FeMessage::read_fut(&mut self.stream).await? {
|
||||||
|
Some(FeMessage::CopyData(page)) => page,
|
||||||
|
r => panic!("Expected CopyData message, got: {:?}", r),
|
||||||
|
};
|
||||||
|
|
||||||
|
AcceptorProposerMessage::parse(response)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn say_hi(api: &mut SafekeeperApi, tenant: &ZTenantId, timeline: &ZTimelineId) -> Result<()> {
|
||||||
|
// TODO this is just copied from somewhere
|
||||||
|
api.propose(&ProposerAcceptorMessage::Greeting(ProposerGreeting {
|
||||||
|
protocol_version: 1, // current protocol
|
||||||
|
pg_version: 0, // unknown
|
||||||
|
proposer_id: [0u8; 16],
|
||||||
|
system_id: 0,
|
||||||
|
ztli: timeline.clone(),
|
||||||
|
tenant_id: tenant.clone(),
|
||||||
|
tli: 0,
|
||||||
|
wal_seg_size: 16 * 1024, // 16MB, default for tests
|
||||||
|
})).await?;
|
||||||
|
|
||||||
|
match api.get_response().await? {
|
||||||
|
AcceptorProposerMessage::Greeting(g) => println!("response: {:?}", g),
|
||||||
|
_ => panic!("Expected greeting response")
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Parser, Debug)]
|
||||||
|
struct Args {
|
||||||
|
tenant: ZTenantId,
|
||||||
|
timeline: ZTimelineId,
|
||||||
|
|
||||||
|
#[clap(subcommand)]
|
||||||
|
strategy: AdversarialStrategy,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Subcommand, Debug)]
|
||||||
|
enum AdversarialStrategy {
|
||||||
|
JustSayHi,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<()> {
|
||||||
|
let args = Args::parse();
|
||||||
|
let mut api = SafekeeperApi::connect().await?;
|
||||||
|
say_hi(&mut api, &args.tenant, &args.timeline).await?;
|
||||||
|
println!("YAY IT WORKED");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
//! Acceptor part of proposer-acceptor consensus algorithm.
|
//! Acceptor part of proposer-acceptor consensus algorithm.
|
||||||
|
|
||||||
use anyhow::{bail, Context, Result};
|
use anyhow::{bail, Context, Result};
|
||||||
|
use byteorder::WriteBytesExt;
|
||||||
use byteorder::{LittleEndian, ReadBytesExt};
|
use byteorder::{LittleEndian, ReadBytesExt};
|
||||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||||
|
|
||||||
@@ -9,6 +10,7 @@ use serde::{Deserialize, Serialize};
|
|||||||
use std::cmp::max;
|
use std::cmp::max;
|
||||||
use std::cmp::min;
|
use std::cmp::min;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
use std::io::Cursor;
|
||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
use zenith_utils::zid::ZNodeId;
|
use zenith_utils::zid::ZNodeId;
|
||||||
@@ -247,7 +249,7 @@ impl SafeKeeperState {
|
|||||||
// protocol messages
|
// protocol messages
|
||||||
|
|
||||||
/// Initial Proposer -> Acceptor message
|
/// Initial Proposer -> Acceptor message
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
pub struct ProposerGreeting {
|
pub struct ProposerGreeting {
|
||||||
/// proposer-acceptor protocol version
|
/// proposer-acceptor protocol version
|
||||||
pub protocol_version: u32,
|
pub protocol_version: u32,
|
||||||
@@ -364,6 +366,23 @@ pub enum ProposerAcceptorMessage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl ProposerAcceptorMessage {
|
impl ProposerAcceptorMessage {
|
||||||
|
|
||||||
|
pub fn serialize(&self) -> Result<Bytes> {
|
||||||
|
match self {
|
||||||
|
ProposerAcceptorMessage::Greeting(msg) => {
|
||||||
|
let mut buf = Vec::<u8>::new();
|
||||||
|
buf.write_u64::<LittleEndian>('g' as u64);
|
||||||
|
msg.ser_into(&mut buf);
|
||||||
|
Ok(buf.into())
|
||||||
|
}
|
||||||
|
ProposerAcceptorMessage::VoteRequest(_) => todo!(),
|
||||||
|
ProposerAcceptorMessage::Elected(_) => todo!(),
|
||||||
|
ProposerAcceptorMessage::AppendRequest(_) => todo!(),
|
||||||
|
ProposerAcceptorMessage::NoFlushAppendRequest(_) => todo!(),
|
||||||
|
ProposerAcceptorMessage::FlushWAL => todo!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Parse proposer message.
|
/// Parse proposer message.
|
||||||
pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
|
pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
|
||||||
// xxx using Reader is inefficient but easy to work with bincode
|
// xxx using Reader is inefficient but easy to work with bincode
|
||||||
@@ -464,6 +483,10 @@ impl AcceptorProposerMessage {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn parse(msg_bytes: Bytes) -> Result<AcceptorProposerMessage> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
|
|||||||
39
test_runner/batch_fuzz/test_fuzz_safekeeper.py
Normal file
39
test_runner/batch_fuzz/test_fuzz_safekeeper.py
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
from contextlib import closing
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from fixtures.zenith_fixtures import ZenithEnv, PgBin, ZenithEnvBuilder, DEFAULT_BRANCH_NAME, AdversarialProposerBin
|
||||||
|
from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# TODO PR execution plan:
|
||||||
|
# 1. Write an adversarial proposer that successfully sends greeting
|
||||||
|
# 1. Make pg connection
|
||||||
|
# 2. Send a FeMessage::CopyData that contains ('g' as char as u8 as u64) in LittleEndian
|
||||||
|
# 2. Send Elected, and see what happens
|
||||||
|
# 3. Add TODOs, merge the harness into main, improve later
|
||||||
|
|
||||||
|
|
||||||
|
def test_fuzz_safekeeper(zenith_env_builder: ZenithEnvBuilder,
|
||||||
|
adversarial_proposer_bin: AdversarialProposerBin):
|
||||||
|
zenith_env_builder.num_safekeepers = 3
|
||||||
|
env = zenith_env_builder.init_start()
|
||||||
|
|
||||||
|
env.zenith_cli.create_branch('test_fuzz_safekeeper')
|
||||||
|
pg = env.postgres.create_start('test_fuzz_safekeeper')
|
||||||
|
timeline = pg.safe_psql("SHOW zenith.zenith_timeline")[0][0]
|
||||||
|
|
||||||
|
output = adversarial_proposer_bin.say_hi(env.initial_tenant.hex, timeline)
|
||||||
|
print(output)
|
||||||
|
|
||||||
|
# TODO:
|
||||||
|
# 1. Start an adversarial proposer (new rust binary) that tries to take over
|
||||||
|
# 2. Kill pg node, let the proposer take over
|
||||||
|
# 3. Start multiple adversarial proposers
|
||||||
|
# 4. Run for a while, checking safekeeper invariants
|
||||||
|
|
||||||
|
# TODO:
|
||||||
|
# 1. Explore fuzzing techniques, look into https://docs.rs/stateright/latest/stateright/
|
||||||
|
# 2. Simulate network delay between nodes, both random and deliberate
|
||||||
@@ -1259,6 +1259,25 @@ def pg_bin(test_output_dir: str) -> PgBin:
|
|||||||
return PgBin(test_output_dir)
|
return PgBin(test_output_dir)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AdversarialProposerBin:
|
||||||
|
def say_hi(self, tenant_hex: str, timeline: str) -> str:
|
||||||
|
binpath = os.path.join(str(zenith_binpath), 'adversarial_proposer')
|
||||||
|
args = [
|
||||||
|
binpath,
|
||||||
|
tenant_hex,
|
||||||
|
timeline,
|
||||||
|
"just-say-hi",
|
||||||
|
]
|
||||||
|
return subprocess.run(args)
|
||||||
|
# return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope='function')
|
||||||
|
def adversarial_proposer_bin(test_output_dir):
|
||||||
|
return AdversarialProposerBin()
|
||||||
|
|
||||||
|
|
||||||
class VanillaPostgres(PgProtocol):
|
class VanillaPostgres(PgProtocol):
|
||||||
def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int):
|
def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int):
|
||||||
super().__init__(host='localhost', port=port, dbname='postgres')
|
super().__init__(host='localhost', port=port, dbname='postgres')
|
||||||
|
|||||||
Reference in New Issue
Block a user