From d67fb28a59dee89f3e5ee45c98a7a3cb040a15af Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Wed, 4 May 2022 14:16:43 -0400 Subject: [PATCH] WIP --- pageserver/src/bin/replay.rs | 57 +++++++++++++++++++++- test_runner/fixtures/zenith_fixtures.py | 16 ++++++ test_runner/performance/test_pageserver.py | 16 ++++++ 3 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 test_runner/performance/test_pageserver.py diff --git a/pageserver/src/bin/replay.rs b/pageserver/src/bin/replay.rs index a16e151cf1..523bb04413 100644 --- a/pageserver/src/bin/replay.rs +++ b/pageserver/src/bin/replay.rs @@ -1,20 +1,75 @@ +use std::str::FromStr; + use anyhow::Result; use postgres_ffi::{pg_constants::WAL_SEGMENT_SIZE, waldecoder::WalStreamDecoder}; +use utils::zid::{ZTenantId, ZTimelineId}; +use tokio::net::TcpStream; use utils::lsn::Lsn; +struct PageServiceApi { + stream: TcpStream, +} + +impl PageServiceApi { + async fn connect(tenant: &ZTenantId, timeline: &ZTimelineId, connstr: &str) -> Result { + let mut stream = TcpStream::connect("localhost:15000").await?; + + // Connect to pageserver + // TODO read host, port, dbname, user from command line + let (client, conn) = tokio_postgres::Config::new() + .host("127.0.0.1") + .port(15000) + .dbname("postgres") + .user("zenith_admin") + .connect_raw(&mut stream, tokio_postgres::NoTls) + .await?; + + let init_query = format!("callmemaybe {} {} {}", tenant, timeline, connstr); + tokio::select! { + _ = conn => panic!("connection closed during callmemaybe"), + _ = client.query(init_query.as_str(), &[]) => (), + }; + + Ok(Self { stream }) + } +} + + #[tokio::main] async fn main() -> Result<()> { + use clap::{App, Arg}; + let arg_matches = App::new("Replay") + .arg( + Arg::new("tenant") + .long("tenant") + .takes_value(true) + ) + .arg( + Arg::new("timeline") + .long("timeline") + .takes_value(true) + ) + .get_matches(); + let partial_path = "/home/bojan/tmp/sk_wal"; let startpos = Lsn(23761464); // I got this by grepping sk log for "restart decoder" let xlogoff: usize = startpos.segment_offset(WAL_SEGMENT_SIZE); let mut decoder = WalStreamDecoder::new(startpos); let bytes = std::fs::read(partial_path)?; - decoder.feed_bytes(&bytes[xlogoff..]); + decoder.feed_bytes(&bytes[xlogoff..(xlogoff+10000)]); while let Some((lsn, rec)) = decoder.poll_decode()? { println!("lsn: {}", lsn); } + + // TODO start replication server, get connstr + + let tenant = ZTenantId::from_str(arg_matches.value_of("tenant").unwrap())?; + let timeline = ZTimelineId::from_str(arg_matches.value_of("timeline").unwrap())?; + let connstr = "lol"; + let mut api = PageServiceApi::connect(&tenant, &timeline, connstr).await?; + Ok(()) } diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 9319a53778..beb6e6741d 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -1303,6 +1303,22 @@ def pg_bin(test_output_dir: str) -> PgBin: return PgBin(test_output_dir) +@dataclass +class ReplayBin: + """A helper class for running the pageserver benchmarker tool.""" + def run(self, tenant, timeline): + replay_binpath = os.path.join(str(zenith_binpath), 'replay') + args = [replay_binpath, + "--tenant", tenant.hex, + "--timeline", timeline.hex] + return subprocess.run(args) + + +@pytest.fixture(scope='function') +def replay_bin(test_output_dir): + return ReplayBin() + + class VanillaPostgres(PgProtocol): def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int): super().__init__(host='localhost', port=port, dbname='postgres') diff --git a/test_runner/performance/test_pageserver.py b/test_runner/performance/test_pageserver.py new file mode 100644 index 0000000000..7b5d3b6633 --- /dev/null +++ b/test_runner/performance/test_pageserver.py @@ -0,0 +1,16 @@ +from contextlib import closing + +import pytest + +from fixtures.zenith_fixtures import ZenithEnv, PgBin, ZenithEnvBuilder, DEFAULT_BRANCH_NAME, ReplayBin +from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker + + +def test_replay(zenith_env_builder: ZenithEnvBuilder, + zenbenchmark: ZenithBenchmarker, + replay_bin: ReplayBin): + env = zenith_env_builder.init_start() + + tenant = env.zenith_cli.create_tenant() + timeline = env.zenith_cli.create_timeline("test_replay", tenant) + replay_bin.run(tenant, timeline)