Connect via pagestream

This commit is contained in:
Bojan Serafimov
2022-09-13 14:09:15 -04:00
parent 0d6d8fefd3
commit ecbda94790
4 changed files with 100 additions and 34 deletions

View File

@@ -1,14 +1,64 @@
use pageserver::page_service::PagestreamFeMessage;
use std::{
fs::{read_dir, File},
io::BufReader,
path::PathBuf,
str::FromStr,
};
use tokio::net::TcpStream;
use clap::{App, Arg};
use utils::zid::{ZTenantId, ZTimelineId};
async fn replay_trace<R: std::io::Read>(reader: R) -> anyhow::Result<()> {
// TODO: move this into a shared library and deduplicate it with the
// equivalent code in control_plane.
/// Client for the pageserver's pagestream API
struct PagestreamApi {
/// TCP connection to the pageserver; `connect` stores it here after the
/// pagestream handshake has been issued on it.
stream: TcpStream,
}
/// Good enough implementation for these tests
impl PagestreamApi {
    /// Connects to the pageserver described by `connstr` and switches the
    /// connection into the pagestream protocol for `tenant` / `timeline`.
    ///
    /// The TCP endpoint is the first host and port named in `connstr`. When
    /// `connstr` names no host or no port, the previously hard-coded
    /// `localhost` / `15000` is used for the missing part.
    ///
    /// # Errors
    ///
    /// Returns an error if `connstr` cannot be parsed, if it names a non-TCP
    /// host, if the TCP connection or the postgres startup handshake fails,
    /// or if the connection terminates while `pagestream` is being issued.
    async fn connect(
        connstr: &str,
        tenant: &ZTenantId,
        timeline: &ZTimelineId,
    ) -> anyhow::Result<PagestreamApi> {
        // Endpoint used for whatever part `connstr` leaves unspecified.
        const DEFAULT_HOST: &str = "localhost";
        const DEFAULT_PORT: u16 = 15000;

        println!("connecting to: {}", connstr);
        let config = tokio_postgres::Config::from_str(connstr)
            .map_err(|e| anyhow::anyhow!("bad connstr {:?}: {}", connstr, e))?;

        // Dial the endpoint the caller asked for instead of a fixed address.
        let host = match config.get_hosts().first() {
            Some(tokio_postgres::config::Host::Tcp(name)) => name.as_str(),
            Some(other) => {
                return Err(anyhow::anyhow!(
                    "unsupported pageserver host {:?}: only TCP hosts are supported",
                    other
                ))
            }
            None => DEFAULT_HOST,
        };
        let port = config.get_ports().first().copied().unwrap_or(DEFAULT_PORT);

        // Connect to pageserver
        let mut stream = TcpStream::connect((host, port)).await?;
        let (client, conn) = config
            .connect_raw(&mut stream, tokio_postgres::NoTls)
            .await?;

        // Enter pagestream protocol
        let init_query = format!("pagestream {} {}", tenant, timeline);
        tokio::select! {
            // The connection future only completes when the connection ends,
            // which must not happen before the query has been sent.
            _ = conn => {
                return Err(anyhow::anyhow!(
                    "connection closed during pagestream initialization"
                ))
            }
            // NOTE(review): the query result is deliberately not inspected,
            // as before; presumably the server switches protocol instead of
            // answering like a regular query -- confirm before propagating it.
            _ = client.query(init_query.as_str(), &[]) => (),
        };

        println!("connected");
        Ok(PagestreamApi { stream })
    }

    /// Placeholder for sending one pagestream request; currently does nothing
    /// and always succeeds.
    async fn make_request(&mut self) -> anyhow::Result<()> {
        // TODO implement
        Ok(())
    }
}
/// Replays every pagestream request recorded in `reader` against `pagestream`.
///
/// Messages are decoded one at a time with `PagestreamFeMessage::parse`; each
/// decoded message is printed and followed by one `make_request` call.
///
/// Running out of input (`UnexpectedEof`) is treated as the end of the trace.
/// Because the reader cannot be peeked, a trace truncated in the middle of a
/// message is indistinguishable from a clean end and also stops the replay
/// without an error.
///
/// # Errors
///
/// Returns any other parse failure (for example an unknown message tag or a
/// real I/O error) and any error returned by `make_request`.
async fn replay_trace<R: std::io::Read>(
    reader: &mut R,
    mut pagestream: PagestreamApi,
) -> anyhow::Result<()> {
    loop {
        let msg = match PagestreamFeMessage::parse(reader) {
            Ok(msg) => msg,
            Err(err) => {
                let end_of_trace = err
                    .downcast_ref::<std::io::Error>()
                    .map_or(false, |io_err| {
                        io_err.kind() == std::io::ErrorKind::UnexpectedEof
                    });
                if end_of_trace {
                    break;
                }
                // Anything else means the trace is corrupt or unreadable;
                // report it instead of silently stopping the replay.
                return Err(err.context("failed to parse pagestream message from trace"));
            }
        };
        println!("Parsed message {:?}", msg);
        pagestream.make_request().await?;
    }
    Ok(())
}
@@ -22,27 +72,36 @@ async fn main() -> anyhow::Result<()> {
.takes_value(true)
.help("Directory where the read traces are stored"),
)
.arg(
Arg::new("pageserver_connstr")
.takes_value(true)
.help("Pageserver pg endpoint to connect to"),
)
.get_matches();
let connstr = arg_matches.value_of("pageserver_connstr").unwrap();
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
for tenant_dir in read_dir(traces_dir)? {
let entry = tenant_dir?;
let path = entry.path();
let _tenant_id = ZTenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
let tenant_id = ZTenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
for timeline_dir in read_dir(path)? {
let entry = timeline_dir?;
let path = entry.path();
let _timeline_id = ZTimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
let timeline_id = ZTimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
for trace_dir in read_dir(path)? {
let entry = trace_dir?;
let path = entry.path();
let _conn_id = ZTimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
let pagestream = PagestreamApi::connect(connstr, &tenant_id, &timeline_id).await?;
let file = File::open(path)?;
let reader = BufReader::new(file);
replay_trace(reader).await?;
let mut reader = BufReader::new(file);
replay_trace(&mut reader, pagestream).await?;
}
}
}

View File

@@ -10,6 +10,7 @@
//
use anyhow::{bail, ensure, Context, Result};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use regex::Regex;
use std::io::{self, Read};
@@ -47,6 +48,7 @@ use postgres_ffi::BLCKSZ;
// Wrapped in libpq CopyData
// TODO these should be in a library outside the pageserver
#[derive(Debug)]
pub enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
@@ -162,52 +164,52 @@ impl PagestreamFeMessage {
bytes.into()
}
fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
// TODO these gets can fail
// these correspond to the ZenithMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.get_u8();
let msg_tag = body.read_u8()?;
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.get_u32(),
blkno: body.read_u32::<BigEndian>()?,
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
dbnode: body.get_u32(),
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
})),
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
}
}
}
@@ -528,7 +530,8 @@ impl PageServerHandler {
t.trace(&copy_data_bytes)
}
let zenith_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
let zenith_fe_msg =
PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let tenant_id = tenant_id.to_string();
let timeline_id = timeline_id.to_string();

