Write proposer binary

This commit is contained in:
Bojan Serafimov
2022-04-20 23:00:43 -04:00
parent c8c76a8b99
commit a74ca297e1
6 changed files with 233 additions and 18 deletions

81
Cargo.lock generated
View File

@@ -306,17 +306,41 @@ dependencies = [
[[package]]
name = "clap"
version = "3.0.14"
version = "3.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b63edc3f163b3c71ec8aa23f9bd6070f77edbf3d1d198b164afa90ff00e4ec62"
checksum = "3124f3f75ce09e22d1410043e1e24f2ecc44fad3afe4f08408f1f7663d68da2b"
dependencies = [
"atty",
"bitflags",
"clap_derive",
"clap_lex",
"indexmap",
"os_str_bytes",
"lazy_static",
"strsim 0.10.0",
"termcolor",
"textwrap 0.14.2",
"textwrap 0.15.0",
]
[[package]]
name = "clap_derive"
version = "3.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3aab4734e083b809aaf5794e14e756d1c798d2c69c7f7de7a09a2f5214993c1"
dependencies = [
"heck 0.4.0",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "189ddd3b5d32a70b35e7686054371742a937b0d99128e76dde6340210e966669"
dependencies = [
"os_str_bytes",
]
[[package]]
@@ -335,7 +359,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"clap 3.0.14",
"clap 3.1.10",
"env_logger",
"hyper",
"libc",
@@ -948,6 +972,12 @@ dependencies = [
"unicode-segmentation",
]
[[package]]
name = "heck"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
[[package]]
name = "hermit-abi"
version = "0.1.19"
@@ -1499,9 +1529,6 @@ name = "os_str_bytes"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64"
dependencies = [
"memchr",
]
[[package]]
name = "pageserver"
@@ -1513,7 +1540,7 @@ dependencies = [
"byteorder",
"bytes",
"chrono",
"clap 3.0.14",
"clap 3.1.10",
"const_format",
"crc32c",
"crossbeam-utils",
@@ -1777,6 +1804,30 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
@@ -1823,7 +1874,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
dependencies = [
"bytes",
"heck",
"heck 0.3.3",
"itertools",
"lazy_static",
"log",
@@ -1867,7 +1918,7 @@ dependencies = [
"async-trait",
"base64 0.13.0",
"bytes",
"clap 3.0.14",
"clap 3.1.10",
"fail",
"futures",
"hashbrown",
@@ -2297,7 +2348,7 @@ dependencies = [
"anyhow",
"byteorder",
"bytes",
"clap 3.0.14",
"clap 3.1.10",
"const_format",
"crc32c",
"daemonize",
@@ -2655,9 +2706,9 @@ dependencies = [
[[package]]
name = "textwrap"
version = "0.14.2"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0066c8d12af8b5acd21e00547c3797fde4e8677254a7ee429176ccebbe93dd80"
checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb"
[[package]]
name = "thiserror"
@@ -3364,7 +3415,7 @@ name = "zenith"
version = "0.1.0"
dependencies = [
"anyhow",
"clap 3.0.14",
"clap 3.1.10",
"control_plane",
"pageserver",
"postgres",

View File

@@ -12,7 +12,7 @@ fs2 = "0.4.3"
lazy_static = "1.4.0"
serde_json = "1"
tracing = "0.1.27"
clap = "3.0"
clap = { version = "3.1.8", features = ["derive"] }
daemonize = "0.4.1"
tokio = { version = "1.17", features = ["macros", "fs"] }
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" }

View File

@@ -0,0 +1,107 @@
use safekeeper::safekeeper::{AcceptorGreeting, AcceptorProposerMessage, ProposerAcceptorMessage, ProposerGreeting, SafeKeeper};
use tokio::net::TcpStream;
use bytes::{BufMut, BytesMut};
use clap::{Parser, Subcommand};
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use std::fs::File;
use std::path::{Path, PathBuf};
use std::time::Instant;
use std::{
collections::HashSet,
io::{BufRead, BufReader, Cursor},
time::Duration,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use zenith_utils::{
lsn::Lsn,
pq_proto::{BeMessage, FeMessage},
};
use anyhow::Result;
struct SafekeeperApi {
stream: TcpStream,
}
impl SafekeeperApi {
async fn connect() -> Result<SafekeeperApi> {
// XXX that's not the right port
let mut stream = TcpStream::connect("localhost:15000").await?;
// Connect to safekeeper
// TODO connect to all nodes
// TODO read host, port, dbname, user from command line
let (client, conn) = tokio_postgres::Config::new()
.host("127.0.0.1")
.port(15000)
.dbname("postgres")
.user("zenith_admin")
.connect_raw(&mut stream, tokio_postgres::NoTls)
.await?;
Ok(SafekeeperApi { stream })
}
async fn propose(&mut self, msg: &ProposerAcceptorMessage) -> Result<()> {
let mut buf = BytesMut::new();
BeMessage::write(&mut buf, &BeMessage::CopyData(&msg.serialize()?));
self.stream.write(&buf).await?;
Ok(())
}
async fn get_response(&mut self) -> Result<AcceptorProposerMessage> {
// See process_msg in safekeeper.rs to find out which msgs have replies
let response = match FeMessage::read_fut(&mut self.stream).await? {
Some(FeMessage::CopyData(page)) => page,
r => panic!("Expected CopyData message, got: {:?}", r),
};
AcceptorProposerMessage::parse(response)
}
}
async fn say_hi(api: &mut SafekeeperApi, tenant: &ZTenantId, timeline: &ZTimelineId) -> Result<()> {
// TODO this is just copied from somewhere
api.propose(&ProposerAcceptorMessage::Greeting(ProposerGreeting {
protocol_version: 1, // current protocol
pg_version: 0, // unknown
proposer_id: [0u8; 16],
system_id: 0,
ztli: timeline.clone(),
tenant_id: tenant.clone(),
tli: 0,
wal_seg_size: 16 * 1024, // 16MB, default for tests
})).await?;
match api.get_response().await? {
AcceptorProposerMessage::Greeting(g) => println!("response: {:?}", g),
_ => panic!("Expected greeting response")
}
Ok(())
}
#[derive(Parser, Debug)]
struct Args {
tenant: ZTenantId,
timeline: ZTimelineId,
#[clap(subcommand)]
strategy: AdversarialStrategy,
}
#[derive(Subcommand, Debug)]
enum AdversarialStrategy {
JustSayHi,
}
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
let mut api = SafekeeperApi::connect().await?;
say_hi(&mut api, &args.tenant, &args.timeline).await?;
println!("YAY IT WORKED");
Ok(())
}

View File

@@ -1,6 +1,7 @@
//! Acceptor part of proposer-acceptor consensus algorithm.
use anyhow::{bail, Context, Result};
use byteorder::WriteBytesExt;
use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
@@ -9,6 +10,7 @@ use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::cmp::min;
use std::fmt;
use std::io::Cursor;
use std::io::Read;
use tracing::*;
use zenith_utils::zid::ZNodeId;
@@ -247,7 +249,7 @@ impl SafeKeeperState {
// protocol messages
/// Initial Proposer -> Acceptor message
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct ProposerGreeting {
/// proposer-acceptor protocol version
pub protocol_version: u32,
@@ -364,6 +366,23 @@ pub enum ProposerAcceptorMessage {
}
impl ProposerAcceptorMessage {
pub fn serialize(&self) -> Result<Bytes> {
match self {
ProposerAcceptorMessage::Greeting(msg) => {
let mut buf = Vec::<u8>::new();
buf.write_u64::<LittleEndian>('g' as u64);
msg.ser_into(&mut buf);
Ok(buf.into())
}
ProposerAcceptorMessage::VoteRequest(_) => todo!(),
ProposerAcceptorMessage::Elected(_) => todo!(),
ProposerAcceptorMessage::AppendRequest(_) => todo!(),
ProposerAcceptorMessage::NoFlushAppendRequest(_) => todo!(),
ProposerAcceptorMessage::FlushWAL => todo!(),
}
}
/// Parse proposer message.
pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
// xxx using Reader is inefficient but easy to work with bincode
@@ -464,6 +483,10 @@ impl AcceptorProposerMessage {
Ok(())
}
pub fn parse(msg_bytes: Bytes) -> Result<AcceptorProposerMessage> {
todo!()
}
}
lazy_static! {

View File

@@ -1,3 +1,13 @@
from contextlib import closing
import pytest
from fixtures.zenith_fixtures import ZenithEnv, PgBin, ZenithEnvBuilder, DEFAULT_BRANCH_NAME, AdversarialProposerBin
from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
# TODO PR execution plan:
# 1. Write an adversarial proposer that successfully sends greeting
# 1. Make pg connection
@@ -6,12 +16,17 @@
# 3. Add TODOs, merge the harness into main, improve later
def test_fuzz_safekeeper(zenith_env_builder: ZenithEnvBuilder):
def test_fuzz_safekeeper(zenith_env_builder: ZenithEnvBuilder,
adversarial_proposer_bin: AdversarialProposerBin):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_fuzz_safekeeper')
pg = env.postgres.create_start('test_fuzz_safekeeper')
timeline = pg.safe_psql("SHOW zenith.zenith_timeline")[0][0]
output = adversarial_proposer_bin.say_hi(env.initial_tenant.hex, timeline)
print(output)
# TODO:
# 1. Start an adversarial proposer (new rust binary) that tries to take over

View File

@@ -1259,6 +1259,25 @@ def pg_bin(test_output_dir: str) -> PgBin:
return PgBin(test_output_dir)
@dataclass
class AdversarialProposerBin:
def say_hi(self, tenant_hex: str, timeline: str) -> str:
binpath = os.path.join(str(zenith_binpath), 'adversarial_proposer')
args = [
binpath,
tenant_hex,
timeline,
"just-say-hi",
]
return subprocess.run(args)
# return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip()
@pytest.fixture(scope='function')
def adversarial_proposer_bin(test_output_dir):
return AdversarialProposerBin()
class VanillaPostgres(PgProtocol):
def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int):
super().__init__(host='localhost', port=port, dbname='postgres')