This commit is contained in:
Bojan Serafimov
2022-04-14 13:09:44 -04:00
parent da66df21f3
commit b92e1763ec

View File

@@ -24,62 +24,95 @@ use anyhow::Result;
const BYTES_IN_PAGE: usize = 8 * 1024;
pub async fn get_page(
pagestream: &mut tokio::net::TcpStream,
lsn: &Lsn,
page: &Page,
latest: bool,
) -> anyhow::Result<Vec<u8>> {
let latest: u8 = if latest { 1 } else { 0 };
let msg = {
let query = {
let mut query = BytesMut::new();
query.put_u8(2); // Specifies get_page query
query.put_u8(latest);
query.put_u64(lsn.0);
page.write(&mut query).await?;
query.freeze()
};
let mut buf = BytesMut::new();
let copy_msg = BeMessage::CopyData(&query);
BeMessage::write(&mut buf, &copy_msg)?;
buf.freeze()
};
pagestream.write(&msg).await?;
let response = match FeMessage::read_fut(pagestream).await? {
Some(FeMessage::CopyData(page)) => page,
r => panic!("Expected CopyData message, got: {:?}", r),
};
let page = {
let mut cursor = Cursor::new(response);
let tag = AsyncReadExt::read_u8(&mut cursor).await?;
match tag {
102 => {
let mut page = Vec::<u8>::new();
cursor.read_to_end(&mut page).await?;
if page.len() != BYTES_IN_PAGE {
panic!("Expected 8kb page, got: {:?}", page.len());
}
page
}
103 => {
let mut bytes = Vec::<u8>::new();
cursor.read_to_end(&mut bytes).await?;
let message = String::from_utf8(bytes)?;
panic!("Got error message: {}", message);
}
_ => panic!("Unhandled tag {:?}", tag),
}
};
Ok(page)
/// Client for the pageserver's pagestream API
struct PagestreamApi {
stream: TcpStream
}
/// Good enough implementation for these tests
impl PagestreamApi {
async fn connect(tenant_hex: &str, timeline: &str) -> Result<PagestreamApi> {
let mut stream = TcpStream::connect("localhost:15000").await?;
// Connect to pageserver
// TODO read host, port, dbname, user from command line
let (client, conn) = tokio_postgres::Config::new()
.host("127.0.0.1")
.port(15000)
.dbname("postgres")
.user("zenith_admin")
.connect_raw(&mut stream, tokio_postgres::NoTls)
.await?;
// Enter pagestream protocol
let init_query = format!("pagestream {} {}", tenant_hex, timeline);
tokio::select! {
_ = conn => panic!("AAAA"),
_ = client.query(init_query.as_str(), &[]) => (),
};
Ok(PagestreamApi{stream})
}
async fn get_page(
&mut self,
lsn: &Lsn,
page: &Page,
latest: bool,
) -> anyhow::Result<Vec<u8>> {
let latest: u8 = if latest { 1 } else { 0 };
let msg = {
let query = {
let mut query = BytesMut::new();
query.put_u8(2); // Specifies get_page query
query.put_u8(latest);
query.put_u64(lsn.0);
page.write(&mut query).await?;
query.freeze()
};
let mut buf = BytesMut::new();
let copy_msg = BeMessage::CopyData(&query);
BeMessage::write(&mut buf, &copy_msg)?;
buf.freeze()
};
self.stream.write(&msg).await?;
let response = match FeMessage::read_fut(&mut self.stream).await? {
Some(FeMessage::CopyData(page)) => page,
r => panic!("Expected CopyData message, got: {:?}", r),
};
let page = {
let mut cursor = Cursor::new(response);
let tag = AsyncReadExt::read_u8(&mut cursor).await?;
match tag {
102 => {
let mut page = Vec::<u8>::new();
cursor.read_to_end(&mut page).await?;
if page.len() != BYTES_IN_PAGE {
panic!("Expected 8kb page, got: {:?}", page.len());
}
page
}
103 => {
let mut bytes = Vec::<u8>::new();
cursor.read_to_end(&mut bytes).await?;
let message = String::from_utf8(bytes)?;
panic!("Got error message: {}", message);
}
_ => panic!("Unhandled tag {:?}", tag),
}
};
Ok(page)
}
}
/// Parsed wal_metadata file with additional derived
/// statistics for convenience.
struct Metadata {
// Parsed from metadata file
wal_metadata: Vec<WalEntryMetadata>,
@@ -91,6 +124,7 @@ struct Metadata {
}
impl Metadata {
/// Construct metadata object from wal_metadata file emitted by pageserver
fn build(wal_metadata_path: &PathBuf) -> Result<Metadata> {
let wal_metadata_file = File::open(wal_metadata_path)
.expect("error opening wal_metadata");
@@ -117,42 +151,43 @@ impl Metadata {
})
}
fn report_results(&self, durations: &Vec<Duration>) -> Result<()> {
// Format is optimized for easy parsing from benchmark_fixture.py
/// Print results in a format readable by benchmark_fixture.py
fn report_latency(&self, latencies: &Vec<Duration>) -> Result<()> {
println!("test_param num_pages {}", self.affected_pages.len());
println!("test_param num_wal_entries {}", self.wal_metadata.len());
println!("test_param total_wal_size {} bytes", self.total_wal_size);
println!(
"lower_is_better fastest {:?} microseconds",
durations.first().unwrap().as_micros()
latencies.first().unwrap().as_micros()
);
println!(
"lower_is_better median {:?} microseconds",
durations[durations.len() / 2].as_micros()
latencies[latencies.len() / 2].as_micros()
);
println!(
"lower_is_better p99 {:?} microseconds",
durations[durations.len() - 1 - durations.len() / 100].as_micros()
latencies[latencies.len() - 1 - latencies.len() / 100].as_micros()
);
println!(
"lower_is_better slowest {:?} microseconds",
durations.last().unwrap().as_micros()
latencies.last().unwrap().as_micros()
);
Ok(())
}
}
async fn test_latest_pages(pagestream: &mut TcpStream, metadata: &Metadata) -> Result<Vec<Duration>>{
let mut durations: Vec<Duration> = vec![];
/// Sequentially get the latest version of each page and report latencies
async fn test_latest_pages(api: &mut PagestreamApi, metadata: &Metadata) -> Result<Vec<Duration>>{
let mut latencies: Vec<Duration> = vec![];
for page in &metadata.affected_pages {
let start = Instant::now();
let _page_bytes = get_page(pagestream, &metadata.latest_lsn, &page, true).await?;
let _page_bytes = api.get_page(&metadata.latest_lsn, &page, true).await?;
let duration = start.elapsed();
durations.push(duration);
latencies.push(duration);
}
durations.sort();
Ok(durations)
latencies.sort();
Ok(latencies)
}
#[derive(Parser, Debug)]
@@ -174,31 +209,16 @@ enum PsbenchTest {
async fn main() -> Result<()> {
let args = Args::parse();
// Parse wal metadata from file
// Initialize setup
let metadata = Metadata::build(&args.wal_metadata_path)?;
// Get raw TCP connection to the pageserver postgres protocol port
let mut pagestream = TcpStream::connect("localhost:15000").await?;
let (client, conn) = tokio_postgres::Config::new()
.host("127.0.0.1")
.port(15000)
.dbname("postgres")
.user("zenith_admin")
.connect_raw(&mut pagestream, tokio_postgres::NoTls)
.await?;
// Enter pagestream protocol
let init_query = format!("pagestream {} {}", args.tenant_hex, args.timeline);
tokio::select! {
_ = conn => panic!("AAAA"),
_ = client.query(init_query.as_str(), &[]) => (),
};
let mut pagestream = PagestreamApi::connect(&args.tenant_hex, &args.timeline).await?;
// Run test
let durations = match args.test {
let latencies = match args.test {
PsbenchTest::GetLatestPages => test_latest_pages(&mut pagestream, &metadata)
}.await?;
metadata.report_results(&durations)?;
// Report results
metadata.report_latency(&latencies)?;
Ok(())
}