mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 12:10:37 +00:00
WIP working pageserver get_page client
This commit is contained in:
143
pageserver/src/bin/psbench.rs
Normal file
143
pageserver/src/bin/psbench.rs
Normal file
@@ -0,0 +1,143 @@
|
||||
//! Pageserver benchmark tool
|
||||
//!
|
||||
//! Usually it's easier to write python perf tests, but here the performance
|
||||
//! of the tester matters, and the API is easier to work with from rust.
|
||||
use std::{io::{BufRead, BufReader, Cursor}, net::SocketAddr};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use clap::{App, Arg};
|
||||
use std::fs::File;
|
||||
use zenith_utils::{GIT_VERSION, pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage}};
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
pub fn read_lines_buffered(file_name: &str) -> impl Iterator<Item = String> {
|
||||
BufReader::new(File::open(file_name).unwrap())
|
||||
.lines()
|
||||
.map(|result| result.unwrap())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
|
||||
// TODO do I need connection string to pageserver?
|
||||
|
||||
let arg_matches = App::new("LALALA")
|
||||
.about("lalala")
|
||||
.version(GIT_VERSION)
|
||||
.arg(
|
||||
Arg::new("path")
|
||||
.help("Path to file to dump")
|
||||
.required(true)
|
||||
.index(1),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("ps_connstr")
|
||||
.help("Connection string to pageserver")
|
||||
.required(true)
|
||||
.index(2),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("tenant_hex")
|
||||
.help("TODO")
|
||||
.required(true)
|
||||
.index(3),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("timeline")
|
||||
.help("TODO")
|
||||
.required(true)
|
||||
.index(4),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
let log_file = arg_matches.value_of("path").unwrap();
|
||||
let ps_connstr = arg_matches.value_of("ps_connstr").unwrap();
|
||||
let tenant_hex = arg_matches.value_of("tenant_hex").unwrap();
|
||||
let timeline = arg_matches.value_of("timeline").unwrap();
|
||||
|
||||
let lsn_page_pairs: Vec<_> = read_lines_buffered(log_file)
|
||||
.filter_map(|line| line.strip_prefix("wal-at-lsn-modified-page ").map(|x| x.to_string()))
|
||||
.map(|rest| {
|
||||
let (lsn, page) = rest.split_once(" ").unwrap();
|
||||
let lsn = hex::decode(lsn).unwrap();
|
||||
if lsn.len() != 8 {
|
||||
panic!("AAA")
|
||||
}
|
||||
let page = hex::decode(page).unwrap();
|
||||
if page.len() != 17 {
|
||||
panic!("AAA")
|
||||
}
|
||||
(lsn, page)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let (some_lsn, some_page) = lsn_page_pairs[0].clone();
|
||||
|
||||
let mut socket = tokio::net::TcpStream::connect("localhost:15000").await?;
|
||||
println!("AYY got socket");
|
||||
let (client, conn) = tokio_postgres::Config::new()
|
||||
.host("127.0.0.1")
|
||||
.port(15000)
|
||||
.dbname("postgres")
|
||||
.user("zenith_admin")
|
||||
.connect_raw(&mut socket, tokio_postgres::NoTls)
|
||||
.await?;
|
||||
|
||||
let query = format!("pagestream {} {}", tenant_hex, timeline);
|
||||
tokio::select! {
|
||||
_ = conn => panic!("AAAA"),
|
||||
_ = client.query(query.as_str(), &[]) => (),
|
||||
};
|
||||
|
||||
println!("AYYYYYYYYYYYY");
|
||||
|
||||
let msg = {
|
||||
let query = {
|
||||
use bytes::buf::BufMut;
|
||||
let mut query = BytesMut::new();
|
||||
query.put_u8(2); // Specifies get_page query
|
||||
query.put_u8(0); // Specifies this is not a "latest page" query
|
||||
for byte in some_lsn {
|
||||
query.put_u8(byte);
|
||||
}
|
||||
for byte in some_page {
|
||||
query.put_u8(byte);
|
||||
}
|
||||
query.freeze()
|
||||
};
|
||||
|
||||
let mut buf = BytesMut::new();
|
||||
let copy_msg = BeMessage::CopyData(&query);
|
||||
BeMessage::write(&mut buf, ©_msg)?;
|
||||
buf.freeze()
|
||||
};
|
||||
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
socket.write(&msg).await?;
|
||||
|
||||
let response = match FeMessage::read_fut(&mut socket).await? {
|
||||
Some(FeMessage::CopyData(page)) => page,
|
||||
_ => panic!("AAAAA"),
|
||||
};
|
||||
|
||||
let page = {
|
||||
let mut cursor = Cursor::new(response);
|
||||
let tag = cursor.read_u8().await?;
|
||||
if tag != 102 {
|
||||
panic!("AA");
|
||||
}
|
||||
|
||||
let mut page = Vec::<u8>::new();
|
||||
cursor.read_to_end(&mut page).await?;
|
||||
dbg!(page.len());
|
||||
if page.len() != 8 * 1024 {
|
||||
panic!("AA");
|
||||
}
|
||||
|
||||
page
|
||||
};
|
||||
|
||||
print!("yay done");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -21,6 +21,7 @@
|
||||
//! redo Postgres process, but some records it can handle directly with
|
||||
//! bespoken Rust code.
|
||||
|
||||
use chrono::format::format;
|
||||
use postgres_ffi::nonrelfile_utils::clogpage_precedes;
|
||||
use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment;
|
||||
use std::cmp::min;
|
||||
@@ -270,6 +271,25 @@ impl WalIngest {
|
||||
// Iterate through all the blocks that the record modifies, and
|
||||
// "put" a separate copy of the record for each block.
|
||||
for blk in decoded.blocks.iter() {
|
||||
|
||||
let lsn_hex = {
|
||||
use bytes::BufMut;
|
||||
let mut bytes = BytesMut::new();
|
||||
bytes.put_u64(lsn.0);
|
||||
hex::encode(bytes.freeze())
|
||||
};
|
||||
let page_hex = {
|
||||
use bytes::BufMut;
|
||||
let mut page = BytesMut::new();
|
||||
page.put_u32(blk.rnode_spcnode);
|
||||
page.put_u32(blk.rnode_dbnode);
|
||||
page.put_u32(blk.rnode_relnode);
|
||||
page.put_u8(blk.forknum);
|
||||
page.put_u32(blk.blkno);
|
||||
hex::encode(page.freeze())
|
||||
};
|
||||
println!("wal-at-lsn-modified-page {} {}", lsn_hex, page_hex);
|
||||
|
||||
self.ingest_decoded_block(timeline, lsn, &decoded, blk)?;
|
||||
}
|
||||
|
||||
|
||||
@@ -613,6 +613,15 @@ class ZenithEnv:
|
||||
""" Get list of safekeeper endpoints suitable for wal_acceptors GUC """
|
||||
return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
|
||||
|
||||
def run_psbench(self, timeline):
|
||||
ps_log_filename = os.path.join(self.repo_dir, "pageserver.log")
|
||||
ps_connstr = self.pageserver.connstr()
|
||||
psbench_binpath = os.path.join(str(zenith_binpath), 'psbench')
|
||||
tenant_hex = self.initial_tenant.hex
|
||||
print("AAAAAAAA", ps_connstr)
|
||||
args = [psbench_binpath, ps_log_filename, ps_connstr, tenant_hex, timeline]
|
||||
subprocess.run(args)
|
||||
|
||||
@cached_property
|
||||
def auth_keys(self) -> AuthKeys:
|
||||
pub = (Path(self.repo_dir) / 'auth_public_key.pem').read_bytes()
|
||||
|
||||
72
test_runner/performance/test_pageserver.py
Normal file
72
test_runner/performance/test_pageserver.py
Normal file
@@ -0,0 +1,72 @@
|
||||
from contextlib import closing
|
||||
from fixtures.zenith_fixtures import ZenithEnv
|
||||
from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
|
||||
|
||||
def _get_page():
|
||||
# u8 tag: 2, big endian
|
||||
# u8 latest
|
||||
# u64 lsn
|
||||
# reltag:
|
||||
# u32 spcnode
|
||||
# u32 dbnode
|
||||
# u32 relnode
|
||||
# u8 forknum
|
||||
# u32 blkno
|
||||
pass
|
||||
|
||||
|
||||
def test_get_page(zenith_simple_env: ZenithEnv, zenbenchmark: ZenithBenchmarker):
|
||||
env = zenith_simple_env
|
||||
# Create a branch for us
|
||||
env.zenith_cli.create_branch("test_pageserver", "empty")
|
||||
pg = env.postgres.create_start('test_pageserver')
|
||||
tenant_hex = env.initial_tenant.hex
|
||||
timeline = pg.safe_psql("SHOW zenith.zenith_timeline")[0][0]
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute('create table t (i integer);')
|
||||
cur.execute('insert into t values (generate_series(1,3));')
|
||||
|
||||
cur.execute("select * from t;")
|
||||
res = cur.fetchall()
|
||||
|
||||
cur.execute("select pg_relation_filepath('t');")
|
||||
res = cur.fetchall()
|
||||
print(res)
|
||||
|
||||
env.run_psbench(timeline)
|
||||
return
|
||||
|
||||
import os
|
||||
ps_log_filename = os.path.join(env.repo_dir, "pageserver.log")
|
||||
with open(ps_log_filename) as log_file:
|
||||
log = log_file.readlines()
|
||||
|
||||
ps_connstr = env.pageserver.connstr()
|
||||
|
||||
|
||||
|
||||
latest_write = None
|
||||
for line in log:
|
||||
if line.startswith("wal-at-lsn-modified-page "):
|
||||
tokens = line.split()
|
||||
lsn_hex = tokens[1]
|
||||
page_hex = tokens[2]
|
||||
latest_write = (lsn_hex, page_hex)
|
||||
|
||||
with closing(env.pageserver.connect()) as psconn:
|
||||
with psconn.cursor() as cur:
|
||||
cur.execute(f"pagestream {tenant_hex} {timeline}")
|
||||
with psconn.cursor() as cur:
|
||||
cur.execute(f"select 1;")
|
||||
|
||||
# res = cur.fetchall()
|
||||
# print(res)
|
||||
# TODO send query to pageserver, see what is logged
|
||||
|
||||
# TODO send queries on these pages
|
||||
# 1. Craft binary message
|
||||
# 2. Send as postgres query
|
||||
|
||||
# TODO maybe make rust program for this side of the protocol?
|
||||
Reference in New Issue
Block a user