View File

@@ -1642,13 +1642,15 @@ class ReplayBin:
traces_dir: str
def replay_all(self, pageserver_connstr: str) -> str:
    """Run the `replay` binary over `self.traces_dir` against a pageserver.

    Args:
        pageserver_connstr: connection string passed to the binary as its
            second positional argument.

    Returns:
        The binary's stdout, decoded as UTF-8 and stripped, so callers can
        print or inspect it. stderr is not captured and goes straight to the
        test's own stderr.
    """
    replay_binpath = os.path.join(str(neon_binpath), "replay")
    args = [
        replay_binpath,
        self.traces_dir,
        pageserver_connstr,
    ]
    # Capture only stdout: the caller prints the returned text, while
    # anything the binary writes to stderr stays visible as it happens.
    completed = subprocess.run(args, stdout=subprocess.PIPE)
    return completed.stdout.decode("UTF-8").strip()
@pytest.fixture(scope="function")

View File

@@ -18,5 +18,7 @@ def test_trace_replay(neon_env_builder: NeonEnvBuilder, replay_bin: ReplayBin):
trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline) / str(timeline)
assert trace_path.exists()
output = replay_bin.replay_all()
print("replaying")
ps_connstr = env.pageserver.connstr().replace("'", "\\'")
output = replay_bin.replay_all(ps_connstr)
print(output)