mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-11 12:20:38 +00:00
Compare commits
10 Commits
conrad/pro
...
bojan-psbe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
098d7046f8 | ||
|
|
02aa7c023a | ||
|
|
180631da1f | ||
|
|
811d46f070 | ||
|
|
728f299641 | ||
|
|
fb49418e7f | ||
|
|
887dc8f112 | ||
|
|
aa7b32d892 | ||
|
|
d7ed9d8e01 | ||
|
|
96c2b3a80a |
221
pageserver/src/bin/psbench.rs
Normal file
221
pageserver/src/bin/psbench.rs
Normal file
@@ -0,0 +1,221 @@
|
|||||||
|
//! Pageserver benchmark tool
|
||||||
|
//!
|
||||||
|
//! Usually it's easier to write python perf tests, but here the performance
|
||||||
|
//! of the tester matters, and the API is easier to work with from rust.
|
||||||
|
use std::{collections::HashMap, io::{BufRead, BufReader, Cursor}, net::SocketAddr, ops::AddAssign};
|
||||||
|
use byteorder::ReadBytesExt;
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
use bytes::{BufMut, Bytes, BytesMut};
|
||||||
|
use clap::{App, Arg};
|
||||||
|
use std::fs::File;
|
||||||
|
use zenith_utils::{GIT_VERSION, pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage}};
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
|
||||||
|
/// Return an iterator over the lines of `file_name` (newlines stripped),
/// reading through a `BufReader`.
///
/// # Panics
/// Panics if the file cannot be opened or a line cannot be read. That is
/// acceptable for this benchmark tool, but the panic now names the file
/// instead of the bare `unwrap()` message.
pub fn read_lines_buffered(file_name: &str) -> impl Iterator<Item = String> {
    let file = File::open(file_name)
        .unwrap_or_else(|e| panic!("could not open {}: {}", file_name, e));
    BufReader::new(file)
        .lines()
        .map(|result| result.expect("failed to read line"))
}
|
||||||
|
|
||||||
|
pub async fn get_page(
|
||||||
|
pagestream: &mut tokio::net::TcpStream,
|
||||||
|
lsn: &Lsn,
|
||||||
|
page: &Page,
|
||||||
|
) -> anyhow::Result<Vec<u8>> {
|
||||||
|
let msg = {
|
||||||
|
let query = {
|
||||||
|
let mut query = BytesMut::new();
|
||||||
|
query.put_u8(2); // Specifies get_page query
|
||||||
|
query.put_u8(0); // Specifies this is not a "latest page" query
|
||||||
|
query.put_u64(lsn.0);
|
||||||
|
page.write(&mut query).await?;
|
||||||
|
query.freeze()
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut buf = BytesMut::new();
|
||||||
|
let copy_msg = BeMessage::CopyData(&query);
|
||||||
|
BeMessage::write(&mut buf, ©_msg)?;
|
||||||
|
buf.freeze()
|
||||||
|
};
|
||||||
|
|
||||||
|
pagestream.write(&msg).await?;
|
||||||
|
|
||||||
|
let response = match FeMessage::read_fut(pagestream).await? {
|
||||||
|
Some(FeMessage::CopyData(page)) => page,
|
||||||
|
r => panic!("Expected CopyData message, got: {:?}", r),
|
||||||
|
};
|
||||||
|
|
||||||
|
let page = {
|
||||||
|
let mut cursor = Cursor::new(response);
|
||||||
|
let tag = AsyncReadExt::read_u8(&mut cursor).await?;
|
||||||
|
|
||||||
|
match tag {
|
||||||
|
102 => {
|
||||||
|
let mut page = Vec::<u8>::new();
|
||||||
|
cursor.read_to_end(&mut page).await?;
|
||||||
|
dbg!(page.len());
|
||||||
|
if page.len() != 8 * 1024 {
|
||||||
|
panic!("Expected 8kb page, got: {:?}", page.len());
|
||||||
|
}
|
||||||
|
page
|
||||||
|
},
|
||||||
|
103 => {
|
||||||
|
let mut bytes = Vec::<u8>::new();
|
||||||
|
cursor.read_to_end(&mut bytes).await?;
|
||||||
|
let message = String::from_utf8(bytes)?;
|
||||||
|
panic!("Got error message: {}", message);
|
||||||
|
},
|
||||||
|
_ => panic!("Unhandled tag {:?}", tag)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(page)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A WAL log sequence number, wrapped in a newtype so it can be used as an
/// ordered, hashable key (e.g. in `writes_per_entry` below).
#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Debug)]
pub struct Lsn(pub u64);
|
||||||
|
|
||||||
|
/// Identifies a single page, using the same fields (and wire order) as the
/// pagestream get_page request — see `Page::read` / `Page::write`.
/// Field names mirror PostgreSQL's RelFileNode naming (spcnode/dbnode/
/// relnode) — presumably tablespace, database and relation ids; confirm
/// against the pageserver's page_service definitions.
#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Debug)]
pub struct Page {
    spcnode: u32,
    dbnode: u32,
    relnode: u32,
    forknum: u8,
    blkno: u32, // block number within the relation fork
}
|
||||||
|
|
||||||
|
impl Page {
|
||||||
|
async fn read<Reader>(buf: &mut Reader) -> Result<Page>
|
||||||
|
where
|
||||||
|
Reader: tokio::io::AsyncRead + Unpin,
|
||||||
|
{
|
||||||
|
let spcnode = buf.read_u32().await?;
|
||||||
|
let dbnode = buf.read_u32().await?;
|
||||||
|
let relnode = buf.read_u32().await?;
|
||||||
|
let forknum = buf.read_u8().await?;
|
||||||
|
let blkno = buf.read_u32().await?;
|
||||||
|
Ok(Page { spcnode, dbnode, relnode, forknum, blkno })
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn write(&self, buf: &mut BytesMut) -> Result<()> {
|
||||||
|
buf.put_u32(self.spcnode);
|
||||||
|
buf.put_u32(self.dbnode);
|
||||||
|
buf.put_u32(self.relnode);
|
||||||
|
buf.put_u8(self.forknum);
|
||||||
|
buf.put_u32(self.blkno);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Benchmark driver.
///
/// Parses "wal-at-lsn-modified-page <lsn-hex> <page-hex>" lines out of a
/// pageserver log, finds the most frequently updated page, then times a
/// get_page request for that page at each LSN seen in the log (from the
/// page's first update onward).
///
/// Positional args: path (log file), ps_connstr, tenant_hex, timeline.
#[tokio::main]
async fn main() -> Result<()> {
    // NOTE(review): placeholder CLI name/about strings — tidy before shipping.
    let arg_matches = App::new("LALALA")
        .about("lalala")
        .version(GIT_VERSION)
        .arg(
            Arg::new("path")
                .help("Path to file to dump")
                .required(true)
                .index(1),
        )
        .arg(
            Arg::new("ps_connstr")
                .help("Connection string to pageserver")
                .required(true)
                .index(2),
        )
        .arg(
            Arg::new("tenant_hex")
                .help("TODO")
                .required(true)
                .index(3),
        )
        .arg(
            Arg::new("timeline")
                .help("TODO")
                .required(true)
                .index(4),
        )
        .get_matches();

    // All four args are .required(true), so value_of() cannot return None.
    let log_file = arg_matches.value_of("path").unwrap();
    // NOTE(review): ps_connstr is accepted but never used — the TCP and
    // postgres connections below are hard-coded to localhost:15000.
    let ps_connstr = arg_matches.value_of("ps_connstr").unwrap();
    let tenant_hex = arg_matches.value_of("tenant_hex").unwrap();
    let timeline = arg_matches.value_of("timeline").unwrap();

    // Parse log lines: keep only the instrumentation lines emitted by WAL
    // ingest, stripped down to "<lsn-hex> <page-hex>".
    let relevant = read_lines_buffered(log_file)
        .filter_map(|line| line.strip_prefix("wal-at-lsn-modified-page ").map(|x| x.to_string()));
    let mut lsn_page_pairs = Vec::<(Lsn, Page)>::new();
    for line in relevant {
        // Panics on a malformed line; acceptable since we only see lines
        // this project emitted itself.
        let (lsn, page) = line.split_once(" ").unwrap();

        // LSN is hex-encoded big-endian u64 (put_u64 on the writing side).
        let lsn = hex::decode(lsn)?;
        let lsn = Lsn(AsyncReadExt::read_u64(&mut Cursor::new(lsn)).await?);

        // Page id is hex-encoded in the wire format Page::read expects.
        let page = hex::decode(page)?;
        let page = Page::read(&mut Cursor::new(page)).await?;

        lsn_page_pairs.push((lsn, page))
    }

    // Organize write info: all pages touched at each LSN.
    let mut writes_per_entry = HashMap::<Lsn, Vec<Page>>::new();
    for (lsn, page) in lsn_page_pairs.clone() {
        writes_per_entry.entry(lsn).or_insert(vec![]).push(page);
    }

    // Print some stats: update counts per page, hottest first.
    let mut updates_per_page = HashMap::<Page, usize>::new();
    for (_, page) in lsn_page_pairs.clone() {
        updates_per_page.entry(page).or_insert(0).add_assign(1);
    }
    let mut updates_per_page: Vec<(&usize, &Page)> = updates_per_page
        .iter().map(|(k, v)| (v, k)).collect();
    updates_per_page.sort();
    updates_per_page.reverse();
    dbg!(&updates_per_page);

    // NOTE(review): indexing panics if the log had no matching lines.
    let hottest_page = updates_per_page[0].1;
    // Earliest LSN at which the hottest page was modified.
    let first_update = lsn_page_pairs
        .iter()
        .filter(|(_lsn, page)| page == hottest_page)
        .map(|(lsn, _page)| lsn)
        .min()
        .unwrap();

    // Get raw TCP connection to the pageserver postgres protocol port.
    // The raw socket is kept so we can speak pagestream on it after the
    // postgres-protocol handshake below.
    let mut socket = tokio::net::TcpStream::connect("localhost:15000").await?;
    let (client, conn) = tokio_postgres::Config::new()
        .host("127.0.0.1")
        .port(15000)
        .dbname("postgres")
        .user("zenith_admin")
        .connect_raw(&mut socket, tokio_postgres::NoTls)
        .await?;

    // Enter pagestream protocol. Drive the connection future and the
    // query concurrently; if the connection future finishes first the
    // connection died, so abort.
    // NOTE(review): panic!("AAAA") deserves a real error message.
    let init_query = format!("pagestream {} {}", tenant_hex, timeline);
    tokio::select! {
        _ = conn => panic!("AAAA"),
        _ = client.query(init_query.as_str(), &[]) => (),
    };

    // TODO merge with LSM branch. Nothing to test otherwise, too many images.
    // - I get error: tried to request a page version that was garbage collected
    // TODO be mindful of caching, take multiple measurements, use monotonic time.
    // TODO make harder test case. More writes, fewer images.
    // TODO concurrent requests: multiple reads, also writes.
    use std::time::Instant;
    // NOTE(review): HashMap iteration order is arbitrary, so requests are
    // not issued in LSN order and timings are not reproducible run-to-run.
    for (lsn, _pages) in writes_per_entry {
        if lsn >= *first_update {
            println!("Running get_page {:?} at {:?}", hottest_page, lsn);
            let start = Instant::now();
            let _page = get_page(&mut socket, &lsn, &hottest_page).await?;
            let duration = start.elapsed();
            println!("Time: {:?}", duration);
        }
    }

    Ok(())
}
|
||||||
@@ -21,6 +21,7 @@
|
|||||||
//! redo Postgres process, but some records it can handle directly with
|
//! redo Postgres process, but some records it can handle directly with
|
||||||
//! bespoken Rust code.
|
//! bespoken Rust code.
|
||||||
|
|
||||||
|
use chrono::format::format;
|
||||||
use postgres_ffi::nonrelfile_utils::clogpage_precedes;
|
use postgres_ffi::nonrelfile_utils::clogpage_precedes;
|
||||||
use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment;
|
use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment;
|
||||||
use std::cmp::min;
|
use std::cmp::min;
|
||||||
@@ -270,6 +271,25 @@ impl WalIngest {
|
|||||||
// Iterate through all the blocks that the record modifies, and
|
// Iterate through all the blocks that the record modifies, and
|
||||||
// "put" a separate copy of the record for each block.
|
// "put" a separate copy of the record for each block.
|
||||||
for blk in decoded.blocks.iter() {
|
for blk in decoded.blocks.iter() {
|
||||||
|
|
||||||
|
let lsn_hex = {
|
||||||
|
use bytes::BufMut;
|
||||||
|
let mut bytes = BytesMut::new();
|
||||||
|
bytes.put_u64(lsn.0);
|
||||||
|
hex::encode(bytes.freeze())
|
||||||
|
};
|
||||||
|
let page_hex = {
|
||||||
|
use bytes::BufMut;
|
||||||
|
let mut page = BytesMut::new();
|
||||||
|
page.put_u32(blk.rnode_spcnode);
|
||||||
|
page.put_u32(blk.rnode_dbnode);
|
||||||
|
page.put_u32(blk.rnode_relnode);
|
||||||
|
page.put_u8(blk.forknum);
|
||||||
|
page.put_u32(blk.blkno);
|
||||||
|
hex::encode(page.freeze())
|
||||||
|
};
|
||||||
|
println!("wal-at-lsn-modified-page {} {}", lsn_hex, page_hex);
|
||||||
|
|
||||||
self.ingest_decoded_block(timeline, lsn, &decoded, blk)?;
|
self.ingest_decoded_block(timeline, lsn, &decoded, blk)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -613,6 +613,15 @@ class ZenithEnv:
|
|||||||
""" Get list of safekeeper endpoints suitable for wal_acceptors GUC """
|
""" Get list of safekeeper endpoints suitable for wal_acceptors GUC """
|
||||||
return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
|
return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
|
||||||
|
|
||||||
|
def run_psbench(self, timeline):
    """Run the `psbench` binary against this environment's pageserver.

    Passes the pageserver log file (the source of the
    "wal-at-lsn-modified-page" lines psbench parses), the pageserver
    connection string, the initial tenant id (hex) and the given timeline
    id as positional arguments, matching psbench's CLI.
    """
    ps_log_filename = os.path.join(self.repo_dir, "pageserver.log")
    ps_connstr = self.pageserver.connstr()
    psbench_binpath = os.path.join(str(zenith_binpath), 'psbench')
    tenant_hex = self.initial_tenant.hex
    args = [psbench_binpath, ps_log_filename, ps_connstr, tenant_hex, timeline]
    # check=True: fail loudly if the benchmark binary exits non-zero
    # (previously the exit status was silently ignored).
    subprocess.run(args, check=True)
|
||||||
|
|
||||||
@cached_property
|
@cached_property
|
||||||
def auth_keys(self) -> AuthKeys:
|
def auth_keys(self) -> AuthKeys:
|
||||||
pub = (Path(self.repo_dir) / 'auth_public_key.pem').read_bytes()
|
pub = (Path(self.repo_dir) / 'auth_public_key.pem').read_bytes()
|
||||||
|
|||||||
32
test_runner/performance/test_pageserver.py
Normal file
32
test_runner/performance/test_pageserver.py
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
from contextlib import closing
|
||||||
|
from fixtures.zenith_fixtures import ZenithEnv
|
||||||
|
from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_page(zenith_simple_env: ZenithEnv, zenbenchmark: ZenithBenchmarker):
    """Build a write-heavy history on a single hot page, run GC, then invoke
    psbench to measure pageserver get_page latency over that history.
    """
    env = zenith_simple_env
    env.zenith_cli.create_branch("test_pageserver", "empty")
    pg = env.postgres.create_start('test_pageserver')
    timeline = pg.safe_psql("SHOW zenith.zenith_timeline")[0][0]

    # Long-lived cursor, useful for flushing
    psconn = env.pageserver.connect()
    pscur = psconn.cursor()

    with closing(pg.connect()) as conn:
        with conn.cursor() as cur:
            cur.execute('create table t (i integer);')
            cur.execute('insert into t values (0);')

            # Update the same row repeatedly so one page accumulates many
            # WAL records for psbench to replay.
            for i in range(1000):
                cur.execute(f'update t set i = {i};')

            pscur.execute(f"do_gc {env.initial_tenant.hex} {timeline} 0")

            # Sanity check: the table is still readable after GC.
            cur.execute("select * from t;")
            cur.fetchall()

    env.run_psbench(timeline)
|
||||||
Reference in New Issue
Block a user