Compare commits

...

24 Commits

Author           SHA1        Message                              Date
Bojan Serafimov  0d7bce5f72  Fix segfault                         2022-10-06 15:35:59 -04:00
Bojan Serafimov  42a0f5be00  WIP                                  2022-10-06 13:18:31 -04:00
Bojan Serafimov  d4065f2e85  Merge branch 'main' into ps-trace    2022-10-05 13:24:06 -04:00
Bojan Serafimov  5315614974  WIP                                  2022-10-05 13:21:42 -04:00
Bojan Serafimov  293bc69e4f  Implement drawer                     2022-09-15 21:57:16 -04:00
Bojan Serafimov  e9b158e8f5  Add draw binary                      2022-09-15 21:14:07 -04:00
Bojan Serafimov  8c09ff1764  Add todo                             2022-09-15 20:53:26 -04:00
Bojan Serafimov  a66a3b4ed8  Fix merge bugs                       2022-09-15 20:38:09 -04:00
Bojan Serafimov  6913bedc09  Merge branch 'main' into ps-trace    2022-09-15 20:15:00 -04:00
Bojan Serafimov  8a43dfd573  Hack read traces into NeonCompare    2022-09-14 14:21:04 -04:00
Bojan Serafimov  065adcf75b  Measure duration                     2022-09-13 16:38:46 -04:00
Bojan Serafimov  a180273dc5  Add todo                             2022-09-13 16:34:56 -04:00
Bojan Serafimov  6c6aa04ce4  Fix bugs                             2022-09-13 16:24:09 -04:00
Bojan Serafimov  cd8c96233f  Actually connect                     2022-09-13 15:29:36 -04:00
Bojan Serafimov  ecbda94790  Connect via pagestream               2022-09-13 14:09:15 -04:00
Bojan Serafimov  0d6d8fefd3  Add replay binary                    2022-09-12 16:10:19 -04:00
Bojan Serafimov  d296a76e3e  Pass test                            2022-09-12 15:28:00 -04:00
Bojan Serafimov  646e0f3581  Write traces to file                 2022-09-12 14:43:07 -04:00
Bojan Serafimov  0bfc422eb3  Add trace file path                  2022-09-12 13:38:55 -04:00
Bojan Serafimov  1209572cec  Pass msg to tracer                   2022-09-12 10:36:56 -04:00
Bojan Serafimov  42f603cb13  Fix type error                       2022-09-12 10:32:48 -04:00
Bojan Serafimov  6bcfd6441f  Add tracer                           2022-09-12 10:26:31 -04:00
Bojan Serafimov  dedb03bb5a  Parse setting                        2022-09-09 14:45:24 -04:00
Bojan Serafimov  abb07df028  Plan request tracing implementation  2022-09-09 00:46:13 -04:00
19 changed files with 704 additions and 51 deletions

Cargo.lock (generated)

@@ -2059,6 +2059,7 @@ dependencies = [
"serde_json",
"serde_with",
"signal-hook",
"svg_fmt",
"tar",
"tempfile",
"thiserror",
@@ -3333,6 +3334,12 @@ version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "svg_fmt"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fb1df15f412ee2e9dfc1c504260fa695c1c3f10fe9f4a6ee2d2184d7d6450e2"
[[package]]
name = "symbolic-common"
version = "8.8.0"


@@ -61,7 +61,7 @@ impl ResponseErrorMessageExt for Response {
let url = self.url().to_owned();
Err(PageserverHttpError::Response(
match self.json::<HttpErrorBody>() {
Ok(err_body) => format!("Error: {}", err_body.msg),
Ok(err_body) => format!("Response error: {}", err_body.msg),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
},
))
@@ -181,14 +181,15 @@ impl PageServerNode {
new_timeline_id: Option<TimelineId>,
pg_version: u32,
) -> anyhow::Result<TimelineId> {
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?;
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())
.context("Failed to create tenant")?;
let initial_timeline_info = self.timeline_create(
initial_tenant_id,
new_timeline_id,
None,
None,
Some(pg_version),
)?;
).context("Failed to create timeline")?;
Ok(initial_timeline_info.timeline_id)
}
@@ -419,6 +420,11 @@ impl PageServerNode {
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
trace_read_requests: settings
.remove("trace_read_requests")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")


@@ -52,6 +52,7 @@ pub struct TenantCreateRequest {
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
}
#[serde_as]


@@ -67,6 +67,7 @@ remote_storage = { path = "../libs/remote_storage" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
close_fds = "0.3.2"
walkdir = "2.3.2"
svg_fmt = "0.4.1"
[dev-dependencies]
hex-literal = "0.3"


@@ -0,0 +1,185 @@
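//! Reads pageserver read traces from a directory, dumps raw GetPage
//! requests to dump.txt, prints a histogram of block-number deltas
//! between consecutive requests, and renders the access pattern
//! (block number vs. request order) to out.svg.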
use clap::{App, Arg};
use pageserver::page_service::PagestreamFeMessage;
use std::{collections::{BTreeMap, BTreeSet, HashMap}, path::PathBuf};
use std::io::Write;
use std::{
fs::{read_dir, File},
io::BufReader,
str::FromStr,
};
use svg_fmt::*;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
fn analyze<T: Ord + Copy>(coords: Vec<T>) -> (usize, BTreeMap<T, usize>) {
let set: BTreeSet<T> = coords.into_iter().collect();
let mut map: BTreeMap<T, usize> = BTreeMap::new();
for (i, e) in set.iter().enumerate() {
map.insert(*e, i);
}
(set.len(), map)
}
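// e.g. analyze(vec![50, 10, 10, 20]) == (3, {10: 0, 20: 1, 50: 2}):
// distinct coordinates are ranked, so sparse block numbers and LSNs map
// onto a dense, gap-free grid in the SVG.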
fn main() -> anyhow::Result<()> {
// TODO upgrade to struct macro arg parsing
let arg_matches = App::new("Pageserver trace visualization tool")
.about("Makes a svg file that displays the read pattern")
.arg(
Arg::new("traces_dir")
.takes_value(true)
.help("Directory where the read traces are stored"),
)
.get_matches();
// (blkno, lsn)
let mut dots = Vec::<(u32, Lsn)>::new();
let mut dump_file = File::create("dump.txt").expect("can't make file");
let mut deltas = HashMap::<i32, u32>::new();
let mut prev1: u32 = 0;
let mut prev2: u32 = 0;
let mut prev3: u32 = 0;
println!("scanning trace ...");
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
for tenant_dir in read_dir(traces_dir)? {
let entry = tenant_dir?;
let path = entry.path();
// let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
// println!("tenant: {tenant_id}");
println!("opening {path:?}");
for timeline_dir in read_dir(path)? {
let entry = timeline_dir?;
let path = entry.path();
// let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
// println!("hi");
// println!("timeline: {timeline_id}");
println!("opening {path:?}");
for trace_dir in read_dir(path)? {
let entry = trace_dir?;
let path = entry.path();
// let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
println!("opening {path:?}");
let file = File::open(path.clone())?;
let mut reader = BufReader::new(file);
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
// println!("Parsed message {:?}", msg);
match msg {
PagestreamFeMessage::Exists(_) => {}
PagestreamFeMessage::Nblocks(_) => {}
PagestreamFeMessage::GetPage(req) => {
writeln!(&mut dump_file, "{} {} {}", req.rel, req.blkno, req.lsn)?;
// dots.push((req.blkno, req.lsn));
// HACK
dots.push((req.blkno, Lsn::from(dots.len() as u64)));
let delta1 = (req.blkno as i32) - (prev1 as i32);
let delta2 = (req.blkno as i32) - (prev2 as i32);
let delta3 = (req.blkno as i32) - (prev3 as i32);
let mut delta = if i32::abs(delta1) < i32::abs(delta2) {
delta1
} else {
delta2
};
if i32::abs(delta3) < i32::abs(delta) {
delta = delta3;
}
prev3 = prev2;
prev2 = prev1;
prev1 = req.blkno;
*deltas.entry(delta).or_insert(0) += 1;
if delta == 9 {
println!("{} {} {} {}", dots.len(), req.rel, req.blkno, req.lsn);
}
},
PagestreamFeMessage::DbSize(_) => {}
};
// HACK
// if dots.len() > 1000 {
// break;
// }
}
}
}
}
let mut other = deltas.len();
deltas.retain(|_, count| *count > 3);
other -= deltas.len();
dbg!(other);
dbg!(deltas);
// Collect all coordinates
let mut keys: Vec<u32> = vec![];
let mut lsns: Vec<Lsn> = vec![];
for dot in &dots {
keys.push(dot.0);
lsns.push(dot.1);
}
// Analyze
let (key_max, key_map) = analyze(keys);
let (lsn_max, lsn_map) = analyze(lsns);
// Draw
println!("drawing trace ...");
let mut svg_file = File::create("out.svg").expect("can't make file");
writeln!(
&mut svg_file,
"{}",
BeginSvg {
w: (key_max + 1) as f32,
h: (lsn_max + 1) as f32,
}
)?;
for (key, lsn) in &dots {
let key = key_map.get(key).unwrap();
let lsn = lsn_map.get(lsn).unwrap();
writeln!(
&mut svg_file,
" {}",
rectangle(
*key as f32,
*lsn as f32,
10.0,
10.0
)
.fill(Fill::Color(red()))
.stroke(Stroke::Color(black(), 0.0))
.border_radius(0.5)
)?;
// println!(" {}",
// rectangle(key_start as f32 + stretch * margin,
// stretch * (lsn_max as f32 - 1.0 - (lsn_start as f32 + margin - lsn_offset)),
// key_diff as f32 - stretch * 2.0 * margin,
// stretch * (lsn_diff - 2.0 * margin))
// // .fill(rgb(200, 200, 200))
// .fill(fill)
// .stroke(Stroke::Color(rgb(200, 200, 200), 0.1))
// .border_radius(0.4)
// );
}
writeln!(&mut svg_file, "{}", EndSvg)?;
Ok(())
}
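A minimal sanity check for analyze above could look like this (hypothetical test module, not part of the diff):

#[cfg(test)]
mod tests {
    use super::analyze;

    #[test]
    fn analyze_ranks_distinct_coords() {
        // Duplicates collapse, and ranks are dense, in sorted order.
        let (n, map) = analyze(vec![50u32, 10, 10, 20]);
        assert_eq!(n, 3);
        assert_eq!(map[&10], 0);
        assert_eq!(map[&20], 1);
        assert_eq!(map[&50], 2);
    }
}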


@@ -0,0 +1,133 @@
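//! Replays recorded pagestream read traces against a live pageserver,
//! e.g. to benchmark GetPage performance under a reproducible workload.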
use bytes::BytesMut;
use pageserver::page_service::PagestreamFeMessage;
use std::{
fs::{read_dir, File},
io::BufReader,
path::PathBuf,
str::FromStr,
};
use tokio::{io::AsyncWriteExt, net::TcpStream};
use clap::{App, Arg};
use utils::{
id::{TenantId, TimelineId},
pq_proto::{BeMessage, FeMessage},
};
// TODO put this in library, dedup with stuff in control_plane
/// Client for the pageserver's pagestream API
struct PagestreamApi {
stream: TcpStream,
}
impl PagestreamApi {
async fn connect(
connstr: &str,
tenant: &TenantId,
timeline: &TimelineId,
) -> anyhow::Result<PagestreamApi> {
// Parse connstr
let config = tokio_postgres::Config::from_str(connstr).expect("bad connstr");
let tcp_addr = format!("localhost:{}", config.get_ports()[0]);
// Connect
let mut stream = TcpStream::connect(tcp_addr).await?;
let (client, conn) = config
.connect_raw(&mut stream, tokio_postgres::NoTls)
.await?;
// Enter pagestream protocol
let init_query = format!("pagestream {} {}", tenant, timeline);
tokio::select! {
_ = conn => panic!("connection closed during pagestream initialization"),
_ = client.query(init_query.as_str(), &[]) => (),
};
Ok(PagestreamApi { stream })
}
async fn make_request(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
let request = {
let msg_bytes = msg.serialize();
let mut buf = BytesMut::new();
let copy_msg = BeMessage::CopyData(&msg_bytes);
// TODO it's actually a fe message but it doesn't have a serializer yet
BeMessage::write(&mut buf, &copy_msg)?;
buf.freeze()
};
self.stream.write_all(&request).await?;
// TODO It's actually a BE message, but it doesn't have a parser yet,
// so an error response (tag b'E') parses incorrectly as FeExecuteMessage.
let _response = match FeMessage::read_fut(&mut self.stream).await? {
Some(FeMessage::CopyData(page)) => page,
r => panic!("Expected CopyData message, got: {:?}", r),
};
Ok(())
}
}
async fn replay_trace<R: std::io::Read>(
reader: &mut R,
mut pagestream: PagestreamApi,
) -> anyhow::Result<()> {
while let Ok(msg) = PagestreamFeMessage::parse(reader) {
println!("Parsed message {:?}", msg);
pagestream.make_request(msg).await?;
}
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// TODO upgrade to struct macro arg parsing
let arg_matches = App::new("Pageserver trace replay tool")
.about("Replays wal or read traces to test pageserver performance")
.arg(
Arg::new("traces_dir")
.takes_value(true)
.help("Directory where the read traces are stored"),
)
.arg(
Arg::new("pageserver_connstr")
.takes_value(true)
.help("Pageserver pg endpoint to connect to"),
)
.get_matches();
let connstr = arg_matches.value_of("pageserver_connstr").unwrap();
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
for tenant_dir in read_dir(traces_dir)? {
let entry = tenant_dir?;
let path = entry.path();
let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
for timeline_dir in read_dir(path)? {
let entry = timeline_dir?;
let path = entry.path();
let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
for trace_dir in read_dir(path)? {
let entry = trace_dir?;
let path = entry.path();
let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
// TODO The pageserver deletes existing traces?
// LOL yes because I use tenant ID as trace id
let pagestream = PagestreamApi::connect(connstr, &tenant_id, &timeline_id).await?;
let file = File::open(path.clone())?;
let mut reader = BufReader::new(file);
// let len = file.metadata().unwrap().len();
// println!("replaying {:?} trace {} bytes", path, len);
replay_trace(&mut reader, pagestream).await?;
}
}
}
Ok(())
}


@@ -364,6 +364,23 @@ impl PageServerConf {
self.timelines_path(tenant_id).join(timeline_id.to_string())
}
pub fn traces_path(&self) -> PathBuf {
self.workdir.join("traces")
}
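/// Traces are laid out on disk as
/// `<workdir>/traces/<tenant_id>/<timeline_id>/<connection_id>`,
/// one file per pagestream connection.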
pub fn trace_path(
&self,
tenant_id: &TenantId,
timeline_id: &TimelineId,
connection_id: &TimelineId, // TODO make a new type
) -> PathBuf {
self.traces_path()
.join(tenant_id.to_string())
.join(timeline_id.to_string())
.join(connection_id.to_string())
}
/// Points to a place in pageserver's local directory,
/// where certain timeline's metadata file should be located.
pub fn metadata_path(&self, timeline_id: TimelineId, tenant_id: TenantId) -> PathBuf {


@@ -617,6 +617,9 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
if let Some(trace_read_requests) = request_data.trace_read_requests {
tenant_conf.trace_read_requests = Some(trace_read_requests);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {


@@ -16,6 +16,8 @@ pub mod tenant;
pub mod tenant_config;
pub mod tenant_mgr;
pub mod tenant_tasks;
// pub mod timelines;
pub mod trace;
pub mod virtual_file;
pub mod walingest;
pub mod walreceiver;


@@ -10,7 +10,9 @@
//
use anyhow::{bail, ensure, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::Buf;
use bytes::{BufMut, Bytes, BytesMut};
use futures::{Stream, StreamExt};
use regex::Regex;
use std::io;
@@ -42,6 +44,7 @@ use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
use crate::tenant_mgr;
use crate::trace::Tracer;
use crate::CheckpointConfig;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
@@ -49,7 +52,9 @@ use postgres_ffi::to_pg_timestamp;
use postgres_ffi::BLCKSZ;
// Wrapped in libpq CopyData
enum PagestreamFeMessage {
// TODO these should be in a library outside the pageserver
#[derive(Debug)]
pub enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
@@ -57,7 +62,7 @@ enum PagestreamFeMessage {
}
// Wrapped in libpq CopyData
enum PagestreamBeMessage {
pub enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),
GetPage(PagestreamGetPageResponse),
@@ -66,106 +71,152 @@ enum PagestreamBeMessage {
}
#[derive(Debug)]
struct PagestreamExistsRequest {
pub struct PagestreamExistsRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
}
#[derive(Debug)]
struct PagestreamNblocksRequest {
pub struct PagestreamNblocksRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
}
#[derive(Debug)]
struct PagestreamGetPageRequest {
pub struct PagestreamGetPageRequest {
latest: bool,
lsn: Lsn,
rel: RelTag,
blkno: u32,
pub lsn: Lsn,
pub rel: RelTag,
pub blkno: u32,
}
#[derive(Debug)]
struct PagestreamDbSizeRequest {
pub struct PagestreamDbSizeRequest {
latest: bool,
lsn: Lsn,
dbnode: u32,
}
#[derive(Debug)]
struct PagestreamExistsResponse {
pub struct PagestreamExistsResponse {
exists: bool,
}
#[derive(Debug)]
struct PagestreamNblocksResponse {
pub struct PagestreamNblocksResponse {
n_blocks: u32,
}
#[derive(Debug)]
struct PagestreamGetPageResponse {
pub struct PagestreamGetPageResponse {
page: Bytes,
}
#[derive(Debug)]
struct PagestreamErrorResponse {
pub struct PagestreamErrorResponse {
message: String,
}
#[derive(Debug)]
struct PagestreamDbSizeResponse {
pub struct PagestreamDbSizeResponse {
db_size: i64,
}
impl PagestreamFeMessage {
fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
pub fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(req) => {
bytes.put_u8(0);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::Nblocks(req) => {
bytes.put_u8(1);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::GetPage(req) => {
bytes.put_u8(2);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
bytes.put_u32(req.blkno);
}
Self::DbSize(req) => {
bytes.put_u8(3);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.dbnode);
}
}
bytes.into()
}
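// parse() below mirrors serialize(): same tag byte and big-endian field
// order, which is what lets a recorded trace be replayed frame by frame.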
pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
// TODO these gets can fail
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.get_u8();
let msg_tag = body.read_u8()?;
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.get_u32(),
blkno: body.read_u32::<BigEndian>()?,
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
dbnode: body.get_u32(),
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
})),
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
}
}
}
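Since traces are consumed by parse and the replay tool produces requests with serialize, a round-trip test pins the wire format. A hedged sketch, assuming it lives inside page_service.rs (so the private latest field is visible) and that Lsn and RelTag allow struct-literal construction as used below:

#[cfg(test)]
mod pagestream_msg_tests {
    use super::*;
    use bytes::Buf;

    #[test]
    fn getpage_request_roundtrip() {
        let msg = PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
            latest: true,
            lsn: Lsn(0x12345678),
            rel: RelTag { spcnode: 1663, dbnode: 13010, relnode: 16384, forknum: 0 },
            blkno: 42,
        });
        let bytes = msg.serialize();
        let parsed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap();
        match parsed {
            PagestreamFeMessage::GetPage(req) => {
                assert_eq!(req.lsn, Lsn(0x12345678));
                assert_eq!(req.blkno, 42);
            }
            other => panic!("round-trip changed the variant: {:?}", other),
        }
    }
}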
@@ -422,6 +473,17 @@ impl PageServerHandler {
// so there is no need to reset the association
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Make request tracer if needed
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
let mut tracer = if tenant.get_trace_read_requests() {
let path = tenant
.conf
.trace_path(&tenant_id, &timeline_id, &TimelineId::generate());
Some(Tracer::new(path))
} else {
None
};
// Check that the timeline exists
let timeline = get_local_timeline(tenant_id, timeline_id)?;
@@ -446,15 +508,24 @@ impl PageServerHandler {
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Sync) => {
// TODO what now?
continue;
}
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
}
None => break, // client disconnected
};
// Trace request if needed
if let Some(t) = tracer.as_mut() {
t.trace(&copy_data_bytes)
}
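// Note: the raw CopyData payload is traced, not a re-serialized message,
// so the on-disk frames stay byte-identical to what
// PagestreamFeMessage::parse expects at replay time.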
trace!("query: {copy_data_bytes:?}");
let neon_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {


@@ -593,6 +593,14 @@ impl Tenant {
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
}
// TODO is there a need for all this boilerplate?
pub fn get_trace_read_requests(&self) -> bool {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.trace_read_requests
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
}
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
}


@@ -82,6 +82,10 @@ pub struct TenantConf {
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
/// to avoid eager reconnects.
pub max_lsn_wal_lag: NonZeroU64,
/// If enabled, record all read requests for this tenant to a trace file.
/// Even though read traces are small, there is no delete mechanism, so
/// they can pile up; this is therefore disabled by default.
pub trace_read_requests: bool,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -105,6 +109,7 @@ pub struct TenantConfOpt {
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Option<Duration>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
}
impl TenantConfOpt {
@@ -138,6 +143,9 @@ impl TenantConfOpt {
.lagging_wal_timeout
.unwrap_or(global_conf.lagging_wal_timeout),
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
trace_read_requests: self
.trace_read_requests
.unwrap_or(global_conf.trace_read_requests),
}
}
@@ -207,6 +215,7 @@ impl TenantConf {
.expect("cannot parse default walreceiver lagging wal timeout"),
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.expect("cannot parse default max walreceiver Lsn wal lag"),
trace_read_requests: false,
}
}
@@ -232,6 +241,7 @@ impl TenantConf {
.unwrap(),
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.unwrap(),
trace_read_requests: false,
}
}
}

pageserver/src/trace.rs (new file)

@@ -0,0 +1,36 @@
use bytes::Bytes;
use std::{
fs::{create_dir_all, File},
io::{BufWriter, Write},
path::PathBuf,
};
pub struct Tracer {
writer: BufWriter<File>,
}
impl Drop for Tracer {
fn drop(&mut self) {
self.flush()
}
}
impl Tracer {
pub fn new(path: PathBuf) -> Self {
let parent = path.parent().expect("trace path has no parent directory");
create_dir_all(parent).expect("failed to create trace dir");
let file = File::create(path).expect("failed to create trace file");
Tracer {
writer: BufWriter::new(file),
}
}
pub fn trace(&mut self, msg: &Bytes) {
self.writer.write_all(msg).expect("failed to write trace");
}
pub fn flush(&mut self) {
self.writer.flush().expect("failed to flush trace file");
}
}
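Because a trace file is just serialized PagestreamFeMessage frames written back to back, consuming one is a parse loop until EOF, the same loop the replay and draw binaries use. A hedged sketch (assumes the pageserver crate is a dependency, as in those binaries):

use pageserver::page_service::PagestreamFeMessage;
use std::{fs::File, io::BufReader, path::Path};

fn count_requests(path: &Path) -> anyhow::Result<usize> {
    let mut reader = BufReader::new(File::open(path)?);
    let mut n = 0;
    // parse() errors at EOF (or on a truncated frame), which ends the loop.
    while PagestreamFeMessage::parse(&mut reader).is_ok() {
        n += 1;
    }
    Ok(n)
}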


@@ -264,12 +264,76 @@ pageserver_flush(void)
}
}
// Entry of the page_cache
typedef struct
{
// HACK Just xor of bytes lol
char request_hash;
// Points directly to a NeonResponse. We can't just own the
// NeonResponse because it's a "supertype", so it's not Sized.
char* response;
int len;
} NeonRequestResponse;
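// NOTE: a one-byte XOR hash collides easily, and the hit path below never
// compares the full request, so a collision silently returns the wrong
// response. Fine for this tracing experiment, not for real use.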
#define MAX_PAGE_CACHE_SIZE 50
NeonRequestResponse page_cache[MAX_PAGE_CACHE_SIZE];
int page_cache_size = 0;
int page_cache_head = 0;
static NeonResponse *
pageserver_call(NeonRequest * request)
{
// Compute hash
char hash = 0;
StringInfoData req_buff;
req_buff = nm_pack_request(request);
for (int i = 0; i < req_buff.len; i++) {
hash ^= req_buff.data[i];
}
pfree(req_buff.data);
// If result is cached, memcpy and return
for (int i = 0; i < page_cache_size; i++) {
if (page_cache[i].request_hash == hash) {
int len = page_cache[i].len;
NeonResponse *resp = palloc0(len);
// I'd rather Rc than memcpy, but this is not rust :(
memcpy(resp, page_cache[i].response, len);
elog(LOG, "cache hit !!!");
return resp;
}
}
// Send request, get response
pageserver_send(request);
pageserver_flush();
return pageserver_receive();
NeonResponse *resp = pageserver_receive();
// Get length
int len = -1;
switch (resp->tag) {
case T_NeonExistsResponse: { len = sizeof(NeonExistsResponse); break; }
case T_NeonNblocksResponse: { len = sizeof(NeonNblocksResponse); break; }
case T_NeonGetPageResponse: { len = offsetof(NeonGetPageResponse, page) + BLCKSZ; break; }
case T_NeonDbSizeResponse: { len = sizeof(NeonDbSizeResponse); break; }
case T_NeonErrorResponse: { len = sizeof(NeonErrorResponse); break; }
}
// Cache result
page_cache[page_cache_head].request_hash = hash;
if (page_cache_head < page_cache_size) {
pfree(page_cache[page_cache_head].response);
}
page_cache[page_cache_head].len = len;
page_cache[page_cache_head].response = MemoryContextAlloc(TopMemoryContext, len);
memcpy(page_cache[page_cache_head].response, resp, len);
page_cache_head = (page_cache_head + 1) % MAX_PAGE_CACHE_SIZE;
if (page_cache_size < MAX_PAGE_CACHE_SIZE) {
page_cache_size += 1;
}
return resp;
}
page_server_api api = {


@@ -91,10 +91,25 @@ class NeonCompare(PgCompare):
self._pg_bin = pg_bin
self.pageserver_http_client = self.env.pageserver.http_client()
self.tenant, _ = self.env.neon_cli.create_tenant(
conf={
"trace_read_requests": "true",
}
)
# HACK enable request tracing, as an experiment
# NOTE This must be done before the pg starts pagestream
# TODO why does it not work?
# self.env.neon_cli.config_tenant(self.env.initial_tenant, conf={
# "trace_read_requests": "true",
# })
# We only use one branch and one timeline
self.env.neon_cli.create_branch(branch_name, "empty")
self._pg = self.env.postgres.create_start(branch_name)
self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
# self.env.neon_cli.create_branch(branch_name, "empty", tenant_id=self.tenant)
# self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
self._pg = self.env.postgres.create_start(
branch_name, "main", self.tenant, config_lines=["shared_buffers=2GB"])
@property
def pg(self):
@@ -109,10 +124,10 @@ class NeonCompare(PgCompare):
return self._pg_bin
def flush(self):
self.pageserver_http_client.timeline_gc(self.env.initial_tenant, self.timeline, 0)
self.pageserver_http_client.timeline_gc(self.tenant, self.timeline, 0)
def compact(self):
self.pageserver_http_client.timeline_compact(self.env.initial_tenant, self.timeline)
self.pageserver_http_client.timeline_compact(self.tenant, self.timeline)
def report_peak_memory_use(self) -> None:
self.zenbenchmark.record(
@@ -124,7 +139,7 @@ class NeonCompare(PgCompare):
def report_size(self) -> None:
timeline_size = self.zenbenchmark.get_timeline_size(
self.env.repo_dir, self.env.initial_tenant, self.timeline
self.env.repo_dir, self.tenant, self.timeline
)
self.zenbenchmark.record(
"size", timeline_size / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER


@@ -1749,6 +1749,37 @@ def pg_bin(test_output_dir: Path, pg_version: str) -> PgBin:
return PgBin(test_output_dir, pg_version)
@dataclass
class ReplayBin:
"""A helper class for replaying pageserver wal and read traces."""
traces_dir: str
def replay_all(self, pageserver_connstr):
replay_binpath = os.path.join(str(neon_binpath), "replay")
args = [
replay_binpath,
self.traces_dir,
pageserver_connstr,
]
# return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip()
subprocess.run(args)
def draw_all(self):
draw_binpath = os.path.join(str(neon_binpath), "draw_trace")
args = [
draw_binpath,
self.traces_dir,
]
subprocess.run(args)
@pytest.fixture(scope="function")
def replay_bin(test_output_dir):
traces_dir = os.path.join(test_output_dir, "repo", "traces")
return ReplayBin(traces_dir)
class VanillaPostgres(PgProtocol):
def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
super().__init__(host="localhost", port=port, dbname="postgres")


@@ -175,6 +175,11 @@ def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench_init(neon_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
# Run the pgbench tests, and generate a flamegraph from it
# This requires that the pageserver was built with the 'profiling' feature.
#


@@ -0,0 +1,57 @@
from contextlib import closing
from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.neon_fixtures import NeonEnvBuilder, ReplayBin
# This test is a demonstration of how to do trace playback. With the
# current workload it uses, it's not testing anything meaningful.
def test_trace_replay(
neon_env_builder: NeonEnvBuilder, replay_bin: ReplayBin, zenbenchmark: NeonBenchmarker
):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
tenant, _ = env.neon_cli.create_tenant(
conf={
"trace_read_requests": "true",
}
)
# TODO This doesn't work because I haven't updated tenant_config_handler
# env.neon_cli.config_tenant(tenant, conf={
# "trace_read_requests": "true",
# })
env.neon_cli.create_timeline("test_trace_replay", tenant_id=tenant)
pg = env.postgres.create_start("test_trace_replay", "main", tenant)
with zenbenchmark.record_duration("run"):
pg.safe_psql("select 1;")
pg.safe_psql("select 1;")
pg.safe_psql("select 1;")
pg.safe_psql("select 1;")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table t (i integer);")
cur.execute(f"insert into t values (generate_series(1,{10000}));")
cur.execute("select count(*) from t;")
# Stop pg so we drop the connection and flush the traces
pg.stop()
# TODO This doesn't work because I haven't updated tenant_config_handler
# env.neon_cli.config_tenant(tenant, conf={
# "trace_read_requests": "false",
# })
# trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline) / str(timeline)
# assert trace_path.exists()
replay_bin.draw_all()
# HACK: stop after drawing; the replay below is left unreachable for now.
return
print("replaying")
ps_connstr = env.pageserver.connstr()
with zenbenchmark.record_duration("replay"):
output = replay_bin.replay_all(ps_connstr)
print(output)


@@ -1,6 +1,7 @@
from contextlib import closing
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder