diff --git a/Cargo.lock b/Cargo.lock index a933b44356..c6f6320068 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -306,17 +306,41 @@ dependencies = [ [[package]] name = "clap" -version = "3.0.14" +version = "3.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b63edc3f163b3c71ec8aa23f9bd6070f77edbf3d1d198b164afa90ff00e4ec62" +checksum = "3124f3f75ce09e22d1410043e1e24f2ecc44fad3afe4f08408f1f7663d68da2b" dependencies = [ "atty", "bitflags", + "clap_derive", + "clap_lex", "indexmap", - "os_str_bytes", + "lazy_static", "strsim 0.10.0", "termcolor", - "textwrap 0.14.2", + "textwrap 0.15.0", +] + +[[package]] +name = "clap_derive" +version = "3.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3aab4734e083b809aaf5794e14e756d1c798d2c69c7f7de7a09a2f5214993c1" +dependencies = [ + "heck 0.4.0", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "189ddd3b5d32a70b35e7686054371742a937b0d99128e76dde6340210e966669" +dependencies = [ + "os_str_bytes", ] [[package]] @@ -335,7 +359,7 @@ version = "0.1.0" dependencies = [ "anyhow", "chrono", - "clap 3.0.14", + "clap 3.1.10", "env_logger", "hyper", "libc", @@ -948,6 +972,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1499,9 +1529,6 @@ name = "os_str_bytes" version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64" -dependencies = [ - "memchr", -] [[package]] name = "pageserver" @@ -1513,7 +1540,7 @@ dependencies = [ "byteorder", "bytes", "chrono", - "clap 3.0.14", + "clap 3.1.10", "const_format", "crc32c", "crossbeam-utils", @@ -1777,6 +1804,30 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -1823,7 +1874,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5" dependencies = [ "bytes", - "heck", + "heck 0.3.3", "itertools", "lazy_static", "log", @@ -1867,7 +1918,7 @@ dependencies = [ "async-trait", "base64 0.13.0", "bytes", - "clap 3.0.14", + "clap 3.1.10", "fail", "futures", "hashbrown", @@ -2297,7 +2348,7 @@ dependencies = [ "anyhow", "byteorder", "bytes", - "clap 3.0.14", + "clap 3.1.10", "const_format", "crc32c", "daemonize", @@ -2655,9 +2706,9 @@ dependencies = [ [[package]] name = "textwrap" -version = "0.14.2" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0066c8d12af8b5acd21e00547c3797fde4e8677254a7ee429176ccebbe93dd80" +checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" [[package]] name = "thiserror" @@ -3364,7 +3415,7 @@ name = "zenith" version = "0.1.0" dependencies = [ "anyhow", - "clap 3.0.14", + "clap 3.1.10", "control_plane", "pageserver", "postgres", diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index ca5e2a6b55..3f3d51d30f 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -12,7 +12,7 @@ fs2 = "0.4.3" lazy_static = "1.4.0" serde_json = "1" tracing = "0.1.27" -clap = "3.0" +clap = { version = "3.1.8", features = ["derive"] } daemonize = "0.4.1" tokio = { version = "1.17", features = ["macros", "fs"] } postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" } diff --git a/safekeeper/src/bin/adversarial_proposer.rs b/safekeeper/src/bin/adversarial_proposer.rs new file mode 100644 index 0000000000..14276eb252 --- /dev/null +++ b/safekeeper/src/bin/adversarial_proposer.rs @@ -0,0 +1,107 @@ +use safekeeper::safekeeper::{AcceptorGreeting, AcceptorProposerMessage, ProposerAcceptorMessage, ProposerGreeting, SafeKeeper}; +use tokio::net::TcpStream; +use bytes::{BufMut, BytesMut}; +use clap::{Parser, Subcommand}; +use zenith_utils::zid::{ZTenantId, ZTimelineId}; +use std::fs::File; +use std::path::{Path, PathBuf}; +use std::time::Instant; +use std::{ + collections::HashSet, + io::{BufRead, BufReader, Cursor}, + time::Duration, +}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use zenith_utils::{ + lsn::Lsn, + pq_proto::{BeMessage, FeMessage}, +}; +use anyhow::Result; + + + +struct SafekeeperApi { + stream: TcpStream, +} + +impl SafekeeperApi { + async fn connect() -> Result { + // XXX that's not the right port + let mut stream = TcpStream::connect("localhost:15000").await?; + + // Connect to safekeeper + // TODO connect to all nodes + // TODO read host, port, dbname, user from command line + let (client, conn) = tokio_postgres::Config::new() + .host("127.0.0.1") + .port(15000) + .dbname("postgres") + .user("zenith_admin") + .connect_raw(&mut stream, tokio_postgres::NoTls) + .await?; + + Ok(SafekeeperApi { stream }) + } + + async fn propose(&mut self, msg: &ProposerAcceptorMessage) -> Result<()> { + let mut buf = BytesMut::new(); + BeMessage::write(&mut buf, &BeMessage::CopyData(&msg.serialize()?)); + self.stream.write(&buf).await?; + + Ok(()) + } + + async fn get_response(&mut self) -> Result { + // See process_msg in safekeeper.rs to find out which msgs have replies + let response = match FeMessage::read_fut(&mut self.stream).await? { + Some(FeMessage::CopyData(page)) => page, + r => panic!("Expected CopyData message, got: {:?}", r), + }; + + AcceptorProposerMessage::parse(response) + } +} + +async fn say_hi(api: &mut SafekeeperApi, tenant: &ZTenantId, timeline: &ZTimelineId) -> Result<()> { + // TODO this is just copied from somewhere + api.propose(&ProposerAcceptorMessage::Greeting(ProposerGreeting { + protocol_version: 1, // current protocol + pg_version: 0, // unknown + proposer_id: [0u8; 16], + system_id: 0, + ztli: timeline.clone(), + tenant_id: tenant.clone(), + tli: 0, + wal_seg_size: 16 * 1024, // 16MB, default for tests + })).await?; + + match api.get_response().await? { + AcceptorProposerMessage::Greeting(g) => println!("response: {:?}", g), + _ => panic!("Expected greeting response") + } + + Ok(()) +} + +#[derive(Parser, Debug)] +struct Args { + tenant: ZTenantId, + timeline: ZTimelineId, + + #[clap(subcommand)] + strategy: AdversarialStrategy, +} + +#[derive(Subcommand, Debug)] +enum AdversarialStrategy { + JustSayHi, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + let mut api = SafekeeperApi::connect().await?; + say_hi(&mut api, &args.tenant, &args.timeline).await?; + println!("YAY IT WORKED"); + Ok(()) +} diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index cf56261ee6..e6bffb1c00 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -1,6 +1,7 @@ //! Acceptor part of proposer-acceptor consensus algorithm. use anyhow::{bail, Context, Result}; +use byteorder::WriteBytesExt; use byteorder::{LittleEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -9,6 +10,7 @@ use serde::{Deserialize, Serialize}; use std::cmp::max; use std::cmp::min; use std::fmt; +use std::io::Cursor; use std::io::Read; use tracing::*; use zenith_utils::zid::ZNodeId; @@ -247,7 +249,7 @@ impl SafeKeeperState { // protocol messages /// Initial Proposer -> Acceptor message -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct ProposerGreeting { /// proposer-acceptor protocol version pub protocol_version: u32, @@ -364,6 +366,23 @@ pub enum ProposerAcceptorMessage { } impl ProposerAcceptorMessage { + + pub fn serialize(&self) -> Result { + match self { + ProposerAcceptorMessage::Greeting(msg) => { + let mut buf = Vec::::new(); + buf.write_u64::('g' as u64); + msg.ser_into(&mut buf); + Ok(buf.into()) + } + ProposerAcceptorMessage::VoteRequest(_) => todo!(), + ProposerAcceptorMessage::Elected(_) => todo!(), + ProposerAcceptorMessage::AppendRequest(_) => todo!(), + ProposerAcceptorMessage::NoFlushAppendRequest(_) => todo!(), + ProposerAcceptorMessage::FlushWAL => todo!(), + } + } + /// Parse proposer message. pub fn parse(msg_bytes: Bytes) -> Result { // xxx using Reader is inefficient but easy to work with bincode @@ -464,6 +483,10 @@ impl AcceptorProposerMessage { Ok(()) } + + pub fn parse(msg_bytes: Bytes) -> Result { + todo!() + } } lazy_static! { diff --git a/test_runner/batch_fuzz/test_fuzz_safekeeper.py b/test_runner/batch_fuzz/test_fuzz_safekeeper.py index 6758f10032..6ff75573bc 100644 --- a/test_runner/batch_fuzz/test_fuzz_safekeeper.py +++ b/test_runner/batch_fuzz/test_fuzz_safekeeper.py @@ -1,3 +1,13 @@ +from contextlib import closing + +import pytest + +from fixtures.zenith_fixtures import ZenithEnv, PgBin, ZenithEnvBuilder, DEFAULT_BRANCH_NAME, AdversarialProposerBin +from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker + + + + # TODO PR execution plan: # 1. Write an adversarial proposer that successfully sends greeting # 1. Make pg connection @@ -6,12 +16,17 @@ # 3. Add TODOs, merge the harness into main, improve later -def test_fuzz_safekeeper(zenith_env_builder: ZenithEnvBuilder): +def test_fuzz_safekeeper(zenith_env_builder: ZenithEnvBuilder, + adversarial_proposer_bin: AdversarialProposerBin): zenith_env_builder.num_safekeepers = 3 env = zenith_env_builder.init_start() env.zenith_cli.create_branch('test_fuzz_safekeeper') pg = env.postgres.create_start('test_fuzz_safekeeper') + timeline = pg.safe_psql("SHOW zenith.zenith_timeline")[0][0] + + output = adversarial_proposer_bin.say_hi(env.initial_tenant.hex, timeline) + print(output) # TODO: # 1. Start an adversarial proposer (new rust binary) that tries to take over diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 8dfe219966..6366767802 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -1259,6 +1259,25 @@ def pg_bin(test_output_dir: str) -> PgBin: return PgBin(test_output_dir) +@dataclass +class AdversarialProposerBin: + def say_hi(self, tenant_hex: str, timeline: str) -> str: + binpath = os.path.join(str(zenith_binpath), 'adversarial_proposer') + args = [ + binpath, + tenant_hex, + timeline, + "just-say-hi", + ] + return subprocess.run(args) + # return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip() + + +@pytest.fixture(scope='function') +def adversarial_proposer_bin(test_output_dir): + return AdversarialProposerBin() + + class VanillaPostgres(PgProtocol): def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int): super().__init__(host='localhost', port=port, dbname='postgres')