From ecbda947901bb6cd7abdfe1487a68267291d6558 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Tue, 13 Sep 2022 14:09:15 -0400 Subject: [PATCH] Connect via pagestream --- pageserver/src/bin/replay.rs | 69 ++++++++++++++++++-- pageserver/src/page_service.rs | 55 ++++++++-------- test_runner/fixtures/neon_fixtures.py | 6 +- test_runner/performance/test_trace_replay.py | 4 +- 4 files changed, 100 insertions(+), 34 deletions(-) diff --git a/pageserver/src/bin/replay.rs b/pageserver/src/bin/replay.rs index 83e9da018c..9e44362d7a 100644 --- a/pageserver/src/bin/replay.rs +++ b/pageserver/src/bin/replay.rs @@ -1,14 +1,64 @@ +use pageserver::page_service::PagestreamFeMessage; use std::{ fs::{read_dir, File}, io::BufReader, path::PathBuf, str::FromStr, }; +use tokio::net::TcpStream; use clap::{App, Arg}; use utils::zid::{ZTenantId, ZTimelineId}; -async fn replay_trace<R: std::io::Read>(reader: R) -> anyhow::Result<()> { +// TODO put this in library, dedup with stuff in control_plane +/// Client for the pageserver's pagestream API +struct PagestreamApi { + stream: TcpStream, +} + +/// Good enough implementation for these tests +impl PagestreamApi { + async fn connect( + connstr: &str, + tenant: &ZTenantId, + timeline: &ZTimelineId, + ) -> anyhow::Result<PagestreamApi> { + let mut stream = TcpStream::connect("localhost:15000").await?; + + // Connect to pageserver + println!("connecting to: {}", connstr); + let config = tokio_postgres::Config::from_str(connstr).expect("bad connstr"); + let (client, conn) = config + .connect_raw(&mut stream, tokio_postgres::NoTls) + .await?; + + // Enter pagestream protocol + let init_query = format!("pagestream {} {}", tenant, timeline); + tokio::select! 
{ + _ = conn => panic!("connection closed during pagestream initialization"), + _ = client.query(init_query.as_str(), &[]) => (), + }; + + println!("connected"); + + Ok(PagestreamApi { stream }) + } + + async fn make_request(&mut self) -> anyhow::Result<()> { + // TODO implement + Ok(()) + } +} + +async fn replay_trace<R: std::io::Read>( + reader: &mut R, + mut pagestream: PagestreamApi, +) -> anyhow::Result<()> { + while let Ok(msg) = PagestreamFeMessage::parse(reader) { + println!("Parsed message {:?}", msg); + pagestream.make_request().await?; + } + Ok(()) } @@ -22,27 +72,36 @@ async fn main() -> anyhow::Result<()> { .takes_value(true) .help("Directory where the read traces are stored"), ) + .arg( + Arg::new("pageserver_connstr") + .takes_value(true) + .help("Pageserver pg endpoint to connect to"), + ) .get_matches(); + let connstr = arg_matches.value_of("pageserver_connstr").unwrap(); + let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap()); for tenant_dir in read_dir(traces_dir)? { let entry = tenant_dir?; let path = entry.path(); - let _tenant_id = ZTenantId::from_str(path.file_name().unwrap().to_str().unwrap())?; + let tenant_id = ZTenantId::from_str(path.file_name().unwrap().to_str().unwrap())?; for timeline_dir in read_dir(path)? { let entry = timeline_dir?; let path = entry.path(); - let _timeline_id = ZTimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?; + let timeline_id = ZTimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?; for trace_dir in read_dir(path)? 
{ let entry = trace_dir?; let path = entry.path(); let _conn_id = ZTimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?; + let pagestream = PagestreamApi::connect(connstr, &tenant_id, &timeline_id).await?; + let file = File::open(path)?; - let reader = BufReader::new(file); - replay_trace(reader).await?; + let mut reader = BufReader::new(file); + replay_trace(&mut reader, pagestream).await?; } } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 014ac61580..a2b4eb8c08 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -10,6 +10,7 @@ // use anyhow::{bail, ensure, Context, Result}; +use byteorder::{BigEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use regex::Regex; use std::io::{self, Read}; @@ -47,6 +48,7 @@ use postgres_ffi::BLCKSZ; // Wrapped in libpq CopyData // TODO these should be in a library outside the pageserver +#[derive(Debug)] pub enum PagestreamFeMessage { Exists(PagestreamExistsRequest), Nblocks(PagestreamNblocksRequest), @@ -162,52 +164,52 @@ impl PagestreamFeMessage { bytes.into() } - fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> { + pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> { // TODO these gets can fail // these correspond to the ZenithMessageTag enum in pagestore_client.h // // TODO: consider using protobuf or serde bincode for less error prone // serialization. - let msg_tag = body.get_u8(); + let msg_tag = body.read_u8()?; match msg_tag { 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest { - latest: body.get_u8() != 0, - lsn: Lsn::from(body.get_u64()), + latest: body.read_u8()? 
!= 0, + lsn: Lsn::from(body.read_u64::<BigEndian>()?), rel: RelTag { - spcnode: body.get_u32(), - dbnode: body.get_u32(), - relnode: body.get_u32(), - forknum: body.get_u8(), + spcnode: body.read_u32::<BigEndian>()?, + dbnode: body.read_u32::<BigEndian>()?, + relnode: body.read_u32::<BigEndian>()?, + forknum: body.read_u8()?, }, })), 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { - latest: body.get_u8() != 0, - lsn: Lsn::from(body.get_u64()), + latest: body.read_u8()? != 0, + lsn: Lsn::from(body.read_u64::<BigEndian>()?), rel: RelTag { - spcnode: body.get_u32(), - dbnode: body.get_u32(), - relnode: body.get_u32(), - forknum: body.get_u8(), + spcnode: body.read_u32::<BigEndian>()?, + dbnode: body.read_u32::<BigEndian>()?, + relnode: body.read_u32::<BigEndian>()?, + forknum: body.read_u8()?, }, })), 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { - latest: body.get_u8() != 0, - lsn: Lsn::from(body.get_u64()), + latest: body.read_u8()? != 0, + lsn: Lsn::from(body.read_u64::<BigEndian>()?), rel: RelTag { - spcnode: body.get_u32(), - dbnode: body.get_u32(), - relnode: body.get_u32(), - forknum: body.get_u8(), + spcnode: body.read_u32::<BigEndian>()?, + dbnode: body.read_u32::<BigEndian>()?, + relnode: body.read_u32::<BigEndian>()?, + forknum: body.read_u8()?, }, - blkno: body.get_u32(), + blkno: body.read_u32::<BigEndian>()?, })), 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { - latest: body.get_u8() != 0, - lsn: Lsn::from(body.get_u64()), - dbnode: body.get_u32(), + latest: body.read_u8()? 
!= 0, + lsn: Lsn::from(body.read_u64::<BigEndian>()?), + dbnode: body.read_u32::<BigEndian>()?, })), - _ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body), + _ => bail!("unknown smgr message tag: {:?}", msg_tag), } } } @@ -528,7 +530,8 @@ impl PageServerHandler { t.trace(&copy_data_bytes) } - let zenith_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?; + let zenith_fe_msg = + PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; let tenant_id = tenant_id.to_string(); let timeline_id = timeline_id.to_string(); diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 041378d741..c0f54f6158 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1642,13 +1642,15 @@ class ReplayBin: traces_dir: str - def replay_all(self) -> str: + def replay_all(self, pageserver_connstr): replay_binpath = os.path.join(str(neon_binpath), "replay") args = [ replay_binpath, self.traces_dir, + pageserver_connstr, ] - return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip() + # return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip() + subprocess.run(args) @pytest.fixture(scope="function") diff --git a/test_runner/performance/test_trace_replay.py b/test_runner/performance/test_trace_replay.py index 1209422af9..604a008335 100644 --- a/test_runner/performance/test_trace_replay.py +++ b/test_runner/performance/test_trace_replay.py @@ -18,5 +18,7 @@ def test_trace_replay(neon_env_builder: NeonEnvBuilder, replay_bin: ReplayBin): trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline) / str(timeline) assert trace_path.exists() - output = replay_bin.replay_all() + print("replaying") + ps_connstr = env.pageserver.connstr().replace("'", "\\'") + output = replay_bin.replay_all(ps_connstr) print(output)